## Building the dataset that will be input into the model

In [1]:
import findspark
import pandas as pd
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkConf

# Shared metastore (shared across all users).
# NOTE: SparkSession.builder.config() ignores its `key`/`value` arguments
# whenever `conf` is also supplied, so passing all three in one call silently
# drops the metastore URI. Apply the SparkConf first, then the explicit key.
spark = (
    SparkSession.builder
    .appName("Building dataset")
    .config(conf=SparkConf())
    .config("hive.metastore.uris", "thrift://bialobog:9083")
    .getOrCreate()
)

# For a local metastore (your private, individual database), replace the
# shared URI above with the config for that local metastore.
# Make 2023_04_01 the current database so the unqualified table names used by
# the queries in this notebook resolve against it.
spark.sql("USE 2023_04_01")

DataFrame[]

In [6]:
import pyspark.pandas as ps
from pyspark.sql.functions import lit,col
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import xgboost as xgb
#from boruta import BorutaPy
#from fredapi import Fred
from sklearn.linear_model import Lasso
from sklearn.model_selection import TimeSeriesSplit
import csv
from pyspark.sql import functions as F
from functools import reduce
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel


def get_macro_features(path='macro.csv'):
    """Load the cached macro-economic feature table from a CSV file.

    The file was originally exported with ``fredapi`` from the FRED series
    ``CPIAUCSL`` (as a percentage change, column ``CPI_change``), ``UNRATE``
    (column ``Unemployment_Rate``) and ``GDP`` (as a percentage change, column
    ``GDP_change``) and then written with ``DataFrame.to_csv('macro.csv')``.
    That export code is no longer part of this function; if it is ever
    restored, read the FRED API key from the environment (for example
    ``os.environ['FRED_API_KEY']``) and never embed it in the notebook.

    NOTE(review): ``create_dataset`` expects ``Date`` and ``GDP`` columns in
    this file, which the export described above would not have produced on
    its own - confirm the schema of the CSV that is actually checked in.

    Parameters
    ----------
    path : str, optional
        Location of the CSV file. Defaults to ``'macro.csv'`` in the current
        working directory, which is what existing callers rely on.

    Returns
    -------
    pandas.DataFrame
        The CSV contents exactly as parsed by ``pandas.read_csv``.
    """
    return pd.read_csv(path)

def get_all_stocks():
    """Return the tickers of every stock that matches the universe filter.

    The query keeps rows of ``sym_ticker_region`` whose ``ticker_region`` ends
    in ``-US`` and contains no ``.``, whose ``FF_SEC_COVERAGE`` currency is
    ``USD`` and whose ``sym_coverage`` listing exchange is ``NAS`` or ``NYS``.

    Returns
    -------
    list of str
        The matching ``ticker_region`` values with the trailing ``-US``
        removed. All rows are collected to the driver.
    """
    query = """SELECT s.ticker_region, sc.fref_listing_exchange FROM sym_ticker_region s 
                LEFT JOIN FF_SEC_COVERAGE c ON c.fsym_id = s.fsym_id
                LEFT JOIN sym_coverage sc ON sc.fsym_id = s.fsym_id
                WHERE s.ticker_region LIKE "%-US" AND s.ticker_region NOT LIKE '%.%' AND c.CURRENCY = "USD"
                AND (sc.fref_listing_exchange = "NAS" OR sc.fref_listing_exchange = "NYS")"""
    listings = spark.sql(query)
    # `regexp_replace` is never imported by name in this notebook, so the bare
    # call raised NameError; use the `F` alias for pyspark.sql.functions.
    listings = listings.withColumn("ticker_region", F.regexp_replace("ticker_region", "-US$", ""))
    return [row.ticker_region for row in listings.collect()]



