In [11]:
from pyspark.sql import SparkSession

# Get existed or Create new SparkSession
spark = SparkSession.builder.appName('Spark Demo').master('local[*]').getOrCreate()
spark

In [30]:
from pyspark.sql import functions as F

# Warm-Up!

## Q1

In [19]:
data_path = 'stocks.csv'
df = spark.read.csv(data_path, header=True, inferSchema=True)


## Q2

In [20]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



## Q3

In [25]:
df.where(df['Close']<500).select(df.Open, df.Close, df.Volume).show(10)

+------------------+------------------+---------+
|              Open|             Close|   Volume|
+------------------+------------------+---------+
|        213.429998|        214.009998|123432400|
|        214.599998|        214.379993|150476200|
|        214.379993|        210.969995|138040000|
|            211.75|            210.58|119282800|
|        210.299994|211.98000499999998|111902700|
|212.79999700000002|210.11000299999998|115557400|
|209.18999499999998|        207.720001|148614900|
|        207.870005|        210.650002|151473000|
|210.11000299999998|            209.43|108223500|
|210.92999500000002|            205.93|148516900|
+------------------+------------------+---------+
only showing top 10 rows



## Q4

In [31]:
df.where((F.col('Close')<200) & (F.col('Open') > 200)).show(10)

+----------+------------------+----------+----------+----------+---------+------------------+
|      Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+----------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+----------+------------------+----------+----------+----------+---------+------------------+



## Q5

In [35]:
df2 = df.withColumn('Year',F.year(F.to_timestamp('Date', 'dd/MM/yyyy')))

In [36]:
df2.show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+----+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|Year|
+----------+------------------+------------------+------------------+------------------+---------+------------------+----+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|2010|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|2010|
|2010-01-11|212.

## Q6

In [75]:
df4.groupBy('Year').agg(F.min('Volume').alias('minVolume')).show()

+----+---------+
|Year|minVolume|
+----+---------+
|2015| 13046400|
|2013| 41888700|
|2014| 14479600|
|2012| 43938300|
|2016| 11475900|
|2010| 39373600|
|2011| 44915500|
+----+---------+



## Q7

In [43]:
df3 = df2.withColumn('Month',F.month(F.to_timestamp('Date', 'dd/MM/yyyy')))

In [44]:
df3.show(10)

+----------+------------------+------------------+------------------+------------------+---------+------------------+----+-----+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|Year|Month|
+----------+------------------+------------------+------------------+------------------+---------+------------------+----+-----+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|    1|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|    1|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|    1|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|2010|    1|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700

In [58]:
df4 = df3.withColumn('Year/Month',F.concat('Year', f.lit("/"), 'Month'))

In [59]:
df4.show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+----+-----+----------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|Year|Month|Year/Month|
+----------+------------------+------------------+------------------+------------------+---------+------------------+----+-----+----------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|    1|    2010/1|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|    1|    2010/1|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|    1|    2010/1|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|2010|    1|    2010/1|
|2010-01-08|        

In [73]:
df4.groupBy('Year/Month').agg(F.max('Low').alias('maxLow')).show()

+----------+------------------+
|Year/Month|            maxLow|
+----------+------------------+
|    2013/6|447.38999900000005|
|    2011/9|        412.000004|
|    2014/8|        102.199997|
|    2016/4|111.33000200000001|
|    2010/2|        202.000004|
|   2015/11|        121.620003|
|   2012/11|        594.170021|
|    2012/4| 626.0000150000001|
|    2014/6|        644.470024|
|    2010/1|        213.249994|
|    2011/2|             360.5|
|    2014/2| 545.6099780000001|
|    2011/1|        344.440006|
|    2010/5|        262.880009|
|   2011/12|        403.490009|
|   2013/11|        547.809975|
|   2011/11|        401.560005|
|    2015/2|        131.169998|
|   2014/12|        115.290001|
|   2010/10|        314.289997|
+----------+------------------+
only showing top 20 rows



## Q8

In [92]:
df.agg(F.round(F.mean('High'), 2).alias('Avg'), F.round(F.std('High'), 2).alias('Std')).show()

+------+-----+
|   Avg|  Std|
+------+-----+
|315.91|186.9|
+------+-----+

