# ***Factor Scoring***

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
# Load the cleaned input tables from CSV files in the working directory.
# NOTE(review): `prices_split_adjusted` and `securities` are loaded here but
# are not referenced by any later cell visible in this notebook.
fundamentals = pd.read_csv("fundamentals_clean.csv")
prices = pd.read_csv("daily_prices_clean.csv")
prices_split_adjusted = pd.read_csv("daily_adjusted_prices_clean.csv")
securities = pd.read_csv("securities_clean.csv")

## ***Value Factor***

***Idea:*** Buy undervalued stocks, ones that are cheap relative to their fundamentals.

***Key Metrics:***
  * **P/E Ratio (Price to Earnings) -** Lower is better (cheap earnings).
  * **P/B Ratio (Price to Book) -** Lower suggests undervaluation.
  * **EV/EBITDA or EV/Sales -** Adjusted for debt and cash.

> We'll rank stocks based on a composite value score (e.g., average rank of P/E, P/B, EV/EBITDA).

*(EV – Enterprise Value; EBITDA – Earnings Before Interest, Taxes, Depreciation, and Amortization)*


* Price-to-Earnings (P/E): `close / eps`
* Price-to-Book (P/B): `close / (total_assets - total_liabilities)`
* Dividend Yield: `dividend_per_share / close`
* PEG Ratio: `pe / earnings_growth_rate`

In [3]:
def calculate_value_factor(fundamentals_df, prices_df):
    """Score each (symbol, date) row on cheapness using P/E and P/B.

    Fundamentals are left-joined to prices on the ticker, P/E and P/B are
    computed per row, each ratio is min-max scaled across all rows that share
    a date and then inverted so that a lower (cheaper) ratio scores higher.
    The two scaled ratios are averaged into ``value_score``.

    Parameters
    ----------
    fundamentals_df : pandas.DataFrame
        Must provide ``ticker_symbol``, ``earnings_per_share``,
        ``total_assets`` and ``total_liabilities``.
    prices_df : pandas.DataFrame
        Must provide ``symbol``, ``date`` and ``close``.

    Returns
    -------
    pandas.DataFrame
        Columns ``symbol``, ``date`` (converted with ``pd.to_datetime``) and
        ``value_score`` (between 0 and 1, higher = cheaper; NaN when neither
        ratio could be computed for the row).
    """
    # NOTE(review): the join key is the ticker alone, so every fundamentals
    # row of a ticker is paired with every price row of that ticker. Confirm
    # fundamentals_df holds one row per ticker, or align on dates as well.
    value_df = pd.merge(fundamentals_df, prices_df, left_on='ticker_symbol', right_on='symbol', how='left')

    # Zero denominators become NaN so the ratios never turn into +/-inf.
    value_df['pe'] = value_df['close'] / value_df['earnings_per_share'].replace(0, np.nan)
    # NOTE(review): the denominator is total book equity, not book value per
    # share, while `close` looks like a per-share price -- confirm the units.
    book_equity = value_df['total_assets'] - value_df['total_liabilities']
    value_df['pb'] = value_df['close'] / book_equity.replace(0, np.nan)

    for metric in ['pe', 'pb']:
        scaled = value_df.groupby('date')[metric].transform(
            lambda x: (x - x.min()) / (x.max() - x.min() + 1e-8)
        )
        # Lower ratio is better, so flip the scale. `1 - scaled` stays within
        # [0, 1]; a reciprocal would divide by zero for the cheapest stock of
        # each date and hand it an infinite score.
        value_df[metric + '_norm'] = 1 - scaled

    # Row-wise mean skips NaN, so one available ratio is enough for a score.
    value_df['value_score'] = value_df[['pe_norm', 'pb_norm']].mean(axis=1)

    # Ensure date is datetime64[ns]
    value_df['date'] = pd.to_datetime(value_df['date'])

    return value_df[['symbol', 'date', 'value_score']]

## ***Momentum Factor***

***Idea:*** Stocks that performed well recently tend to keep doing well in the short term.

***Key Metrics:***
  * **3M, 6M -** 3-month or 6-month price return. Higher is better.
  * **Relative Strength Index (RSI) -** For overbought/oversold signals.

