# For all the parquet files

In [2]:
import pandas as pd
import numpy as np
import polars as pl
import tarfile
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path

In [5]:
import polars as pl
import numpy as np
from pathlib import Path

def already_processed(ticker: str, out_file: Path) -> bool:
    """
    Check if a given ticker already exists in the output_parquet.
    Returns True if the ticker is found, False otherwise.
    """
    if not out_file.exists():
        # If output file doesn't exist yet, obviously not processed
        return False
    
    # Read only the ticker column to save memory
    df_ticker = pl.read_parquet(str(out_file), columns=["ticker"]).unique()
    return ticker in df_ticker["ticker"].to_list()


def compute_response_function(df: pl.DataFrame, tau_max: int) -> np.ndarray:
    """
    Computes the response function R(tau) for all lags up to tau_max.
    Expects df to have columns: 'bid-price', 'ask-price', 'trade-price'.

    Args:
        df (pl.DataFrame): DataFrame with 'bid-price', 'ask-price', 'trade-price'.
        tau_max (int): Maximum lag for which to compute the response function.

    Returns:
        np.ndarray: Array of response values for each tau from 1 to tau_max.
    """
    # Compute mid-price
    df = df.with_columns(
        ((pl.col("bid-price") + pl.col("ask-price")) / 2).alias("mid-price")
    )

    # Compute the sign s = sign(trade-price - mid-price), fill trade-price nulls with 0
    df = df.with_columns(
        (pl.col("trade-price").fill_null(0) - pl.col("mid-price")).sign().alias("s")
    )

    # Convert to NumPy arrays
    m = df["mid-price"].to_numpy()
    s = df["s"].to_numpy()

    # Ensure tau_max does not exceed data length
    tau_max = min(tau_max, len(m) - 1)
    response = []

    for tau in range(1, tau_max + 1):
        # Only compute if we have enough data after shifting
        if len(m[tau:]) > 0:
            shifted_diff = m[tau:] - m[:-tau]  # m_{n+tau} - m_n
            resp = np.mean(s[:-tau] * shifted_diff)
            response.append(resp)
        else:
            response.append(np.nan)

    return np.array(response)


def compute_response_by_date_for_folder(
    folder_path: str,
    tau_max: int,
    output_parquet: str
):
    """
    For each *.parquet in folder_path:
      - If the ticker is already in output_parquet, skip it.
      - Otherwise, group by date, compute the response function, and append results to output_parquet.

    The output file will have columns: [ticker, date, tau, response].
    """
    folder = Path(folder_path)
    out_file = Path(output_parquet)

    # Iterate over all parquet files in the folder
    for file_path in folder.glob("*.parquet"):
        ticker = file_path.stem  # e.g., "AAPL.OQ_combined"

        # Check if we already have this ticker in the final parquet
        if already_processed(ticker, out_file):
            print(f"Skipping {ticker}, already processed.")
            continue

        print(f"Processing {ticker}...")

        # Read in the data
        df = pl.read_parquet(str(file_path))

        # Gather results in a Python list of dicts
        results = []

        # Get all unique dates
        unique_dates = df["date"].unique().to_list()

        for date_val in unique_dates:
            # Filter to just that date
            group_df = df.filter(pl.col("date") == date_val)
            if group_df.is_empty():
                continue

            # Compute the response function
            response_arr = compute_response_function(group_df, tau_max)

            # Collect each tau's response
            for tau_idx, resp_val in enumerate(response_arr, start=1):
                results.append({
                    "ticker": ticker,
                    "date": date_val,
                    "tau": tau_idx,
                    "response": resp_val,
                })

        # Skip writing if no results (shouldn't happen if the file had data)
        if not results:
            print(f"No valid response data for {ticker}.")
            continue

        # Convert to Polars DataFrame
        final_df = pl.DataFrame(results)

        # If out_file doesn't exist, just write it
        if not out_file.exists():
            final_df.write_parquet(str(out_file))
        else:
            # If it does exist, read it and append
            existing_df = pl.read_parquet(str(out_file))
            combined = pl.concat([existing_df, final_df], how="vertical")
            combined.write_parquet(str(out_file))

        print(f"Finished {ticker}, appended to {out_file}.")


