In [6]:
import requests
import pandas as pd
from bs4 import BeautifulSoup
from dags.module.object_client import MinioClient
from minio import Minio
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
import json
import time
from lxml import html

In [49]:
import os

import psycopg2

# Connect to the local Postgres database and create the fact / dimension tables.
# Every statement is idempotent, so the cell can be re-run on an existing database.
#
# Connection settings may be overridden via environment variables so credentials
# don't have to live in the notebook; the fallbacks are the original local-dev values.
PG_PARAMS = {
    "host": os.environ.get("PG_HOST", "localhost"),
    "database": os.environ.get("PG_DATABASE", "userdb"),
    "user": os.environ.get("PG_USER", "username"),
    "password": os.environ.get("PG_PASSWORD", "password"),
    "port": int(os.environ.get("PG_PORT", "5439")),
}

conn = psycopg2.connect(**PG_PARAMS)
cursor = conn.cursor()

# Fact table: one row per symbol (symbol is UNIQUE).

fact_table_sql = '''
    CREATE TABLE IF NOT EXISTS fact_stock_data(
        id SERIAL PRIMARY KEY,
        symbol VARCHAR(30) NOT NULL UNIQUE,
        info_id VARCHAR(30) NOT NULL,
        type VARCHAR(30) NOT NULL,
        description VARCHAR(1000),
        status VARCHAR(30)
    )
'''

# Dim tables

# dim_info_corpo_sql = '''
#     CREATE TABLE IF NOT EXISTS dim_info_corpo(
#         info_id VARCHAR(30) NOT NULL PRIMARY KEY,
#         sector VARCHAR(50),
#         industry VARCHAR(50)
#     );
# '''

# info_id is VARCHAR(30) to match fact_stock_data.info_id, which references it
# (it was VARCHAR(20), so longer ids in the fact table could never match).
dim_info_stock_sql = '''
    CREATE TABLE IF NOT EXISTS dim_info_stock(
        info_id VARCHAR(30) NOT NULL PRIMARY KEY,
        category VARCHAR(50),
        fund_family VARCHAR(100),
        net_assets VARCHAR(10),
        legal_type VARCHAR(50),
        sector VARCHAR(50),
        industry VARCHAR(50)
    );
'''


def _hist_table_sql(table_name: str) -> str:
    """Return CREATE TABLE DDL for a daily price-history table named ``table_name``.

    Price columns are DECIMAL(18,6): the downloaded CSVs carry six decimal places
    (e.g. 136.839996), which the previous DECIMAL(8,1) rounded to one. Tables that
    already exist are not changed by ``CREATE TABLE IF NOT EXISTS`` and need an
    explicit ALTER to pick up the wider type.

    ``table_name`` is interpolated verbatim; only pass trusted, internal names.
    """
    return f'''
    CREATE TABLE IF NOT EXISTS {table_name}(
        record_id SERIAL,
        "Date" DATE NOT NULL,
        "Open" DECIMAL(18,6),
        "High" DECIMAL(18,6),
        "Low" DECIMAL(18,6),
        "Close" DECIMAL(18,6),
        "Adj Close" DECIMAL(18,6),
        "Volume" DECIMAL(20,0),
        "Symbol" VARCHAR(30) NOT NULL,
        PRIMARY KEY ("Symbol", "Date")
    );
'''


dim_hist_data_sql = _hist_table_sql("dim_hist_data")
# Staging table with the same layout as dim_hist_data.
temp_dim_hist_data_sql = _hist_table_sql("temp_dim_hist_data")

# Postgres has no "ADD CONSTRAINT IF NOT EXISTS" for foreign keys, so each ALTER runs
# in a DO block that ignores the duplicate_object error raised when it already exists.
fk_setup_sql = '''
    DO $$
    BEGIN
        ALTER TABLE dim_hist_data ADD CONSTRAINT fk_fact_stock_data
            FOREIGN KEY ("Symbol") REFERENCES fact_stock_data(symbol);
    EXCEPTION WHEN duplicate_object THEN NULL;
    END $$;
    DO $$
    BEGIN
        ALTER TABLE fact_stock_data ADD CONSTRAINT fk_dim_info_stock
            FOREIGN KEY (info_id) REFERENCES dim_info_stock(info_id);
    EXCEPTION WHEN duplicate_object THEN NULL;
    END $$;
'''

# NOTE(review): the UNIQUE constraint on symbol already creates a unique index, so
# index_stock_symbol duplicates it; and a UNIQUE index on info_id allows only one
# symbol per info_id. Confirm both are intended.
index_sql = """
    CREATE UNIQUE INDEX IF NOT EXISTS index_stock_symbol on fact_stock_data(symbol);
    CREATE UNIQUE INDEX IF NOT EXISTS index_stock_info_id on fact_stock_data(info_id);
"""

# `with conn` commits on success and rolls back on error, so a failing statement
# doesn't leave the connection stuck in an aborted transaction.
with conn:
    for ddl in (fact_table_sql, dim_info_stock_sql, dim_hist_data_sql,
                fk_setup_sql, temp_dim_hist_data_sql, index_sql):
        cursor.execute(ddl)

