In [None]:
import numpy as np
from numpy.linalg import LinAlgError
import pandas as pd
import polars as pl
from itertools import combinations

# Data

In [None]:
def gen_rets(mu, sig, n_pts=1000):
    """Simulate a series of i.i.d. Gaussian returns.

    Parameters
    ----------
    mu : float
        Mean added to every draw.
    sig : float
        Scale applied to the standard-normal draws.
    n_pts : int, default 1000
        Number of points to generate.

    Returns
    -------
    pd.Series
        ``mu + sig * z`` with ``z`` drawn from ``np.random.randn(n_pts)``.
        The draws come from numpy's global random state, so seed it with
        ``np.random.seed`` beforehand for reproducible output.
    """
    # A second, unreachable `return` (a cumulative-sum variant of the same
    # draws) used to follow this line; it could never execute and was removed.
    return pd.Series(mu + sig * np.random.randn(n_pts))

# Simulation setup: one drift per asset, evenly spaced over [-0.2, 0.2] and
# divided by 365, with a common volatility of 0.1 divided by sqrt(365)
# (presumably annual figures scaled to daily ones -- confirm).
n_assets = 50
mus = np.linspace(-0.2, 0.2, n_assets) / 365
sig = 0.1 / 365 ** 0.5
returns = [gen_rets(mu, sig) for mu in mus]

# Calendar axis: consecutive days starting 2000-01-01, one per simulated point.
dates = pd.Series(
    pd.date_range(start='1/1/2000', periods=len(returns[0])),
    name='date',
)

# Glue everything into one wide frame indexed by date. The simulated series
# are unnamed, so concat labels their columns with integers.
returns = pd.concat([dates, *returns], axis=1).set_index('date')
returns.plot(grid=True, legend=False)

In [None]:
# Move from pandas to polars: the date index becomes a regular column again
# and is cast to a polars Date.
# NOTE(review): this cell rebinds `returns`, so it cannot be re-run on its own
# -- a polars frame has no `.reset_index()`. Re-run the data cell first.
returns = pl.DataFrame(returns.reset_index()).with_columns(
    pl.col('date').cast(pl.Date)
)

# Polars corr

In [None]:
def window_corrs(columns, method):
    """Build one polars correlation expression per unordered column pair.

    Pairs are produced by ``itertools.combinations(columns, r=2)``, so they
    follow the order of ``columns``. Each expression is ``pl.corr`` of the
    two columns with ``method`` forwarded unchanged, aliased as the two
    column names joined by an underscore.

    Returns a list of expressions, suitable for ``select``/``agg``.
    """
    exprs = []
    for left, right in combinations(columns, r=2):
        pair_corr = pl.corr(left, right, method=method)
        exprs.append(pair_corr.alias('_'.join((left, right))))
    return exprs

def shape_corr(line):
    """Rebuild a symmetric, unit-diagonal matrix from its flattened upper triangle.

    ``line`` holds the strictly-upper-triangular entries in the order given
    by ``np.triu_indices(dim, k=1)`` (row by row). Its length must equal
    ``dim * (dim - 1) / 2`` for some integer ``dim``; otherwise the assertion
    fails. Returns a ``(dim, dim)`` float array with ones on the diagonal and
    the values mirrored across it.
    """
    # Invert n_pairs = size * (size - 1) / 2 for the matrix size.
    n_pairs = len(line)
    size = (1 + (1 + 8 * n_pairs) ** .5) / 2
    assert size.is_integer()
    size = int(size)

    matrix = np.ones((size, size))
    upper_rows, upper_cols = np.triu_indices(size, k=1)
    matrix[upper_rows, upper_cols] = line
    # Mirror the same values into the lower triangle.
    matrix[upper_cols, upper_rows] = line
    return matrix

def svd_catched(line):
    """SVD of the correlation matrix encoded by ``line``, tolerant of failure.

    ``line`` is expanded to a square matrix with ``shape_corr``. Returns the
    first element of ``np.linalg.svd``'s result (the matrix of left singular
    vectors), or ``None`` when numpy raises ``LinAlgError``.
    """
    matrix = shape_corr(line)
    try:
        return np.linalg.svd(matrix, full_matrices=True)[0]
    except LinAlgError:
        return None

## Full sample

