In [0]:
from pyspark.sql import DataFrame, Window
from pyspark.sql.types import *
from pyspark.sql.functions import col, lit, udf, datediff, lead, explode, min, max, avg
import pyspark.sql.functions as f
from typing import List
import datetime

In [0]:
file_location = "/FileStore/tables/kueski/"
file_type = "csv"

infer_schema = "true"
first_row_is_header = "true"
delimiter = ","
customSchema = StructType([StructField('ticker', StringType(), True),
                     StructField('open', DoubleType(), True),
                     StructField('close', DoubleType(), True),
                     StructField('adj_close', DoubleType(), False),
                     StructField('low', DoubleType(), True),
                     StructField('high', DoubleType(), True),
                     StructField('volume', IntegerType(), True),
                     StructField('date', DateType(), True)])

df = spark.read.format(file_type) \
               .option("header", first_row_is_header) \
               .option("sep", delimiter) \
               .schema(customSchema) \
               .load(file_location)
df1.printSchema()
df.count()

In [0]:
df1=df.groupBy("ticker","open","close","adj_close", "low", "high", "volume", "date").count().filter("count > 1")
df1.show(100)

In [0]:
df = df.drop_duplicates()
df.count()

In [0]:
df = df.na.drop()
df.count()

In [0]:
dfAHHPIH = df.select("ticker", "close", "date").where("ticker =='AHH' OR  ticker == 'PIH'").orderBy("date", ascending=True)
print(dfAHHPIH.count())

In [0]:
wma = Window.partitionBy('ticker') \
                 .orderBy("date") \
                 .rowsBetween(-7, 0)

dfAHH = dfAHH.withColumn('7MA', avg("close").over(wma)) 

In [0]:
display(dfAHH.filter(df['ticker'] =='PIH'))

ticker,close,date,7MA
PIH,7.94999980926514,2014-04-01,7.94999980926514
PIH,8.15999984741211,2014-04-02,8.054999828338625
PIH,8.39000034332275,2014-04-03,8.166666666666666
PIH,8.6899995803833,2014-04-04,8.297499895095825
PIH,8.9399995803833,2014-04-07,8.42599983215332
PIH,8.89999961853027,2014-04-08,8.504999796549479
PIH,8.89000034332275,2014-04-09,8.559999874659947
PIH,8.82999992370605,2014-04-10,8.59374988079071
PIH,8.89000034332275,2014-04-11,8.71124994754791
PIH,9.55000019073486,2014-04-14,8.884999990463253


In [0]:
display(dfAHH.filter(df['ticker'] =='AHH'))

ticker,close,date,7MA
AHH,11.5799999237061,2013-05-08,11.5799999237061
AHH,11.5500001907349,2013-05-09,11.565000057220502
AHH,11.6000003814697,2013-05-10,11.576666831970234
AHH,11.6499996185303,2013-05-13,11.59500002861025
AHH,11.5299997329712,2013-05-14,11.58199996948244
AHH,11.6000003814697,2013-05-15,11.585000038146983
AHH,11.7399997711182,2013-05-16,11.607142857142874
AHH,11.7600002288818,2013-05-17,11.62625002861024
AHH,11.7299995422363,2013-05-20,11.644999980926514
AHH,11.8299999237061,2013-05-21,11.67999994754791