In [8]:
compute_response_by_date_for_folder(
        folder_path="data",
        tau_max=1000,
        output_parquet="response_functions_all_stocks.parquet"
    )

Processing AAPL.OQ_combined...
Processing AMGN.OQ_combined...
Processing AXP.N_combined...
Processing BA.N_combined...
Processing CAT.N_combined...
Processing CSCO.OQ_combined...
Processing CVX.N_combined...
Processing DOW.N_combined...
Processing GS.N_combined...
Processing HD.N_combined...
Processing IBM.N_combined...
Processing INTC.OQ_combined...
Processing JNJ.N_combined...
Processing JPM.N_combined...
Processing KO.N_combined...
Processing MCD.N_combined...
Processing MMM.N_combined...
Processing MRK.N_combined...
Processing MSFT.OQ_combined...
Processing NKE.N_combined...
Processing PFE.N_combined...
Processing PG.N_combined...
Processing TRV.N_combined...
Processing UNH.N_combined...
Processing UTX.N_combined...
Processing V.N_combined...
Processing VZ.N_combined...
Processing WMT.N_combined...
Processing XOM.N_combined...
Done! Results written to response_functions_all_stocks.parquet


In [22]:
def determine_percentile_threshold(
    response_parquet: str, 
    tau_max: int, 
    percentile: float = 99.0
) -> float:
    """
    Determines the threshold based on a specified percentile of the max_response values.
    This implementation avoids using `groupby` by utilizing window functions.
    
    Parameters:
    -----------
    response_parquet : str
        Path to the Parquet file with columns [ticker, date, tau, response].
    tau_max : int
        The maximum lag to consider.
    percentile : float
        The percentile to use for thresholding (e.g., 99.0 for the 99th percentile).

    Returns:
    --------
    float
        The computed threshold value.
    """
    # 1. Read the response data
    try:
        df = pl.read_parquet(response_parquet)
        print(f"Successfully read {response_parquet}")
    except Exception as e:
        raise IOError(f"Failed to read {response_parquet}: {e}")

    # 4. Filter by tau_max
    df_filtered = df.filter(pl.col("tau") <= tau_max)

    # 5. Compute max_response per (ticker, date) using window functions
    df_with_max = df_filtered.with_columns(
        pl.col("response").max().over(["ticker", "date"]).alias("max_response")
    )

    # 6. Extract unique (ticker, date, max_response) combinations
    df_daily_max = df_with_max.select(["ticker", "date", "max_response"]).unique()
    print(f"Daily Max DataFrame shape: {df_daily_max.shape}")

    # 7. Calculate the desired percentile
    try:
        threshold = df_daily_max["max_response"].quantile(percentile / 100.0)
        print(f"Determined threshold at the {percentile}th percentile: {threshold}")
    except Exception as e:
        raise RuntimeError(f"Failed to compute quantile: {e}")

    return threshold


