In [None]:
from zipfile import ZipFile

# Unpack the raw data archive into the Resources folder.
# The context manager closes the archive once extraction finishes, and the
# handle is not called `zip`, so the builtin zip() stays usable in later cells.
with ZipFile('Resources/archive.zip') as archive:
    archive.extractall('Resources')

In [None]:
import pandas as pd
import hvplot.pandas
import matplotlib.pyplot as plt
from pathlib import Path
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler

In [None]:
# Show every column when a frame is displayed (helps decide which ones to drop)
pd.options.display.max_columns = None

# Read in Mutual Fund information CSV


In [None]:
# Load the MutualFunds table and use the fund symbol as the row index
mutual_funds_csv = Path("Resources/MutualFunds.csv")
mutualFunds = pd.read_csv(mutual_funds_csv, index_col="fund_symbol")
mutualFunds.head()

In [None]:
# Count distinct values in each object-typed column to spot single-valued ones
is_object_column = mutualFunds.dtypes == "object"
mutualFunds.loc[:, is_object_column].nunique()

In [None]:
# Parse the three date columns into pandas datetime values
for date_column in ("inception_date", "management_start_date", "returns_as_of_date"):
    mutualFunds[date_column] = pd.to_datetime(mutualFunds[date_column])

In [None]:
# Distinct-value count per object column, then the names of those with exactly one value
mutualFunds_counts = mutualFunds.loc[:, mutualFunds.dtypes == "object"].nunique()
has_single_value = mutualFunds_counts == 1
mutualFunds_counts_one = mutualFunds_counts.loc[has_single_value].index.to_list()
print(mutualFunds_counts_one)

In [None]:
# Remove, in place, the columns that hold only one distinct value
mutualFunds.drop(mutualFunds_counts_one, axis="columns", inplace=True)
mutualFunds

_We noticed that there are many `NaN` values throughout the data set. We will now explore which columns/rows contain `NaN` values and remove an appropriate number of those columns/rows._

In [None]:
# Preview (without modifying mutualFunds) what is left after dropping every row with a NaN
mutualFunds.dropna(axis="index", how="any")

If we drop every row that contains a `NaN`, no data is left. Instead, let's look at which columns have missing data.

In [None]:
# Collect one (column name, NaN count) pair per column; the count is a one-element Series.
# NOTE(review): the name `list` shadows the Python builtin. It is kept because later
# cells read it, but nothing after this cell can call the builtin list().
list = []
nan_counts = mutualFunds.isna().sum().to_frame()
for index, row in nan_counts.iterrows():
    list.append((index, row))


In [None]:
# Handling Missing Values
# Numeric columns: replace NaN with the column mean
numerical_cols = mutualFunds.select_dtypes(include=['number']).columns
numeric_means = mutualFunds[numerical_cols].mean()
mutualFunds[numerical_cols] = mutualFunds[numerical_cols].fillna(numeric_means)

# Object columns: replace NaN with the most frequent value (first row of the mode table)
categorical_cols = mutualFunds.select_dtypes(include=['object']).columns
most_frequent = mutualFunds[categorical_cols].mode().iloc[0]
mutualFunds[categorical_cols] = mutualFunds[categorical_cols].fillna(most_frequent)

In [None]:
# One-hot encode every object column into indicator columns.
# NOTE(review): mutualFunds_encoded is not referenced again in the visible cells — confirm it is still needed.
mutualFunds_encoded = pd.get_dummies(data=mutualFunds, columns=categorical_cols)

# Names of the numeric columns in the encoded frame
numerical_cols = mutualFunds_encoded.select_dtypes(include='number').columns

In [None]:
# Peek at the first (column name, NaN count) pair held in `list`
list[0]

We plan to find the number of columns that have above a certain percentage of `NaN` and then remove those columns. We have chosen 60% so far.

In [None]:
# A column is marked for removal when its NaN count reaches 60% of the row count
threshold = 0.6*len(mutualFunds)
drop_columns = [
    column_name
    for column_name, nan_count in list
    if nan_count[0] >= threshold
]

In [None]:
# Print how many columns met the NaN threshold, then display their names
print(len(drop_columns))
drop_columns


