In [7]:
import os
import pandas as pd
import warnings
# Suppress all warnings
warnings.filterwarnings("ignore")

def merge_stock_data(stock_dir):
    """Merge all bbo and trade files for a single stock and save the result."""
    bbo_dir = os.path.join(stock_dir, 'bbo')
    trade_dir = os.path.join(stock_dir, 'trade')
    
    # Ensure subdirectories exist
    if not os.path.exists(bbo_dir) or not os.path.exists(trade_dir):
        print(f"Skipping {stock_dir}: Missing 'bbo' or 'trade' subdirectories.")
        return None
    
    daily_dfs = []
    
    # Get sorted list of files in both directories
    bbo_files = sorted(os.listdir(bbo_dir))
    trade_files = sorted(os.listdir(trade_dir))
    
    # Match files by name (assuming they correspond, e.g., '1.csv', '2.csv')
    for bbo_file, trade_file in zip(bbo_files, trade_files):
        bbo_path = os.path.join(bbo_dir, bbo_file)
        trade_path = os.path.join(trade_dir, trade_file)
        
        if os.path.isfile(bbo_path) and os.path.isfile(trade_path):
            # Load the files
            bbo_df = pd.read_parquet(bbo_path)
            trade_df = pd.read_parquet(trade_path)
            
            # Merge on a common key (adjust the key column name as necessary)
            merged_df = pd.merge(bbo_df, trade_df, on='xltime', how='inner')  # Replace 'timestamp' if needed

            # Check if 'xltime' column exists and transform it to datetime
            if 'xltime' in merged_df.columns:
                merged_df['datetime'] = pd.to_datetime('1899-12-30') + pd.to_timedelta(merged_df['xltime'], unit='D')
            else:
                print("'xltime' column not found in the dataset.")

            aggregated_df = (
                merged_df.groupby(['datetime'])  # Grouping by 'datetime' or any key to identify duplicates
                .apply(lambda g: pd.Series({
                    'weighted_bid_price': (g['bid-price'] * g['bid-volume']).sum() / g['bid-volume'].sum(),
                    'total_bid_quantity': g['bid-volume'].sum(),
                    'max_bid_quantity': g['bid-volume'].max(),
                    'weighted_ask_price': (g['ask-price'] * g['ask-volume']).sum() / g['ask-volume'].sum(),
                    'total_ask_quantity': g['ask-volume'].sum(),
                    'max_ask_quantity': g['ask-volume'].max(),
                    'total_trade_volume': g['trade-volume'].sum(),


                }))
                .reset_index()
            )
            
            # Append the merged DataFrame to the list
            daily_dfs.append(aggregated_df)
        else:
            print(f"Skipping unmatched files: {bbo_file}, {trade_file}")
    
    # Concatenate all daily DataFrames into one for the stock
    if daily_dfs:
        stock_merged_df = pd.concat(daily_dfs, ignore_index=True)
        
        # Save the merged DataFrame in the stock's folder
        output_path = os.path.join(stock_dir, 'merged_data.csv')  # Save as CSV
        stock_merged_df.to_csv(output_path, index=False)
        print(f"Merged data saved for stock: {os.path.basename(stock_dir)}")
    else:
        print(f"No data to merge for stock: {os.path.basename(stock_dir)}")

def process_all_stocks(base_dir):
    """Process all stocks in the base directory."""
    for stock in sorted(os.listdir(base_dir)):
        stock_dir = os.path.join(base_dir, stock)
        if os.path.isdir(stock_dir):
            print(f"Processing stock: {stock}")
            merge_stock_data(stock_dir)
        else:
            print(f"Skipping non-directory item: {stock}")

# Example usage
base_directory = "/Users/othmaneio/Documents/financial_big_data/data"  # Replace with the path to your stock directories
process_all_stocks(base_directory)


Skipping non-directory item: .DS_Store
Processing stock: A
Merged data saved for stock: A
Processing stock: AA
Merged data saved for stock: AA