In [26]:
def detect_flash_crashes_from_response_file(
    response_parquet: str, 
    tau_max: int, 
    threshold: float, 
    output_parquet: str
):
    """
    Reads a Parquet file with precomputed response functions, detects potential flash-crash
    days by looking at the maximum response for each (ticker, date), and writes a filtered 
    Parquet of flagged days.

    This implementation avoids using `groupby` by utilizing window functions.

    Parameters
    ----------
    response_parquet : str
        Path to the Parquet file that has columns [ticker, date, tau, response].
    tau_max : int
        The maximum lag to consider; rows with tau > tau_max will be ignored.
    threshold : float
        The cut-off above which we label a day as having a potential flash crash.
    output_parquet : str
        Path to the Parquet file in which to store flash-crash detections. This file 
        will have columns [ticker, date, max_response].
    """

    try:
        df = pl.read_parquet(response_parquet)
        print(f"Successfully read {response_parquet}")
    except Exception as e:
        raise IOError(f"Failed to read {response_parquet}: {e}")

    # 4. Filter by tau_max
    df_filtered = df.filter(pl.col("tau") <= tau_max)

    # 5. Compute max_response per (ticker, date) using window functions
    df_with_max = df_filtered.with_columns(
        pl.col("response").max().over(["ticker", "date"]).alias("max_response")
    )
    # 6. Extract unique (ticker, date, max_response) combinations
    df_daily_max = df_with_max.select(["ticker", "date", "max_response"]).unique()
    # 7. Filter days where max_response exceeds the threshold
    df_flash_crash = df_daily_max.filter(pl.col("max_response") > threshold)

    # 8. Write the flagged days to the output Parquet file
    if df_flash_crash.is_empty():
        print("No potential flash-crash days found. Writing empty file.")
    else:
        print(f"Writing flash-crash detections to {output_parquet} ...")

    df_flash_crash.write_parquet(output_parquet)
    print("Flash-crash detection completed.")


In [32]:
test = pl.read_parquet("response_functions_all_stocks.parquet")
test.shape

(5754000, 4)

In [27]:
tau_max = 100  
percentile = 99.0  # Starting with the 99th percentile
threshold = determine_percentile_threshold("response_functions_all_stocks.parquet", tau_max)

Successfully read response_functions_all_stocks.parquet
Daily Max DataFrame shape: (5754, 3)
Determined threshold at the 99.0th percentile: 0.008211838832022249


In [28]:
detect_flash_crashes_from_response_file(
        response_parquet="response_functions_all_stocks.parquet", 
        tau_max=100,            # or any integer limit on tau
        threshold=threshold,         # or any threshold that suits your detection logic
        output_parquet="flash_crashes_detected.parquet"
    )

Successfully read response_functions_all_stocks.parquet
Writing flash-crash detections to flash_crashes_detected.parquet ...
Flash-crash detection completed.


In [31]:
test2 = pl.read_parquet("flash_crashes_detected.parquet")
test2

ticker,date,max_response
str,str,f64
"""AAPL.OQ_combined""","""2010-11-03""",0.011162
"""IBM.N_combined""","""2010-01-19""",
"""AAPL.OQ_combined""","""2010-11-29""",0.012902
"""AAPL.OQ_combined""","""2010-06-08""",0.011305
"""GS.N_combined""","""2010-10-26""",
…,…,…
"""WMT.N_combined""","""2010-12-15""",
"""AAPL.OQ_combined""","""2010-12-07""",0.010424
"""JPM.N_combined""","""2010-01-19""",
"""AAPL.OQ_combined""","""2010-09-30""",0.009613


# Test Covariance matrix

In [3]:
import pandas as pd
import numpy as np
import polars as pl
import tarfile
import pandas as pd
import matplotlib.pyplot as plt

In [4]:
AAPL = pl.read_parquet("data/AAPL.OQ_combined.parquet")
AMGN = pl.read_parquet("data/AMGN.OQ_combined.parquet")

In [9]:
target_date = "2010-05-06"
start_time = "14:45"
AAPL_flash_crash_day = filter_by_date(AAPL, "2010-05-06")
AMGN_flash_crash_day = filter_by_date(AMGN, "2010-05-06")

AAPL_flash_crash_day = AAPL_flash_crash_day.with_columns(
    pl.col('index').str.slice(11, 12).alias('truncated_index')  # Extract "HH:MM:SS.ssssss"
)
AMGN_flash_crash_day = AMGN_flash_crash_day.with_columns(
    pl.col('index').str.slice(11, 12).alias('truncated_index')
)

AAPL_flash_crash_day = AAPL_flash_crash_day.with_columns(((AAPL_flash_crash_day['bid-price'] + AAPL_flash_crash_day['ask-price']) / 2).alias('mid_price'))
AMGN_flash_crash_day = AMGN_flash_crash_day.with_columns(((AMGN_flash_crash_day['bid-price'] + AMGN_flash_crash_day['ask-price']) / 2).alias('mid_price'))