> We'll rank stocks on their 3- and 6-month returns (together with RSI) and favour the top decile.


* 3-Month Momentum: `(current_close - close_3mo_ago) / close_3mo_ago`
* 6-Month Momentum: `(current_close - close_6mo_ago) / close_6mo_ago`

In [4]:
def calculate_momentum_factor(prices_df):
    """Score each (symbol, date) row on recent price momentum.

    Three metrics are computed per symbol -- the 63-row and 126-row
    percentage change of ``close`` and a 14-row RSI -- then each is min-max
    scaled across all rows sharing a date and the scaled values are averaged
    (NaN metrics are skipped) into ``momentum_score``.

    Parameters
    ----------
    prices_df : pandas.DataFrame
        Must provide ``symbol``, ``date`` and ``close``. The input is not
        modified.
        NOTE(review): the row-offset calculations assume each symbol's rows
        are already in chronological order -- confirm upstream sorting.

    Returns
    -------
    pandas.DataFrame
        Columns ``symbol``, ``date`` (converted with ``pd.to_datetime``) and
        ``momentum_score`` (higher = stronger momentum; NaN while no metric
        has enough history).
    """
    momentum_df = prices_df.copy()

    # 3M(=63 days) and 6M(126 days) Returns
    momentum_df['3m_return'] = momentum_df.groupby('symbol')['close'].pct_change(periods=63)
    momentum_df['6m_return'] = momentum_df.groupby('symbol')['close'].pct_change(periods=126)

    # RSI (Relative Strength Index)
    delta = momentum_df.groupby('symbol')['close'].diff()
    momentum_df['rsi_gain'] = delta.where(delta > 0, 0)
    momentum_df['rsi_loss'] = -delta.where(delta < 0, 0)
    # The rolling means are taken inside each symbol's own rows; rolling over
    # the whole frame would mix the tail of one symbol into the first windows
    # of the next.
    avg_gain = momentum_df.groupby('symbol')['rsi_gain'].transform(
        lambda x: x.rolling(window=14).mean()
    )
    avg_loss = momentum_df.groupby('symbol')['rsi_loss'].transform(
        lambda x: x.rolling(window=14).mean()
    )
    # The small epsilons keep both divisions defined when a window holds no
    # losses.
    rs = avg_gain / (avg_loss + 1e-8)
    momentum_df['rsi'] = 100 - (100 / (1 + rs + 1e-8))

    # Normalize and combine the metrics
    momentum_metrics = ['3m_return', '6m_return', 'rsi']
    for metric in momentum_metrics:
        # Higher momentum is better, so the scaled value is used as is
        # (no inversion).
        momentum_df[metric + '_norm'] = momentum_df.groupby('date')[metric].transform(
            lambda x: (x - x.min()) / (x.max() - x.min() + 1e-8)
        )

    momentum_df['momentum_score'] = momentum_df[[m + '_norm' for m in momentum_metrics]].mean(axis=1)
    momentum_df['date'] = pd.to_datetime(momentum_df['date'])
    return momentum_df[['symbol', 'date', 'momentum_score']]

## ***Quality Factor***

***Idea:***  Invest in financially healthy and efficient companies.

***Key Metrics:***
  * **ROE (Return on Equity) -** Higher = better efficiency.
  * **Debt-to-Equity Ratio -** Lower = Less Financial Risk.
  * **Profit Margin**

*High-quality companies are more resilient to downturns and are often underpriced due to market inefficiencies and hence, Quality is important.*

> We'll create a composite quality score from ROE, D/E, and margin stability.


* Return on Assets (ROA): `net_income / total_assets`
* Asset Turnover Change: `(current_revenue/assets - prev_revenue/assets)`
* Accruals: `(net_income - operating_cash_flow) / total_assets`
* Leverage: `operating_cash_flow / total_debt`

