In [28]:
#!pip install wget
#!pip install sqlalchemy

from sqlalchemy import *
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import udf, col
from pyspark.sql.window import Window
from pyspark import SparkFiles
import requests
import re
import os
from multiprocessing import Pool
import wget
import pandas as pd
import datetime
from dateutil import parser

# Reuse the active Spark session if there is one, otherwise create it.
# Every later cell reads and queries data through this `spark` handle.
spark = SparkSession.builder.getOrCreate()

In [18]:
# Download every selected dataset from the online NOAA archive.
# The two lists below select which station files and which years are fetched.

# Station identifiers; is_choosed_stations() compares them with the part of a file name before the first '.'.
choosed_stations = ["07002099999", "93546099999", "71043599999", "80372099999", "43376099999"]
# Base years; get_choosed_years() adds every year after the last entry, up to the current year.
choosed_years = [1980, 1995, 2002, 2010] # and every year after 2010

def get_choosed_years(base_years=None):
    """Return the years to download: the base years plus every later year up to now.

    Args:
        base_years: Optional iterable of years whose last entry is the most recent one.
            Defaults to the module-level ``choosed_years`` list.

    Returns:
        A new list made of ``base_years`` followed by every year from
        ``base_years[-1] + 1`` to the current calendar year (inclusive).
        An empty input yields an empty list. The input is never modified.
    """
    if base_years is None:
        base_years = choosed_years
    # Copy first: extending the caller's list would silently grow the
    # module-level configuration every time the function is called.
    selected_years = list(base_years)
    if not selected_years:
        return selected_years
    current_year = datetime.datetime.now().year
    selected_years.extend(range(selected_years[-1] + 1, current_year + 1))
    return selected_years

def is_choosed_stations(filename):
    """Tell whether a file belongs to one of the selected stations.

    The station identifier is the text before the first '.' of ``filename``;
    it is looked up in the module-level ``choosed_stations`` list.
    """
    station_id, _separator, _rest = filename.partition('.')
    return station_id in choosed_stations
    
def downloadFiles(file):
    """Download one remote file into the current year's folder, unless it is already there.

    Reads the module-level ``year`` and ``url`` variables set by the download
    loop; the local target is ``data/<year>/<year>_<file>``.
    """
    year_label = str(year)
    target_path = "/".join(("data", year_label, year_label + "_" + file))
    # Files already present on disk are not fetched again.
    if os.path.exists(target_path):
        return
    wget.download(url + "/" + file, target_path)
        
def printEndPool(test):
    """Pool completion callback: print 'end!' followed by the received results."""
    fields = ('end!', test)
    print(*fields)

# Parallelism settings
chunk_size = 2   # number of files handed to a worker at a time
nb_process = 10  # number of worker processes in each pool

# Walk through the selected years
for year in get_choosed_years():
    # Create the year folder; makedirs also creates "data/" itself on a first run
    subDir = "data/" + str(year)
    os.makedirs(subDir, exist_ok=True)

    # List the file names published for this year
    url = "https://www.ncei.noaa.gov/data/global-hourly/access/" + str(year)
    txt = requests.get(url).text
    # Raw string with an escaped dot: only "<11 digits>.csv" names are accepted
    files = re.findall(r" *(\d{11}\.csv)", txt)
    # Keep the selected stations only, dropping repeated names but preserving order
    all_files = []
    for name in files:
        if name not in all_files and is_choosed_stations(name):
            all_files.append(name)

    # Run downloadFiles in parallel over all_files, split into groups of chunk_size.
    # downloadFiles reads the globals `year` and `url`, so the pool is created only
    # after both are set for this iteration.
    # NOTE(review): this presumably relies on workers inheriting the parent's globals
    # (fork start method) - confirm before running where processes are spawned.
    pool = Pool(nb_process)
    try:
        pool.map_async(
            downloadFiles,
            all_files,
            chunk_size,
            printEndPool,
            # Report worker failures instead of dropping them silently
            error_callback=lambda error: print('download failed:', error),
        )
    finally:
        # close() + join() wait for this year's downloads and release the worker
        # processes; without them each iteration leaked a pool and the cell could
        # finish while files were still being written.
        pool.close()
        pool.join()

end! [None, None, None, None, None]
end! [None, None, None, None, None]
end! [None, None, None, None, None]
end! [None, None, None, None, None]
end! [None, None, None, None, None]
end! [None, None, None, None, None]
end! [None, None, None, None, None]
end! [None, None, None, None, None]
end! [None, None, None, None, None]
end! [None, None, None, None, None]
end! [None, None, None, None, None]
end! [None, None, None, None, None]
end! [None, None, None, None, None]
end! [None, None, None, None, None]
end! [None, None, None, None, None]
end! [None, None, None, None, None]


