**Installing Java, Spark and FindSpark**

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.2.3/spark-2.2.3-bin-hadoop2.7.tgz
!tar xf spark-2.2.3-bin-hadoop2.7.tgz
!pip install -q findspark

**Setting Environment Variables** : Location where Java and Spark will be installed





In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.2.3-bin-hadoop2.7"

**Starting a local Spark Session**

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession


spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark import HiveContext

In [0]:
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

In [0]:
sqlContext = HiveContext(sc)

**Importing the data given in csv format**

In [0]:
tick_df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('tick_data.csv')

**Checking the Schema of the data**

In [0]:
tick_df.printSchema()

root
 |-- DATE: integer (nullable = true)
 |-- TIME_S: string (nullable = true)
 |-- SYM_ROOT: string (nullable = true)
 |-- SIZE: integer (nullable = true)
 |-- TRADE: double (nullable = true)



**1. Using Spark Dataframe, calculate total trading volume for each stock in a certain hour**

In [0]:
tick_df.registerTempTable("tick_df")

part1_df = sqlContext.sql("select SYM_ROOT, DATE, substr(TIME_S,1,2) as TIME_H, sum(SIZE) as SIZE_H         \
                           from tick_df                                                                     \
                           group by SYM_ROOT, DATE, substr(TIME_S,1,2)                                      \
                           order by SYM_ROOT, DATE, substr(TIME_S,1,2)")
part1_df.show()

+--------+--------+------+------+
|SYM_ROOT|    DATE|TIME_H|SIZE_H|
+--------+--------+------+------+
|    AAPL|20120103|    09| 21092|
|    AAPL|20120103|    10| 12041|
|    AAPL|20120103|    11|  9473|
|    AAPL|20120103|    12|  5718|
|    AAPL|20120103|    13|  8103|
|    AAPL|20120103|    14|  7783|
|    AAPL|20120103|    15| 15334|
|    GOOG|20120103|    09|  8314|
|    GOOG|20120103|    10|  8080|
|    GOOG|20120103|    11|  6776|
|    GOOG|20120103|    12|  6619|
|    GOOG|20120103|    13|  6517|
|    GOOG|20120103|    14|  8453|
|    GOOG|20120103|    15| 17496|
|     IBM|20120103|    09| 14064|
|     IBM|20120103|    10| 12055|
|     IBM|20120103|    11| 12863|
|     IBM|20120103|    12| 11694|
|     IBM|20120103|    13|  5109|
|     IBM|20120103|    14|  9033|
+--------+--------+------+------+
only showing top 20 rows



**2. Assuming TRADE reflects the stock price at the time, calculate hourly return of
each stock with the given formula. The dataframe should be built on top of part 1 and include SYM_ROOT,DATE,TIME_H, SIZE_H,RETURN_H
where RETURN_H represents the hourly return that is calculated from the formula.**

In [0]:
# Calculating price of the 1st trade for every stock in a certain hour

pt1_df = sqlContext.sql("select SYM_ROOT, DATE, TIME_H, FIRST_TRADE_PRICE                                                    \
                      from                                                                                                   \
                      (                                                                                                      \
                        select SYM_ROOT, DATE, substr(TIME_S,1,2) as TIME_H,                                                 \
                        rank() over(partition by SYM_ROOT, DATE, substr(TIME_S,1,2) order by TIME_S asc) as trade_number,    \
                        sum(TRADE) as FIRST_TRADE_PRICE                                                                      \
                        from tick_df                                                                                         \
                        group by SYM_ROOT, DATE, TIME_S                                                                      \
                      ) as pt1                                                                                               \
                      where trade_number = 1")

pt1_df.show()

+--------+--------+------+------------------+
|SYM_ROOT|    DATE|TIME_H| FIRST_TRADE_PRICE|
+--------+--------+------+------------------+
|    AAPL|20120103|    13|            410.26|
|    MSFT|20120103|    15|             53.96|
|    AAPL|20120103|    09|            409.21|
|     IBM|20120103|    09|            187.67|
|    GOOG|20120103|    14|            660.36|
|    MSFT|20120103|    11|             26.78|
|    AAPL|20120103|    12|            411.22|
|     IBM|20120103|    12|            188.13|
|    AAPL|20120103|    15|            412.17|
|    AAPL|20120103|    10|            411.68|
|     IBM|20120103|    11|            188.54|
|     IBM|20120103|    10|            188.33|
|     IBM|20120103|    15|            186.87|
|    GOOG|20120103|    11|            659.74|
|    MSFT|20120103|    14|             26.93|
|    GOOG|20120103|    15|            665.28|
|    GOOG|20120103|    12|            662.06|
|    GOOG|20120103|    10|            659.42|
|    MSFT|20120103|    10|        

