In [None]:
import numpy as np
import pandas as pd
import re
import ast
import pycountry
import requests
from tqdm import tqdm
import os

In [None]:
# Prep functions
def evaluate_literal(val):
    """Parse ``val`` as a Python literal, returning ``val`` unchanged on failure.

    Args:
        val: Usually a CSV cell. Strings holding a literal (list, tuple,
            number, ...) are converted to the corresponding object.

    Returns:
        The evaluated literal, or ``val`` itself when it is not a valid literal.
    """
    try:
        return ast.literal_eval(val)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        # literal_eval is documented to raise any of these on malformed input;
        # e.g. "{[1]: 2}" parses fine but raises TypeError (unhashable key).
        return val
    
def load(path):
    """Read the CSV at ``path`` and run every cell through ``evaluate_literal``.

    Returns:
        A DataFrame whose cells are the evaluated literals where parsing
        succeeded and the values produced by ``pd.read_csv`` otherwise.
    """
    frame = pd.read_csv(path)
    for name in frame.columns:
        parsed_column = frame[name].apply(evaluate_literal)
        frame[name] = parsed_column
    return frame

def save(df):
    """Validate, clean and merge ``df`` into 'data/contract_elaborated.csv'.

    Rows failing ``is_row_valid`` are discarded, the rest are cleaned with
    ``clean_df`` and merged with the rows already stored on disk (if the file
    exists). Among rows sharing a symbol, those with ``exact_search == False``
    are removed before the file is rewritten.

    Args:
        df: DataFrame of newly elaborated contracts.
    """
    # .copy() so clean_df's column assignments act on an independent frame
    # rather than on a filtered slice of the caller's DataFrame.
    final_df = df[df.apply(is_row_valid, axis=1)].copy()
    final_df = clean_df(final_df)
    try:
        temp_df = load('data/contract_elaborated.csv')
        temp_df = clean_df(temp_df)
        final_df = pd.concat([final_df, temp_df]).drop_duplicates(subset=['symbol', 'exact_search', 'search_exchange', 'search_symbol'])
    except FileNotFoundError:
        pass

    # Filter out the duplicates where 'exact_search' is False.
    # A boolean mask is used instead of drop(<index labels>): after concat the
    # index labels repeat, so a label-based drop would also delete unrelated
    # rows that merely share an index label with a flagged row.
    is_duplicated = final_df.duplicated(subset='symbol', keep=False)
    is_inexact = final_df['exact_search'] == False
    final_df = final_df[~(is_duplicated & is_inexact)]

    final_df.to_csv('data/contract_elaborated.csv', index=False)

def is_numerical(val):
    """Return True when ``str(val)`` with every '%' removed parses as a float."""
    try:
        float(str(val).replace('%', ''))
        return True
    except Exception:
        return False

def is_valid_tuple(tuple, column):
    """Decide whether one ``(label, value)`` entry of a list column is acceptable.

    Args:
        tuple: The entry to check, expected to unpack into ``(label, value)``.
        column: Name of the column the entry came from; selects the
            column-specific rules below.

    Returns:
        False when the entry cannot be unpacked into two items or the label is
        not a string. True when the value is None or numeric. Otherwise:
        'profile' accepts anything, 'style' accepts booleans, 'fundamentals'
        accepts upper-case strings and 'dividends' accepts 'Unknown' or a
        string containing a parsable number. Everything else is rejected.
    """
    def extract_float(value):
        match = re.match(r'[^0-9]*([0-9.,]+)', value)
        if match:
            try:
                return float(match.group(1).replace(',', ''))
            except ValueError:
                # The capture can be punctuation only (e.g. '.' from 'N.A.').
                return None
        return None

    try:
        label, value = tuple
    except (TypeError, ValueError):
        # Not a two-item entry: invalid rather than a crash for the whole frame.
        return False
    if not isinstance(label, str):
        return False
    if value is None:
        return True  # lenient filter: a missing value is tolerated
    if is_numerical(value):
        return True

    if column == 'profile':
        return True
    if column == 'style':
        return isinstance(value, bool)
    if not isinstance(value, str):
        # The remaining rules use string methods / regexes.
        return False
    if column == 'fundamentals':
        return value.isupper()
    if column == 'dividends':
        return value == 'Unknown' or extract_float(value) is not None
    return False

def is_row_valid(row):
    """Return True when every entry of every list-valued cell in ``row`` is valid.

    Each entry is checked with ``is_valid_tuple`` using the cell's column name.
    The first invalid entry is printed and makes the whole row invalid.
    Cells that are not lists are ignored.
    """
    for col in row.index:
        cell = row[col]
        if not isinstance(cell, list):
            continue
        for entry in cell:
            if is_valid_tuple(entry, col):
                continue
            print(entry)
            return False
    return True

def has_bad_multiplier(long_name):
    """Tell whether a contract name contains a multiplier token above 1X.

    '-' and '+' are removed first, so '-3X' and '+2X' count as '3X' / '2X'.
    A token only counts when it consists solely of digits followed by an
    upper-case 'X'.

    Args:
        long_name: The contract's long name. Non-string values (e.g. NaN for a
            missing name) are treated as having no multiplier.

    Returns:
        True if any whitespace-separated token is '<n>X' with n > 1.
    """
    if not isinstance(long_name, str):
        return False
    cleaned = long_name.replace('-', '').replace('+', '')
    return any(
        re.fullmatch(r'\d+X', word) is not None and int(word[:-1]) > 1
        for word in cleaned.split()
    )

