In [6]:
from IPython.display import display, HTML
display(HTML('<style>pre { white-space: pre !important; }</style>'))

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

import pyspark.sql.functions as func
from pyspark.sql.functions import year, month, to_date
from pyspark.sql.functions import min, max
from pyspark.sql.functions import mean, stddev
from pyspark.sql.functions import format_number

# Warm-Up!

In [7]:
spark = SparkSession.builder \
    .appName("StockMarketAnalysis") \
    .getOrCreate()

stocks_df = spark.read.csv("stocks.csv", header=True)

stocks_df.printSchema()

print(stocks_df.count())
stocks_df.show()

KeyboardInterrupt: 

In [29]:
filtered_by_closing_price = stocks_df.filter(stocks_df["Close"] < 500)

selected_columns_df = filtered_by_closing_price.select("Open", "Close", "Volume")

def show_df_info(df):
    df_count = df.count()
    print(f"Number of rows: {df_count}")
    df.show()

show_df_info(selected_columns_df)

Number of rows: 1359
+------------------+------------------+---------+
|              Open|             Close|   Volume|
+------------------+------------------+---------+
|        213.429998|        214.009998|123432400|
|        214.599998|        214.379993|150476200|
|        214.379993|        210.969995|138040000|
|            211.75|            210.58|119282800|
|        210.299994|211.98000499999998|111902700|
|212.79999700000002|210.11000299999998|115557400|
|209.18999499999998|        207.720001|148614900|
|        207.870005|        210.650002|151473000|
|210.11000299999998|            209.43|108223500|
|210.92999500000002|            205.93|148516900|
|        208.330002|        215.039995|182501900|
|        214.910006|            211.73|153038200|
|        212.079994|        208.069996|152038600|
|206.78000600000001|            197.75|220441900|
|202.51000200000001|        203.070002|266424900|
|205.95000100000001|        205.940001|466777500|
|        206.849995|        2

In [30]:
filtered_records_df = stocks_df.filter((stocks_df["Open"] > 200) & (stocks_df["Close"] < 200))

show_df_info(filtered_records_df)

Number of rows: 3
+----------+------------------+----------+----------+----------+---------+------------------+
|      Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+----------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+----------+------------------+----------+----------+----------+---------+------------------+



In [31]:
stocks_df_with_year = stocks_df.withColumn("Year", year("Date"))
show_df_info(stocks_df_with_year)

Number of rows: 1762
+----------+------------------+------------------+------------------+------------------+---------+------------------+----+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|Year|
+----------+------------------+------------------+------------------+------------------+---------+------------------+----+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|2010|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|2

In [33]:
min_volume_by_year = stocks_df_with_year.groupBy("Year").agg(min("Volume").alias("minVolume"))
show_df_info(min_volume_by_year)

Number of rows: 7
+----+---------+
|Year|minVolume|
+----+---------+
|2010|100901500|
|2011|100110500|
|2012|100023000|
|2013|100345700|
|2014|100092000|
|2015|101217500|
|2016|110888700|
+----+---------+



In [36]:
stocks_df_with_year_month = stocks_df.withColumn("Year", year("Date")).withColumn("Month", month("Date"))
max_low_price = stocks_df_with_year_month.groupBy("Year", "Month").agg(max("Low").alias("maxLow"))

show_df_info(max_low_price)

Number of rows: 84
+----+-----+------------------+
|Year|Month|            maxLow|
+----+-----+------------------+
|2010|    1|        213.249994|
|2010|    2|        202.000004|
|2010|    3|        234.459999|
|2010|    4|268.19001000000003|
|2010|    5|        262.880009|
|2010|    6|        271.499992|
|2010|    7|        260.300003|
|2010|    8|        260.549995|
|2010|    9|        291.009998|
|2010|   10|        314.289997|
|2010|   11|        316.759987|
|2010|   12|        325.099991|
|2011|    1|        344.440006|
|2011|    2|             360.5|
|2011|    3|        357.750004|
|2011|    4|        350.300007|
|2011|    5|        346.880009|
|2011|    6|        344.649998|
|2011|    7|399.67998900000003|
|2011|    8|        392.369995|
+----+-----+------------------+
only showing top 20 rows



In [42]:
mean_stddev_high_df = stocks_df.agg(
    format_number(mean("High"), 2).alias("MeanHigh"), 
    format_number(stddev("High"), 2).alias("StdDevHigh")
)
mean_stddev_high_df.show(truncate=False)

+--------+----------+
|MeanHigh|StdDevHigh|
+--------+----------+
|315.91  |186.90    |
+--------+----------+



# Main Task

In [5]:
spark = SparkSession.builder \
    .appName("SpotifyAnalysis") \
    .getOrCreate()

KeyboardInterrupt: 

In [None]:
spotify_data = spark.read.parquet("spotify.parquet")