In [1]:
# Importing common libraries and variables
%run "./init.ipynb"

# Creating tables

The actual table creation is done from the CLI by running `schema_creator.sql`.

In [None]:
# Load the project secrets file into `secrets`.
# NOTE(review): nothing in this cell reads `secrets` — confirm whether a later
# cell (or CONNECTION_STRING) still needs it.
with open("../../../secrets/secrets.json") as f:
    secrets = json.load(f)

# List every table of the `public` schema together with its owner,
# insertability and tablespace.
# The join to pg_tables matches on schema AND table name: matching on the name
# alone would duplicate rows (or report the wrong owner) whenever a table with
# the same name exists in another schema.
with psycopg.connect(CONNECTION_STRING) as conn:
    res = conn.execute("""
        SELECT
            i.table_name as name,
            t.tableowner as table_owner,
            i.is_insertable_into as is_insertable,
            coalesce(t.tablespace, 'pg_default') as "tablespace"
        FROM
            information_schema.tables i
            left join pg_tables t
                on i.table_schema = t.schemaname
                and i.table_name = t.tablename
        where i.table_schema = 'public'
        order by i.table_name;
    """)
# NOTE(review): `res` is read by the following cell after the connection
# context has exited — confirm the cursor is still readable at that point.
res

In [None]:
# Build a list whose first element is the tuple of column names and whose
# remaining elements are the fetched rows.
column_names = tuple(column.name for column in res.description)
query_result = [column_names, *res.fetchall()]

In [None]:
# First element of `query_result` is the header row, the rest are data rows.
table_header, *table_rows = query_result
print(tabulate(table_rows, headers=table_header, tablefmt="pipe"))

In [None]:
# Log the same pipe-formatted table listing that is printed above.
tables_listing = tabulate(query_result[1:], headers=query_result[0], tablefmt="pipe")
logger.info("Tables have been created.\n" + tables_listing)

# Bitcoin data

Twelve Data failed at test time due to the restrictions of its free plan, so the Bitcoin data is taken from a Kaggle dataset instead.

In [None]:
# Download the Kaggle Bitcoin dataset and locate its CSV file.
#
# The kagglehub cache is pointed at the project's temp directory through a
# path relative to the notebook instead of a machine-specific absolute path,
# and an already-exported KAGGLEHUB_CACHE is respected.
# NOTE(review): assumes `../../../temp` (the directory the oil and CPI cells
# read from) is the same location the previous absolute path pointed to —
# confirm.
os.environ.setdefault("KAGGLEHUB_CACHE", str(Path("../../../temp").resolve()))
dir_ = Path(kagglehub.dataset_download("mczielinski/bitcoin-historical-data"))

# Pick the CSV explicitly: directory listing order is arbitrary and the folder
# may contain non-data files (e.g. `.DS_Store`), so "first entry" is unreliable.
csv_files = sorted(dir_.glob("*.csv"))
if not csv_files:
    raise FileNotFoundError(f"No CSV file found in {dir_}")
path = csv_files[0]
file_name = path.name

print(f"Path to dataset: {path}")

In [None]:
# Load the downloaded Kaggle CSV (located by the previous cell) into `df`.
df = pd.read_csv(path)

In [None]:
# Validate the raw Bitcoin frame and log the outcome; the notebook keeps
# running either way, as before.
# Plain checks are used instead of `assert` so validation still runs under
# `python -O`, and so every failed check is reported, not only the first one.
problems = []

# `df < 0` is False for NaN, so missing values are not misreported as negative
# (the previous `(df >= 0).all()` failed on any NaN entry).
if bool((df < 0).any(axis=None)):
    problems.append("Some entries are negative")

# Every row must contain at least one non-missing value.
if not bool(df.notna().any(axis="columns").all()):
    problems.append("Some rows are completely NA")

if problems:
    logger.critical(f"Validation of the Bitcoin data failed: {'; '.join(problems)}")
else:
    logger.info("Validation of the Bitcoin data is successful")

In [None]:
# Turn the raw frame into hourly OHLC bars indexed by timestamp.
#
# The epoch seconds are converted with `pd.to_datetime(unit="s")`, which yields
# naive UTC timestamps. `datetime.fromtimestamp` produced naive *local* time
# instead, so the result depended on the machine's timezone, and DST shifts
# merged two real hours into one bucket (or left an empty one). The vectorised
# call is also much faster than mapping a Python function over every row.
df = (
    df
    .assign(Timestamp=pd.to_datetime(df["Timestamp"], unit="s"))
    .rename(columns=str.lower)
    .set_index("timestamp")
    .drop(columns="volume")  # only OHLC is kept
    .resample("1h")
    .agg({
        "open": "first",
        "high": "max",
        "low": "min",
        "close": "last",
    })
)
df

