# Wind Statistics

### Introduction:

The data have been modified to contain some missing values, identified by NaN.  
Using pandas should make this exercise
easier, in particular for the bonus question.

You should be able to perform all of these operations without using
a for loop or other looping construct.


1. The data in 'wind.data' has the following format:

In [1]:
"""
Yr Mo Dy   RPT   VAL   ROS   KIL   SHA   BIR   DUB   CLA   MUL   CLO   BEL   MAL
61  1  1 15.04 14.96 13.17  9.29   NaN  9.87 13.67 10.25 10.83 12.58 18.50 15.04
61  1  2 14.71   NaN 10.83  6.50 12.62  7.67 11.50 10.04  9.79  9.67 17.54 13.83
61  1  3 18.50 16.88 12.33 10.13 11.17  6.17 11.25   NaN  8.50  7.67 12.75 12.71
"""

'\nYr Mo Dy   RPT   VAL   ROS   KIL   SHA   BIR   DUB   CLA   MUL   CLO   BEL   MAL\n61  1  1 15.04 14.96 13.17  9.29   NaN  9.87 13.67 10.25 10.83 12.58 18.50 15.04\n61  1  2 14.71   NaN 10.83  6.50 12.62  7.67 11.50 10.04  9.79  9.67 17.54 13.83\n61  1  3 18.50 16.88 12.33 10.13 11.17  6.17 11.25   NaN  8.50  7.67 12.75 12.71\n'

   The first three columns are year, month and day.  The
   remaining 12 columns are average windspeeds in knots at 12
   locations in Ireland on that day.   

   More information about the dataset go [here](wind.desc).

### Step 1. Import the necessary libraries

In [2]:
! sudo apt update
! apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
! wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
! tar xf spark-3.2.1-bin-hadoop3.2.tgz
! pip install -q findspark
! pip install pyspark
! pip install py4j

import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List

import pyspark.sql.types as T
import pyspark.sql.functions as F

spark= SparkSession \
       .builder \
       .appName("Our First Spark Example") \
       .getOrCreate()

spark

Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Ign:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Get:5 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:7 https://r2u.stat.illinois.edu/ubuntu jammy Release [5,713 B]
Get:8 https://r2u.stat.illinois.edu/ubuntu jammy Release.gpg [793 B]
Get:9 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Hit:10 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:11 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:12 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:13 https://r2u.stat.illinois.edu/ubuntu jammy/main amd64 Packages [2,555 kB]
Get:14 https://r2u.stat.illinois.

### Step 2. Import the dataset from this [address](https://raw.githubusercontent.com/guipsamora/pandas_exercises/master/06_Stats/Wind_Stats/wind.data)

In [67]:
with open('/content/wind.data', 'r') as f:
    lines = f.readlines()

with open('/content/wind.data', 'w') as f:
    for line in lines:
        f.write(' '.join(line.split()) + '\n')

In [68]:
df = spark.read.csv("/content/wind.data", inferSchema=True, header=True, sep=" ")

In [69]:
df.show()

+---+---+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
| Yr| Mo| Dy|  RPT|  VAL|  ROS|  KIL|  SHA|  BIR|  DUB|  CLA|  MUL|  CLO|  BEL|  MAL|
+---+---+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
| 61|  1|  1|15.04|14.96|13.17| 9.29|  NaN| 9.87|13.67|10.25|10.83|12.58| 18.5|15.04|
| 61|  1|  2|14.71|  NaN|10.83|  6.5|12.62| 7.67| 11.5|10.04| 9.79| 9.67|17.54|13.83|
| 61|  1|  3| 18.5|16.88|12.33|10.13|11.17| 6.17|11.25|  NaN|  8.5| 7.67|12.75|12.71|
| 61|  1|  4|10.58| 6.63|11.75| 4.58| 4.54| 2.88| 8.63| 1.79| 5.83| 5.88| 5.46|10.88|
| 61|  1|  5|13.33|13.25|11.42| 6.17|10.71| 8.21|11.92| 6.54|10.92|10.34|12.92|11.83|
| 61|  1|  6|13.21| 8.12| 9.96| 6.67| 5.37|  4.5|10.67| 4.42| 7.17|  7.5| 8.12|13.17|
| 61|  1|  7| 13.5|14.29|  9.5| 4.96|12.29| 8.33| 9.17| 9.29| 7.58| 7.96|13.96|13.79|
| 61|  1|  8|10.96| 9.75| 7.62| 5.91| 9.62| 7.29|14.29| 7.62| 9.25|10.46|16.62|16.46|
| 61|  1|  9|12.58|10.83| 10.0| 4.75|10.37| 6.79| 8.04

