In [None]:
from core.spark_utils import create_spark_session
from core.s3.s3_utils import S3Service
from core.s3.settings import S3Settings
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead, sum, when, col, signum, dense_rank
from pyspark.sql.types import DoubleType
from pyspark.sql import DataFrame as DF
import plotly.express as px
import pandas as pd
import plotly.graph_objects as go

# S3 settings object; its S3_KEY / S3_SECRET attributes are passed to the Spark session below.
settings = S3Settings()

# Spark session shared by every cell of the notebook.
spark = create_spark_session(
    settings.S3_KEY,
    settings.S3_SECRET
)

# S3 helper used further down to read parquet time series and YAML configs.
s3 = S3Service()

# Per-make multiplier applied to the raw `soc` column so that all makes end up on
# the same scale (see compute_charge_idx and _normalize_units_to_metric).
# NOTE(review): presumably 100 is for makes reporting SoC as a 0-1 fraction and
# 1 for makes already reporting a percentage -- confirm against the raw data.
SCALE_SOC = {
    'tesla-fleet-telemetry': 1,
    'mercedes-benz': 100,
    'volvo-cars': 100,
    'kia': 100,
    'renault': 100,
    'ford': 100,
    'stellantis': 100,
    'bmw': 1,
    'volkswagen': 1,
}

In [None]:
def _reassign_short_phases(df, min_duration_minutes=3):
    """
    Fold every phase that is too short into the latest preceding phase that is
    long enough, then renumber the phases of each vehicle from 0.

    Args:
        df (DataFrame): Spark DataFrame with columns `vin`, `date`, `phase_id`
            and `total_phase_time`.
        min_duration_minutes (float): Phases whose `total_phase_time` is below
            this value are merged into the previous valid phase.

    Returns:
        DataFrame: Same rows, with `phase_id` replaced by the renumbered id
        (the new column is appended last).
    """
    per_vin = Window.partitionBy("vin")
    # Frame covering every row of the vehicle up to and including the current one.
    up_to_current_row = per_vin.orderBy("date").rowsBetween(
        Window.unboundedPreceding, 0
    )

    long_enough = F.col("total_phase_time") >= min_duration_minutes
    is_valid = F.col("is_valid_phase") == 1
    # phase_id on rows of valid phases, null elsewhere.
    valid_phase_id = F.when(is_valid, F.col("phase_id"))

    return (
        df.withColumn("is_valid_phase", F.when(long_enough, 1).otherwise(0))
        # Carry the id of the most recent valid phase forward in time.
        .withColumn(
            "last_valid_phase_id",
            F.last(valid_phase_id, ignorenulls=True).over(up_to_current_row),
        )
        # Rows of short phases inherit that id; rows of valid phases keep theirs.
        .withColumn(
            "phase_id_updated",
            F.when(is_valid, F.col("phase_id")).otherwise(
                F.col("last_valid_phase_id")
            ),
        )
        # Compact the surviving ids into consecutive integers starting at 0.
        .withColumn(
            "phase_id_final",
            F.dense_rank().over(per_vin.orderBy("phase_id_updated")) - 1,
        )
        .drop("phase_id", "last_valid_phase_id", "is_valid_phase", "phase_id_updated")
        .withColumnRenamed("phase_id_final", "phase_id")
    )

