# Correction Naolib

In [None]:
### Configuration Mac : utilisation de notebook Jupyter

from pyspark import SparkContext, SparkConf

conf = SparkConf() \
    .setAppName('Naolib') \
    .setMaster('spark://spark:7077') \
    .set("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4") \
    .set("spark.sql.shuffle.partitions", "10")


sc = SparkContext.getOrCreate(conf=conf)

from pyspark.sql import SQLContext
# Créer un SQLContext pour les opérations SQL
sql_context = SQLContext(sc)

minio_ip_address = "minio"

In [None]:
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", f"http://{minio_ip_address}:9000")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "root")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "password")
sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")

from minio import Minio
client_minio = Minio(
    f"{minio_ip_address}:9000",
    access_key="root",
    secret_key="password",
    secure=False
)

# Création du bucket tp6
if client_minio.bucket_exists("tp6") == False:
    client_minio.make_bucket("tp6")


## Batch

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lag, regexp_extract, lit, unix_timestamp
from pyspark.sql.window import Window
import requests
import json

# URL de l'API
API_URL = "https://open.tan.fr/ewp/tempsattentelieu.json/CTRE/20"

# Fonction pour récupérer les données depuis l'API
def fetch_api_data():
    try:
        response = requests.get(API_URL)
        if response.status_code == 200:
            return response.json()
        else:
            print(f"Erreur HTTP : {response.status_code}")
            return []
    except Exception as e:
        print(f"Erreur lors de l'accès à l'API : {e}")
        return []

# Récupérer les données depuis l'API
data = fetch_api_data()

if data:
    # Charger les données JSON dans un DataFrame PySpark
    df = sql_context.read.json(sc.parallelize([json.dumps(data)]))

    # Extraire les colonnes importantes et nettoyer les données
    df_cleaned = df \
        .withColumn("numLigne", col("ligne.numLigne")) \
        .withColumn("terminus", col("terminus")) \
        .withColumn("codeArret", col("arret.codeArret")) \
        .withColumn("tempsMinutes", 
                    when(col("temps") == "proche", lit(0))
                    .otherwise(regexp_extract(col("temps"), r"(\d+)", 1).cast("int"))) \
        .filter(col("tempsMinutes").isNotNull()) \
        .select("numLigne", "terminus", "codeArret", "tempsMinutes")

    # Ajouter une colonne avec les temps précédents pour calculer le délai
    window_spec = Window.partitionBy("numLigne", "codeArret").orderBy("tempsMinutes") # demander explications à Guillaume
    df_delai = df_cleaned \
        .withColumn("tempsPrecedent", lag("tempsMinutes").over(window_spec)) \
        .withColumn("delaiEntreBus", col("tempsMinutes") - col("tempsPrecedent"))

    # Afficher les résultats
    df_delai.select("numLigne", "terminus", "codeArret", "tempsMinutes", "tempsPrecedent", "delaiEntreBus").show()

else:
    print("Aucune donnée disponible depuis l'API.")



## Streaming 

Dans ce cas, nous synchronisons les résultats des requêtes dans le bucket Minio disponible à l'adresse suivante : [tp6](http://localhost:19001/tp6), dans le dossier `naolib`.
Pour cela, il est nécessaire de lancer le notebook [naolib_correction_insert.ipynb](naolib_correction_insert.ipynb) en parallèle pour insérer les données.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, regexp_extract, lit, window, min, max, current_timestamp
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define schema
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
    StructField("sens", StringType()),
    StructField("terminus", StringType()),
    StructField("infotrafic", StringType()),
    StructField("temps", StringType()),
    StructField("dernierDepart", StringType()),
    StructField("tempsReel", StringType()),
    StructField("ligne", StructType([
        StructField("numLigne", StringType()),
        StructField("typeLigne", StringType())
    ])),
    StructField("arret", StructType([
        StructField("codeArret", StringType())
    ]))
])

# Read streaming data from MinIO
input_path = "s3a://tp6/naolib"
df = sql_context.readStream.schema(schema).json(input_path)

# Add a timestamp column and clean data
df_cleaned = df.withColumn("numLigne", col("ligne.numLigne")) \
    .withColumn("codeArret", col("arret.codeArret")) \
    .withColumn("tempsMinutes", when(col("temps") == "proche", lit(0))
                .otherwise(regexp_extract(col("temps"), r"(\d+)", 1).cast("int"))) \
    .withColumn("event_time", current_timestamp())  # Add ingestion time as timestamp

# Add watermark and calculate delays
df_with_watermark = df_cleaned.withWatermark("event_time", "10 minutes")

windowed_df = df_with_watermark.groupBy(
    window(col("event_time"), "10 minutes"),
    col("numLigne"),
    col("codeArret")
).agg(
    min("tempsMinutes").alias("tempsPrecedent"),
    max("tempsMinutes").alias("tempsCurrent")
).withColumn(
    "delaiEntreBus", col("tempsCurrent") - col("tempsPrecedent")
)

# Write results to the console
query = windowed_df.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

query.awaitTermination()

In [None]:
query.stop()