### Step 3. Assign it to a variable called data and replace the first 3 columns by a proper datetime index.


In [70]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, concat, lit, when

# Assuming you already have a SparkSession created and a DataFrame 'df' with columns 'Yr', 'Mo', 'Dy', etc.

df = df.withColumn("Yr_last_two", df["Yr"] % 100)
df = df.withColumn("Century", when(df["Yr"] < 70, lit("19")))
df = df.withColumn("Date_str", concat(df["Century"], df["Yr_last_two"], lit("-"), df["Mo"], lit("-"), df["Dy"]))
df = df.withColumn("Datetime", to_timestamp("Date_str", "yyyy-M-d"))

df = df.drop("Yr", "Mo", "Dy", "Date_str", "Yr_last_two", "Century")

df.show()

+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-------------------+
|  RPT|  VAL|  ROS|  KIL|  SHA|  BIR|  DUB|  CLA|  MUL|  CLO|  BEL|  MAL|           Datetime|
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-------------------+
|15.04|14.96|13.17| 9.29|  NaN| 9.87|13.67|10.25|10.83|12.58| 18.5|15.04|1961-01-01 00:00:00|
|14.71|  NaN|10.83|  6.5|12.62| 7.67| 11.5|10.04| 9.79| 9.67|17.54|13.83|1961-01-02 00:00:00|
| 18.5|16.88|12.33|10.13|11.17| 6.17|11.25|  NaN|  8.5| 7.67|12.75|12.71|1961-01-03 00:00:00|
|10.58| 6.63|11.75| 4.58| 4.54| 2.88| 8.63| 1.79| 5.83| 5.88| 5.46|10.88|1961-01-04 00:00:00|
|13.33|13.25|11.42| 6.17|10.71| 8.21|11.92| 6.54|10.92|10.34|12.92|11.83|1961-01-05 00:00:00|
|13.21| 8.12| 9.96| 6.67| 5.37|  4.5|10.67| 4.42| 7.17|  7.5| 8.12|13.17|1961-01-06 00:00:00|
| 13.5|14.29|  9.5| 4.96|12.29| 8.33| 9.17| 9.29| 7.58| 7.96|13.96|13.79|1961-01-07 00:00:00|
|10.96| 9.75| 7.62| 5.91| 9.62| 7.29|14.29| 7.62| 9.25|10.46

In [71]:
df.printSchema()

root
 |-- RPT: double (nullable = true)
 |-- VAL: double (nullable = true)
 |-- ROS: double (nullable = true)
 |-- KIL: double (nullable = true)
 |-- SHA: double (nullable = true)
 |-- BIR: double (nullable = true)
 |-- DUB: double (nullable = true)
 |-- CLA: double (nullable = true)
 |-- MUL: double (nullable = true)
 |-- CLO: double (nullable = true)
 |-- BEL: double (nullable = true)
 |-- MAL: double (nullable = true)
 |-- Datetime: timestamp (nullable = true)



### Step 5. Set the right dates as the index. Pay attention at the data type, it should be datetime64[ns].

In [72]:
df = df.withColumn("Datetime", df["Datetime"].cast("timestamp"))
df.show()

