# Calculating daily percentage change in stocks using Apache Spark

We **extract** the data by reading the CSV files downloaded from https://www.kaggle.com/jacksoncrow/stock-market-dataset. In this case we only use 10 stocks to keep the script's run time short.

Then we **transform** the data, calculating the daily percentage change of each stock by subtracting its opening price from its closing price and dividing the difference by the opening price, i.e. `(Close - Open) / Open`. The result is stored as a fraction (for example, 0.0126 means a 1.26% gain).
We add this result as a new column (named after the stock) and drop the rest of the columns, keeping only the date.
We then join each new column to the dataframe 'stocks' on the date.

Finally we **load** the dataframe into Parquet files, partitioned by year and month.

In [2]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, TimestampType
from pyspark.sql.functions import *
from glob import glob, os

# Spark setup: reuse the active SparkContext if one exists, then build the
# session and the legacy SQLContext on top of it.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
sqlContext = SQLContext(sc)

# Seed DataFrame containing only the join key, so every per-stock frame can be
# merged in with the same full outer join inside the loop below.
# NOTE(review): the seed declares Date as a timestamp while the CSV reader is
# given no schema; the sample output prints Date without a time part, so the
# joined key appears to end up as a string - confirm this is intended.
schema = StructType([StructField('Date', TimestampType(), True)])
stocks = sqlContext.createDataFrame(spark.sparkContext.emptyRDD(), schema)

# Extract
path = '/home/jovyan/stocks10/'
csv_files = sorted(glob(os.path.join(path, '*.csv')))
if not csv_files:
    # Fail before the write below: mode('overwrite') would otherwise replace
    # any existing output with an empty dataset.
    raise FileNotFoundError(f'No CSV files found in {path}')

for csv_file in csv_files:
    # Load exactly the file glob found, so changing `path` cannot make the
    # listing and the read point at different directories.
    df = spark.read.format('csv').option('header', True).load(csv_file)

    # The stock name is the file name without its directory and extension
    # (e.g. '/home/jovyan/stocks10/AA.csv' -> 'AA').
    stock_name = os.path.splitext(os.path.basename(csv_file))[0]

    # Transform
    # Keep only the join key and the daily relative change
    # (Close - Open) / Open, stored in a column named after the stock.
    daily_change = (df['Close'] - df['Open']) / df['Open']
    df = df.select(df['Date'], daily_change.alias(stock_name))

    # Full outer join keeps every date seen in any stock; stocks without data
    # on a given date get null in their column.
    stocks = stocks.join(df, on='Date', how='fullouter')

# Derive the partition columns and collapse to a single Spark partition before
# writing.
stocks = (
    stocks
    .withColumn("Year", year("Date"))
    .withColumn("Month", month("Date"))
    .repartition(1)
)

# Load
# The partition names are passed in lower case on purpose: the output tree
# uses them verbatim (year=.../month=...).
stocks.write.format('parquet').mode('overwrite').partitionBy(['year', 'month']).save('/home/jovyan/parquet/')

stocks.show(10)

+----------+----+--------------------+----+----+----+----+----+----+----+----+----+-----+
|      Date|   A|                  AA|AACG| AAL|AAMC|AAME| AAN|AAOI|AAON| AAP|Year|Month|
+----------+----+--------------------+----+----+----+----+----+----+----+----+----+-----+
|1963-03-22|null|-0.00155407666901...|null|null|null|null|null|null|null|null|1963|    3|
|1964-01-22|null|-0.00221243046520...|null|null|null|null|null|null|null|null|1964|    1|
|1968-05-23|null|-0.00350470320111...|null|null|null|null|null|null|null|null|1968|    5|
|1968-06-04|null|0.012588933671785681|null|null|null|null|null|null|null|null|1968|    6|
|1969-04-03|null|                 0.0|null|null|null|null|null|null|null|null|1969|    4|
|1969-07-22|null|-0.01343463765040...|null|null|null|null|null|null|null|null|1969|    7|
|1970-01-22|null|0.010925834436531119|null|null|null|null|null|null|null|null|1970|    1|
|1971-12-28|null|0.001843297340496...|null|null|null|null|null|null|null|null|1971|   12|
|1973-04-2

In [3]:
# Shell escape: change into the Parquet output directory, list everything
# under it with `find`, and show only the last 10 entries.
!cd /home/jovyan/parquet/ && find | tail -n 10

./year=1968/month=2/.part-00000-e1f0c7ba-3d25-404b-a73b-5ab49c25d7ef.c000.snappy.parquet.crc
./year=1968/month=3
./year=1968/month=3/part-00000-e1f0c7ba-3d25-404b-a73b-5ab49c25d7ef.c000.snappy.parquet
./year=1968/month=3/.part-00000-e1f0c7ba-3d25-404b-a73b-5ab49c25d7ef.c000.snappy.parquet.crc
./year=1968/month=8
./year=1968/month=8/part-00000-e1f0c7ba-3d25-404b-a73b-5ab49c25d7ef.c000.snappy.parquet
./year=1968/month=8/.part-00000-e1f0c7ba-3d25-404b-a73b-5ab49c25d7ef.c000.snappy.parquet.crc
./year=1968/month=9
./year=1968/month=9/part-00000-e1f0c7ba-3d25-404b-a73b-5ab49c25d7ef.c000.snappy.parquet
./year=1968/month=9/.part-00000-e1f0c7ba-3d25-404b-a73b-5ab49c25d7ef.c000.snappy.parquet.crc
