In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.getOrCreate()

## Read the data

In [26]:
df=spark.read.csv([\
'data/2019/part-00000-890686c0-c142-4c69-a744-dfdc9eca7df4-c000.csv.gz',\
'data/2019/part-00001-890686c0-c142-4c69-a744-dfdc9eca7df4-c000.csv.gz',\
'data/2019/part-00002-890686c0-c142-4c69-a744-dfdc9eca7df4-c000.csv.gz',\
'data/2019/part-00003-890686c0-c142-4c69-a744-dfdc9eca7df4-c000.csv.gz',\
'data/2019/part-00004-890686c0-c142-4c69-a744-dfdc9eca7df4-c000.csv.gz']\
                ,header=True, inferSchema=True) 
df.show(10)

+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+-----+-----+------+
|STN---| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD|GUST|  MAX|  MIN| PRCP| SNDP|FRSHTT|
+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+-----+-----+------+
| 10260|99999|20190101|26.1|21.2|1001.9| 987.5| 20.6| 9.0| 15.9|29.7| 29.8|21.7*|0.02G| 18.5|  1000|
| 10260|99999|20190102|24.9|22.1|1020.1|1005.5|  5.4| 5.6| 13.6|22.1|27.1*| 20.7|0.48G| 22.8|  1000|
| 10260|99999|20190103|31.7|29.1|1008.9| 994.7| 13.6|11.6| 21.4|49.5|37.4*|26.8*|0.25G|999.9| 11000|
| 10260|99999|20190104|32.9|30.3|1011.4| 997.1| 15.8| 4.9|  7.8|10.9| 36.1| 31.8|0.52G|999.9|  1000|
| 10260|99999|20190105|35.5|33.0|1015.7|1001.4| 12.0|10.4| 13.6|21.0|38.5*| 32.7|0.02G| 23.6| 10000|
| 10260|99999|20190106|38.5|34.1|1008.2| 994.2| 12.8|10.0| 17.5|28.9| 41.4|33.8*|0.12G| 23.2| 10000|
| 10260|99999|20190107|32.1|29.8| 996.8| 982.7|  6.9|11.3| 15.5|28.6|35.1*| 30.4|0.00G|999.

In [54]:
df.printSchema()

root
 |-- COUNTRY_ABBR: string (nullable = true)
 |-- STN_NO: integer (nullable = true)
 |-- WBAN: integer (nullable = true)
 |-- YEARMODA: integer (nullable = true)
 |-- TEMP: double (nullable = true)
 |-- DEWP: double (nullable = true)
 |-- SLP: double (nullable = true)
 |-- STP: double (nullable = true)
 |-- VISIB: double (nullable = true)
 |-- WDSP: double (nullable = true)
 |-- MXSPD: double (nullable = true)
 |-- GUST: double (nullable = true)
 |-- MAX: string (nullable = true)
 |-- MIN: string (nullable = true)
 |-- PRCP: string (nullable = true)
 |-- SNDP: double (nullable = true)
 |-- FRSHTT: integer (nullable = true)
 |-- COUNTRY_FULL: string (nullable = true)



### Change column name to make it easier to process

In [28]:
df=df.withColumnRenamed('STN---', 'STN_NO')
df.show(3)

+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+-----+-----+------+
|STN_NO| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD|GUST|  MAX|  MIN| PRCP| SNDP|FRSHTT|
+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+-----+-----+------+
| 10260|99999|20190101|26.1|21.2|1001.9| 987.5| 20.6| 9.0| 15.9|29.7| 29.8|21.7*|0.02G| 18.5|  1000|
| 10260|99999|20190102|24.9|22.1|1020.1|1005.5|  5.4| 5.6| 13.6|22.1|27.1*| 20.7|0.48G| 22.8|  1000|
| 10260|99999|20190103|31.7|29.1|1008.9| 994.7| 13.6|11.6| 21.4|49.5|37.4*|26.8*|0.25G|999.9| 11000|
+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+-----+-----+------+
only showing top 3 rows



