In [None]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

[33m0% [Working][0m            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [737 kB]
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:11 http://security.ubuntu.com/ubuntu jammy-security/universe amd64 Packages [1,080 kB]
Get:12 http://archive.ubuntu.com/ubuntu jammy-backports InRelease

In [None]:
from google.colab import drive

# Mount Google Drive so the lab datasets stored there become readable from the notebook.
mount_point = '/content/drive'
drive.mount(mount_point)

# Folder holding the Lab 4 CSV files; it keeps a trailing slash so file names can be appended directly.
data_folder = f'{mount_point}/MyDrive/BDT Datasets/Lab 4/'

Mounted at /content/drive


In [None]:
# Absolute paths of the three Lab 4 CSV files (data_folder already ends with '/').
index_data_file = f'{data_folder}indexData.csv'
index_info_file = f'{data_folder}indexInfo.csv'
index_processed_file = f'{data_folder}indexProcessed.csv'

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Reuse the active Spark session if one exists, otherwise start one named 'Processing'.
spark = (
    SparkSession.builder
    .appName('Processing')
    .getOrCreate()
)

### Specifying the data schema

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType, TimestampType

Each field is built from three parts: the column name, the column's data type, and a boolean flag that says whether the column may contain null values.

`StructField('column_name', type, allow_null)`

In [None]:
# One StructField per CSV column. Order matters: it must match the column order in the file.
# The third StructField argument (True) declares every column nullable.
columns = [
    StructField(name, spark_type, True)
    for name, spark_type in (
        ('Index', StringType()),
        ('Date', TimestampType()),
        ('Open', DoubleType()),
        ('High', DoubleType()),
        ('Low', DoubleType()),
        ('Close', DoubleType()),
        ('Adj Close', DoubleType()),
        ('Volume', DoubleType()),
    )
]


In [None]:
# Wrap the field list into the row schema used when reading the CSV.
schema = StructType(columns)

In [None]:
# Load indexData.csv with the explicit schema (no type inference); the first line is a header row.
df = (
    spark.read
    .schema(schema)
    .option('header', True)
    .csv(index_data_file)
)

As the schema below shows, every numeric column in our DataFrame is stored as a `double`, exactly as declared in the schema.

In [None]:
# Confirm the declared schema was applied: Index is a string, Date a timestamp, the rest doubles.
df.printSchema()

root
 |-- Index: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: double (nullable = true)



Notice that calling *mean()* with no arguments computes the average of every numeric column; non-numeric columns such as `Date` are skipped.

In [None]:
# Average every numeric column per index. groupBy() returns groups in no guaranteed order,
# so sort by Index to make the displayed table reproducible across runs.
df.groupBy('Index').mean().orderBy('Index').show()

+---------+------------------+------------------+------------------+------------------+------------------+--------------------+
|    Index|         avg(Open)|         avg(High)|          avg(Low)|        avg(Close)|    avg(Adj Close)|         avg(Volume)|
+---------+------------------+------------------+------------------+------------------+------------------+--------------------+
|     NSEI| 7665.751272509498| 7712.532773450627| 7605.887145197213| 7660.047238088113| 7660.047238088113|  183071.90675433353|
|   GSPTSE|8091.1065434325765| 8128.025943173414| 8048.288132083155|8090.0663048776605| 8088.084509185663| 9.620499365380962E9|
|      NYA|4451.7781505843395| 4468.236551545467| 4433.695014189053| 4452.174710990798| 4452.174710990798| 1.215565480748548E9|
|      HSI|15206.355607330794| 15303.62676154348| 15091.04879081679| 15200.60562929962| 15200.60562929962| 8.430191354804522E8|
|399001.SZ| 7968.340420628268| 8052.905486944613| 7882.030536089907| 7973.831004994244|7973.758660467331

## Dealing with missing values

### Dropping rows that have null values

In [None]:
# Total number of rows before any null handling; the counts below are compared against this.
df.count()

112457

In [None]:
# Drop every row that has a null in any column (how='any' is the default).
df2 = df.dropna()

In [None]:
# Rows left after removing every row that contains at least one null.
df2.count()

110253

In [None]:
# how='any': a row is dropped if ANY of its columns is null -- identical to drop() with no arguments.
df3 = df.na.drop(how='any')

In [None]:
# Should equal df2.count(), since 'any' is the default drop mode.
df3.count()

110253

In [None]:
# how='all': a row is dropped only when EVERY one of its columns is null.
df4 = df.na.drop(how='all')

In [None]:
# In the recorded output this equals df.count(), i.e. no row was null in every column.
df4.count()

112457

In [None]:
# thresh=6: keep only rows that have AT LEAST 6 non-null values; rows with fewer are dropped.
# When thresh is given it overrides the `how` argument.
df5 = df.na.drop(thresh=6)

In [None]:
# Rows that still have at least 6 non-null values.
df5.count()

110253

In [None]:
# Only the columns listed in `subset` are checked: a row is dropped when any of them (here just Open) is null.
df6 = df.na.drop(how='any', subset=['Open'])

In [None]:
# In the recorded output this matches df2.count(); rows missing Open appear to miss the other price fields too.
df6.count()

110253

In [None]:
# Lowest opening prices first (sort is an alias of orderBy).
df6.sort('Open').show()

+-----+-------------------+---------+---------+---------+---------+---------+------+
|Index|               Date|     Open|     High|      Low|    Close|Adj Close|Volume|
+-----+-------------------+---------+---------+---------+---------+---------+------+
| IXIC|1974-10-03 00:00:00|54.869999|54.869999|54.869999|54.869999|54.869999|   0.0|
| IXIC|1974-10-04 00:00:00|    55.16|    55.16|    55.16|    55.16|    55.16|   0.0|
| IXIC|1974-10-01 00:00:00|    55.48|    55.48|    55.48|    55.48|    55.48|   0.0|
| IXIC|1974-09-30 00:00:00|55.669998|55.669998|55.669998|55.669998|55.669998|   0.0|
| IXIC|1974-10-02 00:00:00|55.669998|55.669998|55.669998|55.669998|55.669998|   0.0|
| IXIC|1974-10-08 00:00:00|56.130001|56.130001|56.130001|56.130001|56.130001|   0.0|
| IXIC|1974-10-07 00:00:00|    56.57|    56.57|    56.57|    56.57|    56.57|   0.0|
| IXIC|1974-09-13 00:00:00|    56.66|    56.66|    56.66|    56.66|    56.66|   0.0|
| IXIC|1974-09-27 00:00:00|57.119999|57.119999|57.119999|57.11999

### Filling in null values

In [None]:
# Replace nulls with 0 -- but a numeric fill value only applies to numeric columns (Open ... Volume);
# nulls in non-numeric columns such as Index (string) and Date (timestamp) are left untouched.
df7 = df.na.fill(0)

In [None]:
# Filling never removes rows, so this matches df.count().
df7.count()

112457

In [None]:
# Column -> replacement mapping: rows whose Index is null get the placeholder 'No Index Name'.
df8 = df.na.fill({'Index': 'No Index Name'})

In [None]:
# Per-index averages after the fill; any 'No Index Name' group would show up here.
# Sort by Index because groupBy() gives no ordering guarantee, which keeps the output stable across runs.
df8.groupBy('Index').mean().orderBy('Index').show()

+---------+------------------+------------------+------------------+------------------+------------------+--------------------+
|    Index|         avg(Open)|         avg(High)|          avg(Low)|        avg(Close)|    avg(Adj Close)|         avg(Volume)|
+---------+------------------+------------------+------------------+------------------+------------------+--------------------+
|     NSEI| 7665.751272509498| 7712.532773450627| 7605.887145197213| 7660.047238088113| 7660.047238088113|  183071.90675433353|
|   GSPTSE|8091.1065434325765| 8128.025943173414| 8048.288132083155|8090.0663048776605| 8088.084509185663| 9.620499365380962E9|
|      NYA|4451.7781505843395| 4468.236551545467| 4433.695014189053| 4452.174710990798| 4452.174710990798| 1.215565480748548E9|
|      HSI|15206.355607330794| 15303.62676154348| 15091.04879081679| 15200.60562929962| 15200.60562929962| 8.430191354804522E8|
|399001.SZ| 7968.340420628268| 8052.905486944613| 7882.030536089907| 7973.831004994244|7973.758660467331

### Filling nulls with a computed value (the column mean)

In [None]:
from pyspark.sql.functions import mean

In [None]:
# Global (ungrouped) average of Open, brought back to the driver as a list of Rows.
mean_val = df.agg(mean('Open')).collect()

In [None]:
# Inspect the raw collect() result: a list holding a single Row with the avg(Open) field.
mean_val

[Row(avg(Open)=7658.515221546726)]

In [None]:
# First field of the first (only) Row: the plain numeric mean of Open.
print(mean_val[0][0])

7658.515221546726


In [None]:
# Unpack the single field of the single result Row into a plain Python number.
(mean_open,) = mean_val[0]

In [None]:
# Ascending sort puts null Open values first, exposing the rows that still contain nulls.
df.sort(df['Open'].asc()).show()

+---------+-------------------+----+----+----+-----+---------+------+
|    Index|               Date|Open|High| Low|Close|Adj Close|Volume|
+---------+-------------------+----+----+----+-----+---------+------+
|     N225|2018-12-31 00:00:00|NULL|NULL|NULL| NULL|     NULL|  NULL|
|399001.SZ|2000-01-03 00:00:00|NULL|NULL|NULL| NULL|     NULL|  NULL|
|      HSI|1987-01-01 00:00:00|NULL|NULL|NULL| NULL|     NULL|  NULL|
|399001.SZ|1999-12-31 00:00:00|NULL|NULL|NULL| NULL|     NULL|  NULL|
|     N225|2018-01-02 00:00:00|NULL|NULL|NULL| NULL|     NULL|  NULL|
|399001.SZ|1999-02-10 00:00:00|NULL|NULL|NULL| NULL|     NULL|  NULL|
|      HSI|1988-04-04 00:00:00|NULL|NULL|NULL| NULL|     NULL|  NULL|
|399001.SZ|1999-02-18 00:00:00|NULL|NULL|NULL| NULL|     NULL|  NULL|
|     N225|2018-05-03 00:00:00|NULL|NULL|NULL| NULL|     NULL|  NULL|
|399001.SZ|1998-05-01 00:00:00|NULL|NULL|NULL| NULL|     NULL|  NULL|
|      HSI|1987-01-29 00:00:00|NULL|NULL|NULL| NULL|     NULL|  NULL|
|399001.SZ|1999-05-0

In [None]:
# Impute missing Open values with the column mean; every other column keeps its nulls.
df_mean = df.na.fill({'Open': mean_open})
df_mean.orderBy('Open').show()

+-----+-------------------+---------+---------+---------+---------+---------+------+
|Index|               Date|     Open|     High|      Low|    Close|Adj Close|Volume|
+-----+-------------------+---------+---------+---------+---------+---------+------+
| IXIC|1974-10-03 00:00:00|54.869999|54.869999|54.869999|54.869999|54.869999|   0.0|
| IXIC|1974-10-04 00:00:00|    55.16|    55.16|    55.16|    55.16|    55.16|   0.0|
| IXIC|1974-10-01 00:00:00|    55.48|    55.48|    55.48|    55.48|    55.48|   0.0|
| IXIC|1974-09-30 00:00:00|55.669998|55.669998|55.669998|55.669998|55.669998|   0.0|
| IXIC|1974-10-02 00:00:00|55.669998|55.669998|55.669998|55.669998|55.669998|   0.0|
| IXIC|1974-10-08 00:00:00|56.130001|56.130001|56.130001|56.130001|56.130001|   0.0|
| IXIC|1974-10-07 00:00:00|    56.57|    56.57|    56.57|    56.57|    56.57|   0.0|
| IXIC|1974-09-13 00:00:00|    56.66|    56.66|    56.66|    56.66|    56.66|   0.0|
| IXIC|1974-09-27 00:00:00|57.119999|57.119999|57.119999|57.11999

## Date and DateTime

In [None]:
from pyspark.sql.functions import dayofmonth, dayofyear, weekofyear, hour, month, year, format_number, date_format

In [None]:
# Append the calendar day (1-31) extracted from Date as a new column.
df.withColumn('Day of Month', dayofmonth('Date')).show()

+-----+-------------------+----------+----------+----------+----------+----------+------+------------+
|Index|               Date|      Open|      High|       Low|     Close| Adj Close|Volume|Day of Month|
+-----+-------------------+----------+----------+----------+----------+----------+------+------------+
|  NYA|1965-12-31 00:00:00|528.690002|528.690002|528.690002|528.690002|528.690002|   0.0|          31|
|  NYA|1966-01-03 00:00:00|527.210022|527.210022|527.210022|527.210022|527.210022|   0.0|           3|
|  NYA|1966-01-04 00:00:00|527.840027|527.840027|527.840027|527.840027|527.840027|   0.0|           4|
|  NYA|1966-01-05 00:00:00|531.119995|531.119995|531.119995|531.119995|531.119995|   0.0|           5|
|  NYA|1966-01-06 00:00:00|532.070007|532.070007|532.070007|532.070007|532.070007|   0.0|           6|
|  NYA|1966-01-07 00:00:00|532.599976|532.599976|532.599976|532.599976|532.599976|   0.0|           7|
|  NYA|1966-01-10 00:00:00|533.869995|533.869995|533.869995|533.869995|53

In [None]:
# Hour component of each timestamp (the dates in this file carry no time of day).
df.select(hour('Date')).show()

+----------+
|hour(Date)|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 20 rows



In [None]:
# Month number (1-12) of each Date.
df.select(month('Date')).show()

+-----------+
|month(Date)|
+-----------+
|         12|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
+-----------+
only showing top 20 rows



In [None]:
# Add an integer 'Year' column; later cells group and filter on it.
df2 = df.withColumn('Year', year(df['Date']))

# Average every numeric column per calendar year. The DataFrame is kept in df3 and displayed
# separately: .show() returns None, so chaining it into the assignment left df3 set to None.
df3 = df.groupBy(year(df['Date']).alias('Year')).mean().orderBy('Year')
df3.show()

+----+------------------+------------------+------------------+------------------+------------------+-----------------+
|Year|         avg(Open)|         avg(High)|          avg(Low)|        avg(Close)|    avg(Adj Close)|      avg(Volume)|
+----+------------------+------------------+------------------+------------------+------------------+-----------------+
|1965|1198.7025404426226|1198.7025404426226|1198.7025404426226|1198.7025404426226|1198.7025404426226|              0.0|
|1966|  976.632735987928|  976.632735987928|  976.632735987928|  976.632735987928|  976.632735987928|              0.0|
|1967| 971.8781275835003| 971.8781275835003| 971.8781275835003| 971.8781275835003| 971.8781275835003|              0.0|
|1968|1085.7454434300842|1085.7454434300842|1085.7454434300842|1085.7454434300842|1085.7454434300842|              0.0|
|1969|1259.6028146093117|1259.6028146093117|1259.6028146093117|1259.6028146093117|1259.6028146093117|              0.0|
|1970|1327.3219898067732|1327.3219898067

In [None]:
# Per-year averages of the price and volume columns. The columns are listed explicitly so the
# integer grouping key 'Year' is not averaged too (a bare mean() adds a redundant avg(Year) column).
order_by_year = (
    df2.groupBy('Year')
    .mean('Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume')
    .orderBy('Year')
)
# show() prints 20 rows by default; passing the row count prints every year.
order_by_year.show(order_by_year.count())

+----+------------------+------------------+------------------+------------------+------------------+--------------------+---------+
|Year|         avg(Open)|         avg(High)|          avg(Low)|        avg(Close)|    avg(Adj Close)|         avg(Volume)|avg(Year)|
+----+------------------+------------------+------------------+------------------+------------------+--------------------+---------+
|1965|1198.7025404426226|1198.7025404426226|1198.7025404426226|1198.7025404426226|1198.7025404426226|                 0.0|   1965.0|
|1966|  976.632735987928|  976.632735987928|  976.632735987928|  976.632735987928|  976.632735987928|                 0.0|   1966.0|
|1967| 971.8781275835003| 971.8781275835003| 971.8781275835003| 971.8781275835003| 971.8781275835003|                 0.0|   1967.0|
|1968|1085.7454434300842|1085.7454434300842|1085.7454434300842|1085.7454434300842|1085.7454434300842|                 0.0|   1968.0|
|1969|1259.6028146093117|1259.6028146093117|1259.6028146093117|1259.6

In [None]:
# Despite its name, df_1983 does NOT include 1983: the comparison is strict, so it keeps 1984 onward.
df_1983 = df2.where(df2['Year'] > 1983)

In [None]:
# Per-year averages for 1984 onward. The value columns are listed explicitly so the integer
# grouping key 'Year' is not averaged too (a bare mean() adds a redundant avg(Year) column).
order_by_year = (
    df_1983.groupBy('Year')
    .mean('Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume')
    .orderBy('Year')
)
# show() prints 20 rows by default; passing the row count prints every year.
order_by_year.show(order_by_year.count())

+----+------------------+------------------+------------------+------------------+------------------+--------------------+---------+
|Year|         avg(Open)|         avg(High)|          avg(Low)|        avg(Close)|    avg(Adj Close)|         avg(Volume)|avg(Year)|
+----+------------------+------------------+------------------+------------------+------------------+--------------------+---------+
|1984| 3526.495369124255| 3527.996860725647| 3524.751230931412| 3526.265448887675| 3524.787771734594|   3739602.385685885|   1984.0|
|1985| 4128.950976680639| 4130.926922893212| 4127.397683393213| 4129.361755783432| 4127.654152191616|2.3691656686626747E7|   1985.0|
|1986| 5256.340434208955| 5258.433371317413| 5254.224513865673| 5256.403720688557| 5254.508889664677| 3.326731343283582E7|   1986.0|
|1987| 6308.204833446036| 6311.865983571538| 6303.511787921656|6307.4549525523535| 6305.625431402073| 3.602266986410871E7|   1987.0|
|1988| 5916.046891052529| 5932.510553593086|5901.3005752253985| 5917.

In [None]:
from pyspark.sql.functions import countDistinct, avg, stddev


In [None]:
# Number of different stock indices present in the dataset.
df.agg(countDistinct('Index')).show()


+---------------------+
|count(DISTINCT Index)|
+---------------------+
|                   14|
+---------------------+



In [None]:
# Overall average opening price; the result column is auto-named avg(Open).
df.agg(avg('Open')).show()


+-----------------+
|        avg(Open)|
+-----------------+
|7658.515221546692|
+-----------------+



In [None]:
# Same average, but with a readable column header (Column.name is an alias of Column.alias).
df.select(avg('Open').name('Average Open')).show()


+-----------------+
|     Average Open|
+-----------------+
|7658.515221546692|
+-----------------+



In [None]:
# Sample standard deviation of Open on its own (stddev is stddev_samp)...
df.agg(stddev('Open')).show()
# ...then count/mean/stddev/min/max for all numeric and string columns at once.
df.describe().show()

+-----------------+
|stddev_samp(Open)|
+-----------------+
|9011.478912966066|
+-----------------+

+-------+---------+-----------------+-----------------+------------------+-----------------+-----------------+--------------------+
|summary|    Index|             Open|             High|               Low|            Close|        Adj Close|              Volume|
+-------+---------+-----------------+-----------------+------------------+-----------------+-----------------+--------------------+
|  count|   112457|           110253|           110253|            110253|           110253|           110253|              110253|
|   mean|     null|7658.515221546692|7704.372961277125|7608.0004223377055|7657.545871842828|7657.351729363816|1.2739751626030312E9|
| stddev|     null|9011.478912966066| 9066.63854803485| 8954.506981251896|9011.510443530442|9011.608899984898|  4.31578312088231E9|
|    min|000001.SS|        54.869999|        54.869999|         54.869999|        54.869999|        54.8699

In [None]:
# describe() returns an ordinary DataFrame. Its schema shows that every statistic column is a string,
# so the values need casting before any arithmetic on them.
description = df.describe()
description.printSchema()
description.show()

root
 |-- summary: string (nullable = true)
 |-- Index: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Adj Close: string (nullable = true)
 |-- Volume: string (nullable = true)

+-------+---------+-----------------+-----------------+------------------+-----------------+-----------------+--------------------+
|summary|    Index|             Open|             High|               Low|            Close|        Adj Close|              Volume|
+-------+---------+-----------------+-----------------+------------------+-----------------+-----------------+--------------------+
|  count|   112457|           110253|           110253|            110253|           110253|           110253|              110253|
|   mean|     null|7658.515221546692|7704.372961277125|7608.0004223377055|7657.545871842828|7657.351729363816|1.2739751626030312E9|
| stddev|     null|9011.47891296606