In [1]:
# Importing our libraries

import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql.types import *

In [2]:
# Create the SparkSession that every later cell uses, or reuse one that already exists.
ss = (
    SparkSession.builder
    .appName('loadCSV')
    .getOrCreate()
)

In [3]:
# Load the raw CSV into a Spark DataFrame, taking column names from the first row.
# No schema is inferred, so every column arrives as a string (see printSchema below).
sparkDF = ss.read.option('header', True).csv('rangel.csv')

In [4]:
# Number of rows loaded from the CSV, before any cleaning.
n_rows = sparkDF.count()
n_rows

2580185

In [6]:
sparkDF.show(n=20, truncate=False)  # preview the first 20 rows with full-width cells

+----------+------------------+----------------------------------------------------------+-----------------------------------------------------------------+--------------------------------------+--------+----------+----------+-----+
|Date      |value             |Model Code                                                |Desc                                                             |range                                 |quotecal|endrange  |startrange|check|
+----------+------------------+----------------------------------------------------------+-----------------------------------------------------------------+--------------------------------------+--------+----------+----------+-----+
|2016-06-06|27.09951219512195 |model://ICENLFINREL/EU.EL.NL.ICE.FIN.OP.FUT.M10/SETTLE/ALL|ICE Dutch Power Financial Offpeak Futures - Dutch Grid - Month 10|range://DB:1/END/06-06-2016/10-09-2019|HICEEDX |2019-10-09|2016-06-06|True |
|2016-06-07|27.120975609756094|model://ICENLFINREL/EU.EL.NL.ICE.FIN.

In [7]:
# Print the DataFrame schema (column names, types, nullability). Because the CSV
# was read without inferSchema, every column is reported as a nullable string.

sparkDF.printSchema()

root
 |-- Date: string (nullable = true)
 |-- value: string (nullable = true)
 |-- Model Code: string (nullable = true)
 |-- Desc: string (nullable = true)
 |-- range: string (nullable = true)
 |-- quotecal: string (nullable = true)
 |-- endrange: string (nullable = true)
 |-- startrange: string (nullable = true)
 |-- check: string (nullable = true)



In [8]:
# Round `value` to 2 decimal places. Spark's round() uses HALF_UP, i.e. it
# rounds to the nearest value; it is not a ceiling ("round up").
# The CSV was read without a schema, so `value` is a string column: cast it to
# double explicitly rather than relying on Spark's implicit string->double cast.

df1 = sparkDF.withColumn('value', fn.round(fn.col('value').cast('double'), 2))
sparkDF = df1
sparkDF.show(20, False)

+----------+-----+----------------------------------------------------------+-----------------------------------------------------------------+--------------------------------------+--------+----------+----------+-----+
|Date      |value|Model Code                                                |Desc                                                             |range                                 |quotecal|endrange  |startrange|check|
+----------+-----+----------------------------------------------------------+-----------------------------------------------------------------+--------------------------------------+--------+----------+----------+-----+
|2016-06-06|27.1 |model://ICENLFINREL/EU.EL.NL.ICE.FIN.OP.FUT.M10/SETTLE/ALL|ICE Dutch Power Financial Offpeak Futures - Dutch Grid - Month 10|range://DB:1/END/06-06-2016/10-09-2019|HICEEDX |2019-10-09|2016-06-06|True |
|2016-06-07|27.12|model://ICENLFINREL/EU.EL.NL.ICE.FIN.OP.FUT.M10/SETTLE/ALL|ICE Dutch Power Financial Offpeak Futures -

In [11]:
# Count missing values in each column: NULLs for every column, plus NaNs for
# floating-point columns. NaN can only occur in float/double columns, so isnan
# is applied only there; calling it on string columns would implicitly cast
# them to double (wasted work, and the cast may raise when ANSI mode is on).

missing_exprs = []
for col_name, col_type in sparkDF.dtypes:
    is_missing = fn.col(col_name).isNull()
    if col_type in ('double', 'float'):
        is_missing = fn.isnan(col_name) | is_missing
    # count() ignores NULLs, so this counts only the rows where is_missing holds
    missing_exprs.append(fn.count(fn.when(is_missing, col_name)).alias(col_name))

