### read data and calc returns

In [None]:
# Todo 
# 1. add dividends
# 2. add fundamental factors 
# 3. run and save daily and weekly

In [1]:
import pandas as pd

In [2]:
def data_raw_ingest(filepath):
    # read in raw data
    df = pd.read_csv(filepath, parse_dates = True).set_index(['ID','DATE'])
    
    # clean up duplicates
    df = df[~df.index.duplicated()].reset_index() # causes multiindex issues later
    
    # drop rows with missing close prices
    df = df.dropna(subset = ['close'])
    if 'Unnamed: 0' in df.columns:
        df = df.drop(columns = ['Unnamed: 0'])

    # convert date to datetime
    df['DATE'] = pd.to_datetime(df['DATE'])
    
    # sort by ID and DATE
    df = df.sort_values(['ID', 'DATE'])
    
    return df

#### read univ and idx data

In [3]:
df = data_raw_ingest('data_raw_univ_rtyohlc_2014.csv')
df.head()

Unnamed: 0,ID,DATE,open,high,low,close,volume,market_cap,bid,ask,baspd
3529456,2551003D US Equity,2019-03-29,29.0,29.95,29.0,29.95,61185.0,,29.98,30.1,0.131854
3529459,2551003D US Equity,2019-04-01,31.0,31.13,30.5438,30.98,222568.0,,30.98,31.0,0.239853
3529460,2551003D US Equity,2019-04-02,31.0,31.5,30.61,31.0,40901.0,1273198000.0,30.85,31.0,0.339595
3529461,2551003D US Equity,2019-04-03,31.0,31.0,29.9,31.0,28411.0,1273198000.0,30.99,31.0,0.398489
3529462,2551003D US Equity,2019-04-04,31.14,31.23,30.97,31.0,369391.0,1273198000.0,30.99,31.0,0.196222


In [4]:
# read SPX data

In [5]:
idxdf = data_raw_ingest('data_raw_idx.csv')
spxdf = idxdf.query('ID == "SPX Index"')

In [6]:
spxdf.head()

Unnamed: 0,ID,DATE,open,high,low,close,volume
0,SPX Index,2004-12-31,1213.55,1217.33,1211.65,1211.92,675498000.0
3,SPX Index,2005-01-03,1211.92,1217.9,1200.3,1202.08,1331432000.0
4,SPX Index,2005-01-04,1202.08,1205.84,1185.39,1188.05,1549158000.0
5,SPX Index,2005-01-05,1188.05,1192.75,1183.72,1183.74,1425108000.0
6,SPX Index,2005-01-06,1183.74,1191.63,1183.23,1187.89,1323134000.0


### merge univ and idx

In [None]:
mdf = df.merge(spxdf[['DATE','close']].rename(columns ={'close':'idx_close'}), on = 'DATE', how = 'left')

In [8]:
mdf.shape

(3975025, 12)

In [9]:
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from joblib import Parallel, delayed


In [16]:
# for the resample to work properly need this
mdf = mdf.set_index(['ID','DATE'])

KeyError: "None of ['ID', 'DATE'] are in the columns"

In [None]:
def data_raw_resample(df):
    sum_columns = ['volume']  # Specify columns to sum (can be multiple, e.g., ['close', 'high'])

    # Get all columns except 'ID' and 'DATE' (assuming these shouldn't be aggregated)
    all_columns = [col for col in df.columns if col not in ['ID', 'DATE']]

    # Create aggregation dictionary: default 'last' for all, override with 'sum' for specified columns
    agg_dict = {col: 'last' for col in all_columns}
    agg_dict.update({col: 'sum' for col in sum_columns})

    def resample_group(id_group):
        id_val, group = id_group
        resampled = group.resample('M', level='DATE').agg(agg_dict)
        resampled['ID'] = id_val  # Add back the ID
        return resampled

    # Parallel processing function
    def parallel_resample(df):
        # Split the dataframe by ID
        groups = list(df.groupby(level='ID'))
        
        # Process groups in parallel
        results = Parallel(n_jobs=-1)(
            delayed(resample_group)(group) for group in groups
        )
        
        # Combine results
        final_df = pd.concat(results)
        
        return final_df

    # Perform the parallel resampling
    result = parallel_resample(df)

    return result.reset_index()