In [None]:
# Remove, in place, the columns that met the NaN threshold
mutualFunds.drop(drop_columns, axis="columns", inplace=True)
mutualFunds

In [None]:
# Distinct-value counts for the remaining object columns
text_column_mask = mutualFunds.dtypes == "object"
mutualFunds.loc[:, text_column_mask].nunique()

In [None]:
#Drop unnecessary columns
unnecessary_columns = [
    "fund_short_name",
    "fund_long_name",
    "management_name",
    "management_bio",
    "investment_strategy",
]
mutualFunds.drop(columns=unnecessary_columns, inplace=True)


In [None]:
# Remove quarterly returns and the pre-computed risk / rank / load-adjusted metrics.
# Quarterly columns are selected by name pattern instead of a hand-written list:
# a hand-written list is easy to get wrong (one that jumps from 2009_q1 to 2008_q3
# leaves fund_return_2008_q4 in the data), and the pattern only picks columns that
# actually exist in the frame.
quarterly_return_columns = mutualFunds.filter(
    regex=r"^fund_return_\d{4}_q[1-4]$"
).columns.to_list()

# The same seven risk statistics are present for each look-back horizon
risk_metric_columns = [
    f"fund_{metric}_{horizon}"
    for horizon in ("3years", "5years", "10years")
    for metric in (
        "alpha", "beta", "mean_annual_return", "r_squared",
        "stdev", "sharpe_ratio", "treynor_ratio",
    )
]

other_metric_columns = [
    "fund_return_category_rank_ytd", "fund_return_category_rank_1month",
    "fund_return_category_rank_3months", "fund_return_category_rank_1year",
    "fund_return_category_rank_3years", "fund_return_category_rank_5years",
    "load_adj_return_1year", "load_adj_return_3years",
    "load_adj_return_5years", "load_adj_return_10years",
    "top10_holdings",
]

mutualFunds.drop(
    columns=quarterly_return_columns + risk_metric_columns + other_metric_columns,
    inplace=True,
)

In [None]:
#Remove metrics that we can calculate ourselves
#keep: "esg_score","environment_score", "sustainability_score", "sustainability_rank",   "social_score",  "governance_score",
peer_stat_columns = [
    f"peer_{topic}_{stat}"
    for topic in ("esg", "environment", "social", "governance")
    for stat in ("min", "avg", "max")
]
mutualFunds.drop(columns=["esg_peer_count"] + peer_stat_columns, inplace=True)


In [None]:
# Distinct-value counts for the object columns that are left
remaining_text_mask = mutualFunds.dtypes == "object"
mutualFunds.loc[:, remaining_text_mask].nunique()

Done with Data cleanup and preprocessing


In [None]:
# Mean of every numeric column, ignoring NaN
column_means = mutualFunds.mean(numeric_only=True, skipna=True)
# Fill NaN values, in place, with the mean of their own column
mutualFunds.fillna(value=column_means, inplace=True)

In [None]:
# Drop, in place, any row that still contains a missing value
mutualFunds.dropna(axis="index", how="any", inplace=True)
mutualFunds

In [None]:
# Display the dtype of every remaining column
mutualFunds.dtypes

# Binning!

In [None]:
# List the columns whose dtype is object (the candidates for binning)
mutualFunds.dtypes[mutualFunds.dtypes == "object"]

Binning `fund_category`

In [None]:
# Keep the fund_category values that occur more than 150 times
fund_category_counts = mutualFunds["fund_category"].value_counts()
fund_category_type_count = fund_category_counts[fund_category_counts > 150]
# Percentage of rows covered by the kept values, then how many values were kept
print(100*fund_category_type_count.sum()/len(mutualFunds))
print(fund_category_type_count.count())

In [None]:
# Collapse every fund_category value that is not in the keep-list into "Other".
# Done in one vectorised pass rather than one full-column replace per row.
is_common_category = mutualFunds["fund_category"].isin(fund_category_type_count.index)
mutualFunds["fund_category"] = mutualFunds["fund_category"].where(is_common_category, "Other")