### Read stations

In [29]:
stations=spark.read.csv("stationlist.csv", header=True, inferSchema=True)
stations.show(5)

+------+------------+
|STN_NO|COUNTRY_ABBR|
+------+------------+
|012240|          NO|
|020690|          SW|
|020870|          SW|
|021190|          SW|
|032690|          UK|
+------+------------+
only showing top 5 rows



In [30]:
print(df.count())

4158416


In [31]:
df=df.join(stations, ["STN_NO"], "left")
df.show(5)

+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+-----+-----+------+------------+
|STN_NO| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD|GUST|  MAX|  MIN| PRCP| SNDP|FRSHTT|COUNTRY_ABBR|
+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+-----+-----+------+------------+
| 10260|99999|20190101|26.1|21.2|1001.9| 987.5| 20.6| 9.0| 15.9|29.7| 29.8|21.7*|0.02G| 18.5|  1000|          NO|
| 10260|99999|20190102|24.9|22.1|1020.1|1005.5|  5.4| 5.6| 13.6|22.1|27.1*| 20.7|0.48G| 22.8|  1000|          NO|
| 10260|99999|20190103|31.7|29.1|1008.9| 994.7| 13.6|11.6| 21.4|49.5|37.4*|26.8*|0.25G|999.9| 11000|          NO|
| 10260|99999|20190104|32.9|30.3|1011.4| 997.1| 15.8| 4.9|  7.8|10.9| 36.1| 31.8|0.52G|999.9|  1000|          NO|
| 10260|99999|20190105|35.5|33.0|1015.7|1001.4| 12.0|10.4| 13.6|21.0|38.5*| 32.7|0.02G| 23.6| 10000|          NO|
+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+-----+-

### Read Countries

In [32]:
countries=spark.read.csv("countrylist.csv", header=True, inferSchema=True)
countries.show(5)

+------------+-------------------+
|COUNTRY_ABBR|       COUNTRY_FULL|
+------------+-------------------+
|          AA|              ARUBA|
|          AC|ANTIGUA AND BARBUDA|
|          AF|        AFGHANISTAN|
|          AG|            ALGERIA|
|          AI|   ASCENSION ISLAND|
+------------+-------------------+
only showing top 5 rows



In [33]:
df=df.join(countries, ["COUNTRY_ABBR"], "left")
df.show(5)

+------------+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+-----+-----+------+------------+
|COUNTRY_ABBR|STN_NO| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD|GUST|  MAX|  MIN| PRCP| SNDP|FRSHTT|COUNTRY_FULL|
+------------+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+-----+-----+------+------------+
|          NO| 10260|99999|20190101|26.1|21.2|1001.9| 987.5| 20.6| 9.0| 15.9|29.7| 29.8|21.7*|0.02G| 18.5|  1000|      NORWAY|
|          NO| 10260|99999|20190102|24.9|22.1|1020.1|1005.5|  5.4| 5.6| 13.6|22.1|27.1*| 20.7|0.48G| 22.8|  1000|      NORWAY|
|          NO| 10260|99999|20190103|31.7|29.1|1008.9| 994.7| 13.6|11.6| 21.4|49.5|37.4*|26.8*|0.25G|999.9| 11000|      NORWAY|
|          NO| 10260|99999|20190104|32.9|30.3|1011.4| 997.1| 15.8| 4.9|  7.8|10.9| 36.1| 31.8|0.52G|999.9|  1000|      NORWAY|
|          NO| 10260|99999|20190105|35.5|33.0|1015.7|1001.4| 12.0|10.4| 13.6|21.0|38.5*| 32.7|0.02G| 23.6| 1000

## Drop missing data

### Data Exploration
Query some random columns to get a sense of which all data is missing 

In [34]:
df.filter((df.TEMP ==9999.9) ).show(10)

