In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import udf, hour, mean, month, year, to_date
from pyspark.sql.window import Window
from tqdm.notebook import tqdm

In [2]:
#!pip install tqdm

In [3]:
spark = SparkSession.builder.getOrCreate()

In [4]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [5]:
spark

# Schema de données

In [6]:
schema = StructType() \
    .add("STATION", IntegerType(), False) \
    .add("DATE", TimestampType(), False) \
    .add("SOURCE", IntegerType(), True) \
    .add("LATITUDE", FloatType(), True) \
    .add("LONGITUDE", FloatType(), True) \
    .add("ELEVATION", StringType(), True) \
    .add("NAME", StringType(), True) \
    .add("REPORT_TYPE", StringType(), True) \
    .add("CALL_SIGN", StringType(), True) \
    .add("QUALITY_CONTROL", StringType(), True) \
    .add("WND", StringType(), True) \
    .add("CIG", StringType(), True) \
    .add("VIS", StringType(), True) \
    .add("TMP", StringType(), True) \
    .add("DEW", StringType(), True) \
    .add("SLP", StringType(), True) \
    .add("GA1", StringType(), True) \
    .add("GA2", StringType(), True) \
    .add("GA3", StringType(), True) \
    .add("GA4", StringType(), True) \
    .add("GF1", StringType(), True) \
    .add("MA1", StringType(), True) \
    .add("MW1", StringType(), True) \
    .add("MW2", StringType(), True) \
    .add("MW3", StringType(), True) \
    .add("OC1", StringType(), True) \
    .add("REM", StringType(), True) \
    .add("EQD", StringType(), True)

# Chargement des données

In [7]:
# station_2018 = spark.read.load("./data/2018", format="csv", header=True, schema=schema, inferSchema=False)
# station_2018.show()

In [8]:
cols_of_interest = ("STATION","DATE","SOURCE","LATITUDE","LONGITUDE","ELEVATION","NAME","REPORT_TYPE","CALL_SIGN","QUALITY_CONTROL","WND","CIG","VIS","TMP","DEW","SLP")

all_stations = spark.read.load("./data/*", format="csv", header=True, schema=schema, inferSchema=False).select(*cols_of_interest)

In [9]:
all_stations

STATION,DATE,SOURCE,LATITUDE,LONGITUDE,ELEVATION,NAME,REPORT_TYPE,CALL_SIGN,QUALITY_CONTROL,WND,CIG,VIS,TMP,DEW,SLP
826099999,2008-01-01 00:00:00,4,0.0,0.0,0.0,WXPOD8270,FM-15,KFMH,V020,"280,1,N,0062,1","22000,1,9,N","024140,1,N,1",201,-401,101701
826099999,2008-01-01 00:53:00,4,0.0,0.0,0.0,WXPOD8270,FM-15,KCCR,V020,"100,1,N,0021,1","22000,1,9,N","016093,1,N,1",99999,-301,102311
826099999,2008-01-01 00:53:00,4,0.0,0.0,0.0,WXPOD8270,FM-16,KDAL,V020,"360,1,N,0057,1","22000,1,9,N","016093,1,N,1",99999,99999,999999
826099999,2008-01-01 01:00:00,4,0.0,0.0,0.0,WXPOD8270,FM-15,KFMH,V020,"270,1,N,0046,1","22000,1,9,N","024140,1,N,1",101,-401,101811
826099999,2008-01-01 01:53:00,4,0.0,0.0,0.0,WXPOD8270,AUTO,KCGI,V020,"300,1,N,0067,1","22000,1,9,N","016093,1,N,1",301,-401,999999
826099999,2008-01-01 01:53:00,4,0.0,0.0,0.0,WXPOD8270,FM-15,KCCR,V020,"100,1,N,0031,1","22000,1,9,N","016093,1,N,1",99999,-301,102291
826099999,2008-01-01 02:00:00,4,0.0,0.0,0.0,WXPOD8270,FM-15,KFMH,V020,"270,1,N,0051,1","22000,1,9,N","024140,1,N,1",101,-401,101921
826099999,2008-01-01 02:53:00,4,0.0,0.0,0.0,WXPOD8270,AUTO,KCGI,V020,"280,1,N,0041,1","22000,1,9,N","016093,1,N,1",201,-401,101901
826099999,2008-01-01 02:53:00,4,0.0,0.0,0.0,WXPOD8270,FM-15,KCCR,V020,"080,1,N,0041,1","22000,1,9,N","016093,1,N,1",99999,-301,102311
826099999,2008-01-01 03:00:00,4,0.0,0.0,0.0,WXPOD8270,SY-MT,KFMH,V020,"300,1,N,0041,1","22000,1,9,N","016000,1,N,1",171,-441,102031


In [10]:
# Nombre de lignes
all_stations.count()

9573561

## Supprimer les lignes du champ TMP avec des valeurs vides OU des +9999

In [31]:
all_stations = all_stations.na.drop(how="any", subset=["TMP"]).filter(~all_stations.TMP.contains("+9999"))

## Supprimer les lignes du champ ELEVATION avec des valeurs vides OU des +9999

