In [1]:
# Start (or reuse) a SparkSession for this notebook.
# findspark.init() is called before importing pyspark so that the local Spark
# installation's pyspark package can be found by the kernel.
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('pro')
    .getOrCreate()
)

In [2]:
spark  # display the active SparkSession in the cell output

In [3]:
# Load the raw trading data from S3. The CSV files have a header row, and
# inferSchema lets Spark derive column types from the data (see schema below).
df = (
    spark.read
    .format('csv')
    .options(header='true', inferSchema='true')
    .load('s3://hzhang502/data')
)

In [4]:
df.show(n=5)  # preview the first 5 rows of the raw data

+------------+--------+--------------------+------------+--------+----------+-------------------+-----+----------+--------+--------+--------+------------+--------------+
|        ISIN|Mnemonic|        SecurityDesc|SecurityType|Currency|SecurityID|               Date| Time|StartPrice|MaxPrice|MinPrice|EndPrice|TradedVolume|NumberOfTrades|
+------------+--------+--------------------+------------+--------+----------+-------------------+-----+----------+--------+--------+--------+------------+--------------+
|AT0000A0E9W5|    SANT|S+T AG (Z.REG.MK....|Common stock|     EUR|   2504159|2018-02-06 00:00:00|09:00|     20.04|   20.04|   19.91|   19.95|        3314|            16|
|AT00000FACC2|     1FC|    FACC AG INH.AKT.|Common stock|     EUR|   2504163|2018-02-06 00:00:00|09:00|      16.5|    16.5|    16.5|    16.5|         250|             2|
|AT0000743059|     OMV|              OMV AG|Common stock|     EUR|   2504175|2018-02-06 00:00:00|09:00|      48.8|    48.8|    48.8|    48.8|         

In [5]:
df.printSchema() # print inferred column types; Date comes in as a timestamp and Time as a string, so both are adjusted later

root
 |-- ISIN: string (nullable = true)
 |-- Mnemonic: string (nullable = true)
 |-- SecurityDesc: string (nullable = true)
 |-- SecurityType: string (nullable = true)
 |-- Currency: string (nullable = true)
 |-- SecurityID: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Time: string (nullable = true)
 |-- StartPrice: double (nullable = true)
 |-- MaxPrice: double (nullable = true)
 |-- MinPrice: double (nullable = true)
 |-- EndPrice: double (nullable = true)
 |-- TradedVolume: integer (nullable = true)
 |-- NumberOfTrades: integer (nullable = true)



In [6]:
# Drop the columns that are irrelevant to this analysis in a single call.
# Passing every name to one drop() also avoids a loop variable named `col`,
# which would shadow pyspark.sql.functions.col (imported in a later cell and
# used by create_csv) whenever this cell is re-run after that import.
drop = ['ISIN', 'Currency', 'SecurityID', 'TradedVolume', 'NumberOfTrades']
df = df.drop(*drop)

In [7]:
df.show(n=5)  # preview the first 5 rows after dropping columns

+--------+--------------------+------------+-------------------+-----+----------+--------+--------+--------+
|Mnemonic|        SecurityDesc|SecurityType|               Date| Time|StartPrice|MaxPrice|MinPrice|EndPrice|
+--------+--------------------+------------+-------------------+-----+----------+--------+--------+--------+
|    SANT|S+T AG (Z.REG.MK....|Common stock|2018-02-06 00:00:00|09:00|     20.04|   20.04|   19.91|   19.95|
|     1FC|    FACC AG INH.AKT.|Common stock|2018-02-06 00:00:00|09:00|      16.5|    16.5|    16.5|    16.5|
|     OMV|              OMV AG|Common stock|2018-02-06 00:00:00|09:00|      48.8|    48.8|    48.8|    48.8|
|     VAS|      VOESTALPINE AG|Common stock|2018-02-06 00:00:00|09:00|     49.63|   49.64|   49.63|   49.63|
|     AUS|AT+S AUSTR.T.+SYS...|Common stock|2018-02-06 00:00:00|09:00|      21.6|    21.6|    21.4|    21.6|
+--------+--------------------+------------+-------------------+-----+----------+--------+--------+--------+
only showing top 5 

In [8]:
df.count() # total number of rows in the dataset (triggers a full Spark job)

34947852

In [9]:
# Keep only the calendar day: cast the inferred timestamp column to a date.
df_new = df.withColumn('Date', df.Date.cast('date'))

In [10]:
# Restrict the data to stocks: keep rows whose SecurityType is 'Common stock'.
df_new = df_new.where(df_new.SecurityType == 'Common stock')

In [11]:
import pyspark
import pyspark.sql.functions as fct

# Break the 'HH:MM' Time string into separate Hour and Minute columns so the
# trading-hours filter can be expressed on them later (both are strings here).
split_col = fct.split(df_new['Time'], ':')
df1 = (
    df_new
    .withColumn('Hour', split_col.getItem(0))
    .withColumn('Minute', split_col.getItem(1))
)

In [12]:
df1.show(5) # show first 5 lines

+--------+--------------------+------------+----------+-----+----------+--------+--------+--------+----+------+
|Mnemonic|        SecurityDesc|SecurityType|      Date| Time|StartPrice|MaxPrice|MinPrice|EndPrice|Hour|Minute|
+--------+--------------------+------------+----------+-----+----------+--------+--------+--------+----+------+
|    SANT|S+T AG (Z.REG.MK....|Common stock|2018-02-06|09:00|     20.04|   20.04|   19.91|   19.95|  09|    00|
|     1FC|    FACC AG INH.AKT.|Common stock|2018-02-06|09:00|      16.5|    16.5|    16.5|    16.5|  09|    00|
|     OMV|              OMV AG|Common stock|2018-02-06|09:00|      48.8|    48.8|    48.8|    48.8|  09|    00|
|     VAS|      VOESTALPINE AG|Common stock|2018-02-06|09:00|     49.63|   49.64|   49.63|   49.63|  09|    00|
|     AUS|AT+S AUSTR.T.+SYS...|Common stock|2018-02-06|09:00|      21.6|    21.6|    21.4|    21.6|  09|    00|
+--------+--------------------+------------+----------+-----+----------+--------+--------+--------+----+

In [13]:
df1.printSchema() # Hour and Minute are still strings from split(); they are cast to int in the next cell

root
 |-- Mnemonic: string (nullable = true)
 |-- SecurityDesc: string (nullable = true)
 |-- SecurityType: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Time: string (nullable = true)
 |-- StartPrice: double (nullable = true)
 |-- MaxPrice: double (nullable = true)
 |-- MinPrice: double (nullable = true)
 |-- EndPrice: double (nullable = true)
 |-- Hour: string (nullable = true)
 |-- Minute: string (nullable = true)



In [14]:
# split() produced Hour and Minute as strings; make them integers so they can
# be compared numerically in the trading-hours filter.
df1 = (
    df1
    .withColumn('Hour', fct.col('Hour').cast('int'))
    .withColumn('Minute', fct.col('Minute').cast('int'))
)

In [15]:
# Xetra trading hours for stocks are 9:00-17:30, see
# https://www.xetra.com/xetra-en/trading/trading-calendar-and-trading-hours
# Step 1 of the trading-hours filter: discard everything before 09:00.
df2 = df1.where(fct.col('Hour') >= 9)

In [16]:
# Step 2 of the trading-hours filter: keep rows up to and including 17:30.
# Any hour before 17 is inside the window; within hour 17 only minutes 00-30
# are. The first branch must be a strict `< 17` -- with `<= 17` every 17:xx
# row would pass and 17:31-17:59 would leak into the analysis.
df3 = df2.filter((df2['Hour'] < 17) | ((df2['Hour'] == 17) & (df2['Minute'] <= 30)))

In [17]:
df3.show(10) # show first 10 lines

+--------+--------------------+------------+----------+-----+----------+--------+--------+--------+----+------+
|Mnemonic|        SecurityDesc|SecurityType|      Date| Time|StartPrice|MaxPrice|MinPrice|EndPrice|Hour|Minute|
+--------+--------------------+------------+----------+-----+----------+--------+--------+--------+----+------+
|    SANT|S+T AG (Z.REG.MK....|Common stock|2018-02-06|09:00|     20.04|   20.04|   19.91|   19.95|   9|     0|
|     1FC|    FACC AG INH.AKT.|Common stock|2018-02-06|09:00|      16.5|    16.5|    16.5|    16.5|   9|     0|
|     OMV|              OMV AG|Common stock|2018-02-06|09:00|      48.8|    48.8|    48.8|    48.8|   9|     0|
|     VAS|      VOESTALPINE AG|Common stock|2018-02-06|09:00|     49.63|   49.64|   49.63|   49.63|   9|     0|
|     AUS|AT+S AUSTR.T.+SYS...|Common stock|2018-02-06|09:00|      21.6|    21.6|    21.4|    21.6|   9|     0|
|    1NBA|ANHEUSER-BUSCH INBEV|Common stock|2018-02-06|09:00|     86.76|   86.76|   86.76|   86.76|   9|

In [18]:
# Register df3 as a temporary SQL view named 'df3'.
# NOTE(review): none of the SQL queries in this notebook read from this view
# (create_csv filters the df3 DataFrame directly) -- confirm it is still needed.
df3.createOrReplaceTempView('df3') # create view of df3

In [19]:
# Mnemonic codes of the 20 stocks to analyse (one CSV is exported per entry).
stocks = [
    'AMZ', 'EBA', 'NFC', 'FB2A', 'MSF',
    'TWR', 'DBK', 'DAI', 'CBK', 'ALV',
    'BMW', 'AIR', 'VOW3', 'SIE', 'PHI1',
    'ADS', 'CON', 'BAS', 'BAYN', '1COV',
]

In [20]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, desc, col
import pyspark.sql.functions as fct


def _daily_ohlc(stock_df):
    """Aggregate the intraday rows of a single stock into one row per day.

    Parameters
    ----------
    stock_df : pyspark.sql.DataFrame
        Rows of one Mnemonic with at least the columns Mnemonic, Date, Hour,
        Minute, StartPrice, MaxPrice, MinPrice and EndPrice.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns Mnemonic, Date, MaxPrice, MinPrice, StartPrice, EndPrice,
        ordered by Date, one row per Date, where
          * MaxPrice   is the highest MaxPrice of the day,
          * MinPrice   is the lowest non-null MinPrice of the day,
          * StartPrice is the StartPrice of the earliest (Hour, Minute) row,
          * EndPrice   is the EndPrice of the latest (Hour, Minute) row.
    """
    # min()/max() over a struct compare fields left to right, so the minimum
    # of (Hour, Minute, StartPrice) comes from the day's earliest row and the
    # maximum of (Hour, Minute, EndPrice) from its latest row. If several rows
    # share that minute, the smallest StartPrice / largest EndPrice is taken,
    # so each Date yields exactly one output row (rank()-based selection could
    # emit duplicates on ties).
    first_bar = fct.min(fct.struct('Hour', 'Minute', 'StartPrice'))
    last_bar = fct.max(fct.struct('Hour', 'Minute', 'EndPrice'))
    return (
        stock_df
        # rows without a Date cannot be assigned to a trading day
        .filter(col('Date').isNotNull())
        .groupBy('Mnemonic', 'Date')
        .agg(
            fct.max('MaxPrice').alias('MaxPrice'),
            fct.min('MinPrice').alias('MinPrice'),
            first_bar.alias('first_bar'),
            last_bar.alias('last_bar'),
        )
        .select(
            'Mnemonic',
            'Date',
            'MaxPrice',
            'MinPrice',
            col('first_bar.StartPrice').alias('StartPrice'),
            col('last_bar.EndPrice').alias('EndPrice'),
        )
        .orderBy('Date')
    )


def create_csv(stockname, source_df=None, output_prefix='s3://hzhang502/'):
    """Export the daily max/min/start/end prices of one stock as CSV.

    Parameters
    ----------
    stockname : str
        Mnemonic of the stock to export; also used as the output folder name.
    source_df : pyspark.sql.DataFrame, optional
        Per-minute trading data to read from. Defaults to the notebook's
        trading-hours-filtered frame ``df3``.
    output_prefix : str, optional
        Destination prefix; the CSV is written to ``output_prefix + stockname``
        with a header row, overwriting any existing output there.
    """
    if source_df is None:
        source_df = df3
    stock_df = source_df.filter(col('Mnemonic') == stockname)
    daily = _daily_ohlc(stock_df)

    path = output_prefix + stockname
    daily.write.format("csv").option("header", "true").mode("Overwrite").save(path)

In [21]:
# Export one daily-price CSV per selected stock (a separate output folder each).
for mnemonic in stocks:
    create_csv(mnemonic)