+------------+------+----+--------+----+----+---+---+-----+----+-----+----+---+---+----+----+------+------------+
|COUNTRY_ABBR|STN_NO|WBAN|YEARMODA|TEMP|DEWP|SLP|STP|VISIB|WDSP|MXSPD|GUST|MAX|MIN|PRCP|SNDP|FRSHTT|COUNTRY_FULL|
+------------+------+----+--------+----+----+---+---+-----+----+-----+----+---+---+----+----+------+------------+
+------------+------+----+--------+----+----+---+---+-----+----+-----+----+---+---+----+----+------+------------+



In [35]:
from pyspark.sql.functions import mean as _mean, min as _min, max as _max,col

df_stats = df.select(
    _min(col('TEMP')).alias('min'),
    _max(col('TEMP')).alias('max'),
).collect()

print('Data is between' ,df_stats[0]['min'], 'and', df_stats[0]['max'])

Data is between -114.7 and 110.0


In [56]:
df.filter((df.DEWP ==9999.9) ).show(5)

+------------+------+-----+--------+----+------+------+------+-----+----+-----+-----+-----+-----+-----+-----+------+------------+
|COUNTRY_ABBR|STN_NO| WBAN|YEARMODA|TEMP|  DEWP|   SLP|   STP|VISIB|WDSP|MXSPD| GUST|  MAX|  MIN| PRCP| SNDP|FRSHTT|COUNTRY_FULL|
+------------+------+-----+--------+----+------+------+------+-----+----+-----+-----+-----+-----+-----+-----+------+------------+
|          SW| 22190|99999|20190105|33.8|9999.9|1021.1| 953.2|  5.6| 5.2|  7.8| 15.3| 35.4| 33.1|0.13G|999.9|110000|      SWEDEN|
|          NO| 13730|99999|20191124|32.7|9999.9|1021.4|1000.4|999.9| 0.8|  1.9|999.9|33.3*|32.4*|0.00G|999.9|100000|      NORWAY|
|          SW| 24310|99999|20191128|30.4|9999.9| 988.6| 954.2|  2.2| 2.8|  5.8| 11.3| 32.0|25.7*|0.19G|999.9|  1000|      SWEDEN|
|          SW| 24310|99999|20191129|24.9|9999.9| 995.6| 960.8| 24.3| 4.2|  7.8| 16.9| 28.8| 23.4|0.29G|999.9|  1000|      SWEDEN|
|          SW| 24310|99999|20191130|23.0|9999.9|1012.1| 976.6| 29.9| 3.5|  6.4| 17.3| 28.2

In [37]:
df.filter((df.SLP ==9999.9) ).show(5)

+------------+------+-----+--------+----+----+------+-----+-----+----+-----+-----+-----+-----+-----+-----+------+------------+
|COUNTRY_ABBR|STN_NO| WBAN|YEARMODA|TEMP|DEWP|   SLP|  STP|VISIB|WDSP|MXSPD| GUST|  MAX|  MIN| PRCP| SNDP|FRSHTT|COUNTRY_FULL|
+------------+------+-----+--------+----+----+------+-----+-----+----+-----+-----+-----+-----+-----+-----+------+------------+
|          SI|140080|99999|20190101|25.3|12.3|9999.9|750.0| 49.1|16.8| 22.5| 38.7| 31.8| 18.7|0.00A|999.9|     0|    SLOVENIA|
|          SI|140080|99999|20190102| 7.8| 1.4|9999.9|741.5| 28.5|21.8| 35.0| 64.3|18.7*|-0.2*|0.02G|  9.8|101000|    SLOVENIA|
|          SI|140080|99999|20190103| 0.3|-7.0|9999.9|742.5| 41.0|25.2| 40.2| 70.5|  2.1| -0.8|0.00G|  9.8|101000|    SLOVENIA|
|          SI|140080|99999|20190104| 3.3|-1.0|9999.9|745.3| 35.4|16.8| 26.4| 52.1|  9.1| -0.9|0.00G|  9.8|101000|    SLOVENIA|
|          SI|140080|99999|20190105|15.1| 9.1|9999.9|743.0| 46.9|20.5| 31.1|999.9| 19.8|  6.4|0.03G|  9.8|  100

