In [69]:
import boto3
import pandas as pd
from io import StringIO

In [70]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import *

In [71]:
from pyspark.sql.types import *

In [72]:
s3 = boto3.resource('s3')
bucket = s3.Bucket('deutsche-boerse-xetra-pds')

In [81]:
bucket_obj1 = bucket.objects.filter(Prefix='2021-03-15')
bucket_obj2 = bucket.objects.filter(Prefix='2021-03-16')
bucket_obj3 = bucket.objects.filter(Prefix='2021-03-17')
objects = [obj for obj in bucket_obj1]+ [obj for obj in bucket_obj2]+ [obj for obj in bucket_obj3]

In [82]:
csv_obj_init = bucket.Object(key=objects[0].key).get().get('Body').read().decode('utf-8')
data = StringIO(csv_obj_init)
df_init = pd.read_csv(data, delimiter=',')

In [83]:
df_init.columns

Index(['ISIN', 'Mnemonic', 'SecurityDesc', 'SecurityType', 'Currency',
       'SecurityID', 'Date', 'Time', 'StartPrice', 'MaxPrice', 'MinPrice',
       'EndPrice', 'TradedVolume', 'NumberOfTrades'],
      dtype='object')

In [84]:
df_all = pd.DataFrame(columns=df_init.columns)
for obj in objects:
    csv_obj = bucket.Object(key=obj.key).get().get('Body').read().decode('utf-8')
    data = StringIO(csv_obj)
    df = pd.read_csv(data, delimiter=',')
    df_all = df_all.append(df, ignore_index=True)

In [85]:
df_all.shape

(305861, 14)

In [86]:
df_all.dropna(inplace=True)

In [87]:
## Extract columns for history table 

In [88]:
columns1 = ['ISIN', 'Date', 'Currency','Time', 'StartPrice', 'MaxPrice', 'MinPrice', 'EndPrice', 'TradedVolume']
history = df_all.loc[:, columns1]

In [89]:
# Add new columns to the dateframe: closing price and opening price for each ISIN per day

In [90]:
history['OpeningPrice'] = history.sort_values(by=['Time']).groupby(['ISIN', 'Date'])['StartPrice'].transform('first')

In [91]:
history['ClosingPrice'] = history.sort_values(by=['Time']).groupby(['ISIN', 'Date'])['StartPrice'].transform('last')

In [92]:
spark = SparkSession.builder.master("local[*]").appName("test").getOrCreate()
sparkDF=spark.createDataFrame(history) 
sparkDF.printSchema()

root
 |-- ISIN: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Currency: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- StartPrice: double (nullable = true)
 |-- MaxPrice: double (nullable = true)
 |-- MinPrice: double (nullable = true)
 |-- EndPrice: double (nullable = true)
 |-- TradedVolume: long (nullable = true)
 |-- OpeningPrice: double (nullable = true)
 |-- ClosingPrice: double (nullable = true)



In [93]:
## cast date type from string to date

In [94]:
sparkDF.withColumn("Date",sparkDF.Date.cast(DateType()))

DataFrame[ISIN: string, Date: date, Currency: string, Time: string, StartPrice: double, MaxPrice: double, MinPrice: double, EndPrice: double, TradedVolume: bigint, OpeningPrice: double, ClosingPrice: double]

In [95]:
## Filter out transcations that are not using EUR and drop multiple columns for 

In [96]:
historyDF =sparkDF.filter(sparkDF["Currency"]=="EUR").drop("Currency","Time","StartPrice","EndPrice")

In [97]:
# Rename the columns

In [98]:
historyDF = historyDF.withColumnRenamed("ISIN","isin")\
         .withColumnRenamed("Date","date")\
         .withColumnRenamed("MaxPrice","max_price")\
         .withColumnRenamed("MinPrice","min_price")\
         .withColumnRenamed("TradedVolume","volume")\
         .withColumnRenamed("OpeningPrice","opening_price")\
         .withColumnRenamed("ClosingPrice","closing_price")

In [99]:
historyDF.show()

+------------+----------+---------+---------+------+-------------+-------------+
|        isin|      date|max_price|min_price|volume|opening_price|closing_price|
+------------+----------+---------+---------+------+-------------+-------------+
|AT0000A0E9W5|2021-03-15|    22.12|    22.12|  1527|        22.12|        22.42|
|DE000A0DJ6J9|2021-03-15|    53.85|     53.5|   508|        53.85|         55.3|
|DE000A0D6554|2021-03-15|    22.24|    22.18|  5270|        22.24|        21.84|
|DE000A0D9PT0|2021-03-15|    201.5|    200.6|  1744|        201.5|        203.0|
|DE000A0HN5C6|2021-03-15|    39.06|    38.89| 28662|        38.95|        38.95|
|DE000A0JL9W6|2021-03-15|     33.6|     33.3|  2866|         33.5|         31.6|
|DE000A0LAUP1|2021-03-15|    11.38|    11.38|   743|        11.38|        11.38|
|DE000A0LD2U1|2021-03-15|     14.3|    14.28|  1749|         14.3|        14.07|
|DE000A0LD6E6|2021-03-15|     84.7|     84.4|  1673|         84.5|         84.0|
|DE000A0S8488|2021-03-15|   

