In [1]:
# import findspark

import findspark
import logging

In [2]:
# import datetime and pandas function

import datetime
import pandas as pd

In [3]:
# Route INFO-and-above records from the root logger into logfile.txt,
# one "timestamp:LEVEL:message" line per record.
logging.basicConfig(
    filename='logfile.txt',
    format='%(asctime)s:%(levelname)s:%(message)s',
    level=logging.INFO,
)

In [4]:
# Initialise findspark before pyspark is imported in the next cell.
# NOTE(review): findspark is presumably what lets this kernel locate the local
# Spark install (e.g. via SPARK_HOME) — confirm for your environment.

findspark.init()

In [5]:
# import this to launch a spark session

from pyspark.sql import SparkSession

In [6]:
# Record that every import above succeeded.
logging.log(logging.INFO, "All packages are imported")

In [7]:
# Build (or reuse, via getOrCreate) the "Finance" Spark session on master
# "local" with Hive support enabled.
spark = (
    SparkSession.builder
    .appName("Finance")
    .master("local")
    .enableHiveSupport()
    .getOrCreate()
)


In [8]:
# Log the session object that was just created.
logging.log(logging.INFO, "Spark Session created {}".format(spark))

In [9]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType, TimestampType, DateType

# Column layout of the hourly XETR CSV files, in file order. Every column is
# nullable. The price/volume/trade-count columns are declared as plain strings,
# so numeric comparisons on them downstream need an explicit cast.
fileSchema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ('ISIN', StringType()),
        ('Mnemonic', StringType()),
        ('SecurityDesc', StringType()),
        ('SecurityType', StringType()),
        ('Currency', StringType()),
        ('SecurityID', StringType()),
        ('Date', DateType()),
        ('Time', TimestampType()),
        ('StartPrice', StringType()),
        ('MaxPrice', StringType()),
        ('MinPrice', StringType()),
        ('EndPrice', StringType()),
        ('TradedVolume', StringType()),
        ('NumberOfTrades', StringType()),
    ]
])

# Zero-row DataFrame carrying that schema; the loading loop unions every file onto it.
data = spark.sparkContext.emptyRDD()
df_initial = spark.createDataFrame(schema=fileSchema, data=data)

In [10]:
# Log the column list of the empty starting frame.
logging.log(logging.INFO, "Intial schema with a blank database created. Columns are {}".format(df_initial.columns))

In [11]:
# First day to load (3 January 2022, midnight) and the two consecutive
# calendar days starting there. Weekend days in this range are skipped by the
# loading loop.
test_date = datetime.datetime(2022, 1, 3)
date_generated = pd.date_range(start=test_date, periods=2)

In [12]:
# Display the generated dates as this cell's output.
date_generated

DatetimeIndex(['2022-01-03', '2022-01-04'], dtype='datetime64[ns]', freq='D')

In [13]:
# Log which dates the loading loop will iterate over.
logging.log(logging.INFO, "Date of files which we will be reading {}".format(date_generated))

In [14]:
# reading data into the dataframe from files
#
# For every weekday in `date_generated`, load the 24 hourly XETR CSV files
#   dataset/<YYYY-MM-DD>/<YYYY-MM-DD>_BINS_XETR<HH>.csv   (HH = 00 .. 23)
# with the shared `fileSchema` and union them into `df_initial`.

# Start from an empty frame so re-running this cell does not append the same
# files a second time (df_initial is reassigned by the union below).
df_initial = spark.createDataFrame(data=spark.sparkContext.emptyRDD(), schema=fileSchema)

# Running row count. Counting the growing union on every iteration would
# re-scan every file loaded so far.
total_records = 0

for jdx in date_generated:
    # weekday(): Monday == 0 ... Sunday == 6, so this skips Saturday and Sunday.
    if jdx.weekday() < 5:
        day = str(jdx.date())
        for idx in range(24):
            # Hour is zero-padded to two digits (00 .. 23).
            fname = "dataset/" + day + "/" + day + "_BINS_XETR" + "{:02d}".format(idx) + ".csv"

            logging.info("File name {} is picked to be loaded".format(fname))
            df = spark.read.load(fname, format="csv", schema=fileSchema, header=True)
            file_records = df.count()
            logging.info("File {} is loaded now. It has {} records".format(fname, file_records))
            df_initial = df_initial.union(df)
            total_records += file_records
            logging.info("Records are merged into main records. Total records now are: {}".format(total_records))