In [39]:
df.filter((df.GUST ==999.9) ).show(5)

+------------+------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+-----+-----+-----+------+------------+
|COUNTRY_ABBR|STN_NO| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD| GUST|  MAX|  MIN| PRCP| SNDP|FRSHTT|COUNTRY_FULL|
+------------+------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+-----+-----+-----+------+------------+
|          NO| 10260|99999|20190116|24.8|21.8| 992.5| 978.2| 33.4| 2.7|  5.8|999.9| 26.1| 23.5|0.06G| 35.4|  1000|      NORWAY|
|          NO| 10260|99999|20190124|14.4|11.5|1019.1|1004.2| 44.7| 2.6|  3.9|999.9|17.2*| 12.2|0.00G| 40.9|     0|      NORWAY|
|          NO| 10260|99999|20190208|18.3|16.5| 997.8| 983.3| 45.4| 2.4|  5.8|999.9|24.8*| 14.4|0.00G| 37.8|     0|      NORWAY|
|          NO| 10260|99999|20190209|15.5|13.8| 993.4| 978.9| 41.6| 1.8|  3.7|999.9| 21.6|13.1*|0.00G| 37.4|     0|      NORWAY|
|          NO| 10260|99999|20190314|27.2|20.6| 990.4| 976.2| 22.1| 3.3|  5.8|999.9| 34.0|23.7*|0.07G|999

In [40]:
print(df.filter((df.DEWP ==9999.9) ).count())

201147


### Define a replace function and replace all Missing data with Null

In [61]:
from pyspark.sql.functions import when, lit, col

def replace1(column, value):
    return when(column != value, column).otherwise(lit(None))

df=df.withColumn("TEMP", replace1(col("TEMP"), 9999.9)).\
withColumn("DEWP", replace1(col("DEWP"), 9999.9)).\
withColumn("SLP", replace1(col("SLP"), 9999.9)).\
withColumn("STP", replace1(col("STP"), 9999.9)).\
withColumn("VISIB", replace1(col("VISIB"), 999.9)).\
withColumn("WDSP", replace1(col("WDSP"), 999.9)).\
withColumn("MXSPD", replace1(col("MXSPD"), 999.9)).\
withColumn("GUST", replace1(col("GUST"), 999.9)).\
withColumn("MAX", replace1(col("MAX"), 9999.9)).\
withColumn("MIN", replace1(col("MIN"), 9999.9)).\
withColumn("PRCP", replace1(col("PRCP"), 99.9)).\
withColumn("SNDP", replace1(col("SNDP"), 999.9))

df.show(10)

+------------+------+-----+--------+----+----+------+------+-----+----+-----+----+----+----+----+----+------+------------+
|COUNTRY_ABBR|STN_NO| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD|GUST| MAX| MIN|PRCP|SNDP|FRSHTT|COUNTRY_FULL|
+------------+------+-----+--------+----+----+------+------+-----+----+-----+----+----+----+----+----+------+------------+
|          NO| 10260|99999|20190101|26.1|21.2|1001.9| 987.5| 20.6| 9.0| 15.9|29.7|29.8|null|null|18.5|  1000|      NORWAY|
|          NO| 10260|99999|20190102|24.9|22.1|1020.1|1005.5|  5.4| 5.6| 13.6|22.1|null|20.7|null|22.8|  1000|      NORWAY|
|          NO| 10260|99999|20190103|31.7|29.1|1008.9| 994.7| 13.6|11.6| 21.4|49.5|null|null|null|null| 11000|      NORWAY|
|          NO| 10260|99999|20190104|32.9|30.3|1011.4| 997.1| 15.8| 4.9|  7.8|10.9|36.1|31.8|null|null|  1000|      NORWAY|
|          NO| 10260|99999|20190105|35.5|33.0|1015.7|1001.4| 12.0|10.4| 13.6|21.0|null|32.7|null|23.6| 10000|      NORWAY|
|          NO| 1

