# 1er étape :
   ###       collecter les données a partir de OpenWeather et les stocker dans HDFS

In [1]:
import requests
import pandas as pd
import time
from datetime import datetime
import subprocess
import os

In [2]:
# API key 
API_KEY = "4edc9281ea0d683c6caf71c7fdfdb7fa"

# Define the list of cities and country code
cities = ["Casablanca", "Tétouan", "Ouarzazate", "Ifrane", "Laayoune"]
country_code = "MA"

# Initialize an empty DataFrame to store weather data
columns = ["City", "Timestamp", "Temperature (°C)", "Humidity (%)", "Winds (m/s)", "Clouds (%)", "Rain_1h (mm)"]
weather_df = pd.DataFrame(columns=columns)

# Function to retrieve weather data
def get_weather_data(city):
    try:
        # API endpoint for current weather data
        url = f"http://api.openweathermap.org/data/2.5/weather?q={city},{country_code}&appid={API_KEY}"
        response = requests.get(url)
        data = response.json()

        # Extract relevant weather information
        temperature_kelvin = data["main"]["temp"]
        temperature_celsius = temperature_kelvin - 273.15
        humidity = data["main"]["humidity"]
        winds = data["wind"]["speed"]
        clouds = data["clouds"]["all"]
        rain = data["rain"]["1h"] if "rain" in data and "1h" in data["rain"] else 0

        # Get the current timestamp
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Append data to the DataFrame
        weather_df.loc[len(weather_df)] = [city, current_time, temperature_celsius, humidity, winds, clouds, rain]

    except Exception as e:
        print(f"Error fetching weather data for {city}: {e}")

# Main loop to collect data every 10 seconds
while True:
    for city in cities:
        get_weather_data(city)
    
    if os.path.exists("BIG_DATA.csv"):
        # Remove the existing file
        os.remove("BIG_DATA.csv")

    # Save the DataFrame to a CSV file
    weather_df.to_csv("BIG_DATA.csv", index=False)
    
    # Define the command to execute
    command = ["hdfs", "dfs", "-put", "-f", "BIG_DATA.csv", "/user/hdfs/BIG_DATA.csv"]

    # Execute the command
    subprocess.run(command, shell=True)

    time.sleep(10)

KeyboardInterrupt: 

In [3]:
weather_df.shape

(430, 7)

# 2ème étape:
  ### prétraitement et traitement

# 2.1 prétraitement


In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, col

# Créer une SparkSession
spark = SparkSession.builder \
    .appName("Read CSV") \
    .getOrCreate()

# Lire le fichier CSV dans un DataFrame
df = spark.read.format("csv") \
    .option("header", "true") \
    .load("hdfs://localhost:9000/user/hdfs/BIG_DATA.csv")

In [11]:
# Afficher le DataFrame 
df.show()

+----------+-------------------+------------------+------------+-----------+----------+------------+
|      City|          Timestamp|  Temperature (°C)|Humidity (%)|Winds (m/s)|Clouds (%)|Rain_1h (mm)|
+----------+-------------------+------------------+------------+-----------+----------+------------+
|Casablanca|2024-05-28 20:19:32|22.230000000000018|          73|       3.09|         2|         0.0|
|   Tétouan|2024-05-28 20:19:32|21.430000000000007|          83|       6.69|        40|         0.0|
|Ouarzazate|2024-05-28 20:19:33|             34.06|           7|       4.63|        20|         0.0|
|    Ifrane|2024-05-28 20:19:33|21.090000000000032|          35|       6.78|       100|         0.0|
|  Laayoune|2024-05-28 20:19:33|              24.0|          46|       8.23|         0|         0.0|
|Casablanca|2024-05-28 20:19:48|22.230000000000018|          73|       3.09|         2|         0.0|
|   Tétouan|2024-05-28 20:19:49|21.430000000000007|          83|       6.69|        40|    

In [12]:
# Convert the temperature column to integer type
df = df.withColumn("Temperature (°C)", col("Temperature (°C)").cast("int"))

