In [None]:
pip install pyspark

In [None]:
import pyspark
import datetime

In [None]:
from pyspark import SparkContext, SparkConf, sql
from pyspark.sql import Row

In [None]:
sc = SparkContext.getOrCreate()
sqlContext = sql.SQLContext(sc)

In [14]:
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType
from pyspark.sql.window import Window
from pyspark.sql.functions import col,avg,sum,min,max,row_number,cume_dist,first,lag,lit,when
from pyspark.sql.functions import column
from re import split

tapeDf = sqlContext.read.format("csv")\
.option("inferSchema", True) \
.option("header", False) \
.load("/content/sample_data/HSBC3/*")

tapeDf1 = tapeDf.select("_c0", "_c2", "_c3", "_c8")\
.withColumnRenamed("_c0", "Date")\
.withColumnRenamed("_c2", "TimeInSec")\
.withColumnRenamed("_c3", "Price")\
.withColumnRenamed("_c8", "Quantity")

# tapeDf1.show()

tapeDf2 = tapeDf1.rdd.map(lambda x : (
    float(x.TimeInSec), int(x.Date.split("-")[1]), int(x.Date.split("-")[2]), float(x.Price), int(x.Quantity.split(":")[1].strip()),\
    int(datetime.datetime(int(x.Date.split("-")[0][3:]), int(x.Date.split("-")[1]), int(x.Date.split("-")[2])).strftime('%w'))))

schema = StructType([
                     StructField("TimeInSec", FloatType(), True),
                     StructField("Month", IntegerType(), True),
                     StructField("Day", IntegerType(), True),
                     StructField("Price", FloatType(), True),
                     StructField("Quantity", IntegerType(), True),
                     StructField("DayOfWeek", IntegerType(), True)
])

tapeDf3 = sqlContext.createDataFrame(tapeDf2, schema)
# tapeDf3.show()

In [15]:
windowSpec = Window.partitionBy("Month", "Day").orderBy(col("TimeInSec").desc())
windowSpecAggregator = Window.partitionBy("Month", "Day")

groupedDf = tapeDf3.withColumn("HighestPrice", max(col("Price")).over(windowSpecAggregator))\
.withColumn("LowestPrice", min(col("Price")).over(windowSpecAggregator))\
.withColumn("VolumeTraded", sum(col("Quantity")).over(windowSpecAggregator))\
.withColumn("RowNumber", row_number().over(windowSpec))\
.withColumnRenamed("Price", "ClosingPrice")\
.where(col("RowNumber")==1)\
.select("Month", "Day", "ClosingPrice", "HighestPrice", "LowestPrice", "VolumeTraded", "DayOfWeek")

# groupedDf.show()

windowSpec1 = Window.partitionBy("Month").orderBy("Day")
windowSpec10 = Window.partitionBy("Month").orderBy("Day").rangeBetween(-9, Window.currentRow)
windowSpec16 = Window.partitionBy("Month").orderBy("Day").rangeBetween(-15, Window.currentRow)
windowSpec22 = Window.partitionBy("Month").orderBy("Day").rangeBetween(-21, Window.currentRow)

featuresDf = groupedDf.withColumn("SMA10Days", sum(col("ClosingPrice")/10).over(windowSpec10))\
.withColumn("SMA16Days", sum(col("ClosingPrice")/16).over(windowSpec16))\
.withColumn("SMA22Days", sum(col("ClosingPrice")/22).over(windowSpec22))\
.withColumn("EMA10Days", (col("SMA10Days")*(2/11))+(lag("SMA10Days").over(windowSpec1)*(1-(2/11))))\
.withColumn("EMA16Days", (col("SMA16Days")*(2/17))+(lag("SMA16Days").over(windowSpec1)*(1-(2/17))))\
.withColumn("EMA22Days", (col("SMA22Days")*(2/23))+(lag("SMA22Days").over(windowSpec1)*(1-(2/23))))\
.withColumn("DaysUntilWeekend", when(col("DayOfWeek")==0, 0)
.when(col("DayOfWeek")==1, 5)
.when(col("DayOfWeek")==2, 4)
.when(col("DayOfWeek")==3, 3)
.when(col("DayOfWeek")==4, 2)
.when(col("DayOfWeek")==5, 1)
.when(col("DayOfWeek")==6, 0))\
.withColumn("DaysUntilMonthEnd", when((col("Month")==1) | (col("Month")==3) | (col("Month")==5) | (col("Month")==7) | (col("Month")==8)\
                                      | (col("Month")==10) | (col("Month")==12), 31-col("Day"))
.when(col("Month")==2, 28-col("Day"))
.otherwise(30-col("Day")))

# featuresDf.show()

In [16]:
finalDf = tapeDf3.alias("a").join(featuresDf.alias("b"), (tapeDf3.Month == featuresDf.Month) & (tapeDf3.Day == featuresDf.Day))\
.select("a.Month", "a.Day", "a.TimeInSec", "a.Price", "a.Quantity", "b.ClosingPrice", "b.HighestPrice", "b.LowestPrice", "b.VolumeTraded", \
        "b.SMA10Days", "b.SMA16Days", "b.SMA22Days", "b.EMA10Days", "b.EMA16Days", "b.EMA22Days", "b.DaysUntilWeekend", "b.DaysUntilMonthEnd")

finalDf.show()

finalDf.write.option("header",True)\
.partitionBy("Month", "Day")\
.mode("overwrite")\
.csv("/content/sample_data/HSBC3/SampleTapeFeatures3/")

+-----+---+---------+-----+--------+------------+------------+-----------+------------+------------------+---------+------------------+-----------------+------------------+------------------+----------------+-----------------+
|Month|Day|TimeInSec|Price|Quantity|ClosingPrice|HighestPrice|LowestPrice|VolumeTraded|         SMA10Days|SMA16Days|         SMA22Days|        EMA10Days|         EMA16Days|         EMA22Days|DaysUntilWeekend|DaysUntilMonthEnd|
+-----+---+---------+-----+--------+------------+------------+-----------+------------+------------------+---------+------------------+-----------------+------------------+------------------+----------------+-----------------+
|    4|  6|    5.968|125.0|       3|       150.0|       153.0|      118.0|      119064|59.599999999999994|    37.25|27.090909090909093|47.32727272727272|28.977941176470587|20.865612648221344|               3|               24|
|    4|  6|    6.512|126.0|       2|       150.0|       153.0|      118.0|      119064|59.59

# New Section