In [None]:
import polars as pl
import numpy as np
from typing import List, Dict, Union
from concurrent.futures import ThreadPoolExecutor, as_completed

def generate_metadata_fundamental(df_metadata: pl.DataFrame, independent_var_name: str) -> dict:
    """Build the metadata record for one independent variable.

    The variable name is expected to look like ``"<SEDOL>_<attribute>"``:
    everything before the first underscore is the SEDOL and everything after
    it (further underscores included) is the attribute name.

    Args:
        df_metadata: Frame with a ``SEDOL`` column plus ``companyName``,
            ``tickerSymbol``, ``sector``, ``subsector`` and ``ISOCode``.
        independent_var_name: Variable name to describe.

    Returns:
        Dict with the keys ``Variable Name``, ``Company Name``,
        ``Company Ticker``, ``sedol``, ``sector``, ``subsector``, ``country``
        and ``Attribute``.

    Raises:
        ValueError: If ``df_metadata`` does not contain exactly one row for
            the SEDOL (none, or duplicates).
    """
    # partition() yields the same split as split("_") + join of the remainder.
    sedol, _, attribute_name = independent_var_name.partition("_")

    metadata_columns = ["companyName", "tickerSymbol", "sector", "subsector", "ISOCode"]
    matches = df_metadata.filter(pl.col("SEDOL") == sedol).select(metadata_columns)

    # Fail with an actionable message instead of the generic shape error that
    # ``.item()`` produces when the lookup is empty or ambiguous.
    if matches.height != 1:
        raise ValueError(
            f"Expected exactly one metadata row for SEDOL {sedol!r} "
            f"(variable {independent_var_name!r}), found {matches.height}"
        )

    # Read the single matching row once rather than re-selecting per column.
    record = matches.row(0, named=True)

    return {
        "Variable Name": independent_var_name,
        "Company Name": record["companyName"],
        "Company Ticker": record["tickerSymbol"],
        "sedol": sedol,
        "sector": record["sector"],
        "subsector": record["subsector"],
        "country": record["ISOCode"],
        "Attribute": attribute_name,
    }

In [None]:
import polars as pl

def get_rolling_correlation(independent_var_name: str, series1: pl.Series, series2: pl.Series, window_size: int = 12,
                            dates: "pl.Series | None" = None) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Compute the rolling correlation of two series and summary statistics of it.

    polars Series carry no index (unlike pandas), so the observations are
    treated as already aligned row by row. Pass ``dates`` if the output should
    carry a date column.

    Args:
        independent_var_name: Name given to the correlation column. It is also
            split at the first underscore into the ``SEDOL`` / ``Attribute``
            labels of the statistics row.
        series1: First series (numeric, or castable to Float64).
        series2: Second series, same length as ``series1``.
        window_size: Number of consecutive observations per rolling window.
        dates: Optional Date/Datetime series of the same length; when given it
            is returned as a leading ``date`` column of the correlation frame.

    Returns:
        ``(corr_series, stats)`` where ``corr_series`` holds the rolling
        correlation in a column named ``independent_var_name`` (preceded by
        ``date`` when ``dates`` is given) and ``stats`` is a one-row frame with
        ``SEDOL``, ``Attribute``, ``Quantile 25%``, ``Mean``, ``Median``,
        ``Quantile 75%`` and ``Latest Value``.

    Raises:
        ValueError: If ``window_size`` is not positive or ``dates`` is not a
            Date/Datetime series.
    """
    if window_size < 1:
        raise ValueError("window_size must be a positive integer")

    # Cast up front so integer inputs go through the same float arithmetic.
    columns = {
        "series1": series1.cast(pl.Float64),
        "series2": series2.cast(pl.Float64),
    }
    if dates is not None:
        if dates.dtype not in (pl.Date, pl.Datetime):
            raise ValueError("dates must be a Date or Datetime series")
        columns["date"] = dates

    df = pl.DataFrame(columns)

    # Windows where the correlation is undefined (e.g. zero variance) come back
    # as NaN; turn them into nulls so the aggregations below skip them instead
    # of propagating NaN.
    corr_expr = (
        pl.rolling_corr("series1", "series2", window_size=window_size)
        .fill_nan(None)
        .alias(independent_var_name)
    )
    output_columns = [corr_expr] if dates is None else [pl.col("date"), corr_expr]
    corr_series = df.select(output_columns)

    # Same split as split("_")[0] / "_".join(split("_")[1:]).
    sedol, _, attribute = independent_var_name.partition("_")
    corr_col = pl.col(independent_var_name)

    stats = corr_series.select([
        pl.lit(sedol).alias("SEDOL"),
        pl.lit(attribute).alias("Attribute"),
        corr_col.quantile(0.25).alias("Quantile 25%"),
        corr_col.mean().alias("Mean"),
        corr_col.median().alias("Median"),
        corr_col.quantile(0.75).alias("Quantile 75%"),
        # Last row of the rolling series; null if the final window is undefined.
        corr_col.last().alias("Latest Value"),
    ])

    return corr_series, stats

In [None]:
def get_correlation(df: pl.DataFrame, dependent_var_name: str, independent_var_name: str,
                    rolling_corr: bool = True, rolling_windows: List[int] = [1,3,5,7]) -> Dict[str, Union[Dict, pl.DataFrame]]:
    """Compute rolling correlations between one dependent and one independent column.

    Rows where either column is null, NaN or infinite are dropped before any
    correlation is computed. Each entry of ``rolling_windows`` is a number of
    years and is converted to a window of ``year * 52`` observations, i.e. the
    data is assumed to be weekly.

    Args:
        df: Frame containing both columns.
        dependent_var_name: Name of the dependent-variable column.
        independent_var_name: Name of the independent-variable column.
        rolling_corr: Whether to compute the rolling correlations.
        rolling_windows: Window lengths in years.

    Returns:
        Dict with:
          * ``"Time Series"``: ``f"{year} Rolling Corr"`` -> correlation frame
            for every window (plus the legacy ``"Yearly Rolling Corr"``
            placeholder dict).
          * ``"Stats"``: ``f"{year} Rolling Corr"`` -> statistics frame for
            every window, and ``"Num Obs"`` -> number of usable rows.
          * ``"Yearly Rolling Windows"``: a copy of ``rolling_windows``.
        The per-window entries are empty ``pl.DataFrame()`` placeholders when
        ``rolling_corr`` is false or there are too few observations, so every
        key exists on every code path.
    """
    # Copy: the default list is shared between calls and must not be handed
    # out inside the result, where a caller could mutate it.
    rolling_windows = list(rolling_windows)
    min_observations = 5   # at or below this many rows, no correlation is attempted
    weeks_per_year = 52    # assume weekly data

    window_keys = {year: f"{year} Rolling Corr" for year in rolling_windows}

    dict_result = {
        # "Yearly Rolling Corr" is never filled; kept so the result layout
        # stays backward compatible.
        "Time Series": {"Yearly Rolling Corr": {year: {} for year in rolling_windows}},
        "Yearly Rolling Windows": rolling_windows,
        "Stats": {},
    }
    for key in window_keys.values():
        dict_result["Time Series"][key] = pl.DataFrame()
        dict_result["Stats"][key] = pl.DataFrame()

    # Alias to fixed internal names so the case dependent == independent does
    # not fail with a duplicate-column error; cast so the finiteness check
    # below is defined for integer columns too.
    df_corr = df.select([
        pl.col(dependent_var_name).cast(pl.Float64).alias("dependent"),
        pl.col(independent_var_name).cast(pl.Float64).alias("independent"),
    ])
    # Keep only rows where both values are finite (drops null, NaN and +/-inf).
    df_corr = df_corr.filter(pl.all_horizontal(pl.all().is_finite().fill_null(False)))

    dict_result["Stats"]["Num Obs"] = df_corr.height

    if not rolling_corr or df_corr.height <= min_observations:
        return dict_result

    for year, key in window_keys.items():
        df_rolling_corr, df_stats = get_rolling_correlation(
            independent_var_name,
            df_corr.get_column("dependent"),
            df_corr.get_column("independent"),
            window_size=year * weeks_per_year,
        )
        dict_result["Time Series"][key] = df_rolling_corr
        dict_result["Stats"][key] = df_stats

    return dict_result


In [None]:
def _concat_result_frames(frames, how: str) -> pl.DataFrame:
    """Concatenate the non-empty DataFrames in ``frames``; empty frame if none remain."""
    # Placeholders (empty frames, dicts, None) mark variables without a result;
    # they would break a vertical concat, so drop them first.
    usable = [frame for frame in frames if isinstance(frame, pl.DataFrame) and frame.width > 0]
    if not usable:
        return pl.DataFrame()
    return pl.concat(usable, how=how)


def _drop_rows_without_values(frame: pl.DataFrame) -> pl.DataFrame:
    """Drop rows whose float columns are all null, NaN or infinite."""
    # Only float columns are inspected: label columns such as SEDOL/Attribute
    # are strings, for which a finiteness check is not defined.
    float_columns = [name for name, dtype in frame.schema.items() if dtype in (pl.Float32, pl.Float64)]
    if not float_columns:
        return frame
    has_value = pl.any_horizontal([pl.col(name).is_finite().fill_null(False) for name in float_columns])
    return frame.filter(has_value)


def _add_row_number(frame: pl.DataFrame, name: str) -> pl.DataFrame:
    """Prepend a 0-based row counter column, using whichever polars API is available."""
    # with_row_count was renamed to with_row_index in newer polars releases.
    add_index = getattr(frame, "with_row_index", None)
    if add_index is None:
        add_index = frame.with_row_count
    return add_index(name)


def run_rolling_corr_calc(df_universe: pl.DataFrame, df_dependent: pl.DataFrame, dependent_variable_name: str,
                          run_rolling_corr: bool = True, rolling_windows: list[int] = [1,3,5,7]) -> list[pl.DataFrame]:
    """Correlate every column of ``df_universe`` against one dependent variable.

    The universe columns are processed in chunks on a thread pool. Each column
    is passed to ``get_correlation`` together with the dependent column, and
    the per-variable results are combined per rolling window.

    Args:
        df_universe: One column per independent variable, row-aligned with
            ``df_dependent``. A column named like the dependent variable is
            skipped.
        df_dependent: Frame containing the dependent-variable column.
        dependent_variable_name: Name of the dependent column.
        run_rolling_corr: Forwarded to ``get_correlation``.
        rolling_windows: Window lengths in years, forwarded to
            ``get_correlation``.

    Returns:
        Flat list ``[timeseries_w0, stats_w0, timeseries_w1, stats_w1, ...]``
        in the order of ``rolling_windows``. Time-series frames hold one column
        per variable (stacked horizontally, in universe column order); stats
        frames hold one row per variable. Rows whose float values are all
        missing or non-finite are removed. Each frame gets a leading
        ``"Variable Name"`` column.

    NOTE(review): ``"Variable Name"`` is a plain row counter, exactly as in the
    previous implementation; it does not contain variable names. Confirm what
    downstream consumers expect before changing it.
    """
    rolling_windows = list(rolling_windows)  # never share the default list

    chunk_size = 2500
    universe_columns = df_universe.columns
    num_chunks = (len(universe_columns) + chunk_size - 1) // chunk_size

    # Only the dependent column is needed downstream; selecting it alone also
    # avoids duplicate-name clashes with other columns shared by both frames.
    dependent_frame = df_dependent.select(dependent_variable_name)

    def process_chunk(chunk_idx):
        start_ind = chunk_idx * chunk_size
        end_ind = min(start_ind + chunk_size, len(universe_columns))
        independent_variable_names = [
            name for name in universe_columns[start_ind:end_ind]
            if name != dependent_variable_name
        ]
        if not independent_variable_names:
            return []

        df_temp = pl.concat(
            [dependent_frame, df_universe.select(independent_variable_names)],
            how="horizontal",
        )
        return [
            get_correlation(df_temp, dependent_variable_name, name, run_rolling_corr, rolling_windows)
            for name in independent_variable_names
        ]

    # executor.map yields results in submission order, so the output layout is
    # deterministic (as_completed would order it by whichever thread finishes
    # first). Exceptions raised in a worker are re-raised here.
    with ThreadPoolExecutor() as executor:
        chunk_results = list(executor.map(process_chunk, range(num_chunks)))

    all_results = [result for chunk in chunk_results for result in chunk]

    final_results = []
    for year in rolling_windows:
        key = f"{year} Rolling Corr"

        df_timeseries = _concat_result_frames(
            [result["Time Series"].get(key) for result in all_results], how="horizontal"
        )
        df_timeseries = _add_row_number(_drop_rows_without_values(df_timeseries), "Variable Name")

        df_stats = _concat_result_frames(
            [result["Stats"].get(key) for result in all_results], how="vertical"
        )
        df_stats = _add_row_number(_drop_rows_without_values(df_stats), "Variable Name")

        final_results.extend([df_timeseries, df_stats])

    return final_results

Efficient Data Processing:
Polars is designed for high-performance data processing. It stores data in the Apache Arrow columnar memory format and makes more effective use of the CPU cache, which results in faster operations, particularly on large datasets.
Vectorized Operations:
Polars uses vectorized operations extensively, which are generally faster than the equivalent operations in Pandas, especially for complex calculations like rolling correlations.
Parallelism:
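In the run_rolling_corr_calc function, we introduced parallelism using ThreadPoolExecutor. This allows different chunks of columns to be processed concurrently. Because Python threads share the GIL, any speedup on multi-core systems comes from the time spent inside Polars' native code, which releases the GIL; Polars also parallelizes many operations internally, so the actual gain should be measured rather than assumed.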
Lazy Evaluation:
Although not explicitly used in these conversions, Polars supports lazy evaluation, which can be leveraged for even more performance gains in complex data pipelines.
Efficient Memory Usage:
Polars is generally more memory-efficient than Pandas, which can lead to better performance, especially when working with large datasets that push the limits of available RAM.
Optimized Filtering and Selection:
Operations like filtering rows and selecting columns are highly optimized in Polars, which we've utilized in our conversions.