In [None]:
# Display how many rows fall under each fund_category value
mutualFunds["fund_category"].value_counts()

Binning `fund_family`

In [None]:
# Keep the fund_family values that occur more than 150 times
fund_family_counts = mutualFunds["fund_family"].value_counts()
fund_family_type_count = fund_family_counts[fund_family_counts > 150]
# Percentage of rows covered by the kept values, then how many values were kept
print(100*fund_family_type_count.sum()/len(mutualFunds))
print(fund_family_type_count.count())

In [None]:
# Collapse every fund_family value that is not in the keep-list into "Other".
# Done in one vectorised pass rather than one full-column replace per row.
is_common_family = mutualFunds["fund_family"].isin(fund_family_type_count.index)
mutualFunds["fund_family"] = mutualFunds["fund_family"].where(is_common_family, "Other")

In [None]:
# Display how many rows fall under each fund_family value
mutualFunds["fund_family"].value_counts()

Binning `esg_peer_group`

In [None]:
# Find the threshold for binning esg_peer_group: keep values occurring more than 100 times
esg_peer_group_counts = mutualFunds["esg_peer_group"].value_counts()
esg_peer_type_count = esg_peer_group_counts[esg_peer_group_counts > 100]
# Percentage of rows covered by the kept values, then how many values were kept
print(100* esg_peer_type_count.sum()/len(mutualFunds))
print(esg_peer_type_count.count())

In [None]:
# Collapse every esg_peer_group value that is not in the keep-list into "Other".
# Done in one vectorised pass rather than one full-column replace per row.
is_common_peer_group = mutualFunds["esg_peer_group"].isin(esg_peer_type_count.index)
mutualFunds["esg_peer_group"] = mutualFunds["esg_peer_group"].where(is_common_peer_group, "Other")

In [None]:
# Display how many rows fall under each esg_peer_group value
mutualFunds["esg_peer_group"].value_counts()

In [None]:
# Display the frame in its current state
mutualFunds

In [None]:
#write the cleaned and binned mutualFunds frame to a new CSV
import os

# Make sure the target folder exists before writing into it
output_directory = "Resources/Cleaned"
os.makedirs(output_directory, exist_ok=True)

# Copy the frame and write it out with header row and index column
output_file_path = os.path.join(output_directory, "binned_mutual_funds.csv")
binned_mutual_funds = mutualFunds.copy()
binned_mutual_funds.to_csv(output_file_path, index=True, header=True)

# Read In CSV data for Mutual Fund prices A-Z


In [None]:
# Mutual fund prices A-E: read the CSV, index by fund symbol, parse price_date
prices_ae_path = Path("Resources/MutualFund Prices - A-E.csv")
df_AE = pd.read_csv(prices_ae_path).set_index("fund_symbol")
df_AE["price_date"] = pd.to_datetime(df_AE["price_date"])

In [None]:
# Mutual fund prices F-K: read the CSV, index by fund symbol, parse price_date
prices_fk_path = Path("Resources/MutualFund Prices - F-K.csv")
df_FK = pd.read_csv(prices_fk_path).set_index("fund_symbol")
df_FK["price_date"] = pd.to_datetime(df_FK["price_date"])

In [None]:
# Mutual fund prices L-P: read the CSV, index by fund symbol, parse price_date
prices_lp_path = Path("Resources/MutualFund Prices - L-P.csv")
df_LP = pd.read_csv(prices_lp_path).set_index("fund_symbol")
df_LP["price_date"] = pd.to_datetime(df_LP["price_date"])

In [None]:
# Mutual fund prices Q-Z: read the CSV, index by fund symbol, parse price_date
prices_qz_path = Path("Resources/MutualFund Prices - Q-Z.csv")
df_QZ = pd.read_csv(prices_qz_path).set_index("fund_symbol")
df_QZ["price_date"] = pd.to_datetime(df_QZ["price_date"])

In [None]:
# Concatenate the four price frames into one
price_frames = [df_AE, df_FK, df_LP, df_QZ]
mutual_fund_df = pd.concat(price_frames)
mutual_fund_df.shape

In [None]:
# Check that price_date now has a datetime dtype in the concatenated frame
mutual_fund_df.dtypes