AAPL_flash_crash_day = AAPL_flash_crash_day.with_columns(AAPL_flash_crash_day['mid_price'].pct_change().alias('mid_price_return'))
AMGN_flash_crash_day = AMGN_flash_crash_day.with_columns(AMGN_flash_crash_day['mid_price'].pct_change().alias('mid_price_return'))

# Remove rows where returns are zero or NaN
AAPL_flash_crash_day = AAPL_flash_crash_day.filter(AAPL_flash_crash_day['mid_price_return'] != 0).drop_nulls()
AMGN_flash_crash_day = AMGN_flash_crash_day.filter(AMGN_flash_crash_day['mid_price_return'] != 0).drop_nulls()

# 3. Join the dataframes with full join and sort
result_pl_df = AAPL_flash_crash_day.join(AMGN_flash_crash_day, on='truncated_index', how='full').sort('truncated_index')

# Forward-fill directly on the existing columns
result_pl_df_filled = result_pl_df.with_columns([
    pl.col('mid_price').fill_null(strategy='forward'),
    pl.col('mid_price_right').fill_null(strategy='forward')
])

df_before, df_during, df_after = split_dataframe_by_quarter(result_pl_df_filled, target_date, start_time )

# Call the function on your dataframe
result = hayashi_yoshida_covariance(df_before)
result_2 = hayashi_yoshida_covariance(df_during)
result_3 = hayashi_yoshida_covariance(df_after)
# Print results
print(f"Hayashi-Yoshida Covariance: {result['hayashi_yoshida_covariance']}")
print(f"Variance of X: {result['variance_x']}")
print(f"Variance of Y: {result['variance_y']}")
print(f"Correlation: {result['correlation']}")
print()
# Print results
print(f"Hayashi-Yoshida Covariance: {result_2['hayashi_yoshida_covariance']}")
print(f"Variance of X: {result_2['variance_x']}")
print(f"Variance of Y: {result_2['variance_y']}")
print(f"Correlation: {result_2['correlation']}")
print()
# Print results
print(f"Hayashi-Yoshida Covariance: {result_3['hayashi_yoshida_covariance']}")
print(f"Variance of X: {result_3['variance_x']}")
print(f"Variance of Y: {result_3['variance_y']}")
print(f"Correlation: {result_3['correlation']}")


Hayashi-Yoshida Covariance: 4.6598161306899626e-07
Variance of X: 3.0557134919094433e-06
Variance of Y: 5.604403826875911e-06
Correlation: 0.1126024277275251

Hayashi-Yoshida Covariance: -1.796399056251842e-06
Variance of X: 0.00019806774824034315
Variance of Y: 2.6253316839041454e-05
Correlation: -0.024911727585088677

Hayashi-Yoshida Covariance: 3.3933135538271396e-08
Variance of X: 8.264664089038107e-06
Variance of Y: 1.1241841328063137e-05
Correlation: 0.003520405273266799


## Functions used : 

In [18]:
import pandas as pd
import numpy as np
import polars as pl
import tarfile
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
from typing import Tuple
import os
import itertools

In [19]:
def filter_by_date(df: pl.DataFrame, target_date: str) -> pl.DataFrame:
    """
    Filters the Polars DataFrame by the given date string
    and returns a new DataFrame.
    """
    return df.filter(pl.col("date") == target_date)

In [30]:
from intervaltree import Interval, IntervalTree