+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-------------------+
|  RPT|  VAL|  ROS|  KIL|  SHA|  BIR|  DUB|  CLA|  MUL|  CLO|  BEL|  MAL|           Datetime|
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-------------------+
|15.04|14.96|13.17| 9.29|  NaN| 9.87|13.67|10.25|10.83|12.58| 18.5|15.04|1961-01-01 00:00:00|
|14.71|  NaN|10.83|  6.5|12.62| 7.67| 11.5|10.04| 9.79| 9.67|17.54|13.83|1961-01-02 00:00:00|
| 18.5|16.88|12.33|10.13|11.17| 6.17|11.25|  NaN|  8.5| 7.67|12.75|12.71|1961-01-03 00:00:00|
|10.58| 6.63|11.75| 4.58| 4.54| 2.88| 8.63| 1.79| 5.83| 5.88| 5.46|10.88|1961-01-04 00:00:00|
|13.33|13.25|11.42| 6.17|10.71| 8.21|11.92| 6.54|10.92|10.34|12.92|11.83|1961-01-05 00:00:00|
|13.21| 8.12| 9.96| 6.67| 5.37|  4.5|10.67| 4.42| 7.17|  7.5| 8.12|13.17|1961-01-06 00:00:00|
| 13.5|14.29|  9.5| 4.96|12.29| 8.33| 9.17| 9.29| 7.58| 7.96|13.96|13.79|1961-01-07 00:00:00|
|10.96| 9.75| 7.62| 5.91| 9.62| 7.29|14.29| 7.62| 9.25|10.46

### Step 6. Compute how many values are missing for each location over the entire record.  
#### They should be ignored in all calculations below.

In [73]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, FloatType
numeric_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, (DoubleType, FloatType))]
df.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in numeric_cols]).show()

+---+---+---+---+---+---+---+---+---+---+---+---+
|RPT|VAL|ROS|KIL|SHA|BIR|DUB|CLA|MUL|CLO|BEL|MAL|
+---+---+---+---+---+---+---+---+---+---+---+---+
|  6|  3|  2|  5|  2|  0|  3|  2|  3|  1|  0|  4|
+---+---+---+---+---+---+---+---+---+---+---+---+



### Step 7. Compute how many non-missing values there are in total.

In [74]:
non_missing_count = df.select([F.count(F.when(F.col(c).isNotNull(), c)).alias(c) for c in numeric_cols]).collect()[0]
non_missing_count

Row(RPT=6574, VAL=6574, ROS=6574, KIL=6574, SHA=6574, BIR=6574, DUB=6574, CLA=6574, MUL=6574, CLO=6574, BEL=6574, MAL=6574)

### Step 8. Calculate the mean windspeeds of the windspeeds over all the locations and all the times.
#### A single number for the entire dataset.

In [75]:
# replace the nan with 0
df = df.fillna(0)

In [76]:
wind_speed_mean = df.select([F.mean(c).alias(c) for c in numeric_cols]).collect()
wind_speed_mean

[Row(RPT=12.35170368116826, VAL=10.63945695162761, ROS=11.656979008214156, KIL=6.301671737146327, SHA=10.452652874961945, BIR=7.092254335260125, DUB=9.792871919683598, CLA=8.492468816550042, MUL=8.489714024946739, CLO=8.706006997261914, BEL=13.121006997261915, MAL=15.589587770002998)]

### Step 9. Create a DataFrame called loc_stats and calculate the min, max and mean windspeeds and standard deviations of the windspeeds at each location over all the days

#### A different set of numbers for each location.

In [77]:
loc_stats = df.describe()
loc_stats.show()

+-------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+
|summary|              RPT|               VAL|               ROS|               KIL|               SHA|               BIR|              DUB|              CLA|              MUL|              CLO|               BEL|               MAL|
+-------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+
|  count|             6574|              6574|              6574|              6574|              6574|              6574|             6574|             6574|             6574|             6574|              6574|              6574|
|   mean|12.35170368116826| 10.63945695162761|11.656979008214156| 6.

### Step 10. Create a DataFrame called day_stats and calculate the min, max and mean windspeed and standard deviations of the windspeeds across all the locations at each day.

#### A different set of numbers for each day.