In [None]:
def get_loadings(returns, n_factors, method='spearman'):
    """Full-sample factor loadings from the pairwise correlation matrix.

    Parameters
    ----------
    returns : pl.DataFrame
        Frame with a ``'date'`` column; every other column is treated as an
        asset's return series.
    n_factors : int
        Number of leading singular vectors to keep.
    method : str, default 'spearman'
        Correlation method forwarded to ``pl.corr``.

    Returns
    -------
    pl.DataFrame
        One row per factor and one column per asset. Empty (columns only)
        when the SVD does not converge.
    """
    columns = returns.drop('date').columns
    cols = pl.col(columns)
    # Z-score every asset column over the whole sample before correlating.
    returns = returns.with_columns(
        (cols - cols.mean()) / (cols.std())
    )
    corr = returns.select(window_corrs(columns, method))

    left_vectors = svd_catched(corr.to_numpy()[0])
    if left_vectors is None:
        # Test for failure BEFORE slicing: subscripting None raised TypeError,
        # so the empty-frame fallback could never be reached.
        return pl.DataFrame([], schema=columns)

    # The singular vectors are the COLUMNS of U. Take the first `n_factors`
    # columns and transpose so each row is one factor's loading on every
    # asset (slicing rows of U instead yields one asset's coordinates).
    loads = np.ascontiguousarray(left_vectors[:, :n_factors].T)
    return pl.DataFrame(loads, schema=columns, orient='row')

In [None]:
# Request three factors from the full-sample correlations and check the
# shape of the resulting loadings frame.
n_factors = 3
loadings = get_loadings(returns, n_factors)
loadings.shape

## Rolling

In [None]:
def get_rolling_loadings(returns, n_days, n_factors, method='spearman'):
    """Factor loadings from rolling-window correlation matrices.

    Parameters
    ----------
    returns : pl.DataFrame
        Frame with a ``'date'`` column; every other column is treated as an
        asset's return series.
    n_days : int
        Window length, passed to polars as the period ``f'{n_days}d'``.
    n_factors : int
        Number of leading singular vectors to keep per window.
    method : str, default 'spearman'
        Correlation method forwarded to ``pl.corr``.

    Returns
    -------
    pl.DataFrame
        Columns ``'date'`` plus one per asset; up to ``n_factors`` rows per
        date. Dates whose SVD fails (``svd_catched`` returns ``None``) are
        skipped.
    """
    columns = returns.drop('date').columns
    cols = pl.col(columns)
    # Z-score every asset column over the whole sample before correlating.
    returns = returns.with_columns(
        (cols - cols.mean()) / (cols.std())
    )
    rolling_corrs = (
        returns
        .sort('date')
        .rolling('date', period=f'{n_days}d')
        .agg(window_corrs(columns, method))
        .sort('date')
    )
    # Each aggregated row is (date, corr_0, corr_1, ...). The singular vectors
    # are the COLUMNS of U, so slice columns and transpose to get one row per
    # factor (slicing rows of U yields one asset's coordinates instead).
    loadings = [
        [line[0], *load]
        for line in rolling_corrs.iter_rows()
        if (loads := svd_catched(line[1:])) is not None
        for load in loads[:, :n_factors].T.tolist()
    ]
    return (
        # The data is a list of rows; say so instead of relying on inference.
        pl.DataFrame(loadings, schema=['date', *columns], orient='row')
        .with_columns(pl.col('date').cast(pl.Date))
    )

In [None]:
%%time

# Rolling run: window length in days, number of factors kept per date, and
# the correlation method forwarded to pl.corr.
n_days = 50
n_factors = 3
method = 'spearman'

loadings = get_rolling_loadings(returns, n_days, n_factors, method)
loadings

# Misc

In [None]:
def get_lin_coef(rets_vals, factors_vals):
    """No-intercept least-squares coefficients via the normal equations.

    Solves ``(R.T @ R) X = R.T @ F`` for ``X``, with ``R = rets_vals`` and
    ``F = factors_vals``, and returns ``X.T``. Intended to match
    ``LinearRegression(fit_intercept=False).coef_`` fitted on
    ``(rets_vals, factors_vals)``. ``np.linalg.solve`` raises ``LinAlgError``
    when ``R.T @ R`` is singular.
    """
    rets_t = rets_vals.T
    gram = np.matmul(rets_t, rets_vals)
    cross = np.matmul(rets_t, factors_vals)
    solution = np.linalg.solve(gram, cross)
    return solution.T