In [15]:
# Print the schema (column names, types, nullability) of the combined DataFrame.

df_initial.printSchema()

root
 |-- ISIN: string (nullable = true)
 |-- Mnemonic: string (nullable = true)
 |-- SecurityDesc: string (nullable = true)
 |-- SecurityType: string (nullable = true)
 |-- Currency: string (nullable = true)
 |-- SecurityID: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Time: timestamp (nullable = true)
 |-- StartPrice: string (nullable = true)
 |-- MaxPrice: string (nullable = true)
 |-- MinPrice: string (nullable = true)
 |-- EndPrice: string (nullable = true)
 |-- TradedVolume: string (nullable = true)
 |-- NumberOfTrades: string (nullable = true)



In [16]:
# Peek at the first 20 loaded rows (long cell values are truncated).

df_initial.show(n=20, truncate=True)

+------------+--------+--------------------+------------+--------+----------+----------+-------------------+----------+--------+--------+--------+------------+--------------+
|        ISIN|Mnemonic|        SecurityDesc|SecurityType|Currency|SecurityID|      Date|               Time|StartPrice|MaxPrice|MinPrice|EndPrice|TradedVolume|NumberOfTrades|
+------------+--------+--------------------+------------+--------+----------+----------+-------------------+----------+--------+--------+--------+------------+--------------+
|AT0000A0E9W5|    SANT|         S+T AG O.N.|Common stock|     EUR|   2504159|2022-01-03|2022-10-28 08:00:00|     14.76|   14.76|   14.75|   14.75|        4414|             2|
|DE000A0DJ6J9|     S92|SMA SOLAR TECHNOL.AG|Common stock|     EUR|   2504287|2022-01-03|2022-10-28 08:00:00|     37.64|   37.66|    37.6|   37.66|        1649|             3|
|DE000A0D6554|    NDX1|      NORDEX SE O.N.|Common stock|     EUR|   2504290|2022-01-03|2022-10-28 08:00:00|     13.99|   14.

In [18]:
# Keep only the wall-clock part of Time as an "HH:mm:ss" string. The date part
# of the parsed timestamp (2022-10-28 in the output above) does not match the
# Date column, so it is dropped.

from pyspark.sql.functions import date_format
df_initial = df_initial.withColumn(colName='Time', col=date_format('time', 'HH:mm:ss'))
logging.log(logging.INFO, "Format of time updated")

In [19]:
# Spot-check the first five rows after the Time conversion.

df_initial.show(n=5)

+------------+--------+--------------------+------------+--------+----------+----------+--------+----------+--------+--------+--------+------------+--------------+
|        ISIN|Mnemonic|        SecurityDesc|SecurityType|Currency|SecurityID|      Date|    Time|StartPrice|MaxPrice|MinPrice|EndPrice|TradedVolume|NumberOfTrades|
+------------+--------+--------------------+------------+--------+----------+----------+--------+----------+--------+--------+--------+------------+--------------+
|AT0000A0E9W5|    SANT|         S+T AG O.N.|Common stock|     EUR|   2504159|2022-01-03|08:00:00|     14.76|   14.76|   14.75|   14.75|        4414|             2|
|DE000A0DJ6J9|     S92|SMA SOLAR TECHNOL.AG|Common stock|     EUR|   2504287|2022-01-03|08:00:00|     37.64|   37.66|    37.6|   37.66|        1649|             3|
|DE000A0D6554|    NDX1|      NORDEX SE O.N.|Common stock|     EUR|   2504290|2022-01-03|08:00:00|     13.99|   14.03|   13.94|   13.96|       23011|            36|
|DE000A0D9PT0|  

### Analysing using Spark SQL

In [20]:
# Expose df_initial to Spark SQL under the name df_initial_table.

df_initial.createOrReplaceTempView(name='df_initial_table')
logging.log(logging.INFO, "Initial table created")

