In [8]:
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import pyarrow as pa
import pandas as pd
import os
import shutil
import glob
from pathlib import Path


In [9]:
!touch ~/workspace/data/parquet_filesystem/us_equities

In [72]:
# Optimize parquet files

import uuid
from tqdm import tqdm

year = '2024'
timeframe = '1-min'
parquet_path = Path(f'~/workspace/data/parquet_filesystem/us_equities/year={year}/timeframe={timeframe}').expanduser()
ticker_paths = [entry.path for entry in os.scandir(parquet_path) if entry.is_dir()]

filter_tickers = ["SRVR", "OP", "BCpC"]  # ✅ List of target tickers


for i, ticker_path in tqdm(enumerate(ticker_paths)):
    try:
        if not any(ticker == ticker_path.split("/")[-1].split("=")[-1] for ticker in filter_tickers):
            continue
        print(f"Processing {ticker_path}")
        dataset = ds.dataset(ticker_path, format="parquet", partitioning="hive", ignore_prefixes=[".", "_"])
        table = dataset.to_table(
            use_threads=True
        )
        df = table.to_pandas().drop_duplicates()
        merged_table = pa.Table.from_pandas(df)
        for file in glob.glob(f"{ticker_path}/*.parquet"):
            os.remove(file)
        pq.write_table(merged_table, os.path.join(ticker_path, f"merged_{uuid.uuid4()}.parquet"), compression="snappy")
    except Exception as e:
        print(f"🚨 Corrupted file detected: {ticker_path} {e}")


                       
        
        
    


0it [00:00, ?it/s]

Processing /Users/bank/workspace/data/parquet_filesystem/us_equities/year=2024/timeframe=1-min/ticker=SRVR


5015it [00:00, 15127.91it/s]

Processing /Users/bank/workspace/data/parquet_filesystem/us_equities/year=2024/timeframe=1-min/ticker=OP
Processing /Users/bank/workspace/data/parquet_filesystem/us_equities/year=2024/timeframe=1-min/ticker=BCpC


12629it [00:00, 24131.54it/s]


In [78]:
50 % 50

0

In [80]:
#deleting corrupted files
foo = ["/Users/bank/workspace/data/parquet_filesystem/us_equities/year=2023/timeframe=1-min/ticker=KW",
      "/Users/bank/workspace/data/parquet_filesystem/us_equities/year=2023/timeframe=1-min/ticker=SRVR",
      "/Users/bank/workspace/data/parquet_filesystem/us_equities/year=2023/timeframe=1-min/ticker=PLMR",
          "/Users/bank/workspace/data/parquet_filesystem/us_equities/year=2023/timeframe=1-min/ticker=ALpA",
      "/Users/bank/workspace/data/parquet_filesystem/us_equities/year=2023/timeframe=1-min/ticker=BCpC"]

for path in foo:
    print(f"Removing parquets in {path}")
    for file in glob.glob(f"{path}/*.parquet"):
        print(f"     removing {file}")
        os.remove(file)


Removing parquets in /Users/bank/workspace/data/parquet_filesystem/us_equities/year=2023/timeframe=1-min/ticker=KW
     removing /Users/bank/workspace/data/parquet_filesystem/us_equities/year=2023/timeframe=1-min/ticker=KW/aa643ac2bbbc49bdaac851fedb5ee5c1-1.parquet
     removing /Users/bank/workspace/data/parquet_filesystem/us_equities/year=2023/timeframe=1-min/ticker=KW/bd4c281be26a4c57ab7581fb173e02ae-0.parquet
     removing /Users/bank/workspace/data/parquet_filesystem/us_equities/year=2023/timeframe=1-min/ticker=KW/4a64a831267a492db2022cba0854e63f-0.parquet
     removing /Users/bank/workspace/data/parquet_filesystem/us_equities/year=2023/timeframe=1-min/ticker=KW/bba38f9c999646d79de21981ea12bd22-0.parquet
     removing /Users/bank/workspace/data/parquet_filesystem/us_equities/year=2023/timeframe=1-min/ticker=KW/78ec6ebd805149d799e47b1667741cf9-0.parquet
     removing /Users/bank/workspace/data/parquet_filesystem/us_equities/year=2023/timeframe=1-min/ticker=KW/fb63be173860424daa4831

In [89]:
CSV_DIR= '~/workspace/data/us_stocks/'
all_csv_path = Path(CSV_DIR).expanduser()
for j, path in tqdm(enumerate(all_csv_path.rglob(f"2023-*.csv.gz"))):
    print(path.stem.split(".")[0])
    



250it [00:00, 16307.05it/s]