In [84]:
# Discard any pending/aborted transaction on this connection before running DDL.
conn.rollback()

# NOTE(review): dim_info_stock is created with fund_family VARCHAR(100); this ALTER
# narrows it to VARCHAR(50) (category is already VARCHAR(50)). Confirm the intended width.
alter_dim_info_stock_sql = "Alter table dim_info_stock alter column category type varchar(50), alter column fund_family type varchar(50);"
cursor.execute(alter_dim_info_stock_sql)
conn.commit()

In [38]:
import requests
from dotenv import dotenv_values
from lxml import html
import json

# Exploratory probe: fetch the MinIO browser page for hist-data/A.csv and show the
# first <div> nested under <body>'s first-level div. The captured output shows only a
# preload shell, not the CSV contents.
config = dotenv_values("dags/.env")
headers = json.loads(config['header_csv'])
url = 'http://localhost:9001/browser/hist-data/A.csv'

s = requests.Session()
response = s.get(url, headers=headers)

tree = html.fromstring(response.content)
data = tree.xpath('/html/body/div/div')
html.tostring(data[0])

b'<div id="preload"><img src="./images/background.svg"> <img src="./images/background-wave-orig2.svg"></div>'

In [39]:
# Read the extract job's error log and take its last non-blank entry.
# Lines are stripped so the symbol carries no trailing newline (which would break it
# as a lookup key), and an empty log yields None instead of raising IndexError.
# NOTE(review): despite its name, first_error_symbol holds the LAST line; confirm
# which entry is meant.
with open("dags/error_log_extract/error_log.txt", "r", encoding="utf-8") as f:
    error_symbols = [line.strip() for line in f if line.strip()]
first_error_symbol = error_symbols[-1] if error_symbols else None
print(first_error_symbol)

FSNB-U



In [74]:
import pandas as pd
from dags.module.object_client import MinioClient

# Load the stored history CSV for symbol "A" from MinIO and turn its first 1000 rows
# into plain tuples for the INSERT cells.
minio_client = MinioClient("localhost", 9000, "minio", "minio123")
csv_content = minio_client.get_object(bucket_name="hist-data", obj_name="A.csv")

df = pd.read_csv(csv_content)
df["Symbol"] = 'A'  # tag every row with its symbol
batch_df = df.iloc[:1000]
rows = list(map(tuple, batch_df.values.tolist()))

In [79]:
# Render every row as an SQL VALUES tuple with mogrify and join them with commas.
# Only the final character is printed (looks like leftover debugging).
row_literals = (
    cursor.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s)", row).decode('utf-8')
    for row in rows
)
args_str = ','.join(row_literals)
print(args_str[-1])

)


In [52]:
df.keys()  # same Index object as df.columns

Index(['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume',
       'Symbol'],
      dtype='object')

In [68]:
# WARNING: destructive — deletes every row in dim_hist_data.
truncate_sql = "TRUNCATE dim_hist_data"
cursor.execute(truncate_sql)
conn.commit()

In [69]:
import os

import psycopg2
from psycopg2.extras import execute_batch

# Open a fresh connection for the bulk upsert. Close whatever `conn` previously held
# (if anything) so re-running the cell doesn't leak server connections or leave an
# old transaction open.
_stale_conn = globals().get("conn")
if _stale_conn is not None and not getattr(_stale_conn, "closed", True):
    _stale_conn.close()

# Fallbacks are the original local-dev settings; override via environment variables.
conn = psycopg2.connect(
    host=os.environ.get("PG_HOST", "localhost"),
    database=os.environ.get("PG_DATABASE", "userdb"),
    user=os.environ.get("PG_USER", "username"),
    password=os.environ.get("PG_PASSWORD", "password"),
    port=int(os.environ.get("PG_PORT", "5439")),
)
cursor = conn.cursor()

# Insert-or-update keyed on the ("Symbol", "Date") primary key of dim_hist_data.
upsert_query = '''
                    INSERT INTO dim_hist_data ("Date", "Open", "High", "Low", "Close", "Adj Close", "Volume", "Symbol")
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT ("Symbol", "Date")
                    DO UPDATE SET
                    "Open" = EXCLUDED."Open",
                    "High" = EXCLUDED."High",
                    "Low" = EXCLUDED."Low",
                    "Close" = EXCLUDED."Close",
                    "Adj Close" = EXCLUDED."Adj Close",
                    "Volume" = EXCLUDED."Volume"
                '''

# cursor.executemany is no faster than execute() in a loop (per psycopg2 docs);
# execute_batch sends page_size statements per round trip. `with conn` commits on
# success and rolls back on error.
with conn:
    execute_batch(cursor, upsert_query, rows, page_size=1000)

In [38]:
# Discard any aborted transaction, then count rows in the fact table.
conn.rollback()
count_sql = "SELECT count(*) FROM public.fact_stock_data"
cursor.execute(count_sql)
result = cursor.fetchall()  # a list holding one 1-tuple, e.g. [(0,)]
result