def hayashi_yoshida_covariance(dataframe, default_columns=("mid_price_return", "mid_price_return_right")):
    """
    Calculate the Hayashi-Yoshida covariance estimator for two numeric columns in a dataframe.

    Parameters:
        dataframe (pl.DataFrame): The input dataframe.
        default_columns (tuple): A tuple specifying default column names for the calculation.

    Returns:
        dict: A dictionary containing the Hayashi-Yoshida covariance, variances, and correlation.
    """
    if all(col in dataframe.columns for col in default_columns):
        col_x, col_y = default_columns
    else:
        raise ValueError(
            f"The dataframe must contain the specified columns: {default_columns}"
        )

    # Convert Polars DataFrame to Pandas for interval handling
    df = dataframe.to_pandas()

    # Extract necessary columns and drop NaNs
    df_x = df[["truncated_index", col_x]].dropna().sort_values(by="truncated_index").drop_duplicates(subset="truncated_index")
    df_y = df[["truncated_index_right", col_y]].dropna().sort_values(by="truncated_index_right").drop_duplicates(subset="truncated_index_right")

    # Parse truncated_index into proper datetime with millisecond precision
    df_x["truncated_index"] = pd.to_datetime(df_x["truncated_index"], format="%H:%M:%S.%f")
    df_y["truncated_index_right"] = pd.to_datetime(df_y["truncated_index_right"], format="%H:%M:%S.%f")

    # Create intervals for both series
    intervals_x = pd.IntervalIndex.from_arrays(
        df_x["truncated_index"][:-1], df_x["truncated_index"][1:], closed="right"
    )
    intervals_y = pd.IntervalIndex.from_arrays(
        df_y["truncated_index_right"][:-1], df_y["truncated_index_right"][1:], closed="right"
    )

    # Build an interval tree for Y
    tree_y = IntervalTree(
        Interval(begin.value, end.value, idx)
        for idx, (begin, end) in enumerate(zip(df_y["truncated_index_right"][:-1], df_y["truncated_index_right"][1:]))
    )

    # Calculate covariance
    # Calculate average of the product of returns
    covariance = 0
    #count = 0  # To keep track of the number of products
    for i, interval_x in enumerate(intervals_x):
        overlaps = tree_y.overlap(interval_x.left.value, interval_x.right.value)  # Use overlap instead of search
        for overlap in overlaps:
            j = overlap.data  # Index in df_y
            covariance += df_x[col_x].iloc[i] * df_y[col_y].iloc[j]
            #count += 1

    # Calculate average
    #average_product = covariance / count if count > 0 else 0

        
    # Calculate variances
    variance_x = np.sum(df_x[col_x] ** 2)
    variance_y = np.sum(df_y[col_y] ** 2)

    # Calculate correlation
    correlation = covariance / (np.sqrt(variance_x * variance_y) +1e-8)

    return {
        "hayashi_yoshida_covariance": average_product,
        "variance_x": variance_x,
        "variance_y": variance_y,
        "correlation": correlation,
    }


In [35]:
def split_dataframe_by_quarter(df: pl.DataFrame, date: str, start_time_str: str) -> Tuple[pl.DataFrame, pl.DataFrame, pl.DataFrame]:
    clean_df = df.filter(
        (pl.col('mid_price_return').is_not_null()) & 
        (pl.col('mid_price_return_right').is_not_null())
    )
    
    start_datetime_str = f"{date} {start_time_str}:00"
    start_datetime = datetime.strptime(start_datetime_str, "%Y-%m-%d %H:%M:%S")
    
    quarter_duration = timedelta(minutes=15)
    end_datetime = start_datetime + quarter_duration
    
    start_quarter = start_datetime.strftime("%H:%M:%S")
    end_quarter = end_datetime.strftime("%H:%M:%S")
    
    df_before_quarter = clean_df.filter(pl.col("truncated_index") < start_quarter)
    df_during_quarter = clean_df.filter(
        (pl.col("truncated_index") >= start_quarter) & 
        (pl.col("truncated_index") <= end_quarter)
    )
    df_after_quarter = clean_df.filter(pl.col("truncated_index") > end_quarter)
    
    return df_before_quarter, df_during_quarter, df_after_quarter