In [20]:
mdf = data_raw_resample()




### functions - rolling reg / var

In [22]:
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from joblib import Parallel, delayed


def calculate_simple_returns(df):
    """Helper function to calculate returns and excess returns"""
    df = df.copy()
    df['stock_return'] = df.groupby('ID')['close'].pct_change()
    df['idx_return'] = df.groupby('ID')['idx_close'].pct_change()
    df['stock_excess_return'] = df['stock_return'] - df['idx_return']

    return df

def rolling_residual_variance(df, window_size, dependent_var, independent_vars):
    """
    Performs rolling regression in parallel using joblib.

    Args:
        df: Pandas DataFrame containing the data.
        window_size: Size of the rolling window.
        dependent_var: Name of the dependent variable column.
        independent_vars: List of names of independent variable columns.

    Returns:
        Pandas DataFrame with the regression coefficients for each window.
        Returns None if there are issues.
    """

    n_rows = len(df)
    results = []

    def _regress_window(i):
        if i < window_size -1:
            return None # Handle edge cases at beginning of dataframe
        window_data = df.iloc[i - window_size + 1:i + 1]

        X = window_data[independent_vars].values
        y = window_data[dependent_var].values.reshape(-1,1) # Reshape y for sklearn

        if len(window_data) < window_size or np.any(np.isnan(X)) or np.any(np.isnan(y)):
            return None  # Handle cases where the window is incomplete or contains NaNs.
        
        model = LinearRegression()
        model.fit(X, y)
        y_pred = model.predict(X)

        # Step 3: Calculate the residuals
        residuals = y - y_pred

        # Step 4: Compute the residual variance
        residual_variance = np.var(residuals, ddof=1) 

        return {'index': df.index[i], 'residual_variance': residual_variance} # Include index for proper merging


    results = Parallel(n_jobs=-1)(delayed(_regress_window)(i) for i in range(n_rows))
    
    

    # Filter out None results (from edge cases or NaN windows)
    valid_results = [r for r in results if r is not None]

    if not valid_results: # Check if all results are invalid
        return None

    results_df = pd.DataFrame(valid_results)#.set_index('index')
    
    # Handle multiindex
    if isinstance(results_df['index'].iloc[0], tuple):
        results_df[list(df.index.names)] = results_df['index'].apply(pd.Series)
        results_df = results_df.drop(columns='index').set_index(list(df.index.names))
    else:
        results_df = results_df.set_index('index')
        
    return results_df