In [78]:
# day_stats = df.groupBy("Datetime").agg(
#     F.min(F.least(*numeric_cols)).alias("min"),
#     F.max(F.greatest(*numeric_cols)).alias("max"),
#     F.array_agg(*[F.mean(c) for c in numeric_cols]).alias("mean")
# )

# day_stats.show()

### Step 11. Find the average windspeed in January for each location.  
#### Treat January 1961 and January 1962 both as January.

In [80]:
from pyspark.sql.functions import month
january_data = df.filter(month(df["Datetime"]) == 1)

january_avg_windspeed = january_data.select([F.mean(c).alias(c) for c in numeric_cols])

january_avg_windspeed.show()


+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+-----------------+-----------------+------------------+-----------------+
|               RPT|               VAL|               ROS|              KIL|               SHA|               BIR|               DUB|              CLA|              MUL|              CLO|               BEL|              MAL|
+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+-----------------+-----------------+------------------+-----------------+
|14.394086021505375|12.548566308243732|13.170286738351255|7.474193548387101|11.647634408602153|7.8015770609318995|11.761577060931899|9.432329749103943|8.623978494623662|10.22870967741935|14.233189964157702|16.66146953405018|
+------------------+------------------+------------------+-----------------+------------------+-----

### Step 12. Downsample the record to a yearly frequency for each location.

In [84]:
yearly_data = df.groupBy(F.year("Datetime")).agg(
    *[F.mean(c).alias(c) for c in numeric_cols]
)

yearly_data.sort(F.desc(yearly_data["year(Datetime)"])).show()

+--------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+
|year(Datetime)|               RPT|               VAL|               ROS|               KIL|               SHA|              BIR|               DUB|               CLA|              MUL|               CLO|               BEL|               MAL|
+--------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+
|          1969| 11.16635616438356| 9.723698630136992|            10.902| 5.767972602739723| 9.873917808219185|6.189972602739724| 8.564493150684937|7.7113972602739755|7.924520547945215| 7.754383561643838|12.621232876712329|15.762904109589044|
|          1968|11.835628415

### Step 13. Downsample the record to a monthly frequency for each location.

In [91]:
monthly_data = df.groupBy(F.year("Datetime"), F.month("Datetime")).agg(
    *[F.mean(c).alias(c) for c in numeric_cols]
)

monthly_data.orderBy("year(Datetime)", "month(Datetime)").show()

+--------------+---------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|year(Datetime)|month(Datetime)|               RPT|               VAL|               ROS|              KIL|               SHA|               BIR|               DUB|               CLA|               MUL|               CLO|               BEL|               MAL|
+--------------+---------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|          NULL|           NULL|12.350888348037703|10.617791299056874|11.580733191359897|5.766306662610267| 9.821524186187993| 6.848643139641013| 9.443465165804701| 8.049933069668409| 8.779589291146955| 7.996799513233959

### Step 14. Downsample the record to a weekly frequency for each location.

In [92]:
weekly_data = df.groupBy(F.year("Datetime"), F.weekofyear("Datetime")).agg(
    *[F.mean(c).alias(c) for c in numeric_cols]
)

weekly_data.orderBy("year(Datetime)", "weekofyear(Datetime)").show()

+--------------+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|year(Datetime)|weekofyear(Datetime)|               RPT|               VAL|               ROS|               KIL|               SHA|               BIR|               DUB|               CLA|               MUL|               CLO|               BEL|               MAL|
+--------------+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|          NULL|                NULL|12.350888348037703|10.617791299056874|11.580733191359897| 5.766306662610267| 9.821524186187993| 6.848643139641013| 9.443465165804701| 8.049933069668409| 8.7795892911

### Step 15. Calculate the min, max and mean windspeeds and standard deviations of the windspeeds across all locations for each week (assume that the first week starts on January 2 1961) for the first 52 weeks.

In [94]:
from pyspark.sql.window import Window

w = Window.orderBy("Datetime")
df_with_week = df.withColumn("WeekNumber", F.floor(F.datediff(df["Datetime"], F.to_date(F.lit("1961-01-02"))) / 7) + 1)
df_with_week.show()