def get_remaining():
    """Return the contracts from 'data/contract_details.csv' still to be processed.

    When 'data/contract_elaborated.csv' exists, symbols of its valid rows that
    have ``exchange_bug`` set, ``exact_search`` set, or a non-missing
    ``profile`` are excluded. Names flagged by ``has_bad_multiplier`` are always
    removed, and only the identifying columns are returned.
    """
    contract_details = load('data/contract_details.csv')
    try:
        elaborated = load('data/contract_elaborated.csv')
        elaborated = elaborated[elaborated.apply(is_row_valid, axis=1)]

        has_exchange_bug = elaborated['exchange_bug'] == True
        found_exactly = elaborated['exact_search'] == True
        has_profile = ~elaborated['profile'].isna()
        done_symbols = elaborated[has_exchange_bug | found_exactly | has_profile]['symbol']

        still_open = ~contract_details['symbol'].isin(done_symbols)
        remaining = contract_details[still_open]
    except FileNotFoundError:
        # Nothing has been elaborated yet: every contract is still open.
        remaining = contract_details.copy()

    plain_name = ~remaining['longName'].apply(has_bad_multiplier)
    remaining = remaining[plain_name]
    wanted_columns = ['symbol', 'exchange', 'primaryExchange', 'validExchanges', 'currency', 'conId', 'longName', 'stockType', 'isin']
    remaining = remaining[wanted_columns]
    return remaining

In [None]:
# Prep functions 2
def evaluate_literal(val):
    """Parse ``val`` as a Python literal, returning ``val`` unchanged on failure.

    Args:
        val: Usually a CSV cell. Strings holding a literal (list, tuple,
            number, ...) are converted to the corresponding object.

    Returns:
        The evaluated literal, or ``val`` itself when it is not a valid literal.
    """
    try:
        return ast.literal_eval(val)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        # literal_eval is documented to raise any of these on malformed input;
        # e.g. "{[1]: 2}" parses fine but raises TypeError (unhashable key).
        return val
    
def load(path):
    """Read the CSV at ``path`` and run every cell through ``evaluate_literal``.

    Returns:
        A DataFrame whose cells are the evaluated literals where parsing
        succeeded and the values produced by ``pd.read_csv`` otherwise.
    """
    frame = pd.read_csv(path)
    for name in frame.columns:
        parsed_column = frame[name].apply(evaluate_literal)
        frame[name] = parsed_column
    return frame

def save(df):
    """Validate, clean and merge ``df`` into 'data/contract_elaborated.csv'.

    Rows failing ``is_row_valid`` are discarded, the rest are cleaned with
    ``clean_df`` and merged with the rows already stored on disk (if the file
    exists). Among rows sharing a symbol, those with ``exact_search == False``
    are removed before the file is rewritten.

    Args:
        df: DataFrame of newly elaborated contracts.
    """
    # .copy() so clean_df's column assignments act on an independent frame
    # rather than on a filtered slice of the caller's DataFrame.
    final_df = df[df.apply(is_row_valid, axis=1)].copy()
    final_df = clean_df(final_df)
    try:
        temp_df = load('data/contract_elaborated.csv')
        temp_df = clean_df(temp_df)
        final_df = pd.concat([final_df, temp_df]).drop_duplicates(subset=['symbol', 'exact_search', 'search_exchange', 'search_symbol'])
    except FileNotFoundError:
        pass

    # Filter out the duplicates where 'exact_search' is False.
    # A boolean mask is used instead of drop(<index labels>): after concat the
    # index labels repeat, so a label-based drop would also delete unrelated
    # rows that merely share an index label with a flagged row.
    is_duplicated = final_df.duplicated(subset='symbol', keep=False)
    is_inexact = final_df['exact_search'] == False
    final_df = final_df[~(is_duplicated & is_inexact)]

    final_df.to_csv('data/contract_elaborated.csv', index=False)

def is_numerical(val):
    """Return True when ``str(val)`` with every '%' removed parses as a float."""
    try:
        float(str(val).replace('%', ''))
        return True
    except Exception:
        return False

def is_valid_tuple(tuple, column):
    """Decide whether one ``(label, value)`` entry of a list column is acceptable.

    Args:
        tuple: The entry to check, expected to unpack into ``(label, value)``.
        column: Name of the column the entry came from; selects the
            column-specific rules below.

    Returns:
        False when the entry cannot be unpacked into two items or the label is
        not a string. True when the value is None or numeric. Otherwise:
        'profile' accepts anything, 'style' accepts booleans, 'fundamentals'
        accepts upper-case strings and 'dividends' accepts 'Unknown' or a
        string containing a parsable number. Everything else is rejected.
    """
    def extract_float(value):
        match = re.match(r'[^0-9]*([0-9.,]+)', value)
        if match:
            try:
                return float(match.group(1).replace(',', ''))
            except ValueError:
                # The capture can be punctuation only (e.g. '.' from 'N.A.').
                return None
        return None

    try:
        label, value = tuple
    except (TypeError, ValueError):
        # Not a two-item entry: invalid rather than a crash for the whole frame.
        return False
    if not isinstance(label, str):
        return False
    if value is None:
        return True  # lenient filter: a missing value is tolerated
    if is_numerical(value):
        return True

    if column == 'profile':
        return True
    if column == 'style':
        return isinstance(value, bool)
    if not isinstance(value, str):
        # The remaining rules use string methods / regexes.
        return False
    if column == 'fundamentals':
        return value.isupper()
    if column == 'dividends':
        return value == 'Unknown' or extract_float(value) is not None
    return False

def is_row_valid(row):
    """Return True when every entry of every list-valued cell in ``row`` is valid.

    Each entry is checked with ``is_valid_tuple`` using the cell's column name.
    The first invalid entry is printed and makes the whole row invalid.
    Cells that are not lists are ignored.
    """
    for col in row.index:
        cell = row[col]
        if not isinstance(cell, list):
            continue
        for entry in cell:
            if is_valid_tuple(entry, col):
                continue
            print(entry)
            return False
    return True