In [None]:
def compute_charge_idx(
    tss: DF, make: str, total_soc_diff_threshold: float = 0.5, phase_delimiter_mn: int = 45
) -> DF:
    """
    Label each point of a SoC time series as charging or discharging and number
    the resulting phases per vehicle.

    Args:
        tss: Spark DataFrame with at least the columns `vin`, `date` and `soc`.
        make: Key into SCALE_SOC used to rescale `soc`; it also decides whether
            long time gaps split consecutive discharging rows (only for
            'bmw', 'tesla-fleet-telemetry' and 'renault').
        total_soc_diff_threshold: Minimum absolute sum of `soc_diff` over a raw
            phase for it to be labelled charging / discharging (compared after
            `soc` has been rescaled).
        phase_delimiter_mn: Time gap in minutes above which two consecutive
            discharging rows are put in different phases (listed makes only).

    Returns:
        The input rows minus those with a null `soc` and those labelled idle,
        with the intermediate columns computed here plus `charging_status`
        and `charging_status_idx` (running phase number per vin).

    Raises:
        KeyError: if `make` is not a key of SCALE_SOC.
    """

    w = Window.partitionBy("vin").orderBy("date") # Window partitioned by vin and ordered by date

    # Bring every SoC onto the 0-100 % scale
    tss = tss.withColumn("soc", F.col("soc") * SCALE_SOC[make])
    
    # Drop the rows whose "soc" is null; deliberately no forward fill, so that genuine idles stay distinguishable
    tss = tss.na.drop(subset=["soc"]) 

    # SoC difference with the previous point
    tss = tss.withColumn(
        "soc_diff",
        F.col("soc")- F.lag("soc").over(w),
    )

    # Time elapsed between two consecutive points, in minutes
    df = tss.withColumn("prev_date", lag("date").over(w)) \
        .withColumn(
        "time_gap_minutes",
        (F.unix_timestamp("date") - F.unix_timestamp("prev_date")) / 60,
    )
    
    # Naive direction of the charge: sign of soc_diff (null when soc_diff is null)
    df = df.withColumn(
        "direction_raw",
        F.when(col("soc_diff").isNull(), None).otherwise(F.signum("soc_diff")),
    )

    # Forward-fill the direction
    df = df.withColumn(
        "direction",
        F.last("direction_raw", ignorenulls=True).over(
            w.partitionBy("vin")
            .orderBy("date")
            .rowsBetween(Window.unboundedPreceding, 0)
        ),
    )

    # Flag the rows where the direction changes
    df = df.withColumn(
        "direction_change",
        F.when(F.col("direction") != F.lag("direction").over(w), 1).otherwise(0),
    )

    # At this stage we have, without any smartness, every phase delimited by a switch between idle, charging and discharging,
    # whatever the SoC points gained or lost and the time spent in the phase
    df = df.withColumn(
        "phase_id",
        F.sum("direction_change").over(w.rowsBetween(Window.unboundedPreceding, 0)),
    )

    w_phase = Window.partitionBy("vin", "phase_id")


    # For Tesla
    # Total duration of the useful phase, for Tesla
    # df = df.withColumn("total_phase_time", F.sum("time_gap_minutes").over(w_phase))

    #  df = _reassign_short_phases(
    #     df
    # )  # Reassign short phases to previous valid phase (especially useful for tesla-fleet-telemetry noise)


    # Total SoC gained or lost over the naive phase
    df = df.withColumn("total_soc_diff_phase", F.sum("soc_diff").over(w_phase))

    # Direction of the previous and next row (i.e. of the neighbouring phases when the current phase is a single point)
    df = df.withColumn("prev_phase", F.lag("direction").over(w)).withColumn(
        "next_phase", F.lead("direction").over(w)
    )

    # Core of the logic: to be considered charging or discharging, a phase must bring a minimum SoC gain or loss.
    # Otherwise, if the previous and next directions are the same, the row takes that direction; else it is idle.
    # NOTE(review): the original comment restricts the reassignment to single-point phases, but the code does not check the phase length -- confirm intent.
    df = df.withColumn(
        "charging_status",
        F.when((F.col("total_soc_diff_phase") > total_soc_diff_threshold), "charging")
        .when((F.col("total_soc_diff_phase") < -total_soc_diff_threshold), "discharging")
        .when(
            (F.col("prev_phase") == F.col("next_phase"))
            & (F.col("prev_phase") > 0),
            "charging",
        )
        .when(
            (F.col("prev_phase") == F.col("next_phase"))
            & (F.col("prev_phase") < 0),
            "discharging",
        )
        .otherwise("idle"),
    )

    # Status of the next row, used below so that a phase starts at the right point
    df = df.withColumn(
        "next_status",
        F.lead("charging_status").over(w)
    )
    

    # Set the phase before a charging/discharging phase as the phase to get the correct SOC diff
    df = df.withColumn(
        "charging_status",
        F.when(F.col("next_status") == 'charging', "charging")
        .when(F.col("next_status")== "discharging", "discharging")
        .otherwise(col('charging_status'))
    )

    # Drop the idle rows: there is no interest in identifying them and removing them lets the phase numbers be split cleanly.
    # It is still important to have identified them, so that charge / discharge phases are not extended needlessly, since time matters
    df = df.withColumn(
        "charging_status",
        F.when(F.col("charging_status") == "idle", None).otherwise(F.col("charging_status"))
    )

    df = df.na.drop(subset=["charging_status"])

    df = df.withColumn(
        "charging_status_change",
        F.when(
            F.col("charging_status") != F.lag("charging_status").over(w), 1
        ).otherwise(0),
    )
    
    # Split successive phases that have the same status
    if make in ('bmw', 'tesla-fleet-telemetry', 'renault'):
        # Recompute the time gap now that the idle rows have been removed
        df = df.withColumn("prev_date", lag("date").over(w))
        df = df.withColumn(
            "time_gap_minutes",
            (F.unix_timestamp("date") - F.unix_timestamp("prev_date")) / 60,
        )

    # Add the corrected column: relevant when the data frequency is high enough
        df = df.withColumn(
            "charging_status_change",
            F.when(
                (F.col("charging_status_change") == 0) &
                (F.col("charging_status") == 'discharging') & # can be removed if the data frequency while charging is high enough
                (F.col("time_gap_minutes") > phase_delimiter_mn),
                F.lit(1)
            ).otherwise(F.col("charging_status_change"))
        )


    # Running count of status changes = phase number within each vin
    df = df.withColumn(
        "charging_status_idx",
        F.sum("charging_status_change").over(
            w.rowsBetween(Window.unboundedPreceding, 0)
        ),
    )
    
    return df


