In [1]:
# Install pandas into the running kernel's environment (%pip, unlike !pip, targets the kernel).
# Restart the kernel afterwards if this upgraded an already-imported pandas.
# TODO(review): pin a version (pandas==X.Y.Z) so Restart & Run All is reproducible.
%pip install pandas

Note: you may need to restart the kernel to use updated packages.


In [2]:
%pip install pyarrow

Note: you may need to restart the kernel to use updated packages.


In [3]:
import io
import os

import pandas as pd
from minio import Minio

# -------------------------------
# MinIO connection (matches your working DAG style)
# Endpoint and credentials are taken from the environment so real secrets never need
# to be written into the notebook; the fallbacks are the values previously hardcoded here.
# -------------------------------
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minioadmin")

client = Minio(
    MINIO_ENDPOINT,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=False,
)

BUCKET = "curated"
PREFIX = "tabular/market_ohlcv_daily/exchange=ASX/"   # narrow scope as you did

# Columns cast to pandas' "string" dtype in every part, so all files agree on their
# dtype before concatenation (order matters only for readability).
STRING_COLUMNS = ("trade_date", "exchange")

# -------------------------------
# List parquet objects
# -------------------------------
parquet_keys = [
    obj.object_name
    for obj in client.list_objects(BUCKET, prefix=PREFIX, recursive=True)
    if obj.object_name.endswith(".parquet")
]

print(f"Found {len(parquet_keys)} parquet files under s3://{BUCKET}/{PREFIX}")
print("Sample keys:", parquet_keys[:5])
if not parquet_keys:
    print("WARNING: no parquet objects matched; df will be an empty DataFrame")


# -------------------------------
# Read each parquet and concatenate
# -------------------------------
def read_parquet_object(bucket: str, key: str) -> pd.DataFrame:
    """Download one parquet object from MinIO and load it as a DataFrame.

    The object is read fully into memory; the response is closed and its connection
    released in a ``finally`` so this happens even if the read raises. Any column in
    ``STRING_COLUMNS`` that is present is cast to the pandas "string" dtype.

    Args:
        bucket: MinIO bucket name.
        key: Object key of a single parquet file.

    Returns:
        The parquet contents, with ``STRING_COLUMNS`` normalised.
    """
    resp = client.get_object(bucket, key)
    try:
        data = resp.read()  # bytes
    finally:
        resp.close()
        resp.release_conn()

    part = pd.read_parquet(io.BytesIO(data), engine="pyarrow")

    # Normalise problematic columns if present
    for col in STRING_COLUMNS:
        if col in part.columns:
            part[col] = part[col].astype("string")
    return part


dfs = [read_parquet_object(BUCKET, key) for key in parquet_keys]

# Concatenate once (not incrementally); fall back to an empty frame when nothing matched.
df = pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()

# -------------------------------
# Basic EDA / sanity checks
# -------------------------------
print("\n=== SHAPE ===")
print(df.shape)

print("\n=== DTYPES ===")
print(df.dtypes)

print("\n=== INFO ===")
df.info()

print("\n=== HEAD ===")
display(df.head())

print("\n=== NULL COUNTS ===")
print(df.isna().sum())

print("\n=== NULL PERCENTAGE ===")
print((df.isna().mean() * 100).round(2))


Found 1264 parquet files under s3://curated/tabular/market_ohlcv_daily/exchange=ASX/
Sample keys: ['tabular/market_ohlcv_daily/exchange=ASX/trade_date=2020-12-17/snapshot.parquet', 'tabular/market_ohlcv_daily/exchange=ASX/trade_date=2020-12-18/snapshot.parquet', 'tabular/market_ohlcv_daily/exchange=ASX/trade_date=2020-12-21/snapshot.parquet', 'tabular/market_ohlcv_daily/exchange=ASX/trade_date=2020-12-22/snapshot.parquet', 'tabular/market_ohlcv_daily/exchange=ASX/trade_date=2020-12-23/snapshot.parquet']

=== SHAPE ===
(122963, 16)