def get_non_imp_stocks_query():
    """Return the universe stocks that are not listed as imploded.

    The imploded rows of ``imploded_stocks.csv`` are registered as the temp
    view ``imp_table``. The query then applies the same ticker, currency and
    exchange filter as the rest of the notebook and removes every ticker whose
    ``<Ticker>-US`` form appears in that view.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``ticker_region`` and ``fsym_id``.
    """
    imploded_pdf = get_implosion_df('imploded_stocks.csv')
    spark.createDataFrame(imploded_pdf).createOrReplaceTempView("imp_table")

    sql_text = """SELECT s.ticker_region, s.fsym_id FROM sym_ticker_region s 
                LEFT JOIN FF_SEC_COVERAGE c ON c.fsym_id = s.fsym_id
                LEFT JOIN sym_coverage sc ON sc.fsym_id = s.fsym_id
                WHERE s.ticker_region LIKE "%-US" AND s.ticker_region NOT LIKE '%.%' AND c.CURRENCY = "USD"
                AND (sc.fref_listing_exchange = "NAS" OR sc.fref_listing_exchange = "NYS")
                AND NOT EXISTS (
                SELECT 1
                FROM imp_table
                WHERE s.ticker_region = CONCAT(imp_table.Ticker, '-US') )    
                """
    non_imploded = spark.sql(sql_text)
    print("got non imploded stocks")
    return non_imploded


def get_implosion_df(filename):
    """Load the stocks that have an implosion start date.

    Parameters
    ----------
    filename : str
        CSV file with an ``Implosion_Start_Date`` column.

    Returns
    -------
    pandas.DataFrame
        Only the rows whose ``Implosion_Start_Date`` is not null, with an
        extra ``Implosion_Date`` column holding that value parsed by
        ``pandas.to_datetime``. The original row index is preserved.
    """
    stocks = pd.read_csv(filename, index_col=False)
    # Take an explicit copy of the filtered rows: assigning a new column to a
    # boolean-mask slice is what triggers pandas' chained-assignment warning.
    imploded = stocks.loc[stocks['Implosion_Start_Date'].notnull()].copy()
    imploded['Implosion_Date'] = pd.to_datetime(imploded['Implosion_Start_Date'])
    return imploded

def get_non_implosion_df(filename):
    """Load the stocks that have no implosion start date.

    Parameters
    ----------
    filename : str
        CSV file with an ``Implosion_Start_Date`` column.

    Returns
    -------
    pandas.DataFrame
        Only the rows whose ``Implosion_Start_Date`` is null, with the
        original columns and row index.
    """
    stocks = pd.read_csv(filename, index_col=False)
    never_imploded = stocks['Implosion_Start_Date'].isnull()
    return stocks[never_imploded]

def get_features_for_imploded_stocks(df, big_string, table):
    """Fetch the yearly feature rows for the imploded-stock label table.

    Parameters
    ----------
    df : pandas.DataFrame
        Label table; the query reads its ``ticker_region``, ``Year`` and
        ``Implosion_Next_Year`` columns. It is registered as the temp view
        ``temp_table`` (replacing any existing view of that name).
    big_string : str
        Comma-separated, ``a.``-qualified column list spliced into the SELECT.
    table : str
        Name of the fundamentals table joined as alias ``a``.

    Returns
    -------
    pyspark.sql.DataFrame
        ``ticker_region``, ``date``, the requested metrics and
        ``Implosion_Next_Year``, ordered by ticker and date. Each label row is
        matched to the ``table`` rows of the same ``fsym_id`` whose
        ``YEAR(date)`` equals ``Year``.
    """
    spark.createDataFrame(df).createOrReplaceTempView("temp_table")

    # NOTE(review): the FF_BASIC_AF join (alias b) contributes no selected
    # column; if that table ever holds several rows per fsym_id and year it
    # would duplicate output rows - confirm whether it is still needed.
    feature_sql = f"""SELECT t.ticker_region, a.date, {big_string}, t.Implosion_Next_Year FROM temp_table t
                    LEFT JOIN sym_ticker_region s ON s.ticker_region = t.ticker_region
                    LEFT JOIN {table} a ON a.fsym_id = s.fsym_id AND t.Year = YEAR(a.date)
                    LEFT JOIN FF_BASIC_AF b ON b.fsym_id = s.fsym_id AND YEAR(b.date) = t.Year
                    ORDER BY t.ticker_region, a.date
    """
    imploded_features = spark.sql(feature_sql)
    print("imploded query done")
    return imploded_features
    
    
