# In [None]:
import os
# Pass --packages to the PySpark launcher so the Kafka source and the
# Elasticsearch connector are fetched.
# NOTE(review): this pins elasticsearch-spark 8.11.0, while the
# "spark.jars.packages" setting on the SparkSession builder below requests
# 8.12.2 — confirm which version is intended and align the two.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,org.elasticsearch:elasticsearch-spark-30_2.12:8.11.0 pyspark-shell'

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp, when, date_format, hour
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType
from pyspark.sql.functions import split

# Create (or reuse) the Spark session, requesting the Kafka source and the
# Elasticsearch connector packages.
spark = (
    SparkSession.builder
    .appName("IoT Kafka")
    .config(
        "spark.jars.packages",
        ",".join([
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
            "org.elasticsearch:elasticsearch-spark-30_2.12:8.12.2",
        ]),
    )
    .getOrCreate()
)

# Only log warnings and errors.
spark.sparkContext.setLogLevel("WARN")

# ------------------ JSON schema ------------------
# Layout of the JSON payload carried in the Kafka message value. The timestamp
# is read as a string here and converted after parsing.
schema = (
    StructType()
    .add("machine_id", StringType())
    .add("temperature", DoubleType())
    .add("humidity", DoubleType())
    .add("soil_moisture", DoubleType())
    .add("vibration", DoubleType())
    .add("pressure", DoubleType())
    .add("latitude", DoubleType())
    .add("longitude", DoubleType())
    .add("timestamp", StringType())
)

# ------------------ Kafka source ------------------
# Streaming read of the "iot_raw_data" topic, starting from the latest offsets.
raw_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "broker:9092",
        "subscribe": "iot_raw_data",
        "startingOffsets": "latest",
    })
    .load()
)

# ------------------ JSON parsing ------------------
# Decode the Kafka value as a string, parse it with the schema, flatten the
# resulting struct into top-level columns and convert the timestamp string.
parsed_df = (
    raw_df
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
    .withColumn("timestamp", to_timestamp(col("timestamp")))
)

# ------------------ Derived columns ------------------
# Adds day_of_week, hour, region (from the machine_id prefix), period (from the
# hour) and season (from the day of week and the hour).
#
# date_format(..., "EEEE") emits English day names ("Monday", "Tuesday", ...)
# under Spark 3's default en-US formatting locale, so the season rule must test
# the English names. The previous French names ("Lundi", ...) never matched and
# every row fell through to "Hivernage".
df_enriched = (
    parsed_df
    .withColumn("day_of_week", date_format("timestamp", "EEEE"))
    .withColumn("hour", hour("timestamp"))
    .withColumn(
        "region",
        when(col("machine_id").startswith("DK"), "DK")
        .when(col("machine_id").startswith("TH"), "TH")
        .when(col("machine_id").startswith("TB"), "TB")
        .when(col("machine_id").startswith("MT"), "MT")
        .otherwise("Unknown"),
    )
    .withColumn(
        "period",
        when(col("hour").between(6, 11), "Matinée")
        .when(col("hour").between(12, 14), "Après-midi")
        .when(col("hour").between(15, 18), "Soirée")
        .otherwise("Nuit"),
    )
    .withColumn(
        "season",
        # Monday-Wednesday daytime -> cool dry season,
        # Thursday-Friday daytime -> hot dry season, everything else -> rainy season.
        when(
            col("day_of_week").isin("Monday", "Tuesday", "Wednesday")
            & col("hour").between(6, 18),
            "Saison_Seche_Fraiche",
        )
        .when(
            col("day_of_week").isin("Thursday", "Friday")
            & col("hour").between(6, 18),
            "Saison_Seche_Chaude",
        )
        .otherwise("Hivernage"),
    )
)