def has_bad_multiplier(long_name):
    """Tell whether a contract name contains a multiplier token above 1X.

    '-' and '+' are removed first, so '-3X' and '+2X' count as '3X' / '2X'.
    A token only counts when it consists solely of digits followed by an
    upper-case 'X'.

    Args:
        long_name: The contract's long name. Non-string values (e.g. NaN for a
            missing name) are treated as having no multiplier.

    Returns:
        True if any whitespace-separated token is '<n>X' with n > 1.
    """
    if not isinstance(long_name, str):
        return False
    cleaned = long_name.replace('-', '').replace('+', '')
    return any(
        re.fullmatch(r'\d+X', word) is not None and int(word[:-1]) > 1
        for word in cleaned.split()
    )

def get_remaining():
    """Return the contracts from 'data/contract_details.csv' still to be processed.

    When 'data/contract_elaborated.csv' exists, symbols of its valid rows that
    have ``exchange_bug`` set, ``exact_search`` set, or a non-missing
    ``profile`` are excluded. Names flagged by ``has_bad_multiplier`` are always
    removed, and only the identifying columns are returned.
    """
    contract_details = load('data/contract_details.csv')
    try:
        elaborated = load('data/contract_elaborated.csv')
        elaborated = elaborated[elaborated.apply(is_row_valid, axis=1)]

        has_exchange_bug = elaborated['exchange_bug'] == True
        found_exactly = elaborated['exact_search'] == True
        has_profile = ~elaborated['profile'].isna()
        done_symbols = elaborated[has_exchange_bug | found_exactly | has_profile]['symbol']

        still_open = ~contract_details['symbol'].isin(done_symbols)
        remaining = contract_details[still_open]
    except FileNotFoundError:
        # Nothing has been elaborated yet: every contract is still open.
        remaining = contract_details.copy()

    plain_name = ~remaining['longName'].apply(has_bad_multiplier)
    remaining = remaining[plain_name]
    wanted_columns = ['symbol', 'exchange', 'primaryExchange', 'validExchanges', 'currency', 'conId', 'longName', 'stockType', 'isin']
    remaining = remaining[wanted_columns]
    return remaining

In [None]:
# Cleaning functions
def clean_labels(label, col):
    """Normalise a scraped label according to the column it belongs to.

    Args:
        label: The label to clean; non-string labels are returned unchanged.
        col: Column name selecting the rule:
            * 'industries': strip the '-Discontinuedeff09/19/2020' suffix.
            * 'holding_types': strip a leading '■' or '1'.
            * 'debtors': replace the full-width '（' with '('.
            * 'fundamentals': shorten 'LTDebt/ShareholdersEquity'.

    Returns:
        The cleaned label, or ``label`` itself when no rule applies.
    """
    if not isinstance(label, str):
        return label

    if col == 'industries':
        suffix = '-Discontinuedeff09/19/2020'
        if label.endswith(suffix):
            # Remove only the suffix; splitting on the first '-' would also
            # truncate industry names that themselves contain a hyphen.
            return label[:-len(suffix)]
    elif col == 'holding_types':
        if label.startswith(('■', '1')):
            return label[1:]
    elif col == 'debtors':
        return label.replace('（', '(')
    elif col == 'fundamentals':
        if label == 'LTDebt/ShareholdersEquity':
            return 'LTDebt/Shareholders'
    return label
    
def correct_digit(value_str):
    """Strip everything except digits, '.' and '-' and convert the rest to float.

    Returns:
        The parsed float, or ``value_str`` unchanged when nothing parsable
        remains or ``value_str`` is not a string.
    """
    try:
        digit = re.sub(r'[^\d.-]', '', value_str).strip()
        return float(digit)
    except Exception:
        return value_str

def clean_values(value_str, col):
    """Convert a scraped value to a number where possible.

    Args:
        value_str: The raw value; non-strings are returned unchanged.
        col: Column name; values of the 'profile' column are never converted.

    Returns:
        For strings ending in '%', the numeric part divided by 100. For other
        strings, the result of ``correct_digit``. Strings without a parsable
        number are returned unchanged.
    """
    if col == 'profile' or not isinstance(value_str, str):
        return value_str
    if value_str.endswith('%'):
        number = correct_digit(value_str.replace('%', ''))
        # correct_digit hands back its input string when no number is found
        # (e.g. 'N/A%'); dividing that string by 100 raised TypeError.
        if isinstance(number, float):
            return number / 100
        return value_str
    return correct_digit(value_str)

def clean_df(df):
    """Return a cleaned copy of ``df`` with normalised list cells.

    Every cell is passed through ``evaluate_literal``. In list cells each
    two-item tuple ``(label, value)`` gets its label cleaned with
    ``clean_labels`` and its value with ``clean_values``; other items are kept
    as they are. Each list is then sorted by label.

    Args:
        df: DataFrame to clean. It is not modified; callers must use the
            returned frame.

    Returns:
        A new DataFrame with the cleaned cells.
    """
    def _sort_key(item):
        # Only string labels take part in the ordering. Empty tuples,
        # non-tuples and non-string labels sort first, so the sort cannot fail
        # on mixed label types.
        label = item[0] if isinstance(item, tuple) and item else None
        return label if isinstance(label, str) else ''

    def _clean_cell(cell, col):
        cell = evaluate_literal(cell)
        if not isinstance(cell, list):
            return cell
        cleaned = [
            (clean_labels(item[0], col), clean_values(item[1], col))
            if isinstance(item, tuple) and len(item) == 2 else item
            for item in cell
        ]
        return sorted(cleaned, key=_sort_key)

    # Work on a copy: callers frequently pass a filtered slice, and assigning
    # columns on a slice triggers SettingWithCopyWarning.
    df = df.copy()
    for col in df.columns:
        df[col] = df[col].apply(lambda cell, col=col: _clean_cell(cell, col))
    return df