In [4]:
# Load the locally stored datasets: every CSV file of every year folder
path = "./data/*/*.csv"
df = (
    spark.read
    .format('csv')
    .option('header', True)       # first line of each file holds the column names
    .option('inferSchema', True)  # let Spark guess the column types
    .load(path)
)

# All columns of the DataFrame
# ['STATION', 'DATE', 'SOURCE', 'LATITUDE', 'LONGITUDE', 'ELEVATION', \
# 'NAME', 'REPORT_TYPE', 'CALL_SIGN', 'QUALITY_CONTROL', 'WND', 'CIG', \
# 'VIS', 'TMP', 'DEW', 'SLP', 'AA1', 'AA2', 'AY1', 'AY2', 'GA1', 'GA2', \
# 'GA3', 'GE1', 'GF1', 'IA1', 'KA1', 'KA2', 'MA1', 'MD1', 'MW1', 'OC1', \
# 'OD1', 'UA1', 'REM', 'EQD']

In [5]:
# Split the composite comma-separated columns into one column per field, to make filtering easier
def _csv_field(column_name, index):
    """Return a Column holding the ``index``-th comma-separated item of ``column_name``."""
    return F.split(col(column_name), ',').getItem(index)

# Python UDFs kept (now None-safe) for any cell that still refers to these names.
# They are no longer used below: native Spark expressions avoid the per-row Python
# round trip and do not fail when DATE was inferred as a timestamp instead of a string.
udf_get_year = udf(lambda date: None if date is None else parser.parse(str(date)).year)
udf_get_degree = udf(lambda tmp: None if tmp is None else float(int(tmp)/10))

# (new column, source column, position of the field inside the source column)
_field_layout = [
    ('TMP_DEGREE', 'TMP', 0), ('TMP_QUALITY', 'TMP', 1),
    ('WND_SPEED', 'WND', 3), ('WND_SPEED_QUALITY', 'WND', 4),
    ('WND_ANGLE', 'WND', 0), ('WND_ANGLE_QUALITY', 'WND', 1), ('WND_TYPE', 'WND', 2),
    ('CIG_HEIGHT', 'CIG', 0), ('CIG_QUALITY', 'CIG', 1),
    ('CIG_METHOD', 'CIG', 2), ('CIG_CAVOK', 'CIG', 3),
    ('VIS_DISTANCE', 'VIS', 0), ('VIS_DISTANCE_QUALITY', 'VIS', 1),
    ('VIS_VARIABILITY', 'VIS', 2), ('VIS_VARIABILITY_QUALITY', 'VIS', 3),
    ('SLP_AIR_PRESSURE', 'SLP', 0), ('SLP_AIR_PRESSURE_QUALITY', 'SLP', 1),
]
for new_column, source_column, position in _field_layout:
    df = df.withColumn(new_column, _csv_field(source_column, position))

# The first TMP field is an integer number of tenths; divide by 10 and store as a float
# (same value as the former float(int(tmp)/10) UDF followed by the FloatType cast).
df = df.withColumn('TMP_DEGREE', (col('TMP_DEGREE').cast('int') / 10).cast(FloatType()))

# Year of the observation, kept as a string column as the former untyped UDF produced.
# The explicit timestamp cast accepts both string and timestamp DATE columns.
df = df.withColumn('YEAR', F.year(col('DATE').cast('timestamp')).cast('string'))

#df.select("YEAR").distinct().show()

In [6]:
# Filter on the quality codes
quality = [0, 1, 4, 5, 9]
# The *_QUALITY columns come from F.split and are therefore strings: compare them with
# string codes so that no implicit string-to-integer cast is involved (a non-numeric
# code would become NULL, or raise when Spark runs in ANSI mode).
quality_codes = [str(code) for code in quality]
quality_columns = [
    'TMP_QUALITY', 'WND_SPEED_QUALITY', 'WND_ANGLE_QUALITY', 'CIG_QUALITY',
    'VIS_DISTANCE_QUALITY', 'VIS_VARIABILITY_QUALITY', 'SLP_AIR_PRESSURE_QUALITY',
]
df_quality = df
for quality_column in quality_columns:
    df_quality = df_quality.where(col(quality_column).isin(quality_codes))

# Drop missing values (TEMPERATURE)
missing_tmp = 999.9
# NOTE(review): the direct filter below was presumably abandoned because the float
# column never compares equal to the Python double 999.9 - confirm before reviving it.
#df_quality_tmp = df_quality.filter(df_quality.TMP_DEGREE != missing_tmp)
df_quality.createOrReplaceTempView('df_quality_view')
df_quality_tmp = spark.sql("SELECT * FROM df_quality_view WHERE FLOAT(TMP_DEGREE) != FLOAT(" + str(missing_tmp) + ")")


