In [None]:
#Run Once
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar xf spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark
#Run Once
import os
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) a local-mode session named "Colab"; the Spark UI listens on 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)
spark

START HERE: run the cells below once the one-time setup cells above have finished.

In [None]:
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import *
from pyspark.sql import functions as F

In [None]:
# Column names and Spark types for the Dublin Airport CSV, as (name, type) pairs
# in file order.
# rain, dewpt and vappr are read as doubles: with IntegerType any fractional value
# (e.g. "0.1") fails to parse, and Spark's default PERMISSIVE CSV mode silently
# turns that field into null. This is why dewpt/vappr previewed as null and rain
# had many nulls. NOTE(review): confirm against the raw CSV.
labels = [
    ('date', StringType()),
    ('ind1', DoubleType()),
    ('rain', DoubleType()),
    ('ind3', DoubleType()),
    ('temp', DoubleType()),
    ('ind5', DoubleType()),
    ('wetb', DoubleType()),
    ('dewpt', DoubleType()),
    ('vappr', DoubleType()),
    ('rhum', IntegerType()),
    ('msl', DoubleType()),
    ('ind11', DoubleType()),
    ('wdsp', DoubleType()),
    ('ind13', DoubleType()),
    ('wddir', IntegerType()),
]

In [None]:
# Turn the (name, type) pairs into the StructType handed to the CSV reader;
# every field is nullable.
schema = StructType([
    StructField(col_name, col_type, nullable=True)
    for col_name, col_type in labels
])
schema

StructType(List(StructField(date,StringType,true),StructField(ind1,DoubleType,true),StructField(rain,IntegerType,true),StructField(ind3,DoubleType,true),StructField(temp,DoubleType,true),StructField(ind5,DoubleType,true),StructField(wetb,DoubleType,true),StructField(dewpt,IntegerType,true),StructField(vappr,IntegerType,true),StructField(rhum,IntegerType,true),StructField(msl,DoubleType,true),StructField(ind11,DoubleType,true),StructField(wdsp,DoubleType,true),StructField(ind13,DoubleType,true),StructField(wddir,IntegerType,true)))

In [None]:
# Read the Dublin Airport CSV using the explicit schema (no type inference).
DUBLIN_AIRPORT_CSV = '/content/drive/MyDrive/Colab datasets/Dublin Airport.csv'
df = (
    spark.read
    .schema(schema)
    .option("header", True)
    .option("sep", ",")
    .csv(DUBLIN_AIRPORT_CSV)
)
# The printed schema matches the one we supplied.
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- ind1: double (nullable = true)
 |-- rain: integer (nullable = true)
 |-- ind3: double (nullable = true)
 |-- temp: double (nullable = true)
 |-- ind5: double (nullable = true)
 |-- wetb: double (nullable = true)
 |-- dewpt: integer (nullable = true)
 |-- vappr: integer (nullable = true)
 |-- rhum: integer (nullable = true)
 |-- msl: double (nullable = true)
 |-- ind11: double (nullable = true)
 |-- wdsp: double (nullable = true)
 |-- ind13: double (nullable = true)
 |-- wddir: integer (nullable = true)



In [None]:
df.show(n=20, truncate=True)  # preview the first rows of the raw frame

+----------------+----+----+----+----+----+----+-----+-----+----+------+-----+----+-----+-----+
|            date|ind1|rain|ind3|temp|ind5|wetb|dewpt|vappr|rhum|   msl|ind11|wdsp|ind13|wddir|
+----------------+----+----+----+----+----+----+-----+-----+----+------+-----+----+-----+-----+
|01/01/1991 00:00| 3.0|   0| 0.0| 2.1| 0.0| 1.4| null| null|  87|1010.1|  2.0|11.0|  2.0|  230|
|01/01/1991 01:00| 3.0|   0| 0.0| 2.0| 0.0| 1.0| null| null|  82|1009.9|  2.0|11.0|  2.0|  230|
|01/01/1991 02:00| 3.0|   0| 0.0| 1.9| 0.0| 0.9| null| null|  82|1010.1|  2.0| 8.0|  2.0|  220|
|01/01/1991 03:00| 3.0|   0| 0.0| 2.6| 0.0| 1.3| null| null|  77|1009.5|  2.0| 9.0|  2.0|  220|
|01/01/1991 04:00| 3.0|   0| 0.0| 4.0| 0.0| 2.2| null| null|  70|1008.8|  2.0| 9.0|  2.0|  200|
|01/01/1991 05:00| 3.0|   0| 0.0| 4.9| 0.0| 3.1| null| null|  72|1007.6|  2.0|11.0|  2.0|  200|
|01/01/1991 06:00| 3.0|   0| 0.0| 5.9| 0.0| 4.2| null| null|  74|1006.3|  2.0|13.0|  2.0|  190|
|01/01/1991 07:00| 2.0|   0| 0.0| 7.3| 0

