# Warm-Up Using PySpark!

### Needed Libraries

In [16]:
from IPython.display import display, HTML
# Inject a CSS override so <pre> blocks keep their whitespace and are not
# line-wrapped (wide text output such as df.show() tables stays aligned).
display(HTML('<style>pre { white-space: pre !important; }</style>'))
import pyspark as ps
# Column functions used later for date extraction and summary statistics.
from pyspark.sql.functions import year, month
from pyspark.sql.functions import mean, stddev

### Initializing a SparkSession

In this part we initialize a SparkSession in PySpark and set the configuration for Spark, including setting the master to "local" (indicating it's running on a single machine), naming the application "stock", and configuring some Spark options. If a SparkSession already exists, it gets that session; otherwise, it creates a new one.

In [2]:
# Build the session with a parenthesised method chain; getOrCreate() returns
# the already-running session if there is one, otherwise it starts a new one.
spark = (
    ps.sql.SparkSession.builder
    .master("local")
    .appName("stock")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

your 131072x1 screen size is bogus. expect trouble
24/04/12 19:31:11 WARN Utils: Your hostname, Ava resolves to a loopback address: 127.0.1.1; using 172.24.85.192 instead (on interface eth0)
24/04/12 19:31:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/12 19:31:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Read From CSV File

In [3]:
def show_df(df, n=5):
    """Display the first rows of a DataFrame followed by its total row count.

    Args:
        df: Object exposing ``show(n)`` and ``count()`` (e.g. a Spark DataFrame).
        n: Number of rows passed to ``df.show``. Defaults to 5.
    """
    df.show(n)
    # count() is evaluated after the preview so the table is printed first.
    row_total = df.count()
    print("number of rows: ", row_total)

In [4]:
# Read the CSV with its first line as the header and let Spark infer the
# column types from the data.
stocks_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("stocks.csv")
)
show_df(stocks_df)

+----------+----------+----------+------------------+------------------+---------+------------------+
|      Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+----------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+----------+----------+----------+------------------+------------------+---------+------------------+
only showing top 5 rows

number of rows:  1762


### Schema of Data

In [5]:
# Print each column's name, data type and nullability as a tree.
stocks_df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



As we can see, the schema of the data is automatically inferred by PySpark. All data types are double except for the date column, which has a date data type (yyyy-mm-dd) and the Volume column, which has an integer data type.

### Opening, Closing, and Volume of Records with Closing Price less than 500

In [6]:
# Keep only the rows whose closing price is below 500 ...
closing_less_than_500 = stocks_df.where(stocks_df.Close < 500)
# ... and project them down to the three columns of interest.
closing_less_than_500_df = closing_less_than_500.select(['Open', 'Close', 'Volume'])
show_df(closing_less_than_500_df)

+----------+------------------+---------+
|      Open|             Close|   Volume|
+----------+------------------+---------+
|213.429998|        214.009998|123432400|
|214.599998|        214.379993|150476200|
|214.379993|        210.969995|138040000|
|    211.75|            210.58|119282800|
|210.299994|211.98000499999998|111902700|
+----------+------------------+---------+
only showing top 5 rows

number of rows:  1359


The opening price, closing price, and volume of the records with a closing price below 500 are shown in the output. 1359 records satisfy this condition; the first 5 of them are shown.

### Records with Opening Price more than 200 and Closing Price less than 200

In [7]:
# Rows that opened above 200 but closed below 200, written as a single
# SQL-style predicate.
opening_more_than_200_closing_less_than_200 = stocks_df.filter(
    "Open > 200 AND Close < 200"
)
show_df(opening_more_than_200_closing_less_than_200)

+----------+------------------+----------+----------+----------+---------+------------------+
|      Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+----------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+----------+------------------+----------+----------+----------+---------+------------------+

number of rows:  3


There are 3 records with an opening price above 200 and a closing price below 200, all of which are shown in the output.

### Adding Year Column

We extract the year from the date column and add it as a new column to the DataFrame.

In [8]:
# Keep a handle on the frame as it was before the Year column is added.
stocks_df_without_year = stocks_df.alias("stocks_df_without_year")
# Derive the calendar year from the Date column and store it as 'Year'.
stocks_df = stocks_df.withColumn('Year', year('Date'))
show_df(stocks_df)

+----------+----------+----------+------------------+------------------+---------+------------------+----+
|      Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|Year|
+----------+----------+----------+------------------+------------------+---------+------------------+----+
|2010-01-04|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|
|2010-01-05|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|
|2010-01-06|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|
|2010-01-07|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|2010|
|2010-01-08|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|2010|
+----------+----------+----------+------------------+------------------+---------+------------------+----+
only showing top 5 rows

number of ro

### Minimum Volume for each Year

We calculate the minimum volume for each year using the groupBy and agg functions.

In [11]:
# Group by year, take the smallest Volume in each group, and give the
# aggregate column a friendlier name than the default 'min(Volume)'.
year_min_volume = (
    stocks_df
    .groupBy('Year')
    .agg({'Volume': 'min'})
    .withColumnRenamed('min(Volume)', 'minVolume')
)
show_df(year_min_volume, 10)

+----+---------+
|Year|minVolume|
+----+---------+
|2015| 13046400|
|2013| 41888700|
|2014| 14479600|
|2012| 43938300|
|2016| 11475900|
|2010| 39373600|
|2011| 44915500|
+----+---------+

number of rows:  7


### Highest Low Price for each Month and Year

We calculate the highest low price for each month and year using the groupBy and agg functions like we did for the minimum volume for each year.

In [12]:
# Keep a handle on the frame as it was before the Month column is added.
stocks_df_without_month = stocks_df.alias("stocks_df_without_month")
# Derive the month number from the Date column and store it as 'Month'.
stocks_df = stocks_df.withColumn('Month', month('Date'))
# For every (year, month) pair take the largest Low value and rename the
# aggregate column from the default 'max(Low)'.
highest_low_price = (
    stocks_df
    .groupBy("year", "month")
    .agg({'Low': 'max'})
    .withColumnRenamed('max(Low)', 'maxLow')
)
show_df(highest_low_price, 90)

+----+-----+------------------+
|year|month|            maxLow|
+----+-----+------------------+
|2012|   10|        665.550026|
|2010|    7|        260.300003|
|2010|   12|        325.099991|
|2015|    2|        131.169998|
|2014|    4|        589.799988|
|2015|   12|        117.809998|
|2016|    7|            103.68|
|2016|   11|        111.400002|
|2012|    8| 673.5400089999999|
|2013|    2|473.24997699999994|
|2012|    4| 626.0000150000001|
|2012|   12|        585.500023|
|2014|   10|        107.209999|
|2016|    5|             99.25|
|2014|   12|        115.290001|
|2013|    9|        503.479988|
|2013|   10|        525.110016|
|2014|    5|        628.900002|
|2016|    2|         96.650002|
|2013|   12| 566.4100269999999|
|2014|    1|        552.020004|
|2010|   11|        316.759987|
|2011|    3|        357.750004|
|2013|    3|        461.780022|
|2014|    8|        102.199997|
|2013|    6|447.38999900000005|
|2010|    6|        271.499992|
|2012|    7| 605.9999849999999|
|2012|  

### Mean and Standard Deviation of High Price

The mean and standard deviation of the high price are calculated using the mean and stddev functions.

In [15]:
# Aggregate the whole High column into a single row holding its mean and
# standard deviation.
mean_stddev_high_price = stocks_df.select(
    mean(stocks_df['High']),
    stddev(stocks_df['High']),
)
show_df(mean_stddev_high_price)

+-----------------+------------------+
|        avg(High)|      stddev(High)|
+-----------------+------------------+
|315.9112880164581|186.89817686485767|
+-----------------+------------------+

number of rows:  1
