# Practical Exercise 7.05: Analyzing Factor Portfolios

In [None]:
import pandas as pd
import numpy as np
import requests
import zipfile
import os
from datetime import datetime
import statsmodels.formula.api as smf
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap


In [None]:
#FF Data Library file download

url = 'https://mba.tuck.dartmouth.edu/pages/faculty/ken.french/ftp/25_Portfolios_5x5_daily_CSV.zip'

# Specify the path where you want the file to be saved
zip_path = '25_Portfolios_5x5_daily.zip'

# Make HTTP request to download the file
response = requests.get(url)
with open(zip_path, 'wb') as f:
            f.write(response.content)
            print(f"File downloaded and saved as: {zip_path}")

with zipfile.ZipFile(zip_path, 'r') as z:
    # List the names of files in the ZIP to identify the CSV
    csv_files = [f for f in z.namelist() if f.endswith('.csv')]
    csv_file_name = csv_files[0]
    z.extract(csv_file_name, '.')
    print(f"File '{csv_file_name}'extracted correctly.")

#Review the contents of the file
with open('25_Portfolios_5x5_Daily.csv', 'r') as file:
    for i in range(20):
        print(i, file.readline().strip())

# Load the CSV file, assuming the first column should be
# named 'Date' and skipping the header

filepath='25_Portfolios_5x5_Daily.csv'
portfolios = pd.read_csv(filepath, skiprows=18, low_memory=False)
portfolios.rename(columns={portfolios.columns[0]: 'Date'}, inplace=True)

# Remove the first row that is duplicated from the headings
df_portfolios = portfolios.drop(0)

# Convert column 'Date' to datetime
df_portfolios['Date'] = pd.to_datetime(df_portfolios['Date'], errors='coerce')

# Attempt to convert numeric columns to float, errors will
# be handled as NaN
for col in df_portfolios.columns[1:]:
    df_portfolios[col] = pd.to_numeric(df_portfolios[col], errors='coerce')

 # Find the index of the first completely empty row
first_empty_index = df_portfolios[df_portfolios.isnull().all(axis=1)].index.min()

# If there is a completely empty row, extract all rows up to that row.
if pd.notna(first_empty_index):
    df_portfolios = df_portfolios.iloc[:first_empty_index-1]
else:
    df_portfolios = df_portfolios   # If there are no empty rows, use the whole DataFrame
# Create a replacement dictionary
replacement = {
    'SMALL LoBM': 'ME1 BM1',
    'SMALL HiBM': 'ME1 BM5',
    'BIG LoBM': 'ME5 BM1',
    'BIG HiBM': 'ME5 BM5',
   }

# Rename headings
df_portfolios.rename(columns=replacement, inplace=True)
#Filtering data by date
df_portfolios['Date'] = pd.to_datetime(df_portfolios['Date'])
start_date = pd.to_datetime('1963-07-01')
end_date = pd.to_datetime('2024-05-31')
try:
    filtered_portfolios = df_portfolios[(df_portfolios['Date'] >= start_date) & (df_portfolios['Date'] <= end_date)]
except TypeError as e:
    print("TypeError encountered:", e)
    print("Re-checking the data types...")
    print(portfolios.dtypes)  # This will help identify if there's still a data type issue.


In [None]:
#FF Data Library file download

url = 'https://mba.tuck.dartmouth.edu/pages/faculty/ken.french/ftp/F-F_Research_Data_5_Factors_2x3_daily_csv.zip'

# Specify the path where you want the file to be saved
zip_path2 = 'F-F_Research_Data_5_Factors_2x3_daily_csv.zip'

# Make HTTP request to download the file
response = requests.get(url)
with open(zip_path2, 'wb') as f:
            f.write(response.content)
            print(f"File downloaded and saved as: {zip_path2}")

with zipfile.ZipFile(zip_path2, 'r') as z:

    # List the names of files in the ZIP to identify the CSV
    csv_files2 = [f for f in z.namelist() if f.endswith('.csv')]
    csv_file_name2 = csv_files2[0]
    z.extract(csv_file_name2, '.')
    print(f"File '{csv_file_name2}' extracted correctly.")