[(0,)]

In [65]:
import psycopg2

# Row-by-row upsert of batch_df with a commit per row. A failing row is rolled back,
# logged, and skipped, so one bad row neither stops the loop nor leaves the connection
# stuck in an aborted transaction.
# Assumes batch_df's column order matches the INSERT column list (Date ... Symbol).
upsert_query = '''
            INSERT INTO dim_hist_data ("Date", "Open", "High", "Low", "Close", "Adj Close", "Volume", "Symbol")
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT ("Symbol", "Date")
            DO UPDATE SET
            "Open" = EXCLUDED."Open",
            "High" = EXCLUDED."High",
            "Low" = EXCLUDED."Low",
            "Close" = EXCLUDED."Close",
            "Adj Close" = EXCLUDED."Adj Close",
            "Volume" = EXCLUDED."Volume"    
            '''

failed_rows = []
for row in batch_df.itertuples(index=False, name=None):
    try:
        cursor.execute(upsert_query, row)
        conn.commit()
    except psycopg2.Error as exc:
        conn.rollback()
        failed_rows.append((row, exc))
        print(f"Upsert failed for row {row}: {exc}")
print(f"{len(failed_rows)} of {len(batch_df)} rows failed")

In [1]:

import requests
from dotenv import dotenv_values
import json
from datetime import datetime, timedelta, timezone
import random

# Load request headers and build the period1/period2 Unix timestamps used by the
# Yahoo download below: today's UTC midnight and the midnight seven days earlier.
config = dotenv_values("dags/.env")
header_csv = json.loads(config['header_csv'])

midnight = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
current_day = int(midnight.timestamp())
a_week_ago = int((midnight - timedelta(days=7)).timestamp())



In [2]:
# Proxy credentials are read from dags/.env (loaded into `config` in the previous cell).
# SECURITY: the credentials that used to be hard-coded here were committed in plain
# text and should be rotated.
proxy_username = config['proxy_username']
proxy_password = config['proxy_password']
proxy_url = f"http://{proxy_username}:{proxy_password}@p.webshare.io:80/"
proxies = {"http": proxy_url, "https": proxy_url}

# NOTE(review): `proxies` is not passed to the request, so it goes out directly;
# add proxies=proxies if routing through the proxy is intended.
# The timeout (seconds) keeps the cell from hanging forever on an unresponsive server.
response = requests.get(
    "https://query2.finance.yahoo.com/v7/finance/download/A"
    f"?period1={a_week_ago}&period2={current_day}"
    "&interval=1d&events=history&includeAdjustedClose=true",
    headers=header_csv,
    timeout=30,
)
response.status_code

200

In [48]:
# Raw bytes returned by the download request.
csv_bytes = response.content
csv_bytes

b'Date,Open,High,Low,Close,Adj Close,Volume\n2024-08-09,136.839996,137.860001,136.100006,136.899994,136.899994,737600\n2024-08-12,136.809998,136.809998,134.479996,135.600006,135.600006,1347200\n2024-08-13,135.830002,137.699997,135.330002,137.419998,137.419998,1158000\n2024-08-14,137.929993,138.690002,135.880005,136.029999,136.029999,1090000\n2024-08-15,137.660004,139.990005,137.660004,139.889999,139.889999,1120300'

In [1]:
from dags.module.object_client import MinioClient

# Client for the local MinIO instance on port 9000.
MINIO_HOST, MINIO_PORT = 'localhost', 9000
minio_client = MinioClient(MINIO_HOST, MINIO_PORT, "minio", "minio123")

In [2]:
# Fetch the object listing of the "hist-data" bucket.
HIST_DATA_BUCKET = "hist-data"
list_filenames = minio_client.list_objects(HIST_DATA_BUCKET)

In [3]:
import os

# Number of logical CPUs reported by the OS.
cpu_total = os.cpu_count()
cpu_total

8

In [2]:
from concurrent.futures import ThreadPoolExecutor

# Show the worker count ThreadPoolExecutor chooses by default on this machine.
# NOTE: `_max_workers` is a private attribute; acceptable for exploration only.
executor = ThreadPoolExecutor()
default_max_workers = executor._max_workers
print(default_max_workers)

12


In [11]:
import requests
from dotenv import dotenv_values
import json

# Fetch the Yahoo Finance profile page for AA and decode it as UTF-8 text.
config = dotenv_values("dags/.env")
header_html = json.loads(config['header_html'])
username = config['proxy_username']
password = config['proxy_password']

proxy_url = f"http://{username}:{password}@p.webshare.io:80/"
proxies = {"http": proxy_url, "https": proxy_url}

# NOTE(review): `proxies` is not passed below, so the request is made directly.
# Without a timeout, requests can block indefinitely on an unresponsive server.
REQUEST_TIMEOUT_S = 30
requests.get(
    "https://finance.yahoo.com/quote/AA/profile",
    headers=header_html,
    timeout=REQUEST_TIMEOUT_S,
).content.decode('utf-8')