In [None]:
# Drop the ind* columns plus vappr and dewpt; none of them are used below.
unused_cols = ['ind1', 'ind3', 'ind5', 'ind11', 'ind13', 'vappr', 'dewpt']
df1 = df.drop(*unused_cols)
df1.show()

+----------------+----+----+----+----+------+----+-----+
|            date|rain|temp|wetb|rhum|   msl|wdsp|wddir|
+----------------+----+----+----+----+------+----+-----+
|01/01/1991 00:00|   0| 2.1| 1.4|  87|1010.1|11.0|  230|
|01/01/1991 01:00|   0| 2.0| 1.0|  82|1009.9|11.0|  230|
|01/01/1991 02:00|   0| 1.9| 0.9|  82|1010.1| 8.0|  220|
|01/01/1991 03:00|   0| 2.6| 1.3|  77|1009.5| 9.0|  220|
|01/01/1991 04:00|   0| 4.0| 2.2|  70|1008.8| 9.0|  200|
|01/01/1991 05:00|   0| 4.9| 3.1|  72|1007.6|11.0|  200|
|01/01/1991 06:00|   0| 5.9| 4.2|  74|1006.3|13.0|  190|
|01/01/1991 07:00|   0| 7.3| 5.3|  71|1004.3|17.0|  190|
|01/01/1991 08:00|null| 7.5| 5.9|  77|1002.4|17.0|  190|
|01/01/1991 09:00|   0| 8.6| 7.2|  81|1000.0|18.0|  190|
|01/01/1991 10:00|   0| 9.6| 7.8|  76| 998.4|20.0|  200|
|01/01/1991 11:00|   0|10.1| 8.1|  74| 996.3|25.0|  200|
|01/01/1991 12:00|   0|10.4| 9.1|  83| 994.1|25.0|  200|
|01/01/1991 13:00|null|10.7| 8.9|  77| 990.9|27.0|  210|
|01/01/1991 14:00|null|10.8| 9.

In [None]:
# Parse the day-first 'dd/MM/yyyy HH:mm' strings into a timestamp column.
df2 = df1.withColumn(
    "TS_date",
    F.to_timestamp(F.col("date"), 'dd/MM/yyyy HH:mm'),
)
df2.show()

+----------------+----+----+----+----+------+----+-----+-------------------+
|            date|rain|temp|wetb|rhum|   msl|wdsp|wddir|            TS_date|
+----------------+----+----+----+----+------+----+-----+-------------------+
|01/01/1991 00:00|   0| 2.1| 1.4|  87|1010.1|11.0|  230|1991-01-01 00:00:00|
|01/01/1991 01:00|   0| 2.0| 1.0|  82|1009.9|11.0|  230|1991-01-01 01:00:00|
|01/01/1991 02:00|   0| 1.9| 0.9|  82|1010.1| 8.0|  220|1991-01-01 02:00:00|
|01/01/1991 03:00|   0| 2.6| 1.3|  77|1009.5| 9.0|  220|1991-01-01 03:00:00|
|01/01/1991 04:00|   0| 4.0| 2.2|  70|1008.8| 9.0|  200|1991-01-01 04:00:00|
|01/01/1991 05:00|   0| 4.9| 3.1|  72|1007.6|11.0|  200|1991-01-01 05:00:00|
|01/01/1991 06:00|   0| 5.9| 4.2|  74|1006.3|13.0|  190|1991-01-01 06:00:00|
|01/01/1991 07:00|   0| 7.3| 5.3|  71|1004.3|17.0|  190|1991-01-01 07:00:00|
|01/01/1991 08:00|null| 7.5| 5.9|  77|1002.4|17.0|  190|1991-01-01 08:00:00|
|01/01/1991 09:00|   0| 8.6| 7.2|  81|1000.0|18.0|  190|1991-01-01 09:00:00|

In [None]:
# The raw string date is redundant once TS_date exists.
df3 = df2.drop(df2['date'])
df3.show()

