In [0]:
%spark
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.regression._
import org.apache.spark.ml.feature.VectorAssembler

// Notebook-wide settings, referenced by the later cells.

// Bootstrap address handed to the Kafka source.
val KAFKA_BROKER: String = "kafka:29092"

// Kafka topic the stream subscribes to.
val INPUT_TOPIC: String = "weather-data"

// Location from which the saved regression model is loaded.
val MODEL_PATH: String = "/models/weather_prediction_model"

println("✅ Configuration chargée")

In [1]:
%spark
// Layout of one JSON weather record. The fields are declared as
// (name, type) pairs and folded into a StructType in the listed order,
// using StructType.add's default settings for every field.
val schema = Seq[(String, DataType)](
  // Location and observation time (time values are kept as raw strings)
  "city" -> StringType,
  "country" -> StringType,
  "timestamp" -> StringType,
  // Temperature readings
  "temperature" -> DoubleType,
  "feels_like" -> DoubleType,
  "temp_min" -> DoubleType,
  "temp_max" -> DoubleType,
  // Atmospheric readings
  "pressure" -> IntegerType,
  "humidity" -> IntegerType,
  "weather_main" -> StringType,
  "weather_description" -> StringType,
  "clouds" -> IntegerType,
  "wind_speed" -> DoubleType,
  "wind_deg" -> IntegerType,
  "visibility" -> IntegerType,
  // Sun times and coordinates
  "sunrise" -> StringType,
  "sunset" -> StringType,
  "latitude" -> DoubleType,
  "longitude" -> DoubleType,
  // Precipitation amounts
  "rain_1h" -> DoubleType,
  "rain_3h" -> DoubleType,
  "snow_1h" -> DoubleType,
  "snow_3h" -> DoubleType
).foldLeft(new StructType()) { case (acc, (fieldName, fieldType)) =>
  acc.add(fieldName, fieldType)
}

println("✅ Schéma défini")

In [2]:
%spark
println("⏳ Connexion à Kafka...")

// Streaming DataFrame over the configured Kafka topic.
val dfRaw = {
  // Source options: read the topic from its earliest offsets and do not
  // fail the query when data loss is detected.
  val sourceOptions = Map(
    "kafka.bootstrap.servers" -> KAFKA_BROKER,
    "subscribe" -> INPUT_TOPIC,
    "startingOffsets" -> "earliest",
    "failOnDataLoss" -> "false"
  )
  spark.readStream
    .format("kafka")
    .options(sourceOptions)
    .load()
}

// Decode the Kafka `value` payload as a string, parse it as JSON with
// `schema`, and promote the parsed struct's fields to top-level columns.
val dfParsed = {
  val parsedRecord = from_json(col("value").cast(StringType), schema).alias("data")
  dfRaw.select(parsedRecord).select("data.*")
}

println("✅ Stream Kafka connecté")

In [3]:
%spark
// Convert the raw string timestamp into a timestamp column.
val dfFeatures = dfParsed.withColumn("data_timestamp", to_timestamp(col("timestamp")))

// Calendar components extracted from the parsed timestamp.
val dfWithTime = {
  val ts = col("data_timestamp")
  dfFeatures
    .withColumn("ts_hour", hour(ts))
    .withColumn("ts_day_of_week", dayofweek(ts))
    .withColumn("ts_month", month(ts))
}

// Weekend and peak-hour indicators, encoded as 1.0 / 0.0.
val dfEngineered = {
  val dayOfWeek = col("ts_day_of_week")
  val hourOfDay = col("ts_hour")

  // Spark's dayofweek numbers Sunday as 1 and Saturday as 7.
  val weekendFlag = when(dayOfWeek.isin(1, 7), 1.0).otherwise(0.0)

  // Peak hours are 07:00-09:59 and 16:00-18:59; `between` is inclusive on
  // both ends, so the upper bounds are the last whole hour of each window.
  val peakHourFlag =
    when(hourOfDay.between(7, 9) || hourOfDay.between(16, 18), 1.0).otherwise(0.0)

  dfWithTime
    .withColumn("is_weekend", weekendFlag)
    .withColumn("is_peak_hour", peakHourFlag)
}

// Part-of-day indicators: morning 06-11, afternoon 12-17, evening 18+.
// Hours 0-5 leave all three flags at 0.0.
val dfFinal = {
  val hourOfDay = col("ts_hour")
  dfEngineered
    .withColumn("is_morning", when(hourOfDay.between(6, 11), 1.0).otherwise(0.0))
    .withColumn("is_afternoon", when(hourOfDay.between(12, 17), 1.0).otherwise(0.0))
    .withColumn("is_evening", when(hourOfDay >= 18, 1.0).otherwise(0.0))
}

println("✅ Features créées")


In [4]:
%spark
// Input columns packed, in this order, into the "features" vector.
// NOTE(review): the order presumably has to match the one used when the
// model was trained — confirm against the training notebook.
val featureCols = Array(
  "humidity",
  "pressure",
  "wind_speed",
  "clouds",
  "visibility",
  "ts_hour",
  "is_weekend",
  "is_peak_hour",
  "is_morning",
  "is_afternoon",
  "is_evening"
)

// Combines the feature columns into a single vector column.
// handleInvalid = "skip" drops rows that have a null in any input column.
// With the default ("error"), a single malformed or incomplete Kafka record
// (which from_json turns into null fields) would fail the whole streaming
// query.
val assembler = new VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features")
  .setHandleInvalid("skip")

val dfAssembled = assembler.transform(dfFinal)

println("✅ Features assemblées")


In [5]:
%spark
import org.apache.spark.ml.regression.GBTRegressionModel

println("📂 Chargement du modèle GBT...")

// Load the saved gradient-boosted-tree regression model from MODEL_PATH.
val model = GBTRegressionModel.load(MODEL_PATH)

println("✅ Modèle GBT chargé avec succès")

// Score the assembled stream with the loaded model.
val predictions = model.transform(dfAssembled)

// Columns shown to the user: a few raw readings, the observed and predicted
// temperature, and their absolute difference rounded to 2 decimals.
val outputDf = {
  val observedTemp = col("temperature")
  val predictedTemp = col("prediction")

  val displayColumns = Seq(
    col("city"),
    col("timestamp"),
    observedTemp.as("temp_actuelle"),
    col("humidity"),
    col("wind_speed"),
    col("weather_main"),
    predictedTemp.as("temp_predite"),
    round(abs(observedTemp - predictedTemp), 2).as("erreur")
  )

  predictions.select(displayColumns: _*)
}

println("✅ Pipeline de prédiction créé")


In [6]:
%spark
println("🚀 Démarrage du streaming...")

// Start the streaming query: append-mode output to the console sink,
// up to 10 untruncated rows per batch, with checkpoint data stored under
// /tmp/zeppelin/weather_checkpoint.
val query = {
  val consoleWriter = outputDf.writeStream
    .format("console")
    .outputMode("append")

  consoleWriter
    .option("checkpointLocation", "/tmp/zeppelin/weather_checkpoint")
    .option("numRows", 10)
    .option("truncate", false)
    .start()
}

println("✅ Streaming démarré - Les prédictions apparaîtront ci-dessous")

In [7]:
%spark
// Show the current status of the stream
query.status

// Let the stream run for at most 60 seconds, then stop it.
// awaitTermination(timeoutMs) returns as soon as the query ends on its own
// and rethrows the query's failure, instead of sleeping blindly for the
// full minute and hiding the error. The finally clause makes sure the
// query is stopped in every case.
try {
  query.awaitTermination(60000)
} finally {
  query.stop()
}


In [8]:
%spark