def rolling_regression(df, window_size, dependent_var, independent_vars, reg_type='OLS', alpha=1.0):
    """
    Performs rolling regression in parallel using joblib, with options for OLS, Ridge, and Lasso.
    Returns np.nan for coefficients when X or y contains NaN values.

    Args:
        df: Pandas DataFrame containing the data.
        window_size: Size of the rolling window.
        dependent_var: Name of the dependent variable column.
        independent_vars: List of names of independent variable columns.
        reg_type: Type of regression to perform ('OLS', 'Ridge', 'Lasso'). Default is 'OLS'.
        alpha: Regularization strength for Ridge and Lasso. Default is 1.0.

    Returns:
        Pandas DataFrame with the regression coefficients for each window, indexed by the original DataFrame's index.
        Returns None if there are issues.
    """

    n_rows = len(df)
    results = []

    def _regress_window(i):
        if i < window_size - 1:
            return {'index': df.index[i],
                   'intercept': np.nan,
                   **dict(zip(independent_vars, [np.nan] * len(independent_vars)))}

        window_data = df.iloc[i - window_size + 1:i + 1]

        X = window_data[independent_vars].values
        y = window_data[dependent_var].values.reshape(-1,1)

        # Return NaN coefficients if window contains NaN or is incomplete
        if len(window_data) < window_size or np.any(np.isnan(X)) or np.any(np.isnan(y)):
            return {'index': df.index[i],
                   'intercept': np.nan,
                   **dict(zip(independent_vars, [np.nan] * len(independent_vars)))}

        if reg_type.upper() == 'OLS':
            model = LinearRegression()
        elif reg_type.upper() == 'RIDGE':
            model = Ridge(alpha=alpha)
        elif reg_type.upper() == 'LASSO':
            model = Lasso(alpha=alpha)
        else:
            raise ValueError("Invalid reg_type. Choose 'OLS', 'Ridge', or 'Lasso'.")

        model.fit(X, y)
        coefs = model.coef_.flatten()
        intercept = model.intercept_

        return {'index': df.index[i], 'intercept': intercept, **dict(zip(independent_vars, coefs))}

    results = Parallel(n_jobs=-1)(delayed(_regress_window)(i) for i in range(n_rows))

    # All results should be valid now since we're returning NaN instead of None
    results_df = pd.DataFrame(results)

    # Handle multiindex
    if isinstance(results_df['index'].iloc[0], tuple):
        results_df[list(df.index.names)] = results_df['index'].apply(pd.Series)
        results_df = results_df.drop(columns='index').set_index(list(df.index.names))
    else:
        results_df = results_df.set_index('index')

    return results_df



### calculate simple returns 

In [36]:
mdf

Unnamed: 0,DATE,open,high,low,close,volume,market_cap,bid,ask,baspd,idx_close,ID,stock_return,idx_return,stock_excess_return
0,2019-03-31,29.00,29.950,29.00,29.95,61185.0,,29.98,30.10,0.131854,2834.40,2551003D US Equity,,,
1,2019-04-30,32.57,32.570,31.93,32.30,5428804.0,1.326591e+09,32.16,32.30,0.167972,2945.83,2551003D US Equity,0.078464,0.039313,0.039151
2,2019-05-31,30.71,30.835,30.00,30.14,3126708.0,1.239686e+09,30.07,30.16,0.146801,2752.06,2551003D US Equity,-0.066873,-0.065778,-0.001095
3,2019-06-30,30.15,30.340,29.54,29.75,9986393.0,1.222527e+09,29.66,29.67,0.034628,2941.76,2551003D US Equity,-0.012940,0.068930,-0.081870
4,2019-07-31,26.02,26.670,25.79,26.39,10273595.0,1.084454e+09,26.40,26.41,0.059012,2980.38,2551003D US Equity,-0.112941,0.013128,-0.126069
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
194133,2024-10-31,8.98,8.980,8.56,8.63,862807.0,2.748270e+08,8.61,8.65,0.034862,5705.45,ZYXI UW Equity,0.057598,-0.009897,0.067495
194134,2024-11-30,8.36,8.405,8.32,8.32,700669.0,2.649549e+08,8.32,8.35,0.044879,6032.38,ZYXI UW Equity,-0.035921,0.057301,-0.093223
194135,2024-12-31,8.01,8.080,7.97,8.01,685834.0,2.550828e+08,8.01,8.04,0.040056,5881.63,ZYXI UW Equity,-0.037260,-0.024990,-0.012269
194136,2025-01-31,8.07,8.070,7.72,7.84,457674.0,2.496691e+08,7.83,7.84,0.058212,6040.53,ZYXI UW Equity,-0.021223,0.027016,-0.048240


In [37]:
mdf = calculate_simple_returns(mdf).set_index(['ID','DATE'])

  df['stock_return'] = df.groupby('ID')['close'].pct_change()
  df['idx_return'] = df.groupby('ID')['idx_close'].pct_change()