+----+----+----+----+------+----+-----+-------------------+
|rain|temp|wetb|rhum|   msl|wdsp|wddir|            TS_date|
+----+----+----+----+------+----+-----+-------------------+
|   0| 2.1| 1.4|  87|1010.1|11.0|  230|1991-01-01 00:00:00|
|   0| 2.0| 1.0|  82|1009.9|11.0|  230|1991-01-01 01:00:00|
|   0| 1.9| 0.9|  82|1010.1| 8.0|  220|1991-01-01 02:00:00|
|   0| 2.6| 1.3|  77|1009.5| 9.0|  220|1991-01-01 03:00:00|
|   0| 4.0| 2.2|  70|1008.8| 9.0|  200|1991-01-01 04:00:00|
|   0| 4.9| 3.1|  72|1007.6|11.0|  200|1991-01-01 05:00:00|
|   0| 5.9| 4.2|  74|1006.3|13.0|  190|1991-01-01 06:00:00|
|   0| 7.3| 5.3|  71|1004.3|17.0|  190|1991-01-01 07:00:00|
|null| 7.5| 5.9|  77|1002.4|17.0|  190|1991-01-01 08:00:00|
|   0| 8.6| 7.2|  81|1000.0|18.0|  190|1991-01-01 09:00:00|
|   0| 9.6| 7.8|  76| 998.4|20.0|  200|1991-01-01 10:00:00|
|   0|10.1| 8.1|  74| 996.3|25.0|  200|1991-01-01 11:00:00|
|   0|10.4| 9.1|  83| 994.1|25.0|  200|1991-01-01 12:00:00|
|null|10.7| 8.9|  77| 990.9|27.0|  210|1

In [None]:
# Spot-check rows from 2010-03-01 07:00 onwards, showing full cell values.
df3.where(F.col('TS_date') >= "2010-03-01 07:00:00").show(truncate=False)

+----+----+----+----+------+----+-----+-------------------+
|rain|temp|wetb|rhum|msl   |wdsp|wddir|TS_date            |
+----+----+----+----+------+----+-----+-------------------+
|0   |0.3 |0.0 |94  |1008.7|8.0 |250  |2010-03-01 07:00:00|
|0   |0.8 |0.4 |92  |1009.1|8.0 |250  |2010-03-01 08:00:00|
|0   |2.4 |1.7 |87  |1009.8|8.0 |250  |2010-03-01 09:00:00|
|0   |3.5 |2.7 |86  |1010.4|8.0 |250  |2010-03-01 10:00:00|
|0   |5.1 |3.7 |78  |1011.3|9.0 |250  |2010-03-01 11:00:00|
|0   |6.4 |4.9 |78  |1011.5|8.0 |260  |2010-03-01 12:00:00|
|0   |7.3 |4.8 |65  |1011.7|7.0 |270  |2010-03-01 13:00:00|
|0   |7.4 |4.3 |57  |1011.9|7.0 |290  |2010-03-01 14:00:00|
|0   |7.8 |4.5 |55  |1012.1|6.0 |290  |2010-03-01 15:00:00|
|0   |8.1 |4.5 |51  |1012.6|6.0 |290  |2010-03-01 16:00:00|
|0   |6.9 |3.8 |56  |1013.3|6.0 |270  |2010-03-01 17:00:00|
|0   |3.7 |1.8 |68  |1014.0|4.0 |320  |2010-03-01 18:00:00|
|0   |-0.1|-0.8|86  |1014.8|3.0 |150  |2010-03-01 19:00:00|
|0   |0.7 |0.1 |88  |1015.3|3.0 |180  |2

In [None]:
# Keep the modelling columns for the half-open window [START_TS, END_TS).
# An inclusive upper bound pulled in one extra row from 2021: the count was 96433,
# i.e. the 96432 hourly rows of 2010-2020 plus 2021-01-01 00:00.
START_TS = "2010-01-01 00:00:00"
END_TS = "2021-01-01 00:00:00"  # exclusive
finalDF12 = (
    df3
    .select('TS_date', 'rain', 'temp', 'rhum', 'msl', 'wdsp', 'wddir')
    .filter((df3['TS_date'] >= START_TS) & (df3['TS_date'] < END_TS))
)
finalDF12.show()

+-------------------+----+----+----+------+----+-----+
|            TS_date|rain|temp|rhum|   msl|wdsp|wddir|
+-------------------+----+----+----+------+----+-----+
|2010-01-01 00:00:00|null| 0.0|  98|1008.3| 5.0|  290|
|2010-01-01 01:00:00|null|-2.9| 100|1008.3| 6.0|  300|
|2010-01-01 02:00:00|   0|-2.5| 100|1008.4| 7.0|  300|
|2010-01-01 03:00:00|null|-2.5| 100|1008.6| 6.0|  300|
|2010-01-01 04:00:00|   0|-1.9| 100|1008.7| 7.0|  320|
|2010-01-01 05:00:00|null|-0.7|  98|1008.9| 8.0|  330|
|2010-01-01 06:00:00|null|-2.7| 100|1009.2| 6.0|  310|
|2010-01-01 07:00:00|   0|-1.9| 100|1009.2| 6.0|  330|
|2010-01-01 08:00:00|null|-1.3|  96|1009.3| 6.0|  320|
|2010-01-01 09:00:00|null|-3.2|  95|1009.9| 7.0|  310|
|2010-01-01 10:00:00|   0|-3.8|  98|1010.6| 6.0|  300|
|2010-01-01 11:00:00|null|-2.1|  95|1010.7| 8.0|  260|
|2010-01-01 12:00:00|   0|-1.4|  93|1010.8| 8.0|  270|
|2010-01-01 13:00:00|   0|-0.3|  92|1010.8| 8.0|  270|
|2010-01-01 14:00:00|   0| 0.8|  86|1010.7| 9.0|  270|
|2010-01-0