In [5]:
def calculate_quality_factor(fundamentals_df):
    """Score each (symbol, reporting period) row on financial quality.

    Four metrics are derived from the fundamentals, min-max scaled across all
    rows sharing a ``period_ending`` value, and averaged (NaN metrics are
    skipped) into ``quality_score``:

    * ``roa``: net income / total assets (higher is better)
    * ``asset_turnover_chg``: row-to-row change in revenue / total assets
      within a ticker (higher is better)
    * ``accruals``: (net income - operating income) / total assets
      (lower is better, so its scale is inverted)
    * ``leverage_ratio``: operating income / total liabilities
      (higher is better)

    NOTE(review): the formulas listed in this notebook use operating cash flow
    and total debt; the code uses ``operating_income`` and
    ``total_liabilities`` instead -- confirm these are the intended columns.
    NOTE(review): ``asset_turnover_chg`` assumes each ticker's rows are in
    chronological order -- confirm upstream sorting.

    Parameters
    ----------
    fundamentals_df : pandas.DataFrame
        Must provide ``ticker_symbol``, ``period_ending``, ``net_income``,
        ``total_assets``, ``total_revenue``, ``operating_income`` and
        ``total_liabilities``. The input is not modified.

    Returns
    -------
    pandas.DataFrame
        Columns ``symbol``, ``date`` (``period_ending`` converted with
        ``pd.to_datetime``) and ``quality_score``.
    """
    def _min_max(x):
        # Scale one cross-section to [0, 1); epsilon guards a zero range.
        return (x - x.min()) / (x.max() - x.min() + 1e-8)

    qual_df = fundamentals_df.copy()

    # A zero denominator would yield +/-inf, and a single inf flattens the
    # min-max scale of its whole period. Treat such rows as missing instead,
    # the same way the value factor handles zero denominators.
    total_assets = qual_df['total_assets'].replace(0, np.nan)
    total_liabilities = qual_df['total_liabilities'].replace(0, np.nan)

    qual_df['roa'] = qual_df['net_income'] / total_assets
    qual_df['asset_turnover'] = qual_df['total_revenue'] / total_assets
    qual_df['asset_turnover_chg'] = qual_df.groupby('ticker_symbol')['asset_turnover'].diff()
    qual_df['accruals'] = (qual_df['net_income'] - qual_df['operating_income']) / total_assets
    qual_df['leverage_ratio'] = qual_df['operating_income'] / total_liabilities

    # Lower accruals are better, hence the inverted scale.
    qual_df['accruals_norm'] = 1 - qual_df.groupby('period_ending')['accruals'].transform(_min_max)

    # Higher the better
    for metric in ['roa', 'asset_turnover_chg', 'leverage_ratio']:
        qual_df[metric + '_norm'] = qual_df.groupby('period_ending')[metric].transform(_min_max)

    qual_df['quality_score'] = qual_df[
        ['roa_norm', 'asset_turnover_chg_norm', 'accruals_norm', 'leverage_ratio_norm']
    ].mean(axis=1)
    qual_df = qual_df.rename(columns={'period_ending': 'date', 'ticker_symbol': 'symbol'})
    qual_df['date'] = pd.to_datetime(qual_df['date'])

    return qual_df[['symbol', 'date', 'quality_score']]

## ***Volume(Liquidity) Factor***

***Idea:***  Liquid stocks are easier to trade and more stable.

*'Liquid' describes how easily and quickly an asset (like a stock) can be bought or sold in the market without significantly affecting its price.*

***Key Metrics:***
  * **Average Daily Trading Volume**
  * **Turnover Ratio -** Volume/ Shares Outstanding

*High volume means that a large number of investors are interested in a particular stock.*

> We'll use volume as a filter to eliminate illiquid stocks.

