In [8]:
import sys
import pandas as pd
from datetime import datetime
from tqdm.notebook import tqdm
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

In [9]:
# Start the Spark session used by every cell below (getOrCreate reuses an active one)
spark = (
    SparkSession.builder
    .appName("OHLCV Analysis")
    .getOrCreate()
)
# Eager evaluation: a DataFrame left as a cell's last expression is rendered as rows
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)


In [10]:
# Location of bucket that contains market data parquet files
data_path = "gs://rzbk-stockdata/"

# Read prices parquet file into a DataFrame (OHLCV market data)
parquet_file_path = f"{data_path}sharadar.sep.parquet"
df = spark.read.parquet(parquet_file_path)

# Read ticker metadata file into a DataFrame (company information).
# 'ticker' is renamed to 'ticker2' so it does not clash with the price table's
# 'ticker' column in the join; every column not listed below is discarded.
parquet_file_path = f"{data_path}sharadar.tickers.parquet"
metadata_keep = ['exchange', 'currency', 'category', 'sector', 'industry', 'ticker2']
df_t = spark.read.parquet(parquet_file_path).withColumnRenamed('ticker', 'ticker2')
# Select in the file's own column order (same result as dropping the others one by one).
df_t = df_t.select([c for c in df_t.columns if c in metadata_keep])
# Exact-duplicate metadata rows would duplicate price rows in the inner join.
# NOTE(review): non-identical rows sharing a ticker would still duplicate price
# rows — confirm ticker2 is unique after this step.
df_t = df_t.dropDuplicates()
    

In [11]:
# Attach company metadata to each price row. The inner join discards price rows
# whose ticker has no metadata; the duplicate key column 'ticker2' is then removed.
# NOTE(review): if df_t holds several rows per ticker, price rows are multiplied
# here, which would corrupt the per-ticker window calculations — confirm uniqueness.
df = (
    df.alias("df")
    .join(
        df_t.alias("df2"),
        on=F.col("df.ticker") == F.col("df2.ticker2"),
        how="inner",
    )
    .drop("ticker2")
)


In [12]:
# Per-ticker window in date order; used for lags and trailing aggregates
win = Window.partitionBy("ticker").orderBy("date")
# Forward-looking frame on the same ordering: rows 1..25 after the current row
win2 = win.rowsBetween(1, 25)


In [13]:
# Define the RSI calculation function for adding RSI cols
def add_rsi(df, price_column, win, period=14):
    """Add a relative strength index column, scaled to [0, 1], to ``df``.

    Adds ``rsi{period}_rel`` = RSI / 100, where RSI is computed from simple
    (arithmetic) averages of row-over-row gains and losses of ``price_column``.

    Args:
        df: Spark DataFrame containing ``price_column``.
        price_column: name of the price column to difference (e.g. "close").
        win: ordered window spec used for the lag and, with a row frame
            applied, for the trailing averages.
        period: number of trailing rows whose price changes are averaged.

    Returns:
        ``df`` with ``rsi{period}_rel`` added. The value is NULL when the
        averaging frame contains no price change (e.g. the first rows of each
        partition). It is 1.0 when the averaged losses are exactly zero.

    NOTE(review): the averaging frame is rows ``-period .. -1``, which excludes
    the current row's own price change. The value on a row is therefore the RSI
    as of the previous row. Confirm this one-row lag is intended.
    """
    # Row-over-row change. It is NULL on the first row of each partition because
    # there is no previous price.
    price_diff = F.col(price_column) - F.lag(F.col(price_column), 1).over(win)

    # Split into gains and losses. A missing change stays NULL, so F.avg ignores
    # it, instead of being counted as a genuine 0 change.
    gains = F.when(price_diff > 0, price_diff).when(price_diff.isNotNull(), 0)
    losses = F.when(price_diff < 0, -price_diff).when(price_diff.isNotNull(), 0)

    # Average gains and losses over the trailing frame
    win_mod = win.rowsBetween(-period, -1)
    avg_gains = F.avg(gains).over(win_mod)
    avg_losses = F.avg(losses).over(win_mod)

    # Relative strength (RS)
    rs = avg_gains / avg_losses

    # No losses in the frame pins RSI at 100. A NULL avg_losses (empty frame)
    # falls through to the formula, which propagates NULL instead of reporting 100.
    rsi = F.when(avg_losses == 0, 100).otherwise(100 - (100 / (1 + rs)))

    return df.withColumn(f"rsi{period}_rel", rsi / 100)


