# Hack4Rail - Challenge 9

Sample notebook to access battery data on Snowflake

In [None]:
from dotenv import load_dotenv
import plotly.express as px
from charged.snowflake_utils import create_session
import snowflake.snowpark.functions as F
import pandas as pd
 
# Load the variables declared in the local .env file into the process environment.
load_dotenv()

# Create the Snowpark session used by the Snowflake queries in this notebook.
# NOTE(review): presumably create_session() picks up its connection settings
# from the environment populated above — confirm in charged.snowflake_utils.
session = create_session()

In [None]:
import pandas as pd


def load_data(
    start_date=None,
    end_date=None,
    *,
    path="../data/clean_data.parquet",
    tz="America/Los_Angeles",
):
    """Load the cleaned battery data, optionally restricted to a time window.

    Each bound is applied independently, so passing only ``start_date`` or
    only ``end_date`` yields a one-sided filter instead of silently loading
    the whole file.

    Args:
        start_date: Inclusive lower bound on ``TIMESTAMP_VEHICLE``. Anything
            ``pd.Timestamp`` accepts; a falsy value (``None``, ``""``) means
            "no lower bound".
        end_date: Inclusive upper bound on ``TIMESTAMP_VEHICLE``. A date-only
            value such as ``"2025-06-08"`` resolves to midnight at the start
            of that day, so later rows of that day are not included.
        path: Parquet file to read.
        tz: Time zone attached to bounds that carry no time zone themselves.

    Returns:
        A pandas DataFrame with the rows that satisfy the requested bounds.
    """

    def _as_bound(value):
        # tz_localize() raises on timestamps that already carry a time zone,
        # so only naive values are localized; aware ones are used as given.
        stamp = pd.Timestamp(value)
        return stamp.tz_localize(tz) if stamp.tzinfo is None else stamp

    filters = []
    if start_date:
        filters.append(("TIMESTAMP_VEHICLE", ">=", _as_bound(start_date)))
    if end_date:
        filters.append(("TIMESTAMP_VEHICLE", "<=", _as_bound(end_date)))

    # An empty filter list is passed as None, i.e. "read everything".
    return pd.read_parquet(path, filters=filters or None, engine="pyarrow")

# Analysis window. The Snowpark cells further down filter on `from_date` /
# `to_date`, so the bounds are named once here and reused for the parquet load.
# NOTE(review): values taken from the literal dates previously passed to
# load_data(); adjust if the Snowflake sections need a different window.
from_date = "2025-06-01"
to_date = "2025-06-08"

df = load_data(from_date, to_date)
df.head()

## Time Series

In [None]:
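# query tables BATTERIELOK_DATA, VEHICLES, VEHICLE_TYPE
# NOTE(review): this cell is commented out, but the Overview, Errors and KPIs
# cells below still reference `sdf`; re-enable it before running them.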
# table = "BATTERIELOK_DATA"
# sdf = (
#     session
#     .table(table)
#     .filter(F.col("VEHICLE_ID").isNotNull())
#     .with_column(
#         "TIMESTAMP_TRUNC",
#         F.from_unixtime(
#             F.round(F.unix_timestamp(F.col("TIMESTAMP_VEHICLE")) / 60) * 60
#         ).cast("TIMESTAMP"),
#     )
#     .with_column(
#         "DATE",
#         F.date_trunc("DAY", "TIMESTAMP_VEHICLE").cast("DATE"),
#     )
# )
# sdf.show()

In [None]:
# sdf.columns

# Signals averaged per vehicle and minute in the time-series section.
# Batteries 1-4 each contribute TEMP / VOLTAGE / CURRENT; battery 5 only
# contributes its voltage.
columns = [
    "VEHICLE_OUTSIDE_TEMP",
    "BATTERY_SOC",
    "BATTERY_SOH",
    "BATTERY_COOLING_TEMP",
    *(
        f"BATTERY_{battery}_{signal}"
        for battery in range(1, 5)
        for signal in ("TEMP", "VOLTAGE", "CURRENT")
    ),
    "BATTERY_5_VOLTAGE",
]

In [None]:
# # groupby vehicle and 60 sec
# aggregations = []
# for column in columns:
#     aggregations.append(F.avg(F.col(column)).alias(f"{column}_AVG"))

# df = (
#     sdf
#     .filter(F.to_date(F.col("TIMESTAMP_VEHICLE")) >= from_date)
#     .filter(F.to_date(F.col("TIMESTAMP_VEHICLE")) <= to_date)
#     .group_by(["VEHICLE_ID", "TIMESTAMP_TRUNC"])
#     .agg(*aggregations)
#     .order_by(["VEHICLE_ID", "TIMESTAMP_TRUNC"], ascending=[True, True])
#     .to_pandas()
# )
# df.head()


In [None]:
# pandas version of the Snowpark aggregation: average every signal in
# `columns` per vehicle and per TIMESTAMP_TRUNC bucket.
# NOTE(review): assumes the parquet data already carries a TIMESTAMP_TRUNC
# column — it is not derived anywhere in the pandas path; confirm.
aggregations = dict.fromkeys(columns, "mean")

df_grouped = df.groupby(["VEHICLE_ID", "TIMESTAMP_TRUNC"]).agg(aggregations)

# Order by vehicle, then time, so each vehicle's line is drawn chronologically.
df_ordered = df_grouped.sort_values(
    by=["VEHICLE_ID", "TIMESTAMP_TRUNC"], ascending=[True, True]
)

# Keep every aggregated row for plotting. Previously `.head()` was applied
# here, so the chart in the next cell only ever received five rows.
df_graph = df_ordered.reset_index()

# Display the first few rows of the resulting DataFrame
df_graph.head()

In [None]:
# Plot the state of charge over time, one stacked panel per vehicle.
fig = px.line(
    data_frame=df_graph,
    y="BATTERY_SOC",
    x="TIMESTAMP_TRUNC",
    facet_row="VEHICLE_ID",  # alternative layout: color="VEHICLE_ID"
    render_mode="svg",
    markers=True,
)
fig.show()

## Overview

Overview of battery health per vehicle and battery

In [None]:
# Preview the Snowpark DataFrame.
# NOTE(review): `sdf` has to be defined by an earlier cell; the only visible
# definition sits in the commented-out query cell of the Time Series section.
sdf.show()

In [None]:
# Health overview: average BATTERY_SOH per vehicle, day and minute bucket,
# computed in Snowflake and pulled into pandas for plotting.
# NOTE(review): `sdf`, `from_date` and `to_date` must be provided by earlier
# cells. This cell rebinds the notebook-level `columns`, `aggregations`, `df`.
columns = ["BATTERY_SOH"]

# One AVG(<column>) expression per signal, exposed as "<column>_AVG".
aggregations = [F.avg(F.col(name)).alias(f"{name}_AVG") for name in columns]

vehicle_day = F.to_date(F.col("TIMESTAMP_VEHICLE"))
in_window = (vehicle_day >= from_date) & (vehicle_day <= to_date)

df = (
    sdf
    .filter(in_window)
    .group_by(["VEHICLE_ID", "DATE", "TIMESTAMP_TRUNC"])
    .agg(*aggregations)
    .order_by(["VEHICLE_ID", "TIMESTAMP_TRUNC"], ascending=[True, True])
    .to_pandas()
)
df.head()

In [None]:
# Daily distribution of the averaged state of health, one panel per vehicle.
fig = px.box(
    data_frame=df,
    y="BATTERY_SOH_AVG",
    x="DATE",
    facet_row="VEHICLE_ID",  # alternative layout: color="VEHICLE_ID"
    # render_mode="svg",
)
# Give every panel its own y-range and keep the tick labels visible.
fig.update_yaxes(showticklabels=True, matches=None)
fig.show()

## Errors

In [None]:
# Inspect the Snowpark schema. The field this section relies on is listed as:
# ('ERRORS', ArrayType(StringType()), nullable=True)
# NOTE(review): `sdf` has to be defined by an earlier cell.
sdf.schema

In [None]:
# Count, per vehicle and day, the rows whose ERRORS array is non-empty,
# from 2025-01-01 up to `to_date`, and pull the result into pandas.
# NOTE(review): `sdf` (including its DATE column) and `to_date` must be
# provided by earlier cells.
vehicle_day = F.to_date(F.col("TIMESTAMP_VEHICLE"))
has_errors = F.size(F.col("ERRORS")) > 0

df = (
    sdf
    .filter((vehicle_day >= '2025-01-01') & (vehicle_day <= to_date) & has_errors)
    .group_by(["VEHICLE_ID", "DATE"])
    .count()
    .order_by(["VEHICLE_ID", "DATE"], ascending=[True, True])
    .to_pandas()
)
df.head()

In [None]:
# Rows with errors per day, one panel per vehicle.
fig = px.bar(data_frame=df, x="DATE", y="COUNT", facet_row="VEHICLE_ID")
# Give every panel its own y-range and keep the tick labels visible.
fig.update_yaxes(showticklabels=True, matches=None)
fig.show()

## KPIs

In [None]:
# Export the rows of 2025-06-20 .. 2025-06-24 to pandas, enriched with a
# minute bucket, the calendar day and the number of entries in ERRORS.
# NOTE(review): `sdf` must be provided by an earlier cell.
vehicle_ts = F.col("TIMESTAMP_VEHICLE")
vehicle_day = F.to_date(vehicle_ts)

# Epoch seconds rounded to the nearest multiple of 60, back to a timestamp.
minute_bucket = F.from_unixtime(
    F.round(F.unix_timestamp(vehicle_ts) / 60) * 60
).cast("TIMESTAMP")

df = (
    sdf
    .filter((vehicle_day >= '2025-06-20') & (vehicle_day <= '2025-06-24'))
    .with_column("TIMESTAMP_TRUNC", minute_bucket)
    .with_column("DATE", F.date_trunc("DAY", "TIMESTAMP_VEHICLE").cast("DATE"))
    .with_column("ERROR_SIZE", F.size(F.col("ERRORS")))
    .to_pandas()
)
df.shape

In [None]:
# Per-vehicle KPI computed in Snowflake:
#   ERROR_STATE = 1 if the vehicle reported any error, else 0
#   SOH_STATE   = 0 for an average SOH above 95, 0.5 above 50, else 0
#   KPI         = ERROR_STATE + SOH_STATE
# NOTE(review): `sdf` must be provided by an earlier cell. No date filter is
# applied here, unlike the pandas export in the previous cell.
# NOTE(review): an average SOH of 50 or below scores 0, the same as a healthy
# battery (and the same as a missing SOH). That looks unintended — confirm the
# score for the lowest band before relying on the KPI.
(
    sdf
    # ERROR_SIZE is only added to the frame exported to pandas in the previous
    # cell, not to `sdf` itself, so it has to be derived here before summing.
    .with_column("ERROR_SIZE", F.size(F.col("ERRORS")))
    .group_by(["VEHICLE_ID"])
    .agg(
        F.avg(F.col("BATTERY_SOH")).alias("BATTERY_SOH_AVG"),
        F.sum(F.col("ERROR_SIZE")).alias("ERROR_COUNT"),
    )
    .with_column(
        "ERROR_STATE",
        F.when(F.col("ERROR_COUNT") > 0, F.lit(1)).otherwise(F.lit(0)),
    )
    .with_column(
        "SOH_STATE",
        F.when(F.col("BATTERY_SOH_AVG") > 95, F.lit(0))
        .when(F.col("BATTERY_SOH_AVG") > 50, F.lit(0.5))
        .otherwise(F.lit(0)),
    )
    .with_column("KPI", F.col("ERROR_STATE") + F.col("SOH_STATE"))
    .order_by(["VEHICLE_ID"], ascending=[True])
    .show(50)
)

In [None]:
# pandas counterpart of the Snowpark KPI cell. `df` is expected to be the
# pandas export from this section, i.e. to contain VEHICLE_ID, BATTERY_SOH
# and ERROR_SIZE.
#   ERROR_STATE = 1 if the vehicle reported any error, else 0
#   SOH_STATE   = 0.5 when 50 < average SOH <= 95, otherwise 0
#   KPI         = ERROR_STATE + SOH_STATE
# NOTE(review): an average SOH of 50 or below scores 0, the same as a healthy
# battery. That looks unintended — confirm the score for the lowest band.
df_result = (
    df
    .groupby("VEHICLE_ID")
    .agg(
        BATTERY_SOH_AVG=("BATTERY_SOH", "mean"),
        ERROR_COUNT=("ERROR_SIZE", "sum"),
    )
    .assign(
        # Vectorised comparisons instead of a Python lambda per row.
        ERROR_STATE=lambda kpi: (kpi["ERROR_COUNT"] > 0).astype(int),
        SOH_STATE=lambda kpi: (
            kpi["BATTERY_SOH_AVG"].between(50, 95, inclusive="right") * 0.5
        ),
    )
)

df_result["KPI"] = df_result["ERROR_STATE"] + df_result["SOH_STATE"]

df_result = df_result.sort_values("VEHICLE_ID")

# Display the first 50 rows, matching `.show(50)` in the Snowpark version
# (a bare `.head()` would only show five).
df_result.reset_index().head(50)