In [32]:
all_stations = all_stations.na.drop(how="any", subset=["ELEVATION"]).filter(~all_stations.ELEVATION.contains("+9999"))

## Supprimer les lignes du champ DEW avec des valeurs vides OU des +9999

In [33]:
all_stations = all_stations.na.drop(how="any", subset=["DEW"]).filter(~all_stations.DEW.contains("+9999"))

## Supprimer les lignes du champ WND avec des valeurs vides OU des 9999

In [65]:
all_stations = all_stations.na.drop(how="any", subset=["WND"]).filter(~all_stations.WND.contains("9999"))

## Supprimer les lignes du champ CIG avec des valeurs vides OU des 99999

In [66]:
all_stations = all_stations.na.drop(how="any", subset=["CIG"]).filter(~all_stations.CIG.contains("99999"))

In [67]:
# Nombre de lignes après le drop
all_stations.count()

5344143

In [68]:
# Le schema de données
all_stations.printSchema()

root
 |-- STATION: integer (nullable = true)
 |-- DATE: timestamp (nullable = true)
 |-- SOURCE: integer (nullable = true)
 |-- LATITUDE: float (nullable = true)
 |-- LONGITUDE: float (nullable = true)
 |-- ELEVATION: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- REPORT_TYPE: string (nullable = true)
 |-- CALL_SIGN: string (nullable = true)
 |-- QUALITY_CONTROL: string (nullable = true)
 |-- WND: string (nullable = true)
 |-- CIG: string (nullable = true)
 |-- VIS: string (nullable = true)
 |-- TMP: string (nullable = true)
 |-- DEW: string (nullable = true)
 |-- SLP: string (nullable = true)
 |-- temperature: float (nullable = true)
 |-- precipitation: float (nullable = true)
 |-- speed_rate: float (nullable = true)
 |-- ceiling_height: integer (nullable = true)
 |-- season: string (nullable = true)



## Split de la colonne température

In [69]:
@udf(returnType=FloatType())
def extract_tmp(tmp_col: str):
    return int(tmp_col.split(',')[0].lstrip('+')) / 10

all_stations = all_stations.withColumn('temperature', extract_tmp(all_stations['TMP']))

## Split de la colonne DEW

In [70]:
@udf(returnType=FloatType())
def extract_dew(dew_col: str):
    return int(dew_col.split(',')[0].lstrip('+')) / 10

all_stations = all_stations.withColumn('precipitation', extract_dew(all_stations['DEW']))

## Split de la colonne WND

In [71]:
@udf(returnType=FloatType())
def extract_wnd(wnd_col: str):
    return int(wnd_col.split(',')[3].lstrip('+')) / 10

all_stations = all_stations.withColumn('speed_rate', extract_wnd(all_stations['WND']))

## Split de la colonne CIG

In [72]:
@udf(returnType=IntegerType())
def extract_cig(cig_col: str):
    return int(cig_col.split(',')[0])

all_stations = all_stations.withColumn('ceiling_height', extract_cig(all_stations['CIG']))

In [73]:
all_stations.printSchema()

root
 |-- STATION: integer (nullable = true)
 |-- DATE: timestamp (nullable = true)
 |-- SOURCE: integer (nullable = true)
 |-- LATITUDE: float (nullable = true)
 |-- LONGITUDE: float (nullable = true)
 |-- ELEVATION: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- REPORT_TYPE: string (nullable = true)
 |-- CALL_SIGN: string (nullable = true)
 |-- QUALITY_CONTROL: string (nullable = true)
 |-- WND: string (nullable = true)
 |-- CIG: string (nullable = true)
 |-- VIS: string (nullable = true)
 |-- TMP: string (nullable = true)
 |-- DEW: string (nullable = true)
 |-- SLP: string (nullable = true)
 |-- temperature: float (nullable = true)
 |-- precipitation: float (nullable = true)
 |-- speed_rate: float (nullable = true)
 |-- ceiling_height: integer (nullable = true)
 |-- season: string (nullable = true)



## Création du champ season

In [74]:
@udf(returnType=StringType())
def create_season(month: int):
    if month in [7, 8, 9]:
        season = 'Summer'
    elif month in [10, 11, 12]:
        season = 'Autumn'
    elif month in [1, 2, 3]:
        season = 'Winter'
    else:
        season = 'Spring'
    return season

all_stations = all_stations.withColumn('season', create_season(month("DATE")))

In [75]:
all_stations.printSchema()

root
 |-- STATION: integer (nullable = true)
 |-- DATE: timestamp (nullable = true)
 |-- SOURCE: integer (nullable = true)
 |-- LATITUDE: float (nullable = true)
 |-- LONGITUDE: float (nullable = true)
 |-- ELEVATION: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- REPORT_TYPE: string (nullable = true)
 |-- CALL_SIGN: string (nullable = true)
 |-- QUALITY_CONTROL: string (nullable = true)
 |-- WND: string (nullable = true)
 |-- CIG: string (nullable = true)
 |-- VIS: string (nullable = true)
 |-- TMP: string (nullable = true)
 |-- DEW: string (nullable = true)
 |-- SLP: string (nullable = true)
 |-- temperature: float (nullable = true)
 |-- precipitation: float (nullable = true)
 |-- speed_rate: float (nullable = true)
 |-- ceiling_height: integer (nullable = true)
 |-- season: string (nullable = true)