# ------------------ Thresholds + UDF ------------------
# Threshold table, built once at import time instead of on every call (the
# lookup runs once per streamed row inside the status UDF).
# Structure: {region: {season: {parameter:
#     [warning_min, warning_max, critical_min, critical_max]}}}
_THRESHOLDS = {
    "DK": {
        "Saison_Seche_Fraiche": {
            "temperature": [15, 35, 10, 45],
            "humidity": [30, 80, 20, 90],
            "soil_moisture": [20, 70, 10, 80],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
        "Saison_Seche_Chaude": {
            "temperature": [20, 45, 15, 48],
            "humidity": [25, 75, 15, 85],
            "soil_moisture": [15, 65, 5, 75],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
        "Hivernage": {
            "temperature": [22, 37, 18, 42],
            "humidity": [45, 90, 35, 98],
            "soil_moisture": [40, 90, 30, 98],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
    },
    "TH": {
        "Saison_Seche_Fraiche": {
            "temperature": [15, 45, 10, 47],
            "humidity": [25, 75, 15, 85],
            "soil_moisture": [20, 70, 10, 80],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
        "Saison_Seche_Chaude": {
            "temperature": [20, 45, 10, 48],
            "humidity": [20, 70, 10, 80],
            "soil_moisture": [15, 65, 5, 75],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
        "Hivernage": {
            "temperature": [20, 34, 15, 38],
            "humidity": [40, 85, 30, 95],
            "soil_moisture": [35, 85, 25, 95],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
    },
    "TB": {
        "Saison_Seche_Fraiche": {
            "temperature": [16, 30, 12, 35],
            "humidity": [22, 72, 12, 82],
            "soil_moisture": [18, 68, 8, 78],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
        "Saison_Seche_Chaude": {
            "temperature": [20, 47, 15, 49],
            "humidity": [18, 68, 8, 78],
            "soil_moisture": [12, 62, 5, 72],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
        "Hivernage": {
            "temperature": [18, 32, 13, 37],
            "humidity": [35, 80, 25, 90],
            "soil_moisture": [30, 80, 20, 90],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
    },
    "MT": {
        "Saison_Seche_Fraiche": {
            # NOTE(review): warning_min and critical_min are both 10 here, so
            # there is no low-side "Alerte" band for this entry — confirm.
            "temperature": [10, 28, 10, 45],
            "humidity": [18, 68, 8, 78],
            "soil_moisture": [15, 65, 5, 75],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
        "Saison_Seche_Chaude": {
            "temperature": [18, 45, 13, 48],
            "humidity": [15, 65, 5, 75],
            "soil_moisture": [10, 60, 5, 70],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
        "Hivernage": {
            "temperature": [16, 30, 12, 35],
            "humidity": [30, 75, 20, 85],
            "soil_moisture": [25, 75, 15, 85],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
    },
}


def get_thresholds(region, season):
    """Return the threshold table for a region/season pair.

    Args:
        region: Region code ("DK", "TH", "TB" or "MT").
        season: Season label ("Saison_Seche_Fraiche", "Saison_Seche_Chaude"
            or "Hivernage").

    Returns:
        A dict mapping each parameter name to
        ``[warning_min, warning_max, critical_min, critical_max]``, or an
        empty dict when the region or the season is not in the table. The
        returned dict is shared between calls and must be treated as
        read-only.
    """
    return _THRESHOLDS.get(region, {}).get(season, {})


@udf(returnType=StringType())
def determine_sensor_status(region, season, temperature, humidity, soil_moisture, vibration, pressure):
    """Classify one sensor reading against the region/season thresholds.

    Args:
        region: Region code used to select the threshold table.
        season: Season label used to select the threshold table.
        temperature, humidity, soil_moisture, vibration, pressure:
            Measured values; any of them may be None when the source record
            was missing or malformed.

    Returns:
        "Inconnu" when no thresholds exist for the region/season or when any
        measurement is None, "Critique" when any measurement is outside its
        critical range, "Alerte" when any measurement is outside its warning
        range, and "Normal" otherwise.
    """
    thresholds = get_thresholds(region, season)
    if not thresholds:
        return "Inconnu"

    readings = {
        "temperature": temperature,
        "humidity": humidity,
        "soil_moisture": soil_moisture,
        "vibration": vibration,
        "pressure": pressure,
    }
    # Comparing None with a number raises TypeError, which would abort the
    # streaming query; an incomplete record gets the unknown status instead.
    if any(value is None for value in readings.values()):
        return "Inconnu"

    # Each threshold list is [warning_min, warning_max, critical_min,
    # critical_max]; the critical range is checked first so it takes precedence.
    for low, high, label in ((2, 3, "Critique"), (0, 1, "Alerte")):
        for name, value in readings.items():
            bounds = thresholds[name]
            if value < bounds[low] or value > bounds[high]:
                return label
    return "Normal"

# ------------------ Status column ------------------
# Apply the status UDF to the region, the season and the five measurements.
df_with_status = df_enriched.withColumn(
    "status",
    determine_sensor_status(
        *[
            col(name)
            for name in (
                "region",
                "season",
                "temperature",
                "humidity",
                "soil_moisture",
                "vibration",
                "pressure",
            )
        ]
    ),
)
# ------------------ Location column ------------------
# Build "location" as an object with "lat" and "lon" keys, the object form
# Elasticsearch accepts for a geo_point field. The previous struct used the
# keys "longitude"/"latitude", which is not a geo_point representation.
df_with_geo_point = df_with_status.withColumn(
    "location",
    struct(col("latitude").alias("lat"), col("longitude").alias("lon")),
)

# ------------------ Write to Elasticsearch ------------------
# Stream the rows to the "agritech-iot-data" index over HTTPS and block until
# the query terminates.
#
# Credentials can be overridden with the ES_USER / ES_PASSWORD environment
# variables; the defaults keep the previous values so existing runs still work.
# NOTE(review): the fallback password is committed in source — rotate it and
# drop the default once the environment variable is provisioned.
#
# es.resource is the bare index name: a trailing "/" is parsed by the
# connector as an index/type separator, and mapping types are not supported by
# Elasticsearch 8.
(
    df_with_geo_point.writeStream
    .outputMode("append")
    .format("org.elasticsearch.spark.sql")
    .option("checkpointLocation", "/tmp/checkpoint_iot")
    .option("es.nodes", "es01")
    .option("es.port", "9200")
    .option("es.nodes.wan.only", "true")
    .option("es.net.ssl", "true")
    .option("es.net.ssl.cert.allow.self.signed", "true")
    .option("es.net.http.auth.user", os.environ.get("ES_USER", "elastic"))
    .option("es.net.http.auth.pass", os.environ.get("ES_PASSWORD", "Eselpil2"))
    .option("es.resource", "agritech-iot-data")
    .start()
    .awaitTermination()
)


# In [None]:
import os
# Pass --packages to the PySpark launcher so the Kafka source and the
# Elasticsearch connector are fetched.
# NOTE(review): this pins elasticsearch-spark 8.11.0, while the
# "spark.jars.packages" setting on the SparkSession builder below requests
# 8.12.2 — confirm which version is intended and align the two.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,org.elasticsearch:elasticsearch-spark-30_2.12:8.11.0 pyspark-shell'

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp, when, date_format, hour
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType
from pyspark.sql.functions import split

# Create (or reuse) the Spark session, requesting the Kafka source and the
# Elasticsearch connector packages.
spark = (
    SparkSession.builder
    .appName("IoT Kafka")
    .config(
        "spark.jars.packages",
        ",".join([
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
            "org.elasticsearch:elasticsearch-spark-30_2.12:8.12.2",
        ]),
    )
    .getOrCreate()
)

# Only log warnings and errors.
spark.sparkContext.setLogLevel("WARN")

# ------------------ JSON schema ------------------
# Layout of the JSON payload carried in the Kafka message value. In this
# variant the region and season fields are part of the payload. The timestamp
# is read as a string here and converted after parsing.
schema = (
    StructType()
    .add("machine_id", StringType())
    .add("region", StringType())
    .add("season", StringType())
    .add("temperature", DoubleType())
    .add("humidity", DoubleType())
    .add("soil_moisture", DoubleType())
    .add("vibration", DoubleType())
    .add("pressure", DoubleType())
    .add("latitude", DoubleType())
    .add("longitude", DoubleType())
    .add("timestamp", StringType())
)

# ------------------ Kafka source ------------------
# Streaming read of the "iot_raw_data" topic, starting from the latest offsets.
raw_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "broker:9092",
        "subscribe": "iot_raw_data",
        "startingOffsets": "latest",
    })
    .load()
)

# ------------------ JSON parsing ------------------
# Decode the Kafka value as a string, parse it with the schema, flatten the
# resulting struct into top-level columns and convert the timestamp string.
parsed_df = (
    raw_df
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
    .withColumn("timestamp", to_timestamp(col("timestamp")))
)

# ------------------ Thresholds ------------------
# Per-measurement thresholds by region and season, built once at import time
# instead of on every call (the lookup runs once per streamed row inside the
# status UDF).
# Structure: {region: {season: {parameter:
#     [warning_min, warning_max, critical_min, critical_max]}}}
_THRESHOLDS = {
    "DK": {
        "Saison_Seche_Fraiche": {
            "temperature": [15, 35, 10, 45],
            "humidity": [30, 80, 20, 90],
            "soil_moisture": [20, 70, 10, 80],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
        "Saison_Seche_Chaude": {
            "temperature": [20, 45, 15, 48],
            "humidity": [25, 75, 15, 85],
            "soil_moisture": [15, 65, 5, 75],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
        "Hivernage": {
            "temperature": [22, 37, 18, 42],
            "humidity": [45, 90, 35, 98],
            "soil_moisture": [40, 90, 30, 98],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
    },
    "TH": {
        "Saison_Seche_Fraiche": {
            "temperature": [15, 45, 10, 47],
            "humidity": [25, 75, 15, 85],
            "soil_moisture": [20, 70, 10, 80],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
        "Saison_Seche_Chaude": {
            "temperature": [20, 45, 10, 48],
            "humidity": [20, 70, 10, 80],
            "soil_moisture": [15, 65, 5, 75],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
        "Hivernage": {
            "temperature": [20, 34, 15, 38],
            "humidity": [40, 85, 30, 95],
            "soil_moisture": [35, 85, 25, 95],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
    },
    "TB": {
        "Saison_Seche_Fraiche": {
            "temperature": [16, 30, 12, 35],
            "humidity": [22, 72, 12, 82],
            "soil_moisture": [18, 68, 8, 78],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
        "Saison_Seche_Chaude": {
            "temperature": [20, 47, 15, 49],
            "humidity": [18, 68, 8, 78],
            "soil_moisture": [12, 62, 5, 72],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
        "Hivernage": {
            "temperature": [18, 32, 13, 37],
            "humidity": [35, 80, 25, 90],
            "soil_moisture": [30, 80, 20, 90],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
    },
    "MT": {
        "Saison_Seche_Fraiche": {
            # NOTE(review): warning_min and critical_min are both 10 here, so
            # there is no low-side "Alerte" band for this entry — confirm.
            "temperature": [10, 28, 10, 45],
            "humidity": [18, 68, 8, 78],
            "soil_moisture": [15, 65, 5, 75],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
        "Saison_Seche_Chaude": {
            "temperature": [18, 45, 13, 48],
            "humidity": [15, 65, 5, 75],
            "soil_moisture": [10, 60, 5, 70],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
        "Hivernage": {
            "temperature": [16, 30, 12, 35],
            "humidity": [30, 75, 20, 85],
            "soil_moisture": [25, 75, 15, 85],
            "vibration": [0.2, 3.5, 0.1, 4.5],
            "pressure": [0.7, 2.2, 0.5, 2.5],
        },
    },
}


def get_thresholds(region, season):
    """Return the threshold table for a region/season pair.

    Args:
        region: Region code ("DK", "TH", "TB" or "MT").
        season: Season label ("Saison_Seche_Fraiche", "Saison_Seche_Chaude"
            or "Hivernage").

    Returns:
        A dict mapping each parameter name to
        ``[warning_min, warning_max, critical_min, critical_max]``. The
        returned dict is shared between calls and must be treated as
        read-only.

    Raises:
        KeyError: If the region or the season is not in the table.
    """
    return _THRESHOLDS[region][season]

# Determine the status of a sensor reading
@udf(returnType=StringType())
def determine_sensor_status(region, season, temperature, humidity, soil_moisture, vibration, pressure):
    """Classify one sensor reading against the region/season thresholds.

    Args:
        region: Region code used to select the threshold table; taken from
            the parsed record and may be None or unrecognised.
        season: Season label used to select the threshold table; same caveat.
        temperature, humidity, soil_moisture, vibration, pressure:
            Measured values; any of them may be None when the source record
            was missing or malformed.

    Returns:
        "Inconnu" when no thresholds exist for the region/season or when any
        measurement is None, "Critique" when any measurement is outside its
        critical range, "Alerte" when any measurement is outside its warning
        range, and "Normal" otherwise.
    """
    # An unknown or null region/season must not raise inside the UDF: an
    # uncaught exception would abort the streaming query.
    try:
        thresholds = get_thresholds(region, season)
    except KeyError:
        return "Inconnu"
    if not thresholds:
        return "Inconnu"

    readings = {
        "temperature": temperature,
        "humidity": humidity,
        "soil_moisture": soil_moisture,
        "vibration": vibration,
        "pressure": pressure,
    }
    # Comparing None with a number raises TypeError; an incomplete record gets
    # the unknown status instead.
    if any(value is None for value in readings.values()):
        return "Inconnu"

    # Each threshold list is [warning_min, warning_max, critical_min,
    # critical_max]; the critical range is checked first so it takes precedence.
    for low, high, label in ((2, 3, "Critique"), (0, 1, "Alerte")):
        for name, value in readings.items():
            bounds = thresholds[name]
            if value < bounds[low] or value > bounds[high]:
                return label
    return "Normal"

# Add the status column by applying the status UDF to the region, the season
# and the five measurements.
df_with_status = parsed_df.withColumn(
    "status",
    determine_sensor_status(
        *[
            col(name)
            for name in (
                "region",
                "season",
                "temperature",
                "humidity",
                "soil_moisture",
                "vibration",
                "pressure",
            )
        ]
    ),
)
# ------------------ Location column ------------------
# Build "location" as an object with "lat" and "lon" keys, the object form
# Elasticsearch accepts for a geo_point field. The previous struct used the
# keys "longitude"/"latitude", which is not a geo_point representation.
df_with_geo_point = df_with_status.withColumn(
    "location",
    struct(col("latitude").alias("lat"), col("longitude").alias("lon")),
)

# ------------------ Write to Elasticsearch ------------------
# Stream the rows to the "agritech-iot-data" index over HTTPS and block until
# the query terminates.
#
# Credentials can be overridden with the ES_USER / ES_PASSWORD environment
# variables; the defaults keep the previous values so existing runs still work.
# NOTE(review): the fallback password is committed in source — rotate it and
# drop the default once the environment variable is provisioned.
#
# es.resource is the bare index name: a trailing "/" is parsed by the
# connector as an index/type separator, and mapping types are not supported by
# Elasticsearch 8.
(
    df_with_geo_point.writeStream
    .outputMode("append")
    .format("org.elasticsearch.spark.sql")
    .option("checkpointLocation", "/tmp/checkpoint_iot")
    .option("es.nodes", "es01")
    .option("es.port", "9200")
    .option("es.nodes.wan.only", "true")
    .option("es.net.ssl", "true")
    .option("es.net.ssl.cert.allow.self.signed", "true")
    .option("es.net.http.auth.user", os.environ.get("ES_USER", "elastic"))
    .option("es.net.http.auth.pass", os.environ.get("ES_PASSWORD", "Eselpil2"))
    .option("es.resource", "agritech-iot-data")
    .start()
    .awaitTermination()
)