In [None]:
# Cleaning functions 2
def clean_labels(label, col):
    """Normalise a scraped label according to the column it belongs to.

    Args:
        label: The label to clean; non-string labels are returned unchanged.
        col: Column name selecting the rule:
            * 'industries': strip the '-Discontinuedeff09/19/2020' suffix.
            * 'holding_types': strip a leading '■' or '1'.
            * 'debtors': replace the full-width '（' with '('.
            * 'fundamentals': shorten 'LTDebt/ShareholdersEquity'.

    Returns:
        The cleaned label, or ``label`` itself when no rule applies.
    """
    if not isinstance(label, str):
        return label

    if col == 'industries':
        suffix = '-Discontinuedeff09/19/2020'
        if label.endswith(suffix):
            # Remove only the suffix; splitting on the first '-' would also
            # truncate industry names that themselves contain a hyphen.
            return label[:-len(suffix)]
    elif col == 'holding_types':
        if label.startswith(('■', '1')):
            return label[1:]
    elif col == 'debtors':
        return label.replace('（', '(')
    elif col == 'fundamentals':
        if label == 'LTDebt/ShareholdersEquity':
            return 'LTDebt/Shareholders'
    return label
    
def correct_digit(value_str):
    """Strip everything except digits, '.' and '-' and convert the rest to float.

    Returns:
        The parsed float, or ``value_str`` unchanged when nothing parsable
        remains or ``value_str`` is not a string.
    """
    try:
        digit = re.sub(r'[^\d.-]', '', value_str).strip()
        return float(digit)
    except Exception:
        return value_str

def clean_values(value_str, col):
    """Convert a scraped value to a number where possible.

    Args:
        value_str: The raw value; non-strings are returned unchanged.
        col: Column name; values of the 'profile' column are never converted.

    Returns:
        For strings ending in '%', the numeric part divided by 100. For other
        strings, the result of ``correct_digit``. Strings without a parsable
        number are returned unchanged.
    """
    if col == 'profile' or not isinstance(value_str, str):
        return value_str
    if value_str.endswith('%'):
        number = correct_digit(value_str.replace('%', ''))
        # correct_digit hands back its input string when no number is found
        # (e.g. 'N/A%'); dividing that string by 100 raised TypeError.
        if isinstance(number, float):
            return number / 100
        return value_str
    return correct_digit(value_str)

def clean_df(df):
    """Return a cleaned copy of ``df`` with normalised list cells.

    Every cell is passed through ``evaluate_literal``. In list cells each
    two-item tuple ``(label, value)`` gets its label cleaned with
    ``clean_labels`` and its value with ``clean_values``; other items are kept
    as they are. Each list is then sorted by label.

    Args:
        df: DataFrame to clean. It is not modified; callers must use the
            returned frame.

    Returns:
        A new DataFrame with the cleaned cells.
    """
    def _sort_key(item):
        # Only string labels take part in the ordering. Empty tuples,
        # non-tuples and non-string labels sort first, so the sort cannot fail
        # on mixed label types.
        label = item[0] if isinstance(item, tuple) and item else None
        return label if isinstance(label, str) else ''

    def _clean_cell(cell, col):
        cell = evaluate_literal(cell)
        if not isinstance(cell, list):
            return cell
        cleaned = [
            (clean_labels(item[0], col), clean_values(item[1], col))
            if isinstance(item, tuple) and len(item) == 2 else item
            for item in cell
        ]
        return sorted(cleaned, key=_sort_key)

    # Work on a copy: callers frequently pass a filtered slice, and assigning
    # columns on a slice triggers SettingWithCopyWarning.
    df = df.copy()
    for col in df.columns:
        df[col] = df[col].apply(lambda cell, col=col: _clean_cell(cell, col))
    return df

In [None]:
# Explode columns
# Turns each list-of-(label, value) column into one '<col>_<label>' column per
# label and, for percentage columns, adds a '<col>_variety' concentration score
# (sum of squared weights).
contracts_df = load('data/contract_elaborated.csv')
contracts_df = clean_df(contracts_df)
# .copy() so the column assignments below act on an independent frame instead
# of a filtered slice.
contracts_df = contracts_df[contracts_df.apply(is_row_valid, axis=1)].copy()

# Placeholder categories, per exploded column, that are removed from the pivot
# and left out of the normalisation / variety computation.
empty_subcategories = {
'holding_types': ['holding_types_Other'],
'countries': ['countries_Unidentified'], 
'currencies': ['currencies_<NoCurrency>'],
'industries': ['industries_NonClassifiedEquity', 'industries_NotClassified-NonEquity'],
'top10': ['AccountsPayable','AccountsReceivable','AccountsReceivable&Pay','AdministrationFees','CustodyFees','ManagementFees','OtherAssets','OtherAssetsandLiabilities','OtherAssetslessLiabilities','OtherFees','OtherLiabilities','Tax','Tax--ManagementFees'],
'debtors': ['OTHER',],
'debt_type': ['debt_type_%QualityNotAvailable', 'debt_type_%QualityNotRated'],
'maturity': ['maturity_%MaturityOther'],
}