### calculate low volatility factors | analytics

In [38]:
calc_period = "monthly"

if calc_period == "daily":
    beta_period = 252
    vol_period = 63
    fwd_ret_period = 63
elif calc_period == "weekly":
    beta_period = 52
    vol_period = 26
    fwd_ret_period = 13
elif calc_period == "monthly":
    beta_period = 24
    vol_period = 24
    fwd_ret_period = 3

print(beta_period, vol_period, fwd_ret_period)

24 24 3


In [39]:
mdf['beta'] = mdf\
.groupby('ID', group_keys = False).apply(lambda x: rolling_regression(x, window_size =  beta_period, dependent_var = 'stock_return', independent_vars = ['idx_return']))\
.drop(columns = 'intercept')\
.rename(columns = {'idx_return':'beta'})



In [40]:


mdf['volatility'] = mdf.groupby('ID',group_keys = False)['stock_return'].rolling(vol_period).std().mul(np.sqrt(252)).reset_index(level = 0, drop = True).rename('volatility')



In [41]:

mdf['avg_volm_to_cap'] = mdf.groupby('ID', group_keys = False).apply(lambda x: x['volume'].rolling(vol_period).mean()/(x['market_cap']/1000000)).rename('avg_volm_to_cap')


In [42]:


mdf['volume_trend'] = mdf.groupby('ID', group_keys = False).apply(lambda x: rolling_regression(x.assign(trend = lambda x:np.arange(len(x))), window_size = vol_period , dependent_var = 'volume', independent_vars = ['trend'])).rename(columns = {'trend':'volume_trend'}).drop(columns = 'intercept')


In [43]:



mdf['residual_variance'] = mdf.groupby('ID', group_keys = False).apply(lambda x: rolling_residual_variance(x, window_size = vol_period , dependent_var = 'stock_return', independent_vars = ['idx_return']))



In [45]:
def compound_returns(returns):
    return (1 + returns).prod() - 1

# List of rolling periods to calculate
periods = [1, 2, 3, 6, 12]

# Calculate rolling compounded returns for each period
for period in periods:
    # Stock returns
    mdf[f'stock_return_{period}m'] = mdf.groupby('ID')['stock_return'].rolling(
        window=period, min_periods=period
    ).apply(compound_returns).reset_index(level = 0, drop = True).values

    # SPY returns
    mdf[f'idx_return_{period}m'] = mdf.groupby('ID')['idx_return'].rolling(
        window=period, min_periods=period
    ).apply(compound_returns).reset_index(level = 0, drop = True).values

    # Calculate excess returns (stock - spy)
    mdf[f'rs_{period}m'] = mdf[f'stock_return_{period}m'] - mdf[f'idx_return_{period}m']

In [46]:
mdf['3mrs_3mago'] = mdf.groupby('ID')['rs_3m'].shift(3)
mdf['3mrs_6mago'] = mdf.groupby('ID')['rs_3m'].shift(6)
mdf['3mrs_9mago'] = mdf.groupby('ID')['rs_3m'].shift(9)

### value factors

In [None]:
# earnings to price

In [None]:
# sales to price 

In [None]:
# cash to price

In [None]:
# dividend to price 

In [None]:
# book to price


In [47]:
def calc_forward_rets(mdf, fwd_ret_period):
    mdf['forward_return'] = (mdf.groupby('ID')['close']
                          .shift(-fwd_ret_period) / mdf['close'] - 1)

    mdf['idx_forward_return'] = (mdf.groupby('ID')['idx_close']
                            .shift(-fwd_ret_period) / mdf['idx_close'] - 1)

    mdf['relative_return'] = (
            mdf['forward_return'] - mdf['idx_forward_return']
        )
    
    return mdf

In [48]:
mdf = calc_forward_rets(mdf, fwd_ret_period)

In [50]:
mdf.to_csv('data_analytics_univ_rty_2014_m.csv')