## 1. Which country had the hottest average mean temperature over the year?
We will first calculate the average mean temp for each station and then aggregate over the country

In [62]:
from pyspark.sql.functions import sum,avg,max,min,mean,count, first, last, countDistinct
grpdf1=df.groupby('STN_NO', 'COUNTRY_FULL')\
    .agg(avg("TEMP").alias("mean_temp_station"))
grpdf1.show(10)

+------+-------------+------------------+
|STN_NO| COUNTRY_FULL| mean_temp_station|
+------+-------------+------------------+
|727755|UNITED STATES| 41.55601092896176|
|876450|    ARGENTINA| 56.35643835616438|
|110220|      AUSTRIA|52.682142857142814|
|726435|UNITED STATES|43.326502732240414|
|270370|       RUSSIA| 40.50821917808217|
|889630|   ANTARCTICA| 23.83972602739725|
|713070|       CANADA|47.952876712328774|
|122800|       POLAND|49.255616438356135|
| 42340|    GREENLAND|29.745604395604396|
| 13850|       NORWAY|41.818356164383566|
+------+-------------+------------------+
only showing top 10 rows



### Now lets aggregate over country

In [65]:
grpdf2=grpdf1.groupby('COUNTRY_FULL')\
    .agg(avg("mean_temp_station").alias("mean_temp_country"))
grpdf2.sort(grpdf2.mean_temp_country.desc()).show(20)

+--------------------+-----------------+
|        COUNTRY_FULL|mean_temp_country|
+--------------------+-----------------+
|            DJIBOUTI|90.06114457831325|
|               SUDAN|88.88602786294095|
|                CHAD| 88.1159782741869|
|               NIGER|85.06016337723437|
|         EL SALVADOR|  84.539789278576|
| JUAN DE NOVA ISLAND|84.15945945945947|
|             TOKELAU|83.96142857142854|
|BRITISH INDIAN OC...|83.91150684931505|
|       GUINEA-BISSAU|83.79837994935542|
|      CAYMAN ISLANDS|83.77186578336705|
|        BURKINA FASO|83.76038111925736|
|            MALDIVES|83.67879452054797|
|         GAMBIA  THE|83.64384615384616|
|           SINGAPORE|83.63278538812786|
|              TUVALU|83.51711850051552|
|            CAMBODIA|83.49888303477346|
|          MICRONESIA|83.43944358080198|
|            KIRIBATI| 83.3898426671738|
|ST. VINCENT AND T...|83.28311111111111|
|             COMOROS|83.27314788929272|
+--------------------+-----------------+
only showing top

### Answer DJIBOUTI has the hottest average mean temperature over the year

## 3. Which country had the second highest average mean wind speed over the year?
Similar to above we will first calculate the average mean temp for each station and then aggregate over the country

In [66]:
from pyspark.sql.functions import sum,avg,max,min,mean,count, first, last, countDistinct
grpdf3=df.groupby('STN_NO', 'COUNTRY_FULL')\
    .agg(avg("WDSP").alias("mean_windspeed_station"))
grpdf3.show(10)

+------+-------------+----------------------+
|STN_NO| COUNTRY_FULL|mean_windspeed_station|
+------+-------------+----------------------+
|727755|UNITED STATES|     8.314207650273223|
|876450|    ARGENTINA|      7.92438356164384|
|110220|      AUSTRIA|     7.207417582417587|
|726435|UNITED STATES|     6.211202185792346|
|270370|       RUSSIA|     5.137087912087906|
|889630|   ANTARCTICA|    13.710684931506844|
|713070|       CANADA|      6.57616438356164|
|122800|       POLAND|     7.201643835616438|
| 42340|    GREENLAND|     6.426923076923077|
| 13850|       NORWAY|     4.076438356164382|
+------+-------------+----------------------+
only showing top 10 rows



### Now lets aggregate over country

In [68]:
grpdf4=grpdf3.groupby('COUNTRY_FULL')\
    .agg(avg("mean_windspeed_station").alias("mean_windspeed_country"))