+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-------------------+----------+
|  RPT|  VAL|  ROS|  KIL|  SHA|  BIR|  DUB|  CLA|  MUL|  CLO|  BEL|  MAL|           Datetime|WeekNumber|
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-------------------+----------+
|15.04|14.96|13.17| 9.29|  0.0| 9.87|13.67|10.25|10.83|12.58| 18.5|15.04|1961-01-01 00:00:00|         0|
|14.71|  0.0|10.83|  6.5|12.62| 7.67| 11.5|10.04| 9.79| 9.67|17.54|13.83|1961-01-02 00:00:00|         1|
| 18.5|16.88|12.33|10.13|11.17| 6.17|11.25|  0.0|  8.5| 7.67|12.75|12.71|1961-01-03 00:00:00|         1|
|10.58| 6.63|11.75| 4.58| 4.54| 2.88| 8.63| 1.79| 5.83| 5.88| 5.46|10.88|1961-01-04 00:00:00|         1|
|13.33|13.25|11.42| 6.17|10.71| 8.21|11.92| 6.54|10.92|10.34|12.92|11.83|1961-01-05 00:00:00|         1|
|13.21| 8.12| 9.96| 6.67| 5.37|  4.5|10.67| 4.42| 7.17|  7.5| 8.12|13.17|1961-01-06 00:00:00|         1|
| 13.5|14.29|  9.5| 4.96|12.29| 8.33| 9.17| 9.29| 7.58|

In [95]:
first_52_weeks = df_with_week.filter(df_with_week["WeekNumber"] <= 52)
first_52_weeks.show()

+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-------------------+----------+
|  RPT|  VAL|  ROS|  KIL|  SHA|  BIR|  DUB|  CLA|  MUL|  CLO|  BEL|  MAL|           Datetime|WeekNumber|
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-------------------+----------+
|15.04|14.96|13.17| 9.29|  0.0| 9.87|13.67|10.25|10.83|12.58| 18.5|15.04|1961-01-01 00:00:00|         0|
|14.71|  0.0|10.83|  6.5|12.62| 7.67| 11.5|10.04| 9.79| 9.67|17.54|13.83|1961-01-02 00:00:00|         1|
| 18.5|16.88|12.33|10.13|11.17| 6.17|11.25|  0.0|  8.5| 7.67|12.75|12.71|1961-01-03 00:00:00|         1|
|10.58| 6.63|11.75| 4.58| 4.54| 2.88| 8.63| 1.79| 5.83| 5.88| 5.46|10.88|1961-01-04 00:00:00|         1|
|13.33|13.25|11.42| 6.17|10.71| 8.21|11.92| 6.54|10.92|10.34|12.92|11.83|1961-01-05 00:00:00|         1|
|13.21| 8.12| 9.96| 6.67| 5.37|  4.5|10.67| 4.42| 7.17|  7.5| 8.12|13.17|1961-01-06 00:00:00|         1|
| 13.5|14.29|  9.5| 4.96|12.29| 8.33| 9.17| 9.29| 7.58|

In [96]:
weekly_stats = first_52_weeks.groupBy("WeekNumber").agg(
    F.min(F.least(*numeric_cols)).alias("min"),
    F.max(F.greatest(*numeric_cols)).alias("max"),
    *[F.mean(c).alias(c + "_mean") for c in numeric_cols],
    *[F.stddev(c).alias(c + "_stddev") for c in numeric_cols]
)

In [97]:
weekly_stats.show()

+----------+----+-----+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|WeekNumber| min|  max|          RPT_mean|          VAL_mean|          ROS_mean|          KIL_mean|          SHA_mean|          BIR_mean|          DUB_mean|          CLA_mean|          MUL_mean|          CLO_mean|          BEL_mean|          MAL_mean|        RPT_stddev|        VAL_stddev|        ROS_stddev|        KIL_stddev|        SHA_stddev|        BIR_stddev|        DUB_stddev|        CLA_stddev|        MUL_stddev|        CLO_stddev|        BEL_stddev|        MAL_stddev|
+----------+----+-----+-----------------