# Show the DataFrame after preprocessing
df.show()

+----------+-------------------+----------------+------------+-----------+----------+------------+
|      City|          Timestamp|Temperature (°C)|Humidity (%)|Winds (m/s)|Clouds (%)|Rain_1h (mm)|
+----------+-------------------+----------------+------------+-----------+----------+------------+
|Casablanca|2024-05-28 20:19:32|              22|          73|       3.09|         2|         0.0|
|   Tétouan|2024-05-28 20:19:32|              21|          83|       6.69|        40|         0.0|
|Ouarzazate|2024-05-28 20:19:33|              34|           7|       4.63|        20|         0.0|
|    Ifrane|2024-05-28 20:19:33|              21|          35|       6.78|       100|         0.0|
|  Laayoune|2024-05-28 20:19:33|              24|          46|       8.23|         0|         0.0|
|Casablanca|2024-05-28 20:19:48|              22|          73|       3.09|         2|         0.0|
|   Tétouan|2024-05-28 20:19:49|              21|          83|       6.69|        40|         0.0|
|Ouarzazat

In [13]:
# Renaming the column to remove "%" and dividing its values by 100
df = df.withColumn("Humidity", col("Humidity (%)") / 100).drop("Humidity (%)")

# Rename the column to remove "%" and divide its values by 100
df = df.withColumn("Clouds", col("Clouds (%)") / 100).drop("Clouds (%)")

# Show the DataFrame after preprocessing
df.show()

+----------+-------------------+----------------+-----------+------------+--------+------+
|      City|          Timestamp|Temperature (°C)|Winds (m/s)|Rain_1h (mm)|Humidity|Clouds|
+----------+-------------------+----------------+-----------+------------+--------+------+
|Casablanca|2024-05-28 20:19:32|              22|       3.09|         0.0|    0.73|  0.02|
|   Tétouan|2024-05-28 20:19:32|              21|       6.69|         0.0|    0.83|   0.4|
|Ouarzazate|2024-05-28 20:19:33|              34|       4.63|         0.0|    0.07|   0.2|
|    Ifrane|2024-05-28 20:19:33|              21|       6.78|         0.0|    0.35|   1.0|
|  Laayoune|2024-05-28 20:19:33|              24|       8.23|         0.0|    0.46|   0.0|
|Casablanca|2024-05-28 20:19:48|              22|       3.09|         0.0|    0.73|  0.02|
|   Tétouan|2024-05-28 20:19:49|              21|       6.69|         0.0|    0.83|   0.4|
|Ouarzazate|2024-05-28 20:19:49|              34|       4.63|         0.0|    0.07|   0.2|

In [14]:
# Convert the Timestamp column to timestampType
df = df.withColumn("Timestamp", to_timestamp("Timestamp","yyyy-MM-dd HH:mm:ss"))

# Convert the winds (m/s) and rain_1h (mm) column to float type
df = df.withColumn("winds (m/s)", col("winds (m/s)").cast("float"))
df = df.withColumn("rain_1h (mm)", col("rain_1h (mm)").cast("float"))

In [15]:
# Print the schema of the DataFrame
df.printSchema()