=== DTYPES ===
dataset_id                        object
exchange                  string[python]
ticker                            object
vendor_symbol                     object
currency                          object
trade_date                string[python]
open                             float64
high                             float64
low                              float64
close                            float64
volume                             in

Unnamed: 0,dataset_id,exchange,ticker,vendor_symbol,currency,trade_date,open,high,low,close,volume,adj_close,ingest_ts,ingest_batch_id,source_object_key,row_hash
0,market_ohlcv_daily,ASX,360,360.AX,AUD,2020-12-17,3.85,3.99,3.81,3.95,91294,3.95,2025-12-16 08:49:59.041432+00:00,scheduled__2025-12-16T08:40:00+00:00,tabular/market_ohlcv_daily/exchange=ASX/trade_...,e22b5e4c6c2695413658963759b32c1b7220c49b26e1a7...
1,market_ohlcv_daily,ASX,RIO,RIO.AX,AUD,2020-12-17,114.470001,116.440002,114.309998,116.440002,1420174,81.684738,2025-12-16 08:49:59.041432+00:00,scheduled__2025-12-16T08:40:00+00:00,tabular/market_ohlcv_daily/exchange=ASX/trade_...,a83adf5393cbee77f090f0b6c6e87f0c08c908b324fa4a...
2,market_ohlcv_daily,ASX,RHC,RHC.AX,AUD,2020-12-17,63.389999,64.669998,63.220001,64.410004,534856,58.891674,2025-12-16 08:49:59.041432+00:00,scheduled__2025-12-16T08:40:00+00:00,tabular/market_ohlcv_daily/exchange=ASX/trade_...,1b261391b60fe20d11cdc0d06806c56642537df72d9d4b...
3,market_ohlcv_daily,ASX,REH,REH.AX,AUD,2020-12-17,15.85,16.620001,15.6,16.5,2136732,15.50421,2025-12-16 08:49:59.041432+00:00,scheduled__2025-12-16T08:40:00+00:00,tabular/market_ohlcv_daily/exchange=ASX/trade_...,fb829094ef888e3b4a83d0c8e35567a567063c395bafb4...
4,market_ohlcv_daily,ASX,REA,REA.AX,AUD,2020-12-17,146.800003,148.850006,146.5,148.589996,251588,141.032791,2025-12-16 08:49:59.041432+00:00,scheduled__2025-12-16T08:40:00+00:00,tabular/market_ohlcv_daily/exchange=ASX/trade_...,0c3434b7ba29d461be1b7b882ab57c053f6f1e373a7b3a...



=== NULL COUNTS ===
dataset_id              0
exchange                0
ticker                  0
vendor_symbol           0
currency                0
trade_date              0
open                    0
high                    0
low                     0
close                   0
volume                  0
adj_close            3000
ingest_ts               0
ingest_batch_id         0
source_object_key       0
row_hash                0
dtype: int64

=== NULL PERCENTAGE ===
dataset_id           0.00
exchange             0.00
ticker               0.00
vendor_symbol        0.00
currency             0.00
trade_date           0.00
open                 0.00
high                 0.00
low                  0.00
close                0.00
volume               0.00
adj_close            2.44
ingest_ts            0.00
ingest_batch_id      0.00
source_object_key    0.00
row_hash             0.00
dtype: float64


In [4]:
# Total in-memory footprint of df in bytes; deep=True introspects object/string columns
# so their actual string payloads are counted, not just the pointer arrays.
bytes_used = df.memory_usage(deep=True).sum()

# Same total expressed in MiB (1024**2) and GiB (1024**3).
mb, gb = (bytes_used / 1024 ** power for power in (2, 3))

mb, gb

(np.float64(92.41347694396973), np.float64(0.09024753607809544))

In [5]:
# Bytes used by each column (plus the index), deep-introspected; largest consumers first.
per_column_bytes = df.memory_usage(deep=True)
per_column_bytes.sort_values(ascending=False)


source_object_key    16356605
row_hash             14878523
ingest_batch_id      11435559
dataset_id            9222225
trade_date            8238521
vendor_symbol         7749195
ticker                7380306
currency              7377780
exchange              7377780
adj_close              983704
open                   983704
high                   983704
low                    983704
close                  983704
volume                 983704
ingest_ts              983704
Index                     132
dtype: int64