grpdf4.sort(grpdf4.mean_windspeed_country.desc()).show(20)

+--------------------+----------------------+
|        COUNTRY_FULL|mean_windspeed_country|
+--------------------+----------------------+
|FALKLAND ISLANDS ...|    17.757090064203215|
|               ARUBA|    15.975683060109283|
|       FAROE ISLANDS|    15.975523290644825|
|FRENCH SOUTHERN A...|    15.764206899383856|
|            BARBADOS|    14.097540983606562|
|ST. PIERRE AND MI...|     13.90767123287671|
|          CAPE VERDE|    13.723272057202525|
|     TROMELIN ISLAND|    13.005277777777751|
|          ST. HELENA|    12.727092749187655|
|             SOMALIA|    12.244368239674623|
|          ANTARCTICA|    12.083258399284333|
|COCOS (KEELING) I...|    12.054545454545453|
|            GUERNSEY|    12.021826610156673|
|        MAN  ISLE OF|    11.893424657534247|
|ST. VINCENT AND T...|    11.862222222222222|
|          MONTSERRAT|    11.808813559322036|
|             ICELAND|     11.69694194369551|
|      WESTERN SAHARA|    11.629203249998602|
|           ST. LUCIA|    11.56990

### Answer ARUBA had the second highest average mean wind speed over the year

## 2. Which country had the most consecutive days of tornadoes/funnel cloud formations?

### Last digit in FRSHTT sgnifies tornado formation. It can be extracted by taking %10

In [71]:
df=df.withColumn("tornado", df.FRSHTT % 10)
df.show(5)

+------------+------+-----+--------+----+----+------+------+-----+----+-----+----+----+----+----+----+------+------------+-------+
|COUNTRY_ABBR|STN_NO| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD|GUST| MAX| MIN|PRCP|SNDP|FRSHTT|COUNTRY_FULL|tornado|
+------------+------+-----+--------+----+----+------+------+-----+----+-----+----+----+----+----+----+------+------------+-------+
|          NO| 10260|99999|20190101|26.1|21.2|1001.9| 987.5| 20.6| 9.0| 15.9|29.7|29.8|null|null|18.5|  1000|      NORWAY|      0|
|          NO| 10260|99999|20190102|24.9|22.1|1020.1|1005.5|  5.4| 5.6| 13.6|22.1|null|20.7|null|22.8|  1000|      NORWAY|      0|
|          NO| 10260|99999|20190103|31.7|29.1|1008.9| 994.7| 13.6|11.6| 21.4|49.5|null|null|null|null| 11000|      NORWAY|      0|
|          NO| 10260|99999|20190104|32.9|30.3|1011.4| 997.1| 15.8| 4.9|  7.8|10.9|36.1|31.8|null|null|  1000|      NORWAY|      0|
|          NO| 10260|99999|20190105|35.5|33.0|1015.7|1001.4| 12.0|10.4| 13.6|21.0|n

In [70]:
df.filter((df.tornado ==1) ).show(5)

+------------+------+-----+--------+----+----+------+------+-----+----+-----+----+----+----+----+----+------+------------+-------+
|COUNTRY_ABBR|STN_NO| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD|GUST| MAX| MIN|PRCP|SNDP|FRSHTT|COUNTRY_FULL|tornado|
+------------+------+-----+--------+----+----+------+------+-----+----+-----+----+----+----+----+----+------+------------+-------+
|          AU|110300|99999|20190616|71.3|60.4|1015.0| 994.4| 21.6|11.0| 23.3|null|76.3|null|null|null|    11|     AUSTRIA|      1|
|          AU|110350|99999|20190619|74.5|62.5|1011.5| 987.8| 15.9| 4.8|  7.8|null|86.9|64.0|null|null|    11|     AUSTRIA|      1|
|          NO| 10280|99999|20191030|21.7|15.1|1014.8|1012.7| 22.9| 8.8| 15.5|null|null|17.4|null|null|     1|      NORWAY|      1|
|          CA|718270|99999|20190616|38.1|37.2|1006.8| 981.9|  3.2|13.3| 17.1|21.0|null|null|null|null|110001|      CANADA|      1|
|          NO| 14920|99999|20191127|35.1|32.5| 999.5| 987.6| 23.8| 8.0|  9.7|null|3