In [6]:
# Calculate Volume factor using On-Balance Volume (OBV)
def calculate_volume_factor(prices_df):
    """Score each (symbol, date) row by its On-Balance Volume.

    For every symbol, each row's volume is added to a running total when the
    close rose versus the previous row, subtracted when it fell, and ignored
    when it was unchanged (or when there is no previous row). The running
    total is then min-max scaled across all rows sharing a date.

    Parameters
    ----------
    prices_df : pandas.DataFrame
        Must provide ``symbol``, ``date``, ``close`` and ``volume``. The
        input is not modified.
        NOTE(review): the running total assumes each symbol's rows are in
        chronological order -- confirm upstream sorting.

    Returns
    -------
    pandas.DataFrame
        Columns ``symbol``, ``date`` (converted with ``pd.to_datetime``) and
        ``volume_score``.
    """
    vol_df = prices_df.copy()
    vol_df['price_change'] = vol_df.groupby('symbol')['close'].diff()
    # +1 on an up move, -1 on a down move, 0 otherwise (a NaN change fails
    # both comparisons and therefore also maps to 0).
    vol_df['obv_direction'] = np.where(vol_df['price_change'] > 0, 1,
                              np.where(vol_df['price_change'] < 0, -1, 0))

    # Sign the volume row by row first, then accumulate inside each symbol.
    # This avoids multiplying every group against the full volume column,
    # which relied on index alignment and repeated the work per symbol.
    vol_df['signed_volume'] = vol_df['obv_direction'] * vol_df['volume']
    vol_df['obv'] = vol_df.groupby('symbol')['signed_volume'].cumsum()

    vol_df['volume_score'] = vol_df.groupby('date')['obv'].transform(
        lambda x: (x - x.min()) / (x.max() - x.min() + 1e-8)
    )
    vol_df['date'] = pd.to_datetime(vol_df['date'])
    return vol_df[['symbol', 'date', 'volume_score']]

## ***Volatility Factor***

***Idea:***  Stocks with lower price fluctuations tend to give better risk-adjusted returns.

***Key Metrics:***
  * **Standard Deviation of Daily Returns**
  * **Beta -** Sensitivity to market movements.

*Many investors irrationally chase high-volatility "lottery-like" stocks, causing low-volatility stocks to be underpriced.*

> We'll select stocks with the lowest historical volatility over 1–2 years.

In [7]:
def calculate_volatility_factor(prices_df, window=30):
    """Score each (symbol, date) row so that calmer stocks rank higher.

    Volatility is the rolling standard deviation of each symbol's
    row-over-row percentage change in ``close``. Per date, volatilities are
    min-max scaled and inverted; rows without a usable score get 0.5.

    Parameters
    ----------
    prices_df : pandas.DataFrame
        Must provide ``symbol``, ``date`` and ``close``. The input is not
        modified.
    window : int, default 30
        Number of rows in the rolling standard-deviation window.

    Returns
    -------
    pandas.DataFrame
        Columns ``symbol``, ``date`` (converted with ``pd.to_datetime``) and
        ``volatility_score``.
    """
    def _inverted_min_max(values):
        # One date's cross-section: lowest volatility -> 1, highest -> ~0.
        lowest = values.min()
        highest = values.max()
        if highest == lowest:
            # No spread to rank on, so everyone is neutral.
            return pd.Series([0.5] * len(values), index=values.index)
        spread = highest - lowest + 1e-8
        return 1 - (values - lowest) / spread

    frame = prices_df.copy()

    returns = frame.groupby('symbol')['close'].pct_change()
    frame['daily_return'] = returns
    frame['volatility'] = frame.groupby('symbol')['daily_return'].transform(
        lambda series: series.rolling(window).std()
    )
    frame['date'] = pd.to_datetime(frame['date'])

    scores = frame.groupby('date')['volatility'].transform(_inverted_min_max)
    # Anything still missing (e.g. not enough history yet) is treated as neutral.
    frame['volatility_score'] = scores.fillna(0.5)

    return frame[['symbol', 'date', 'volatility_score']]

## ***NLP Integration (Sentiment Factor)***
The NLP component does the following:
1. Uses FinBERT model to score the sentiment from financial texts,
2. Scores financial news headlines or tweets,
3. Computes a `sentiment_factor = positive - negative`,
4. Aggregates sentiment scores per stock symbol.

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import pandas as pd
import numpy as np

# Load the FinBERT tone tokenizer and classifier once for the whole notebook
# and put the model in evaluation (inference) mode.
tokenizer = AutoTokenizer.from_pretrained("yiyanghkust/finbert-tone")
model = AutoModelForSequenceClassification.from_pretrained("yiyanghkust/finbert-tone")
model.eval()

# Class names in logit order, read from the checkpoint's own configuration
# instead of being hard-coded: label order differs between FinBERT variants,
# and a mismatched list would attach the wrong name to each probability.
# Lower-cased so downstream code can keep using 'positive' / 'negative'.
labels = [model.config.id2label[i].lower() for i in range(model.config.num_labels)]