In [76]:
all_stations

STATION,DATE,SOURCE,LATITUDE,LONGITUDE,ELEVATION,NAME,REPORT_TYPE,CALL_SIGN,QUALITY_CONTROL,WND,CIG,VIS,TMP,DEW,SLP,temperature,precipitation,speed_rate,ceiling_height,season
826099999,2008-01-01 00:00:00,4,0.0,0.0,0.0,WXPOD8270,FM-15,KFMH,V020,"280,1,N,0062,1","22000,1,9,N","024140,1,N,1",201,-401,101701,2.0,-4.0,6.2,22000,Winter
826099999,2008-01-01 01:00:00,4,0.0,0.0,0.0,WXPOD8270,FM-15,KFMH,V020,"270,1,N,0046,1","22000,1,9,N","024140,1,N,1",101,-401,101811,1.0,-4.0,4.6,22000,Winter
826099999,2008-01-01 01:53:00,4,0.0,0.0,0.0,WXPOD8270,AUTO,KCGI,V020,"300,1,N,0067,1","22000,1,9,N","016093,1,N,1",301,-401,999999,3.0,-4.0,6.7,22000,Winter
826099999,2008-01-01 02:00:00,4,0.0,0.0,0.0,WXPOD8270,FM-15,KFMH,V020,"270,1,N,0051,1","22000,1,9,N","024140,1,N,1",101,-401,101921,1.0,-4.0,5.1,22000,Winter
826099999,2008-01-01 02:53:00,4,0.0,0.0,0.0,WXPOD8270,AUTO,KCGI,V020,"280,1,N,0041,1","22000,1,9,N","016093,1,N,1",201,-401,101901,2.0,-4.0,4.1,22000,Winter
826099999,2008-01-01 03:00:00,4,0.0,0.0,0.0,WXPOD8270,SY-MT,KFMH,V020,"300,1,N,0041,1","22000,1,9,N","016000,1,N,1",171,-441,102031,1.7,-4.4,4.1,22000,Winter
826099999,2008-01-01 03:53:00,4,0.0,0.0,0.0,WXPOD8270,AUTO,KCGI,V020,"280,1,N,0041,1","22000,1,9,N","016093,1,N,1",101,-301,102001,1.0,-3.0,4.1,22000,Winter
826099999,2008-01-01 04:00:00,4,0.0,0.0,0.0,WXPOD8270,FM-15,KFMH,V020,"999,9,V,0010,1","22000,1,9,N","024140,1,N,1",-301,-501,101961,-3.0,-5.0,1.0,22000,Winter
826099999,2008-01-01 04:53:00,4,0.0,0.0,0.0,WXPOD8270,AUTO,KCGI,V020,"260,1,N,0036,1","22000,1,9,N","016093,1,N,1",1,-301,102071,0.0,-3.0,3.6,22000,Winter
826099999,2008-01-01 05:00:00,4,0.0,0.0,0.0,WXPOD8270,FM-15,KFMH,V020,"210,1,N,0015,1","22000,1,9,N","024140,1,N,1",-401,-501,101991,-4.0,-5.0,1.5,22000,Winter


## Moyenne des températures, elevations, speed_rates, ceiling_heights et des précipitations par année/mois/journée/saison

In [78]:
# Par année
means_by_year = all_stations.groupBy([year("DATE").alias("year")]).agg(F.round(mean("temperature"), 2).alias("mean_tmp"), F.round(mean("ELEVATION"), 2).alias("mean_elevation"), F.round(mean("speed_rate"), 2).alias("mean_speed_rate"), F.round(mean("ceiling_height"), 2).alias("mean_ceiling_height"), F.round(mean("precipitation"), 2).alias("mean_dew"))
means_by_year = means_by_year.sort("year")
means_by_year

year,mean_tmp,mean_elevation,mean_speed_rate,mean_ceiling_height,mean_dew
2000,6.5,83.99,3.97,9158.99,3.07
2002,5.1,99.39,3.78,10364.9,1.29
2004,5.34,99.69,4.13,10119.5,1.55
2006,4.66,49.06,5.17,6679.71,0.92
2008,3.81,73.26,5.02,6909.59,0.0
2010,3.03,79.17,5.17,7082.15,-0.51
2012,4.48,63.71,7.08,8738.69,0.8
2014,5.25,82.24,7.04,2883.6,1.82
2016,3.95,80.42,4.71,2751.0,0.64
2018,3.97,47.19,6.02,3987.21,0.66


In [79]:
# Par mois
means_by_month = all_stations.groupBy([year("DATE").alias("year"), month("DATE").alias("month")]).agg(F.round(mean("temperature"), 2).alias("mean_tmp"), F.round(mean("ELEVATION"), 2).alias("mean_elevation"), F.round(mean("speed_rate"), 2).alias("mean_speed_rate"), F.round(mean("ceiling_height"), 2).alias("mean_ceiling_height"), F.round(mean("precipitation"), 2).alias("mean_dew"))
means_by_month = means_by_month.sort("year", "month")
means_by_month