def add_phase(df: DF) -> "tuple[DF, DF]":
    """
    Compute per-phase statistics and attach them to every point of the phase.

    A phase is a (`vin`, `charging_status_idx`) group. For each phase the first
    and last SoC and date, the number of points and the status are computed.
    When the next phase of the same vehicle starts at a SoC that continues the
    current trend (higher for a charging phase, lower for a discharging one),
    the end of the current phase is moved to the start of the next one.

    Args:
        df: Spark DataFrame with the columns `vin`, `date`, `soc`,
            `charging_status` and `charging_status_idx`.

    Returns:
        A 2-tuple `(df_with_stats, phase_stats)`:
        - `df_with_stats`: `df` left-joined with the per-phase columns
          `first_soc`, `last_soc`, `first_date`, `last_date`, `count_points`,
          `total_soc_diff`, `total_phase_time_minutes`, `is_usable_phase`;
        - `phase_stats`: one row per phase.
    """
    w_phase_static = Window.partitionBy("vin").orderBy("first_date")

    # (date, soc) pair, kept only for rows having both values. Structs compare
    # field by field, so min/max of this pair yields the SoC at the earliest /
    # latest date. Unlike first()/last() after an orderBy + groupBy, this does
    # not depend on the row order left by the shuffle.
    dated_soc = F.when(
        F.col("soc").isNotNull() & F.col("date").isNotNull(),
        F.struct("date", "soc"),
    )

    # 1. Aggregate per phase
    phase_stats = (
        df
        .groupBy("vin", "charging_status_idx")
        .agg(
            F.min(dated_soc).getField("soc").alias("first_soc"),
            F.max(dated_soc).getField("soc").alias("last_soc"),
            F.min("date").alias("first_date"),
            F.max("date").alias("last_date"),
            F.count("date").alias("count_points"),
            # NOTE(review): assumes the status is constant within a phase, in
            # which case the pick is order-independent -- confirm for other callers.
            F.first("charging_status").alias("charging_status")
        )
        .withColumn("next_dt_begin", F.lead("first_date").over(w_phase_static))
        .withColumn("next_first_soc", F.lead("first_soc").over(w_phase_static))
    )

    # 2. Extend the phase up to the start of the next one when the SoC kept
    #    moving in the direction of the phase between the two.
    condition = (
        ((F.col("last_soc") < F.col("next_first_soc")) & (F.col("charging_status") == "charging")) |
        ((F.col("last_soc") > F.col("next_first_soc")) & (F.col("charging_status") == "discharging"))
    )

    phase_stats = phase_stats.withColumn(
        "adjusted_last_date",
        F.when(condition, F.col("next_dt_begin")).otherwise(F.col("last_date"))
    ).withColumn(
        "adjusted_last_soc",
        F.when(condition, F.col("next_first_soc")).otherwise(F.col("last_soc"))
    ).withColumn(
        # A phase made of a single point is flagged as not usable.
        "is_usable_phase",
        F.when(F.col("count_points") > 1, F.lit(1)).otherwise(F.lit(0))
    )

    # 3. Replace the raw end of phase by the adjusted one
    phase_stats = (
        phase_stats.drop('last_soc', 'last_date')
        .withColumnRenamed("adjusted_last_soc", "last_soc")
        .withColumnRenamed("adjusted_last_date", "last_date")
    )

    phase_stats = phase_stats.withColumn('total_soc_diff', col('last_soc') - col('first_soc'))
    phase_stats = phase_stats.withColumn(
        'total_phase_time_minutes',
        (F.unix_timestamp(col('last_date')) - F.unix_timestamp(col('first_date'))) / 60
    )

    # 4. Attach the phase statistics to every point of the base DataFrame
    df = df.join(
        phase_stats.select(
            "vin", "charging_status_idx",
            "first_soc", "last_soc",
            "first_date", "last_date",
            "count_points", "total_soc_diff", 'total_phase_time_minutes',
            "is_usable_phase"
        ),
        on=["vin", "charging_status_idx"],
        how="left"
    )

    return df, phase_stats