def score_sentiment(text):
    """Return the class probabilities for one news/tweet string.

    The text is tokenized (with truncation), passed through the module-level
    `model` without gradient tracking, and the logits are soft-maxed.

    Returns a dict that pairs each entry of the module-level `labels` list
    with the probability at the same position.
    """
    encoded = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
    with torch.no_grad():
        logits = model(**encoded).logits
        # Single input, so take row 0 of the (1, n_classes) probability array.
        class_probs = torch.nn.functional.softmax(logits, dim=1).numpy()[0]
    return {label: prob for label, prob in zip(labels, class_probs)}

def compute_sentiment_scores(news_df):
    """Append one probability column per sentiment class to `news_df`.

    Each value of the ``text`` column is scored with `score_sentiment`; the
    resulting dicts are expanded into columns and concatenated to the right
    of the original columns. Returns a new DataFrame.
    """
    per_row_scores = news_df['text'].apply(score_sentiment)
    score_columns = per_row_scores.apply(pd.Series)
    return pd.concat([news_df, score_columns], axis=1)

def get_sentiment_factor(news_df):
    """Build the per-(date, symbol) sentiment factor from raw news rows.

    Every row is scored, its factor is the ``positive`` probability minus the
    ``negative`` probability, and the factors are averaged over all rows that
    share a ``date`` and ``symbol``.

    Returns a DataFrame with columns ``date``, ``symbol`` and
    ``sentiment_factor``.
    NOTE(review): ``date`` is passed through unchanged here, whereas the other
    factor functions in this notebook convert it with ``pd.to_datetime`` --
    confirm the dtypes line up before merging.
    """
    scored = compute_sentiment_scores(news_df)
    scored['sentiment_factor'] = scored['positive'] - scored['negative']  # Customize as needed
    mean_by_key = scored.groupby(['date', 'symbol'])['sentiment_factor'].mean()
    return mean_by_key.reset_index()

def merge_sentiment_with_factors(factor_scores_df, sentiment_df):
    """Attach the sentiment factor to an existing table of factor scores.

    Performs a left join on ``date`` and ``symbol`` so every factor row is
    kept; rows with no matching sentiment get a ``sentiment_factor`` of 0.
    Returns a new DataFrame.
    """
    combined = pd.merge(
        factor_scores_df,
        sentiment_df,
        how='left',
        on=['date', 'symbol'],
    )
    # No news for that (date, symbol) pair is treated as zero sentiment.
    filled = combined['sentiment_factor'].fillna(0)
    combined['sentiment_factor'] = filled
    return combined

## ***Calculating the Factor Values***

In [8]:
# Compute every individual factor score table from the loaded inputs.
# NOTE(review): all price-based factors are fed the unadjusted `prices`
# table; `prices_split_adjusted` is loaded above but not used. Presumably
# return-based factors should use split-adjusted prices -- confirm.
value_scores = calculate_value_factor(fundamentals, prices)
quality_scores = calculate_quality_factor(fundamentals)
momentum_scores = calculate_momentum_factor(prices)
volume_scores = calculate_volume_factor(prices)
volatility_scores = calculate_volatility_factor(prices)

In [9]:
# Expand the per-reporting-period quality scores to one row per calendar day
# and symbol, carrying each score forward until the next report.
quality_scores['date'] = pd.to_datetime(quality_scores['date'])
# Keep a single row per (symbol, date) so the resample index is unique.
quality_scores_cleaned = quality_scores.sort_values('date').drop_duplicates(subset=['symbol', 'date'], keep='last')
quality_scores_daily = (
    quality_scores_cleaned.set_index('date')
    .groupby('symbol')
    .resample('D')
    .ffill()
    # Depending on the pandas version the grouping column is either kept in
    # the resampled frame or omitted; errors='ignore' covers both, so
    # reset_index() can always restore `symbol` from the index.
    .drop(columns='symbol', errors='ignore')
    .reset_index()
)

## ***Combining All Factors (Composite Factor)***

While combining the factors, we will consider the following points -
* ***Normalization -*** Min-max scaling ensures comparable factor scores.
* ***Weight Customization -*** Adjust factor weights in composite score based on strategy.
* ***Rebalancing Frequency -*** Monthly rebalancing recommended for factor strategies.

