# Calculating daily price variations in stocks using Apache Spark

We **extract** the csv files downloaded from "https://www.kaggle.com/jacksoncrow/stock-market-dataset". (In this example we only use 10 files to keep the running time of the script short. The folder containing these files is called 'stocks10'.)

Then we **transform** the data, calculating the daily variation of each stock as the relative change between its opening and closing price: (Close - Open) / Open. 
We store this result in a new column named after the specific stock and drop the rest of the columns.
We then join each new column to the dataframe 'stocks' on the 'Date' column. 

Finally we **load** the dataframe as a parquet file partitioning it by year and month.

In [None]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from glob import glob, os
from pyspark.sql.types import StructType, StructField, TimestampType

# Attach to the SparkContext that is already running, or start one.
sc = SparkContext.getOrCreate()
# Session and SQL entry points, both built on that same context.
spark = SparkSession(sc)
sqlContext = SQLContext(sc)

# Schema of the accumulator table: a single nullable timestamp column 'Date'.
schema = StructType().add('Date', TimestampType(), True)

# Empty seed DataFrame; one column per stock is joined onto it further down.
stocks = sqlContext.createDataFrame(sc.emptyRDD(), schema)

#Extract
path = '/home/jovyan/stocks10/'
# Sorted so the stock columns are joined in a deterministic (alphabetical) order.
for csv_path in sorted(glob(os.path.join(path, '*.csv'))):
    # Load exactly the file glob matched, so editing `path` above is enough
    # to point the job at a different folder.
    df = spark.read.format('csv').option('header', True).load(csv_path)

    # Column name = file name without its final extension. splitext keeps any
    # dots that are part of the name itself (e.g. 'BRK.A.csv' -> 'BRK.A').
    stock_name = os.path.splitext(os.path.basename(csv_path))[0]

    #Transform
    # Relative change between open and close: (Close - Open) / Open.
    # NOTE(review): no schema or inferSchema is passed to the reader, so the
    # price columns are presumably read as strings and cast implicitly by
    # Spark for this arithmetic - confirm.
    daily_change = ((df['Close'] - df['Open']) / df['Open']).alias(stock_name)

    # Keep only the join key and the new per-stock column, so no other column
    # of the CSV can collide with the same column coming from another stock.
    df = df.select('Date', daily_change)

    # Full outer join: keep every date that appears in any stock's file.
    stocks = stocks.join(df, on='Date', how='fullouter')

# Derive the calendar keys used for partitioning from each row's 'Date'.
stocks = stocks.withColumn("Year", year("Date"))
stocks = stocks.withColumn("Month", month("Date"))
# Repartition the DataFrame by those two columns before writing.
stocks = stocks.repartition('Year', 'Month')

#Load
# Overwrites any previous output. The lower-case names passed to partitionBy
# are the ones that show up in the output directories (year=.../month=...).
(
    stocks.write
    .format('parquet')
    .mode('overwrite')
    .partitionBy('year', 'month')
    .save('/home/jovyan/parquet/')
)

In [3]:
!cd /home/jovyan/parquet/ && find | tail -n 10

./year=1968/month=2/.part-00021-3f436c55-1eb2-4f3d-9a45-1d00a7e8f6e4.c000.snappy.parquet.crc
./year=1968/month=3
./year=1968/month=3/.part-00136-3f436c55-1eb2-4f3d-9a45-1d00a7e8f6e4.c000.snappy.parquet.crc
./year=1968/month=3/part-00136-3f436c55-1eb2-4f3d-9a45-1d00a7e8f6e4.c000.snappy.parquet
./year=1968/month=8
./year=1968/month=8/part-00178-3f436c55-1eb2-4f3d-9a45-1d00a7e8f6e4.c000.snappy.parquet
./year=1968/month=8/.part-00178-3f436c55-1eb2-4f3d-9a45-1d00a7e8f6e4.c000.snappy.parquet.crc
./year=1968/month=9
./year=1968/month=9/.part-00013-3f436c55-1eb2-4f3d-9a45-1d00a7e8f6e4.c000.snappy.parquet.crc
./year=1968/month=9/part-00013-3f436c55-1eb2-4f3d-9a45-1d00a7e8f6e4.c000.snappy.parquet
