# Warm-Up Section

## Libraries

In [1]:
!pip install pyspark


Defaulting to user installation because normal site-packages is not writeable


In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [3]:
import os
os.environ['SPARK_LOCAL_IP'] = '127.0.0.1'

## Instantiating SparkSession

In [4]:
spark = SparkSession.builder \
    .appName("CSV Processing") \
    .getOrCreate()

## 1. Reading the CSV File

In [5]:
df = spark.read.csv("./data/stocks.csv", header=True, inferSchema=True)

## 2. Demonstrating the Schema

In [6]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



## 3. Filtering Records: Closing Price < 500; Selecting 'Open', 'Close', and 'Volume' Columns

In [7]:
offset, limit = 0, 10
(df
 .filter(df["Close"] < 500)
 .select("Open", "Close", "Volume")
 .offset(offset)
 .limit(limit)
 .show())

+------------------+------------------+---------+
|              Open|             Close|   Volume|
+------------------+------------------+---------+
|        213.429998|        214.009998|123432400|
|        214.599998|        214.379993|150476200|
|        214.379993|        210.969995|138040000|
|            211.75|            210.58|119282800|
|        210.299994|211.98000499999998|111902700|
|212.79999700000002|210.11000299999998|115557400|
|209.18999499999998|        207.720001|148614900|
|        207.870005|        210.650002|151473000|
|210.11000299999998|            209.43|108223500|
|210.92999500000002|            205.93|148516900|
+------------------+------------------+---------+



## 4. Filtering Records: Opening Price > 200 & Closing Price < 200

In [8]:
(df
 .filter((df["Open"] > 200) & (df["Close"] < 200))
 .show())

+----------+------------------+----------+----------+----------+---------+------------------+
|      Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+----------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+----------+------------------+----------+----------+----------+---------+------------------+



## 5. Extracting the Year

In [9]:
offset, limit = 0, 5
(df.withColumn("Year", F.year("Date"))
 .offset(offset)
 .limit(limit)
 .show())

+----------+----------+----------+------------------+------------------+---------+------------------+----+
|      Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|Year|
+----------+----------+----------+------------------+------------------+---------+------------------+----+
|2010-01-04|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|
|2010-01-05|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|
|2010-01-06|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|
|2010-01-07|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|2010|
|2010-01-08|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|2010|
+----------+----------+----------+------------------+------------------+---------+------------------+----+



## 6. Showing Minimum Volume for Each Year

In [10]:
(df
 .withColumn("Year", F.year("Date"))
 .groupby("Year")
 .min("Volume")
 .withColumnRenamed("min(Volume)", "minVolume")
 .sort("Year")
 .show())

+----+---------+
|Year|minVolume|
+----+---------+
|2010| 39373600|
|2011| 44915500|
|2012| 43938300|
|2013| 41888700|
|2014| 14479600|
|2015| 13046400|
|2016| 11475900|
+----+---------+



## 7. Finding the Highest Low Price for Each Year and Month

In [11]:
offset, limit = 0, 10
(df
 .withColumn("Year", F.year("Date"))
 .withColumn("Month", F.month("Date"))
 .groupby("Year", "Month")
 .max("Low")
 .withColumnRenamed("max(Low)", "maxLow")
 .sort("Year", "Month")
 .offset(offset)
 .limit(limit)
 .show(n=limit))

+----+-----+------------------+
|Year|Month|            maxLow|
+----+-----+------------------+
|2010|    1|        213.249994|
|2010|    2|        202.000004|
|2010|    3|        234.459999|
|2010|    4|268.19001000000003|
|2010|    5|        262.880009|
|2010|    6|        271.499992|
|2010|    7|        260.300003|
|2010|    8|        260.549995|
|2010|    9|        291.009998|
|2010|   10|        314.289997|
+----+-----+------------------+



## 8. Calculating Mean and Standard Deviation of High Price

### Solution 1

In [12]:
(df
 .select("High")
 .summary()
 .where(F.col('summary').isin(['stddev', 'mean']))
 .withColumn("High", F.col("High").cast("double"))
 .select(F.col('summary').alias('Criterion'), 
         F.format_number(F.col('High'), 2).alias('Value'))
 .show())

+---------+------+
|Criterion| Value|
+---------+------+
|     mean|315.91|
|   stddev|186.90|
+---------+------+



### Solution 2

In [13]:
(df
 .select(F.format_number(F.mean("High"), 2).alias("Mean"),
         F.format_number(F.stddev("High"), 2).alias("Std"))
 .show())

+------+------+
|  Mean|   Std|
+------+------+
|315.91|186.90|
+------+------+



### Solution 3: (As Manual as Possible)

In [14]:
(df
 .crossJoin(df.agg((F.sum("High") / F.count("High")).alias("Mean")))
 .agg(F.first("Mean").alias("Mean"), 
      F.sqrt(F.sum((F.col("High") - F.col("Mean")) ** 2) / (F.count("High") - 1)).alias("Std"))
 .select(F.format_number("Mean", 2).alias("Mean"),
         F.format_number("Std", 2).alias("Std"))
 .show())

+------+------+
|  Mean|   Std|
+------+------+
|315.91|186.90|
+------+------+



In [18]:
%%script echo "skipped"
spark.stop()

Couldn't find program: 'echo'
