In [1]:
%python
import pandas as pd
pd.core.common.is_list_like = pd.api.types.is_list_like #datareader problem probably fixed in next version of datareader
from pandas_datareader import data as pdr
import datetime
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as sf
from pyspark.sql.window import Window
import numpy as np
import fix_yahoo_finance as yf
from math import log

In [2]:
%python
# Route pandas_datareader's Yahoo downloads through fix_yahoo_finance.
yf.pdr_override()

# Date range requested from Yahoo.
start_date = datetime.date(1980, 12, 12)
end_date = datetime.date(2018, 1, 1)

stock_list = ["AAPL"]

# Every ticker followed by a dot, concatenated (e.g. "AAPL.").
stock_str = "".join(ticker + "." for ticker in stock_list)

main_df = pd.DataFrame()

for ticker in stock_list:
    df = pdr.get_data_yahoo(ticker, start=start_date, end=end_date)
    # Remove the space from the column name so it is easier to reference later.
    df = df.rename(columns={'Adj Close': 'Adj_close'})
    if main_df.empty:
        main_df = df
    else:
        # Each download has the same column names; DataFrame.join raises on
        # overlapping names unless a suffix is given, so columns of every
        # ticker after the first are tagged with "_<ticker>".
        main_df = main_df.join(df, rsuffix="_" + ticker)

# Turn the date index into an ordinary column.
main_df = main_df.reset_index(level=0)

# Keep `df` bound to the combined frame (date column included), since code
# further down the notebook may read either name.
df = main_df

In [3]:
# Local-mode Spark configuration for this notebook.
conf = pyspark.SparkConf().setAppName('appName').setMaster('local')

# Build the session from `conf` instead of relying on a pre-existing `sc`
# variable, which is only defined when the hosting environment injects one.
# getOrCreate() returns the already-running session if there is one.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Expose the underlying SparkContext under its conventional name.
sc = spark.sparkContext

In [4]:
# Convert the pandas price table to a Spark DataFrame. Reading `main_df`
# (rather than `df`, which this cell rebinds) keeps the cell re-runnable:
# createDataFrame rejects an input that is already a Spark DataFrame.
df = spark.createDataFrame(main_df)

# Tag every row with the ticker the unsuffixed price columns belong to.
df = df.withColumn("ticker", sf.lit(stock_list[0]))
df.show(10)

In [5]:
# Per-ticker window in date order. Every lag below counts rows inside one
# ticker, i.e. trading days rather than calendar days.
w = Window.partitionBy("ticker").orderBy("Date")
df = df.withColumn("Log_Adj_Close", sf.log("Adj_close"))

# Returns
# An n-row log return is ln(P_t) - ln(P_{t-n}). It must not be divided by a
# log price: ln(P) is 0 when P == 1 and negative when P < 1, so such a ratio
# is undefined or sign-flipped for part of the series.
return_lags = [
    ("log_return", 1),
    ("weekly_log_return", 5),
    ("biweekly_log_return", 10),
    ("monthly_log_return", 21),
    ("bimonthly_log_return", 42),
    ("annual_log_return", 252),
]
for return_col, lag_rows in return_lags:
    df = df.withColumn(
        return_col,
        sf.col("Log_Adj_Close") - sf.lag("Log_Adj_Close", lag_rows).over(w),
    )

# Volume
# Change in traded volume versus n rows earlier (current minus earlier, with
# the same sign convention for every horizon).
volume_lags = [
    ("daily_volume_diff", 1),
    ("weekly_volume_diff", 5),
    ("biweekly_volume_diff", 10),
    ("monthly_volume_diff", 21),
    ("bimonthly_volume_diff", 42),
    ("annual_volume_diff", 252),
]
for diff_col, lag_rows in volume_lags:
    df = df.withColumn(
        diff_col,
        sf.col("Volume") - sf.lag("Volume", lag_rows).over(w),
    )

# Everything else
# Rolling mean / standard deviation of the one-row log return and of volume.
# Each frame covers the current row plus the `rows_back` rows before it, and
# is partitioned by ticker so Spark neither mixes tickers nor pulls the whole
# table into a single partition.
rolling_spans = [
    ("weekly", 5),
    ("biweekly", 10),
    ("monthly", 21),
    ("bimonthly", 42),
    ("semiannual", 126),
    ("annual", 252),
]
for prefix, rows_back in rolling_spans:
    w = Window.partitionBy("ticker").orderBy("Date").rowsBetween(-rows_back, 0)
    df = df.withColumn(prefix + "_mean", sf.avg("log_return").over(w))
    df = df.withColumn(prefix + "_std", sf.stddev("log_return").over(w))
    df = df.withColumn(prefix + "_volume_mean", sf.avg("Volume").over(w))
    df = df.withColumn(prefix + "_volume_std", sf.stddev("Volume").over(w))

In [6]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Calendar features derived from the date: output column -> date_format pattern.
# NOTE(review): the week-based patterns ('u', 'W', 'w') are accepted by the
# Spark 2.x formatter; newer Spark releases may reject them - confirm against
# the Spark version this notebook runs on.
date_patterns = [
    ('dow', 'u'),  # day of week
    ('day', 'd'),  # day of month
    ('doy', 'D'),  # day of year
    ('wom', 'W'),  # week of month
    ('moy', 'L'),  # month of year
    ('woy', 'w'),  # week of year
]

# date_format lives in pyspark.sql.functions, imported in this notebook as `sf`.
# distinct() keeps one row per date so the join back to `df` cannot multiply rows.
df2 = df.select(
    'Date',
    *[sf.date_format('Date', pattern).alias(name) for name, pattern in date_patterns]
).distinct()
df3 = df2.orderBy("Date")

# Encode every calendar column (everything except the join key) in a fixed,
# reproducible order: first string -> index, then index -> one-hot vector.
calendar_cols = [column for column in df3.columns if column != "Date"]
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index")
    for column in calendar_cols
]
one_hot = [
    OneHotEncoder(inputCol=column + "_index", outputCol=column + "_one_hot")
    for column in calendar_cols
]

pipeline = Pipeline(stages=indexers + one_hot)
df_r = pipeline.fit(df3).transform(df3)

# Join on the column name: the result carries a single "Date" column, and the
# condition is unambiguous even though df_r is derived from df itself.
df_final = df.join(df_r, on="Date")
df_final.show()