In [None]:
def calculate_composite_score(factor_dfs, sentiment_df=None, sentiment_weight=0.1):
    """Blend the individual factor scores into one composite score.

    All frames are inner-joined on ``symbol`` and ``date`` and combined as a
    weighted sum. The five base factors share ``1 - sentiment_weight`` equally
    and sentiment receives ``sentiment_weight``. When no sentiment frame is
    supplied, the base factors share the full weight.

    Parameters
    ----------
    factor_dfs : dict or list of pandas.DataFrame
        The factor frames, either as a list or as a mapping whose values are
        the frames. Together they must provide ``value_score``,
        ``momentum_score``, ``quality_score``, ``volume_score`` and
        ``volatility_score``, each alongside ``symbol`` and ``date``.
    sentiment_df : pandas.DataFrame, optional
        Frame with ``symbol``, ``date`` and ``sentiment_factor``. Because the
        join is an inner join, rows without sentiment are dropped when this
        is given.
    sentiment_weight : float, default 0.1
        Share of the composite given to sentiment; ignored when
        ``sentiment_df`` is None.

    Returns
    -------
    pandas.DataFrame
        Columns ``symbol``, ``date`` and ``composite_score``.
    """
    from functools import reduce

    base_weights = {
        'value': 0.18,
        'momentum': 0.18,
        'quality': 0.18,
        'volume': 0.18,
        'volatility': 0.18
    }

    # Accept both a {name: frame} mapping and a plain list of frames; the
    # merge works on column names, so the mapping's keys are not needed.
    if isinstance(factor_dfs, dict):
        frames = list(factor_dfs.values())
    else:
        frames = list(factor_dfs)

    use_sentiment = sentiment_df is not None
    if use_sentiment:
        frames.append(sentiment_df.rename(columns={'sentiment_factor': 'sentiment_score'}))
    else:
        # Without sentiment the base factors must carry the whole weight.
        sentiment_weight = 0.0

    # Rescale the base weights so that, with sentiment, everything sums to 1.
    total_base = sum(base_weights.values())
    adjusted_weights = {k: v * (1 - sentiment_weight) / total_base for k, v in base_weights.items()}

    merged_df = reduce(lambda left, right: pd.merge(left, right, on=['symbol', 'date'], how='inner'), frames)
    merged_df['composite_score'] = sum(
        weight * merged_df[name + '_score'] for name, weight in adjusted_weights.items()
    )
    if use_sentiment:
        merged_df['composite_score'] = (
            merged_df['composite_score'] + sentiment_weight * merged_df['sentiment_score']
        )

    return merged_df[['symbol', 'date', 'composite_score']]

In [11]:
# Collect the factor score tables by factor name and blend them.
factor_dataframes = {
    'value': value_scores,
    'momentum': momentum_scores,
    # Use the daily, forward-filled quality scores: the raw `quality_scores`
    # only has rows on reporting-period end dates, so an exact (symbol, date)
    # join against the daily price-based factors would keep almost nothing.
    'quality': quality_scores_daily,
    'volume': volume_scores,
    'volatility': volatility_scores
}
# NOTE(review): this call passes the dict itself and no sentiment frame, so
# calculate_composite_score must accept a mapping and treat sentiment_df as
# optional -- confirm against its definition.
composite_scores = calculate_composite_score(factor_dataframes)

In [None]:
# Composite score including the sentiment factor. The scorer defined in this
# notebook is `calculate_composite_score`; the frames are passed as a list.
# NOTE(review): `sentiment_scores` is not created in any cell visible here;
# presumably it is the output of get_sentiment_factor(news_df) -- confirm.
composite_scores = calculate_composite_score(
    list(factor_dataframes.values()),
    sentiment_df=sentiment_scores,
    sentiment_weight=0.15
)

In [None]:
# Persist every factor table and the composite score as CSV files in the
# working directory; index=False leaves the DataFrame index out of the files.
value_scores.to_csv('value_scores.csv', index=False)
momentum_scores.to_csv('momentum_scores.csv', index=False)
quality_scores.to_csv('quality_scores.csv', index=False)
volume_scores.to_csv('volume_scores.csv', index=False)
volatility_scores.to_csv('volatility_scores.csv', index=False)
composite_scores.to_csv('composite_scores.csv', index=False)