# Wind Statistics

### Introduction:

The data have been modified to contain some missing values, identified by NaN.  
Using pandas should make this exercise
easier, in particular for the bonus question.

You should be able to perform all of these operations without using
a for loop or other looping construct.


1. The data in 'wind.data' has the following format:

In [None]:
"""
Yr Mo Dy   RPT   VAL   ROS   KIL   SHA   BIR   DUB   CLA   MUL   CLO   BEL   MAL
61  1  1 15.04 14.96 13.17  9.29   NaN  9.87 13.67 10.25 10.83 12.58 18.50 15.04
61  1  2 14.71   NaN 10.83  6.50 12.62  7.67 11.50 10.04  9.79  9.67 17.54 13.83
61  1  3 18.50 16.88 12.33 10.13 11.17  6.17 11.25   NaN  8.50  7.67 12.75 12.71
"""

'\nYr Mo Dy   RPT   VAL   ROS   KIL   SHA   BIR   DUB   CLA   MUL   CLO   BEL   MAL\n61  1  1 15.04 14.96 13.17  9.29   NaN  9.87 13.67 10.25 10.83 12.58 18.50 15.04\n61  1  2 14.71   NaN 10.83  6.50 12.62  7.67 11.50 10.04  9.79  9.67 17.54 13.83\n61  1  3 18.50 16.88 12.33 10.13 11.17  6.17 11.25   NaN  8.50  7.67 12.75 12.71\n'

   The first three columns are year, month and day.  The
   remaining 12 columns are average windspeeds in knots at 12
   locations in Ireland on that day.   

   More information about the dataset go [here](wind.desc).

### Step 1. Import the necessary libraries

In [2]:
!pip install pyspark



In [3]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, FloatType
from pyspark.sql.functions import expr, col, mean, when, sum, count, desc, min, max
spark = SparkSession.builder.master("local[*]").getOrCreate()

### Step 2. Import the dataset from this [address](https://raw.githubusercontent.com/guipsamora/pandas_exercises/master/06_Stats/Wind_Stats/wind.data)

In [3]:
!wget -O wind.data https://raw.githubusercontent.com/guipsamora/pandas_exercises/master/06_Stats/Wind_Stats/wind.data

--2024-04-12 12:01:10--  https://raw.githubusercontent.com/guipsamora/pandas_exercises/master/06_Stats/Wind_Stats/wind.data
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.108.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 532576 (520K) [text/plain]
Saving to: ‘wind.data’


2024-04-12 12:01:11 (13.6 MB/s) - ‘wind.data’ saved [532576/532576]



### Step 3. Assign it to a variable called data and replace the first 3 columns by a proper datetime index.

In [17]:
data = spark.read.csv('wind.data', sep="   ", header=True, inferSchema=True)
data.show()

+--------------------+--------------------+----+----+----+----+----+----+----+----+----+----+----+
|            Yr Mo Dy|                 RPT| VAL| ROS| KIL| SHA| BIR| DUB| CLA| MUL| CLO| BEL| MAL|
+--------------------+--------------------+----+----+----+----+----+----+----+----+----+----+----+
|61  1  1 15.04 14...|NaN  9.87 13.67 1...|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
|      61  1  2 14.71|NaN 10.83  6.50 1...|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
|61  1  3 18.50 16...|NaN  8.50  7.67 1...|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
|61  1  4 10.58  6...|                NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
|61  1  5 13.33 13...|                NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
|61  1  6 13.21  8...|                NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
|61  1  7 13.50 14...|                NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
|61  1  8 

In [18]:
wind_data = spark.read.text("wind.data")

In [20]:
from pyspark.sql.functions import split

split_data = wind_data.select(split(wind_data.value, '\s+').alias('data'))
split_data.show()

