# Graded Lab: Exploring USA weather data

In this lab you will explore weather data from the National Oceanic and Atmospheric Administration (NOAA).

>If you are curious, the data are available at https://noaa-isd-pds.s3.amazonaws.com/index.html, or directly over FTP at ftp.ncdc.noaa.gov. You can find weather data from 1901 to today.

Raw data are stored in the ISD (Integrated Surface Data) format. It is an unusual format, with a mandatory section of positional fields and an additional section of variable fields. For this lab, the dataset has been transformed into JSON, but no other processing has been done. For example, missing temperatures are not filtered out and are coded as 999.9. The lab's dataset contains more than 100 years of weather data. Its total size is about 4 GB once compressed (and around 40 GB uncompressed).

For instance, here is an example of a record:

```js
{
   "weather_station":"010040",
   "latitude":78.933,
   "longitude":11.883,
   "elevation":42,
   "time":"1975-03-04T18:00:00+00:00",
   "air_temperature":{
      "value":-24.0,
      "quality":"1"
   },
   "dew_point":{
      "value":-27.0,
      "quality":"1"
   },
   "wind_speed":{
      "value":1.0,
      "quality":"1"
   },
   "wind_direction":{
      "value":"160",
      "quality":"1"
   },
   "sea_level_pressure":{
      "value":1002.1,
      "quality":""
   },
   "sky_ceiling":{
      "value":22000,
      "quality":"1"
   },
   "visibility_distance":{
      "value":50000,
      "quality":"1"
   },
   "liquid_precip":[
      {
         "hours":99,
         "depth":0.0
      }
   ],
   "sky_cover_condition":[
      {
         "base_height":50000,
         "cloud_type":"Cirrus and/or Cirrocumulus",
         "coverage":8
      }
   ],
   "extreme_temperature":[
      {
         "hours":999,
         "code":"M",
         "temperature":{
            "value":-23.0,
            "quality":"1"
         }
      }
   ]
}
```
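
To get a feel for the nesting before moving to Spark, here is a minimal plain-Python sketch that parses a trimmed copy of the record above. It only uses the standard `json` module; the variable names (`raw`, `record`, `cloud_types`) are chosen for illustration and are not part of the lab.

```python
import json

# Trimmed copy of the example record above (only a few fields kept).
raw = """
{
   "weather_station": "010040",
   "time": "1975-03-04T18:00:00+00:00",
   "air_temperature": {"value": -24.0, "quality": "1"},
   "sky_cover_condition": [
      {"base_height": 50000, "cloud_type": "Cirrus and/or Cirrocumulus", "coverage": 8}
   ]
}
"""

record = json.loads(raw)

# Measurements are nested objects holding a value and a quality flag.
temperature = record["air_temperature"]["value"]
quality = record["air_temperature"]["quality"]  # note: the flag is a string

# Some fields are arrays of objects, one entry per observation.
cloud_types = [layer["cloud_type"] for layer in record["sky_cover_condition"]]

print(temperature, quality, cloud_types)
```

The same two shapes (a nested object with `value` and `quality`, and an array of objects) appear in the Spark schema given later as `StructType` and `ArrayType`.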

## Cluster setup

Is Spark running? You can start the lab once you get a message like `SparkSession available as 'spark'.`

In [1]:
# Spark session
spark

# Configuration
spark._jsc.hadoopConfiguration().set("fs.s3.useRequesterPaysHeader","true")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1649920197808_0001,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


<pyspark.sql.session.SparkSession object at 0x7efef21d6750>

Useful imports for the lab. You can import more functions if you need them.

In [2]:
from pyspark.sql.window import Window
from pyspark.sql.functions import count, min, max, mean, exp, first, from_json, window, col, expr, year, month, explode, sum, row_number, avg, abs
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, ArrayType, TimestampType, BooleanType, LongType, DoubleType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

Data schema

In [9]:
schema = StructType([
    StructField("air_temperature",StructType([
        StructField("quality",StringType(),True)
        ,StructField("value",DoubleType(),True)]),True)
    ,StructField("dew_point",StructType([
        StructField("quality",StringType(),True)
        ,StructField("value",DoubleType(),True)]),True)
    ,StructField("wind_speed",StructType([
        StructField("quality",StringType(),True)
        ,StructField("value",DoubleType(),True)]),True)
    ,StructField("elevation",LongType(),True)
    ,StructField("extreme_temperature",ArrayType(StructType([
        StructField("code",StringType(),True)
        ,StructField("hours",LongType(),True)
        ,StructField("temperature",StructType([
            StructField("quality",StringType(),True)
            ,StructField("value",DoubleType(),True)]),True)]),True),True)
    ,StructField("latitude",DoubleType(),True)
    ,StructField("liquid_precip",ArrayType(StructType([
        StructField("depth",StringType(),True)
        ,StructField("hours",LongType(),True)]),True),True)
    ,StructField("longitude",DoubleType(),True)
    ,StructField("sea_level_pressure",StructType([
        StructField("quality",StringType(),True),
        StructField("value",DoubleType(),True)]),True)
    ,StructField("sky_ceiling",StructType([
        StructField("quality",StringType(),True)
        ,StructField("value",LongType(),True)]),True)
    ,StructField("sky_cover_condition",ArrayType(StructType([
        StructField("base_height",LongType(),True)
        ,StructField("cloud_type",StringType(),True)
        ,StructField("coverage",LongType(),True)]),True),True)
    ,StructField("time",TimestampType(),True)
    ,StructField("visibility_distance",StructType([
        StructField("quality",StringType(),True)
        ,StructField("value",LongType(),True)]),True)
    ,StructField("weather_station",StringType(),True)
    ,StructField("wind_direction",StructType([
        StructField("quality",StringType(),True),
        StructField("value",StringType(),True)]),True)])

## Instructions

1. Import the dataset from `s3://spark-lab-input-data-ensai20212022/weather_data/` with the provided schema under the name `meteo`.

2. Print the DataFrame schema. What are the types of `elevation` and `time`?

3. Print the first 9 rows. You can use `vertical=True` to get a prettier result.

4. Count how many different stations there are with the `countDistinct()` function. What is the benefit of `approx_count_distinct()`? (Demonstrate this benefit by timing three instructions: one for `countDistinct()` and two for `approx_count_distinct()` with different values of `rsd`. See the [documentation](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.approx_count_distinct.html) for more information.)

5. The dataset is far too big to work on as is (each instruction would take minutes to run). Create a new dataset called `meteo_small` containing a random sample of about 1/500 of the rows.

6. Print the station at the highest altitude (`elevation`), then the one at the lowest.
Caution: an elevation equal to 9999 means "missing value". Do not count 9999 as a real elevation.
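
To see why the sentinel matters, here is a small plain-Python sketch (not Spark, and not the expected answer). The `(station, elevation)` pairs are made up for illustration; only the value 9999 comes from the lab text.

```python
MISSING_ELEVATION = 9999  # sentinel for "missing value", as stated above

# Hypothetical (station, elevation in metres) pairs, made up for illustration.
stations = [("A", 42), ("B", 9999), ("C", -5), ("D", 1310)]

# Without filtering, the sentinel wins the maximum.
naive_highest = max(stations, key=lambda pair: pair[1])

# Drop the sentinel first, then look for the extremes.
known = [(name, elev) for name, elev in stations if elev != MISSING_ELEVATION]
highest = max(known, key=lambda pair: pair[1])
lowest = min(known, key=lambda pair: pair[1])

print(naive_highest, highest, lowest)
```

If you paste this into the notebook after the lab's import cell, note that `min` and `max` there refer to the `pyspark.sql.functions` versions, which shadow the Python built-ins used in this sketch.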

7. Count how many stations have an altitude lower than 500 m. Do the same for the stations higher than 1000 m.

8. Filter missing and bad-quality air temperatures out of `meteo_small`. Make sure Spark actually performs the computation and persists the dataset `meteo_small`, since we will use `meteo_small` again and again in the rest of the lab. How many observations are removed?

- Missing temperatures are coded as `999.9`.
- Good quality is encoded by the quality codes 0, 1, 4, 5 or 9.
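
The two rules above can be read as a single predicate on a measurement. The following plain-Python sketch shows that reading; the helper name `is_usable` is hypothetical, and the quality codes are compared as strings because that is how they appear in the example record. Translating it into Spark column expressions is left as part of the exercise.

```python
MISSING_TEMPERATURE = 999.9                # sentinel for a missing temperature
GOOD_QUALITY = {"0", "1", "4", "5", "9"}   # quality flags are strings in the JSON


def is_usable(measure):
    """Return True if a {'value': ..., 'quality': ...} measurement can be kept."""
    if measure is None:
        return False
    value = measure.get("value")
    if value is None or value == MISSING_TEMPERATURE:
        return False
    return measure.get("quality") in GOOD_QUALITY


print(is_usable({"value": -24.0, "quality": "1"}))   # kept
print(is_usable({"value": 999.9, "quality": "1"}))   # missing value
print(is_usable({"value": 3.0, "quality": "2"}))     # bad quality flag
```

An empty quality string, as in the `sea_level_pressure` field of the example record, is not in the list of good codes and is therefore rejected by this sketch.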

9. On `meteo_small`, compute the **number of records** and the **average temperature** by year. Order your results by temperature.

10. Compute the min, mean and max temperature, and the record count, for each possible combination of year and station. Your output should look like this:

| weather_station | year | temp_min | temp_max | temp_mean | records_count |
| --------------- | ---- | -------- | -------- | ---------- | -------------- |
| 036830          | 1992 | a        | b        | c          | d              |
| 033730          | null | e        | f        | g          | h              |
| 010010          | 1992 | i        | j        | k          | l              |
| null            | 1992 | m        | n        | o          | p              |
| 061000          | 1991 | q        | r        | s          | t              |

A `null` value means this dimension isn't used for this row. For instance, row 2 gives the min, max, mean and record count for station 033730 over all years, and row 4 gives the min, max, mean and record count for year 1992 over all stations.
Extra points will be given if you can compute all the combinations of year and station without the null values.

11. Add a column to `meteo_small` called `approximate_partial_humidity`, computed with this formula:
$$RH_{approx} = 100 - 5 (T_{air} - T_{DP})$$

where $T_{DP}$ is the dew point temperature and $T_{air}$ is the air temperature.
Caution: some dew point temperatures can be missing or of bad quality. The rules to filter those values are the same as for `air_temperature`.
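
As a quick sanity check of the formula, here is a plain-Python sketch applied to the example record shown at the top of the lab ($T_{air} = -24.0$, $T_{DP} = -27.0$). The function name is hypothetical; in the lab itself the computation should be expressed on Spark columns.

```python
def approximate_relative_humidity(t_air, t_dew_point):
    """Apply RH_approx = 100 - 5 * (T_air - T_DP), temperatures in the same unit."""
    return 100.0 - 5.0 * (t_air - t_dew_point)


# Values taken from the example record: air temperature and dew point.
rh = approximate_relative_humidity(-24.0, -27.0)
print(rh)  # 100 - 5 * 3 = 85.0

# When the air is at its dew point, the approximation gives 100.
saturated = approximate_relative_humidity(10.0, 10.0)
print(saturated)
```

Because the formula is linear in both temperatures, a sentinel such as `999.9` left in either column would produce a meaningless humidity, which is why the filtering step comes first.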

12. `sky_cover_condition` is of type array. How would you obtain a copy of the row for each item in the array? On `meteo_small`, compute how many times each `cloud_type` appears.
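
To make "a copy of the row for each item in the array" concrete, here is a plain-Python sketch of that reshaping on three made-up rows. The station codes reuse values seen elsewhere in this lab, but the `"Stratus"` label and the rows themselves are invented for illustration; finding the Spark function that does this is part of the question.

```python
from collections import Counter

# Made-up rows: each one carries an array of sky cover layers (possibly empty).
rows = [
    {"weather_station": "010040",
     "sky_cover_condition": [{"cloud_type": "Cirrus and/or Cirrocumulus"},
                             {"cloud_type": "Stratus"}]},
    {"weather_station": "010010",
     "sky_cover_condition": [{"cloud_type": "Stratus"}]},
    {"weather_station": "036830",
     "sky_cover_condition": []},
]

# One output row per (input row, array item) pair.
exploded = [
    {"weather_station": row["weather_station"], "cloud_type": layer["cloud_type"]}
    for row in rows
    for layer in row["sky_cover_condition"]
]

# Counting by cloud type is then an ordinary group-by-and-count.
counts = Counter(item["cloud_type"] for item in exploded)

print(len(exploded), dict(counts))
```

In this sketch a row with an empty array produces no output row at all, so the third station disappears from `exploded`. It is worth checking how the Spark function you choose treats empty or null arrays.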

13. On `meteo_small`, compute the maximum, the minimum, and the first and third quartiles of the recorded **wind speed** in year 1992 with `summary()`. If year 1992 is empty in your sample, please choose another year. To get the year from the `time` variable, you can use `year("time")`. Is the computation exact? Why is it better to use `summary()` than to run a separate instruction for each statistic?

14. Briefly explain why computing a maximum is well-suited to the "reduce" step of a map-and-reduce algorithm.
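
As a reminder of the vocabulary only, here is a plain-Python sketch of a two-stage computation: a local step on each partition, followed by a pairwise combination of the partial results with `functools.reduce`. The partitions and values are made up, and the written explanation is still yours to give.

```python
from functools import reduce

# Made-up wind speeds, already split into three partitions (one per worker).
partitions = [[1.0, 7.5, 3.2], [12.1, 0.4], [5.5]]

# Local step: each partition is summarised independently of the others.
partial_maxima = [max(values) for values in partitions]

# Reduce step: partial results are combined two at a time.
def keep_larger(a, b):
    return a if a >= b else b

global_max = reduce(keep_larger, partial_maxima)

# Combining the partial results in another order gives the same answer.
global_max_reversed = reduce(keep_larger, list(reversed(partial_maxima)))

print(partial_maxima, global_max, global_max_reversed)
```

Only one number per partition travels to the reduce step, whatever the partition size.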

15. For this question, you will fit a linear regression on `meteo_small` giving the **air temperature** as a function of **year**, **elevation**, **longitude**, **dew_point** and **sky_ceiling**.

    - Filter to keep only the good values of temperature and pressure (9999.9 = missing pressure)
    - Create `VectorAssembler` and `LinearRegression` objects
    - Apply these objects to our data
    - Explore the results (coefficients, p-values)

    Is **year** a good predictor?

16. What is the difference between the `fit()` and `transform()` methods?

17. Repeat the regression for a few different years. You will use a `Pipeline` object.

18. Does the regression fall into the category of "embarrassingly parallel problems"? (Explain.) Is it an "inherently sequential problem"? (Explain.) Make sure you give examples of each category.