In [None]:
# Make a copy of the cleaned and concatenated mutual_fund_df DataFrame
az_mutual_funds = mutual_fund_df.copy()

# Path of the CSV file to write
output_file_path_2 = "Resources/Cleaned/az_mutual_funds.csv"

# Write the DataFrame to CSV. fund_symbol is the index of this frame, so the
# index must be written too; otherwise the file has no fund_symbol column to
# merge on.
az_mutual_funds.to_csv(output_file_path_2, header=True, index=True)

# Merge mutualFunds and mutual_fund_df on index= "fund_symbol"

In [None]:
import findspark
# findspark.init() is called before the pyspark imports below
findspark.init()
# Import packages
from pyspark import SparkFiles
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import DateType, IntegerType, StringType, StructField, StructType

# Create (or reuse) the SparkSession used by the cells below
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)


In [None]:
# Locations of the two cleaned CSV files
az_mutual_funds_file_path = "Resources/Cleaned/az_mutual_funds.csv"
binned_mutual_funds_file_path = "Resources/Cleaned/binned_mutual_funds.csv"

# Load both files into Spark DataFrames, reading the header row and inferring column types
csv_options = {"header": True, "inferSchema": True}
az_mutual_funds_df = spark.read.csv(az_mutual_funds_file_path, **csv_options)
binned_mutual_funds_df = spark.read.csv(binned_mutual_funds_file_path, **csv_options)

# Print the first rows of each DataFrame
az_mutual_funds_df.show()
binned_mutual_funds_df.show()

In [None]:
# Save both DataFrames in Parquet format, overwriting any existing output
az_mutual_funds_df.write.mode("overwrite").parquet("path/to/az_mutual_funds_parquet")
binned_mutual_funds_df.write.mode("overwrite").parquet("path/to/binned_mutual_funds_parquet")


In [None]:
# Load the Parquet output back into Spark DataFrames
az_mutual_funds_df = spark.read.format("parquet").load("path/to/az_mutual_funds_parquet")
binned_mutual_funds_df = spark.read.format("parquet").load("path/to/binned_mutual_funds_parquet")


In [None]:
# Expose both DataFrames to Spark SQL under fixed view names
for view_name, view_frame in (
    ("az_mutual_funds", az_mutual_funds_df),
    ("binned_mutual_funds", binned_mutual_funds_df),
):
    view_frame.createOrReplaceTempView(view_name)

In [None]:
# Perform SQL merge of the two temporary views.
# NOTE(review): this INNER JOIN has no ON / USING condition, so nothing ties a
# price row to its own fund row. A join on fund_symbol looks intended — confirm
# that column exists in both views before adding the condition.
merged_df = spark.sql("""
    SELECT *
    FROM az_mutual_funds
    INNER JOIN binned_mutual_funds
""")


# Post merging and binning:

In [None]:
# Import Dependencies
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [None]:
# Row values of the pandas frame as nested Python lists
data = mutualFunds.values.tolist()

# Column labels of the pandas frame, in frame order
columns = [*mutualFunds.columns]

In [None]:
# Create the Spark DataFrame directly from the pandas frame so Spark infers a
# type for every column. Declaring every field as StringType() does not work
# here: Spark verifies each value against the declared schema and rejects the
# non-string (numeric / datetime) values held in the frame.
mutualFunds_spark = spark.createDataFrame(mutualFunds)

# Keep the resulting schema available under the name `schema`
schema = mutualFunds_spark.schema

In [None]:
# Target: 'year_to_date_return'. Features: every other column.
y = mutualFunds["year_to_date_return"]
X = mutualFunds.drop(columns=["year_to_date_return"])

# Hold out 20% of the rows for testing; the fixed random_state makes the split repeatable
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Identify numeric, categorical, and datetime columns by dtype.
# Comparing a dtype with the strings 'int' / 'float' only matches the
# platform-default widths, and looking for 'date' in a column name can pick up
# columns that do not hold datetimes, so select_dtypes is used for all three.
numeric_cols = X_train.select_dtypes(include=['number']).columns.tolist()
categorical_cols = X_train.select_dtypes(include=['object']).columns.tolist()
datetime_cols = X_train.select_dtypes(include=['datetime']).columns.tolist()