### Aggregate (sum) over date and country to find if there was tornado on the date in the country at any station
Tornado on that day if sum>1

In [72]:
from pyspark.sql.functions import sum,avg,max,min,mean,count, first, last, countDistinct
grpdf5=df.groupby('YEARMODA', 'COUNTRY_FULL')\
    .agg(sum("tornado").alias("number_of_tornado_on_day"))
grpdf5.show(10)

+--------+------------+------------------------+
|YEARMODA|COUNTRY_FULL|number_of_tornado_on_day|
+--------+------------+------------------------+
|20190422|      NORWAY|                       0|
|20191002|      NORWAY|                       0|
|20190326|    SLOVENIA|                       0|
|20190907|    SLOVENIA|                       0|
|20190422|     AUSTRIA|                       0|
|20190724|     AUSTRIA|                       0|
|20190515|     FINLAND|                       0|
|20190715|     FINLAND|                       0|
|20190725|     FINLAND|                       0|
|20190917|     FINLAND|                       0|
+--------+------------+------------------------+
only showing top 10 rows



In [73]:
grpdf5.filter((grpdf5.number_of_tornado_on_day >1) ).show(5)

+--------+--------------+------------------------+
|YEARMODA|  COUNTRY_FULL|number_of_tornado_on_day|
+--------+--------------+------------------------+
|20191209| UNITED STATES|                       2|
|20190811|        CANADA|                       2|
|20191109|         ITALY|                       2|
|20190827| UNITED STATES|                       2|
|20190421|CAYMAN ISLANDS|                       2|
+--------+--------------+------------------------+
only showing top 5 rows



### Replace with 1 if number_of_tornado_on_day>0 (to mark there was a tornado in the country)


In [76]:
from pyspark.sql.functions import when, lit, col

def replace2(column):
    return when(column > 0, 1).otherwise(0)

grpdf5=grpdf5.withColumn("number_of_tornado_on_day", replace2(col("number_of_tornado_on_day")))
grpdf5.show(5)

+--------+------------+------------------------+
|YEARMODA|COUNTRY_FULL|number_of_tornado_on_day|
+--------+------------+------------------------+
|20190422|      NORWAY|                       0|
|20191002|      NORWAY|                       0|
|20190326|    SLOVENIA|                       0|
|20190907|    SLOVENIA|                       0|
|20190422|     AUSTRIA|                       0|
+--------+------------+------------------------+
only showing top 5 rows



In [79]:
grpdf5.filter((grpdf5.number_of_tornado_on_day >1) ).show(5)

+--------+------------+------------------------+
|YEARMODA|COUNTRY_FULL|number_of_tornado_on_day|
+--------+------------+------------------------+
+--------+------------+------------------------+



### Sum the consecutive days for every country

In [83]:
from pyspark.sql import Window
grpdf5 = grpdf5.withColumn("sum_number_of_tornado_on_day", sum('number_of_tornado_on_day').over(Window.partitionBy('COUNTRY_FULL').orderBy('YEARMODA')))
grpdf5.show(10)

+--------+------------+------------------------+----------------------------+
|YEARMODA|COUNTRY_FULL|number_of_tornado_on_day|sum_number_of_tornado_on_day|
+--------+------------+------------------------+----------------------------+
|20190101|     ARMENIA|                       0|                           0|
|20190102|     ARMENIA|                       0|                           0|
|20190103|     ARMENIA|                       0|                           0|
|20190104|     ARMENIA|                       0|                           0|
|20190105|     ARMENIA|                       0|                           0|
|20190106|     ARMENIA|                       0|                           0|
|20190107|     ARMENIA|                       0|                           0|
|20190108|     ARMENIA|                       0|                           0|
|20190109|     ARMENIA|                       0|                           0|
|20190110|     ARMENIA|                       0|                

