In [0]:
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from pyspark.sql import functions as F, Window
from delta.tables import DeltaTable
from py_functions_silver import *
from pyspark.sql.functions import current_timestamp
from dataquality_rules_strava import *
from pyspark.sql import Row

def add_periodo_dia(df: DataFrame,
                    col_start_local: str = "start_date_local") -> DataFrame:
    """Add a ``periodo_dia`` column bucketing the local start hour.

    The column named by ``col_start_local`` is converted with
    ``F.to_timestamp`` and the hour of the result is mapped to:

    * ``"manhã"`` -- hour < 12
    * ``"tarde"`` -- 12 <= hour < 18
    * ``"noite"`` -- hour >= 18
    * ``NULL``    -- the converted timestamp (and therefore its hour) is NULL

    Args:
        df: Input DataFrame containing ``col_start_local``.
        col_start_local: Name of the local start timestamp column.

    Returns:
        ``df`` with the extra string column ``periodo_dia``.
    """
    start_hour = F.hour(F.to_timestamp(F.col(col_start_local)))
    return df.withColumn(
        "periodo_dia",
        # Guard first: a NULL hour fails both comparisons below and would
        # otherwise fall through to "noite", mislabelling rows whose start
        # timestamp is missing.
        F.when(start_hour.isNull(), F.lit(None).cast("string"))
         .when(start_hour < 12, F.lit("manhã"))
         .when(start_hour < 18, F.lit("tarde"))
         .otherwise(F.lit("noite"))
    )


def add_semana_ano_mes(df: DataFrame,
                       col_start_local: str = "start_date_local") -> DataFrame:
    """Derive calendar columns from the local start timestamp.

    Adds, in this order:

    * ``semana``  -- ``F.weekofyear`` of the timestamp
    * ``ano``     -- ``F.year`` of the timestamp
    * ``mes_ref`` -- the timestamp formatted as ``"MMM/yyyy"``

    NOTE(review): ``weekofyear`` uses ISO-8601 week numbering while ``ano``
    is the calendar year, so dates close to a year boundary can pair week
    1/52/53 with the neighbouring calendar year -- confirm that consumers
    grouping by (ano, semana) expect this.

    Args:
        df: Input DataFrame containing ``col_start_local``.
        col_start_local: Name of the local start timestamp column.

    Returns:
        ``df`` with the three derived columns appended.
    """
    start_ts = F.to_timestamp(F.col(col_start_local))

    # (output column, expression) pairs, applied in declaration order.
    derived_columns = (
        ("semana", F.weekofyear(start_ts)),
        ("ano", F.year(start_ts)),
        ("mes_ref", F.date_format(start_ts, "MMM/yyyy")),
    )

    result = df
    for column_name, expression in derived_columns:
        result = result.withColumn(column_name, expression)
    return result


def add_intensidade_calorica(df: DataFrame,
                             col_calories: str = "calories") -> DataFrame:
    """Add ``intensidade_calorica``, a label derived from the calories column.

    Mapping (evaluated top to bottom):

    * calories >= 600 -> ``"alta"``
    * calories >= 300 -> ``"moderada"``
    * calories <  300 -> ``"leve"``
    * none of the above (e.g. NULL calories) -> ``NULL``

    Args:
        df: Input DataFrame containing ``col_calories``.
        col_calories: Name of the calories column.

    Returns:
        ``df`` with the extra column ``intensidade_calorica``.
    """
    kcal = F.col(col_calories)
    intensity_label = (
        F.when(kcal >= 600, F.lit("alta"))
        .when(kcal >= 300, F.lit("moderada"))
        .when(kcal < 300, F.lit("leve"))
        .otherwise(F.lit(None))
    )
    return df.withColumn("intensidade_calorica", intensity_label)


def add_duracao_tipo(df: DataFrame,
                     col_tempo_real: str = "tempo_real") -> DataFrame:
    """Add ``duracao_tipo`` (longa / média / curta) derived from ``tempo_real``.

    The duration in minutes is computed as ``hour * 60 + minute`` of the
    column, so the seconds component is ignored. The mapping is:

    * minutes >= 60 -> ``"longa"``
    * minutes >= 30 -> ``"média"``
    * minutes <  30 -> ``"curta"``
    * minutes NULL  -> ``NULL``

    Assumes ``tempo_real`` is a time/timestamp value (or something Spark can
    read the hour and minute from).
    NOTE(review): an hour-of-day component cannot express durations of 24h
    or more -- confirm that such activities do not occur upstream.

    Args:
        df: Input DataFrame containing ``col_tempo_real``.
        col_tempo_real: Name of the elapsed-time column.

    Returns:
        ``df`` with the extra string column ``duracao_tipo``.
    """
    elapsed = F.col(col_tempo_real)
    minutos = F.hour(elapsed) * 60 + F.minute(elapsed)

    return df.withColumn(
        "duracao_tipo",
        # Guard first: NULL minutes fail both comparisons below and would
        # otherwise be labelled "curta" even though the duration is unknown.
        F.when(minutos.isNull(), F.lit(None).cast("string"))
         .when(minutos >= 60, F.lit("longa"))
         .when(minutos >= 30, F.lit("média"))
         .otherwise(F.lit("curta"))
    )