+--------------------+
|                data|
+--------------------+
|                  []|
|[Yr, Mo, Dy, RPT,...|
|[61, 1, 1, 15.04,...|
|[61, 1, 2, 14.71,...|
|[61, 1, 3, 18.50,...|
|[61, 1, 4, 10.58,...|
|[61, 1, 5, 13.33,...|
|[61, 1, 6, 13.21,...|
|[61, 1, 7, 13.50,...|
|[61, 1, 8, 10.96,...|
|[61, 1, 9, 12.58,...|
|[61, 1, 10, 13.37...|
|[61, 1, 11, 10.58...|
|[61, 1, 12, 19.75...|
|[61, 1, 13, 9.92,...|
|[61, 1, 14, 9.04,...|
|[61, 1, 15, 12.04...|
|[61, 1, 16, 16.42...|
|[61, 1, 17, 17.75...|
|[61, 1, 18, 19.83...|
+--------------------+
only showing top 20 rows



In [25]:
split_data.data

Column<'data'>

In [28]:
@F.udf("string to row", F.ArrayType(F.StringType()))
def string_to_row(data):
    # Extract date (Yr, Mo, Dy) and wind data for each location

    date = F.to_date(F.concat(year, F.lit('-'), month, F.lit('-'), day), 'yyyy-MM-dd')
    wind_data = data[3:]
    return [date] + wind_data


ParseException: 
[PARSE_SYNTAX_ERROR] Syntax error at or near 'row': extra input 'row'.(line 1, pos 10)

== SQL ==
string to row
----------^^^


Column<'substring(data, 0, 3)'>

In [22]:


row_data = split_data.select(string_to_row(split_data.data).alias("row"))
row_data

ParseException: 
[PARSE_SYNTAX_ERROR] Syntax error at or near 'row': extra input 'row'.(line 1, pos 10)

== SQL ==
string to row
----------^^^


In [15]:
df.show()

+---+----+---+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
| Yr|  Mo| Dy| RPT|  VAL|  ROS|  KIL|  SHA|  BIR|  DUB|  CLA|  MUL|  CLO|  BEL|  MAL|
+---+----+---+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
| 61|NULL|  1|NULL|  1.0|15.04|14.96|13.17| NULL| 9.29| NULL| NULL|  NaN| NULL| 9.87|
| 61|NULL|  1|NULL|  2.0|14.71| NULL| NULL|  NaN|10.83| NULL|  6.5|12.62| NULL| 7.67|
| 61|NULL|  1|NULL|  3.0| 18.5|16.88|12.33|10.13|11.17| NULL| 6.17|11.25| NULL| NULL|
| 61|NULL|  1|NULL|  4.0|10.58| NULL| 6.63|11.75| NULL| 4.58| NULL| 4.54| NULL| 2.88|
| 61|NULL|  1|NULL|  5.0|13.33|13.25|11.42| NULL| 6.17|10.71| NULL| 8.21|11.92| NULL|
| 61|NULL|  1|NULL|  6.0|13.21| NULL| 8.12| NULL| 9.96| NULL| 6.67| NULL| 5.37| NULL|
| 61|NULL|  1|NULL|  7.0| 13.5|14.29| NULL|  9.5| NULL| 4.96|12.29| NULL| 8.33| NULL|
| 61|NULL|  1|NULL|  8.0|10.96| NULL| 9.75| NULL| 7.62| NULL| 5.91| NULL| 9.62| NULL|
| 61|NULL|  1|NULL|  9.0|12.58|10.83| 10.0| NULL| 4.75

In [6]:
data = data.withColumn("date", F.to_date(F.concat_ws('-', col('Yr'), data.Mo,data.Dy)))

AttributeError: 'DataFrame' object has no attribute 'Yr'

### Step 4. Year 2061? Do we really have data from this year? Create a function to fix it and apply it.

### Step 5. Set the right dates as the index. Pay attention at the data type, it should be datetime64[ns].

### Step 6. Compute how many values are missing for each location over the entire record.  
#### They should be ignored in all calculations below.

### Step 7. Compute how many non-missing values there are in total.

### Step 8. Calculate the mean windspeeds of the windspeeds over all the locations and all the times.
#### A single number for the entire dataset.

### Step 9. Create a DataFrame called loc_stats and calculate the min, max and mean windspeeds and standard deviations of the windspeeds at each location over all the days

#### A different set of numbers for each location.

### Step 10. Create a DataFrame called day_stats and calculate the min, max and mean windspeed and standard deviations of the windspeeds across all the locations at each day.

#### A different set of numbers for each day.

### Step 11. Find the average windspeed in January for each location.  
#### Treat January 1961 and January 1962 both as January.

### Step 12. Downsample the record to a yearly frequency for each location.

### Step 13. Downsample the record to a monthly frequency for each location.

### Step 14. Downsample the record to a weekly frequency for each location.

### Step 15. Calculate the min, max and mean windspeeds and standard deviations of the windspeeds across all locations for each week (assume that the first week starts on January 2 1961) for the first 52 weeks.