def get_features_for_non_imploded(metric_string, metric_string2,table):
    """Fetch recent feature rows for the stocks without an implosion date.

    The non-imploded rows of ``imploded_stocks3.csv`` are registered as the
    temp view ``temp_table``. For every ticker the query ranks the rows of
    ``table`` by date (newest first) after discarding the row carrying that
    ``fsym_id``'s maximum date, and keeps the four highest-ranked rows.

    Parameters
    ----------
    metric_string : str
        Comma-separated, ``a.``-qualified metric columns for the inner SELECT.
    metric_string2 : str
        The same metrics qualified with ``r.`` for the outer SELECT.
    table : str
        Name of the fundamentals table joined as alias ``a``.

    Returns
    -------
    pyspark.sql.DataFrame
        ``ticker_region``, ``date`` and the metrics, ordered by ticker and
        date.
    """
    non_imploded_pdf = get_non_implosion_df('imploded_stocks3.csv')
    spark.createDataFrame(non_imploded_pdf).createOrReplaceTempView("temp_table")

    ranked_sql = f"""WITH RankedData AS (
    SELECT
        t.ticker_region, s.fsym_id,
        a.date,
        {metric_string},
        ROW_NUMBER() OVER (PARTITION BY t.ticker_region ORDER BY a.date DESC) AS row_num
        FROM temp_table t
        LEFT JOIN sym_ticker_region s ON s.ticker_region = t.ticker_region
        LEFT JOIN {table} a ON a.fsym_id = s.fsym_id 
        WHERE a.date < (
            SELECT MAX(date)
            FROM {table} a_sub
            WHERE a_sub.fsym_id = s.fsym_id ))
    SELECT
        r.ticker_region, r.date, {metric_string2}
        FROM RankedData r
        WHERE row_num <= 4
        ORDER BY ticker_region, date"""
    recent_features = spark.sql(ranked_sql)
    print("non imploded query done")
    return recent_features


def create_non_imploded_ds(table):
    """Build the negative-class feature rows (``Implosion_Next_Year`` = 0).

    Parameters
    ----------
    table : str
        Fundamentals table to read the metrics from.

    Returns
    -------
    pyspark.sql.DataFrame
        The output of ``get_features_for_non_imploded`` for the fixed metric
        list below, with a constant ``Implosion_Next_Year`` column of 0.
    """
    # Same metric list as create_imploded_df; the two must stay in sync so the
    # positive and negative frames can be concatenated column-for-column.
    metric_names = [
        'ff_debt_entrpr_val', 'ff_tot_debt_tcap_std', 'ff_fix_assets_com_eq', 'ff_debt_eq',
        'ff_inven_curr_assets', 'ff_liabs_lease', 'ff_ltd_tcap', 'ff_sales_wkcap',
        'ff_bps_gr', 'ff_oper_inc_tcap', 'ff_assets_gr', 'ff_fcf_yld', 'ff_mkt_val_gr',
        'ff_earn_yld', 'ff_pbk_tang', 'ff_zscore', 'ff_entrpr_val_sales', 'ff_psales_dil',
        'ff_roea', 'ff_dps_gr',
        'ff_loan_loss_pct', 'ff_loan_loss_actual_rsrv',
    ]

    # One column list per SQL alias: `a` inside the CTE, `r` in the outer SELECT.
    qualified = {
        alias: ', '.join(f'{alias}.{name}' for name in metric_names)
        for alias in ('a', 'r')
    }
    features = get_features_for_non_imploded(qualified['a'], qualified['r'], table)
    return features.withColumn("Implosion_Next_Year", lit(0))

