# **Homework 4**

Let's get some quick practice with your new Spark DataFrame skills, you will be asked some basic questions about some stock market data, in this case Kroger Stock data.

In [None]:
#Step 1: Install Dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
!tar xf spark-3.3.0-bin-hadoop3.tgz
!pip install -q findspark

#Step 2: Add environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.3.0-bin-hadoop3"

#Step 3: Initialize Pyspark
import findspark
findspark.init()

In [None]:
#creating spark context
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext, Row
from pyspark.sql import functions as sf
from pyspark.sql.functions import concat, lit, col
import datetime
from pyspark.sql.functions import year, month, dayofmonth
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
sc

In [None]:
#import the json library
import json
#open and read the json file
#open('kroger_stock.csv') #.readlines()
#spark.sql("CREATE DATABASE learn_spark_db")
#spark.sql("USE learn_spark_db")
# learn_spark_db already exists

# Load a text file and convert each line to a tuple.
#lines = sc.textFile('kroger_stock.csv')
#header = lines.first() #extract header
#lines = lines.filter(lambda r: r != header)
df = spark.read.load('kroger_stock.csv',
                     format="csv", sep=",", inferSchema="true", header="true")

# Commented out is if we do everything manually with lambda, found the above
# function at:
# https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#untyped-dataset-operations-aka-dataframe-operations
#parts = lines.map(lambda l: l.split(",")) # l is an L, split(",") specifies at comma
#rdd1 = sc.parallelize(parts)

#krogers = parts.map(lambda p: Row(Date=str(p[0]), Open=float(p[1]), 
#                                  High=float(p[2]), Low=float(p[3]), Close=float(p[4]),
#                                  Volume=int(p[5]), Adj_Close=float(p[6])))

#creating a dataframe
#df = spark.createDataFrame(krogers)

#### Use the kroger_stock.csv file to Answer and complete the  tasks below!

#### What are the column names?

In [None]:
df.show()


+-------------------+------------------+------------------+------------------+------------------+--------+------------------+
|               Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00:00|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|
|2012-01-09 00:00:00|         59.029999|         59.549999|         58.919998|             59.18| 6679300|51.616215000

Column names are: Date, Open, High, Low, Close, Volume, and Adj Close (converted to Adj_Close)


#### What does the Schema look like?

In [None]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



#### Print out the first 5 columns.

In [None]:
df.show(5) # top 5 rows
df.select(df.columns[0:5]).show() # first 5 columns
#df.show(5) # making sure it's just a temp view

+-------------------+------------------+---------+---------+------------------+--------+------------------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00:00|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09 00:00:00|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows

+--

#### Use describe() to learn about the DataFrame.

In [None]:
df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

#### Create a new dataframe with a column called HV Ratio that is the ratio of the High Price versus volume of stock traded for a day.

In [None]:
#df.withColumn("HV Ratio", df.High + ':' + df.Volume).show()
#df.withColumn("HV Ratio", print(df.High + ':' + df.Volume)).show()

#running some querries
#ratio = spark.sql("SELECT High, Volume FROM df").show()

#ratio_col = df.map(lambda r: r.High + ':' + r.Volume).collect()
#df.High = map(lambda s: str(df.High))
#df.Volume = map(lambda s: str(df.Volume))


#df.withColumn("HV Ratio", str(df.High) + ':' + str(df.Volume))
#df.withColumn("HV Ratio", df.High + ':' + df.Volume)


df_HV = df.withColumn('HV_Ratio', concat(col('High'),lit(" : "),col('Volume'))).show()

# Or divided if that's what's meant by ratio

df.createOrReplaceTempView("df")
df.withColumn("HV Ratio", df.High/df.Volume).show()

+-------------------+------------------+------------------+------------------+------------------+--------+------------------+--------------------+
|               Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|            HV_Ratio|
+-------------------+------------------+------------------+------------------+------------------+--------+------------------+--------------------+
|2012-01-03 00:00:00|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|61.060001 : 12668800|
|2012-01-04 00:00:00|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475| 60.349998 : 9593300|
|2012-01-05 00:00:00|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|59.619999 : 12768200|
|2012-01-06 00:00:00|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51

