# **Tentative d'extraction de données de la nasa sur les paramètres de l'énergie renouvelable**

In [2]:
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=2cc75bc6da0247e508c6e0ec1a7bd257d277a1630b5775bd1df4b1062920e215
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from pyspark import SparkContext

import pandas as pd
import requests
import json as js
import time


In [4]:
power_url = "https://power.larc.nasa.gov/"
api_route = "api/temporal/monthly/point"
POWER_Monthly_Annual_API_URL = power_url + api_route

Parameters = {
              "QV2M": "MERRA-2 Specific Humidity at 2 Meters (g/kg)",
              "RH2M": "MERRA-2 Relative Humidity at 2 Meters (%)",
              "ALLSKY_KT": "CERES SYN1deg All Sky Insolation Clearness Index (dimensionless)",
              "CLOUD_AMT": "CERES SYN1deg Cloud Amount (%)",
              "CLRSKY_KT": "CERES SYN1deg Clear Sky Insolation Clearness Index (dimensionless)",
              "TOA_SW_DWN": "CERES SYN1deg Top-Of-Atmosphere Shortwave Downward Irradiance (kW-hr/m^2/day)",
              "PRECTOTCORR": "MERRA-2 Precipitation Corrected (mm/day)",
              "ALLSKY_SFC_UVA": "CERES SYN1deg All Sky Surface UVA Irradiance (W/m^2)",
              "ALLSKY_SFC_UVB": "CERES SYN1deg All Sky Surface UVB Irradiance (W/m^2)",
              "ALLSKY_SRF_ALB": "CERES SYN1deg All Sky Surface Albedo (dimensionless)",
              "PRECTOTCORR_SUM": "MERRA-2 Precipitation Corrected Sum (mm)",
              "ALLSKY_SFC_SW_DNI": "CERES SYN1deg All Sky Surface Shortwave Downward Direct Normal Irradiance (kW-hr/m^2/day)",
              "ALLSKY_SFC_SW_DWN": "CERES SYN1deg All Sky Surface Shortwave Downward Irradiance (kW-hr/m^2/day)",
              "CLRSKY_SFC_SW_DWN": "CERES SYN1deg Clear Sky Surface Shortwave Downward Irradiance (kW-hr/m^2/day)",
              "ALLSKY_SFC_PAR_TOT": "CERES SYN1deg All Sky Surface PAR Total (W/m^2)",
              "ALLSKY_SFC_SW_DIFF": "CERES SYN1deg All Sky Surface Shortwave Diffuse Irradiance (kW-hr/m^2/day)",
              "CLRSKY_SFC_PAR_TOT": "CERES SYN1deg Clear Sky Surface PAR Total (W/m^2)",
              "ALLSKY_SFC_UV_INDEX": "CERES SYN1deg All Sky Surface UV Index (dimensionless)"
}

attribute_series = pd.Series(Parameters)
display(attribute_series)

parameters = list(Parameters.keys())