In [32]:
def process_pair(asset1_file, asset2_file, date, start_time):
    """
    Process two parquet files to compute the Hayashi-Yoshida covariance, variances, and correlations.

    Parameters:
        asset1_file (str): Path to the first asset's parquet file.
        asset2_file (str): Path to the second asset's parquet file.
        date (str): The date to filter on (e.g., "2010-05-06").
        start_time (str): The start time of the quarter (e.g., "09:30").

    Returns:
        dict: A dictionary containing the results for each period (before, during, after).
    """

    # Load the parquet files
    asset1 = pl.read_parquet(asset1_file)
    asset2 = pl.read_parquet(asset2_file)

    # Filter by date
    asset1 = filter_by_date(asset1, date)
    asset2 = filter_by_date(asset2, date)

    # Add truncated index
    asset1 = asset1.with_columns(pl.col('index').str.slice(11, 12).alias('truncated_index'))
    asset2 = asset2.with_columns(pl.col('index').str.slice(11, 12).alias('truncated_index'))

    # Compute mid-price and mid-price returns
    asset1 = asset1.with_columns(((asset1['bid-price'] + asset1['ask-price']) / 2).alias('mid_price'))
    asset1 = asset1.with_columns(asset1['mid_price'].pct_change().alias('mid_price_return'))
    asset1 = asset1.filter(asset1['mid_price_return'] != 0).drop_nulls()

    asset2 = asset2.with_columns(((asset2['bid-price'] + asset2['ask-price']) / 2).alias('mid_price'))
    asset2 = asset2.with_columns(asset2['mid_price'].pct_change().alias('mid_price_return'))
    asset2 = asset2.filter(asset2['mid_price_return'] != 0).drop_nulls()

    # Remove rows where returns are zero or NaN
    asset1 = asset1.filter(asset1['mid_price_return'] != 0).drop_nulls()
    asset2 = asset2.filter(asset2['mid_price_return'] != 0).drop_nulls()


    # Join the dataframes
    result_df = asset1.join(asset2, on='truncated_index', how='full').sort('truncated_index')

    # Forward fill
    result_df = result_df.with_columns([
        pl.col('mid_price').fill_null(strategy='forward'),
        pl.col('mid_price_right').fill_null(strategy='forward')
    ])

    # Split the dataframe into periods
    df_before, df_during, df_after = split_dataframe_by_quarter(result_df, date, start_time)

    # Compute covariance and variances for each period
    results = {
        "before": hayashi_yoshida_covariance(df_before),
        "during": hayashi_yoshida_covariance(df_during),
        "after": hayashi_yoshida_covariance(df_after),
    }
    
    return results


## ALL the files covariance matrix

In [33]:
results2 = process_pair("data_clean/AAPL.OQ_combined.parquet","data_clean/AMGN.OQ_combined.parquet","2010-05-06","14:45")

