In [None]:
import os
import gzip
import random
import pandas as pd

# Settings
columns_to_keep = ['ticker', 'exchange', 'participant_timestamp', 'price', 'size']
valid_exchanges = {2, 12, 17, 202, 203}
data_dir = './datafiles'
output_file = f'{data_dir}/sample_output.csv'
MAX_SAMPLE_FILES = 10  # upper bound on how many daily files go into the sample
# Fixed seed so "Restart & Run All" reproduces the same sample; set to None for a fresh random draw.
RANDOM_SEED = 42

# Ensure data directory exists
os.makedirs(data_dir, exist_ok=True)

# Start from a clean output file: every processed file is appended to it below.
if os.path.exists(output_file):
    os.remove(output_file)

# Sort files chronologically (alphabetical order works for YYYY-MM-DD.csv.gz names)
file_list = sorted(
    os.path.join(data_dir, name)
    for name in os.listdir(data_dir)
    if name.endswith('.csv.gz')
)

# Choose up to MAX_SAMPLE_FILES random files. A private RNG keeps the global
# `random` state untouched; the chosen files are processed in chronological order.
n = min(MAX_SAMPLE_FILES, len(file_list))
sample_files = sorted(random.Random(RANDOM_SEED).sample(file_list, n))


def add_price_deltas(df, last_seen):
    """Return a copy of ``df`` with per-ticker ``del_t`` and ``del_p`` columns.

    Rows are visited in their current order. For each row, ``del_t`` / ``del_p``
    are the differences in ``participant_timestamp`` / ``price`` from the most
    recent earlier row of the same ``ticker``; a ticker with no earlier row gets
    0 for both.

    Parameters
    ----------
    df : DataFrame with at least 'ticker', 'participant_timestamp', 'price'.
    last_seen : dict mapping ticker -> (timestamp, price). Used to seed each
        ticker's first row and updated in place, so deltas carry over between
        successive calls (i.e. across files).
    """
    del_t_list = []
    del_p_list = []
    # zip over plain lists instead of iterrows: far faster, and .tolist() yields
    # exact Python ints, so large integer timestamps never pass through float.
    for ticker, timestamp, price in zip(
        df['ticker'].tolist(),
        df['participant_timestamp'].tolist(),
        df['price'].tolist(),
    ):
        if ticker in last_seen:
            last_time, last_price = last_seen[ticker]
            del_t_list.append(timestamp - last_time)
            del_p_list.append(price - last_price)
        else:
            del_t_list.append(0)
            del_p_list.append(0)
        last_seen[ticker] = (timestamp, price)
    return df.assign(del_t=del_t_list, del_p=del_p_list)


# State for del_t and del_p, carried across files.
# NOTE(review): sampled files are generally not consecutive days, so a ticker's
# first delta in a file can span the gap since the previous sampled file — confirm intended.
last_seen = {}  # {ticker: (last_timestamp, last_price)}
header_written = False

for file in sample_files:
    print("Processing:", file)
    with gzip.open(file, 'rt') as f:
        df = pd.read_csv(f, usecols=columns_to_keep)
    df = (
        df[df['exchange'].isin(valid_exchanges)]   # keep only the configured exchanges
        .drop_duplicates()
        # Stable sort: rows sharing a timestamp keep their file order, so the
        # per-ticker deltas below are reproducible run to run.
        .sort_values('participant_timestamp', kind='mergesort')
        .reset_index(drop=True)
    )
    df = add_price_deltas(df, last_seen)
    # Append to the sample output; the header is written only once.
    df.to_csv(output_file, mode='a', header=not header_written, index=False)
    header_written = True

print(f"Sample written to {output_file} ({n} files)")

Processing: ./datafiles/2016-10-10.csv.gz
Processing: ./datafiles/2018-06-26.csv.gz
Processing: ./datafiles/2019-04-09.csv.gz


In [None]:
import os
import gzip
import pandas as pd

# Hardcoded date file: rows of output.csv at or after this file's earliest
# participant_timestamp are pruned further below.
DATE = "2019-08-26"
date_file = f"./datafiles/{DATE}.csv.gz"

# Files
output_file = "./datafiles/output.csv"
output2_tmp = "./datafiles/output2.csv.tmp"

# Parameters
CHUNK_SIZE = 10_000_000  # rows per chunk

# --- 1) load the date file and take its earliest participant_timestamp as the cutoff
if not os.path.exists(date_file):
    print(f"Date file not found: {date_file}")
    raise SystemExit(1)

print("Reading date file:", date_file)
with gzip.open(date_file, "rt") as f:
    df_date = pd.read_csv(f, usecols=["participant_timestamp"])

# Only the minimum is needed, so use an O(n) .min() instead of sorting the whole
# frame. .min() skips NaN, just as the ascending sort placed NaN last.
earliest_ts = df_date["participant_timestamp"].min()
if pd.isna(earliest_ts):
    # Empty or all-NaN column: fail with a clear message instead of an IndexError/ValueError.
    print(f"No participant_timestamp values in date file: {date_file}")
    raise SystemExit(1)
first_ts = int(earliest_ts)
print("Cutoff timestamp (first entry):", first_ts)

# --- 2) stream output.csv in chunks, copying rows with timestamp < first_ts into output2_tmp
if not os.path.exists(output_file):
    print(f"Output file not found: {output_file}")
    raise SystemExit(1)

# Start from a clean temporary file, since chunks are appended to it.
if os.path.exists(output2_tmp):
    os.remove(output2_tmp)

writer_header_written = False
processed_rows = 0

print("Streaming", output_file, "->", output2_tmp)
for chunk in pd.read_csv(output_file, chunksize=CHUNK_SIZE):
    ts = chunk["participant_timestamp"]
    # Any dtype other than int64/int32 is coerced to nullable Int64;
    # values that cannot be parsed become <NA>.
    if ts.dtype not in ("int64", "int32"):
        ts = pd.to_numeric(ts, errors="coerce").astype("Int64")
        chunk["participant_timestamp"] = ts

    kept = chunk[ts < first_ts]
    if len(kept) > 0:
        # The header goes out with the first non-empty write only.
        kept.to_csv(output2_tmp, mode="a", header=not writer_header_written, index=False)
        writer_header_written = True
        processed_rows += len(kept)

    # Stop at the first chunk that reaches the cutoff.
    # NOTE(review): this early exit assumes output.csv is ordered by
    # participant_timestamp; rows before the cutoff in later chunks would be dropped otherwise.
    if (ts >= first_ts).any():
        print("Encountered rows >= cutoff in chunk; stopping after writing valid rows from this chunk.")
        break

print(f"Rows written to temporary file: {processed_rows}")

# --- 3) replace original output.csv with the pruned temporary file.
# If no rows fell before the cutoff, the tmp file was never written. Build a
# header-only CSV from the ORIGINAL file's columns; this must happen before the
# replace below, while output.csv still exists.
if not writer_header_written:
    if os.path.exists(output_file):
        pd.read_csv(output_file, nrows=0).to_csv(output2_tmp, index=False)
    else:
        # No original to take a header from: fall back to an empty file.
        open(output2_tmp, "w").close()

# os.replace overwrites output.csv in a single rename (atomic on POSIX), which
# performs the "delete and rename" in one step. There is no window where
# output.csv is missing, and a failure raises instead of leaving the data deleted.
os.replace(output2_tmp, output_file)
print("Replaced output with pruned temporary file.")