df2 = sparkDF.select(missing_exprs)
df2.show()

+----+-----+----------+----+-----+--------+--------+----------+-----+
|Date|value|Model Code|Desc|range|quotecal|endrange|startrange|check|
+----+-----+----------+----+-----+--------+--------+----------+-----+
|   0|38443|         0|   0|    0|       0|       0|         0|    0|
+----+-----+----------+----+-----+--------+--------+----------+-----+



In [36]:
# Drop every row whose `value` is NULL (Spark's na.drop also drops NaN).
# DataFrames are immutable: dropna returns a new DataFrame, so the result must
# be reassigned. Otherwise no rows are removed and the count below is unchanged.

sparkDF = sparkDF.dropna(subset=['value'])
sparkDF.show(20, False)
print('Total rowcount after removing Nulls: ', sparkDF.count())

+----------+-----+----------------------------------------------------------+-----------------------------------------------------------------+--------------------------------------+--------+----------+----------+-----+
|Date      |value|Model Code                                                |Desc                                                             |range                                 |quotecal|endrange  |startrange|check|
+----------+-----+----------------------------------------------------------+-----------------------------------------------------------------+--------------------------------------+--------+----------+----------+-----+
|2016-06-06|27.1 |model://ICENLFINREL/EU.EL.NL.ICE.FIN.OP.FUT.M10/SETTLE/ALL|ICE Dutch Power Financial Offpeak Futures - Dutch Grid - Month 10|range://DB:1/END/06-06-2016/10-09-2019|HICEEDX |2019-10-09|2016-06-06|True |
|2016-06-07|27.12|model://ICENLFINREL/EU.EL.NL.ICE.FIN.OP.FUT.M10/SETTLE/ALL|ICE Dutch Power Financial Offpeak Futures -

In [89]:
# Add `Absolute_Period` = <year of Date><type><number>, with the type taken from `Desc`:
#   'Month' in Desc   -> 'M' + zero-padded month of Date   (e.g. 2016M06)
#   'Quarter' in Desc -> 'Q' + zero-padded quarter of Date (e.g. 2016Q02)
#   otherwise         -> 'Y' with no number                (e.g. 2016Y)
# NOTE(review): the period comes from the row's `Date`, not from the contract
# month named in `Desc` (e.g. "Month 10"). Confirm this is the intended meaning.

condition1 = fn.col('Desc').like('%Month%')    # case-sensitive match
condition2 = fn.col('Desc').like('%Quarter%')  # checked only if condition1 fails

period_letter = (
    fn.when(condition1, 'M')
    .when(condition2, 'Q')
    .otherwise('Y')
)
period_number = (
    fn.when(condition1, fn.format_string('%02d', fn.month('Date')))
    .when(condition2, fn.format_string('%02d', fn.quarter('Date')))
    .otherwise('')
)

sparkDF = sparkDF.withColumn(
    'Absolute_Period',
    fn.concat(fn.year('Date'), period_letter, period_number),
)
sparkDF.show(20, False)

+----------+------------------+----------------------------------------------------------+-----------------------------------------------------------------+--------------------------------------+--------+----------+----------+-----+---------------+
|Date      |value             |Model Code                                                |Desc                                                             |range                                 |quotecal|endrange  |startrange|check|Absolute_Period|
+----------+------------------+----------------------------------------------------------+-----------------------------------------------------------------+--------------------------------------+--------+----------+----------+-----+---------------+
|2016-06-06|27.09951219512195 |model://ICENLFINREL/EU.EL.NL.ICE.FIN.OP.FUT.M10/SETTLE/ALL|ICE Dutch Power Financial Offpeak Futures - Dutch Grid - Month 10|range://DB:1/END/06-06-2016/10-09-2019|HICEEDX |2019-10-09|2016-06-06|True |2016M06        |
|201