In [0]:
# Calculating price of the last trade for every stock in a certain hour

ptn_df = sqlContext.sql("select SYM_ROOT, DATE, TIME_H, LAST_TRADE_PRICE                                                     \
                      from                                                                                                   \
                      (                                                                                                      \
                        select SYM_ROOT, DATE, substr(TIME_S,1,2) as TIME_H,                                                 \
                        rank() over(partition by SYM_ROOT, DATE, substr(TIME_S,1,2) order by TIME_S desc) as trade_number,   \
                        sum(TRADE) as LAST_TRADE_PRICE                                                                       \
                        from tick_df                                                                                         \
                        group by SYM_ROOT, DATE, TIME_S                                                                      \
                      ) as ptn                                                                                               \
                      where trade_number = 1")

ptn_df.show()

+--------+--------+------+----------------+
|SYM_ROOT|    DATE|TIME_H|LAST_TRADE_PRICE|
+--------+--------+------+----------------+
|    AAPL|20120103|    13|          411.21|
|    MSFT|20120103|    15|           26.82|
|    AAPL|20120103|    09|          411.38|
|     IBM|20120103|    09|          188.25|
|    GOOG|20120103|    14|          664.96|
|    MSFT|20120103|    11|           26.88|
|    AAPL|20120103|    12|          410.19|
|     IBM|20120103|    12|          373.17|
|    AAPL|20120103|    15|          822.91|
|    AAPL|20120103|    10|          411.23|
|     IBM|20120103|    11|          188.16|
|     IBM|20120103|    10|          188.48|
|     IBM|20120103|    15|           186.4|
|    GOOG|20120103|    11|          661.86|
|    MSFT|20120103|    14|           26.98|
|    GOOG|20120103|    15|          665.89|
|    GOOG|20120103|    12|          659.56|
|    GOOG|20120103|    10|          659.95|
|    MSFT|20120103|    10|           26.77|
|     IBM|20120103|    13|      

In [0]:
# Calculating hourly return of each stock using the given formula

pt1_df.registerTempTable("pt1_df")
ptn_df.registerTempTable("ptn_df")

return_df = sqlContext.sql("select pt1_df.SYM_ROOT, pt1_df.DATE, pt1_df.TIME_H,                                                               \
                            ((LAST_TRADE_PRICE-FIRST_TRADE_PRICE)/FIRST_TRADE_PRICE) as RETURN_H                                              \
                            from pt1_df inner join ptn_df                                                                                     \
                            on (pt1_df.SYM_ROOT = ptn_df.SYM_ROOT) and (pt1_df.DATE = ptn_df.DATE) and (pt1_df.TIME_H = ptn_df.TIME_H)")

return_df.show()

+--------+--------+------+--------------------+
|SYM_ROOT|    DATE|TIME_H|            RETURN_H|
+--------+--------+------+--------------------+
|    AAPL|20120103|    13|0.002315604738458...|
|    MSFT|20120103|    15| -0.5029651593773166|
|    AAPL|20120103|    09|0.005302900711126356|
|     IBM|20120103|    09|0.003090531251665224|
|    GOOG|20120103|    14|0.006965897389302839|
|    MSFT|20120103|    11|0.003734129947722101|
|    AAPL|20120103|    12|-0.00250474198725...|
|     IBM|20120103|    12|  0.9835751873704355|
|    AAPL|20120103|    15|  0.9965305577795568|
|    AAPL|20120103|    10|-0.00109308200544...|
|     IBM|20120103|    11|-0.00201548742972...|
|     IBM|20120103|    10|7.964742738808329E-4|
|     IBM|20120103|    15|-0.00251511746133...|
|    GOOG|20120103|    11|0.003213387091884...|
|    MSFT|20120103|    14|0.001856665428889...|
|    GOOG|20120103|    15|9.169071669071874E-4|
|    GOOG|20120103|    12|-0.00377609280125...|
|    GOOG|20120103|    10|8.037366170272

In [0]:
# Appending the obtained hourly return information to the output from 1st part

