<a href="https://colab.research.google.com/github/datasigntist/deeplearning/blob/master/pySpark_Codes_4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Experiments with Spark**

Author : Vishwanathan Raman

EmailId : datasigntist@gmail.com

Description : This notebook works with the United States Historical Climatology Network (USHCN) Daily Dataset, available at https://cdiac.ess-dive.lbl.gov/ftp/ushcn_daily/.

It illustrates how to develop PySpark functions that process and consolidate the data.

Reference Links:

*   Introduction to Spark 1 : https://youtu.be/TuGn3e1EgXM
*   Introduction to Spark 2 : https://youtu.be/JruCKuWHKpk
*   Introduction to Spark 3 : https://youtu.be/c9jd4yZGyT8
*   Introduction to RDD 1   : https://youtu.be/M7UuKHYecXQ
*   Introduction to RDD 2   : https://youtu.be/qLGUPdSvAVg
*   Introduction to RDD 3   : https://youtu.be/9NBP-FiHrQg



## Spark Installation in Google Colab

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
import numpy as np
from pyspark.sql.window import Window
from pyspark.sql import functions as func
import datetime

## Download and Process Data

In [0]:
states = ["AL",  "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", 
          "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD", 
          "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", 
          "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", 
          "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY"]

In [0]:
# Files are named stateNN_XX.txt.gz, numbered from 01 in the order of the states list
fileNameList = ["state"+str(index).zfill(2)+"_"+state+".txt.gz"
                for index, state in enumerate(states, start=1)]
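
The naming pattern can also be wrapped in a small helper that returns full download URLs. This is only a sketch: `BASE_URL` and `build_file_urls` are illustrative names that do not appear elsewhere in the notebook, and the base URL is the one given in the description above.

```python
BASE_URL = "https://cdiac.ess-dive.lbl.gov/ftp/ushcn_daily/"  # taken from the description

def build_file_urls(state_codes, base_url=BASE_URL):
    """Return one download URL per state, numbered from 01 in list order."""
    return [f"{base_url}state{index:02d}_{code}.txt.gz"
            for index, code in enumerate(state_codes, start=1)]

urls = build_file_urls(["AL", "AZ", "AR"])
print(urls[0])  # https://cdiac.ess-dive.lbl.gov/ftp/ushcn_daily/state01_AL.txt.gz
```

Because the number is derived from the list position, the order of the state codes has to match the numbering used on the server.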

In [9]:
for fileName in fileNameList:
  fileNameWithPath = "https://cdiac.ess-dive.lbl.gov/ftp/ushcn_daily/"+fileName
  # IPython expands {fileNameWithPath} before the command reaches the shell
  !wget "{fileNameWithPath}"

--2019-11-22 05:47:02--  https://cdiac.ess-dive.lbl.gov/ftp/ushcn_daily/state01_AL.txt.gz
Resolving cdiac.ess-dive.lbl.gov (cdiac.ess-dive.lbl.gov)... 128.55.206.24, 128.55.206.26
Connecting to cdiac.ess-dive.lbl.gov (cdiac.ess-dive.lbl.gov)|128.55.206.24|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [application/x-gzip]
Saving to: ‘state01_AL.txt.gz’

state01_AL.txt.gz       [    <=>             ]   2.16M  1.97MB/s    in 1.1s    

2019-11-22 05:47:04 (1.97 MB/s) - ‘state01_AL.txt.gz’ saved [2266952]

--2019-11-22 05:47:05--  https://cdiac.ess-dive.lbl.gov/ftp/ushcn_daily/state02_AZ.txt.gz
Resolving cdiac.ess-dive.lbl.gov (cdiac.ess-dive.lbl.gov)... 128.55.206.24, 128.55.206.26
Connecting to cdiac.ess-dive.lbl.gov (cdiac.ess-dive.lbl.gov)|128.55.206.24|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [application/x-gzip]
Saving to: ‘state02_AZ.txt.gz’

state02_AZ.txt.gz       [    <=>             ]   2.99M  2.

### Exploring Data

In [0]:
sparkDataFrame = spark.read.text("state03_AR.txt.gz")

In [102]:
sparkDataFrame.show(10)

+--------------------+
|               value|
+--------------------+
|030936188304TMAX-...|
|030936188304PRCP ...|
|030936188304SNOW ...|
|030936188304SNWD ...|
|030936188305TMAX ...|
|030936188305TMIN-...|
|030936188305PRCP ...|
|030936188305SNOW ...|
|030936188305SNWD ...|
|030936188306TMAX ...|
+--------------------+
only showing top 10 rows



In [0]:
sparkData_rdd = sparkDataFrame.rdd

In [0]:
sparkDataTMAX_rdd = sparkData_rdd.filter(lambda line: 'TMAX' in line.value)

In [0]:
def extractReadings(text):
  # Keep the numeric tokens that follow the 16-character record header.
  # Tokens "6" and "7" are skipped. "0" tokens are kept, so a result of 0
  # can come from the data as well as from an empty list.
  return [int(data) for data in text[16:].split(" ")
          if data not in ("6", "7") and data.isnumeric()]

sparkDataTMAXdict_rdd = sparkDataTMAX_rdd.map(lambda line:
                                          (
                                          line.value[0:6],
                                          datetime.datetime.strptime(line.value[6:10]+"-"+line.value[10:12]+"-01",'%Y-%m-%d'),
                                          max(extractReadings(line.value), default=0),
                                          min(extractReadings(line.value), default=0)
                                           ))

In [120]:
sparkDataTMAXdict_rdd.collect()[0:12]

[('040693', datetime.datetime(1893, 1, 1, 0, 0), 57, 46),
 ('040693', datetime.datetime(1893, 2, 1, 0, 0), 66, 48),
 ('040693', datetime.datetime(1893, 3, 1, 0, 0), 75, 49),
 ('040693', datetime.datetime(1893, 4, 1, 0, 0), 68, 52),
 ('040693', datetime.datetime(1893, 5, 1, 0, 0), 80, 59),
 ('040693', datetime.datetime(1893, 6, 1, 0, 0), 91, 66),
 ('040693', datetime.datetime(1893, 7, 1, 0, 0), 77, 64),
 ('040693', datetime.datetime(1893, 8, 1, 0, 0), 76, 61),
 ('040693', datetime.datetime(1893, 9, 1, 0, 0), 74, 63),
 ('040693', datetime.datetime(1893, 10, 1, 0, 0), 77, 57),
 ('040693', datetime.datetime(1893, 11, 1, 0, 0), 71, 53),
 ('040693', datetime.datetime(1893, 12, 1, 0, 0), 67, 41)]

### Generic function for parsing data and consolidation

The function below parses a data file and returns the maximum and minimum value for each station and month. Calling max or min on an empty list raises an error, so a record with no usable readings gets 0 for both.

In [0]:
def parseDataFile(fileName,columnName):
  def parseLine(line):
    # Keep the numeric tokens after the 16-character header; "6" and "7" are skipped.
    # "0" tokens are kept, so a 0 can come from the data as well as from an empty list.
    values = [int(data) for data in line.value[16:].split(" ")
              if data not in ("6", "7") and data.isnumeric()]
    return (line.value[0:6],
            datetime.datetime.strptime(line.value[6:10]+"-"+line.value[10:12]+"-01",'%Y-%m-%d'),
            max(values) if values else 0,   # 0 when the record has no usable readings
            min(values) if values else 0)
  sparkData_rdd = spark.read.text(fileName).rdd
  # Keep only the records for the requested element (for example "TMAX")
  return sparkData_rdd.filter(lambda line: columnName in line.value).map(parseLine)
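
As an alternative to splitting on spaces, a record can be sliced by column position. The sketch below runs in plain Python without Spark. It assumes that the 16-character header is followed by 8-character daily entries, each a 5-character value plus three 1-character flags, and that -9999 marks a missing value; check this against the dataset's readme before relying on it. The helper names and the sample record are made up for illustration.

```python
def parse_daily_values(record, missing=-9999):
    """Return the daily integer values of one fixed-width record (assumed layout)."""
    values = []
    body = record[16:]                       # skip station id, year, month, element
    for start in range(0, len(body), 8):     # one 8-character entry per day
        field = body[start:start + 5].strip()
        if not field:
            continue
        value = int(field)
        if value != missing:
            values.append(value)
    return values

def month_extremes(record):
    """Return (max, min) for the month, or (0, 0) when no values are present."""
    values = parse_daily_values(record)
    if not values:
        return (0, 0)                        # same sentinel the notebook uses
    return (max(values), min(values))

# Made-up record: three daily entries, the last one missing
sample = "011084192601TMAX" + "".join(f"{v:5d}  6" for v in (75, 41, -9999))
print(month_extremes(sample))                # (75, 41)
```

Slicing by position keeps negative readings, which the `isnumeric()` test in the notebook's filter rejects.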

In [0]:
def consolidateDataFiles(fileNameList):
  # Parse the first file, then union the parsed RDD of every remaining file onto it
  consolidatedTMAXdict_rdd = parseDataFile(fileNameList[0],"TMAX")
  for fileName in fileNameList[1:]:
    consolidatedTMAXdict_rdd = consolidatedTMAXdict_rdd.union(parseDataFile(fileName,"TMAX"))
  return consolidatedTMAXdict_rdd

In [0]:
consolidatedTMAXdict_rdd = consolidateDataFiles(fileNameList)

In [123]:
consolidatedTMAXdict_rdd.first()

('011084', datetime.datetime(1926, 1, 1, 0, 0), 75, 41)

In [82]:
print("The total number of data points ",consolidatedTMAXdict_rdd.count())

The total number of data points  129894


In [124]:
print("The number of partitions ",consolidatedTMAXdict_rdd.getNumPartitions())

The number of partitions  4


In [0]:
consolidatedTMAXdict_df = consolidatedTMAXdict_rdd.toDF(["key","date","maxTemp","minTemp"])

In [126]:
consolidatedTMAXdict_df.show(10)

+------+-------------------+-------+-------+
|   key|               date|maxTemp|minTemp|
+------+-------------------+-------+-------+
|011084|1926-01-01 00:00:00|     75|     41|
|011084|1926-02-01 00:00:00|     77|     56|
|011084|1926-03-01 00:00:00|     85|     52|
|011084|1926-04-01 00:00:00|     86|     72|
|011084|1926-05-01 00:00:00|     97|     73|
|011084|1926-06-01 00:00:00|     99|     79|
|011084|1926-07-01 00:00:00|     99|     82|
|011084|1926-08-01 00:00:00|    101|     80|
|011084|1926-09-01 00:00:00|     99|     79|
|011084|1926-10-01 00:00:00|     98|     61|
+------+-------------------+-------+-------+
only showing top 10 rows



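Create a new column avgTemp using the func.when construct. When both maxTemp and minTemp are non-zero, avgTemp is their mean; otherwise it is their sum, which reduces to the non-zero value (or 0 if both are 0).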

In [0]:
consolidatedTMAXdict_df = consolidatedTMAXdict_df.withColumn('avgTemp',func.when((consolidatedTMAXdict_df['maxTemp'] !=0)  & (consolidatedTMAXdict_df['minTemp'] != 0),(consolidatedTMAXdict_df['maxTemp']+consolidatedTMAXdict_df['minTemp'])/2).otherwise(consolidatedTMAXdict_df['maxTemp']+consolidatedTMAXdict_df['minTemp']))
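
The rule encoded by the when/otherwise expression can be checked on single values in plain Python. This is a sketch of the same logic rather than Spark code, and `avg_temp` is an illustrative name.

```python
def avg_temp(max_temp, min_temp):
    """Mean of the two values when both are non-zero, otherwise their sum."""
    if max_temp != 0 and min_temp != 0:
        return (max_temp + min_temp) / 2
    # At least one value is 0, so the sum is just the other value
    return float(max_temp + min_temp)

print(avg_temp(75, 41))   # 58.0
print(avg_temp(107, 0))   # 107.0
```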

In [202]:
consolidatedTMAXdict_df.show(10)

+------+-------------------+-------+-------+-------+
|   key|               date|maxTemp|minTemp|avgTemp|
+------+-------------------+-------+-------+-------+
|011084|1926-01-01 00:00:00|     75|     41|   58.0|
|011084|1926-02-01 00:00:00|     77|     56|   66.5|
|011084|1926-03-01 00:00:00|     85|     52|   68.5|
|011084|1926-04-01 00:00:00|     86|     72|   79.0|
|011084|1926-05-01 00:00:00|     97|     73|   85.0|
|011084|1926-06-01 00:00:00|     99|     79|   89.0|
|011084|1926-07-01 00:00:00|     99|     82|   90.5|
|011084|1926-08-01 00:00:00|    101|     80|   90.5|
|011084|1926-09-01 00:00:00|     99|     79|   89.0|
|011084|1926-10-01 00:00:00|     98|     61|   79.5|
+------+-------------------+-------+-------+-------+
only showing top 10 rows



In [0]:
sdf = consolidatedTMAXdict_df

The moving average is implemented with a window specification. The data is partitioned by key and ordered by date, and the window frame runs from -1 (one row before the current row) to 1 (one row after it), giving a sliding window of up to 3 rows. The first and last rows of each partition have only 2 rows in their window.

In [0]:
windowSpec = Window.partitionBy("key").orderBy("date").rowsBetween(-1, 1)

In [0]:
sdf_withMA = sdf.withColumn('movAvg', func.avg("avgTemp").over(windowSpec))
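
For a single station, the effect of `rowsBetween(-1, 1)` can be reproduced in plain Python. The sketch below is not Spark code; `centered_moving_average` is an illustrative name, and the input is assumed to be already ordered by date.

```python
def centered_moving_average(values, before=1, after=1):
    """Average each value with up to `before` earlier and `after` later neighbours."""
    result = []
    for i in range(len(values)):
        # The slice is clipped at the ends, so edge rows average fewer values
        window = values[max(0, i - before): i + after + 1]
        result.append(sum(window) / len(window))
    return result

print(centered_moving_average([107.0, 99.0, 83.0, 74.0]))
```

The first element is the average of 107.0 and 99.0 only, because there is no earlier row to include.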

In [216]:
sdf_withMA.show(10)

+------+-------------------+-------+-------+-------+------------------+
|   key|               date|maxTemp|minTemp|avgTemp|            movAvg|
+------+-------------------+-------+-------+-------+------------------+
|029652|1920-09-01 00:00:00|    107|      0|  107.0|             103.0|
|029652|1920-10-01 00:00:00|     99|      0|   99.0| 96.33333333333333|
|029652|1920-11-01 00:00:00|     83|      0|   83.0| 85.33333333333333|
|029652|1920-12-01 00:00:00|     74|      0|   74.0| 78.33333333333333|
|029652|1921-01-01 00:00:00|     78|      0|   78.0|              81.0|
|029652|1921-02-01 00:00:00|     91|      0|   91.0| 86.66666666666667|
|029652|1921-03-01 00:00:00|     91|      0|   91.0| 94.33333333333333|
|029652|1921-04-01 00:00:00|    101|      0|  101.0| 98.33333333333333|
|029652|1921-05-01 00:00:00|    103|      0|  103.0|104.33333333333333|
|029652|1921-06-01 00:00:00|    109|      0|  109.0|107.33333333333333|
+------+-------------------+-------+-------+-------+------------------+

In [0]:
sdf_withMA = sdf_withMA.withColumn("sqrdDiff_avg_mov",(sdf_withMA['avgTemp']-sdf_withMA['movAvg'])*(sdf_withMA['avgTemp']-sdf_withMA['movAvg']))

In [218]:
sdf_withMA.show(10)

+------+-------------------+-------+-------+-------+------------------+------------------+
|   key|               date|maxTemp|minTemp|avgTemp|            movAvg|  sqrdDiff_avg_mov|
+------+-------------------+-------+-------+-------+------------------+------------------+
|029652|1920-09-01 00:00:00|    107|      0|  107.0|             103.0|              16.0|
|029652|1920-10-01 00:00:00|     99|      0|   99.0| 96.33333333333333|7.1111111111111365|
|029652|1920-11-01 00:00:00|     83|      0|   83.0| 85.33333333333333| 5.444444444444422|
|029652|1920-12-01 00:00:00|     74|      0|   74.0| 78.33333333333333|18.777777777777736|
|029652|1921-01-01 00:00:00|     78|      0|   78.0|              81.0|               9.0|
|029652|1921-02-01 00:00:00|     91|      0|   91.0| 86.66666666666667|18.777777777777736|
|029652|1921-03-01 00:00:00|     91|      0|   91.0| 94.33333333333333|11.111111111111079|
|029652|1921-04-01 00:00:00|    101|      0|  101.0| 98.33333333333333|7.1111111111111365|

In [220]:
sdf_withMA.groupby(['key'])\
.agg({"sqrdDiff_avg_mov": "AVG"})\
.show(5)

+------+---------------------+
|   key|avg(sqrdDiff_avg_mov)|
+------+---------------------+
|029652|   18.798756798756752|
|013160|    7.536277859118267|
|024089|    24.53927918950335|
|043257|   11.860532407407401|
|026796|   10.056261599289948|
+------+---------------------+
only showing top 5 rows
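
The aggregation above averages the squared difference between avgTemp and movAvg for each station. A plain-Python sketch of the same grouping, using an illustrative function name and two rows copied from the output shown earlier, might look like this:

```python
from collections import defaultdict

def mean_squared_deviation_by_key(rows):
    """rows: iterable of (key, avgTemp, movAvg). Returns {key: mean squared difference}."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for key, avg_temp, mov_avg in rows:
        totals[key] += (avg_temp - mov_avg) ** 2
        counts[key] += 1
    return {key: totals[key] / counts[key] for key in totals}

rows = [("029652", 107.0, 103.0),
        ("029652", 99.0, 96.33333333333333)]
result = mean_squared_deviation_by_key(rows)
```

A larger value indicates a station whose monthly figure departs more from its 3-month moving average.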



In [0]:
pd_029652 = sdf_withMA.filter(sdf_withMA["key"]=="029652").toPandas()