# Drop missing values (WIND)
missing_wind_angle = 999
missing_wind_speed = 9999
df_quality_wind = df_quality.where(df_quality.WND_ANGLE != missing_wind_angle) \
                .where(df_quality.WND_SPEED != missing_wind_speed)

# Drop missing values (SKY)
missing_cig_height = 99999
missing_cig_method = 9
# CIG_METHOD is a code, not a number (NOAA's format presumably allows letters - verify
# against the ISD documentation). It is compared as a string: an integer comparison
# turns letter codes into NULL and silently drops those rows.
df_quality_sky = df_quality.where(df_quality.CIG_HEIGHT != missing_cig_height) \
                .where(df_quality.CIG_METHOD != str(missing_cig_method))

# Drop missing values (VISIBILITY)
missing_vis_distance = 999999
missing_vis_variability = 9
# VIS_VARIABILITY is a code as well, hence the same string comparison.
df_quality_vis = df_quality.where(df_quality.VIS_DISTANCE != missing_vis_distance) \
                .where(df_quality.VIS_VARIABILITY != str(missing_vis_variability))

#df_quality.groupby('YEAR').count().orderBy("YEAR").show()


In [38]:
def to_database_format(df, col_name):
    """Summarise ``col_name`` for each (YEAR, STATION) pair of ``df``.

    Args:
        df: Spark DataFrame holding the YEAR, STATION and ``col_name`` columns.
        col_name: Name of the column to summarise.

    Returns:
        A Spark DataFrame with one row per (YEAR, STATION) and the columns
        mean, std, min, %25, %50, %75 and max.
    """
    # (fraction passed to percentile(), name of the resulting column)
    quartiles = (('0.25', '%25'), ('0.5', '%50'), ('0.75', '%75'))
    percentile_columns = [
        F.expr('percentile(' + col_name + ', array(' + fraction + '))')[0].alias(label)
        for fraction, label in quartiles
    ]

    aggregations = [
        F.mean(col_name).alias('mean'),
        F.stddev(col_name).alias('std'),
        F.min(col_name).alias('min'),
    ]
    aggregations += percentile_columns
    aggregations.append(F.max(col_name).alias('max'))

    grouped = df.groupBy(col('YEAR'), col('STATION'))
    return grouped.agg(*aggregations)

In [41]:
# Temperature statistics per year and station, collected as a pandas DataFrame
temp_calcul_pdf = to_database_format(df_quality_tmp, 'TMP_DEGREE').toPandas()

# Distinct station descriptions, collected as a pandas DataFrame
station_pdf = df.select(['STATION', 'LATITUDE', 'LONGITUDE', 'NAME']).distinct().toPandas()

# Table name -> pandas DataFrame to export
to_database = {
    'TEMP_CALCUL': temp_calcul_pdf,
    'STATION': station_pdf,
}

#df_quality_wind.describe(['WND_SPEED']).show()
#df_quality_sky.describe(['CIG_HEIGHT']).show()
#df_quality_vis.describe(['VIS_DISTANCE']).show()
#df_quality_tmp.select(['STATION','TMP_DEGREE']).distinct().orderBy(col('TMP_DEGREE').desc())

In [47]:
# Insert the tables into the local database.
# The connection URL can be supplied through the ATEAM_DB_URL environment variable.
# NOTE(review): the fallback keeps the original hard-coded credentials so the cell
# still runs unchanged; prefer setting the variable and removing the literal.
engine = create_engine(os.environ.get("ATEAM_DB_URL", "mysql://ateam:ateam@mysql:3306/ateam"))

# The loop variable is deliberately not named `df`: that name holds the Spark
# DataFrame, and rebinding it here broke re-running the earlier cells.
for table_name, table_df in to_database.items():
    table_df.to_sql(table_name, con=engine, if_exists='replace')

In [None]:
# Insert the tables into the remote database.
# The connection URL can be supplied through the ATEAM_REMOTE_DB_URL environment variable.
# NOTE(review): SECURITY - the fallback below still embeds the original hard-coded
# password so the cell behaves as before; rotate that password and delete the literal
# once the environment variable is configured.
engine = create_engine(os.environ.get(
    "ATEAM_REMOTE_DB_URL",
    "mysql://266906_spark:ateamateam1234@mysql-sarahipssi.alwaysdata.net/sarahipssi_ateam",
))

# The loop variable is deliberately not named `df`: that name holds the Spark
# DataFrame, and rebinding it here broke re-running the earlier cells.
for table_name, table_df in to_database.items():
    table_df.to_sql(table_name, con=engine, if_exists='replace')

In [49]:
# Export every table to a ';'-separated CSV file named after the table.
# The loop variable is deliberately not named `df`, so the Spark DataFrame `df`
# used by the earlier cells is not overwritten.
for table_name, table_df in to_database.items():
    table_df.to_csv(table_name+".csv", sep=";")