original_columns = contracts_df.columns
columns_to_explode = ['profile', 'style', 'fundamentals', 'holding_types', 'countries', 'currencies', 'industries', 'top10']#,'debtors', 'maturity', 'debt_type']#, 'lipper', 'dividends']
percentage_columns = [col for col in ['holding_types', 'countries', 'currencies', 'industries', 'top10', 'debtors', 'maturity', 'debt_type'] if col in columns_to_explode]
for col in columns_to_explode:
    contracts_df[col] = contracts_df[col].fillna('[]')
    contracts_df[col] = contracts_df[col].apply(evaluate_literal)

    # Explode and create pivot_df (one row per original index, one column per label)
    contracts_df = contracts_df.explode(col)
    contracts_df[col] = contracts_df[col].apply(lambda x: (None, None) if pd.isna(x) else x)
    contracts_df[['label', 'value']] = pd.DataFrame(contracts_df[col].tolist(), index=contracts_df.index)
    pivot_df = contracts_df.pivot_table(index=contracts_df.index, columns='label', values='value', aggfunc='first')
    pivot_df.rename(columns={label: f'{col}_{label}' for label in pivot_df.columns}, inplace=True)

    # Correct pivot_df values
    if col in percentage_columns:
        print(col)
        pivot_df = pivot_df.fillna(0.0).clip(lower=0)
        # empty_subcategories mixes bare labels ('OTHER') with already-prefixed
        # column names ('holding_types_Other'). Prefixing the latter again
        # produced names that never matched, so those placeholders were
        # silently kept (drop(..., errors='ignore') hid the miss).
        prefix = f'{col}_'
        columns_to_drop = [
            label if label.startswith(prefix) else f'{prefix}{label}'
            for label in empty_subcategories[col]
        ]
        pivot_cols = [pivot_col for pivot_col in pivot_df.columns if pivot_col not in columns_to_drop]

        # Clean up errors: rows whose weights add up to more than 100% are rescaled to sum to 1
        pivot_cols_sum = pivot_df[pivot_cols].sum(axis=1)
        mask = pivot_cols_sum > 1
        pivot_df.loc[mask, pivot_cols] = pivot_df.loc[mask, pivot_cols].div(pivot_cols_sum[mask], axis=0)

        pivot_df = pivot_df.drop(columns=columns_to_drop, errors='ignore')
        pivot_df[f'{col}_variety'] = pivot_df[pivot_cols].pow(2).sum(axis=1)
        # A variety of exactly 0 means no weights were available at all
        pivot_df[f'{col}_variety'] = pivot_df[f'{col}_variety'].astype(float).replace(0.0, np.nan)

        if col == 'top10':
            # Individual top-10 holdings are not kept, only their concentration score
            columns_to_drop = [column for column in pivot_df.columns if column != f'{col}_variety']
            pivot_df = pivot_df.drop(columns=columns_to_drop)

    contracts_df = contracts_df.drop(columns=[col, 'label', 'value']).drop_duplicates(subset='symbol')
    contracts_df = pd.concat([contracts_df, pivot_df], axis=1)

contracts_df = contracts_df[~contracts_df['profile_TotalNetAssets'].isna()]

In [None]:
# Columns holding the per-category concentration scores
variety = [name for name in contracts_df.columns if name.endswith('_variety')]

# Optional sanity check (disabled): rows where any variety score exceeds 1
# condition = contracts_df[variety].gt(1).any(axis=1)
# contracts_df[condition]

industry_columns = [name for name in contracts_df.columns if name.startswith('industries')]
contracts_df[industry_columns]
contracts_df[variety]

In [None]:
'''
# Variety factors
approaches 1 => little variety
approaches 0 => a lot of variety
sum of country**2
sum of currency**2
sum of industry**2
sum of top10**2'''

In [None]:
# Drop ETF duplicates
# Rows that agree on every derived (exploded) column are treated as the same
# fund; the EUR-denominated listing is preferred when one exists.
basic_classification = [col for col in original_columns if col not in columns_to_explode]
remaining_columns = [col for col in contracts_df.columns if col not in basic_classification]
og_len = len(contracts_df)

eur_flagged = contracts_df.assign(is_EUR=contracts_df['currency'] == 'EUR')
eur_first = eur_flagged.sort_values(by='is_EUR', ascending=False)  # True sorts before False
deduplicated = eur_first.drop_duplicates(subset=remaining_columns, keep='first')
contracts_df = deduplicated.drop(columns=['is_EUR'])

og_len - len(contracts_df)
contracts_df = contracts_df.dropna(subset='symbol')

In [None]:
# Correct total net assets
# Currency markers found in 'TotalNetAssets' strings mapped to the code used downstream.
symbol_mapping = {
    '$': 'USD',    # Default to USD
    '￥': 'JPY',    # Japanese Yen
    'Rs': 'INR',
    'CNH': 'CNY',
    # '¥': 'JPY',    # Alternative Yen symbol
    # '€': 'EUR',    # Euro
    # '£': 'GBP',    # British Pound
    # 'A$': 'AUD',   # Australian Dollar
    # 'C$': 'CAD',   # Canadian Dollar
    # 'HK$': 'HKD',  # Hong Kong Dollar
}

def standardize_currency(currency):
    """Map a currency marker to the code used downstream.

    Missing values become NaN, markers listed in ``symbol_mapping`` are
    translated, and every other value (including '') is returned unchanged.
    """
    if pd.isna(currency):
        return np.nan
    mapped = symbol_mapping.get(currency)
    if mapped is not None:
        return mapped
    if currency == '':
        return ''
    try:
        # The lookup result does not alter the return value: recognised and
        # unrecognised codes are both passed through as-is.
        pycountry.currencies.get(alpha_3=currency)
    except AttributeError:
        pass
    return currency