In [None]:
# Keep rows up to and including the cutoff hour, then prepare the values for
# insertion: timestamps become single-quoted strings and missing values become
# the literal string "NULL".
cutoff = datetime.fromisoformat("2025-07-01 23:00:00")
df = df.loc[:cutoff, :].reset_index()

quoted_timestamps = "'" + df["timestamp"].astype(str) + "'"
df["timestamp"] = quoted_timestamps

df.fillna("NULL", inplace=True)
df

In [None]:
# Dry run of the bitcoin_ohlc insertion with the prepared rows.
bitcoin_rows = df.to_numpy()
dry_insert_into_db(CONNECTION_STRING, "bitcoin_ohlc", bitcoin_rows)

In [None]:
# Pass the prepared rows to insert_into_db for the bitcoin_ohlc table.
bitcoin_rows = df.to_numpy()
insert_into_db(CONNECTION_STRING, "bitcoin_ohlc", bitcoin_rows)

In [None]:
# Print the description of bitcoin_ohlc (its time column is "ts").
bitcoin_summary = describe_table(CONNECTION_STRING, "bitcoin_ohlc", "ts")
print(bitcoin_summary)

In [None]:
# Record the completed Bitcoin load together with the table description.
bitcoin_summary = describe_table(CONNECTION_STRING, "bitcoin_ohlc", "ts")
logger.info(
    "Bitcoin OHLC successfully acquired from Kaggle, transformed and loaded\n"
    f"{bitcoin_summary}"
)

# NASDAQ, S&P 500, Dow Jones data

In [None]:
# Daily, unadjusted quotes for the three stock indices from Yahoo! Finance.
index_tickers = ["^IXIC", "^GSPC", "^DJI"]
download_options = {
    "interval": "1d",
    "start": "2010-01-01",
    "end": "2025-08-01",
    "auto_adjust": False,
    "keepna": True,
}
df = yf.download(tickers=index_tickers, **download_options)
df.head(3)

In [None]:
# Keep only the OHLC fields, order them, and move the ticker to the outer
# column level so each ticker can be selected with `df[ticker]`.
ohlc_fields = ["Open", "High", "Low", "Close"]
df = df.drop(columns=["Adj Close", "Volume"])
df = df.reindex(columns=ohlc_fields, level=0)
df = df.swaplevel(0, 1, axis="columns")
df.head(3)

In [None]:
# One frame per ticker: NASDAQ, S&P 500, Dow Jones.
assets = {ticker: df[ticker] for ticker in ["^IXIC", "^GSPC", "^DJI"]}

In [None]:
# Lower-case the column names of every asset frame and fix the column order.
lowercase_ohlc = ["open", "high", "low", "close"]
for ticker in ("^IXIC", "^GSPC", "^DJI"):
    renamed = assets[ticker].rename(columns=str.lower)
    assets[ticker] = renamed.reindex(columns=lowercase_ohlc)

In [None]:
# Prepare every index frame for insertion: missing prices become the literal
# string "NULL", all values become strings, and the date is single-quoted.
for asset in ["^IXIC", "^GSPC", "^DJI"]:
    # fillna must run BEFORE astype(str): once cast, NaN is the ordinary string
    # "nan", which fillna no longer recognises as missing, so "nan" would be
    # sent to the database instead of NULL. This matches the order the gold
    # section already uses.
    frame = (assets[asset]
        .fillna("NULL")
        .astype(str)
        .reset_index()
    )
    frame["Date"] = "'" + frame["Date"].astype(str) + "'"

    assets[asset] = frame

In [None]:
# Dry run of the insertion for each index, one target table per ticker.
table_by_ticker = {
    "^IXIC": "nasdaq_ohlc",
    "^GSPC": "snp_ohlc",
    "^DJI": "dow_jones_ohlc",
}

for asset, data in assets.items():
    if asset not in table_by_ticker:
        raise ValueError(f"{asset} is invalid.")
    table = table_by_ticker[asset]

    dry_insert_into_db(CONNECTION_STRING, table, data.to_numpy())
    print("\n")