# Calculate SMA for price or volume average cols
def add_sma(df, column, win, num_days, add_indicator=False):
    """Add a simple-moving-average feature for ``column``.

    Always adds ``sma_{column}_{num_days}_rel`` = (value - SMA) / SMA, the
    relative distance of the current value from its trailing average. With
    ``add_indicator=True``, the raw average is also added first as
    ``sma_{column}_{num_days}``.

    The average covers ``num_days`` rows: the current row plus the
    ``num_days - 1`` preceding rows in ``win``'s ordering. Rows with less
    history get the average of whatever rows are available.

    Raises:
        ValueError: if ``num_days`` is less than 1.
    """
    if num_days < 1:
        raise ValueError(f"num_days must be >= 1, got {num_days}")
    # rowsBetween bounds are inclusive, so -(num_days - 1)..0 spans exactly num_days rows.
    window_spec = win.rowsBetween(-(num_days - 1), 0)
    sma = F.avg(F.col(column)).over(window_spec)
    if add_indicator:
        df = df.withColumn(f'sma_{column}_{num_days}', sma)
    return df.withColumn(f'sma_{column}_{num_days}_rel', (F.col(column) - sma) / sma)


# Daily price range relative to the low: (high - low) / low
def add_price_range(df, win):
    """Add ``price_range`` = (high - low) / low as an intraday volatility proxy.

    ``win`` is not used. It is accepted only so this helper has the same call
    shape as the other indicator helpers.
    """
    low = F.col('low')
    spread = F.col('high') - low
    return df.withColumn('price_range', spread / low)


# Add indicators to sep data. Calls run in a fixed order, so the column order
# of the output schema is stable.
for rsi_period in (14, 28):
    df = add_rsi(df, "close", win, rsi_period)

# (column, window length, also keep the raw SMA column?)
sma_specs = [
    ("close", 5, False),
    ("close", 10, False),
    ("close", 50, True),
    ("close", 80, False),
    ("close", 200, True),
    ("volume", 10, False),
    ("volume", 50, False),
    ("volume", 100, False),
]
for sma_column, sma_days, keep_raw in sma_specs:
    df = add_sma(df, sma_column, win, sma_days, add_indicator=keep_raw)

# price_range has to exist before its own moving averages can be taken
df = add_price_range(df, win)
for sma_days in (5, 30):
    df = add_sma(df, "price_range", win, sma_days, add_indicator=True)
             

In [14]:
# Day-over-day percent change of the 50- and 200-day closing-price SMAs
for sma_days in (50, 200):
    sma_name = f"sma_close_{sma_days}"
    previous_sma = F.lag(sma_name).over(win)
    df = df.withColumn(f"{sma_name}_trend", (F.col(sma_name) - previous_sma) / previous_sma * 100)


In [15]:
# Entry signal: the day's low dips to DIP_THRESHOLD x the 50-day SMA of the close or below.
# NOTE(review): sma_close_50 includes the current row's close, which is not yet known
# when the intraday low prints. This is possible look-ahead; confirm it is acceptable.
DIP_THRESHOLD = 0.85    # low must be <= 85% of the 50-day SMA
TARGET_MULTIPLE = 1.15  # profit target, relative to the signal-day close
STOP_MULTIPLE = 0.85    # stop level, relative to the signal-day close
df = df.withColumn("Buy", (F.col("low") <= DIP_THRESHOLD * F.col("sma_close_50")).cast("int"))

# Outcome label over the forward frame win2 (rows 1..25 after the signal row):
# 1 when the high reaches the target AND the low never breaches the stop.
# NOTE(review): the stop was previously 0.15 (tolerating an 85% drawdown, so it was
# effectively never hit). 0.85 gives the symmetric +/-15% band; confirm intent.
# Rows near the end of a ticker's history see a partial (or empty -> NULL) frame.
df = df.withColumn("Result", (
        (F.col("Buy") == 1)
        & (F.max('high').over(win2) >= F.col("close") * TARGET_MULTIPLE)
        & (F.min('low').over(win2) >= F.col("close") * STOP_MULTIPLE)
    ).cast("int")
)


In [19]:
# Keep only rows that fired the entry signal (the large majority of rows do not)
df = df.where(F.col("Buy") == 1)


In [None]:
# Persist the signal rows as a parquet checkpoint, replacing any previous output
df.write.mode("overwrite").parquet(f"{data_path}sma_dip_strat.parquet")