def clean_total_net_assets(value):
    """Split a 'TotalNetAssets' value into ``(amount, currency_marker)``.

    Args:
        value: Typically a string such as '$1.5B'. Anything from 'asof' onwards
            (when 'asof' stands as its own word) is ignored. Plain numbers are
            accepted and returned with an empty currency marker.

    Returns:
        ``(amount, currency)`` where ``amount`` is a float scaled by the k/m/b/t
        suffix and ``currency`` is the leading marker ('' when absent), or
        ``(nan, nan)`` when the value is missing or cannot be parsed.
    """
    if isinstance(value, bool):
        return np.nan, np.nan
    if isinstance(value, (int, float, np.integer, np.floating)):
        # Cells holding a bare number arrive already converted to a numeric type.
        if pd.isna(value):
            return np.nan, np.nan
        return float(value), ''
    if not isinstance(value, str):
        return np.nan, np.nan

    value = re.sub(r'\basof\b.*', '', value, flags=re.IGNORECASE).strip()
    # 't'/'T' is included so the trillion multiplier below can actually apply.
    match = re.match(r'([^0-9\s]+)?\s*([0-9.,]+)\s*([kKmMbBtT]?)', value)
    if not match:
        return np.nan, np.nan
    currency, num_str, unit = match.groups()
    try:
        num = float(num_str.replace(',', ''))
    except ValueError:
        # The numeric group can capture punctuation only (e.g. '.' in 'U.S.').
        return np.nan, np.nan
    multipliers = {'k': 10**3, 'm': 10**6, 'b': 10**9, 't': 10**12}
    num *= multipliers.get(unit.lower(), 1)
    return num, currency if currency else ''

def get_exchange_rates(currencies, to_currency='USD', timeout=10):
    """Fetch conversion factors from each currency into ``to_currency``.

    Args:
        currencies: Iterable of currency codes; missing values and codes that
            pycountry does not know are ignored.
        to_currency: Target currency code used as the API base.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        Dict mapping currency code to the factor that converts one unit of it
        into ``to_currency``. Empty when no valid currency was given or the
        request failed; failures are printed, not raised.
    """
    rates = {}
    valid_currencies = [
        c for c in currencies
        if pd.notna(c) and pycountry.currencies.get(alpha_3=c) is not None
    ]
    if not valid_currencies:
        return rates
    try:
        url = f"https://open.er-api.com/v6/latest/{to_currency}"
        # Without a timeout a stalled connection would block indefinitely.
        response = requests.get(url, timeout=timeout)
        data = response.json()
        if 'rates' in data:
            for currency in valid_currencies:
                if currency == to_currency:
                    # The base currency converts to itself at 1.0 (this used to
                    # be hard-coded to 'USD' regardless of to_currency).
                    rates[currency] = 1.0
                elif currency in data['rates']:
                    quote = data['rates'][currency]
                    # The API quotes units of `currency` per one unit of the base,
                    # so the factor into the base is the reciprocal.
                    rates[currency] = 1 / quote if quote != 0 else np.nan
            print(f"Fetched rates: {rates}")
            return rates
        else:
            print(f"Error fetching rates: {data.get('error', 'Unknown error')}")
    except Exception as e:
        print(f"Exchange rate fetch failed: {e}")
    return rates

def convert_to_usd(row, rates):
    """Convert ``row['profile_cap']`` using the rate for ``row['profile_cap_currency']``.

    Returns NaN when the amount or the currency is missing, or when ``rates``
    has no entry for the currency (in which case a message is printed).
    """
    amount = row['profile_cap']
    if pd.isna(amount):
        return np.nan
    currency = row['profile_cap_currency']
    if pd.isna(currency):
        return np.nan
    if currency not in rates:
        print(f"No rate available for {currency}")
        return np.nan
    return amount * rates[currency]

def _keep_known_currency(code):
    """Pass through missing values, '' and codes pycountry knows; NaN otherwise."""
    if pd.isna(code) or pycountry.currencies.get(alpha_3=code) or code == '':
        return code
    return np.nan

# Split 'profile_TotalNetAssets' into an amount and a currency marker
parsed_assets = contracts_df['profile_TotalNetAssets'].apply(lambda x: pd.Series(clean_total_net_assets(x)))
contracts_df[['profile_cap', 'profile_cap_currency']] = parsed_assets
contracts_df['profile_cap_currency'] = contracts_df['profile_cap_currency'].apply(standardize_currency)
# An amount without a marker is assumed to be in the contract's own currency
no_marker = contracts_df['profile_cap_currency'] == ''
contracts_df['profile_cap_currency'] = np.where(no_marker, contracts_df['currency'], contracts_df['profile_cap_currency'])
contracts_df['profile_cap_currency'] = contracts_df['profile_cap_currency'].apply(_keep_known_currency)

exchange_rates = get_exchange_rates(contracts_df['profile_cap_currency'].unique())
contracts_df['profile_cap_usd'] = contracts_df.apply(convert_to_usd, axis=1, args=(exchange_rates,))
is_etc = contracts_df['stockType'] == 'ETC'
contracts_df.loc[is_etc, 'industries_BasicMaterials'] = 1.0

# To inspect rows that could not be converted, filter on contracts_df['profile_cap_usd'].isna()
contracts_df.to_csv('data/fundamentals.csv', index=False)

# EXTRA

In [None]:
# Reduce to numerical columns
# Columns used only to describe/classify a contract are separated from the data columns.
basic_classification = [col for col in original_columns if col not in columns_to_explode]
bond_fundamentals = [
    'fundamentals_AverageQuality',
    'fundamentals_NominalMaturity',
    'fundamentals_EffectiveMaturity',
    'fundamentals_AverageCoupon',
    'fundamentals_YieldtoMaturity',
]
profile_classification = [
    'profile_Domicile',
    'profile_MarketGeoFocus',
    'profile_BenchmarkIndex',
    'profile_FundCategory',
    'profile_TotalExpenseRatio',
    'profile_TotalNetAssets',
    'profile_cap',
    'profile_cap_currency',
    'profile_MarketCapFocus',
]