def create_imploded_df(table):
    """Build the labelled feature rows for the stocks that imploded.

    For every imploded stock in ``imploded_stocks3.csv`` four label rows are
    generated: the year before the implosion year is labelled
    ``Implosion_Next_Year`` = 1 and the three years before that are labelled
    0. The label table is then joined to the fundamentals in ``table``.

    Parameters
    ----------
    table : str
        Fundamentals table to read the metrics from.

    Returns
    -------
    pyspark.sql.DataFrame
        The output of ``get_features_for_imploded_stocks`` for the fixed
        metric list below.
    """
    imploded = get_implosion_df('imploded_stocks3.csv')
    # Drop the first CSV column by position, whatever it is named.
    imploded = imploded.drop(imploded.columns[0], axis=1)
    imploded['Implosion_Year'] = imploded['Implosion_Date'].dt.year-1
    imploded['Implosion_Next_Year'] = 1

    # Negative examples from the same companies: one, two and three years
    # further back than the positive row.
    earlier_years = [
        imploded.assign(
            Implosion_Year=imploded['Implosion_Year'] - years_back,
            Implosion_Next_Year=0,
        )
        for years_back in (1, 2, 3)
    ]
    labels = pd.concat([imploded] + earlier_years)
    labels = labels.sort_values(by=['ticker_region', 'Implosion_Year'])
    labels = labels.reset_index(drop=True)
    labels = labels.rename({'Implosion_Year' : 'Year'},axis=1)

    metric_names = [
        'ff_debt_entrpr_val', 'ff_tot_debt_tcap_std', 'ff_fix_assets_com_eq', 'ff_debt_eq',
        'ff_inven_curr_assets', 'ff_liabs_lease', 'ff_ltd_tcap', 'ff_sales_wkcap',
        'ff_bps_gr', 'ff_oper_inc_tcap', 'ff_assets_gr', 'ff_fcf_yld', 'ff_mkt_val_gr',
        'ff_earn_yld', 'ff_pbk_tang', 'ff_zscore', 'ff_entrpr_val_sales', 'ff_psales_dil',
        'ff_roea', 'ff_dps_gr',
        'ff_loan_loss_pct', 'ff_loan_loss_actual_rsrv',
    ]
    metric_string = ', '.join(f'a.{name}' for name in metric_names)

    return get_features_for_imploded_stocks(labels, metric_string, table)


    
def create_dataset(table):
    """Assemble the modelling dataset and write it to CSV.

    Steps:
      1. Pull the imploded and non-imploded feature frames to pandas and
         stack them (non-imploded rows first).
      2. Left-join the macro table on calendar month.
      3. Drop every column with more than 10% nulls, then every row that
         still contains a null.
      4. Write the result to ``Advanced_AF_DER_Dataset.csv``.

    Parameters
    ----------
    table : str
        Fundamentals table to read the metrics from.

    Returns
    -------
    None
        The dataset is written to disk; progress and class counts are printed.
    """
    imploded_pdf = create_imploded_df(table).toPandas()
    non_imploded_pdf = create_non_imploded_ds(table).toPandas()

    combined = pd.concat([non_imploded_pdf, imploded_pdf], ignore_index=True)
    combined['date'] = pd.to_datetime(combined['date'], format='%Y-%m-%d')
    combined = combined.sort_values(by=['ticker_region', 'date'])

    macro = get_macro_features().reset_index()
    macro['Date'] = pd.to_datetime(macro['Date'], format='%d/%m/%Y')

    # Both sides are reduced to a monthly period so a fundamentals row picks
    # up the macro observation of the same calendar month.
    combined['month_year'] = combined['date'].dt.to_period("M")
    macro['Month_year'] = macro['Date'].dt.to_period("M")
    combined = pd.merge(combined, macro, left_on='month_year', right_on='Month_year', how='left')
    combined = combined.drop(['Date', 'index', 'month_year', 'Month_year', 'GDP'], axis=1)

    print(combined.head())

    null_pcts = combined.isnull().sum() / len(combined)
    print(null_pcts)

    sparse_columns = null_pcts[null_pcts > 0.1].index.tolist()
    combined = combined.drop(sparse_columns, axis=1)
    print("dropped cols: ", sparse_columns)

    print("before dropping nulls: ", len(combined))
    complete_rows = combined.dropna()
    print("after dropping nulls: ", len(complete_rows))

    labels = complete_rows['Implosion_Next_Year']
    print("number of implosions: ", int((labels == 1).sum()))
    print("number of non-implosions: ", int((labels == 0).sum()))

    complete_rows.to_csv('Advanced_AF_DER_Dataset.csv', index=False)
    print("dataset written")
    