root
 |-- City: string (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- Temperature (°C): integer (nullable = true)
 |-- winds (m/s): float (nullable = true)
 |-- rain_1h (mm): float (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Clouds: double (nullable = true)



In [16]:
df_casa = df.filter(df.City == 'Casablanca')
df_tet=df.filter(df.City == 'Tétouan')
df_ifrane=df.filter(df.City == 'Ifrane')
df_oua=df.filter(df.City == 'Ouarzazate')
df_laay=df.filter(df.City == 'Laayoune')

In [17]:
df_laay.show()

+--------+-------------------+----------------+-----------+------------+--------+------+
|    City|          Timestamp|Temperature (°C)|winds (m/s)|rain_1h (mm)|Humidity|Clouds|
+--------+-------------------+----------------+-----------+------------+--------+------+
|Laayoune|2024-05-28 20:19:33|              24|       8.23|         0.0|    0.46|   0.0|
|Laayoune|2024-05-28 20:19:49|              24|       8.23|         0.0|    0.46|   0.0|
|Laayoune|2024-05-28 20:20:09|              24|       8.23|         0.0|    0.46|   0.0|
|Laayoune|2024-05-28 20:20:25|              24|       8.23|         0.0|    0.46|   0.0|
|Laayoune|2024-05-28 20:20:40|              24|       8.23|         0.0|    0.46|   0.0|
|Laayoune|2024-05-28 20:20:56|              24|       8.23|         0.0|    0.46|   0.0|
|Laayoune|2024-05-28 20:21:11|              24|       8.23|         0.0|    0.46|   0.0|
|Laayoune|2024-05-28 20:21:26|              24|       8.23|         0.0|    0.46|   0.0|
|Laayoune|2024-05-28 

# 2.2 traitement

In [18]:
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

In [19]:
# Liste des villes
cities = ["Casablanca", "Tétouan", "Ouarzazate", "Ifrane", "Laayoune"]


# Boucle à travers chaque ville
for i, city in enumerate(cities, start=1):
    print(f"Processing data for {city}...")
    
    # Filtrer le DataFrame pour la ville actuelle
    df_city = df.filter(df.City == city)
    
    # Sélection des colonnes requises
    df_city = df_city.select("Clouds", "rain_1h (mm)", "Humidity", "winds (m/s)", "Temperature (°C)")
    
    # Ajouter un index au DataFrame
    df_city = df_city.withColumn("index", monotonically_increasing_id())
    
    # Calculer le point de division pour 80% - 20%
    total_count = df_city.count()
    split_point = int(total_count * 0.8)
    
    # Diviser le DataFrame en utilisant l'index
    train_df = df_city.filter(df_city.index < split_point)
    test_df = df_city.filter(df_city.index >= split_point)
    
    # Assembler les caractéristiques
    assembler = VectorAssembler(inputCols=["Clouds", "rain_1h (mm)", "Humidity", "winds (m/s)",], outputCol="features")
    train_df = assembler.transform(train_df)
    test_df = assembler.transform(test_df)
    
    # Définir le modèle de régression linéaire
    lr = LinearRegression(featuresCol='features', labelCol='Temperature (°C)')
    
    # Ajuster le modèle
    lr_model = lr.fit(train_df)
    
    # Faire des prédictions
    predictions = lr_model.transform(test_df)
    
    # Afficher les prédictions
    predictions.show()
    
    # Sauvegarder le modèle
    model_path = f"lr_model_{i}_{city.replace(' ', '_').lower()}.model"
    lr_model.save(model_path)
    print(f"Model for {city} saved at: {model_path}")

# Arrêter la session Spark
spark.stop()

Processing data for Casablanca...
+------+------------+--------+-----------+----------------+-----+--------------------+----------+
|Clouds|rain_1h (mm)|Humidity|winds (m/s)|Temperature (°C)|index|            features|prediction|
+------+------------+--------+-----------+----------------+-----+--------------------+----------+
|   0.0|         0.0|    0.73|       3.09|              22|   68|[0.0,0.0,0.73,3.0...|      22.0|
|   0.0|         0.0|    0.73|       3.09|              22|   69|[0.0,0.0,0.73,3.0...|      22.0|
|   0.0|         0.0|    0.73|       3.09|              22|   70|[0.0,0.0,0.73,3.0...|      22.0|
|   0.0|         0.0|    0.73|       3.09|              22|   71|[0.0,0.0,0.73,3.0...|      22.0|
|   0.0|         0.0|    0.73|       3.09|              22|   72|[0.0,0.0,0.73,3.0...|      22.0|
|   0.0|         0.0|    0.73|       3.09|              22|   73|[0.0,0.0,0.73,3.0...|      22.0|
|   0.0|         0.0|    0.73|       3.09|              22|   74|[0.0,0.0,0.73,3.0..