part1_df.registerTempTable("part1_df")
return_df.registerTempTable("return_df")

part2_df = sqlContext.sql("select part1_df.SYM_ROOT, part1_df.DATE, part1_df.TIME_H, part1_df.SIZE_H, return_df.RETURN_H                                   \
                           from part1_df inner join return_df                                                                                              \
                           on (part1_df.SYM_ROOT = return_df.SYM_ROOT) and (part1_df.DATE = return_df.DATE) and (part1_df.TIME_H = return_df.TIME_H)")

part2_df.show()

+--------+--------+------+------+--------------------+
|SYM_ROOT|    DATE|TIME_H|SIZE_H|            RETURN_H|
+--------+--------+------+------+--------------------+
|    AAPL|20120103|    13|  8103|0.002315604738458...|
|    MSFT|20120103|    15| 59546| -0.5029651593773166|
|    AAPL|20120103|    09| 21092|0.005302900711126356|
|     IBM|20120103|    09| 14064|0.003090531251665224|
|    GOOG|20120103|    14|  8453|0.006965897389302839|
|    MSFT|20120103|    11| 33155|0.003734129947722101|
|    AAPL|20120103|    12|  5718|-0.00250474198725...|
|     IBM|20120103|    12| 11694|  0.9835751873704355|
|    AAPL|20120103|    15| 15334|  0.9965305577795568|
|    AAPL|20120103|    10| 12041|-0.00109308200544...|
|     IBM|20120103|    11| 12863|-0.00201548742972...|
|     IBM|20120103|    10| 12055|7.964742738808329E-4|
|     IBM|20120103|    15| 14450|-0.00251511746133...|
|    GOOG|20120103|    11|  6776|0.003213387091884...|
|    MSFT|20120103|    14| 13715|0.001856665428889...|
|    GOOG|

**3. Sort the output by SYM_ROOT, DATE, and TIME_H. **

In [0]:
part2_df.registerTempTable("part2_df")

part3_df = sqlContext.sql("select * from part2_df order by SYM_ROOT, DATE, TIME_H")

part3_df.show()

+--------+--------+------+------+--------------------+
|SYM_ROOT|    DATE|TIME_H|SIZE_H|            RETURN_H|
+--------+--------+------+------+--------------------+
|    AAPL|20120103|    09| 21092|0.005302900711126356|
|    AAPL|20120103|    10| 12041|-0.00109308200544...|
|    AAPL|20120103|    11|  9473|-1.45857642940495...|
|    AAPL|20120103|    12|  5718|-0.00250474198725...|
|    AAPL|20120103|    13|  8103|0.002315604738458...|
|    AAPL|20120103|    14|  7783|0.002066567795580...|
|    AAPL|20120103|    15| 15334|  0.9965305577795568|
|    GOOG|20120103|    09|  8314|0.006056815061178277|
|    GOOG|20120103|    10|  8080|8.037366170272154E-4|
|    GOOG|20120103|    11|  6776|0.003213387091884...|
|    GOOG|20120103|    12|  6619|-0.00377609280125...|
|    GOOG|20120103|    13|  6517|  1.0001514784294718|
|    GOOG|20120103|    14|  8453|0.006965897389302839|
|    GOOG|20120103|    15| 17496|9.169071669071874E-4|
|     IBM|20120103|    09| 14064|0.003090531251665224|
|     IBM|

In [0]:
# Exporting

!apt-get install -y -qq software-properties-common python-software-properties module-init-tools
!add-apt-repository -y ppa:alessandro-strada/ppa 2>&1 > /dev/null
!apt-get update -qq 2>&1 > /dev/null
!apt-get -y install -qq google-drive-ocamlfuse fuse
from google.colab import auth
auth.authenticate_user()
from oauth2client.client import GoogleCredentials
creds = GoogleCredentials.get_application_default()
import getpass
!google-drive-ocamlfuse -headless -id={creds.client_id} -secret={creds.client_secret} < /dev/null 2>&1 | grep URL
vcode = getpass.getpass()
!echo {vcode} | google-drive-ocamlfuse -headless -id={creds.client_id} -secret={creds.client_secret}

In [0]:
!mkdir -p drive
!google-drive-ocamlfuse drive

In [0]:
part3_df.repartition(1).write.format('com.databricks.spark.csv') \
    .option('header', 'true') \
    .save('/content/drive/abd_hw3/')