In [None]:


def generate_df(make: str, vin: str, col_soc: str) -> "tuple[DF, DF]":
    """
    Build the phase-annotated time series of one vehicle.

    Args:
        make: Make of the vehicle; selects the raw parquet path and the SoC
            scaling applied by `compute_charge_idx`.
        vin: VIN of the vehicle to keep.
        col_soc: Name of the raw column holding the state of charge; it is
            renamed to `soc` before processing.

    Returns:
        The 2-tuple returned by `add_phase`: the per-point DataFrame enriched
        with phase statistics, and the per-phase statistics DataFrame.
    """
    raw_ts = s3.read_parquet_df_spark(spark, f'raw_ts/{make}/time_series/raw_ts_spark.parquet')
    vin_ts = raw_ts.filter(raw_ts['vin'] == vin).withColumnRenamed(col_soc, "soc")
    vin_ts_with_idx = compute_charge_idx(vin_ts, make)

    return add_phase(vin_ts_with_idx)


def plot_df(df: DF) -> None:
    """
    Plot the SoC curve of a phase-annotated time series with plotly.

    A black line shows the SoC over time and one marker per point is coloured
    by `charging_status`; hovering a marker shows the statistics of its phase.

    Args:
        df: Spark DataFrame with the columns `date`, `soc`, `charging_status`,
            `charging_status_idx`, `soc_diff`, `first_date`, `last_date`,
            `first_soc`, `last_soc`, `total_soc_diff` and `is_usable_phase`.
            It is collected to pandas, so it should hold a single vehicle.
    """
    plotted_columns = [
        "date", "soc", "charging_status", 'charging_status_idx', 'soc_diff',
        'first_date', 'last_date', 'first_soc', 'last_soc', 'total_soc_diff',
        'is_usable_phase',
    ]
    # Sort explicitly: the rows come out of a join, whose output order is not
    # guaranteed, and the line trace connects the points in row order.
    pdf = df.select(*plotted_columns).orderBy("date").toPandas()

    # Colour mapping
    status_color_map = {
        "charging": "green",
        "discharging": "red",
        "idle": "gray"
    }
    pdf["color"] = pdf["charging_status"].map(status_color_map)

    fig = go.Figure()

    # 1. Continuous black line
    fig.add_trace(go.Scatter(
        x=pdf["date"],
        y=pdf["soc"],
        mode="lines",
        line=dict(color="black"),
        name="SoC (ligne)",
        hovertemplate="<b>Date:</b> %{x}<br><b>SoC:</b> %{y:.1f}%<extra></extra>"
    ))

    # 2. Coloured markers carrying all the phase information
    fig.add_trace(go.Scatter(
        x=pdf["date"],
        y=pdf["soc"],
        mode="markers",
        marker=dict(color=pdf["color"], size=6),
        name="Status (points)",
        showlegend=False,
        hovertemplate="<b>Date:</b> %{x}<br><b>SoC:</b> %{y:.1f}%<br><b>Status:</b> %{customdata[0]}<br><b>Index:</b> %{customdata[1]}<br><b>Soc Diff:</b> %{customdata[2]:.2f}<br><b>First Date:</b> %{customdata[3]}<br><b>Last Date:</b> %{customdata[4]}<br><b>First Soc:</b> %{customdata[5]:.2f}<br><b>Last Soc:</b> %{customdata[6]:.2f}<br><b>Total Soc Diff:</b> %{customdata[7]:.2f}<br><b>Is Usable Phase:</b> %{customdata[8]}<extra></extra>",
        customdata=list(zip(pdf["charging_status"], pdf["charging_status_idx"], pdf["soc_diff"], pdf['first_date'], pdf['last_date'], pdf['first_soc'], pdf['last_soc'], pdf['total_soc_diff'], pdf['is_usable_phase']))
    ))

    # Layout
    fig.update_layout(
        title="Évolution du SoC avec statut de charge",
        xaxis_title="Date",
        yaxis_title="SoC (%)",
        hovermode="x unified"
    )

    fig.show()
    



