In [0]:
# Configure OAuth (client-credentials) access to the `jcystorage` ADLS Gen2
# account using a service principal. The client secret is read from the
# Databricks secret scope instead of being written into the notebook.
# NOTE(review): the client id and tenant id below are hardcoded; they are not
# secrets, but consider sourcing them from the same secret scope / a config cell.
service_credential = dbutils.secrets.get(scope="ws-scope", key="weather-sp-secret")

adls_oauth_settings = {
    "fs.azure.account.auth.type.jcystorage.dfs.core.windows.net": "OAuth",
    "fs.azure.account.oauth.provider.type.jcystorage.dfs.core.windows.net": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id.jcystorage.dfs.core.windows.net": "a2210f7a-e661-45c1-98f4-af64e6d2df9f",
    "fs.azure.account.oauth2.client.secret.jcystorage.dfs.core.windows.net": service_credential,
    "fs.azure.account.oauth2.client.endpoint.jcystorage.dfs.core.windows.net": "https://login.microsoftonline.com/a3b377f7-39e7-4ca3-bf66-ea94bbdcdbf3/oauth2/token",
}

# dicts preserve insertion order, so the settings are applied in the same
# sequence as five individual spark.conf.set calls.
for conf_key, conf_value in adls_oauth_settings.items():
    spark.conf.set(conf_key, conf_value)

In [0]:
from datetime import datetime, timedelta
from pyspark.sql.functions import col, abs, sqrt, row_number, date_format
from pyspark.sql.window import Window

# Storage locations: per-date staged Delta data is read from `staging_dir`;
# enriched output is written under `common_dir`.
staging_dir = "abfss://weather-container-staging@jcystorage.dfs.core.windows.net/"
common_dir = "abfss://weather-container-common@jcystorage.dfs.core.windows.net/"

# Dates whose 'staging' step is recorded in `processed_dates`; each one is
# enriched by the loop further down in this cell.
dates_to_process_df = spark.sql("SELECT processed_date FROM processed_dates WHERE process_step = 'staging'")

# NOTE(review): `staging` is not used in this cell; kept in case later cells
# rely on it.
staging = dbutils.fs.ls(staging_dir)

# Maximum absolute latitude/longitude difference for a city to be a match
# candidate for a weather point (presumably in degrees — confirm).
DELTA = 0.02

processed_rows = dates_to_process_df.select("processed_date").collect()
dates_to_process = []
for processed_row in processed_rows:
    # Format as 'YYYY-MM-DD' to match the staging folder names and temp_date.
    dates_to_process.append(processed_row["processed_date"].strftime('%Y-%m-%d'))
print(dates_to_process)

# Load the city reference table used to attach a city/country to each weather
# point. Columns are renamed and cast so they do not collide with the weather
# columns in the join below.
try:
    location_enrichment_df = (
        spark.table("location_enrichment")
        .select(
            col("name").alias("city"),
            col("country"),
            col("lat").cast("double").alias("lat_city"),
            col("lng").cast("double").alias("lng_city"),
        )
    )
except Exception as e:
    # Nothing can be enriched without this table. Re-raise so the cell stops
    # with the real cause; continuing would make every date in the loop fail
    # with a NameError on `location_enrichment_df` instead.
    print(f"Error obteniendo la tabla de enrichment: {e}")
    raise

# One ranking window per weather point and hour. Ties in `distance` would
# otherwise be broken arbitrarily by row_number(), so country/city are added
# as tie-breakers to make the chosen city deterministic across runs.
# The spec does not depend on the date, so it is built once outside the loop.
window_spec = Window.partitionBy(
    col("lat_weather"), col("lng_weather"), col("temp_date"), col("temp_hour")
).orderBy(col("distance").asc(), col("country").asc(), col("city").asc())

for date in dates_to_process:
    try:
        # Staged weather rows for this date, with coordinates cast to double
        # and aliased so they can be compared against the city coordinates.
        weather_staging_df = (
            spark.read.format("delta").load(f"{staging_dir}data/{date}")
            .select(
                col("latitud").cast("double").alias("lat_weather"),
                col("longitud").cast("double").alias("lng_weather"),
                col("*"),
            )
            .where(col("temp_date") == date)
        )

        # Candidate cities: within DELTA of the weather point on both axes.
        # `abs` here is pyspark.sql.functions.abs (imported above), not the builtin.
        enriched_df = weather_staging_df.join(
            location_enrichment_df,
            (abs(col("lat_weather") - col("lat_city")) <= DELTA) &
            (abs(col("lng_weather") - col("lng_city")) <= DELTA),
            "inner",
        )

        # Euclidean distance in lat/lng space; used only to rank candidates.
        enriched_df_all_candidates = enriched_df.withColumn(
            "distance",
            sqrt(
                (col("lat_weather") - col("lat_city")) ** 2 +
                (col("lng_weather") - col("lng_city")) ** 2
            ),
        )

        # Keep the nearest city per weather point/hour, and reformat temp_hour
        # to a two-digit "HH" string used as a partition value.
        enriched_df_best_candidates = (
            enriched_df_all_candidates
            .withColumn("rn", row_number().over(window_spec))
            .withColumn("temp_hour", date_format(col("temp_hour"), "HH"))
            .filter(col("rn") == 1)
            .drop("rn", "distance")
        )

        # Overwrite only this date's data. A bare mode("overwrite") replaces the
        # whole Delta table, so with several dates only the last one processed
        # would survive. The predicate uses the same temp_date == date equality
        # as the read filter above, so every written row satisfies it.
        (
            enriched_df_best_candidates.write.format("delta")
            .mode("overwrite")
            .option("replaceWhere", f"temp_date = '{date}'")
            .partitionBy("temp_date", "temp_hour", "country", "city")
            .save(f"{common_dir}data")
        )

    except Exception as e:
        # Best-effort per date: report the failure and continue with the next one.
        print(f"Error procesando la fecha {date} error: {e}")