year,month,mean_tmp,mean_elevation,mean_speed_rate,mean_ceiling_height,mean_dew
2000,1,-0.17,84.45,4.95,9041.95,-2.73
2000,2,-0.14,85.04,4.83,9051.08,-2.75
2000,3,1.1,84.16,4.62,10068.88,-2.8
2000,4,4.33,85.19,3.69,9123.43,0.43
2000,5,9.5,84.23,3.67,12021.23,4.03
2000,6,11.77,83.83,4.0,9326.8,6.93
2000,7,14.1,81.31,3.22,8645.74,10.16
2000,8,13.63,82.84,2.99,9537.76,10.18
2000,9,10.7,85.1,3.52,10872.76,7.15
2000,10,8.66,83.85,4.26,7911.04,5.94


## Création du champ 'year'

In [80]:
all_stations = all_stations.withColumn("year", year(all_stations["DATE"]))
all_stations

STATION,DATE,SOURCE,LATITUDE,LONGITUDE,ELEVATION,NAME,REPORT_TYPE,CALL_SIGN,QUALITY_CONTROL,WND,CIG,VIS,TMP,DEW,SLP,temperature,precipitation,speed_rate,ceiling_height,season,year
826099999,2008-01-01 00:00:00,4,0.0,0.0,0.0,WXPOD8270,FM-15,KFMH,V020,"280,1,N,0062,1","22000,1,9,N","024140,1,N,1",201,-401,101701,2.0,-4.0,6.2,22000,Winter,2008
826099999,2008-01-01 01:00:00,4,0.0,0.0,0.0,WXPOD8270,FM-15,KFMH,V020,"270,1,N,0046,1","22000,1,9,N","024140,1,N,1",101,-401,101811,1.0,-4.0,4.6,22000,Winter,2008
826099999,2008-01-01 01:53:00,4,0.0,0.0,0.0,WXPOD8270,AUTO,KCGI,V020,"300,1,N,0067,1","22000,1,9,N","016093,1,N,1",301,-401,999999,3.0,-4.0,6.7,22000,Winter,2008
826099999,2008-01-01 02:00:00,4,0.0,0.0,0.0,WXPOD8270,FM-15,KFMH,V020,"270,1,N,0051,1","22000,1,9,N","024140,1,N,1",101,-401,101921,1.0,-4.0,5.1,22000,Winter,2008
826099999,2008-01-01 02:53:00,4,0.0,0.0,0.0,WXPOD8270,AUTO,KCGI,V020,"280,1,N,0041,1","22000,1,9,N","016093,1,N,1",201,-401,101901,2.0,-4.0,4.1,22000,Winter,2008
826099999,2008-01-01 03:00:00,4,0.0,0.0,0.0,WXPOD8270,SY-MT,KFMH,V020,"300,1,N,0041,1","22000,1,9,N","016000,1,N,1",171,-441,102031,1.7,-4.4,4.1,22000,Winter,2008
826099999,2008-01-01 03:53:00,4,0.0,0.0,0.0,WXPOD8270,AUTO,KCGI,V020,"280,1,N,0041,1","22000,1,9,N","016093,1,N,1",101,-301,102001,1.0,-3.0,4.1,22000,Winter,2008
826099999,2008-01-01 04:00:00,4,0.0,0.0,0.0,WXPOD8270,FM-15,KFMH,V020,"999,9,V,0010,1","22000,1,9,N","024140,1,N,1",-301,-501,101961,-3.0,-5.0,1.0,22000,Winter,2008
826099999,2008-01-01 04:53:00,4,0.0,0.0,0.0,WXPOD8270,AUTO,KCGI,V020,"260,1,N,0036,1","22000,1,9,N","016093,1,N,1",1,-301,102071,0.0,-3.0,3.6,22000,Winter,2008
826099999,2008-01-01 05:00:00,4,0.0,0.0,0.0,WXPOD8270,FM-15,KFMH,V020,"210,1,N,0015,1","22000,1,9,N","024140,1,N,1",-401,-501,101991,-4.0,-5.0,1.5,22000,Winter,2008


## Longitude et latitude par station

In [82]:
annee = int(input("Quelle année? : "))
longitude_latitude_par_station = all_stations.filter(all_stations.year == annee).groupBy(["STATION"]).agg(mean("LATITUDE").alias("lat"), mean("LONGITUDE").alias("long"))
longitude_latitude_par_station.show()