In [23]:
def compute_covariance_matrices(data_folder, dates, output_folder):
    """
    Compute and append covariance matrices for all asset pairs across multiple dates.

    Parameters:
        data_folder (str): Path to the folder containing asset parquet files.
        dates (list[str]): List of dates and times to process (e.g., ["2010-05-06-14:45"]).
        output_folder (str): Path to the folder to save covariance matrices as parquet files.

    Returns:
        None
    """
    # List all parquet files in the data folder
    asset_files = [os.path.join(data_folder, f) for f in os.listdir(data_folder) if f.endswith(".parquet")]

    # Extract asset names from filenames
    asset_names = [os.path.basename(f).split('.')[0] for f in asset_files]
    n_assets = len(asset_names)

    # Create the output folder if it doesn't exist
    os.makedirs(output_folder, exist_ok=True)

    # Initialize or read existing parquet files for each period
    def initialize_or_read_parquet(file_path, asset_names):
        if os.path.exists(file_path):
            return pl.read_parquet(file_path)
        else:
            schema = {"date": pl.Utf8}
            schema.update({f"{name}_{name2}": pl.Float64 for name in asset_names for name2 in asset_names})
            return pl.DataFrame(schema=schema)

    before_path = os.path.join(output_folder, "before.parquet")
    during_path = os.path.join(output_folder, "during.parquet")
    after_path = os.path.join(output_folder, "after.parquet")

    before_df = initialize_or_read_parquet(before_path, asset_names)
    during_df = initialize_or_read_parquet(during_path, asset_names)
    after_df = initialize_or_read_parquet(after_path, asset_names)

    # Process each date
    for datestring in dates:
        try:
            date, start_time = datestring.rsplit("-", 1)
        except ValueError:
            print(f"Invalid date format: {datestring}. Expected format: YYYY-MM-DD-HH:MM")
            continue

        print(f"Processing date: {date} with start time: {start_time}")

        # Initialize covariance matrices for this date
        cov_matrix_before = np.zeros((n_assets, n_assets))
        cov_matrix_during = np.zeros((n_assets, n_assets))
        cov_matrix_after = np.zeros((n_assets, n_assets))

        # Iterate over all asset pairs
        for i, j in itertools.combinations_with_replacement(range(n_assets), 2):
            asset1_file = asset_files[i]
            asset2_file = asset_files[j]

            try:
                # Compute covariance results for the pair
                results = process_pair(asset1_file, asset2_file, date, start_time)

                # Fill covariance matrices
                cov_matrix_before[i, j] = results["before"]["hayashi_yoshida_covariance"]
                cov_matrix_during[i, j] = results["during"]["hayashi_yoshida_covariance"]
                cov_matrix_after[i, j] = results["after"]["hayashi_yoshida_covariance"]

                # Symmetric matrix: fill the lower triangle
                if i != j:
                    cov_matrix_before[j, i] = cov_matrix_before[i, j]
                    cov_matrix_during[j, i] = cov_matrix_during[i, j]
                    cov_matrix_after[j, i] = cov_matrix_after[i, j]
            except Exception as e:
                print(f"Error processing pair ({asset_names[i]}, {asset_names[j]}): {e}")
                continue

        # Convert covariance matrices to dataframes
        date_label = {"date": [date] * (n_assets * n_assets)}
        col_names = [f"{name}_{name2}" for name in asset_names for name2 in asset_names]

        before_matrix_df = pl.DataFrame({**date_label, **dict(zip(col_names, cov_matrix_before.flatten()))})
        during_matrix_df = pl.DataFrame({**date_label, **dict(zip(col_names, cov_matrix_during.flatten()))})
        after_matrix_df = pl.DataFrame({**date_label, **dict(zip(col_names, cov_matrix_after.flatten()))})

        # Append to the respective parquet files
        before_df = pl.concat([before_df, before_matrix_df], how="vertical").unique(subset=["date"])
        during_df = pl.concat([during_df, during_matrix_df], how="vertical").unique(subset=["date"])
        after_df = pl.concat([after_df, after_matrix_df], how="vertical").unique(subset=["date"])

    # Write the updated parquet files back
    before_df.write_parquet(before_path)
    during_df.write_parquet(during_path)
    after_df.write_parquet(after_path)

    print(f"Covariance matrices saved for all dates in {output_folder}.")


In [24]:
compute_covariance_matrices(
    data_folder="data_clean",
    dates=["2010-05-06-14:45"],
    output_folder="output_cov_matrices"
)

Processing date: 2010-05-06 with start time: 14:45
Covariance matrices saved for all dates in output_cov_matrices.


In [25]:
def read_covariances(parquet_file, asset_pairs, date=None):
    """
    Reads the covariances for specific asset pairs from a parquet file.

    Parameters:
        parquet_file (str): Path to the parquet file (before, during, or after).
        asset_pairs (list[tuple]): List of tuples specifying asset pairs (e.g., [("AAPL.OQ", "AMGN.OQ")]).
        date (str, optional): If provided, filter results for a specific date (e.g., "2010-05-06").

    Returns:
        pl.DataFrame: DataFrame containing covariances for the specified asset pairs.
    """
    # Load the parquet file
    df = pl.read_parquet(parquet_file)

    # Construct the column names for the pairs
    pair_columns = [f"{pair[0]}_{pair[1]}" for pair in asset_pairs]

    # Filter for the specific date if provided
    if date:
        df = df.filter(pl.col("date") == date)

    # Select only the relevant columns
    columns_to_select = ["date"] + pair_columns
    result_df = df.select(columns_to_select)

    return result_df