In [21]:
# Drop descriptive columns and number every (Date, Mnemonic) group twice:
# for_start == 1 marks the row with the earliest Time (opening row), and
# for_end == 1 marks the latest Time (closing row). Time is a zero-padded
# "HH:mm:ss" string, so ordering it as text is chronological.

df_initial_trans_1 = spark.sql(
    "Select Date, Time, Mnemonic, StartPrice, "
    "row_number() over(partition by Date, Mnemonic order by Time) as for_start, "
    "row_number() over(partition by Date, Mnemonic order by Time Desc) as for_end, "
    "MaxPrice, MinPrice, EndPrice, TradedVolume, NumberOfTrades "
    "from df_initial_table"
)
logging.log(logging.INFO, "Unecessary columns filtered out")

In [22]:
# Inspect one symbol to confirm for_start / for_end run in opposite directions.

df_initial_trans_1.where(df_initial_trans_1['Mnemonic'] == '00XJ').show(n=5)

+----------+--------+--------+----------+---------+-------+--------+--------+--------+------------+--------------+
|      Date|    Time|Mnemonic|StartPrice|for_start|for_end|MaxPrice|MinPrice|EndPrice|TradedVolume|NumberOfTrades|
+----------+--------+--------+----------+---------+-------+--------+--------+--------+------------+--------------+
|2022-01-03|16:36:00|    00XJ|    6.3345|        5|      1|  6.3345|  6.3345|  6.3345|           0|             2|
|2022-01-03|15:03:00|    00XJ|     6.433|        4|      2|   6.433|   6.433|   6.433|          92|             1|
|2022-01-03|14:44:00|    00XJ|       6.5|        3|      3|     6.5|     6.5|     6.5|         500|             1|
|2022-01-03|12:12:00|    00XJ|    6.4345|        2|      4|  6.4345|  6.4345|  6.4345|           0|             1|
|2022-01-03|08:04:00|    00XJ|    6.4435|        1|      5|  6.4435|  6.4435|  6.4435|           0|             1|
+----------+--------+--------+----------+---------+-------+--------+--------+---

In [23]:
# Register the trimmed, row-numbered frame for the next SQL step.

df_initial_trans_1.createOrReplaceTempView(name='df_initial_trans_1_table')
logging.log(logging.INFO, "Table created for 2nd query")

In [24]:
# Per (Date, Mnemonic) window aggregates:
#   open_price / close_price - StartPrice of the first row / EndPrice of the last
#                              row (NULL on every other row; collapsed later by max())
#   High / Low               - max of MaxPrice / min of MinPrice over the day
#   total_trades             - sum of TradedVolume over the day
#   NumberOfTrades           - sum of NumberOfTrades over the day
# The price/volume columns are StringType in fileSchema. Without the explicit
# casts, max/min would compare them as text ('9.98' > '10.05'), giving wrong
# High/Low whenever a symbol's prices differ in digit count. The sums were
# already implicitly done in double, so they keep the same type.

df_initial_trans_2 = spark.sql(
    "Select Date, Mnemonic, "
    "case when for_start = 1 then cast(StartPrice as double) else NULL end as open_price, "
    "case when for_end = 1 then cast(EndPrice as double) else NULL end as close_price, "
    "max(cast(MaxPrice as double)) over(partition by Date, Mnemonic) as High, "
    "min(cast(MinPrice as double)) over(partition by Date, Mnemonic) as Low, "
    "sum(cast(TradedVolume as double)) over(partition by Date, Mnemonic) as total_trades, "
    "sum(cast(NumberOfTrades as double)) over(partition by Date, Mnemonic) as NumberOfTrades "
    "from df_initial_trans_1_table"
)

logging.info("Query to select high price, low price, total traded volumne and total number of trades executed")

In [25]:
# Inspect one symbol: open/close appear on a single row each, while the
# aggregates repeat on every row of the day.

df_initial_trans_2.where(df_initial_trans_2['Mnemonic'] == '00XJ').show(n=5)