In [None]:
# Suffix each measurement column with "_DA" (presumably Dublin Airport), keeping
# TS_date unchanged and the column order intact.
# withColumnRenamed is a no-op when the column is absent. Re-running this cell is
# therefore safe, whereas the previous selectExpr version failed on a second run
# because 'rain' etc. no longer existed.
for base_col in ["rain", "temp", "rhum", "msl", "wdsp", "wddir"]:
    finalDF12 = finalDF12.withColumnRenamed(base_col, f"{base_col}_DA")

In [None]:
# Number of hourly rows in the selected window.
finalDF12.count()

96433

In [None]:
# count / mean / stddev / min / max per column (TS_date is not summarised).
summary_df = finalDF12.describe()
summary_df.show()

+-------+--------------------+-----------------+------------------+------------------+------------------+------------------+
|summary|             rain_DA|          temp_DA|           rhum_DA|            msl_DA|           wdsp_DA|          wddir_DA|
+-------+--------------------+-----------------+------------------+------------------+------------------+------------------+
|  count|               84337|            96433|             96433|             96433|             96433|             96433|
|   mean|0.007861318282604314|9.679389835429792| 82.53757531135606|1013.2395393693074|10.417460827724948|206.54900293468003|
| stddev|  0.1335324447351233|5.112596853171862|12.008338946169673|12.280792114870632| 5.298851739805074| 82.48403133479167|
|    min|                   0|            -11.5|                25|             954.3|               0.0|                 0|
|    max|                  12|             26.3|               100|            1048.2|              44.0|               360|


In [None]:
# Per-column number of null values before any imputation.
null_counts = finalDF12.select([
    F.count(F.when(F.col(name).isNull(), name)).alias(name)
    for name in finalDF12.columns
])
null_counts.show()

+-------+-------+-------+-------+------+-------+--------+
|TS_date|rain_DA|temp_DA|rhum_DA|msl_DA|wdsp_DA|wddir_DA|
+-------+-------+-------+-------+------+-------+--------+
|      0|  12096|      0|      0|     0|      0|       0|
+-------+-------+-------+-------+------+-------+--------+



In [None]:
# Impute missing rain readings as 0.
# NOTE(review): if rain was loaded with an integer type, fractional readings
# may have parsed as null. Those nulls would be lost measurements rather than
# dry hours, so check the read schema before relying on this fill.
finalDF12 = finalDF12.na.fill(0, subset=['rain_DA'])

In [None]:
# Re-count nulls per column after the fill; rain_DA should now report 0.
null_exprs = [
    F.count(F.when(F.col(col_name).isNull(), col_name)).alias(col_name)
    for col_name in finalDF12.columns
]
finalDF12.select(null_exprs).show()

+-------+-------+-------+-------+------+-------+--------+
|TS_date|rain_DA|temp_DA|rhum_DA|msl_DA|wdsp_DA|wddir_DA|
+-------+-------+-------+-------+------+-------+--------+
|      0|      0|      0|      0|     0|      0|       0|
+-------+-------+-------+-------+------+-------+--------+



In [None]:
 # List the per-station CSV files in the weatherdata2 folder.
 import os
 WEATHER_DIR = '/content/drive/MyDrive/Colab datasets/weatherdata2'
 arr = os.listdir(WEATHER_DIR)
 print(arr)

['phoenix park.csv', 'OAK PARK.csv', 'SHANNON AIRPORT.csv', 'DUBLIN AIRPORT.csv', 'MOORE PARK.csv', 'SherkinIsland.csv', 'MULLINGAR.csv', 'MALIN HEAD.csv', 'JOHNSTOWN.csv', 'MT DILLON.csv', 'FINNER.csv', 'CLAREMORRIS.csv', 'VALENTIA OBSERVATORY.csv', 'BELMULLET.csv', 'CASEMENT.csv', 'CORK AIRPORT.csv', 'KNOCK AIRPORT.csv', 'M2.csv', 'M3.csv', 'M4.csv', 'M5.csv', 'M6.csv', 'Newport.csv', 'Roches Point.csv', 'ATHENRY.csv']
