In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Start (or reuse) the Spark session used by the rest of the notebook.
spark = SparkSession.builder.appName('financial_analysis').getOrCreate()

# Load the CSV with a header row and inferred column types, then bucket bars
# into groups of ten: bar_num 1-10 -> bar_range 0, 11-20 -> 1, and so on
# (bar_num is 1-based, hence the "- 1").
df = (
    spark.read.csv('full_data.csv', header=True, inferSchema=True)
    .withColumn('bar_range', F.floor((F.col('bar_num') - 1) / 10).cast('int'))
)

In [None]:
# Preview the first five rows of the loaded data.
df.show(n=5)

In [None]:
from pyspark.sql import Window
from pyspark.sql.functions import avg, lag, col
# Within each trade, rows are visited in increasing bar_range order.
window = Window.partitionBy('trade_id').orderBy('bar_range')

# One row per (trade_id, bar_range):
#   avg_profit            - mean profit of the bars in that bar_range
#   cumulative_avg_profit - running average of avg_profit over the window
#   profit_lag            - previous row's cumulative_avg_profit within the trade
df_range_avg = (
    df.groupBy("trade_id", "bar_range")
    .agg(avg("profit").alias("avg_profit"))
    .withColumn('cumulative_avg_profit', avg('avg_profit').over(window))
    .withColumn('profit_lag', lag('cumulative_avg_profit').over(window))
)

df_range_avg.orderBy('trade_id').show()

In [None]:
# Attach the per-(trade_id, bar_range) aggregates to every bar:
#   avg_profit = average profit of the bar's own bar_range
#   profit_lag = average of avg_profit over the bar_ranges before the current one
# Remaining nulls are replaced with 0 by fillna(0).
join_keys = ['trade_id', 'bar_range']
df_new = df.join(df_range_avg, on=join_keys, how='left').fillna(0)
df_new.orderBy(join_keys).show(n=20)
# Narrower view for spot checks:
# df_new.select('trade_id', 'bar_range', 'profit_lag', 'avg_profit', 'profit').orderBy(['trade_id', 'bar_range']).show(200)

In [None]:
import pyspark.sql.functions as F
# Smallest and largest calendar month number found in time_stamp, computed in
# a single aggregation job. Previously min_month was computed with max(), so
# it always equalled max_month.
# NOTE(review): F.month drops the year, so on data spanning several years
# these are month-of-year bounds, not the first/last month of the data.
min_month, max_month = df.agg(
    F.min(F.month('time_stamp')),
    F.max(F.month('time_stamp')),
).first()

# Bucket bars into groups of ten (bar_num is 1-based, hence the "- 1").
# Recomputing this is idempotent: an existing bar_range column is simply
# overwritten with the same values.
df = df.withColumn('bar_range', F.floor((F.col('bar_num') - 1) / 10).cast('int'))

In [None]:
import datetime

# Earliest time_stamp in the data marks the start of the first training window.
# NOTE(review): assumes time_stamp was inferred as a timestamp column so that
# a Python datetime comes back -- confirm against the loaded schema.
start_date = df.agg(F.min('time_stamp')).first()[0]

# The window ends 6 * 365 / 12 = 182.5 days (about six months) later.
end_date = start_date + datetime.timedelta(days=6 * 365 / 12)

In [None]:
# Preview the first five rows of the joined frame.
df_new.show(n=5)

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import datetime

# Walk-forward evaluation of a linear regression on `profit`: fit on a
# ~6-month window, score on the following ~1-month window, then advance.

TRAIN_WINDOW = datetime.timedelta(days=6 * 365 / 12)  # 182.5 days, about six months
TEST_WINDOW = datetime.timedelta(days=365 / 12)       # about one month
# The loop stops once a training window would end after this instant.
# NOTE(review): presumably the last time_stamp in the data -- confirm, or
# derive it from the data with F.max('time_stamp') instead of hard-coding it.
LAST_TRAIN_END = datetime.datetime(2015, 8, 3, 11, 15)


def _mean_abs_pct_error(predictions, label_col='profit', prediction_col='prediction'):
    """Return the mean absolute percentage error of `predictions`.

    Rows whose label is exactly 0 are excluded before dividing. With Spark's
    ANSI mode off this matches dividing first (x / 0 yields NULL, which the
    mean ignores); with ANSI mode on it avoids a division-by-zero error.

    Returns None when no row with a non-zero label remains.
    """
    label = F.col(label_col)
    scorable = predictions.filter(label != 0)
    abs_pct_error = F.abs((label - F.col(prediction_col)) / label)
    return scorable.agg(F.mean(abs_pct_error)).first()[0]


# One MAPE value per evaluated test window, in chronological order.
mape_list = []

# Model inputs; profit_lag comes from the per-trade aggregates joined into df_new.
feature_columns = ['bar_range', 'direction', 'var12', 'var13', 'var14', 'var15', 'var16',
                   'var17', 'var18', 'var23', 'var24', 'var25', 'var26', 'var27', 'var28', 'var34', 'var35', 'var36',
                   'var37', 'var38', 'var45', 'var46', 'var47', 'var48', 'var56', 'var57', 'var58', 'var67',
                   'var68', 'var78', 'profit_lag']

# Packs the feature columns into the single vector column the regressor reads.
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# Start from the joined frame, sort by time, tag each row with its year-month
# and assemble the feature vector. Note that this rebinds `df`.
df = assembler.transform(
    df_new
    .orderBy('time_stamp')
    .withColumn('month_year', F.date_format('time_stamp', 'yyyy-MM'))
)

# The frame is filtered twice per window below, so keep it cached.
df.cache()

# First training window: starts at the earliest time_stamp in the data.
start_date = df.agg(F.min('time_stamp')).first()[0]
end_date = start_date + TRAIN_WINDOW

# RMSE evaluator; the loop below reports MAPE and does not use it.
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="profit", metricName="rmse")

# The estimator configuration is the same for every window, so build it once.
lr = LinearRegression(featuresCol='features', labelCol='profit')

while end_date <= LAST_TRAIN_END:
    # Training rows: start_date <= time_stamp < end_date.
    train_df = df.filter((df['time_stamp'] >= start_date) & (df['time_stamp'] < end_date))
    model = lr.fit(train_df)

    # Test rows: the window immediately after the training period.
    test_date = end_date + TEST_WINDOW
    test_df = df.filter((df['time_stamp'] >= end_date) & (df['time_stamp'] < test_date))
    predictions = model.transform(test_df)

    # Score the window; mape is None if it has no row with non-zero profit.
    mape = _mean_abs_pct_error(predictions)
    mape_list.append(mape)
    print(f"Current training period is {start_date.strftime('%Y-%m')} and {end_date.strftime('%Y-%m')}")
    print(f"MAPE for prediction period {end_date.strftime('%Y-%m')} to {test_date.strftime('%Y-%m')}: {mape}")

    # Advance: the next training window begins where this test window ended.
    # NOTE(review): this moves the window forward by train + test (~7 months)
    # per iteration rather than sliding by one month -- confirm this is intended.
    start_date = test_date
    end_date = start_date + TRAIN_WINDOW

In [None]:
# Scratch check of the built-in max() on a small list.
x = list(range(1, 4))
max(x)

In [None]:
import numpy as np
# Arithmetic mean of the scratch list x.
np.asarray(x).mean()