Quelle année? : 2016
+----------+-----------------+------------------+
|   STATION|              lat|              long|
+----------+-----------------+------------------+
|1001499999|59.79192352294922| 5.340849876403809|
|1083099999|70.86666870117188| 29.03333282470703|
|1068099999|71.01667022705078|25.983333587646484|
|1046099999| 69.7868423461914|20.959444046020508|
|1045099999|69.83333587646484|21.883333206176758|
|1059099999|70.06881713867188| 24.97348976135254|
|1074099999|71.03333282470703| 27.83333396911621|
|1052099999|70.68333435058594|23.683332443237305|
|1023099999|69.05575561523438|18.540355682373047|
|1062099999|             76.5|25.066667556762695|
|1007099999|78.91666412353516|11.933333396911621|
|1023199999| 64.3499984741211| 7.800000190734863|
|1010099999| 69.2925033569336|16.144166946411133|
|1028099999|74.51667022705078|19.016666412353516|
|1078099999| 71.0999984741211|28.233333587646484|
|1057099999|69.36666870117188|24.433332443237305|
| 702699999|              0.0

In [83]:
longitude_latitude_par_station.count()

25

In [84]:
filtre_norvege = longitude_latitude_par_station.filter(longitude_latitude_par_station.long >= 5).filter(longitude_latitude_par_station.long <= 12).filter(longitude_latitude_par_station.lat >= 57).filter(longitude_latitude_par_station.lat <= 65)
filtre_norvege

STATION,lat,long
1001499999,59.79192352294922,5.340849876403809
1023199999,64.3499984741211,7.800000190734863


In [87]:
# Par jour, pour l'année choisie et pour une station choisie
num_station = int(input("Quelle station ? : "))
means_by_day = all_stations.filter(all_stations.year == annee).filter(all_stations.STATION == num_station).groupBy([to_date("DATE").cast("date").alias("date")]).agg(F.round(mean("temperature"), 2).alias("mean_tmp"), F.round(mean("ELEVATION"), 2).alias("mean_elevation"), F.round(mean("speed_rate"), 2).alias("mean_speed_rate"), F.round(mean("ceiling_height"), 2).alias("mean_ceiling_height"), F.round(mean("precipitation"), 2).alias("mean_dew"))
means_by_day = means_by_day.sort("date")
means_by_day.show(365)

Quelle station ? : 1023199999
+----------+--------+--------------+---------------+-------------------+--------+
|      date|mean_tmp|mean_elevation|mean_speed_rate|mean_ceiling_height|mean_dew|
+----------+--------+--------------+---------------+-------------------+--------+
|2016-01-01|    7.67|           0.0|          15.27|             2438.0|    -1.0|
|2016-01-04|    -3.0|           0.0|          11.85|              610.0|   -10.0|
|2016-01-05|   -0.33|           0.0|           10.3|              437.0|   -6.33|
|2016-01-06|     1.0|           0.0|            8.5|             495.25|   -1.75|
|2016-01-07|    -0.5|           0.0|            8.0|              610.0|    -7.5|
|2016-01-08|    -5.0|           0.0|          11.05|              503.0|   -10.5|
|2016-01-10|    -3.5|           0.0|           19.3|              853.0|   -11.5|
|2016-01-11|    2.67|           0.0|            9.8|            2712.67|    -6.0|
|2016-01-12|    -2.0|           0.0|          14.27|              68

In [88]:
means_by_day.count()

296

# Apprentissage

## Définition features/valeur cible + splits

In [90]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Load training data
training = means_by_day

vectorAssembler = VectorAssembler(inputCols = ["mean_tmp", "mean_elevation", "mean_speed_rate", "mean_ceiling_height"], outputCol = 'features')
v_training = vectorAssembler.transform(training)
v_training = v_training.select(['features', 'mean_dew'])
v_training.show(3)