classification_columns = basic_classification + bond_fundamentals + profile_classification
data_cols = contracts_df.columns[~contracts_df.columns.isin(classification_columns)]

# Keep only rows that have all three of these fields
has_required_fields = (
    contracts_df['fundamentals_Price/Book'].notna()
    & contracts_df['fundamentals_LTDebt/Shareholders'].notna()
    & contracts_df['style_large-core'].notna()
)
data = contracts_df[has_required_fields].copy()

# z-score the equity fundamentals and the USD cap
fundamental_columns = [name for name in contracts_df.columns if name.startswith('fundamentals') and name not in bond_fundamentals]
for col in fundamental_columns + ['profile_cap_usd']:
    column = data[col]
    data[col] = (column - column.mean()) / column.std()

data[data_cols]

In [None]:
# Graph correlations
import seaborn as sns
import matplotlib.pyplot as plt

# Correlation of the data columns; columns and rows that are entirely NaN are dropped
corr_df = data[data_cols].corr()
corr_df = corr_df.dropna(axis=1, how='all')
corr_df = corr_df.dropna(axis=0, how='all')
data_cols = corr_df.columns

plt.figure(figsize=(50, 50))
sns.heatmap(corr_df, cmap='coolwarm')
plt.show()

---
### Factor definition
---

In [None]:
'''
# Academic factors
Market beta
SMB
HML
RMW


# Variety factors
approaches 1 => little variety
approaches 0 => a lot of variety
sum of country**2
sum of currency**2
sum of industry**2
sum of top10**2

Check if factors are orthogonal
'''

In [None]:
# Fill in style columns
fundamental_columns = [name for name in contracts_df.columns if name.startswith('fundamentals') and name not in bond_fundamentals]

# Valuation ratios. Further candidates noted in the original: LTDebt/Shareholders,
# TotalDebt/TotalCapital, TotalDebt/TotalEquity, TotalAssets/TotalEquity.
value_columns = [
    'fundamentals_Price/Book',
    'fundamentals_Price/Cash',
    'fundamentals_Price/Earnings',
    'fundamentals_Price/Sales',
]
# Earnings growth. Further candidates noted in the original: ReturnonAssets, SalestoTotalAssets.
growth_columns = [
    'fundamentals_EPSGrowth-1yr',
    'fundamentals_EPS_growth_3yr',
    'fundamentals_EPS_growth_5yr',
]
output_columns = [name for name in contracts_df.columns if name.startswith('style')]

'''
growth score = 2 * [ N_P/B + N_P/E + N_P/Cash + N_P/Sales + N_EPS_growth_1yr + N_EPS_growth_3yr + N_EPS_growth_5yr + 
              N_ReturnonAssets1Yr + N_ReturnonAssets3Yr + N_ReturnonCapital + N_ReturnonCapital3Yr + 
              N_ReturnonEquity1Yr + N_ReturnonEquity3Yr + N_ReturnonInvestment1Yr + N_ReturnonInvestment3Yr + 
              N_SalestoTotalAssets + N_EBITtoInterest + N_RelativeStrength + 
              (1 - N_LTDebt/ShareholdersEquity) + (1 - N_TotalAssets/TotalEquity) + 
              (1 - N_TotalDebt/TotalCapital) + (1 - N_TotalDebt/TotalEquity) ] / 22 - 1

Extreme Growth: If all growth indicators ≈ 1 and value indicators ≈ 0, then S = [18*1 + 4*1]/22 = 1, score = 2*1 - 1 = 1.
Extreme Value: If all growth indicators ≈ 0 and value indicators ≈ 1, then S = [18*0 + 4*0]/22 = 0, score = 2*0 - 1 = -1.
Neutral: If all ≈ 0.5, then S = [18*0.5 + 4*0.5]/22 = 0.5, score = 2*0.5 - 1 = 0.


Step 4: Proposed Refined Model
Balancing your suggestions with practicality and Morningstar’s framework, I recommend:
Select Key Metrics: Use only the most relevant IBKR metrics.
Equal Weighting Within Categories: Follow Morningstar’s approach for simplicity and grounding.
Score Calculation: Compute a value-growth spectrum from -1 to 1.
Refined Model
Value Score = mean((1 - N_P/B) + (1 - N_P/Sales) + (1 - N_P/Cash) + (1 - N_P/E)) # Possibly add: LTDebt/ShareholdersEquity, TotalDebt/Equity
Growth Score = mean(N_EPS_growth_1yr + N_EPS_growth_3yr + N_EPS_growth_5yr) # Possibly add: ReturnonAssets, SalestoTotalAssets

Why This Works
Relevance: Uses metrics tied to Morningstar’s historical measures and value investing principles.
Simplicity: Equal weighting avoids overcomplication while mirroring industry practice.
No Additional Standardization: Normalization suffices for comparability.
Flexibility: Captures the spectrum effectively with available data.

'''


---
### Clustering analysis
---

In [None]:
# distance correlation
import dcor

training_array = data[data_cols].values  # numpy view of the data columns
symbol_list = data[data_cols].columns.tolist()
num_symbols = len(symbol_list)
corr_matrix = np.zeros((num_symbols, num_symbols))
cov_matrix = np.zeros((num_symbols, num_symbols))

# Both statistics are symmetric, so only pairs with i <= j are computed and mirrored.
for i, sym_i in tqdm(enumerate(symbol_list), total=num_symbols, desc=f"Calculating distance stats sqr"):
    left = training_array[:, i]
    for j in range(i, num_symbols):
        stats = dcor.distance_stats(left, training_array[:, j])
        corr_matrix[i, j] = corr_matrix[j, i] = stats.correlation_xy
        cov_matrix[i, j] = cov_matrix[j, i] = stats.covariance_xy