In [0]:
from pyspark.sql import functions as F

# Source tables (silver layer)
ACTIVITIES_TABLE = "uc_athlete_data.silver.strava_activities"
SUB_ACTIVITY_TABLE = "uc_athlete_data.silver.strava_sub_activity"

# Destination table holding the engineered features
FINAL_TABLE = "uc_athlete_data.silver.strava_activities_features"


# Activities are read as a stream; the watermark delay can be adjusted if needed.
activities_stream = (
    spark.readStream.table(ACTIVITIES_TABLE)
    .withWatermark("silver_ingestion_timestamp", "7 days")
    .alias("a")
)

# Sub-activities are read as a static (batch) table, not as a stream.
# If this ever becomes a streaming read and the table has an event-time
# column, a watermark could be declared on it as well, e.g.:
#   .withWatermark("start_date", "7 days")
sub_activity = spark.read.table(SUB_ACTIVITY_TABLE).alias("b")

# Stream-static left outer join: every activity is kept, and the "b" columns
# are NULL when no sub-activity row matches.
joined_stream = activities_stream.join(
    sub_activity,
    on=(
        (F.col("a.id") == F.col("b.id_sub_activity"))
        # adjust the column name if it is actually spelled "athelete_id"
        & (F.col("a.athlete_id") == F.col("b.athlete_id"))
    ),
    how="leftOuter",
)
# Filter from the original query, currently disabled:
#   .filter(F.col("a.start_date") >= F.to_timestamp(F.lit("2024-12-01")))


In [0]:
def build_features(df: DataFrame) -> DataFrame:
    """Project the joined activity/sub-activity frame and add derived features.

    ``df`` must expose the aliases ``a`` (activities) and ``b``
    (sub-activities). The selected columns are renamed to their unqualified
    names and then enriched, in order, by ``add_periodo_dia``,
    ``add_semana_ano_mes``, ``add_intensidade_calorica`` and
    ``add_duracao_tipo``.

    Args:
        df: Joined DataFrame with aliases ``a`` and ``b``.

    Returns:
        The projected DataFrame plus ``periodo_dia``, ``semana``, ``ano``,
        ``mes_ref``, ``intensidade_calorica`` and ``duracao_tipo``.
    """
    # (source alias, column name) pairs in output order; mirrors the original
    # SQL SELECT this notebook was derived from.
    projection = (
        ("a", "athlete_id"),
        ("a", "sport_type"),
        ("b", "calories"),
        ("b", "id_sub_activity"),
        ("a", "achievement_count"),
        ("a", "pr_count"),
        ("a", "has_heartrate"),
        ("a", "average_heartrate"),
        ("a", "max_heartrate"),
        ("a", "start_date"),
        ("a", "start_date_local"),
        ("a", "distance_km"),
        ("a", "average_speed_kmh"),
        ("a", "average_cadence"),
        ("a", "total_elevation_gain"),
        ("a", "tempo_real"),
        ("a", "pace_strava"),
        ("a", "dia_semana"),
    )
    features = df.select(
        *[F.col(f"{source}.{name}").alias(name) for source, name in projection]
    )

    # Enrichment steps, applied in sequence.
    enrichment_steps = (
        add_periodo_dia,           # periodo_dia
        add_semana_ano_mes,        # semana, ano, mes_ref
        add_intensidade_calorica,  # intensidade_calorica
        add_duracao_tipo,          # duracao_tipo
    )
    for step in enrichment_steps:
        features = features.transform(step)

    return features


# Streaming DataFrame of projected + enriched features, built from the joined stream.
features_stream = build_features(joined_stream)