In [None]:
# Insert each index into its table and log the resulting table description.
table_by_ticker = {
    "^IXIC": "nasdaq_ohlc",
    "^GSPC": "snp_ohlc",
    "^DJI": "dow_jones_ohlc",
}

for asset, data in assets.items():
    if asset not in table_by_ticker:
        raise ValueError(f"{asset} is invalid.")
    table = table_by_ticker[asset]

    insert_into_db(CONNECTION_STRING, table, data.to_numpy())

    # NOTE(review): the other daily tables in this notebook (gold_ohlc,
    # oil_price, cpi) are described by "dt"; confirm these tables really use "ts".
    table_summary = describe_table(CONNECTION_STRING, table, "ts")
    logger.info(
        f"{table} successfully acquired from Yahoo! finance, transformed and loaded\n"
        f"{table_summary}"
    )

# Crude oil

In [None]:
# Record in the log that the oil table holds a single price series instead of OHLC.
# NOTE(review): this documents a schema decision rather than a runtime failure —
# confirm ERROR is the intended level.
logger.error("Due to missing OHLC data for oil, `oil_ohlc` table has been droped and replaced with `oil_price`")

In [None]:
# Read the daily oil price CSV: the first five lines are skipped, the first
# column becomes a parsed date index, and the result is reduced to a date-sorted
# series limited to 2010-01-01 .. 2025-08-01.
leading_lines_to_skip = 5
with open("../../../temp/oil_daily_price.csv") as f:
    for _ in range(leading_lines_to_skip):
        f.readline()
    df = pd.read_csv(f, header=None, index_col=0, parse_dates=True)

df.index.name = None

period_start = datetime.fromisoformat("2010-01-01")
period_end = datetime.fromisoformat("2025-08-01")
srs = df.squeeze().sort_index().loc[period_start:period_end]
srs

In [None]:
# Turn the series into a two-column frame of strings and single-quote the date
# column (named "index" because the index itself has no name).
oil_rows = srs.reset_index().astype(str)
srs = oil_rows.assign(index="'" + oil_rows["index"] + "'")
srs

In [None]:
# Dry run of the oil_price insertion with the prepared rows.
oil_values = srs.to_numpy()
dry_insert_into_db(CONNECTION_STRING, "oil_price", oil_values)

In [None]:
# Pass the prepared rows to insert_into_db for the oil_price table.
oil_values = srs.to_numpy()
insert_into_db(CONNECTION_STRING, "oil_price", oil_values)

In [None]:
# Record the completed oil load together with the table description.
oil_summary = describe_table(CONNECTION_STRING, "oil_price", "dt")
logger.info(
    "Oil price successfully acquired from EIA, transformed and loaded\n"
    f"{oil_summary}"
)

# Gold

In [10]:
# Daily, unadjusted gold futures quotes (GC=F) from Yahoo! Finance.
gold_download_options = {
    "interval": "1d",
    "start": "2010-01-01",
    "end": "2025-08-01",
    "auto_adjust": False,
    "keepna": True,
}
df = yf.download(tickers="GC=F", **gold_download_options)
df.head(3)

[*********************100%***********************]  1 of 1 completed


Price,Adj Close,Close,High,Low,Open,Volume
Ticker,GC=F,GC=F,GC=F,GC=F,GC=F,GC=F
Date,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2
2010-01-04,1117.7,1117.7,1122.3,1097.1,1117.7,184
2010-01-05,1118.1,1118.1,1126.5,1115.0,1118.1,53
2010-01-06,1135.9,1135.9,1139.2,1120.7,1135.9,363


In [11]:
# Drop the ticker level, keep only OHLC in a fixed order, lower-case the names.
df = df.droplevel(1, axis="columns")
df = df.drop(columns=["Adj Close", "Volume"])
df = df.reindex(columns=["Open", "High", "Low", "Close"]).rename(columns=str.lower)
df.head(3)

Price,open,high,low,close
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2010-01-04,1117.7,1122.3,1097.1,1117.7
2010-01-05,1118.1,1126.5,1115.0,1118.1
2010-01-06,1135.9,1139.2,1120.7,1135.9


In [14]:
# Prepare the gold rows for insertion: missing values become "NULL" first,
# then everything is cast to string and the date is single-quoted.
df = df.reset_index()
df.fillna("NULL", inplace=True)
df = df.astype(str).assign(Date=lambda gold: "'" + gold["Date"] + "'")
df.head(3)

  df.fillna("NULL", inplace=True)