+----------+--------+----------+-----------+----+------+------------+--------------+
|      Date|Mnemonic|open_price|close_price|High|   Low|total_trades|NumberOfTrades|
+----------+--------+----------+-----------+----+------+------------+--------------+
|2022-01-03|    00XJ|      null|     6.3345| 6.5|6.3345|       592.0|           6.0|
|2022-01-03|    00XJ|      null|       null| 6.5|6.3345|       592.0|           6.0|
|2022-01-03|    00XJ|      null|       null| 6.5|6.3345|       592.0|           6.0|
|2022-01-03|    00XJ|      null|       null| 6.5|6.3345|       592.0|           6.0|
|2022-01-03|    00XJ|    6.4435|       null| 6.5|6.3345|       592.0|           6.0|
+----------+--------+----------+-----------+----+------+------------+--------------+
only showing top 5 rows



In [26]:
# Register the window-aggregated frame for the daily group-by.

df_initial_trans_2.createOrReplaceTempView(name='df_initial_trans_2_table')

In [27]:
# Collapse each (Date, Mnemonic) group to one row. max() skips the NULL
# open/close entries and picks the single populated value. The other columns
# are constant within the group, so max() just selects that value. Prices are
# rounded to 2 decimals.

df_initial_trans_3 = spark.sql(
    "Select Date, Mnemonic, "
    "round(max(open_price),2) as open_price, "
    "round(max(close_price),2) as close_price, "
    "round(max(High),2) as High, "
    "round(max(Low),2) as Low, "
    "max(total_trades) as total_trades, "
    "max(NumberOfTrades) as NumberOfTrades "
    "from df_initial_trans_2_table group by Date, Mnemonic"
)
logging.log(logging.INFO, "Table created for 3rd query")

In [28]:
# Confirm one row per day for a sample symbol.

df_initial_trans_3.where(df_initial_trans_3['Mnemonic'] == 'SANT').show(n=20)

+----------+--------+----------+-----------+-----+-----+------------+--------------+
|      Date|Mnemonic|open_price|close_price| High|  Low|total_trades|NumberOfTrades|
+----------+--------+----------+-----------+-----+-----+------------+--------------+
|2022-01-03|    SANT|     14.76|      15.62| 15.7|14.75|    370776.0|         717.0|
|2022-01-04|    SANT|     15.66|      14.96|15.68|14.96|    589461.0|         973.0|
+----------+--------+----------+-----------+-----+-----+------------+--------------+



In [29]:
# Register the daily summary for the lag step.

df_initial_trans_3.createOrReplaceTempView(name='df_initial_trans_3_table')
logging.log(logging.INFO, "Table created for 4th query")

In [30]:
# previous_day = this symbol's close_price on its preceding Date row
# (NULL for the symbol's first date).
df_initial_trans_4 = spark.sql(
    "Select *, "
    "Lag(close_price,1) over(partition by Mnemonic order by Date) as previous_day "
    "from df_initial_trans_3_table"
)
logging.log(logging.INFO, "Query to create a lag column which will be used for calculating the closing price")

In [31]:
 # Confirm previous_day carries the prior day's close for a sample symbol.

df_initial_trans_4.where(df_initial_trans_4['Mnemonic'] == 'SANT').show(n=20)

+----------+--------+----------+-----------+-----+-----+------------+--------------+------------+
|      Date|Mnemonic|open_price|close_price| High|  Low|total_trades|NumberOfTrades|previous_day|
+----------+--------+----------+-----------+-----+-----+------------+--------------+------------+
|2022-01-03|    SANT|     14.76|      15.62| 15.7|14.75|    370776.0|         717.0|        null|
|2022-01-04|    SANT|     15.66|      14.96|15.68|14.96|    589461.0|         973.0|       15.62|
+----------+--------+----------+-----------+-----+-----+------------+--------------+------------+



In [32]:
# Register the frame with previous_day for the difference step.

df_initial_trans_4.createOrReplaceTempView(name='df_initial_trans_4_table')
logging.log(logging.INFO, "Table created for 5th query")