In [None]:
#Review the contents of the file
with open('F-F_Research_Data_5_Factors_2x3_daily.csv', 'r') as file:
    for i in range(20):
        print(i, file.readline().strip())


In [None]:
# Load the CSV file, assuming the first column should be named 'Date' and skipping the header

# Load CSV robustly: detect header row and add 'Date' ---
with open(csv_name, "r", encoding="utf-8", errors="replace") as f:
    lines = f.read().splitlines()

# Find the true header: line that contains 'Mkt-RF' and starts with a comma after optional spaces
header_idx = next(
    i for i, l in enumerate(lines)
    if "Mkt-RF" in l and l.lstrip().startswith(",")
)

# Prepend 'Date' to the header (turn ",Mkt-RF,..." into "Date,Mkt-RF,...")
header_line = "Date" + lines[header_idx]

# Build a CSV string from header + all subsequent lines
csv_text = "\n".join([header_line] + lines[header_idx + 1:])

# Parse; skip initial spaces after commas
raw = pd.read_csv(
    StringIO(csv_text),
    skip_blank_lines=True,
    skipinitialspace=True,
)


# Clean: keep only YYYYMMDD rows, convert types ---
raw.columns = raw.columns.str.strip()
factors = raw[raw["Date"].astype(str).str.match(r"^\s*\d{8}\s*$", na=False)].copy()

# Convert Date to datetime
factors["Date"] = pd.to_datetime(
    factors["Date"].astype(str).str.strip(),
    format="%Y%m%d",
    errors="coerce"
)
factors = factors.dropna(subset=["Date"])

# Ensure factor columns are numeric
factor_cols = ["Mkt-RF", "SMB", "HML", "RMW", "CMA", "RF"]
missing = [c for c in factor_cols if c not in factors.columns]
if missing:
    raise KeyError(f"Missing expected columns: {missing}")
for c in factor_cols:
    factors[c] = pd.to_numeric(factors[c], errors="coerce")

# --- 5) (Optional) rename, then filter by date range ---
replacement = {"Mkt-RF":"Market","SMB":"Size","HML":"Value","RMW":"Quality","CMA":"Investment"}
factors = factors.rename(columns=replacement)

start_date = pd.to_datetime("19630701", format="%Y%m%d")
end_date   = pd.to_datetime("20240531", format="%Y%m%d")
filtered_factors = factors[(factors["Date"] >= start_date) & (factors["Date"] <= end_date)].copy()

print("\nFiltered factors (tail):")
print(filtered_factors.tail())


In [None]:
#We combine the tables based on the column 'date'.

df1 = pd.DataFrame(filtered_portfolios)
df1['Date'] = pd.to_datetime(df1['Date'])

df2 = pd.DataFrame(filtered_factors)
df2['Date'] = pd.to_datetime(df2['Date'])

df_combined = pd.merge(df1, df2, on='Date', how='outer')

#Sort by date
df_combined = df_combined.sort_values(by='Date')

df_combined.tail()


In [None]:
#Calculation of annual volatility

#Ensure that the date column is in the index.

df_combined.set_index('Date', inplace=True)

#Ensure that all columns except 'date' are numeric
for col in df_combined.columns:
    if col != 'Date':
        df_combined[col] = pd.to_numeric(df_combined[col], errors='coerce')

#Excluded columns
excluded_columns= ['Market', 'Size', 'Value','Quality', 'Investment', 'RF']

#Select the columns you want to process
columns_to_process = [col for col in df_combined.columns if col not in excluded_columns]

#Calculate daily volatility
daily_volatility = df_combined[columns_to_process].std()

#Convert daily volatility to annual volatility
annual_volatility = daily_volatility * np.sqrt(252)

#Create a DataFrame for the annual volatility row
summary_volatility = pd.DataFrame(annual_volatility, columns=['volatility_annual']).T




#Ensure that all columns are present and in the
# same order as the original DataFrame.
summary_volatility = summary_volatility.reindex(columns=df_combined.columns)

