In [0]:
%pip install -r '/Workspace/Users/chrismarshall.wi@icloud.com/dbxProjects/Alpha Factors/requirements.txt'

In [0]:
import sys 
import pandas as pd
sys.path.append('/Workspace/Users/chrismarshall.wi@icloud.com/dbxProjects/Alpha Factors')
from pyspark.sql import SparkSession

In [0]:
# Obtain the Spark session for this notebook: getOrCreate() returns the existing
# session when there is one and builds a new one otherwise.
spark = SparkSession.builder.getOrCreate()

In [0]:
# Read the backtest query text from disk. The path is relative, so it is resolved
# against the notebook's current working directory.
with open('sql/fact_price_daily_backtest.sql', 'r') as file:
    qry = file.read()

# Execute the query with Spark and collect the complete result into a pandas DataFrame.
data = spark.sql(qry).toPandas()

In [0]:
from utilities import Factor

In [0]:
# NOTE(review): this rebinds `data` from the pandas DataFrame produced by the query
# cell to a Factor instance, so re-running this cell on its own would wrap a Factor
# inside a Factor. Consider a distinct name for the wrapper.
data = Factor(data=data)
# Bare last expression: the notebook displays whatever Factor.data() returns.
data.data()

In [0]:
def backtest_mean_reversion(df: pd.DataFrame, moving_average_window: int, start_date: str, end_date: str) -> pd.DataFrame:
    '''
    Build the per-symbol moving-average features used by the mean reversion backtest.

    The moving average and the previous-row values are computed over the full
    history of each symbol first; the start/end date filter is applied last, so
    rows at the beginning of the requested range still see earlier prices.

    Parameters
    ----------
    df : pd.DataFrame
        Price history whose columns are exactly ['symbol', 'date_value', 'close'],
        in that order. The frame passed in is not modified.
    moving_average_window : int
        Number of rows per symbol in the rolling mean (typically 200). It is also
        used to name the output columns.
    start_date, end_date : str
        Inclusive bounds compared against 'date_value'.

    Returns
    -------
    pd.DataFrame
        Rows within [start_date, end_date], sorted by symbol then date_value, with
        three extra columns: '<window>_moving_average', 'previous_close' and
        'previous_<window>_moving_average'.

    Raises
    ------
    ValueError
        If the columns of ``df`` are not exactly the required ones.
    '''
    # The check compares the full ordered column list, not just membership.
    required_columns = ['symbol', 'date_value', 'close']
    if list(df.columns) != required_columns:
        raise ValueError(f'The columns must be {required_columns}')

    # Copy before touching anything so the caller's DataFrame keeps its original
    # 'date_value' dtype and contents.
    prices = df.copy()
    prices['date_value'] = pd.to_datetime(prices['date_value'])

    # Rolling windows and shifts are positional, so rows must be in date order
    # within each symbol before they are applied.
    prices = prices.sort_values(['symbol', 'date_value'])

    moving_average_column = f'{moving_average_window}_moving_average'
    prices[moving_average_column] = prices.groupby('symbol')['close'].transform(
        lambda close: close.rolling(moving_average_window).mean()
    )

    # Previous row's values within the same symbol (NaN on each symbol's first row).
    prices['previous_close'] = prices.groupby('symbol')['close'].shift(1)
    prices[f'previous_{moving_average_column}'] = prices.groupby('symbol')[moving_average_column].shift(1)

    # Last step: restrict to the requested date range (add other filters here in the future).
    in_range = (prices['date_value'] >= start_date) & (prices['date_value'] <= end_date)
    return prices[in_range]

In [0]:
# NOTE(review): `data` was rebound to a Factor instance in an earlier cell, while
# backtest_mean_reversion reads `df.columns` on its argument — confirm that a Factor
# supports this, or pass the underlying pandas DataFrame instead.
df=backtest_mean_reversion(df=data,moving_average_window=200,start_date='2010-01-01',end_date='2010-12-31')
# Convert the pandas result to a Spark DataFrame and overwrite the Delta table with it.
df_spark=spark.createDataFrame(df)
df_spark.write.format("delta").mode("overwrite").option("overwriteschema", True).saveAsTable("operations.finance.fact_price_daily_moving_average")