QV2M                        MERRA-2 Specific Humidity at 2 Meters (g/kg)
RH2M                           MERRA-2 Relative Humidity at 2 Meters (%)
ALLSKY_KT              CERES SYN1deg All Sky Insolation Clearness Ind...
CLOUD_AMT                                 CERES SYN1deg Cloud Amount (%)
CLRSKY_KT              CERES SYN1deg Clear Sky Insolation Clearness I...
TOA_SW_DWN             CERES SYN1deg Top-Of-Atmosphere Shortwave Down...
PRECTOTCORR                     MERRA-2 Precipitation Corrected (mm/day)
ALLSKY_SFC_UVA         CERES SYN1deg All Sky Surface UVA Irradiance (...
ALLSKY_SFC_UVB         CERES SYN1deg All Sky Surface UVB Irradiance (...
ALLSKY_SRF_ALB         CERES SYN1deg All Sky Surface Albedo (dimensio...
PRECTOTCORR_SUM                 MERRA-2 Precipitation Corrected Sum (mm)
ALLSKY_SFC_SW_DNI      CERES SYN1deg All Sky Surface Shortwave Downwa...
ALLSKY_SFC_SW_DWN      CERES SYN1deg All Sky Surface Shortwave Downwa...
CLRSKY_SFC_SW_DWN      CERES SYN1deg Clear Sky Surf

In [5]:
def function_exec_time(funct):
  def wrapper(*args, **kwargs):
    start = time.time()
    result = funct(*args, **kwargs)
    end = time.time()
    execution_time = end - start
    print(f"execution time of this function is : {(end - start):.4f} s")
    return result
  return wrapper

In [26]:
def process_partition(iterator, ville, longitude, latitude):


   data = []
   for i in iterator:
        params = {
            "start": 2013,
            "end": 2022,
            'latitude': latitude,
            "longitude": longitude,
            "community": "re",  # re as Renewable Energy,
                                # sb as Sustainable Buildings
                                # ag as AGroclimatology
            "parameters": i,
            "format": "json",
            "user": None,
            "header": "False",
            "site-elevation": None,
            "wind-elevation": None,
            "wind-surface": None
            }

        city_table_response = requests.get(POWER_Monthly_Annual_API_URL,
                                           params=params)

        city_table_response = city_table_response.json()
        city_table_response = city_table_response.get('properties') \
                                                          ['parameter'][i]

        new_rows = [(i, ville, *city_table_response.values())]

        data.append(new_rows)

   return iter(data)


def process_partition_wrapper(iterator):
    """
    Wrapper pour capturer les arguments supplémentaires.
    """
    return process_partition(iterator, ville, longitude, latitude)


In [31]:
# rows by a point(here a city) defined with latitude and longitude
# and any parameters it has
@function_exec_time
def POWER_Monthly_Annual_city_data(ville: str,
                                   latitude: float, longitude: float,
                                   parameters: str, columns: list):
    """
    give the parameters and parameters value of a city each month a year
    from start year to end year
    """

    spark = SparkSession.builder.appName("POWER_Monthly_Annual_city_data") \
                                .getOrCreate()

    # Création de la RDD à partir des paramètres
    # params_rdd = spark.sparkContext.parallelize([(parameter,) for parameter in parameters])

    # Define the structure of the dataframe
    schema_list = [(StructField('parameters', StringType(), True)),
                   (StructField('ville', StringType(), True))] + \
                   [(StructField(col, FloatType(), True)) for col in columns]

    schema = StructType(schema_list)


    sub_data_collect = spark.createDataFrame([], schema)

    params_df = spark.createDataFrame([(element,) for element in parameters],
                                                           ["parameters"])

    new_data = params_df.rdd.mapPartitions(process_partition_wrapper)

    # Application de la fonction de traitement à chaque partition de la RDD
    # processed_rdd = params_rdd.mapPartitions(process_partition_wrapper)

    # Application de la fonction de traitement à chaque partition de la RDD
    # processed_rdd = params_rdd.mapPartitions(lambda iterator:
                                          #  process_partition(iterator, ville,
                                            #          longitude, latitude))

    # Création du DataFrame Spark à partir de la RDD traitée
    processed_df = spark.createDataFrame(new_data,
                                              ["parameters", "ville"] + \
                                         columns)



    sub_data_collect = sub_data_collect.union(processed_df)

    return sub_data_collect


In [20]:
#test
spark = SparkSession.builder.appName("POWER_Monthly_Annual_city_data") \
                                                                  .getOrCreate()
schema_list = [(StructField('parameters', StringType(), True)),
                (StructField('ville', StringType(), True))] + \
                [(StructField(col, FloatType(), True)) for col in col_name]

schema = StructType(schema_list)


sub_data_collect = spark.createDataFrame([], schema)
sub_data_collect.show()

+----------+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+
|parameters|ville|201301|201302|201303|201304|201305|201306|201307|2013

In [9]:
ville_position = pd.read_csv("cities-data-and-location.csv")
ville_position = ville_position[["ville", 'longitude', 'latitude']]
display(ville_position)

Unnamed: 0,ville,longitude,latitude
0,Abidjan,-4.016107,5.320357
1,Bouaké,-5.029841,7.690606
2,Korhogo,-5.634247,9.457472
3,Daloa,-6.45161,6.87669
4,San-Pédro,-6.641549,4.758159
5,Anyama,-4.056921,5.49589
6,Yamoussoukro,-5.277603,6.820007
7,Divo,-5.359225,5.829404
8,Gagnoa,-5.95177,6.132975
9,Soubré,-6.592833,5.785267


In [22]:
# Exemple pour obtention des derniers features
params = {
    "start": 2013,
    "end": 2022,
    'latitude': 5.320357,
    "longitude": -4.016107,
    "community": "re",  # re as Renewable Energy,
                        # sb as Sustainable Buildings
                        # ag as AGroclimatology
    "parameters": "QV2M",
    "format": "json",
    "user": None,
    "header": "False",
    "site-elevation": None,
    "wind-elevation": None,
    "wind-surface": None
    }

response = requests.get(POWER_Monthly_Annual_API_URL, params=params)
response = response.json()
col_name = response.get('properties')['parameter'] \
                                                    [params["parameters"]] \
                                                    .keys()
print(col_name)


dict_keys(['201301', '201302', '201303', '201304', '201305', '201306', '201307', '201308', '201309', '201310', '201311', '201312', '201313', '201401', '201402', '201403', '201404', '201405', '201406', '201407', '201408', '201409', '201410', '201411', '201412', '201413', '201501', '201502', '201503', '201504', '201505', '201506', '201507', '201508', '201509', '201510', '201511', '201512', '201513', '201601', '201602', '201603', '201604', '201605', '201606', '201607', '201608', '201609', '201610', '201611', '201612', '201613', '201701', '201702', '201703', '201704', '201705', '201706', '201707', '201708', '201709', '201710', '201711', '201712', '201713', '201801', '201802', '201803', '201804', '201805', '201806', '201807', '201808', '201809', '201810', '201811', '201812', '201813', '201901', '201902', '201903', '201904', '201905', '201906', '201907', '201908', '201909', '201910', '201911', '201912', '201913', '202001', '202002', '202003', '202004', '202005', '202006', '202007', '202008',

# Autre approche:
- récupération de la liste

In [100]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from urllib.parse import urlencode
import requests

def fetch_data(params):
    """
    Effectue une requête HTTP avec les paramètres donnés et retourne
    les données.
    """
    url = POWER_Monthly_Annual_API_URL
    response = requests.get(url, params=params)
    response_json = response.json()
    return response_json.get('properties', {}).get('parameter', {}).get(params["parameters"], {})

def process_partition(iterator):
    """
    Processus pour chaque partition du DataFrame.
    """
    data = []
    for params in iterator:

      # ajout d'une boucle for
        city_table_response = fetch_data(list(params.values())[0])
        if city_table_response:
            new_row = (list(params.values())[0]["parameters"],
                       list(params.keys())[0], *city_table_response.values())

            data.append(new_row)
    return iter(data)
@function_exec_time
def POWER_Monthly_Annual_city_data(city_data: list, parameters: list, columns: list):
    """
    Donner les paramètres et les valeurs des paramètres de chaque ville chaque
    mois d'une année de l'année de début à l'année de fin.
    """
    spark = SparkSession.builder.appName("POWER_Monthly_Annual_city_data").getOrCreate()

    # Définir le schéma du DataFrame
    schema = StructType([
        StructField("parameters", StringType(), True),
        StructField("ville", StringType(), True)
                 ] + [StructField(col, FloatType(), True) for col in columns])

    # Créer un DataFrame vide
    sub_data_collect = spark.createDataFrame([], schema)

    # Créer une RDD à partir des paramètres et des villes
    params_rdd = spark.sparkContext.parallelize(city_data)

    # Appliquer la fonction de traitement à chaque partition de la RDD
    processed_rdd = params_rdd.mapPartitions(process_partition)

    # Convertir les données traitées en DataFrame Spark
    processed_df = processed_rdd.toDF(["parameters", "ville"] + columns)

    # Union avec le DataFrame vide
    sub_data_collect = sub_data_collect.union(processed_df)

    return sub_data_collect

# Exemple d'appel




In [54]:
# création de la lsite qui est l'itérateur every_params

spark = SparkSession.builder.appName("POWER_Monthly_Annual_city_data") \
                                                                  .getOrCreate()
spark_ville_position = spark.createDataFrame(ville_position)
spark_ville_position = spark_ville_position.collect()

every_params = []
for (i, record) in enumerate(spark_ville_position):
  ville, longitude, latitude =  spark_ville_position[i]

  for param in parameters:
    params_position = {ville:
                        {
                              "start": 2013,
                              "end": 2022,
                              'latitude': latitude,
                              "longitude": longitude,
                              "community": "re",  # re as Renewable Energy,
                                                  # sb as Sustainable Buildings
                                                  # ag as AGroclimatology
                              "parameters": param,
                              "format": "json",
                              "user": None,
                              "header": "False",
                              "site-elevation": None,
                              "wind-elevation": None,
                              "wind-surface": None
                              }
                      }

    every_params.append(params_position)

print(every_params)

[{'Abidjan': {'start': 2013, 'end': 2022, 'latitude': 5.320357, 'longitude': -4.016107, 'community': 're', 'parameters': 'QV2M', 'format': 'json', 'user': None, 'header': 'False', 'site-elevation': None, 'wind-elevation': None, 'wind-surface': None}}, {'Abidjan': {'start': 2013, 'end': 2022, 'latitude': 5.320357, 'longitude': -4.016107, 'community': 're', 'parameters': 'RH2M', 'format': 'json', 'user': None, 'header': 'False', 'site-elevation': None, 'wind-elevation': None, 'wind-surface': None}}, {'Abidjan': {'start': 2013, 'end': 2022, 'latitude': 5.320357, 'longitude': -4.016107, 'community': 're', 'parameters': 'ALLSKY_KT', 'format': 'json', 'user': None, 'header': 'False', 'site-elevation': None, 'wind-elevation': None, 'wind-surface': None}}, {'Abidjan': {'start': 2013, 'end': 2022, 'latitude': 5.320357, 'longitude': -4.016107, 'community': 're', 'parameters': 'CLOUD_AMT', 'format': 'json', 'user': None, 'header': 'False', 'site-elevation': None, 'wind-elevation': None, 'wind-sur

In [55]:
len(every_params)


792

In [None]:
for param in every_params:
  print(list(param.keys())[0], list(param.values())[0]["parameters"])


In [80]:
col_name

dict_keys(['201301', '201302', '201303', '201304', '201305', '201306', '201307', '201308', '201309', '201310', '201311', '201312', '201313', '201401', '201402', '201403', '201404', '201405', '201406', '201407', '201408', '201409', '201410', '201411', '201412', '201413', '201501', '201502', '201503', '201504', '201505', '201506', '201507', '201508', '201509', '201510', '201511', '201512', '201513', '201601', '201602', '201603', '201604', '201605', '201606', '201607', '201608', '201609', '201610', '201611', '201612', '201613', '201701', '201702', '201703', '201704', '201705', '201706', '201707', '201708', '201709', '201710', '201711', '201712', '201713', '201801', '201802', '201803', '201804', '201805', '201806', '201807', '201808', '201809', '201810', '201811', '201812', '201813', '201901', '201902', '201903', '201904', '201905', '201906', '201907', '201908', '201909', '201910', '201911', '201912', '201913', '202001', '202002', '202003', '202004', '202005', '202006', '202007', '202008',

In [94]:
print(every_params[0]["Abidjan"])

{'start': 2013, 'end': 2022, 'latitude': 5.320357, 'longitude': -4.016107, 'community': 're', 'parameters': 'QV2M', 'format': 'json', 'user': None, 'header': 'False', 'site-elevation': None, 'wind-elevation': None, 'wind-surface': None}


In [97]:
params_entree = every_params[0]["Abidjan"]
resultat_requete_test = fetch_data(params_entree)
display(resultat_requete_test)

<generator object fetch_data at 0x79d7aacaa490>

In [101]:
city_data = every_params
parameters = parameters # Ajoutez d'autres paramètres ici

result = POWER_Monthly_Annual_city_data(city_data, parameters, list(col_name))
result.show(1)

execution time of this function is : 2618.6273 s
+----------+-------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+
|parameters|  ville|

In [None]:
# sauvegarde de la données
result.toPandas().to_csv("nasa-renewable-energy-data-pyspark.csv", index=False)