In [None]:
# Vehicle under study: `make` selects the S3 paths and the SCALE_SOC factor,
# `col_soc` is the raw column that generate_df renames to `soc`.
make = 'mercedes-benz'
vin = 'W1N2437011J000869'
col_soc = 'battery_level'

# df: per-point time series with phase columns; phase_df: one row per phase.
df, phase_df = generate_df(make, vin, col_soc)

In [None]:
# SoC curve of the selected vehicle, markers coloured by charging status.
plot_df(df)

In [None]:

from transform.processed_tss.config import ODOMETER_MILES_TO_KM
from transform.fleet_info.main import fleet_info

# Column of `phase_df` -> name it takes in the phase table (order is kept).
_phase_column_aliases = {
    "vin": "VIN_ph",
    "first_date": "DATETIME_BEGIN",
    "last_date": "DATETIME_END",
    "total_phase_time_minutes": "PHASE_TIME_MINUTES",
    "charging_status_idx": "PHASE_INDEX",
    "charging_status": "PHASE_STATUS",
    "first_soc": "SOC_FIRST",
    "last_soc": "SOC_LAST",
    "total_soc_diff": "SOC_DIFF",
    "count_points": "NO_SOC_DATAPOINT",
    "is_usable_phase": "IS_USABLE_PHASE",
}

phase_df_processed = phase_df.select(
    *[F.col(source).alias(target) for source, target in _phase_column_aliases.items()]
)

# Per-make configuration: column renames and the list of columns to keep.
dynamic_config = s3.read_yaml_file(f"config/{make}.yaml")

if dynamic_config is None:
    raise ValueError(f"Config file config/{make}.yaml not found")


rss = s3.read_parquet_df_spark(spark, f'raw_ts/{make}/time_series/raw_ts_spark.parquet')
rss_by_vin = rss.filter(rss['vin'] == vin)
# Build the processed series from the single-vehicle frame. The filtered frame
# used to be computed and then ignored, which sent every VIN of the make
# through the range join below although the phases only cover `vin`.
tss = rss_by_vin.withColumnsRenamed(dynamic_config['raw_tss_to_processed_tss']['rename'])
tss = tss.select(*dynamic_config['raw_tss_to_processed_tss']['keep'])  # Reduce column volumetry


def _normalize_units_to_metric(tss, make):
    """
    Rescale the `odometer` and `soc` columns with the per-make factors.

    Args:
        tss: Spark DataFrame with the columns `odometer` and `soc`.
        make: Make used to look the factors up.

    Returns:
        DataFrame whose `odometer` is multiplied by the ODOMETER_MILES_TO_KM
        entry of `make` (1 when the make has no entry) and whose `soc` is
        multiplied by `SCALE_SOC[make]` (KeyError when the make is unknown).
    """
    odometer_factor = ODOMETER_MILES_TO_KM.get(make, 1)
    soc_factor = SCALE_SOC[make]
    return (
        tss.withColumn("odometer", col("odometer") * odometer_factor)
        .withColumn("soc", F.col("soc") * soc_factor)
    )