def get_feature_col_names(csv_file_path='features.csv'):
    """Return the feature column names stored in the first row of a CSV file.

    Only the first row is parsed; any further rows in the file are ignored.

    Parameters
    ----------
    csv_file_path : str, optional
        File to read. Defaults to ``'features.csv'`` in the current working
        directory, which is what existing callers rely on.

    Returns
    -------
    list of str
        The fields of the first CSV row.

    Raises
    ------
    IndexError
        If the file contains no rows at all.
    """
    # newline='' is what the csv module documents for files handed to a reader.
    with open(csv_file_path, mode='r', newline='') as file:
        first_row = next(csv.reader(file), None)
    if first_row is None:
        raise IndexError(f"{csv_file_path} is empty; expected one row of feature names")
    return first_row
    
    
def get_features_all_stocks():
    """Build one row per stock holding a sequence for every feature plus a label.

    Stocks are read from ``imploded_stocks_price.csv`` and joined to
    ``FF_ADVANCED_DER_AF`` for dates from 2000-01-01 onwards. For each
    ``fsym_id`` and each feature named in ``features.csv`` the non-null values
    are gathered in date order into an array; arrays shorter than 23 entries
    are right-padded with zeros (longer arrays are left as they are).

    Returns
    -------
    pyspark.sql.DataFrame
        ``fsym_id``, one array column per feature, and ``label`` (1 when the
        stock has an ``Implosion_Start_Date``, otherwise 0), ordered by
        ``fsym_id``.
    """
    table = "FF_ADVANCED_DER_AF"
    seq_len = 23  # sequence length that train_test filters on

    stocks_pdf = pd.read_csv('imploded_stocks_price.csv', index_col=False)
    # Derive the label in pandas, where a missing date is unambiguous.
    # F.isnan() on the Spark side is false for SQL NULL, so a missing date
    # that arrives as NULL rather than NaN would have been labelled 1.
    stocks_pdf['label'] = stocks_pdf['Implosion_Start_Date'].notnull().astype(int)
    spark_df = spark.createDataFrame(stocks_pdf)
    spark_df.createOrReplaceTempView("temp_table")

    col_names = get_feature_col_names()
    col_string = ', '.join('a.' + item for item in col_names)
    q=f"""SELECT t.fsym_id, a.date, {col_string}
                FROM temp_table t
                LEFT JOIN {table} a ON t.fsym_id = a.fsym_id
                WHERE a.date >= "2000-01-01"
                ORDER BY t.fsym_id, a.date"""
    features_df = spark.sql(q)
    feature_cols = [name for name in features_df.columns if name not in ['fsym_id', 'date']]

    # collect_list gives no ordering guarantee after the groupBy shuffle, so
    # the ORDER BY above is not enough to make the sequences chronological.
    # Collect (date, features...) structs and sort them: sort_array orders
    # structs by their first field, i.e. by date.
    history_df = features_df.groupBy("fsym_id").agg(
        F.sort_array(F.collect_list(F.struct("date", *feature_cols))).alias("history"))

    # Pull each feature out of the sorted history and drop nulls, matching
    # what a per-column collect_list did before.
    grouped_df = history_df.select(
        "fsym_id",
        *[F.expr(f"filter(history.`{name}`, v -> v IS NOT NULL)").alias(name) for name in feature_cols])

    # Temporary measure: right-pad short sequences with zeros.
    grouped_df_padded = grouped_df.select(
        "fsym_id",
        *[F.expr(
            f"IF(size({name}) < {seq_len}, "
            f"concat({name}, array_repeat(0, {seq_len} - size({name}))), {name})"
        ).alias(name) for name in feature_cols])

    joined_df = grouped_df_padded.join(spark_df.select("fsym_id", "label"), "fsym_id", "inner")
    return joined_df.orderBy('fsym_id')

def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