In [33]:
# close_price_diff = today's close minus the previous day's close, rounded to
# 2 decimals; 0 on a symbol's first date (no previous_day).
df_initial_trans_5 = spark.sql(
    "Select *, "
    "case when previous_day is NULL then 0 "
    "else round((close_price - previous_day),2) end as close_price_diff "
    "from df_initial_trans_4_table"
)
logging.log(logging.INFO, "Query to calculate price difference is calculated")

In [34]:
 # Confirm close_price_diff for a sample symbol.

df_initial_trans_5.where(df_initial_trans_5['Mnemonic'] == 'SANT').show(n=20)

+----------+--------+----------+-----------+-----+-----+------------+--------------+------------+----------------+
|      Date|Mnemonic|open_price|close_price| High|  Low|total_trades|NumberOfTrades|previous_day|close_price_diff|
+----------+--------+----------+-----------+-----+-----+------------+--------------+------------+----------------+
|2022-01-03|    SANT|     14.76|      15.62| 15.7|14.75|    370776.0|         717.0|        null|             0.0|
|2022-01-04|    SANT|     15.66|      14.96|15.68|14.96|    589461.0|         973.0|       15.62|           -0.66|
+----------+--------+----------+-----------+-----+-----+------------+--------------+------------+----------------+



In [35]:
# Register the frame with daily differences for the week-number step.

df_initial_trans_5.createOrReplaceTempView(name='df_initial_trans_5_table')
logging.log(logging.INFO, "Table created for 6th query")

In [36]:
# Prefix each daily row with its week-of-year number and fix the column order.
# NOTE(review): weekofyear() carries no year, so if the date range ever spans
# New Year, weeks from different years would share a number — confirm scope.
df_initial_trans_6 = spark.sql(
    "Select weekofyear(Date) as Week_of_Year, "
    "Date, Mnemonic, open_price, close_price, High, Low, "
    "close_price_diff, NumberOfTrades, total_trades "
    "from df_initial_trans_5_table"
)
logging.log(logging.INFO, "Adding week of the year in the table")

In [37]:
df_initial_trans_6.where(df_initial_trans_6['Mnemonic'] == 'SANT').show(n=20)

+------------+----------+--------+----------+-----------+-----+-----+----------------+--------------+------------+
|Week_of_Year|      Date|Mnemonic|open_price|close_price| High|  Low|close_price_diff|NumberOfTrades|total_trades|
+------------+----------+--------+----------+-----------+-----+-----+----------------+--------------+------------+
|           1|2022-01-03|    SANT|     14.76|      15.62| 15.7|14.75|             0.0|         717.0|    370776.0|
|           1|2022-01-04|    SANT|     15.66|      14.96|15.68|14.96|           -0.66|         973.0|    589461.0|
+------------+----------+--------+----------+-----------+-----+-----+----------------+--------------+------------+



In [38]:
import os

# Export the per-day summary to CSV. The path is built with os.path.join
# instead of the hard-coded "Converted\Weekly_Record\..." string. That string
# only names a directory on Windows, relies on the invalid escapes \W and \O,
# and on POSIX produces a single file whose name contains backslashes. The
# target directory is created if it is missing.
overall_days_csv = os.path.join("Converted", "Weekly_Record", "Overall_Days.csv")
os.makedirs(os.path.dirname(overall_days_csv), exist_ok=True)
df_initial_trans_6.toPandas().to_csv(overall_days_csv)

In [39]:
# Register the week-tagged daily table for the weekly roll-up.

df_initial_trans_6.createOrReplaceTempView(name='df_initial_trans_6_table')
logging.log(logging.INFO, "Table created for 7th query")

In [40]:
# Weekly roll-up per symbol: open/close/high/low are averages of the daily
# values, and trade counts/volumes are summed (all rounded to 2 decimals).
# NOTE(review): averaging daily highs/lows is not the week's max high / min
# low, nor is avg open/close the week's first open / last close — confirm intent.
df_initial_trans_week = spark.sql(
    "Select Week_of_Year, Mnemonic, "
    "round(avg(open_price),2) as op_week, "
    "round(avg(close_price),2) as cp_week, "
    "round(avg(High),2) as high_week, "
    "round(avg(Low),2) as low_week, "
    "round(sum(NumberOfTrades),2) as NumberOfTrades, "
    "round(sum(total_trades),2) as total_trades "
    "from df_initial_trans_6_table group by Week_of_Year, Mnemonic"
)
logging.log(logging.INFO, "Table created for week wise data")