tss = _normalize_units_to_metric(tss, make)

# Aliases for readability
ph = phase_df_processed.alias("ph")
ts = tss.alias("ts")

# A time-series point belongs to a phase of the same vehicle when its date lies
# within the phase bounds (both ends included). The phase columns are referenced
# with the exact case they were aliased with, so that the lookup does not rely
# on Spark's case-insensitive column resolution.
join_condition = (
    (ph["VIN_ph"] == ts["vin"]) &
    (ts["date"] >= ph["DATETIME_BEGIN"]) &
    (ts["date"] <= ph["DATETIME_END"])
)

# Left join: every phase is kept, with one row per matching point
tss_phased = ph.join(ts, on=join_condition, how="left")

# Add the fleet information, then keep a single VIN column named "VIN"
tss_phased = (
    tss_phased.join(spark.createDataFrame(fleet_info), "vin", "left")
    .drop("vin")
    .withColumnRenamed("VIN_ph", "VIN")
)


tss_phased.show()


In [None]:


# (date, odometer) pair, kept only for rows having both values. Structs compare
# field by field, so min/max of the pair gives the odometer at the earliest /
# latest date of the phase. first()/last() after a groupBy depend on the row
# order left by the shuffle and are therefore not reliable here.
dated_odometer = F.when(
    F.col("odometer").isNotNull() & F.col("date").isNotNull(),
    F.struct("date", "odometer"),
)

agg_columns = [
    # Vehicle attributes: first non-null value found in the group
    # (presumably constant per VIN since they come from fleet_info -- TODO confirm).
    F.first("make", ignorenulls=True).alias("MAKE"),
    F.first("model", ignorenulls=True).alias("MODEL"),
    F.first("version", ignorenulls=True).alias("VERSION"),
    F.first("net_capacity", ignorenulls=True).alias("BATTERY_NET_CAPACITY"),
    F.min(dated_odometer).getField("odometer").alias("ODOMETER_FIRST"),
    F.max(dated_odometer).getField("odometer").alias("ODOMETER_LAST")
]

if "consumption" in tss_phased.columns:
    agg_columns.append(F.mean("consumption").alias("CONSUMPTION"))

# One row per phase. NOTE(review): PHASE_TIME_MINUTES is not part of the
# grouping keys, so it is absent from df_final -- confirm this is intended.
df_final = (
    tss_phased.groupBy("VIN", "PHASE_INDEX", "DATETIME_BEGIN", "DATETIME_END", "PHASE_STATUS", "SOC_FIRST", "SOC_LAST", "SOC_DIFF", "NO_SOC_DATAPOINT", "IS_USABLE_PHASE")
    .agg(*agg_columns)
)

df_final.sort("DATETIME_BEGIN").show()

In [None]:
from core.spark_utils import create_spark_session
from core.s3.s3_utils import S3Service
from core.s3.settings import S3Settings
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead, sum, when, col, signum, dense_rank
from pyspark.sql.types import DoubleType
from pyspark.sql import DataFrame as DF
import plotly.express as px
import pandas as pd
import plotly.graph_objects as go

# NOTE(review): this repeats the settings / Spark session / S3 service setup of
# the first cell of the notebook; it is only needed when this cell is run on
# its own in a fresh kernel.
settings = S3Settings()

spark = create_spark_session(
    settings.S3_KEY,
    settings.S3_SECRET
)

s3 = S3Service()

# Raw Volkswagen time series.
# NOTE(review): this rebinds `tss`, which an earlier cell uses for the processed
# time series of the make under study -- consider a distinct name.
tss = s3.read_parquet_df_spark(spark, 'raw_ts/volkswagen/time_series/raw_ts_spark.parquet')


In [None]:
# tss.filter(col('SOH_OEM').isNotNull()).show()
# printSchema() writes the schema to stdout itself and returns None, so wrapping
# it in print() only adds a stray "None" line.
tss.printSchema()