def train_test(df):
    """Cross-validate a Spark logistic regression and print its metrics.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        First column is the stock id (``fsym_id``), last column is ``label``
        and every column in between is an array-valued feature sequence.
        Rows in which any sequence is not exactly 23 long are discarded.

    Returns
    -------
    None
        Prints the mean cross-validated AUC for every hyper-parameter
        combination, a sample of predictions, the confusion-matrix counts and
        precision / recall / F1.

    Notes
    -----
    The confusion matrix is computed on the same rows the model was fitted
    on, so those figures are optimistic; only the cross-validated AUC is an
    out-of-fold estimate.
    """
    seq_len = 23
    features = df.columns[1:-1]

    df = df.filter(
        reduce(lambda acc, column: acc & (F.size(col(column)) == seq_len), features, lit(True))
    )

    # Open questions carried over from the original notebook:
    #   - only keep stocks with history from 2000, or also shorter ones?
    #   - zero padding is a temporary measure; try an imputer or masking.
    list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
    for f in features:
        df = df.withColumn(f, list_to_vector_udf(f))
    df.show(2)

    vector_assembler = VectorAssembler(inputCols=features, outputCol="features_vector")
    df_assembled = vector_assembler.transform(df)

    lr = LogisticRegression(featuresCol="features_vector", labelCol="label")

    paramGrid = ParamGridBuilder() \
        .addGrid(lr.regParam, [0.01, 0.1]) \
        .addGrid(lr.elasticNetParam, [0.0, 0.5]) \
        .addGrid(lr.maxIter, [10, 20]) \
        .build()

    evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

    # A fixed seed makes the fold assignment reproducible between runs.
    crossval = CrossValidator(estimator=lr,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=5,
                              seed=42)

    cvModel = crossval.fit(df_assembled)

    # avgMetrics holds one value per parameter map (the evaluator metric
    # averaged over the folds) - not one value per fold, and not accuracy.
    for i, (param_map, mean_auc) in enumerate(zip(paramGrid, cvModel.avgMetrics)):
        settings = {param.name: value for param, value in param_map.items()}
        print(f"Param set {i + 1} {settings} - mean CV areaUnderROC: {mean_auc}")

    best_model = cvModel.bestModel

    predictions = best_model.transform(df_assembled)
    predictions.select('fsym_id', 'label', 'prediction').show(100)

    # One aggregation instead of four separate count() jobs.
    confusion = {
        (int(row['label']), int(row['prediction'])): row['count']
        for row in predictions.groupBy('label', 'prediction').count().collect()
    }
    tp = confusion.get((1, 1), 0)
    tn = confusion.get((0, 0), 0)
    fp = confusion.get((0, 1), 0)
    fn = confusion.get((1, 0), 0)

    print(f"True Positives: {tp}")
    print(f"True Negatives: {tn}")
    print(f"False Positives: {fp}")
    print(f"False Negatives: {fn}")

    # Guard the ratios: a model that never predicts the positive class (or a
    # dataset without positives) would otherwise raise ZeroDivisionError.
    precision = _safe_ratio(tp, tp + fp)
    recall = _safe_ratio(tp, tp + fn)
    f1_score = _safe_ratio(2 * (precision * recall), precision + recall)

    print(f"Precision: {precision}")
    print(f"Recall: {recall}")
    print(f"F1 Score: {f1_score}")

#     # Filter and display predictions for label 1
#     predictions.filter(predictions['label'] == 1).select("fsym_id", "label", "prediction").show()
# from sklearn.model_selection import train_test_split, GridSearchCV
# from sklearn.linear_model import LogisticRegression
# from sklearn.metrics import roc_auc_score

