# NOAA : Extraction de la pluviométrie

Dataset :
- le resultat précédent
- La liste des pays


Exercice :
- Fusionner les deux fichiers pour obtenir un nouveau jeux de données avec les codes sur 3 lettres (ISO3166-1-Alpha-3)
- Le format du fichier NOAA est "FIPS"
- Attention à vérifier que vous ne laisser pas de pays de côté
- Corriger le code pays manquant
- Regarder le plan d'exécution
- Sauver

In [2]:
#!hdfs dfs -ls /stagiaire/430-Benoit/noaa/daily_rain_by_country_2

In [1]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import shutil

In [2]:
SPARK_MASTER = 'spark://localhost:7077'
APP_NAME = 'Join Country - 430 -BCO'

noaa_csv_path = '/stagiaire/430-Benoit/noaa/daily_rain_by_country_2' 
output = '/stagiaire/430-Benoit/noaa/daily_rain_by_country_ISO3'

# Create Spark session
spark = SparkSession.builder.master(SPARK_MASTER).appName(APP_NAME).getOrCreate()
sc = spark.sparkContext

In [4]:
df = spark.read.format('csv').option('header',True).option('multiLine', True).load(noaa_csv_path)
df.show(3)

+-------+----------+---+------------------+--------+--------+------------------+
|COUNTRY|      date|cnt|          avg_prcp|sum_prcp|max_prcp|       stddev_prcp|
+-------+----------+---+------------------+--------+--------+------------------+
|     TZ|1980-05-20| 10|              24.9|   249.0|     138|  50.7114933279977|
|     BE|1980-05-19|  1|               0.0|     0.0|       0|               NaN|
|     FR|1980-05-19| 88|14.136363636363637|  1244.0|     216|36.977527135567776|
+-------+----------+---+------------------+--------+--------+------------------+
only showing top 3 rows



In [6]:
!hdfs dfs -ls /demo/noaa

Found 3 items
drwxr-xr-x   - root supergroup          0 2020-11-25 09:33 /demo/noaa/.ipynb_checkpoints
-rw-r--r--   1 root supergroup     129984 2020-11-25 09:33 /demo/noaa/country-codes.csv
drwxr-xr-x   - root supergroup          0 2020-11-25 09:33 /demo/noaa/noaa-raw-data


In [5]:
# Charger la liste
print('Load table of country code')
df_country_codes = spark.read.format('csv').option('header',True).option('multiLine', True).load('/demo/noaa/country-codes.csv')
df_country_codes.createOrReplaceTempView("country_codes")

Load table of country code


In [6]:
df_country_codes.show(2)

+----+----+-----------------+----+--------------+-----------------+----+----+----+-----------------+----+---+---+---------------------+-----------+------------------------+----------------+--------------------+---------------------+--------------------------------+---------------------+--------------------+--------------------------------+-------------------------------------+--------------------+-----------------------------+---------------------+--------------------+--------------------+----+---------------+-----------+----------------+---------------------------+--------------------+--------------------+---------------------------------------+------------------------+----------------+---------------------+----------------+----------------+-----------------------------+-------------------------------+-----------+-------------------+---------------+----------------+-----------+-------+---------+---+-----------------+----------+-----------------+------+
|FIFA|Dial|ISO3166-1-Alpha-3|MAR

In [7]:
df.show(2)

+-------+----------+---+--------+--------+--------+----------------+
|COUNTRY|      date|cnt|avg_prcp|sum_prcp|max_prcp|     stddev_prcp|
+-------+----------+---+--------+--------+--------+----------------+
|     TZ|1980-05-20| 10|    24.9|   249.0|     138|50.7114933279977|
|     BE|1980-05-19|  1|     0.0|     0.0|       0|             NaN|
+-------+----------+---+--------+--------+--------+----------------+
only showing top 2 rows