In [8]:
!pip install polars


Collecting polars
  Downloading polars-1.17.1-cp39-abi3-macosx_10_12_x86_64.whl (33.0 MB)
[K     |████████████████████████████████| 33.0 MB 3.9 kB/s eta 0:00:011     |█████████████████▊              | 18.3 MB 14.0 MB/s eta 0:00:02MB 11.1 MB/s eta 0:00:01
[?25hInstalling collected packages: polars
Successfully installed polars-1.17.1


In [34]:
import polars as pl
import os

def process_stock_with_polars(stock_dir):
    """Merge bbo and trade files for a single stock using Polars and perform aggregations."""
    bbo_dir = os.path.join(stock_dir, 'bbo')
    trade_dir = os.path.join(stock_dir, 'trade')

    # Ensure subdirectories exist
    if not os.path.exists(bbo_dir) or not os.path.exists(trade_dir):
        print(f"Skipping {stock_dir}: Missing 'bbo' or 'trade' subdirectories.")
        return

    # Get sorted list of files in both directories
    bbo_files = sorted(os.listdir(bbo_dir))
    trade_files = sorted(os.listdir(trade_dir))

    lazy_dfs = []

    for bbo_file, trade_file in zip(bbo_files, trade_files):
        bbo_path = os.path.join(bbo_dir, bbo_file)
        trade_path = os.path.join(trade_dir, trade_file)

        if os.path.isfile(bbo_path) and os.path.isfile(trade_path):
            # Load the Parquet files as Polars lazy DataFrames
            bbo_df = pl.read_parquet(bbo_path).lazy()
            trade_df = pl.read_parquet(trade_path).lazy()


            # Merge on xltime
            merged_df = bbo_df.join(trade_df, on="xltime", how="inner")

            # Ensure consistent types
            merged_df = merged_df.with_columns([
                pl.col("xltime").cast(pl.Float64),         # Ensure `xltime` is Float64
                pl.col("bid-price").cast(pl.Float64),      # Ensure `bid-price` is Float64
                pl.col("bid-volume").cast(pl.Float64),     # Ensure `bid-volume` is Float64
                pl.col("ask-price").cast(pl.Float64),      # Ensure `ask-price` is Float64
                pl.col("ask-volume").cast(pl.Float64),     # Ensure `ask-volume` is Float64
                pl.col("trade-price").cast(pl.Float64),    # Ensure `trade-price` is Float64
                pl.col("trade-volume").cast(pl.Float64),   # Ensure `trade-volume` is Float64
            ])

            # Perform initial aggregation by `xltime`
            aggregated_df = (
                merged_df
                .group_by("xltime")
                .agg([
                    ((pl.col("bid-price") * pl.col("bid-volume")).sum() / pl.col("bid-volume").sum()).alias("weighted_bid_price"),
                    pl.col("bid-volume").sum().alias("total_bid_quantity"),
                    pl.col("bid-volume").max().alias("max_bid_quantity"),
                    ((pl.col("ask-price") * pl.col("ask-volume")).sum() / pl.col("ask-volume").sum()).alias("weighted_ask_price"),
                    pl.col("ask-volume").sum().alias("total_ask_quantity"),
                    pl.col("ask-volume").max().alias("max_ask_quantity"),
                    pl.col("trade-volume").sum().alias("total_trade_volume"),
                    ((pl.col("trade-price") * pl.col("trade-volume")).sum() / pl.col("trade-volume").sum()).alias("weighted_trade_price")
                ])
            )

            # Convert `xltime` to `datetime`
            excel_base_date = pl.datetime(1899, 12, 30)  # Excel starts counting from 1900-01-01, but Polars needs 1899-12-30
            aggregated_df = aggregated_df.with_columns(
                (pl.col("xltime") * pl.duration(days=1) + excel_base_date).alias("datetime")
            )
            aggregated_df = aggregated_df.with_columns(pl.col("datetime").dt.convert_time_zone("America/New_York"))

            # Append the lazy DataFrame for processing
            lazy_dfs.append(aggregated_df)
        else:
            print(f"Skipping unmatched files: {bbo_file}, {trade_file}")

    if lazy_dfs:
        # Combine all daily lazy DataFrames into one
        all_data = pl.concat(lazy_dfs)

        # Collect and write to CSV
        final_df = all_data.collect()
        output_path = os.path.join(stock_dir, "merged_data.csv")
        final_df.write_csv(output_path)
        print(f"Merged data saved as CSV for stock: {os.path.basename(stock_dir)}")
    else:
        print(f"No data to merge for stock: {os.path.basename(stock_dir)}")

def process_all_stocks_with_polars(base_dir):
    """Process all stocks in the base directory using Polars."""
    for stock in sorted(os.listdir(base_dir)):
        stock_dir = os.path.join(base_dir, stock)
        if os.path.isdir(stock_dir):
            print(f"Processing stock: {stock}")
            process_stock_with_polars(stock_dir)
        else:
            print(f"Skipping non-directory item: {stock}")

# Example usage
base_directory = "/Users/othmaneio/Documents/financial_big_data/data"  # Replace with the path to your stock directories
process_all_stocks_with_polars(base_directory)


Skipping non-directory item: .DS_Store
Processing stock: A
Merged data saved as CSV for stock: A
Processing stock: AA
Merged data saved as CSV for stock: AA


In [35]:
merged_A_data = pd.read_csv("/Users/othmaneio/Documents/financial_big_data/data/A/merged_data.csv")

In [37]:
merged_A_data

Unnamed: 0,xltime,weighted_bid_price,total_bid_quantity,max_bid_quantity,weighted_ask_price,total_ask_quantity,max_ask_quantity,total_trade_volume,weighted_trade_price,datetime
0,40301.773480,36.75000,102.0,9.0,36.760000,108.0,9.0,1400.0,36.76,2010-05-03T14:33:48.651000-0400
1,40301.572999,36.58000,10.0,10.0,36.590000,10.0,10.0,400.0,36.59,2010-05-03T09:45:07.117000-0400
2,40301.646308,36.52000,76.0,19.0,36.530000,12.0,3.0,500.0,36.53,2010-05-03T11:30:40.994000-0400
3,40301.575024,36.62000,30.0,15.0,36.630000,6.0,3.0,200.0,36.62,2010-05-03T09:48:02.040000-0400
4,40301.740158,36.85000,72.0,18.0,36.860000,22.0,7.0,400.0,36.86,2010-05-03T13:45:49.634000-0400
...,...,...,...,...,...,...,...,...,...,...
132080,40326.632116,32.45000,8.0,4.0,32.460000,4.0,2.0,200.0,32.45,2010-05-28T11:10:14.850000-0400
132081,40326.830700,32.46000,4.0,4.0,32.460000,2.0,2.0,100.0,32.46,2010-05-28T15:56:12.464000-0400
132082,40326.823511,32.50020,100.0,10.0,32.514316,380.0,41.0,1800.0,32.51,2010-05-28T15:45:51.328000-0400
132083,40326.659751,32.51375,8.0,3.0,32.529677,62.0,15.0,500.0,32.52,2010-05-28T11:50:02.492000-0400


In [36]:
import pandas as pd

def count_duplicate_xltime_rows(df):
    """Count the number of rows that have a duplicate 'xltime' value."""
    if 'xltime' not in df.columns:
        raise ValueError("'xltime' column not found in the DataFrame.")
    
    # Find duplicated rows based on 'xltime'
    duplicated_count = df['xltime'].duplicated(keep=False).sum()
    
    return duplicated_count

# Example usage with your DataFrame
duplicated_rows_count = count_duplicate_xltime_rows(merged_A_data)  # Replace 'your_dataframe' with your actual DataFrame
print(f"Number of rows with duplicate 'xltime': {duplicated_rows_count}")


Number of rows with duplicate 'xltime': 0
