In [1]:
from pyspark.sql.types import IntegerType, BooleanType
from pyspark.sql.functions import udf

In [2]:
weather_data = spark.read.csv('data/2019', header=True, inferSchema=True)
station_list = spark.read.csv('stationlist.csv', header=True, inferSchema=True)
country_list = spark.read.csv('countrylist.csv', header=True, inferSchema=True)

In [3]:
weather_data.printSchema()

root
 |-- STN---: integer (nullable = true)
 |-- WBAN: integer (nullable = true)
 |-- YEARMODA: integer (nullable = true)
 |-- TEMP: double (nullable = true)
 |-- DEWP: double (nullable = true)
 |-- SLP: double (nullable = true)
 |-- STP: double (nullable = true)
 |-- VISIB: double (nullable = true)
 |-- WDSP: double (nullable = true)
 |-- MXSPD: double (nullable = true)
 |-- GUST: double (nullable = true)
 |-- MAX: string (nullable = true)
 |-- MIN: string (nullable = true)
 |-- PRCP: string (nullable = true)
 |-- SNDP: double (nullable = true)
 |-- FRSHTT: integer (nullable = true)



In [4]:
# UDFs to extract year, month and date in weather data

def extract_tornados(ind):
    data = str(ind)
    if len(data) > 4:
        return data[4] == "1"
    else:
        return False

extract_year = udf(lambda date: int(str(date)[:4]), IntegerType())
extract_month = udf(lambda date: int(str(date)[4:6]), IntegerType())
extract_day = udf(lambda date: int(str(date)[6:]), IntegerType())
extract_tornado = udf(extract_tornados, BooleanType())

weather_data = weather_data.select('STN---', 'TEMP', 'WDSP', 'FRSHTT', 'YEARMODA')

weather_data = (weather_data.withColumn(
                            'YEAR', extract_year('YEARMODA')
                            ).withColumn(
                            'MONTH', extract_month('YEARMODA')
                            ).withColumn(
                            'DAY', extract_day('YEARMODA')
                            ).withColumn(
                                "Tornadoes", extract_tornado('FRSHTT')
                            ).drop('FRSHTT')
                )

In [5]:
country_stations = country_list.join(station_list, 
                  country_list.COUNTRY_ABBR == station_list.COUNTRY_ABBR,
                 'inner').select(country_list.COUNTRY_FULL, 
                                 country_list.COUNTRY_ABBR,
                                 station_list.STN_NO)

In [6]:
final_weather_data = country_stations.join(weather_data, 
                      country_stations.STN_NO == weather_data['STN---'],
                     'inner').drop(weather_data['STN---'])

In [7]:
final_weather_data.show(2)

+------------+------------+------+----+----+----+-----+---+---------+
|COUNTRY_FULL|COUNTRY_ABBR|STN_NO|TEMP|WDSP|YEAR|MONTH|DAY|Tornadoes|
+------------+------------+------+----+----+----+-----+---+---------+
|      NORWAY|          NO|013840|38.6|11.7|2019|    1|  1|    false|
|      NORWAY|          NO|013840|30.5| 8.0|2019|    1|  2|    false|
+------------+------------+------+----+----+----+-----+---+---------+
only showing top 2 rows



In [9]:
"""
Which country had the most consecutive days of tornadoes/funnel cloud
formations?


"""
window = Window.partitionBy("COUNTRY_FULL").orderby("YEARMODA")
processed_data = (final_weather_data.withColumn(
                    "lag_date",  lag("YEARMODA", 1, "").over(w1))
                        .withColumn(
                    "lag_val", lag("Tornadoes", 1, "").over(w1))
                  .withColumn("index", sum("jump").over(w1))
                  .withColumn('RowCount', )
                 )


'\nWhich country had the most consecutive days of tornadoes/funnel cloud\nformations?\n\n\n'

In [20]:
def get_mean_average_sorted(df, mean_col, group_by_col, missing_val):
    """
    Given dataframe it applies group by operation on group_by_col and 
    computes mean of the mean_col. Returns data sorted in descending order
    on final average of mean_col.
    
    Arguments:
        dataframe spark.DataFrame
            Dataframe on which the mean operation is to be applied on
        mean_col string
            Column on which mean is to be computed on
        group_by_col list(string)
            String of column by which the group by is supposed to happen
        missing_val int
            Missing value replacement (used to filter data)
    
    Returns:
        aggregated_df spark.DataFrame
            Aggregated spark dataframe sorted in descending order
    """
    aggregated_df = (df.filter(
                    df[mean_col] != missing_val
                ).groupby(
                    group_by_col
                ).mean(
                    mean_col
                ).withColumnRenamed(
                    f"avg({mean_col})",f"mean_{mean_col}"
                )
           )
    return aggregated_df.orderBy(aggregated_df[f"mean_{mean_col}"].desc())

In [22]:
"""
Which country had the hottest average mean temperature over the year?

Approach:
Missing temprature data should be filtered out. Also yearmoda should be
split into 3 different columns each holding a year, month and day.

And average temprature should be computed using groupby query of year and
country and applying aggregation function called mean
"""
average_temp = get_mean_average_sorted(
                    final_weather_data, 
                    'TEMP', 
                    ['YEAR', 'COUNTRY_FULL'],
                    9999.9
                )
country = average_temp.select('COUNTRY_FULL').take(1)
print(f"Country with hottest mean average temprature is {country[0].COUNTRY_FULL}")

Country with hottest mean average temprature is DJIBOUTI


In [23]:
"""
Which country had the second highest average mean wind speed over the year?

Approach:
Missing wind speed data should be filtered out.
Then average winds spped over the year can be computed as 
groupby query of year and country 
and applying aggregation function called mean
"""
average_wind_speed = get_mean_average_sorted(
                    final_weather_data, 
                    'WDSP', 
                    ['YEAR', 'COUNTRY_FULL'],
                    999.9
                )
country = average_wind_speed.select('COUNTRY_FULL').take(2)
print(f"Country with second highest mean wind speed over the year is {country[1].COUNTRY_FULL}")

Country with second highest mean wind speed over the year is BERMUDA