+--------------------+--------+
|            features|mean_dew|
+--------------------+--------+
|[7.67,0.0,15.27,2...|    -1.0|
|[-3.0,0.0,11.85,6...|   -10.0|
|[-0.33,0.0,10.3,4...|   -6.33|
+--------------------+--------+
only showing top 3 rows



In [91]:
splits = v_training.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

## 1) Linear regression

In [92]:
lr = LinearRegression(featuresCol = 'features', labelCol='mean_dew', maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(train_df)

# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))

Coefficients: [1.1431101409859672,0.0,0.0,-0.00033128444261112727]
Intercept: -4.893950497476235


In [93]:
lr_predictions = lrModel.transform(train_df)
lr_predictions.select("prediction","mean_dew","features").show()

+-------------------+--------+--------------------+
|         prediction|mean_dew|            features|
+-------------------+--------+--------------------+
| -8.525364430426924|   -10.0|[-3.0,0.0,11.85,6...|
| 3.0660328128002057|    -1.0|[7.67,0.0,15.27,2...|
| -5.415948145422666|   -6.33|[-0.33,0.0,10.3,4...|
|-3.9149089766934284|   -1.75|[1.0,0.0,8.5,495.25]|
|-10.776137277039467|   -10.5|[-5.0,0.0,11.05,5...|
| -2.740511789981629|    -6.0|[2.67,0.0,9.8,271...|
| -7.407431907079403|    -9.5|[-2.0,0.0,14.27,6...|
| -8.628125671112617|    -9.0|[-1.5,0.0,12.35,6...|
| -4.827272917522609|    -3.5|[0.5,0.0,7.5,1524.0]|
|-1.8348867574915113|   -5.33|[3.0,0.0,3.93,111...|
| 2.2081281785262306|     4.5|[6.5,0.0,13.9,990.5]|
|  3.057465254148644|     6.0|[7.0,0.0,11.3,152.0]|
| 0.6195166974608135|    -1.0|[5.0,0.0,19.6,610.0]|
|-0.5234841196590923|   -6.33|[4.0,0.0,11.0,609...|
|  1.459832857900211|    -2.5|[6.0,0.0,14.2,152...|
| 1.9043099462756388|  -11.67|[6.33,0.0,15.97,1...|
|-2.07053932

In [94]:
# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 2.295731
r2: 0.823873


In [95]:
train_df.describe().show()

+-------+------------------+
|summary|          mean_dew|
+-------+------------------+
|  count|               212|
|   mean|3.6852358490566037|
| stddev|5.4832056767802015|
|    min|             -14.0|
|    max|              14.0|
+-------+------------------+



In [96]:
lr_predictions = lrModel.transform(test_df)
lr_predictions.select("prediction","mean_dew","features").show()

from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="mean_dew",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+--------------------+--------+--------------------+
|          prediction|mean_dew|            features|
+--------------------+--------+--------------------+
|  -5.667589077962006|    -7.5|[-0.5,0.0,8.0,610.0]|
|  -9.177421620474412|   -11.5|[-3.5,0.0,19.3,85...|
|  -2.541052635550675|    -4.5|[2.5,0.0,5.15,152...|
| -2.8501211036295837|    1.67|[2.0,0.0,15.63,73...|
|  -3.305504658632123|    -4.5|[1.5,0.0,10.55,38...|
|  -3.532682965152704|   -3.25|[1.5,0.0,4.12,106...|
|  -3.213649461040052|    -5.0|[2.0,0.0,11.8,182...|
|   2.501901243889783|     5.0|[7.0,0.0,6.7,1829.0]|
|  0.5188062269070306|     3.0| [5.0,0.0,6.2,914.0]|
| -2.7086626466346324|   -3.67|[2.0,0.0,12.37,30...|
| -2.5086314636691784|    -2.5|[2.25,0.0,11.57,5...|
|  1.4359776877748525|     0.0|[5.67,0.0,11.17,4...|
|-0.46793765716648394|    1.25|[4.0,0.0,12.72,44...|
| -0.6848136401089278|   -0.33|[4.33,0.0,8.4,223...|
|   0.208721298365659|     0.0|[4.67,0.0,4.8,711...|
| -2.9055549294117036|    -1.5| [2.0,0.0,7.7,8

In [97]:
test_result = lrModel.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 2.16682


## 2) Decision tree regression

In [107]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="features", labelCol = 'mean_dew')

In [108]:
# Train model
dt_model = dt.fit(train_df)

In [110]:
# Make predictions
# TRAIN
dt_predictions = dt_model.transform(train_df)
dt_predictions.select("prediction","mean_dew","features").show()

+------------------+--------+--------------------+
|        prediction|mean_dew|            features|
+------------------+--------+--------------------+
|             -10.0|   -10.0|[-3.0,0.0,11.85,6...|
| 1.342962962962963|    -1.0|[7.67,0.0,15.27,2...|
|             -6.33|   -6.33|[-0.33,0.0,10.3,4...|
|            -0.767|   -1.75|[1.0,0.0,8.5,495.25]|
|             -10.0|   -10.5|[-5.0,0.0,11.05,5...|
|            -4.499|    -6.0|[2.67,0.0,9.8,271...|
|             -10.0|    -9.5|[-2.0,0.0,14.27,6...|
|              -8.5|    -9.0|[-1.5,0.0,12.35,6...|
|              -3.5|    -3.5|[0.5,0.0,7.5,1524.0]|
|            -4.499|   -5.33|[3.0,0.0,3.93,111...|
|2.9292307692307693|     4.5|[6.5,0.0,13.9,990.5]|
|           4.72625|     6.0|[7.0,0.0,11.3,152.0]|
|            -0.125|    -1.0|[5.0,0.0,19.6,610.0]|
|-2.388888888888889|   -6.33|[4.0,0.0,11.0,609...|
| 1.342962962962963|    -2.5|[6.0,0.0,14.2,152...|
|            -11.67|  -11.67|[6.33,0.0,15.97,1...|
|            -4.499|   -14.0|[3

In [116]:
# Select (prediction, true label) and compute train error
dt_evaluator = RegressionEvaluator(
    labelCol="mean_dew", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on train data = %g" % rmse)

Root Mean Squared Error (RMSE) on train data = 1.54206


In [115]:
dt_model.featureImportances

SparseVector(4, {0: 0.9012, 2: 0.0462, 3: 0.0526})

In [118]:
# TEST
dt_predictions = dt_model.transform(test_df)
dt_predictions.select("prediction","mean_dew","features").show()

+------------------+--------+--------------------+
|        prediction|mean_dew|            features|
+------------------+--------+--------------------+
|             -6.33|    -7.5|[-0.5,0.0,8.0,610.0]|
|              -8.5|   -11.5|[-3.5,0.0,19.3,85...|
|            -4.499|    -4.5|[2.5,0.0,5.15,152...|
|-2.388888888888889|    1.67|[2.0,0.0,15.63,73...|
|-2.388888888888889|    -4.5|[1.5,0.0,10.55,38...|
|            -4.499|   -3.25|[1.5,0.0,4.12,106...|
|            -4.499|    -5.0|[2.0,0.0,11.8,182...|
| 1.342962962962963|     5.0|[7.0,0.0,6.7,1829.0]|
|            -0.125|     3.0| [5.0,0.0,6.2,914.0]|
|-2.388888888888889|   -3.67|[2.0,0.0,12.37,30...|
|-2.388888888888889|    -2.5|[2.25,0.0,11.57,5...|
|             1.918|     0.0|[5.67,0.0,11.17,4...|
|-2.388888888888889|    1.25|[4.0,0.0,12.72,44...|
|          -2.59375|   -0.33|[4.33,0.0,8.4,223...|
|            -0.125|     0.0|[4.67,0.0,4.8,711...|
|            -0.767|    -1.5| [2.0,0.0,7.7,899.0]|
| 1.342962962962963|     0.0|[5

In [119]:
# Select (prediction, true label) and compute test error
dt_evaluator = RegressionEvaluator(
    labelCol="mean_dew", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 2.1913


## 3) Random forest regression

In [121]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="features", labelCol = 'mean_dew')

In [122]:
# Train model
rf_model = rf.fit(train_df)

In [123]:
# Make predictions
# TRAIN
rf_predictions = rf_model.transform(train_df)
rf_predictions.select("prediction","mean_dew","features").show()

+-------------------+--------+--------------------+
|         prediction|mean_dew|            features|
+-------------------+--------+--------------------+
|-7.3166478062411455|   -10.0|[-3.0,0.0,11.85,6...|
| 1.3785656088069222|    -1.0|[7.67,0.0,15.27,2...|
| -3.252227167565139|   -6.33|[-0.33,0.0,10.3,4...|
| -2.088844479738949|   -1.75|[1.0,0.0,8.5,495.25]|
|  -6.86224087983422|   -10.5|[-5.0,0.0,11.05,5...|
| -2.658239422320075|    -6.0|[2.67,0.0,9.8,271...|
| -6.701168098448937|    -9.5|[-2.0,0.0,14.27,6...|
| -6.511727351695692|    -9.0|[-1.5,0.0,12.35,6...|
|-3.8800396001113517|    -3.5|[0.5,0.0,7.5,1524.0]|
| -2.033109099576781|   -5.33|[3.0,0.0,3.93,111...|
|  2.000959997118774|     4.5|[6.5,0.0,13.9,990.5]|
|  5.948583333333334|     6.0|[7.0,0.0,11.3,152.0]|
| 0.7360576486918878|    -1.0|[5.0,0.0,19.6,610.0]|
| -2.808236676770119|   -6.33|[4.0,0.0,11.0,609...|
| 0.7909390428279667|    -2.5|[6.0,0.0,14.2,152...|
|  0.404050174588363|  -11.67|[6.33,0.0,15.97,1...|
|-3.94017639

In [124]:
# Select (prediction, true label) and compute train error
rf_evaluator = RegressionEvaluator(
    labelCol="mean_dew", predictionCol="prediction", metricName="rmse")
rmse = rf_evaluator.evaluate(rf_predictions)
print("Root Mean Squared Error (RMSE) on train data = %g" % rmse)

Root Mean Squared Error (RMSE) on train data = 1.85791


In [125]:
rf_model.featureImportances

SparseVector(4, {0: 0.8411, 2: 0.0642, 3: 0.0947})

In [126]:
# TEST
rf_predictions = rf_model.transform(test_df)
rf_predictions.select("prediction","mean_dew","features").show()

+--------------------+--------+--------------------+
|          prediction|mean_dew|            features|
+--------------------+--------+--------------------+
|  -3.831767747699059|    -7.5|[-0.5,0.0,8.0,610.0]|
|  -5.478228571428572|   -11.5|[-3.5,0.0,19.3,85...|
| -1.9263142592478535|    -4.5|[2.5,0.0,5.15,152...|
| -2.5510608535009025|    1.67|[2.0,0.0,15.63,73...|
|-0.10221854395604395|    -4.5|[1.5,0.0,10.55,38...|
| -1.8498648784429002|   -3.25|[1.5,0.0,4.12,106...|
| -3.4959253898654388|    -5.0|[2.0,0.0,11.8,182...|
|  1.2254282248827582|     5.0|[7.0,0.0,6.7,1829.0]|
|  1.3073120292482499|     3.0| [5.0,0.0,6.2,914.0]|
|-0.11850425824175823|   -3.67|[2.0,0.0,12.37,30...|
|  -2.853112206314756|    -2.5|[2.25,0.0,11.57,5...|
|  2.4642211975058226|     0.0|[5.67,0.0,11.17,4...|
| -0.2761658151059466|    1.25|[4.0,0.0,12.72,44...|
| -0.8929749221538904|   -0.33|[4.33,0.0,8.4,223...|
|-0.00967208484252...|     0.0|[4.67,0.0,4.8,711...|
| -1.4465946653226869|    -1.5| [2.0,0.0,7.7,8

In [127]:
# Select (prediction, true label) and compute test error
rf_evaluator = RegressionEvaluator(
    labelCol="mean_dew", predictionCol="prediction", metricName="rmse")
rmse = rf_evaluator.evaluate(rf_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 2.25863


## 4) Gradient-boosted tree regression

In [128]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Train a GBT model.
gbt = GBTRegressor(featuresCol="features", labelCol = 'mean_dew')

In [129]:
# Train model
gbt_model = gbt.fit(train_df)

In [130]:
# Make predictions
# TRAIN
gbt_predictions = gbt_model.transform(train_df)
gbt_predictions.select("prediction","mean_dew","features").show()

+-------------------+--------+--------------------+
|         prediction|mean_dew|            features|
+-------------------+--------+--------------------+
|-10.012993138295712|   -10.0|[-3.0,0.0,11.85,6...|
|-0.3760952783547731|    -1.0|[7.67,0.0,15.27,2...|
| -6.269155408929225|   -6.33|[-0.33,0.0,10.3,4...|
| -2.111426104563429|   -1.75|[1.0,0.0,8.5,495.25]|
|-10.294357516491063|   -10.5|[-5.0,0.0,11.05,5...|
| -4.978492349929067|    -6.0|[2.67,0.0,9.8,271...|
| -9.735104249406824|    -9.5|[-2.0,0.0,14.27,6...|
| -8.718766366590506|    -9.0|[-1.5,0.0,12.35,6...|
| -3.475472603203585|    -3.5|[0.5,0.0,7.5,1524.0]|
| -4.685440508801604|   -5.33|[3.0,0.0,3.93,111...|
|  3.509628177812976|     4.5|[6.5,0.0,13.9,990.5]|
|  5.603117433292491|     6.0|[7.0,0.0,11.3,152.0]|
|-0.5573343507200849|    -1.0|[5.0,0.0,19.6,610.0]|
| -3.727986741748262|   -6.33|[4.0,0.0,11.0,609...|
|-0.9588767284930477|    -2.5|[6.0,0.0,14.2,152...|
|-11.558122411658037|  -11.67|[6.33,0.0,15.97,1...|
|-11.1431603

In [132]:
# Select (prediction, true label) and compute train error
gbt_evaluator = RegressionEvaluator(
    labelCol="mean_dew", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 0.784202


In [134]:
gbt_model.featureImportances

SparseVector(4, {0: 0.7048, 2: 0.1843, 3: 0.1109})

In [135]:
# TEST
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select("prediction","mean_dew","features").show()

+-------------------+--------+--------------------+
|         prediction|mean_dew|            features|
+-------------------+--------+--------------------+
| -6.630135810659344|    -7.5|[-0.5,0.0,8.0,610.0]|
| -7.454984726787373|   -11.5|[-3.5,0.0,19.3,85...|
|-3.1213274119828216|    -4.5|[2.5,0.0,5.15,152...|
|  -2.24104683988993|    1.67|[2.0,0.0,15.63,73...|
| -2.301366904664947|    -4.5|[1.5,0.0,10.55,38...|
|-3.1731335087889625|   -3.25|[1.5,0.0,4.12,106...|
|-4.9714026112381795|    -5.0|[2.0,0.0,11.8,182...|
| 0.9756442141715357|     5.0|[7.0,0.0,6.7,1829.0]|
|0.14009298637199685|     3.0| [5.0,0.0,6.2,914.0]|
|-1.7092163591994254|   -3.67|[2.0,0.0,12.37,30...|
|-3.4196420352249652|    -2.5|[2.25,0.0,11.57,5...|
|  2.285495683899277|     0.0|[5.67,0.0,11.17,4...|
| -2.454838017124496|    1.25|[4.0,0.0,12.72,44...|
|-2.4043497754693464|   -0.33|[4.33,0.0,8.4,223...|
|-1.0100351753289767|     0.0|[4.67,0.0,4.8,711...|
|-0.5114769105755742|    -1.5| [2.0,0.0,7.7,899.0]|
| 1.46894440

In [136]:
# Select (prediction, true label) and compute test error
gbt_evaluator = RegressionEvaluator(
    labelCol="mean_dew", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 2.23431


## Conclusion
On a trouvé la valeur moyenne des précipitations pour le "01/01/2016" dans la ville la plus proche de la station puisqu'elle est en mer : 2.5° réel contre "-0.94°" prédit avec la régression linéaire et "°" avec le decision tree.


In [117]:
# viz = lr_predictions.select("prediction","mean_dew","features").toPandas()
# viz
# viz.plot.scatter("mean_dew","features")

## Choses à faire


0) Faire valider le 1er modele par Sayf
1) Arrondir toutes les valeurs
2) Refaire le split (train/test)
3) D'autres algos : bataille d'I.A.
4) Voir d'autres prédictions prtinentes à faire