In [8]:
#     Fusionner les deux fichiers pour obtenir un nouveau jeux de données avec les codes sur 3 lettres (ISO3166-1-Alpha-3)
df.createOrReplaceTempView("noaa")
print('Join NOAA and countries')
df_full = spark.sql("""SELECT COUNTRY, country_codes.`ISO3166-1-Alpha-3` as country_ISO3, date, avg_prcp, sum_prcp, max_prcp, cnt 
                            FROM noaa LEFT JOIN country_codes ON noaa.COUNTRY=country_codes.FIPS
                            """)
df_full.show(3)

Join NOAA and countries
+-------+------------+----------+------------------+--------+--------+---+
|COUNTRY|country_ISO3|      date|          avg_prcp|sum_prcp|max_prcp|cnt|
+-------+------------+----------+------------------+--------+--------+---+
|     TZ|         TZA|1980-05-20|              24.9|   249.0|     138| 10|
|     BE|         BEL|1980-05-19|               0.0|     0.0|       0|  1|
|     FR|         FRA|1980-05-19|14.136363636363637|  1244.0|     216| 88|
+-------+------------+----------+------------------+--------+--------+---+
only showing top 3 rows



In [13]:
df_full.createOrReplaceTempView("noaa_join")
#     Attention à vérifier que vous ne laisser pas de pays de côté
spark.sql("""SELECT COUNTRY, country_ISO3
                            FROM noaa_join
                            WHERE country_ISO3 IS NULL
                            """).show(3)

+-------+------------+
|COUNTRY|country_ISO3|
+-------+------------+
|     RI|        null|
|     RI|        null|
|     RI|        null|
+-------+------------+
only showing top 3 rows



In [11]:
spark.sql("""SELECT  country_codes.`ISO3166-1-Alpha-3` as country_ISO3, country_codes.FIPS
                            FROM country_codes
                            WHERE country_codes.`ISO3166-1-Alpha-3` = 'SRB'
                            """).show(3)

+------------+-----+
|country_ISO3| FIPS|
+------------+-----+
|         SRB|RI,KV|
+------------+-----+



In [16]:
df_full.fillna({"country_ISO3":'SRB'}).show(5)

+-------+------------+----------+------------------+--------+--------+---+
|COUNTRY|country_ISO3|      date|          avg_prcp|sum_prcp|max_prcp|cnt|
+-------+------------+----------+------------------+--------+--------+---+
|     TZ|         TZA|1980-05-20|              24.9|   249.0|     138| 10|
|     BE|         BEL|1980-05-19|               0.0|     0.0|       0|  1|
|     FR|         FRA|1980-05-19|14.136363636363637|  1244.0|     216| 88|
|     RI|         SRB|1980-05-19|45.333333333333336|   136.0|     132|  3|
|     TZ|         TZA|1980-05-19|             114.0|  1140.0|     686| 10|
+-------+------------+----------+------------------+--------+--------+---+
only showing top 5 rows



In [17]:
#     Corriger le code pays manquant
df_full.withColumn("Country_ISO3-2", F.when(F.col("COUNTRY") == 'RI', 'SRB').otherwise(F.col("country_ISO3"))).show(5)
#df_full

+-------+------------+----------+------------------+--------+--------+---+--------------+
|COUNTRY|country_ISO3|      date|          avg_prcp|sum_prcp|max_prcp|cnt|Country_ISO3-2|
+-------+------------+----------+------------------+--------+--------+---+--------------+
|     TZ|         TZA|1980-05-20|              24.9|   249.0|     138| 10|           TZA|
|     BE|         BEL|1980-05-19|               0.0|     0.0|       0|  1|           BEL|
|     FR|         FRA|1980-05-19|14.136363636363637|  1244.0|     216| 88|           FRA|
|     RI|        null|1980-05-19|45.333333333333336|   136.0|     132|  3|           SRB|
|     TZ|         TZA|1980-05-19|             114.0|  1140.0|     686| 10|           TZA|
+-------+------------+----------+------------------+--------+--------+---+--------------+
only showing top 5 rows



In [18]:
#     Corriger le code pays manquant
df_full.withColumn("country_ISO3", F.when(F.col("COUNTRY") == 'RI', 'SRB').otherwise(F.col("country_ISO3"))).show(5)
#df_full