2023-01-19
2023-02-28
2023-12-11
2023-11-20
2023-02-24
2023-05-16
2023-06-27
2023-09-26
2023-01-23
2023-05-08
2023-05-04
2023-11-08
2023-12-27
2023-01-31
2023-11-16
2023-03-06
2023-10-10
2023-03-14
2023-10-02
2023-07-21
2023-04-10
2023-03-22
2023-08-16
2023-07-17
2023-08-08
2023-04-26
2023-10-26
2023-03-30
2023-08-04
2023-07-05
2023-03-08
2023-04-12
2023-03-16
2023-08-22
2023-07-31
2023-10-12
2023-08-30
2023-07-07
2023-08-18
2023-07-19
2023-10-24
2023-04-28
2023-04-24
2023-03-20
2023-08-14
2023-09-12
2023-12-01
2023-01-17
2023-11-30
2023-01-09
2023-05-22
2023-06-13
2023-12-13
2023-01-05
2023-11-22
2023-05-30
2023-06-01
2023-05-18
2023-11-14
2023-02-02
2023-06-29
2023-09-28
2023-11-06
2023-02-10
2023-12-29
2023-04-20
2023-07-11
2023-08-10
2023-03-24
2023-03-28
2023-07-03
2023-08-02
2023-10-20
2023-04-04
2023-10-16
2023-07-27
2023-10-04
2023-11-02
2023-02-14
2023-01-25
2023-09-20
2023-06-21
2023-05-10
2023-11-10
2023-02-06
2023-12-21
2023-05-02
2023-06-05
2023-06-09
2023-02-22
2023-12-05




In [66]:
pd.read_parquet("/Users/bank/workspace/data/parquet_filesystem/us_equities/year=2024/timeframe=1-min/" + 
                "ticker=HOPE/merged_eabb7bb1-293e-4e59-ab7f-6eea8061959f.parquet")

Unnamed: 0,volume,open,close,high,low,window_start,transactions
644657,21698,11.1200,11.1700,11.2300,11.100,2024-01-31 09:30:00-05:00,45
644658,10136,11.1700,11.1650,11.2100,11.110,2024-01-31 09:31:00-05:00,95
644659,21817,11.1400,11.0000,11.1400,10.970,2024-01-31 09:32:00-05:00,101
644660,5767,10.9800,10.9300,11.0050,10.930,2024-01-31 09:33:00-05:00,81
644661,2931,10.9400,11.0050,11.0130,10.940,2024-01-31 09:34:00-05:00,41
...,...,...,...,...,...,...,...
614354,8050,12.4900,12.5150,12.5200,12.490,2024-09-24 15:55:00-04:00,120
614355,5688,12.5200,12.5150,12.5200,12.515,2024-09-24 15:56:00-04:00,54
614356,10279,12.5150,12.5289,12.5290,12.515,2024-09-24 15:57:00-04:00,153
614357,8809,12.5201,12.5100,12.5201,12.510,2024-09-24 15:58:00-04:00,164


In [48]:
# Define dataset path
year = '2025'
timeframe = '1-min'
test_path = Path(f'~/workspace/data/parquet_filesystem/us_equities/year={year}/timeframe={timeframe}')
# Load the dataset dynamically
dataset = ds.dataset(test_path, format="parquet", partitioning="hive", ignore_prefixes=[".", "_"])



# Identify all unique partitions (automatically detected)
#partitions = dataset.files

#table = dataset.to_table(filter=(ds.field("year") == "2025") & (ds.field("ticker") == "AAPL"))



<pyarrow._dataset.HivePartitioning object at 0x11962ef10>


In [52]:
# Optimized query
table = dataset.to_table(
    filter=(ds.field("ticker") == "AMZN"),
    use_threads=True  # ✅ Enables parallel processing
)


In [53]:
table.to_pandas()

Unnamed: 0,volume,open,close,high,low,window_start,transactions,ticker
77863,1192608,230.5450,229.7300,230.6600,229.3200,2025-02-10 09:30:00-05:00,19138,AMZN
77864,245941,229.7400,229.6100,230.2400,229.4350,2025-02-10 09:31:00-05:00,3588,AMZN
77865,273738,229.6000,229.4400,229.7394,229.3400,2025-02-10 09:32:00-05:00,3524,AMZN
77866,352179,229.4200,230.1300,230.1598,229.2000,2025-02-10 09:33:00-05:00,3712,AMZN
77867,293055,230.0900,230.7600,230.8500,229.6800,2025-02-10 09:34:00-05:00,3724,AMZN
...,...,...,...,...,...,...,...,...
79550,204527,228.9300,228.7000,228.9600,228.7000,2025-02-12 15:55:00-05:00,2835,AMZN
79551,252257,228.7003,228.5650,228.7003,228.5550,2025-02-12 15:56:00-05:00,3061,AMZN
79552,327622,228.5600,228.8994,228.9600,228.4500,2025-02-12 15:57:00-05:00,4339,AMZN
79553,273494,228.8900,228.7700,228.9400,228.7600,2025-02-12 15:58:00-05:00,3776,AMZN