In [41]:
df_initial_trans_week.where(df_initial_trans_week['Mnemonic'] == 'SANT').show(n=20)

+------------+--------+-------+-------+---------+--------+--------------+------------+
|Week_of_Year|Mnemonic|op_week|cp_week|high_week|low_week|NumberOfTrades|total_trades|
+------------+--------+-------+-------+---------+--------+--------------+------------+
|           1|    SANT|  15.21|  15.29|    15.69|   14.86|        1690.0|    960237.0|
+------------+--------+-------+-------+---------+--------+--------------+------------+



In [42]:
# Register the weekly roll-up for the weekly lag step.

df_initial_trans_week.createOrReplaceTempView(name='df_initial_trans_week_table')
logging.log(logging.INFO, "Table created for 8th query")

In [43]:
# Despite its name, close_price_dff holds the previous week's cp_week for the
# same symbol (NULL for the symbol's first week).
df_initial_trans_week_2 = spark.sql(
    "Select *, "
    "LAG(cp_week, 1) over(partition by Mnemonic order by Week_of_Year) as close_price_dff "
    "from df_initial_trans_week_table"
)
logging.log(logging.INFO, "Table created for weekly price difference")

In [44]:
df_initial_trans_week_2.where(df_initial_trans_week_2['Mnemonic'] == 'SANT').show(n=20)

+------------+--------+-------+-------+---------+--------+--------------+------------+---------------+
|Week_of_Year|Mnemonic|op_week|cp_week|high_week|low_week|NumberOfTrades|total_trades|close_price_dff|
+------------+--------+-------+-------+---------+--------+--------------+------------+---------------+
|           1|    SANT|  15.21|  15.29|    15.69|   14.86|        1690.0|    960237.0|           null|
+------------+--------+-------+-------+---------+--------+--------------+------------+---------------+



In [45]:
# Register the weekly frame with the lagged close for the difference step.

df_initial_trans_week_2.createOrReplaceTempView(name='df_initial_trans_week_2_table')
logging.log(logging.INFO, "Table created for 9th query")

In [46]:
# Week-over-week change of the average close, defined the same way as the daily
# close_price_diff: this week's cp_week minus the previous week's
# (close_price_dff holds LAG(cp_week)). It is 0 for a symbol's first week. The
# original subtracted in the opposite direction, flipping the sign relative to
# the daily column.
df_initial_trans_week_3 = spark.sql(
    "Select Week_of_Year, Mnemonic, op_week, cp_week, high_week, low_week, NumberOfTrades, total_trades, "
    "Case when close_price_dff is NULL then 0 "
    "else round((cp_week - close_price_dff),2) end as close_week_diff "
    "from df_initial_trans_week_2_table"
)
logging.info("Creating weekly difference column")

In [47]:
df_initial_trans_week_3.where(df_initial_trans_week_3['Mnemonic'] == 'SANT').show(n=20)

+------------+--------+-------+-------+---------+--------+--------------+------------+---------------+
|Week_of_Year|Mnemonic|op_week|cp_week|high_week|low_week|NumberOfTrades|total_trades|close_week_diff|
+------------+--------+-------+-------+---------+--------+--------------+------------+---------------+
|           1|    SANT|  15.21|  15.29|    15.69|   14.86|        1690.0|    960237.0|            0.0|
+------------+--------+-------+-------+---------+--------+--------------+------------+---------------+



In [48]:
import os

# Export the weekly summary to CSV. The path is built with os.path.join
# instead of the hard-coded "Converted\Weekly_Record\..." string. That string
# only names a directory on Windows, relies on the invalid escapes \W and \O,
# and on POSIX produces a single file whose name contains backslashes. The
# target directory is created if it is missing.
overall_weeks_csv = os.path.join("Converted", "Weekly_Record", "Overall_Weeks.csv")
os.makedirs(os.path.dirname(overall_weeks_csv), exist_ok=True)
df_initial_trans_week_3.toPandas().to_csv(overall_weeks_csv)