+-------+------------+----------+------------------+--------+--------+---+
|COUNTRY|country_ISO3|      date|          avg_prcp|sum_prcp|max_prcp|cnt|
+-------+------------+----------+------------------+--------+--------+---+
|     TZ|         TZA|1980-05-20|              24.9|   249.0|     138| 10|
|     BE|         BEL|1980-05-19|               0.0|     0.0|       0|  1|
|     FR|         FRA|1980-05-19|14.136363636363637|  1244.0|     216| 88|
|     RI|         SRB|1980-05-19|45.333333333333336|   136.0|     132|  3|
|     TZ|         TZA|1980-05-19|             114.0|  1140.0|     686| 10|
+-------+------------+----------+------------------+--------+--------+---+
only showing top 5 rows



In [20]:
#     Regarder le plan d'exécution
df_full.fillna({"country_ISO3":'SRB'}).explain()

== Physical Plan ==
*(2) Project [COUNTRY#50, coalesce(ISO3166-1-Alpha-3#102, SRB) AS country_ISO3#896, date#51, avg_prcp#53, sum_prcp#54, max_prcp#55, cnt#52]
+- *(2) BroadcastHashJoin [COUNTRY#50], [FIPS#107], LeftOuter, BuildRight
   :- FileScan csv [COUNTRY#50,date#51,cnt#52,avg_prcp#53,sum_prcp#54,max_prcp#55] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[hdfs://localhost/stagiaire/430-Benoit/noaa/daily_rain_by_country_2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<COUNTRY:string,date:string,cnt:string,avg_prcp:string,sum_prcp:string,max_prcp:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true])), [id=#430]
      +- *(1) Project [ISO3166-1-Alpha-3#102, FIPS#107]
         +- *(1) Filter isnotnull(FIPS#107)
            +- FileScan csv [ISO3166-1-Alpha-3#102,FIPS#107] Batched: false, DataFilters: [isnotnull(FIPS#107)], Format: CSV, Location: InMemoryFileIndex[hdfs://localhost/demo/noaa/country-codes.csv],

In [19]:
#     Corriger le code pays manquant
df_full.withColumn("Country_ISO3-2", F.when(F.col("COUNTRY") == 'RI', 'SRB').otherwise(F.col("country_ISO3"))).explain()
#df_full

== Physical Plan ==
*(2) Project [COUNTRY#50, ISO3166-1-Alpha-3#102 AS country_ISO3#529, date#51, avg_prcp#53, sum_prcp#54, max_prcp#55, cnt#52, CASE WHEN (COUNTRY#50 = RI) THEN SRB ELSE ISO3166-1-Alpha-3#102 END AS Country_ISO3-2#880]
+- *(2) BroadcastHashJoin [COUNTRY#50], [FIPS#107], LeftOuter, BuildRight
   :- FileScan csv [COUNTRY#50,date#51,cnt#52,avg_prcp#53,sum_prcp#54,max_prcp#55] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[hdfs://localhost/stagiaire/430-Benoit/noaa/daily_rain_by_country_2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<COUNTRY:string,date:string,cnt:string,avg_prcp:string,sum_prcp:string,max_prcp:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true])), [id=#398]
      +- *(1) Project [ISO3166-1-Alpha-3#102, FIPS#107]
         +- *(1) Filter isnotnull(FIPS#107)
            +- FileScan csv [ISO3166-1-Alpha-3#102,FIPS#107] Batched: false, DataFilters: [isnotnull(FIPS#107)], Format: CSV

In [21]:
#     Sauver
sqlDF = df_full.withColumn("country_ISO3", F.when(F.col("COUNTRY") == 'RI', 'SRB').otherwise(F.col("country_ISO3")))


In [22]:
sqlDF.write.csv(output, header=True)
print(sqlDF.columns)

['COUNTRY', 'country_ISO3', 'date', 'avg_prcp', 'sum_prcp', 'max_prcp', 'cnt']


In [23]:
sc.stop()