In [None]:
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer

# Group the feature columns by dtype
numeric_cols = X_train.select_dtypes(include='number').columns
categorical_cols = X_train.select_dtypes(include='object').columns
datetime_cols = X_train.select_dtypes(include='datetime').columns

# Numeric features: fill gaps with the median, then standardise
numeric_steps = [
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
]
numeric_transformer = Pipeline(steps=numeric_steps)

# Categorical features: fill gaps with the label 'missing', then one-hot encode
# (categories not seen during fit are ignored at transform time)
categorical_steps = [
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('onehot', OneHotEncoder(handle_unknown='ignore')),
]
categorical_transformer = Pipeline(steps=categorical_steps)

# Route each column group to its transformer; all other columns pass through unchanged
column_routes = [
    ('num', numeric_transformer, numeric_cols),
    ('cat', categorical_transformer, categorical_cols),
]
preprocessor = ColumnTransformer(transformers=column_routes, remainder='passthrough')

In [None]:
from sklearn.preprocessing import FunctionTransformer

# Define a function to convert datetime columns to timestamps
def datetime_to_timestamp(X):
    """Return a copy of ``X`` with its datetime columns converted to timestamps.

    Parameters
    ----------
    X : pandas.DataFrame
        Frame whose datetime-typed columns should become numeric.

    Returns
    -------
    pandas.DataFrame
        A new frame in which every datetime column holds the result of
        ``Timestamp.timestamp()`` (float seconds since the Unix epoch).
        Columns of any other dtype are left as they are.
    """
    # Work on a copy so the caller's frame is never modified.
    converted = X.copy()
    # Convert the frame's own datetime columns rather than relying on a
    # notebook-level list of column names.
    for col in converted.select_dtypes(include=["datetime", "datetimetz"]).columns:
        converted[col] = converted[col].apply(lambda x: x.timestamp())
    return converted

# Convert datetime columns to numeric timestamps, then standardise them.
# Raw epoch seconds are many orders of magnitude larger than the standardised
# numeric features, so they are scaled the same way before reaching the model.
datetime_transformer = Pipeline(steps=[
    ('to_timestamp', FunctionTransformer(datetime_to_timestamp)),
    ('scaler', StandardScaler())
])

# Rebuild the preprocessor with a third route for the datetime columns;
# columns not named in any route pass through unchanged
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_cols),
        ('cat', categorical_transformer, categorical_cols),
        ('datetime', datetime_transformer, datetime_cols)
    ],
    remainder='passthrough'
)


In [None]:
# Fit the preprocessor on the training data only and transform that data
X_train_preprocessed = preprocessor.fit_transform(X_train)

# Transform the test data with the already-fitted preprocessor (no refitting)
X_test_preprocessed = preprocessor.transform(X_test)

## Building the Model

In [None]:
# Define the model architecture
number_input_features = X_train_preprocessed.shape[1]
hidden_nodes_layer1 = 64
hidden_nodes_layer2 = 32

nn = tf.keras.models.Sequential([
    tf.keras.layers.Dense(units=hidden_nodes_layer1, input_dim=number_input_features, activation="relu"),
    tf.keras.layers.BatchNormalization(),  # Add batch normalization
    tf.keras.layers.Dropout(0.2),  # Add dropout layer for regularization
    tf.keras.layers.Dense(units=hidden_nodes_layer2, activation="relu"),
    tf.keras.layers.Dense(units=1, activation="sigmoid")
])



### Compile, Train, Evaluate the Model

In [None]:

# Compile the model
nn.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])

# Print model summary
nn.summary()

In [None]:
# Train the model
history = nn.fit(X_train_preprocessed, y_train, epochs=50, batch_size=32, validation_data=(X_test_preprocessed, y_test))

In [None]:
# Evaluate the model
model_loss, model_accuracy = nn.evaluate(X_test_preprocessed, y_test, verbose=2)
print(f"Loss: {model_loss}, Accuracy: {model_accuracy}")