#Concatenate results to original DataFrame
df_combined_volatility = pd.concat([df_combined, summary_volatility])


In [None]:
#Abnormal Returns
#Create a copy for the result
df_abnormal_returns= df_combined.copy()

#Select the reference column
col_ref = 'RF'

#Indicate the columns to be excluded
exclude_columns = ['Market', 'Size', 'Value','Quality', 'Investment']


#Subtract the reference column from all other numeric columns
for column in df_combined.select_dtypes(include='number').columns:
    if column != col_ref and column not in exclude_columns:
        df_abnormal_returns[column] = df_combined[column] - df_combined[col_ref]


In [None]:
#Calculation of annual abnormal returns and Sharpe ratio

#Columns to be excluded in the calculation
excluded_columns= ['Market', 'Size', 'Value','Quality', 'Investment', 'RF']

#Select the columns you want to process
columns_to_process = [col for col in df_abnormal_returns.columns if col not in excluded_columns]

# Calculate the abnormal returns daily average
# for the selected columns
daily_mean = df_abnormal_returns[columns_to_process].mean()

#Convert to annual average
annual_mean = daily_mean * 252
Sharpe_ratio=annual_mean/annual_volatility

#Create series with the results and keep the original portfolio names.
summary_mean = pd.Series(annual_mean, name='AbnRet mean annual')
summary_volatility = pd.Series(annual_volatility, name='Volatility annual')
summary_Sharpe=pd.Series(Sharpe_ratio, name='Sharpe')

#Add the excluded columns with NaN
for col in exclude_columns:
    summary_mean[col] = np.nan
    summary_volatility[col] = np.nan
    summary_Sharpe[col]=np.nan

#Convert the Series to DataFrames and reorder the columns
# to match the original DataFrame.
summary_mean = summary_mean.reindex(df_abnormal_returns.columns).to_frame().T
summary_volatility = summary_volatility.reindex(df_combined_volatility.columns).to_frame().T
summary_Sharpe=summary_Sharpe.reindex(df_abnormal_returns.columns).to_frame().T

#Delete rows of previous results if they exist
df_abnormal_returns= df_abnormal_returns.drop(index=['AbnRet mean annual', 'Volatility annual', 'Sharpe'], errors='ignore')


#Concatenate results to original DataFrame
df_abnormal_returns = pd.concat([df_abnormal_returns, summary_mean, summary_volatility, summary_Sharpe])

#Convert the date index to a simplified string format only
# if they are not summary indexes
df_abnormal_returns.index = df_abnormal_returns.index.map(lambda x: x.strftime('%Y-%m-%d') if isinstance(x, pd.Timestamp) else x)
df_abnormal_returns.tail()


In [None]:
# Assume df_combined is your already loaded DataFrame
df = df_combined.copy()

# Clean up column names to ensure they have no spaces or special characters.
df.columns = df.columns.str.replace('[^A-Za-z0-9_]+', '_', regex=True)

# Assume that the first 25 columns are the dependent columns and the last 5 are the independent columns.
dependent_columns = df.columns[:25]
independent_columns = df.columns[25:30]


# Generate regression formulas
formulas = [f'{dep} ~ {" + ".join(independent_columns)}' for dep in dependent_columns]

# Perform regressions and extract coefficients and p-values.
results = {}
for formula in formulas:
    model = smf.ols(formula=formula, data=df).fit()
    params = model.params
    pvalues = model.pvalues
    results[formula] = {'coefficients': params, 'pvalues': pvalues}

# Show coefficients and p-values
for formula, result in results.items():
    print(f'Resultados para la fórmula "{formula}":')
    print('Coeficientes:')
    print(result['coefficients'])
    print('P-values:')
    print(result['pvalues'])
    print('\n')


In [None]:
# Initialisation of dictionaries to store coefficients and p-values separately
coefficients = {}
pvalues = {}

# Run regressions and extract coefficients and p-values
for formula in formulas:
    model = smf.ols(formula=formula, data=df).fit()
    dep = formula.split('~')[0].strip()  # Obtener el nombre de la variable dependiente
    coefficients[dep] = model.params
    pvalues[dep] = model.pvalues