def test_pandas(df1):
    """Fit a scikit-learn logistic regression on the flattened sequences.

    Parameters
    ----------
    df1 : pyspark.sql.DataFrame
        Frame with ``fsym_id``, ``label`` and array-valued feature columns
        (the shape returned by ``get_features_all_stocks``). It is converted
        with ``toPandas()``, so it must fit in driver memory.

    Returns
    -------
    None
        Prints the number of usable rows, the hold-out AUC and the best
        hyper-parameters found by the grid search.
    """
    # Local imports: the notebook-level `LogisticRegression` is the PySpark
    # estimator, and GridSearchCV / roc_auc_score are not imported at all.
    import numpy as np
    from sklearn.linear_model import LogisticRegression as SkLogisticRegression
    from sklearn.metrics import roc_auc_score
    from sklearn.model_selection import GridSearchCV

    df1 = df1.toPandas()
    exclude_columns = ['fsym_id', 'label']
    feature_frame = df1.loc[:, ~df1.columns.isin(exclude_columns)]
    # Keep only the stocks whose every feature sequence has exactly 23 entries.
    full_length = feature_frame.apply(lambda row: all(len(cell) == 23 for cell in row), axis=1)
    df = df1[full_length]
    print(len(df))

    # scikit-learn needs a 2-D numeric matrix, not cells holding lists:
    # concatenate each stock's sequences into one flat feature vector.
    X = np.array([
        np.concatenate([np.asarray(cell, dtype=float) for cell in row])
        for row in feature_frame[full_length].itertuples(index=False)
    ])
    y = df['label']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    # liblinear supports both the l1 and l2 penalties in the grid below; the
    # default solver rejects l1.
    lr = SkLogisticRegression(solver='liblinear')

    param_grid = {
        'penalty': ['l1', 'l2'],
        'C': [0.01, 0.1, 1.0, 10.0],
        'max_iter': [100, 200, 300]
    }

    grid_search = GridSearchCV(lr, param_grid, cv=5, scoring='roc_auc')

    grid_search.fit(X_train, y_train)

    best_model = grid_search.best_estimator_

    # AUC is a ranking metric: score it on the positive-class probability
    # rather than on the thresholded 0/1 predictions.
    positive_scores = best_model.predict_proba(X_test)[:, 1]

    auc = roc_auc_score(y_test, positive_scores)

    print(f"Area under the ROC curve (AUC): {auc}")
    print("Best model hyperparameters:")
    print(grid_search.best_params_)
    
    #extra_df= df[df.apply(lambda row: any(len(cell) != 23 for cell in row), axis=1)]
    
    
    
    
# Driver for this cell. The tabular CSV build is disabled; uncomment the next
# line to regenerate Advanced_AF_DER_Dataset.csv from FF_ADVANCED_DER_AF.
#create_dataset('FF_ADVANCED_DER_AF')
# Build the per-stock feature sequences with labels, then cross-validate the
# Spark logistic regression on them and print its metrics.
df = get_features_all_stocks()
train_test(df)
# Alternative scikit-learn experiment on the same frame (disabled).
#test_pandas(df)

+--------+--------------------+----------------------+--------------------+----------------------+--------------------+-----------------------+--------------------------+--------------------+--------------------+--------------------------+--------------------+-------------------------+-----------------------+--------------------+-----+
| fsym_id|     ff_non_oper_exp|ff_net_inc_bef_xord_gr|      ff_oper_inc_gr|ff_ut_non_oper_inc_oth|         ff_earn_yld|ff_net_inc_dil_aft_xord|ff_net_inc_basic_beft_xord|        ff_assets_gr|          ff_fcf_yld|ff_net_inc_dil_bef_unusual|            ff_wkcap|ff_net_inc_basic_aft_xord|ff_oper_inc_aft_unusual|      ff_net_inc_dil|label|
+--------+--------------------+----------------------+--------------------+----------------------+--------------------+-----------------------+--------------------------+--------------------+--------------------+--------------------------+--------------------+-------------------------+-----------------------+--------------

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import SimpleRNN, Dense

# `input_features` and `output_features` were never defined, so this cell
# raised NameError. The sequences built by get_features_all_stocks have one
# channel per feature name in features.csv and 23 time steps each.
input_features = len(get_feature_col_names())
# NOTE(review): assumes a single output unit for the binary `label`; confirm
# the intended target shape (and output activation) before training.
output_features = 1

model = Sequential()
model.add(SimpleRNN(units=32, input_shape=(23, input_features)))
model.add(Dense(units=output_features))



In [None]:
# Earliest price date on record in fp_basic_prices for the ticker AACQU-US.
spark.sql(
    "SELECT MIN(a.p_date) FROM fp_basic_prices a "
    "LEFT JOIN sym_ticker_region s ON s.fsym_id = a.fsym_id "
    "WHERE s.ticker_region = 'AACQU-US' "
).show()