# Wind Statistics

### Introduction:

The data have been modified to contain some missing values, identified by NaN.  
Using pandas should make this exercise
easier, in particular for the bonus question.

You should be able to perform all of these operations without using
a for loop or other looping construct.


1. The data in 'wind.data' has the following format:

In [153]:
"""
Yr Mo Dy   RPT   VAL   ROS   KIL   SHA   BIR   DUB   CLA   MUL   CLO   BEL   MAL
61  1  1 15.04 14.96 13.17  9.29   NaN  9.87 13.67 10.25 10.83 12.58 18.50 15.04
61  1  2 14.71   NaN 10.83  6.50 12.62  7.67 11.50 10.04  9.79  9.67 17.54 13.83
61  1  3 18.50 16.88 12.33 10.13 11.17  6.17 11.25   NaN  8.50  7.67 12.75 12.71
"""


   The first three columns are year, month and day. The remaining 12
   columns are average windspeeds in knots at 12 locations in Ireland
   on that day.

   For more information about the dataset, go [here](wind.desc).

### Step 1. Import the necessary libraries

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=e31c0bca03675f258badcf40c2d30e06ef9cb837874dbe70c60d8d948bfad96b
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
from pyspark.sql import SparkSession, functions as f
from pyspark.sql.types import StringType
import pandas as pd

### Step 2. Import the dataset from this [address](https://raw.githubusercontent.com/guipsamora/pandas_exercises/master/06_Stats/Wind_Stats/wind.data)

In [3]:
!wget "https://raw.githubusercontent.com/guipsamora/pandas_exercises/master/06_Stats/Wind_Stats/wind.data"

--2022-09-15 12:29:14--  https://raw.githubusercontent.com/guipsamora/pandas_exercises/master/06_Stats/Wind_Stats/wind.data
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 532576 (520K) [text/plain]
Saving to: ‘wind.data’


2022-09-15 12:29:14 (64.4 MB/s) - ‘wind.data’ saved [532576/532576]



In [64]:
spark = SparkSession.builder.appName("Exercise62").getOrCreate()

# Raw string so that \s is passed to the regex engine unchanged.
pandas_df = pd.read_csv("wind.data", sep=r"\s+")

df = spark.createDataFrame(pandas_df)
df.printSchema()
df.show()

root
 |-- Yr: long (nullable = true)
 |-- Mo: long (nullable = true)
 |-- Dy: long (nullable = true)
 |-- RPT: double (nullable = true)
 |-- VAL: double (nullable = true)
 |-- ROS: double (nullable = true)
 |-- KIL: double (nullable = true)
 |-- SHA: double (nullable = true)
 |-- BIR: double (nullable = true)
 |-- DUB: double (nullable = true)
 |-- CLA: double (nullable = true)
 |-- MUL: double (nullable = true)
 |-- CLO: double (nullable = true)
 |-- BEL: double (nullable = true)
 |-- MAL: double (nullable = true)

+---+---+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
| Yr| Mo| Dy|  RPT|  VAL|  ROS|  KIL|  SHA|  BIR|  DUB|  CLA|  MUL|  CLO|  BEL|  MAL|
+---+---+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
| 61|  1|  1|15.04|14.96|13.17| 9.29|  NaN| 9.87|13.67|10.25|10.83|12.58| 18.5|15.04|
| 61|  1|  2|14.71|  NaN|10.83|  6.5|12.62| 7.67| 11.5|10.04| 9.79| 9.67|17.54|13.83|
| 61|  1|  3| 18.5|16.88|12.33|10.13|11.17| 6.17

### Step 3. Assign it to a variable called data and replace the first 3 columns by a proper datetime index.

In [82]:
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
data = df.withColumn("date", 
                     f.to_date(
                         f.concat(f.col("Yr").cast("string"),
                                  f.lit("-"),
                                  f.col("Mo").cast("string"),
                                  f.lit("-"),
                                  f.col("Dy").cast("string")
                                  ), "y-MM-dd") )

data = data.drop("Yr","Mo","Dy")
data.show()

+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----------+
|  RPT|  VAL|  ROS|  KIL|  SHA|  BIR|  DUB|  CLA|  MUL|  CLO|  BEL|  MAL|      date|
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----------+
|15.04|14.96|13.17| 9.29|  NaN| 9.87|13.67|10.25|10.83|12.58| 18.5|15.04|1961-01-01|
|14.71|  NaN|10.83|  6.5|12.62| 7.67| 11.5|10.04| 9.79| 9.67|17.54|13.83|1961-01-02|
| 18.5|16.88|12.33|10.13|11.17| 6.17|11.25|  NaN|  8.5| 7.67|12.75|12.71|1961-01-03|
|10.58| 6.63|11.75| 4.58| 4.54| 2.88| 8.63| 1.79| 5.83| 5.88| 5.46|10.88|1961-01-04|
|13.33|13.25|11.42| 6.17|10.71| 8.21|11.92| 6.54|10.92|10.34|12.92|11.83|1961-01-05|
|13.21| 8.12| 9.96| 6.67| 5.37|  4.5|10.67| 4.42| 7.17|  7.5| 8.12|13.17|1961-01-06|
| 13.5|14.29|  9.5| 4.96|12.29| 8.33| 9.17| 9.29| 7.58| 7.96|13.96|13.79|1961-01-07|
|10.96| 9.75| 7.62| 5.91| 9.62| 7.29|14.29| 7.62| 9.25|10.46|16.62|16.46|1961-01-08|
|12.58|10.83| 10.0| 4.75|10.37| 6.79| 8.04|10.13| 7.79| 9.08|13.0

### Step 4. Year 2061? Do we really have data from this year? Create a function to fix it and apply it.
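
The Spark conversion in Step 3 already prints 1961 in the output shown there, so this problem mainly appears when the two-digit years are parsed with Python's `%y` directive, which maps 00-68 to 2000-2068. The following is a minimal sketch of a fix function under that assumption; `fix_century` and its `cutoff_year` parameter are hypothetical names, and the default cutoff is an arbitrary choice that only needs to lie after the last real year in the record.

```python
from datetime import date, datetime


def fix_century(d: date, cutoff_year: int = 1989) -> date:
    """Shift a date that was parsed into the wrong century back by 100 years.

    cutoff_year is a hypothetical threshold: any year after it is treated
    as a mis-parsed 19xx date.
    """
    if d.year > cutoff_year:
        return d.replace(year=d.year - 100)
    return d


# %y maps 00-68 to 2000-2068, so "61" is read as 2061.
parsed = datetime.strptime("61-1-1", "%y-%m-%d").date()
fixed = fix_century(parsed)
print(parsed, "->", fixed)  # 2061-01-01 -> 1961-01-01
```

Dates that already fall in the 1900s pass through unchanged, so the function can be applied to every row.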

### Step 5. Set the right dates as the index. Pay attention to the data type, it should be datetime64[ns].

### Step 6. Compute how many values are missing for each location over the entire record.  
#### They should be ignored in all calculations below. 

In [7]:
data.select(*map(lambda c : f.isnan(c).alias(c) ,data.columns[:-1]))\
    .select(*map(lambda c : f.sum(f.col(c).cast("int")).alias(c), data.columns[:-1]))\
    .show()

+---+---+---+---+---+---+---+---+---+---+---+---+
|RPT|VAL|ROS|KIL|SHA|BIR|DUB|CLA|MUL|CLO|BEL|MAL|
+---+---+---+---+---+---+---+---+---+---+---+---+
|  6|  3|  2|  5|  2|  0|  3|  2|  3|  1|  0|  4|
+---+---+---+---+---+---+---+---+---+---+---+---+
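
The cell above flags each value with `isnan`, casts the flag to an integer and sums per column. The same idea can be sketched in plain Python on the three rows from the file excerpt; only three of the columns are copied here to keep it short, and `count_missing` is a hypothetical helper name.

```python
import math

# Three sample rows from the file excerpt (RPT, VAL and SHA columns only).
rows = [
    {"RPT": 15.04, "VAL": 14.96, "SHA": float("nan")},
    {"RPT": 14.71, "VAL": float("nan"), "SHA": 12.62},
    {"RPT": 18.50, "VAL": 16.88, "SHA": 11.17},
]


def count_missing(rows):
    """Count NaN entries per column by summing boolean flags as integers."""
    columns = rows[0].keys()
    return {c: sum(int(math.isnan(r[c])) for r in rows) for c in columns}


missing = count_missing(rows)
print(missing)
```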



### Step 7. Compute how many non-missing values there are in total.

In [8]:
data.select(*map(lambda c : (~f.isnan(c)).alias(c) ,data.columns[:-1]))\
    .select(*map(lambda c : f.sum(f.col(c).cast("int")).alias(c), data.columns[:-1]))\
    .show()

+----+----+----+----+----+----+----+----+----+----+----+----+
| RPT| VAL| ROS| KIL| SHA| BIR| DUB| CLA| MUL| CLO| BEL| MAL|
+----+----+----+----+----+----+----+----+----+----+----+----+
|6568|6571|6572|6569|6572|6574|6571|6572|6571|6573|6574|6570|
+----+----+----+----+----+----+----+----+----+----+----+----+



Convert NaN to null so that Spark aggregations skip the missing values.

In [89]:
data = data.select(*map(lambda c: f.when(~f.isnan(c), f.col(c)).otherwise(None).alias(c),data.columns[:-1]), data[-1])
data.show()

+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----------+
|  RPT|  VAL|  ROS|  KIL|  SHA|  BIR|  DUB|  CLA|  MUL|  CLO|  BEL|  MAL|      date|
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----------+
|15.04|14.96|13.17| 9.29| null| 9.87|13.67|10.25|10.83|12.58| 18.5|15.04|1961-01-01|
|14.71| null|10.83|  6.5|12.62| 7.67| 11.5|10.04| 9.79| 9.67|17.54|13.83|1961-01-02|
| 18.5|16.88|12.33|10.13|11.17| 6.17|11.25| null|  8.5| 7.67|12.75|12.71|1961-01-03|
|10.58| 6.63|11.75| 4.58| 4.54| 2.88| 8.63| 1.79| 5.83| 5.88| 5.46|10.88|1961-01-04|
|13.33|13.25|11.42| 6.17|10.71| 8.21|11.92| 6.54|10.92|10.34|12.92|11.83|1961-01-05|
|13.21| 8.12| 9.96| 6.67| 5.37|  4.5|10.67| 4.42| 7.17|  7.5| 8.12|13.17|1961-01-06|
| 13.5|14.29|  9.5| 4.96|12.29| 8.33| 9.17| 9.29| 7.58| 7.96|13.96|13.79|1961-01-07|
|10.96| 9.75| 7.62| 5.91| 9.62| 7.29|14.29| 7.62| 9.25|10.46|16.62|16.46|1961-01-08|
|12.58|10.83| 10.0| 4.75|10.37| 6.79| 8.04|10.13| 7.79| 9.08|13.0

### Step 8. Calculate the mean windspeed over all the locations and all the times.
#### A single number for the entire dataset.

In [10]:
data.select(*map(f.mean,data.columns[:-1]))\
    .show()

+-----------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+---------------+----------------+-----------------+------------------+------------------+
|         avg(RPT)|          avg(VAL)|          avg(ROS)|         avg(KIL)|          avg(SHA)|         avg(BIR)|         avg(DUB)|       avg(CLA)|        avg(MUL)|         avg(CLO)|          avg(BEL)|          avg(MAL)|
+-----------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+---------------+----------------+-----------------+------------------+------------------+
|12.36298721071864|10.644314411809459|11.660526475958603|6.306468260009125|10.455833840535595|7.092254335260125|9.797342870187192|8.4950532562386|8.49359001674023|8.707331507682957|13.121006997261933|15.599079147640811|
+-----------------+------------------+------------------+-----------------+------------------+-----------------+--------
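
The cell above returns one mean per location, while the step asks for a single number. One way to get there, sketched here in plain Python, is to weight each per-location mean by its count of non-missing values. The counts are taken from the Step 7 output and the means from the output above; `overall_mean` is a hypothetical helper name.

```python
# Non-missing counts per location (Step 7 output), in column order RPT..MAL.
counts = [6568, 6571, 6572, 6569, 6572, 6574, 6571, 6572, 6571, 6573, 6574, 6570]

# Per-location means (Step 8 output), in the same column order.
means = [
    12.36298721071864, 10.644314411809459, 11.660526475958603,
    6.306468260009125, 10.455833840535595, 7.092254335260125,
    9.797342870187192, 8.4950532562386, 8.49359001674023,
    8.707331507682957, 13.121006997261933, 15.599079147640811,
]


def overall_mean(means, counts):
    """Combine group means into one mean, weighting each by its group size."""
    total = sum(m * n for m, n in zip(means, counts))
    return total / sum(counts)


print(overall_mean(means, counts))
```

Weighting by the counts matters because the locations have different numbers of missing values; a plain average of the twelve means would treat them as equally sized.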

### Step 9. Create a DataFrame called loc_stats and calculate the min, max and mean windspeeds and standard deviations of the windspeeds at each location over all the days


#### A different set of numbers for each location.

In [11]:
loc_stats = data.describe()
loc_stats.show()

+-------+-----------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+
|summary|              RPT|               VAL|               ROS|              KIL|               SHA|              BIR|              DUB|              CLA|              MUL|              CLO|               BEL|               MAL|
+-------+-----------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+
|  count|             6568|              6571|              6572|             6569|              6572|             6574|             6571|             6572|             6571|             6573|              6574|              6570|
|   mean|12.36298721071864|10.644314411809459|11.660526475958603|6.306468260

### Step 10. Create a DataFrame called day_stats and calculate the min, max and mean windspeed and standard deviations of the windspeeds across all the locations at each day.

#### A different set of numbers for each day.

In [20]:
def null_to_zero(*columns):
  return [(f.when(~f.isnull(c), f.col(c)).otherwise(0)) for c in columns]

def row_mean(*columns):
  N = len(columns)
  columns = null_to_zero(*columns)
  return  sum(columns) / N

def row_stddev(*columns):
  N = len(columns)
  mu = row_mean(*columns) 
  return f.sqrt((1 / N) * sum(f.pow(col - mu, 2) for col in null_to_zero(*columns)))

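# Build the DataFrame first and display it separately: show() returns None,
# so chaining it onto the assignment would leave day_stats set to None.
# Note: row_mean and row_stddev replace nulls with 0 and divide by the full
# column count, so days with missing readings are biased rather than ignored.
day_stats = data.select(f.least(*data.columns[:-1]).alias("min"),
                        f.greatest(*data.columns[:-1]).alias("max"),
                        row_mean(*data.columns[:-1]).alias("mean"),
                        row_stddev(*data.columns[:-1]).alias("stddev"),
                        data.columns[-1])
day_stats.show()

# day_stats = data.groupby("date").pivot("date").min()  <- rejected: pivot is too expensive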

+----+-----+------------------+------------------+----------+
| min|  max|              mean|            stddev|      date|
+----+-----+------------------+------------------+----------+
|9.29| 18.5|11.933333333333332| 4.418220481395448|1961-01-01|
| 6.5|17.54|10.391666666666667| 4.276881327426433|1961-01-02|
|6.17| 18.5|10.671666666666667| 4.652977242822301|1961-01-03|
|1.79|11.75| 6.619166666666666|  3.06197229013516|1961-01-04|
|6.17|13.33|             10.63|2.3412496663107074|1961-01-05|
|4.42|13.21|              8.24|2.8704267975337747|1961-01-06|
|4.96|14.29|10.384999999999998|2.9413248148864257|1961-01-07|
|5.91|16.62|10.487500000000002|  3.39622124573768|1961-01-08|
|4.75|15.37|            9.8975| 2.782238681949004|1961-01-09|
|6.54| 19.5|           10.4775|  3.29604845484205|1961-01-10|
|2.79|20.71| 9.624999999999998| 4.600515369680518|1961-01-11|
|9.46|19.75|13.524166666666668|3.2313605450680094|1961-01-12|
|0.58| 9.92|            4.2775|2.7921918660197163|1961-01-13|
| 0.5| 9
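
In the output above, the mean for 1961-01-01 is 11.93, which is the sum of the eleven available readings (143.20) divided by 12, because the missing SHA value is counted as zero. Step 6 asks for missing values to be ignored, which would give 143.20 / 11, about 13.02. The following plain-Python sketch shows per-day statistics that drop missing readings before any arithmetic; `row_stats` is a hypothetical helper, and it uses the population standard deviation (divide by n) to match the 1/N factor in `row_stddev`.

```python
import math


def row_stats(values):
    """Return (min, max, mean, std) over the non-missing entries of one day.

    Missing readings (None or NaN) are dropped first; std is the population
    standard deviation (divide by n).
    """
    present = [v for v in values if v is not None and not math.isnan(v)]
    n = len(present)
    if n == 0:
        return None, None, None, None
    mean = sum(present) / n
    variance = sum((v - mean) ** 2 for v in present) / n
    return min(present), max(present), mean, math.sqrt(variance)


# First data row of wind.data (1961-01-01); the SHA reading is missing.
day = [15.04, 14.96, 13.17, 9.29, None, 9.87,
       13.67, 10.25, 10.83, 12.58, 18.50, 15.04]
lo, hi, mean, std = row_stats(day)
print(lo, hi, mean, std)
```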

### Step 11. Find the average windspeed in January for each location.  
#### Treat January 1961 and January 1962 both as January.

In [90]:
data.withColumn("month", f.month("date")).groupby("month").mean().filter(f.col("month")==1).show()


+-----+------------------+------------------+------------------+-----------------+-----------------+----------------+------------------+----------------+-----------------+------------------+-----------------+------------------+----------+
|month|          avg(RPT)|          avg(VAL)|          avg(ROS)|         avg(KIL)|         avg(SHA)|        avg(BIR)|          avg(DUB)|        avg(CLA)|         avg(MUL)|          avg(CLO)|         avg(BEL)|          avg(MAL)|avg(month)|
+-----+------------------+------------------+------------------+-----------------+-----------------+----------------+------------------+----------------+-----------------+------------------+-----------------+------------------+----------+
|    1|14.847324955116695|12.914560143626572|13.299623655913978|7.199498207885307|11.66773381294964|8.05483870967742|11.819354838709673|9.51204667863555|9.543207885304664|10.053566308243726|14.55051971326165|18.028763440860207|       1.0|
+-----+------------------+------------------

### Step 12. Downsample the record to a yearly frequency for each location.

In [85]:
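# Run this after the NaN-to-null conversion above: avg() skips nulls but
# propagates NaN, which produces NaN yearly means such as the 1961 row below.
data.withColumn("year", f.year("date")).groupby("year").mean().show()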

+----+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+---------+
|year|          avg(RPT)|          avg(VAL)|          avg(ROS)|          avg(KIL)|          avg(SHA)|          avg(BIR)|          avg(DUB)|          avg(CLA)|         avg(MUL)|          avg(CLO)|          avg(BEL)|          avg(MAL)|avg(year)|
+----+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+---------+
|1961|               NaN|               NaN|               NaN|               NaN|               NaN| 7.729726027397262|               NaN|               NaN|              NaN|               NaN| 13.50279452054795|               NaN|   1961.0|
|1968|11.835628415300544

### Step 13. Downsample the record to a monthly frequency for each location.

In [91]:
data.withColumn("month", f.month("date")).groupby("month").mean().show()

+-----+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+----------+
|month|          avg(RPT)|          avg(VAL)|          avg(ROS)|          avg(KIL)|          avg(SHA)|         avg(BIR)|          avg(DUB)|         avg(CLA)|          avg(MUL)|          avg(CLO)|          avg(BEL)|          avg(MAL)|avg(month)|
+-----+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+----------+
|   12|14.446397849462366|12.353602150537636|13.212275985663084| 6.829910394265227|11.301254480286735|7.963709677419357|11.849050179211474|9.209354838709684| 9.447258064516129| 9.627670250896056|14.259516129032258|18.697598566308233|      12.0|
|    1|14.8473249551

### Step 14. Downsample the record to a weekly frequency for each location.

In [95]:
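# weekofyear follows ISO 8601 week numbering, so days near New Year can carry
# a week number that belongs to the adjacent year.
data.withColumn("week-year", f.concat(f.weekofyear("date"),f.lit("-") , f.year("date"))).groupby("week-year").mean().show()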

+---------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|week-year|          avg(RPT)|          avg(VAL)|          avg(ROS)|          avg(KIL)|          avg(SHA)|          avg(BIR)|          avg(DUB)|          avg(CLA)|          avg(MUL)|          avg(CLO)|          avg(BEL)|          avg(MAL)|
+---------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|  33-1966|7.1657142857142855| 8.677142857142856| 9.314285714285715| 4.142857142857143| 6.531428571428571| 3.795714285714286| 5.667142857142857| 5.654285714285714| 4.602857142857142| 5.404285714285715|12.302857142857144|10.285714285714286|
|  27-1967|11.334285714285713|10.2328571

### Step 15. Calculate the min, max and mean windspeeds and standard deviations of the windspeeds across all locations for each week (assume that the first week starts on January 2 1961) for the first 52 weeks.

In [126]:
data.withColumn("week-year", f.concat(f.weekofyear("date"),f.lit("-") , f.year("date"))).groupby("week-year")\
  .agg( *[fun(col) for fun in [f.min, f.max, f.mean, f.stddev] for col in data.columns[:-1]] ).show()

+---------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|week-year|min(RPT)|min(VAL)|min(ROS)|min(KIL)|min(SHA)|min(BIR)|min(DUB)|min(CLA)|min(MUL)|min(CLO)|min(BEL)|min(MAL)|max(RPT)|max(VAL)|max(ROS)|max(KIL)|max(SHA)|max(BIR)|max(DUB)|max(CLA)|max(MUL)|max(CLO)|max(BEL)|max(MAL)|          avg(RPT)|          avg(VAL)|          avg(ROS)|          avg(KIL)|          avg
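
The step fixes the start of the first week at January 2 1961 and asks for the first 52 weeks only, whereas the cell above groups by ISO week number and covers the whole record. A minimal plain-Python sketch of the week bucketing the step describes follows; `WEEK_ONE_START`, `week_index` and `in_first_52_weeks` are hypothetical names introduced for illustration.

```python
from datetime import date

# First week starts here, as stated in the step text.
WEEK_ONE_START = date(1961, 1, 2)


def week_index(d: date, start: date = WEEK_ONE_START) -> int:
    """Zero-based number of whole 7-day blocks between start and d."""
    return (d - start).days // 7


def in_first_52_weeks(d: date) -> bool:
    """True when d falls inside the 52 weeks that begin at WEEK_ONE_START."""
    return 0 <= week_index(d) < 52


print(week_index(date(1961, 1, 8)), week_index(date(1961, 1, 9)))
print(in_first_52_weeks(date(1961, 1, 1)), in_first_52_weeks(date(1961, 12, 31)))
```

January 1 1961 gets a negative index and is therefore excluded, which avoids mixing it into a week from the end of the year.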