In [6]:
# Per-column non-null counts and dtypes, with a memory total computed via deep
# introspection (memory_usage="deep") rather than the default shallow estimate.
df.info(memory_usage="deep")


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 122963 entries, 0 to 122962
Data columns (total 16 columns):
 #   Column             Non-Null Count   Dtype              
---  ------             --------------   -----              
 0   dataset_id         122963 non-null  object             
 1   exchange           122963 non-null  string             
 2   ticker             122963 non-null  object             
 3   vendor_symbol      122963 non-null  object             
 4   currency           122963 non-null  object             
 5   trade_date         122963 non-null  string             
 6   open               122963 non-null  float64            
 7   high               122963 non-null  float64            
 8   low                122963 non-null  float64            
 9   close              122963 non-null  float64            
 10  volume             122963 non-null  int64              
 11  adj_close          119963 non-null  float64            
 12  ingest_ts          122963 non-

In [7]:
# Average deep in-memory footprint per row.
# Guard the empty case: the load cell falls back to pd.DataFrame() when no parquet files
# are found, and dividing by len(df) == 0 would give inf plus a RuntimeWarning.
bytes_per_row = (
    df.memory_usage(deep=True).sum() / len(df) if len(df) else float("nan")
)
bytes_per_row


np.float64(788.0627017883428)

In [8]:
# (row count, column count) of the combined frame.
len(df.index), len(df.columns)


(122963, 16)

In [9]:

# Per-column dtypes; trade_date and exchange appear as "string" because the load cell
# casts them with .astype("string") before concatenating.
df.dtypes

dataset_id                        object
exchange                  string[python]
ticker                            object
vendor_symbol                     object
currency                          object
trade_date                string[python]
open                             float64
high                             float64
low                              float64
close                            float64
volume                             int64
adj_close                        float64
ingest_ts            datetime64[ns, UTC]
ingest_batch_id                   object
source_object_key                 object
row_hash                          object
dtype: object

In [10]:


# Column names as a plain Python list, in frame order.
df.columns.tolist()

['dataset_id',
 'exchange',
 'ticker',
 'vendor_symbol',
 'currency',
 'trade_date',
 'open',
 'high',
 'low',
 'close',
 'volume',
 'adj_close',
 'ingest_ts',
 'ingest_batch_id',
 'source_object_key',
 'row_hash']

In [11]:


# Preview of the first three rows (positional slice).
df.iloc[:3]

Unnamed: 0,dataset_id,exchange,ticker,vendor_symbol,currency,trade_date,open,high,low,close,volume,adj_close,ingest_ts,ingest_batch_id,source_object_key,row_hash
0,market_ohlcv_daily,ASX,360,360.AX,AUD,2020-12-17,3.85,3.99,3.81,3.95,91294,3.95,2025-12-16 08:49:59.041432+00:00,scheduled__2025-12-16T08:40:00+00:00,tabular/market_ohlcv_daily/exchange=ASX/trade_...,e22b5e4c6c2695413658963759b32c1b7220c49b26e1a7...
1,market_ohlcv_daily,ASX,RIO,RIO.AX,AUD,2020-12-17,114.470001,116.440002,114.309998,116.440002,1420174,81.684738,2025-12-16 08:49:59.041432+00:00,scheduled__2025-12-16T08:40:00+00:00,tabular/market_ohlcv_daily/exchange=ASX/trade_...,a83adf5393cbee77f090f0b6c6e87f0c08c908b324fa4a...
2,market_ohlcv_daily,ASX,RHC,RHC.AX,AUD,2020-12-17,63.389999,64.669998,63.220001,64.410004,534856,58.891674,2025-12-16 08:49:59.041432+00:00,scheduled__2025-12-16T08:40:00+00:00,tabular/market_ohlcv_daily/exchange=ASX/trade_...,1b261391b60fe20d11cdc0d06806c56642537df72d9d4b...