# Example usage
parquet_folder = "output_cov_matrices"
asset_pairs = [
    ("AAPL", "AMGN"),
    ("MCD", "MMM"),
]
date = "2010-05-06"

# Read covariances for each period
before_covariances = read_covariances(f"{parquet_folder}/before.parquet", asset_pairs, date)
during_covariances = read_covariances(f"{parquet_folder}/during.parquet", asset_pairs, date)
after_covariances = read_covariances(f"{parquet_folder}/after.parquet", asset_pairs, date)

# Print the results
print("Covariances (Before):")
print(before_covariances)

print("\nCovariances (During):")
print(during_covariances)

print("\nCovariances (After):")
print(after_covariances)


Covariances (Before):
shape: (1, 3)
┌────────────┬───────────┬───────────┐
│ date       ┆ AAPL_AMGN ┆ MCD_MMM   │
│ ---        ┆ ---       ┆ ---       │
│ str        ┆ f64       ┆ f64       │
╞════════════╪═══════════╪═══════════╡
│ 2010-05-06 ┆ 4.6598e-7 ┆ 5.2858e-7 │
└────────────┴───────────┴───────────┘

Covariances (During):
shape: (1, 3)
┌────────────┬───────────┬────────────┐
│ date       ┆ AAPL_AMGN ┆ MCD_MMM    │
│ ---        ┆ ---       ┆ ---        │
│ str        ┆ f64       ┆ f64        │
╞════════════╪═══════════╪════════════╡
│ 2010-05-06 ┆ -0.000002 ┆ -3.7522e-7 │
└────────────┴───────────┴────────────┘

Covariances (After):
shape: (1, 3)
┌────────────┬───────────┬───────────┐
│ date       ┆ AAPL_AMGN ┆ MCD_MMM   │
│ ---        ┆ ---       ┆ ---       │
│ str        ┆ f64       ┆ f64       │
╞════════════╪═══════════╪═══════════╡
│ 2010-05-06 ┆ 3.3933e-8 ┆ 7.9647e-7 │
└────────────┴───────────┴───────────┘


In [28]:
def check_covariance_shapes(output_folder):
    """
    Check the shape of the covariance matrices stored in the parquet files.

    Parameters:
        output_folder (str): Path to the folder containing the covariance parquet files.

    Returns:
        dict: A dictionary with the shapes of the covariance matrices.
    """
    file_paths = {
        "before": f"{output_folder}/before.parquet",
        "during": f"{output_folder}/during.parquet",
        "after": f"{output_folder}/after.parquet",
    }

    shapes = {}

    for period, file_path in file_paths.items():
        try:
            df = pl.read_parquet(file_path)
            if len(df) > 0:
                # Number of unique assets by splitting column names
                asset_columns = [col for col in df.columns if "_" in col]
                n_assets = int(len(asset_columns) ** 0.5)  # Square root of total columns
                shapes[period] = {"rows": len(df), "columns": len(asset_columns), "n_assets": n_assets}
            else:
                shapes[period] = {"rows": 0, "columns": 0, "n_assets": 0}
        except Exception as e:
            shapes[period] = {"error": str(e)}

    return shapes

# Example usage
output_folder = "output_cov_matrices"
shapes = check_covariance_shapes(output_folder)

# Print the results
for period, info in shapes.items():
    print(f"{period.capitalize()} Covariance File:")
    print(info)


Before Covariance File:
{'rows': 1, 'columns': 484, 'n_assets': 22}
During Covariance File:
{'rows': 1, 'columns': 484, 'n_assets': 22}
After Covariance File:
{'rows': 1, 'columns': 484, 'n_assets': 22}