#### What day had the Peak High in Price?

In [None]:
df.describe().show()
peak_high = spark.sql("SELECT Date, High FROM df WHERE High = (SELECT MAX(High) FROM df)")
peak_high.show()


+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

#### What is the mean of the Close column?

In [None]:
df.describe().show()
df.agg({'Close': 'mean'}).show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

#### What is the max and min of the Volume column?

In [None]:
df.describe().show()
vol_max = spark.sql("SELECT Volume FROM df WHERE Volume == (SELECT MAX(Volume) FROM df)")
vol_min = spark.sql("SELECT Volume FROM df WHERE Volume == (SELECT MIN(Volume) FROM df)")
vol_maxmin = vol_max.union(vol_min)
#vol_maxmin.show()
#vol_maxmin.
vol_maxmin_renamed = vol_maxmin.withColumnRenamed("Volume", "Volume Max and Min")
vol_maxmin_renamed.show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

#### How many days was the Close lower than 60 dollars?

In [None]:
close_less60 = spark.sql("SELECT Date, Close FROM df WHERE Close < 60")
close_less60.show()
close_less60.count()

+-------------------+------------------+
|               Date|             Close|
+-------------------+------------------+
|2012-01-04 00:00:00|59.709998999999996|
|2012-01-05 00:00:00|         59.419998|
|2012-01-06 00:00:00|              59.0|
|2012-01-09 00:00:00|             59.18|
|2012-01-10 00:00:00|59.040001000000004|
|2012-01-11 00:00:00|         59.400002|
|2012-01-12 00:00:00|              59.5|
|2012-01-13 00:00:00|59.540001000000004|
|2012-01-17 00:00:00|         59.849998|
|2012-02-22 00:00:00|         58.599998|
|2012-02-23 00:00:00|58.540001000000004|
|2012-02-24 00:00:00|58.790001000000004|
|2012-02-27 00:00:00|58.459998999999996|
|2012-02-28 00:00:00|             58.93|
|2012-02-29 00:00:00|         59.080002|
|2012-03-01 00:00:00|             58.82|
|2012-03-02 00:00:00|59.009997999999996|
|2012-03-05 00:00:00|         59.400002|
|2012-03-06 00:00:00|         58.970001|
|2012-03-07 00:00:00|59.860001000000004|
+-------------------+------------------+
only showing top

81

It counted 81 total days. 

#### What percentage of the time was the High greater than 80 dollars ?
#### In other words, (Number of Days High>80)/(Total Days in the dataset)

In [None]:
total_days = spark.sql("SELECT Date FROM df").count()
high_over80 = spark.sql("SELECT High FROM df WHERE High > 80").count()
perc_high_over80 = total_days/high_over80
print(perc_high_over80)

10.939130434782609


#### What is the max High per year?

In [None]:
#df.createOrReplaceTempView("df")
#df.select(year("date").alias('Year'), df.High) # year_maxHigh_df = 
#df.show()

#df.groupBy(year("date").alias('Year'))
#df.show()

df.groupBy(year("date").alias('Year')).max("High").show()

#year_maxHigh = spark.sql("SELECT * FROM df WHERE High == (SELECT MAX(High) FROM df)").groupBy("Year")
#year_maxHigh.show()

+----+---------+
|Year|max(High)|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+



#### In other words, across all the years, what is the average Close price for Jan,Feb, Mar, etc... Your result will have a value for each of these months.

#### What is the average Close for each Calendar Month? 

In [None]:
df.groupBy(month("date").alias('Month')).agg({'Close': 'mean'}).show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|   12|72.84792478301885|
|    1|71.44801958415842|
|    6| 72.4953774245283|
|    3|71.77794377570092|
|    5|72.30971688679247|
|    9|72.18411785294116|
|    4|72.97361900952382|
|    8|73.02981855454546|
|    7|74.43971943925233|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|    2|  71.306804443299|
+-----+-----------------+

