In [0]:
import json

# silver_params = dbutils.widgets.get("silver_params")
# print(f"Params received: {silver_params}")

# output_data = json.loads(silver_params)

# today = output_data.get("today", "")
# silver_adls = output_data.get("silver_adls", "")
# gold_adls = output_data.get("gold_adls", "")
dbutils.widgets.text("silver_adls", "")
silver_adls = dbutils.widgets.get("silver_adls")

dbutils.widgets.text("gold_adls", "")
gold_adls = dbutils.widgets.get("gold_adls")

dbutils.widgets.text("today", "")
today = dbutils.widgets.get("today")

In [0]:
df = spark.read.parquet(f"{silver_adls}/meteo/*_{today}.parquet")

In [0]:
from pyspark.sql.functions import udf
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
import reverse_geocoder as rg

In [0]:
def get_country_info(lat, lon):
    try:
        coordinates = (float(lat), float(lon))
        result = rg.search(coordinates)[0]
        country_code = result.get('cc')
        city_name = result.get('name')
        return (country_code, city_name)
    except Exception as e:
        print(f"Erreur coordonnées : {lat}, {lon} -> {str(e)}")
        return (None, None)

In [0]:
# Enregistrement en tant que fonction Spark UDF
# Définir le schéma du struct retourné
from pyspark.sql.types import StructType, StructField, StringType
location_schema = StructType([
    StructField("country_code", StringType(), True),
    StructField("city", StringType(), True)
])

# Créer l’UDF
get_location_udf = udf(get_country_info, location_schema)

In [0]:
latitude = 48.85
longitude = 2.35
get_country_info(latitude, longitude)

('FR', 'Paris')

In [0]:
df_with_location = ( 
    df.withColumn('location', get_location_udf(F.col('latitude'), F.col('longitude')))
)

In [0]:
df_with_location = df_with_location.withColumn("country_code", F.col("location.country_code")) \
             .withColumn("city", F.col("location.city")) \
             .drop("location")

In [0]:
df_with_location.show()

+--------+---------+----------+-----+--------------+--------------------+-------------+------------+-----+
|latitude|longitude|      date|heure|temperature_2m|soil_temperature_0cm|precipitation|country_code| city|
+--------+---------+----------+-----+--------------+--------------------+-------------+------------+-----+
|   48.85|     2.35|2025-04-13|00:00|          13.7|                14.7|          0.5|          FR|Paris|
|   48.85|     2.35|2025-04-13|01:00|          13.7|                14.5|          1.2|          FR|Paris|
|   48.85|     2.35|2025-04-13|02:00|          14.1|                14.3|          0.3|          FR|Paris|
|   48.85|     2.35|2025-04-13|03:00|          13.9|                14.2|          0.0|          FR|Paris|
|   48.85|     2.35|2025-04-13|04:00|          14.0|                14.3|          0.0|          FR|Paris|
|   48.85|     2.35|2025-04-13|05:00|          13.9|                13.9|          0.0|          FR|Paris|
|   48.85|     2.35|2025-04-13|06:00|

In [0]:
df_with_location_class = (
    df_with_location.withColumn(
        "stemp_class",
    F.when(F.col("soil_temperature_0cm") <= 0, "gel")
    .when(F.col("soil_temperature_0cm") <= 10, "froid")
    .when(F.col("soil_temperature_0cm") <= 20, "modéré")
    .when(F.col("soil_temperature_0cm") <= 30, "chaud")
    .otherwise("très chaud")
    )
)

In [0]:
df_with_location_class.show()

+--------+---------+----------+-----+--------------+--------------------+-------------+------------+-----+-----------+
|latitude|longitude|      date|heure|temperature_2m|soil_temperature_0cm|precipitation|country_code| city|stemp_class|
+--------+---------+----------+-----+--------------+--------------------+-------------+------------+-----+-----------+
|   48.85|     2.35|2025-04-13|00:00|          13.7|                14.7|          0.5|          FR|Paris|     modéré|
|   48.85|     2.35|2025-04-13|01:00|          13.7|                14.5|          1.2|          FR|Paris|     modéré|
|   48.85|     2.35|2025-04-13|02:00|          14.1|                14.3|          0.3|          FR|Paris|     modéré|
|   48.85|     2.35|2025-04-13|03:00|          13.9|                14.2|          0.0|          FR|Paris|     modéré|
|   48.85|     2.35|2025-04-13|04:00|          14.0|                14.3|          0.0|          FR|Paris|     modéré|
|   48.85|     2.35|2025-04-13|05:00|          1

In [0]:
df_with_location_class.printSchema()

root
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- date: date (nullable = true)
 |-- heure: string (nullable = true)
 |-- temperature_2m: double (nullable = true)
 |-- soil_temperature_0cm: double (nullable = true)
 |-- precipitation: double (nullable = true)
 |-- country_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- stemp_class: string (nullable = false)



In [0]:
gold_output_path = f"{gold_adls}/weather_gold/{today}"

In [0]:
df_with_location_class.write.mode("overwrite").parquet(gold_output_path)