# Converting dictionaries to DataFrames
coefficients_df = pd.DataFrame(coefficients)
pvalues_df = pd.DataFrame(pvalues)

# Simplifying column names and replacing underscores with spaces
coefficients_df.columns = [col.replace('_', ' ') for col in coefficients_df.columns]
pvalues_df.columns = [col.replace('_', ' ') for col in pvalues_df.columns]


In [None]:
# Create the final DataFrame ensuring that the above data does not accumulate
df_abnormal_returns= df_abnormal_returns.drop(index=['AbnRet mean annual', 'Volatility annual', 'Sharpe'], errors='ignore')
df_updated = pd.concat([df_abnormal_returns, summary_mean, summary_volatility, summary_Sharpe, coefficients_df])

# Ensure the index is called 'Date'.
if df_updated.index.name != 'Date':
    df_updated.index.name = 'Date'
df_updated.tail(11)


In [None]:
# Filter the first 25 relevant columns
df_filtered = df_updated.iloc[:, :25]

# Define the metrics to create individual DataFrames and
# their respective pivots
metrics = [
    'AbnRet mean annual', 'Volatility annual', 'Sharpe', 'Intercept',
    'Market', 'Size', 'Value', 'Quality', 'Investment'
]

# Dictionary to store the pivot DataFrames
pivot_dfs = {}

# Process each metric, extract data, create pivot and
# store in dictionary
for metric in metrics:
    temp_df = df_filtered.loc[metric].reset_index()
    temp_df[['ME', 'BM']] = temp_df['index'].str.split(' ', expand=True)
    temp_df.drop(columns=['index'], inplace=True)
    pivot_dfs[metric] = temp_df.pivot(index='BM', columns='ME', values=metric)

# Print each rounded DataFrame pivot
for metric, df in pivot_dfs.items():
    print(f'{metric}'); print(round(df, 2 if metric != 'Sharpe' else 3))


In [None]:
# Filter the first 25 relevant columns
df_filtered = df_updated.iloc[:, :25]

# Defining metrics and their colours for heat maps
metrics = [
    'AbnRet mean annual', 'Volatility annual', 'Sharpe', 'Intercept',
    'Market', 'Size', 'Value', 'Quality', 'Investment'
]
colors = ["#ff0000", "#ff6600", "#ffff00", "#99ff99", "#00cc00"]
n_bins = 100  # Use 100 discrete colors
cmap_name = 'custom_heatmap'
custom_cmap = LinearSegmentedColormap.from_list(cmap_name, colors, N=n_bins)

# Process each metric and generate heat maps
for metric in metrics:
    # Filtering the data relevant to the metric
    temp_df = df_filtered.loc[metric].reset_index()

    # Separating variables to obtain row and column categories
    temp_df[['ME', 'BM']] = temp_df['index'].str.split(' ', expand=True)

    # Delete the original index column
    temp_df.drop(columns=['index'], inplace=True)

    # Create a pivot matrix with ME as columns and BM as rows
    pivot_df = temp_df.pivot(index='BM', columns='ME', values=metric)

    # Create the heat matrix
    plt.figure(figsize=(6, 4))
    sns.heatmap(pivot_df, annot=True, cmap=custom_cmap, cbar_kws={'label': f'{metric} Range'})
    plt.title(f'{metric} Heatmap')
    plt.show()
# Dictionary to store the pivot DataFrames
pivot_dfs = {}

# Process each metric, extract data, create pivot and
# store in dictionary
for metric in metrics:
    temp_df = df_filtered.loc[metric].reset_index()
    temp_df[['ME', 'BM']] = temp_df['index'].str.split(' ', expand=True)
    temp_df.drop(columns=['index'], inplace=True)
    pivot_dfs[metric] = temp_df.pivot(index='BM', columns='ME', values=metric)

# Print each rounded DataFrame pivot
for metric, df in pivot_dfs.items():
    print(f'{metric}'); print(round(df, 2 if metric != 'Sharpe' else 3))