corr_df = pd.DataFrame(corr_matrix, index=symbol_list, columns=symbol_list)
cov_df = pd.DataFrame(cov_matrix, index=symbol_list, columns=symbol_list)


# NOTE(review): the assignment below replaces the distance-correlation frame
# built above with the DataFrame.corr() result, so only cov_df survives from the
# loop. Confirm which correlation the clustering cells are meant to use.
corr_df = data[data_cols].corr()
# drop columns / rows that are entirely NaN
corr_df = corr_df.dropna(axis=1, how='all').dropna(axis=0, how='all')
corr_df.shape

In [None]:
# Create distance matrix
symbol_list = corr_df.columns

# Lookups between column labels and row labels of the correlation frame
symbol2index = {column: row_label for column, row_label in zip(corr_df.columns, corr_df.index)}
index2symbol = {row_label: column for row_label, column in zip(corr_df.index, corr_df.columns)}
corr_df = corr_df.rename(columns=symbol2index)

# distance = 1 - correlation, with an exact zero on the diagonal
distance_matrix = (1 - corr_df).to_numpy()
np.fill_diagonal(distance_matrix, 0)

In [None]:
# Thresholds / cluster_num graphs
# Imported in this cell because it runs before the cell that otherwise brings
# `sch` and `squareform` into scope; without them a fresh-kernel "Run All"
# fails here with NameError.
import matplotlib.pyplot as plt
import scipy.cluster.hierarchy as sch
from scipy.spatial.distance import squareform

methods = ['single', 'ward', 'average', 'complete', 'weighted', 'centroid', 'median']
methods = ['ward']  # restrict to the method currently under study
for method in methods:
    # squareform converts the square distance matrix to the condensed form linkage expects
    linked = sch.linkage(squareform(distance_matrix), method=method)

    num_clusters = range(len(corr_df), 1, -1)
    thresholds = linked[:, 2]  # third linkage column: distance at which each merge happens

    plt.figure(figsize=(12, 6))
    plt.plot(num_clusters, thresholds, marker='o')
    plt.title(f"Threshold/Num ({method})")
    plt.xlabel('Number of Clusters')
    plt.ylabel('Threshold (Distance)')
    plt.grid(True)
    plt.show()


In [None]:
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import silhouette_score
import scipy.cluster.hierarchy as sch
from scipy.spatial.distance import squareform

# Silhouettes and dendrograms
def product(row):
    """Multiply together all values of the mapping ``row`` (1 when it is empty)."""
    result = 1
    for factor in row.values():
        result *= factor
    return result

methods = ['single', 'ward', 'average', 'complete', 'weighted', 'centroid', 'median']
methods = ['ward']  # restrict to the method currently under study
for method in methods:

    # One record per candidate number of clusters
    records = []
    for k in range(2, min(len(distance_matrix), 9)):
        # NOTE(review): fit_predict receives the distance matrix without a
        # 'precomputed' metric while silhouette_score below is told it is
        # precomputed - confirm both are meant to see the data the same way.
        clusters = AgglomerativeClustering(n_clusters=k, linkage=method).fit_predict(distance_matrix)
        score = silhouette_score(distance_matrix, clusters, metric='precomputed')
        unique_clusters, label_counts = np.unique(clusters, return_counts=True)
        records.append({
            'k': k,
            'score': score,
            'counts': dict(zip(unique_clusters, label_counts)),
        })

    silhouettes = pd.DataFrame(records, columns=['k', 'score', 'counts'])
    # product of the cluster sizes for each k
    silhouettes['combitions'] = silhouettes['counts'].apply(product)
    silhouettes = silhouettes.sort_values(by='score', ascending=False)
    best_k = silhouettes.k.iloc[0]
    display(silhouettes)

    linked = sch.linkage(squareform(distance_matrix), method=method)
    plt.figure(figsize=(40, 15))
    sch.dendrogram(linked, labels=corr_df.index, leaf_rotation=90)
    plt.title(f"Method {method}")
    plt.show()

In [None]:
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import silhouette_score
import scipy.cluster.hierarchy as sch
from scipy.spatial.distance import squareform

# Silhouettes and dendrograms
def product(row):
    """Multiply together all values of the mapping ``row`` (1 when it is empty)."""
    result = 1
    for factor in row.values():
        result *= factor
    return result

methods = ['single', 'ward', 'average', 'complete', 'weighted', 'centroid', 'median']
methods = ['ward', 'complete']  # restrict to the methods currently under study
for method in methods:

    # One record per candidate number of clusters
    records = []
    for k in range(2, min(len(distance_matrix), 9)):
        # NOTE(review): fit_predict receives the distance matrix without a
        # 'precomputed' metric while silhouette_score below is told it is
        # precomputed - confirm both are meant to see the data the same way.
        clusters = AgglomerativeClustering(n_clusters=k, linkage=method).fit_predict(distance_matrix)
        score = silhouette_score(distance_matrix, clusters, metric='precomputed')
        unique_clusters, label_counts = np.unique(clusters, return_counts=True)
        records.append({
            'k': k,
            'score': score,
            'counts': dict(zip(unique_clusters, label_counts)),
        })

    silhouettes = pd.DataFrame(records, columns=['k', 'score', 'counts'])
    # product of the cluster sizes for each k
    silhouettes['combitions'] = silhouettes['counts'].apply(product)
    silhouettes = silhouettes.sort_values(by='score', ascending=False)
    best_k = silhouettes.k.iloc[0]
    display(silhouettes)

    linked = sch.linkage(squareform(distance_matrix), method=method)
    plt.figure(figsize=(40, 15))
    sch.dendrogram(linked, labels=corr_df.index, leaf_rotation=90)
    plt.title(f"Method {method}")
    plt.show()