In [0]:
def upsert_features(microBatchDF: DataFrame, batch_id: int):
    """Upsert one micro-batch into ``FINAL_TABLE`` with a Delta ``MERGE``.

    Intended as a ``foreachBatch`` sink. The batch is registered as the temp
    view ``features_microbatch`` and merged on
    ``(athlete_id, start_date, id_sub_activity)``: matching rows are fully
    updated, the rest are inserted. Empty batches are skipped.

    NOTE(review): ``id_sub_activity`` is selected from the right side of a
    left outer join and may therefore be NULL; ``=`` never matches NULL, so
    such rows always take the INSERT branch -- confirm this is intended.

    Args:
        microBatchDF: The micro-batch to merge.
        batch_id: Micro-batch identifier supplied by Structured Streaming
            (not used here).
    """
    # Nothing to merge for an empty micro-batch.
    if microBatchDF.isEmpty():
        return

    # Use the session attached to the batch itself to run the statement.
    session = microBatchDF.sparkSession

    upsert_statement = f"""
        MERGE INTO {FINAL_TABLE} T
        USING features_microbatch S
        ON  T.athlete_id  = S.athlete_id
        AND T.start_date  = S.start_date
        AND T.id_sub_activity  = S.id_sub_activity
        
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """

    # Expose the batch to SQL under the name referenced in the statement.
    microBatchDF.createOrReplaceTempView("features_microbatch")
    session.sql(upsert_statement)


In [0]:
# Start the feature pipeline: each micro-batch is merged into FINAL_TABLE by
# upsert_features. With availableNow=True the query processes the data that
# is currently available and then stops on its own.
query = (
    features_stream
        .writeStream
        .foreachBatch(upsert_features)
        .outputMode("append")  # has little effect with foreachBatch; kept explicit
        .option("checkpointLocation", "abfss://silver@adlsathlete.dfs.core.windows.net/strava/sub_activity_feature_checkpoint/")
        .trigger(availableNow=True)
        .start()
)

# Block until the run has finished so that the cells below see the merged
# data, and so that any streaming failure is raised in this cell instead of
# being lost in the background.
query.awaitTermination()



In [0]:
%sql
-- Inspect the contents of the feature table.
SELECT *
FROM uc_athlete_data.silver.strava_activities_features

In [0]:
%sql
-- Feature rows without a sub-activity id (presumably activities that found
-- no match in the left outer join).
SELECT *
FROM uc_athlete_data.silver.strava_activities_features
WHERE id_sub_activity IS NULL

In [0]:
%sql
-- Duplicate check on the columns used as the MERGE key.
SELECT
  athlete_id,
  start_date,
  id_sub_activity
FROM uc_athlete_data.silver.strava_activities_features
GROUP BY
  athlete_id,
  start_date,
  id_sub_activity
HAVING COUNT(*) > 1

In [0]:
%sql
-- List the sub-activity ids present in the source table.
SELECT id_sub_activity
FROM uc_athlete_data.silver.strava_sub_activity

In [0]:
%sql
-- Reset helper: removes the feature table. IF EXISTS keeps the cell
-- re-runnable when the table has already been dropped.
DROP TABLE IF EXISTS uc_athlete_data.silver.strava_activities_features

In [0]:
dbutils.fs.rm("abfss://silver@adlsathlete.dfs.core.windows.net/strava/sub_activity_feature_checkpoint/", recurse=True)

In [0]:
%sql
-- Reset helper: removes all rows from the feature table but keeps the table.
TRUNCATE TABLE uc_athlete_data.silver.strava_activities_features

In [0]:
%sql
-- Creates the Delta feature table (no-op when it already exists).
-- This is the table that upsert_features MERGEs into, so it has to exist
-- before the streaming write runs.
-- NOTE(review): this cell sits after the streaming cell in notebook order;
-- confirm it is executed first on a fresh environment.
CREATE TABLE IF NOT EXISTS uc_athlete_data.silver.strava_activities_features (
  athlete_id BIGINT,          -- part of the MERGE key
  sport_type STRING,
  calories DOUBLE,
  id_sub_activity BIGINT,     -- part of the MERGE key
  achievement_count BIGINT,
  pr_count BIGINT,
  has_heartrate BOOLEAN,
  average_heartrate DOUBLE,
  max_heartrate DOUBLE,
  start_date DATE,            -- part of the MERGE key
  start_date_local STRING,    -- input of add_periodo_dia / add_semana_ano_mes
  distance_km DOUBLE,
  average_speed_kmh DOUBLE,
  average_cadence DOUBLE,
  total_elevation_gain DOUBLE,
  tempo_real STRING,          -- input of add_duracao_tipo
  pace_strava STRING,
  dia_semana STRING,
  -- Derived feature columns added by build_features
  periodo_dia STRING,
  semana INT,
  ano INT,
  mes_ref STRING,
  intensidade_calorica STRING,
  duracao_tipo STRING
)
USING DELTA
PARTITIONED BY (ano, mes_ref)
TBLPROPERTIES (
  delta.autoOptimize.optimizeWrite = true,
  delta.autoOptimize.autoCompact = true
);