In [100]:
#save to local file system

In [101]:
historyDF.toPandas().to_csv("History.csv",encoding='utf-8', index=False)

In [102]:
## Extract columns for security table 

In [103]:
sparkDF=spark.createDataFrame(df_all)

In [104]:
sparkDF.columns

['ISIN',
 'Mnemonic',
 'SecurityDesc',
 'SecurityType',
 'Currency',
 'SecurityID',
 'Date',
 'Time',
 'StartPrice',
 'MaxPrice',
 'MinPrice',
 'EndPrice',
 'TradedVolume',
 'NumberOfTrades']

In [105]:
security = sparkDF.select('ISIN','SecurityDesc','SecurityType','SecurityID')

In [106]:
security = security.withColumnRenamed('ISIN','isin')\
        .withColumnRenamed('SecurityDesc','security_desc')\
        .withColumnRenamed('SecurityType','security_type')\
        .withColumnRenamed('SecurityID','security_id')

In [107]:
security.show()

+------------+--------------------+-------------+-----------+
|        isin|       security_desc|security_type|security_id|
+------------+--------------------+-------------+-----------+
|AT0000A0E9W5|S+T AG (Z.REG.MK....| Common stock|    2504159|
|DE000A0DJ6J9|SMA SOLAR TECHNOL.AG| Common stock|    2504287|
|DE000A0D6554|      NORDEX SE O.N.| Common stock|    2504290|
|DE000A0D9PT0|MTU AERO ENGINES ...| Common stock|    2504297|
|DE000A0HN5C6|DEUTSCHE WOHNEN S...| Common stock|    2504314|
|DE000A0JL9W6|VERBIO VER.BIOENE...| Common stock|    2504343|
|DE000A0LAUP1|     CROPENERGIES AG| Common stock|    2504376|
|DE000A0LD2U1|ALSTRIA OFFICE RE...| Common stock|    2504379|
|DE000A0LD6E6|     GERRESHEIMER AG| Common stock|    2504380|
|DE000A0S8488|HAMBURG.HAFEN LOG...| Common stock|    2504409|
|DE000A0WMPJ6|  AIXTRON SE NA O.N.| Common stock|    2504428|
|DE000A0Z2ZZ5|  FREENET AG NA O.N.| Common stock|    2504438|
|DE000A1DAHH0| BRENNTAG SE NA O.N.| Common stock|    2504453|
|DE000A1

In [108]:
security.toPandas().to_csv("Security.csv",encoding='utf-8', index=False)

In [109]:
# Want to know the percentage changes of every ISIN for each day

In [110]:
historyDF.createOrReplaceTempView("history_table")

In [111]:
query_1='''
select isin, date, round((closing_price-opening_price),2) as percentage_change
from history_table 
        '''

In [112]:
history_changes=spark.sql(query_1)

In [113]:
history_changes.show()

+------------+----------+-----------------+
|        isin|      date|percentage_change|
+------------+----------+-----------------+
|AT0000A0E9W5|2021-03-15|              0.3|
|DE000A0DJ6J9|2021-03-15|             1.45|
|DE000A0D6554|2021-03-15|             -0.4|
|DE000A0D9PT0|2021-03-15|              1.5|
|DE000A0HN5C6|2021-03-15|              0.0|
|DE000A0JL9W6|2021-03-15|             -1.9|
|DE000A0LAUP1|2021-03-15|              0.0|
|DE000A0LD2U1|2021-03-15|            -0.23|
|DE000A0LD6E6|2021-03-15|             -0.5|
|DE000A0S8488|2021-03-15|             -0.1|
|DE000A0WMPJ6|2021-03-15|             0.88|
|DE000A0Z2ZZ5|2021-03-15|            -0.18|
|DE000A1DAHH0|2021-03-15|            -0.58|
|DE000A1EWWW0|2021-03-15|             -6.0|
|DE000A1H8BV3|2021-03-15|            -1.26|
|DE000A1J5RX9|2021-03-15|            -0.02|
|DE000A1ML7J1|2021-03-15|            -0.24|
|DE000A1MMCC8|2021-03-15|             -1.0|
|DE000A1PHFF7|2021-03-15|             0.29|
|DE000A1TNUT7|2021-03-15|       

In [114]:
history_changes.toPandas().to_csv("DailyChange.csv",encoding='utf-8', index=False)