Price,Date,open,high,low,close
0,'2010-01-04',1117.699951171875,1122.300048828125,1097.0999755859375,1117.699951171875
1,'2010-01-05',1118.0999755859375,1126.5,1115.0,1118.0999755859375
2,'2010-01-06',1135.9000244140625,1139.199951171875,1120.699951171875,1135.9000244140625


In [15]:
# Dry run: previews the rows and checks the insertion shape for gold_ohlc.
gold_rows = df.to_numpy()
dry_insert_into_db(CONNECTION_STRING, "gold_ohlc", gold_rows)

Insert values into gold_ohlc:
| dt           | open               | high              | low                | close              |
|:-------------|:-------------------|:------------------|:-------------------|:-------------------|
| '2010-01-04' | 1117.699951171875  | 1122.300048828125 | 1097.0999755859375 | 1117.699951171875  |
| '2010-01-05' | 1118.0999755859375 | 1126.5            | 1115.0             | 1118.0999755859375 |
| '2010-01-06' | 1135.9000244140625 | 1139.199951171875 | 1120.699951171875  | 1135.9000244140625 |
| ...          | ...                | ...               | ...                | ...                |

Insertion shape is valid.


In [16]:
# Pass the prepared rows to insert_into_db for the gold_ohlc table.
gold_rows = df.to_numpy()
insert_into_db(CONNECTION_STRING, "gold_ohlc", gold_rows)

In [17]:
# Record the completed gold load together with the table description.
gold_summary = describe_table(CONNECTION_STRING, "gold_ohlc", "dt")
logger.info(
    "Gold OHLC successfully acquired from Yahoo! Finanace, transformed and loaded\n"
    f"{gold_summary}"
)

Logged INFO in /phase1/preprocessing/simple


# CPI

In [42]:
# Read the CPI CSV and order it by observation date.
cpi_csv = "../../../temp/CPI.csv"
df = pd.read_csv(cpi_csv, parse_dates=True).sort_values("observation_date")
df

Unnamed: 0,observation_date,CPIAUCSL
0,2010-01-01,217.4880
1,2010-02-01,217.2810
2,2010-03-01,217.3530
3,2010-04-01,217.4030
4,2010-05-01,217.2900
...,...,...
183,2025-04-01,320.3210
184,2025-05-01,320.5800
185,2025-06-01,321.5000
186,2025-07-01,322.1320


In [43]:
# Express CPI relative to the first row's value (row 0, column 1 of the frame).
base_level = df.iloc[0, 1]
df["CPIAUCSL"] = df["CPIAUCSL"].div(base_level)

In [44]:
# Cast every value to string and single-quote the observation date.
df = df.astype(str).assign(
    observation_date=lambda cpi: "'" + cpi["observation_date"] + "'"
)
df

Unnamed: 0,observation_date,CPIAUCSL
0,'2010-01-01',1.0
1,'2010-02-01',0.9990482233502538
2,'2010-03-01',0.9993792760979917
3,'2010-04-01',0.9996091738394761
4,'2010-05-01',0.999089604943721
...,...,...
183,'2025-04-01',1.4728214890016922
184,'2025-05-01',1.4740123593025822
185,'2025-06-01',1.4782424777458987
186,'2025-07-01',1.4811483851982639


In [45]:
# Dry run: previews the rows and checks the insertion shape for cpi.
cpi_rows = df.to_numpy()
dry_insert_into_db(CONNECTION_STRING, "cpi", cpi_rows)

Insert values into cpi:
| dt           | value              |
|:-------------|:-------------------|
| '2010-01-01' | 1.0                |
| '2010-02-01' | 0.9990482233502538 |
| '2010-03-01' | 0.9993792760979917 |
| ...          | ...                |

Insertion shape is valid.


In [46]:
# Pass the prepared rows to insert_into_db for the cpi table.
cpi_rows = df.to_numpy()
insert_into_db(CONNECTION_STRING, "cpi", cpi_rows)

In [47]:
# Record the completed CPI load together with the table description.
cpi_summary = describe_table(CONNECTION_STRING, "cpi", "dt")
logger.info(
    "CPI successfully acquired, transformed and loaded\n"
    f"{cpi_summary}"
)

Logged INFO in /phase1/preprocessing/simple


In [48]:
# Final log entry marking the end of the phase 1 data loading notebook.
logger.info(f"Data for phase 1, successfully aqcuired, transformed and loaded")

Logged INFO in /phase1/preprocessing/simple
