In [1]:
#0: Start a Spark Session and import the libraries you will need.
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.getOrCreate()

In [2]:
# Read the stock data from the GCP bucket and save it as a data frame.
amzn_df = spark.read.csv("gs://jpstreet-5132/assignment/AMZN.csv", header=True, inferSchema=True)

                                                                                

In [3]:
#1: Display the Schema of the data frame.
amzn_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [4]:
#2: Display the summary statistics for all columns in the data frame.
amzn_df.describe().show()

                                                                                

+-------+----------+-----------------+-----------------+-----------------+------------------+------------------+-------------------+
|summary|      Date|             Open|             High|              Low|             Close|         Adj Close|             Volume|
+-------+----------+-----------------+-----------------+-----------------+------------------+------------------+-------------------+
|  count|      2518|             2518|             2518|             2518|              2518|              2518|               2518|
|   mean|      null|73.82353615726771|74.66189798570285| 72.8813564964257| 73.78004878752976| 73.78004878752976|8.025619868943606E7|
| stddev|      null|53.34565607175615|53.99876250199932|52.61413456378813|53.289557886169824|53.289557886169824|4.230300092993142E7|
|    min|2013-01-02|           12.447|          12.6465|          12.2875|           12.4115|           12.4115|           17626000|
|    max|2022-12-30|       187.199997|       188.654007|       184.83

In [5]:
#3: Find and display the five records that has the highest price in the High column.
amzn_df.orderBy(amzn_df["High"].desc()).show(5)

[Stage 5:>                                                          (0 + 1) / 1]

+----------+----------+----------+----------+----------+----------+---------+
|      Date|      Open|      High|       Low|     Close| Adj Close|   Volume|
+----------+----------+----------+----------+----------+----------+---------+
|2021-07-13|185.104996|188.654007|183.565994|183.867996|183.867996| 76918000|
|2021-11-19|185.634506|188.107498|183.785995|183.828506|183.828506| 98734000|
|2021-07-08|182.177994|187.999496|   181.056|186.570496|186.570496|103612000|
|2021-07-12|187.199997|187.864502|184.839493|185.927505|185.927505| 51432000|
|2021-07-09|186.126007|187.399994|184.669998|185.966995|185.966995| 74964000|
+----------+----------+----------+----------+----------+----------+---------+
only showing top 5 rows



                                                                                

In [6]:
#4: What day had the lowest price? Display the date (as given in the Date column) and the price?
amzn_df.orderBy(amzn_df["Low"]).select("Date", "Low").show(1)

+----------+-------+
|      Date|    Low|
+----------+-------+
|2013-05-01|12.2875|
+----------+-------+
only showing top 1 row



In [7]:
#5: What is the range of the Volume column? (Hint: Range is the difference between max and min values)
volume_range = amzn_df.agg({"Volume": "max"}).first()[0] - amzn_df.agg({"Volume": "min"}).first()[0]
print(f"Range of Volume: {volume_range}")

Range of Volume: 459496000


In [8]:
#6: Calculate the average of the High column. What percent of the observations had a High price greater than the average?
avg_high = amzn_df.agg({"High": "avg"}).first()[0]
percent_greater_than_avg = amzn_df.filter(amzn_df["High"] > avg_high).count() / amzn_df.count() * 100
print(f"Average High: {avg_high}, Percent greater than average: {percent_greater_than_avg:.2f}%")

Average High: 74.66189798570285, Percent greater than average: 47.97%


In [9]:
#7: What is the total Volume per year? Sort the results based on the total Volume in a decreasing order.
amzn_df.withColumn("Year", year(amzn_df["Date"])).groupBy("Year").sum("Volume").orderBy("sum(Volume)", ascending=False).show()

[Stage 22:>                                                         (0 + 1) / 1]

+----+-----------+
|Year|sum(Volume)|
+----+-----------+
|2018|28357952000|
|2020|24950814000|
|2016|20775126000|
|2014|20581334000|
|2019|19493002000|
|2015|19142040000|
|2022|19096256300|
|2017|17654108000|
|2021|17076362000|
|2013|14958114000|
+----+-----------+



                                                                                

In [10]:
#8: What is the average Adjusted Close price for each month? Sort the results in an increasing order based on the average Adjusted Close price.
amzn_df.withColumn("Month", month(amzn_df["Date"])).groupBy("Month").agg({"Adj Close": "avg"}).orderBy("avg(Adj Close)").show()

+-----+-----------------+
|Month|   avg(Adj Close)|
+-----+-----------------+
|    1|67.00627561083743|
|    2|68.95255491623037|
|    5|69.37138398578199|
|    3|69.88013780275226|
|    4|71.78249272596155|
|    6|73.02435191079809|
|   12|75.53066592417065|
|   10|76.20908837556561|
|   11|76.53358077450977|
|    7|77.88898079245286|
|    9|78.86672661951219|
|    8| 79.5366561040724|
+-----+-----------------+



In [11]:
#9: Using SQL commands, create a new data frame that includes the records for the days where closing price is lower than the opening price. 
#   Display the number of records in this new data frame.
amzn_df.createOrReplaceTempView("stocks")
low_close_amzn_df = spark.sql("SELECT * FROM stocks WHERE `Close` < `Open`")
print(f"# records with close price < open price: {low_close_amzn_df.count()}")

# records with close price < open price: 1272


In [12]:
#10: Perform the following using SQL commands: 
#    -Count the number of days in each month of each year during the last three years, 
#      where closing price is at least $5 less than the opening price. 
#    -The resulting data frame should be sorted based on Year first and then Month in increasing order.
recent_years_amzn_df = spark.sql("""
    SELECT Year, Month, COUNT(*) AS Days
    FROM (
        SELECT year(Date) AS Year, month(Date) AS Month
        FROM stocks
        WHERE `Close` <= `Open` - 5 AND year(Date) >= 2020
    )
    GROUP BY Year, Month
    ORDER BY Year, Month
""")
recent_years_amzn_df.show()

+----+-----+----+
|Year|Month|Days|
+----+-----+----+
|2020|    7|   2|
|2020|    9|   4|
|2020|   10|   1|
|2021|    1|   1|
|2021|    2|   1|
|2021|   11|   1|
|2021|   12|   1|
|2022|    1|   3|
|2022|    2|   1|
|2022|    3|   2|
|2022|    4|   3|
|2022|    5|   1|
|2022|    6|   1|
|2022|    8|   1|
|2022|   10|   1|
|2022|   11|   2|
+----+-----+----+

