In [0]:
import yaml
from datetime import datetime
from vehicle_status import vehicle_status
from apply_constraints import apply_constraints
from interpolation import interpolate_dataframe
from battery_mapping import battery_mapping
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import to_timestamp, to_unix_timestamp, col, lit, when

In [0]:
# Reuse the active Spark session if one exists, otherwise start one registered
# under the application name "TelemetryApp".
spark = (
    SparkSession.builder
    .appName("TelemetryApp")
    .getOrCreate()
)

In [0]:
# Load the raw telemetry table from the catalog as a Spark DataFrame; every
# later cell rebinds `data` to the next stage of the cleaning pipeline.
data = spark.table("workspace.default.raw_telemetry")

In [0]:
# Locations of the YAML configuration files handed to the cleaning helpers below.
# NOTE(review): both are absolute, user-specific workspace paths; the notebook
# will only run where this exact workspace layout exists.
dataclean_specs = (
    "/Workspace/Users/abhishekrmandapmalvi@gmail.com"
    "/vehicle-telemetry/config/schema.yml"
)
battery_specs = (
    "/Workspace/Users/abhishekrmandapmalvi@gmail.com"
    "/vehicle-telemetry/config/vehicle_hash_table.yml"
)

In [0]:
# Run the project's apply_constraints helper on the raw frame, passing the
# path to schema.yml.
# NOTE(review): presumably enforces the per-column rules declared in that
# file — confirm against the apply_constraints module.
data = apply_constraints(data, dataclean_specs)

In [0]:
# Run the project's battery_mapping helper, passing the path to
# vehicle_hash_table.yml.
# NOTE(review): presumably the source of the battery/vehicle descriptor
# columns (commercial_name, battery_type, ...) seen in the displayed output —
# confirm against the battery_mapping module.
data = battery_mapping(data, battery_specs)

In [0]:
# Parse the "timestamp" column into a Spark timestamp (overwriting it in place),
# then derive "unix_timestamp" from the already-parsed column.
data = (
    data
    .withColumn("timestamp", to_timestamp("timestamp"))
    .withColumn("unix_timestamp", to_unix_timestamp("timestamp"))
)

In [0]:
# Blank out battery_current readings whose sign conflicts with the speed reading:
#   * battery_current > 0 while speed_kmh > 0, or
#   * battery_current < 0 while speed_kmh <= 0.
# Matching rows are set to NULL; all other rows keep their existing value,
# including rows where the test itself evaluates to NULL (e.g. missing speed).
# NOTE(review): the physical meaning of the current's sign is not visible in
# this notebook — confirm the convention with the data owner.
current_reading = col("battery_current")
speed_reading = col("speed_kmh")
sign_conflicts_with_motion = (
    ((current_reading > 0) & (speed_reading > 0))
    | ((current_reading < 0) & (speed_reading <= 0))
)
data = data.withColumn(
    "battery_current",
    when(sign_conflicts_with_motion, lit(None)).otherwise(current_reading),
)

In [0]:
# Run the project's interpolate_dataframe helper with the same schema.yml path
# used for constraint checking.
# NOTE(review): presumably fills NULLs (including the battery_current values
# blanked earlier in this notebook) — confirm against the interpolation module.
data = interpolate_dataframe(data, dataclean_specs)

In [0]:
# Render the interpolated frame for visual inspection. `display` is not
# imported in this notebook, so it relies on the notebook environment
# providing it as a built-in.
# NOTE(review): the saved output shows several rows for the same
# (vehicle_id, timestamp) with identical battery_* values but different
# speed/latitude/longitude — confirm that this is expected.
display(data)

vehicle_id,timestamp,speed_kmh,battery_voltage,battery_current,battery_soc_percent,battery_temp_celsius,latitude,longitude,commercial_name,battery_pack_configuration,battery_type,form_factor,battery_capacity,nominal_voltage,number_of_cell,unix_timestamp
V-001,2025-02-07T23:05:35.000Z,97.13,204.55,-30.53,53.12,-4.83,-83.9301045,66.445737,Mustang Mach-e,197s2p,lithium-ion,pouch,100.54,800.0,384,1738969535
V-001,2025-02-07T23:05:35.000Z,142.24,204.55,-30.53,53.12,-4.83,84.318647,24.194758,Mustang Mach-e,197s2p,lithium-ion,pouch,100.54,800.0,384,1738969535
V-001,2025-02-07T23:05:35.000Z,36.16,204.55,-30.53,53.12,-4.83,9.3689125,127.842595,Mustang Mach-e,197s2p,lithium-ion,pouch,100.54,800.0,384,1738969535
V-001,2025-02-07T23:05:35.000Z,130.36,204.55,-30.53,53.12,-4.83,-39.0903215,-172.370391,Mustang Mach-e,197s2p,lithium-ion,pouch,100.54,800.0,384,1738969535
V-001,2025-02-07T23:05:35.000Z,45.73,204.55,-30.53,53.12,-4.83,68.480647,-119.592027,Mustang Mach-e,197s2p,lithium-ion,pouch,100.54,800.0,384,1738969535
V-001,2025-02-07T23:05:35.000Z,151.62,204.55,-30.53,53.12,-4.83,5.642958,-179.119823,Mustang Mach-e,197s2p,lithium-ion,pouch,100.54,800.0,384,1738969535
V-001,2025-02-07T23:05:35.000Z,58.54,204.55,-30.53,53.12,-4.83,47.5161545,-84.558383,Mustang Mach-e,197s2p,lithium-ion,pouch,100.54,800.0,384,1738969535
V-001,2025-02-07T23:05:36.000Z,124.17,61.73,-5.21,21.54,8.76,-14.8585715,92.800675,Mustang Mach-e,197s2p,lithium-ion,pouch,100.54,800.0,384,1738969536
V-001,2025-02-07T23:05:36.000Z,37.71,61.73,-5.21,21.54,8.76,2.70011,-124.149146,Mustang Mach-e,197s2p,lithium-ion,pouch,100.54,800.0,384,1738969536
V-001,2025-02-07T23:05:36.000Z,36.94,61.73,-5.21,21.54,8.76,35.847065,27.698816,Mustang Mach-e,197s2p,lithium-ion,pouch,100.54,800.0,384,1738969536


In [0]:
# Run the project's vehicle_status helper on the cleaned frame.
# NOTE(review): presumably adds a derived status column — confirm against the
# vehicle_status module.
data = vehicle_status(data)

In [0]:
# Inspection cell: show the Spark dtype of number_of_cell before the write.
# NOTE(review): this has no effect on the pipeline; consider replacing it with
# an explicit assertion or removing it.
data.select("number_of_cell").dtypes

[('number_of_cell', 'int')]

In [0]:
# Persist the final frame as a Delta table; "overwrite" mode replaces whatever
# the target table currently holds.
(
    data.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.default.cleaned_telemetry")
)