<a href="https://colab.research.google.com/github/QuinnG17/Financial-Programs/blob/main/LINEAR_REGRESSION_MODEL_FITTING.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

import pandas as pd

def _read_table_with_dates(table_name):
    """Read a Spark table and return ``(spark_frame, pandas_frame)``.

    The pandas copy has its 'Date' column parsed with ``pd.to_datetime``.
    """
    spark_frame = spark.read.table(table_name)
    pandas_frame = spark_frame.toPandas()
    pandas_frame['Date'] = pd.to_datetime(pandas_frame['Date'])
    return spark_frame, pandas_frame


# Long signal return information
long_df, long_data = _read_table_with_dates("alpha_two.all_date_long_metrics_v2")

# Short signal return information
short_df, short_data = _read_table_with_dates("alpha_two.all_date_short_metrics_v2")

# Signals
table_df, signals = _read_table_with_dates("kash.signal_shifted")

# Stack the long and short metrics, then attach the signal for each date.
all_data = pd.concat([long_data, short_data])
data = all_data.merge(signals, on='Date', how='inner')

# A Signal value of 2 is recoded as -1.
data['Signal'] = data['Signal'].replace(2, -1)

# Order rows by date so that "next row" within a symbol means "next date".
data = data.sort_values(by='Date')

# Target = the symbol's next Net_Profit minus its current Net_Profit.
# NOTE(review): rows are grouped by 'Symbol' only. If the long and short
# tables can both contain the same (Date, Symbol), the "next" row may come
# from the other table -- confirm against the table schemas.
next_net_profit = data.groupby('Symbol')['Net_Profit'].shift(-1)
data = data.assign(
    NextValue=next_net_profit,
    Target=next_net_profit - data['Net_Profit'],
)

print(long_data.columns)
print(data.columns)

# Bring the computed columns back onto the separate long and short frames.
# NOTE(review): `data` holds rows from both tables; if a (Date, Symbol) pair
# occurs in both, this inner merge yields more than one row per pair --
# confirm the keys are unique.
target_columns = data[['Symbol', 'Date', 'NextValue', 'Signal', 'Target']]
merge_keys = ['Date', 'Symbol']
long_data = long_data.merge(target_columns, on=merge_keys, how='inner')
short_data = short_data.merge(target_columns, on=merge_keys, how='inner')

# Number of distinct dates that survived the merge on each side.
print(long_data['Date'].nunique(dropna=False))
print(short_data['Date'].nunique(dropna=False))







In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import col, current_date, date_sub, lit, when
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import desc
from pyspark.sql.functions import col, date_sub, expr
from pyspark.sql.types import DateType
from datetime import timedelta
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Reuse the active SparkSession, or start one if none exists.
spark = SparkSession.builder.getOrCreate()

# Move the long-side pandas frame into Spark. 'Date' is cast to DateType and
# the frame is ordered by date.
data = (
    spark.createDataFrame(long_data)
    .withColumn('Date', col('Date').cast(DateType()))
    .orderBy('Date')
)

# Every column that is not an identifier or a label is used as a model
# feature. ('Target' used to be listed twice here; once is enough.)
columns_to_drop = ['Date', 'Symbol', 'Target', 'NextValue']
columns = data.drop(*columns_to_drop)
feature_columns = columns.columns

# Sorted list of the distinct dates present in the data.
distinct_dates = data.select("Date").distinct().orderBy("Date")
distinct_dates_list = [row.Date for row in distinct_dates.collect()]

# Result layout: five metadata columns followed by one coefficient column per
# feature.
extra_columns = ['Symbol', 'Date', 'Model', 'Direction', 'Accuracy']
all_columns = extra_columns + feature_columns

# Empty results frame with the final column layout.
weights_df = pd.DataFrame(columns=all_columns)

# Rolling per-symbol linear regression over a trailing 60-day window.
#
# For each date with more than 60 days of history before it, one
# LinearRegression is fitted per symbol on that symbol's rows inside the
# window. Directional accuracy is scored on a 30% hold-out split, and one
# result row (metadata + coefficients) per symbol is appended to
# alpha_two.Linear_Regression_Results.

# These do not depend on the date, so they are built once rather than per date.
model = LinearRegression(featuresCol='features', labelCol='Target')
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
distinct_symbols = data.select('Symbol').distinct().rdd.flatMap(lambda x: x).collect()

# Destination table for the results.
table_name = "Linear_Regression_Results"
database_name = "alpha_two"
results_table = f"{database_name}.{table_name}"

# One pandas frame per processed date; concatenated once after the loop
# (DataFrame.append was removed in pandas 2.0 and was quadratic anyway).
per_date_frames = []

for date in distinct_dates_list:
    print(date)

    start_date = date - timedelta(days=60)

    # Skip dates whose 60-day look-back would reach the first available date.
    if start_date <= distinct_dates_list[0]:
        continue

    date_rows = []

    for symbol in distinct_symbols:

        # Keep only this symbol's rows inside [start_date, date]. The upper
        # bound matters: without it the "last 60 days" window would also
        # contain every later row, i.e. the model would see future data.
        symbol_data = data.filter(
            (col('Date') >= start_date)
            & (col('Date') <= date)
            & (col('Symbol') == symbol)
        ).na.drop()

        if symbol_data.isEmpty():
            continue

        # Split the data into training and test sets (70% training, 30% test)
        trainData, testData = symbol_data.randomSplit([0.7, 0.3], seed=123)
        trainData = assembler.transform(trainData)
        testData = assembler.transform(testData)

        # Fit the model for this symbol and read back its feature weights.
        trained_model = model.fit(trainData)
        weights = trained_model.coefficients

        # Score the hold-out rows: a prediction counts as a hit when it has
        # the same sign as the target (zero on either side counts as a miss).
        predictions = trained_model.transform(testData)
        predictions = predictions.withColumn(
            'accuracy',
            when((col('Target') > 0) & (col('prediction') > 0), 1)
            .when((col('Target') < 0) & (col('prediction') < 0), 1)
            .otherwise(0),
        )
        accuracy_percentage = predictions.selectExpr(
            'avg(accuracy) * 100 as accuracy_percentage'
        ).first()['accuracy_percentage']

        # Direction is fixed to "Long" because `data` was built from long_data.
        date_rows.append(
            [symbol, date, model.__class__.__name__, "Long", accuracy_percentage]
            + weights.toArray().tolist()
        )

    # Nothing was fitted for this date; an empty frame cannot be converted to
    # a Spark DataFrame, so there is nothing to write.
    if not date_rows:
        continue

    date_df = pd.DataFrame(date_rows, columns=all_columns)
    # avg() over an empty test split is None; keep the column numeric.
    date_df['Accuracy'] = date_df['Accuracy'].astype(float)
    per_date_frames.append(date_df)

    # Append only this date's rows. Append mode creates the table when it does
    # not exist yet, and re-writing earlier rows (or unioning the existing
    # table back in) would duplicate them on every iteration.
    spark_df = spark.createDataFrame(date_df)
    spark_df.write.mode("append").saveAsTable(results_table)

if per_date_frames:
    weights_df = pd.concat(per_date_frames, ignore_index=True)

print(weights_df)