In [84]:
grpdf5.filter((grpdf5.COUNTRY_FULL =='UNITED STATES') ).sort(grpdf5.YEARMODA).show(20)

+--------+-------------+------------------------+----------------------------+
|YEARMODA| COUNTRY_FULL|number_of_tornado_on_day|sum_number_of_tornado_on_day|
+--------+-------------+------------------------+----------------------------+
|20190101|UNITED STATES|                       0|                           0|
|20190102|UNITED STATES|                       0|                           0|
|20190103|UNITED STATES|                       0|                           0|
|20190104|UNITED STATES|                       0|                           0|
|20190105|UNITED STATES|                       0|                           0|
|20190106|UNITED STATES|                       0|                           0|
|20190107|UNITED STATES|                       0|                           0|
|20190108|UNITED STATES|                       0|                           0|
|20190109|UNITED STATES|                       0|                           0|
|20190110|UNITED STATES|                       0|   

### COunt how many time same value of 'sum_number_of_tornado_on_day' appeared 

In [96]:
grpdf6=grpdf5.groupby('COUNTRY_FULL', 'sum_number_of_tornado_on_day')\
    .agg(count("sum_number_of_tornado_on_day").alias("count_cons_tornado"))
grpdf6.show(20)

+--------------------+----------------------------+------------------+
|        COUNTRY_FULL|sum_number_of_tornado_on_day|count_cons_tornado|
+--------------------+----------------------------+------------------+
|             ARMENIA|                           0|               365|
|        SOUTH AFRICA|                           0|               365|
|               BURMA|                           0|               365|
|            CAMBODIA|                           0|               365|
|          BANGLADESH|                           0|               233|
|          BANGLADESH|                           1|               132|
|               JAPAN|                           0|                16|
|               JAPAN|                           1|                 7|
|               JAPAN|                           2|                37|
|               JAPAN|                           3|                31|
|               JAPAN|                           4|                69|
|     

In [97]:
grpdf6.filter((grpdf6.COUNTRY_FULL =='UNITED STATES') ).sort(grpdf6.count_cons_tornado).show(10)

+-------------+----------------------------+------------------+
| COUNTRY_FULL|sum_number_of_tornado_on_day|count_cons_tornado|
+-------------+----------------------------+------------------+
|UNITED STATES|                           1|                 1|
|UNITED STATES|                          16|                 1|
|UNITED STATES|                           9|                 2|
|UNITED STATES|                          12|                 2|
|UNITED STATES|                           8|                 3|
|UNITED STATES|                          27|                 3|
|UNITED STATES|                          15|                 3|
|UNITED STATES|                           7|                 3|
|UNITED STATES|                          13|                 4|
|UNITED STATES|                           6|                 4|
+-------------+----------------------------+------------------+
only showing top 10 rows



### Filter values where count_cons_tornado is 1
The count keeps changing as consecutive days hae tornado=1. So count==1 will give the max number of consecutive days


In [98]:
grpdf6=grpdf6.filter((grpdf6.count_cons_tornado ==1) ).sort(grpdf6.sum_number_of_tornado_on_day.desc())
grpdf6.show(10)

+--------------+----------------------------+------------------+
|  COUNTRY_FULL|sum_number_of_tornado_on_day|count_cons_tornado|
+--------------+----------------------------+------------------+
|         ITALY|                          22|                 1|
|CAYMAN ISLANDS|                          21|                 1|
|   BAHAMAS THE|                          16|                 1|
| UNITED STATES|                          16|                 1|
|         ITALY|                          14|                 1|
|         JAPAN|                           9|                 1|
|         INDIA|                           6|                 1|
|         JAPAN|                           5|                 1|
|         GHANA|                           4|                 1|
|        CANADA|                           4|                 1|
+--------------+----------------------------+------------------+
only showing top 10 rows



### Answer: Italy had max 22 days of tornado