In [1]:
import polars as pl
from src.main import GetTanksReq, fetch_tank_data

req = GetTanksReq(property_ids={"98840"})

df = await fetch_tank_data(req)
if df is None:
    raise ValueError("df is None")
lf = df.lazy()
lf.collect()

unique_id,property_id,source_key,tank_name,uom,timestamp,value,tanksize
object,str,str,str,str,datetime[μs],f64,f64
0d96d173-3c6a-469f-901b-5cea8aa4210e,"""98840""","""9884005""","""OilTank5Level""","""in""",2015-05-01 05:05:54.083239,74.240695,790.375905
106e51eb-4faa-4d5c-b43e-210c92016f98,"""98840""","""98840W2""","""WaterTank2Volume""","""bbl""",2023-08-16 19:52:13.787936,357.148125,653.135767
1158ad41-4c78-4d2f-83ec-2ff717c2d445,"""98840""","""9884003""","""OilTank3Volume""","""bbl""",2024-02-11 04:10:55.839466,284.213132,624.069952
2748bdd4-4b4b-4add-aa16-251efbb55639,"""98840""","""98840FAC""","""ESD-OilTankID""","""""",2024-07-05 18:19:24.387492,4.0,
29576772-cde6-4767-a478-c7337a1c8c39,"""98840""","""9884004""","""OilTank4Level""","""in""",2021-06-18 15:33:10.557032,75.002393,417.5281
…,…,…,…,…,…,…,…
b5a48faa-1f40-4887-a0e9-7a84a3a83366,"""98840""","""98840W2""","""WaterTank2Level""","""in""",2019-04-05 01:41:07.809116,76.456344,653.135767
c4abef08-5ca3-43b2-a9a4-fdf5aa6210df,"""98840""","""98840FAC""","""ESD-WaterTankInchesUntilAlarm""","""in""",2024-08-16 04:45:27.763083,257.847738,
db762e5e-9c0c-4f77-864b-df5e66865f2e,"""98840""","""9884005""","""OilTank5Volume""","""bbl""",2023-03-17 20:34:17.444289,300.447955,790.375905
f3c718bf-58e8-488e-9445-99fae307d251,"""98840""","""98840FAC""","""ESD-OilTankInchesUntilAlarm""","""in""",2024-08-22 10:59:43.894659,102.478181,


In [2]:
lf = df.lazy()

tank_metrics = ["Level", "Volume", "InchesUntilAlarm", "InchesToESD", "TimeUntilESD", "Capacity", "ID"]
tank_types = ["Water", "Oil"]

tank_metrics_str= "|".join(tank_metrics)
tank_types_str = "|".join(tank_types)

pattern = f"^(?<is_ESD>ESD-)?(?<tank_type>{tank_types_str})Tank(?<tank_number>[0-9]*)(?<tank_metric>{tank_metrics_str})"
lf = lf.with_columns(
    separated_metrics=pl.col("tank_name").str.extract_groups(pattern)
)
lf = lf.unnest("separated_metrics")

lf = lf.with_columns(pl.col("tank_number").cast(pl.UInt8, strict=False))


# pivoting the data
values = pl.col("value")
columns = pl.col("tank_metric")
pivoted_lf = lf.group_by("property_id", "tank_type", "tank_number", "source_key").agg(
    values.filter(columns == metric).first().alias(metric) for metric in tank_metrics
)

null_condition = pl.col("tank_number").is_null()
not_null_condition = pl.col("tank_number").is_not_null()

null_tanks = pivoted_lf.filter(null_condition)
numbered_tanks = pivoted_lf.filter(not_null_condition)

null_tanks = null_tanks.with_columns(pl.col("ID").alias("tank_number"))
null_tanks = null_tanks.drop("ID")
null_tanks = null_tanks.with_columns(
    pl.col("tank_number").cast(pl.UInt8, strict=False)
)

joined_lf = numbered_tanks.join(
    null_tanks, on=["property_id", "tank_type", "tank_number"], how="left"
)

final_lf = joined_lf.join(lf, on=["source_key"], how="left")
final_lf = final_lf.group_by("property_id", "tank_type", "tank_number").agg(
    pl.all().first()
)

final_lf = final_lf.with_columns(
    pl.col("property_id"),
    pl.col("tank_type"),
    pl.col("tank_number"),
    pl.coalesce(pl.col("Level"), pl.col("Level_right")).alias("level"),
    pl.coalesce(pl.col("Volume"), pl.col("Volume_right")).alias("volume"),
    pl.coalesce(pl.col("InchesToESD"), pl.col("InchesUntilAlarm_right")).alias(
        "inches_to_esd"
    ),
    pl.coalesce(pl.col("TimeUntilESD"), pl.col("TimeUntilESD_right")).alias(
        "time_until_esd"
    ),
    pl.coalesce(pl.col("Capacity"), pl.col("tanksize")).alias("capacity"),
    pl.coalesce(pl.col("unique_id")).alias("unique_id")
)

required_columns = [
    "unique_id",
    "property_id",
    "source_key",
    "tank_type",
    "tank_number",
    "level",
    "volume",
    "inches_to_esd",
    "time_until_esd",
    "capacity",
]
final_lf = final_lf.select(required_columns)

final_lf = final_lf.sort("property_id", "tank_type", "tank_number")

percent_tank_full = (
    (pl.col("volume") / pl.col("capacity") * 100).round().cast(pl.UInt8)
)
final_lf = final_lf.with_columns(percent_tank_full.alias("percent_full"))

capacity_rounded = pl.col("capacity").round()
final_lf = final_lf.with_columns(capacity_rounded.alias("capacity"))

volume_to_feet = pl.col("volume").round().cast(pl.UInt64)
final_lf = final_lf.with_columns(volume_to_feet.alias("volume"))

result = final_lf.collect()
result

unique_id,property_id,source_key,tank_type,tank_number,level,volume,inches_to_esd,time_until_esd,capacity,percent_full
object,str,str,str,u8,f64,u64,f64,f64,f64,u8
8748b483-ee0b-460d-bc01-d1e37319d283,"""98840""","""9884001""","""Oil""",1,79.02198,299,,,554.0,54
686fc368-3d98-4608-aef0-5af0eb416e8c,"""98840""","""9884002""","""Oil""",2,77.590071,294,,,736.0,40
1158ad41-4c78-4d2f-83ec-2ff717c2d445,"""98840""","""9884003""","""Oil""",3,77.392763,284,,,624.0,46
29576772-cde6-4767-a478-c7337a1c8c39,"""98840""","""9884004""","""Oil""",4,75.002393,268,102.478181,,418.0,64
0d96d173-3c6a-469f-901b-5cea8aa4210e,"""98840""","""9884005""","""Oil""",5,74.240695,300,,,790.0,38
63176073-1766-4d40-8a15-72822e6c7220,"""98840""","""9884006""","""Oil""",6,82.279163,310,,,400.0,77
2cd89218-f881-41d7-a97a-7d06deacb5f5,"""98840""","""98840W1""","""Water""",1,79.941493,268,,,697.0,38
106e51eb-4faa-4d5c-b43e-210c92016f98,"""98840""","""98840W2""","""Water""",2,76.456344,357,257.847738,,653.0,55


In [3]:
lf = df.lazy()

tank_metrics = ["Level", "Volume", "InchesUntilAlarm", "InchesToESD", "TimeUntilESD", "Capacity", "ID"]
tank_types = ["Water", "Oil"]

tank_metrics_str= "|".join(tank_metrics)
tank_types_str = "|".join(tank_types)

pattern = f'^(?<is_esd>ESD-)?(?<tank_type>{tank_types_str})Tank(?<tank_number>[0-9]*)(?<tank_metric>{tank_metrics_str})'
lf = lf.with_columns(separated_metrics = pl.col("tank_name").str.extract_groups(pattern))

lf = lf.unnest("separated_metrics")

lf = lf.with_columns(pl.col("tank_number").cast(pl.UInt8, strict=False))

values = pl.col("value")
columns = pl.col("tank_metric")
pivoted_lf = lf.group_by("property_id", "tank_type", "tank_number", "source_key", "unique_id").agg(
    values.filter(columns == metric).first().alias(metric) for metric in tank_metrics
)

null_condition = pl.col("tank_number").is_null()
not_null_condition = pl.col("tank_number").is_not_null()

null_tanks = pivoted_lf.filter(null_condition)
numbered_tanks = pivoted_lf.filter(not_null_condition)

null_tanks = null_tanks.with_columns(pl.col("ID").alias("tank_number"))
null_tanks = null_tanks.drop("ID")
null_tanks = null_tanks.with_columns(pl.col("tank_number").cast(pl.UInt8, strict=False))

numbered_tanks = numbered_tanks.with_columns(pl.col("unique_id").alias("identifier"))
numbered_tanks = numbered_tanks.drop("unique_id")

null_tanks_merged = null_tanks.group_by(["property_id", "tank_type", "source_key"]).agg(
    [
        pl.col("tank_number").max(),  # Use max or min to fill missing values
        pl.col("unique_id").last(),  # Use first() or last() for string-like columns
        pl.col("Level").max(),
        pl.col("Volume").max(),
        pl.col("InchesUntilAlarm").max(),
        pl.col("InchesToESD").max(),
        pl.col("TimeUntilESD").max(),
        pl.col("Capacity").max(),
    ]
)

joined_lf = numbered_tanks.join(null_tanks_merged, on=["property_id", "tank_type", "tank_number"], how="left")

final_lf = joined_lf.join(lf, on=["source_key"], how="left")

final_lf = final_lf.with_columns(
    pl.col("unique_id").alias("identifier"),
    pl.col("property_id"),
    pl.col("tank_type"),
    pl.col("tank_number"),
    pl.coalesce(pl.col("Level"), pl.col("Level_right")).alias("level"),
    pl.coalesce(pl.col("Volume"), pl.col("Volume_right")).alias("volume"),
    pl.coalesce(pl.col("InchesToESD"), pl.col("InchesUntilAlarm_right")).alias("inches_to_esd"),
    pl.coalesce(pl.col("TimeUntilESD"), pl.col("TimeUntilESD_right")).alias("time_until_esd"),
    pl.coalesce(pl.col("Capacity"), pl.col("tanksize")).alias("capacity")
)

required_columns = ["identifier", "property_id", "source_key", "tank_type", "tank_number", "level", "volume", "inches_to_esd", "time_until_esd", "capacity"]
final_lf = final_lf.select(required_columns)

final_lf = final_lf.sort("property_id", "tank_type", "tank_number")

percent_tank_full = (pl.col("volume") / pl.col("capacity") * 100).round().cast(pl.UInt8)
final_lf = final_lf.with_columns(percent_tank_full.alias("percent_full"))

capacity_rounded = pl.col("capacity").round()
final_lf = final_lf.with_columns(capacity_rounded.alias("capacity"))

volume_to_feet = pl.col("volume").round().cast(pl.UInt64)

final_lf = final_lf.with_columns(volume_to_feet.alias("volume"))

result = final_lf.group_by(["property_id", "source_key", "tank_type", "tank_number"]).agg(
    [
        pl.col("identifier").first(),  # Use first() or last() for non-numeric columns
        pl.col("level").max(),  # Use max() to get the highest value (fill missing)
        pl.col("volume").max(),
        pl.col("inches_to_esd").max(),
        pl.col("time_until_esd").max(),
        pl.col("capacity").max(),
        pl.col("percent_full").max(),
    ]
)

result = result.sort("property_id", "tank_type", "tank_number")
result = result.select(["identifier", "property_id", "source_key", "tank_type", "tank_number", "level", "volume", "inches_to_esd", "time_until_esd", "capacity", "percent_full"])
result = result.with_columns(pl.col("identifier").map_elements(lambda x: str(x) if x is not None else None, return_dtype=pl.Utf8).alias("identifier"))
result.collect()


identifier,property_id,source_key,tank_type,tank_number,level,volume,inches_to_esd,time_until_esd,capacity,percent_full
str,str,str,str,u8,f64,u64,f64,f64,f64,u8
,"""98840""","""9884001""","""Oil""",1,79.02198,299,,,554.0,54
,"""98840""","""9884002""","""Oil""",2,77.590071,294,,,736.0,40
,"""98840""","""9884003""","""Oil""",3,77.392763,284,,,624.0,46
"""f3c718bf-58e8-488e-9445-99fae3…","""98840""","""9884004""","""Oil""",4,75.002393,268,102.478181,,418.0,64
,"""98840""","""9884005""","""Oil""",5,74.240695,300,,,790.0,38
,"""98840""","""9884006""","""Oil""",6,82.279163,310,,,400.0,77
,"""98840""","""98840W1""","""Water""",1,79.941493,268,,,697.0,38
"""c4abef08-5ca3-43b2-a9a4-fdf5aa…","""98840""","""98840W2""","""Water""",2,76.456344,357,257.847738,,653.0,55


In [4]:
lf = df.lazy()


In [5]:
tank_metrics = ["Level", "Volume", "InchesUntilAlarm", "InchesToESD", "TimeUntilESD", "Capacity", "ID"]
tank_types = ["Water", "Oil"]

tank_metrics_str= "|".join(tank_metrics)
tank_types_str = "|".join(tank_types)

pattern = f'^(?<is_esd>ESD-)?(?<tank_type>{tank_types_str})Tank(?<tank_number>[0-9]*)(?<tank_metric>{tank_metrics_str})'
lf = lf.with_columns(separated_metrics = pl.col("metric_nice_name").str.extract_groups(pattern))
lf.collect()

ColumnNotFoundError: metric_nice_name

In [None]:
lf = lf.unnest("separated_metrics")
lf.collect()

In [None]:
lf = lf.with_columns(pl.col("tank_number").cast(pl.UInt8, strict=False))
lf.collect()

In [None]:
#pivoting the data
values = pl.col("value")
columns = pl.col("tank_metric")
pivoted_lf = lf.group_by("property_id", "tank_type", "tank_number", "scada_id", "unique_id").agg(
    values.filter(columns == metric).first().alias(metric) for metric in tank_metrics
)

pivoted_lf.collect()

In [None]:
null_condition = pl.col("tank_number").is_null()
not_null_condition = pl.col("tank_number").is_not_null()

null_tanks = pivoted_lf.filter(null_condition)
numbered_tanks = pivoted_lf.filter(not_null_condition)

null_tanks = null_tanks.with_columns(pl.col("ID").alias("tank_number"))
null_tanks = null_tanks.drop("ID")
null_tanks = null_tanks.with_columns(pl.col("tank_number").cast(pl.UInt8, strict=False))

null_tanks.collect()

In [None]:
numbered_tanks.collect()

In [None]:
numbered_tanks = numbered_tanks.with_columns(pl.col("unique_id").alias("identifier"))
numbered_tanks = numbered_tanks.drop("unique_id")
numbered_tanks.collect()

In [None]:
pp = null_tanks.group_by(["property_id", "tank_type", "scada_id"]).agg(
    [
        pl.col("tank_number").max(),  # Use max or min to fill missing values
        pl.col("unique_id").last(),  # Use first() or last() for string-like columns
        pl.col("Level").max(),
        pl.col("Volume").max(),
        pl.col("InchesUntilAlarm").max(),
        pl.col("InchesToESD").max(),
        pl.col("TimeUntilESD").max(),
        pl.col("Capacity").max(),
    ]
)

pp.collect()

In [None]:
joined_lf = numbered_tanks.join(pp, on=["property_id", "tank_type", "tank_number"], how="left")
joined_lf.collect()

In [None]:
final_lf = joined_lf.join(lf, on=["scada_id"], how="left")
final_lf.collect()

In [None]:
final_lf = final_lf.with_columns(
    pl.col("unique_id").alias("identifier"),
    pl.col("property_id"),
    pl.col("tank_type"),
    pl.col("tank_number"),
    pl.coalesce(pl.col("Level"), pl.col("Level_right")).alias("level"),
    pl.coalesce(pl.col("Volume"), pl.col("Volume_right")).alias("volume"),
    pl.coalesce(pl.col("InchesToESD"), pl.col("InchesUntilAlarm_right")).alias("inches_to_esd"),
    pl.coalesce(pl.col("TimeUntilESD"), pl.col("TimeUntilESD_right")).alias("time_until_esd"),
    pl.coalesce(pl.col("Capacity"), pl.col("tanksize")).alias("capacity")
)

required_columns = ["identifier", "property_id", "scada_id", "tank_type", "tank_number", "level", "volume", "inches_to_esd", "time_until_esd", "capacity"]
final_lf = final_lf.select(required_columns)
final_lf.collect()

In [None]:
final_lf = final_lf.sort("property_id", "tank_type", "tank_number")
final_lf.collect()

In [None]:
percent_tank_full = (pl.col("volume") / pl.col("capacity") * 100).round().cast(pl.UInt8)
final_lf = final_lf.with_columns(percent_tank_full.alias("percent_full"))

final_lf.collect()

In [None]:
capacity_rounded = pl.col("capacity").round()
final_lf = final_lf.with_columns(capacity_rounded.alias("capacity"))
final_lf.collect()

In [None]:
volume_to_feet = pl.col("volume").round().cast(pl.UInt64)

final_lf = final_lf.with_columns(volume_to_feet.alias("volume"))

final_lf.collect()

In [None]:
new_pp = final_lf.group_by(["property_id", "scada_id", "tank_type", "tank_number"]).agg(
    [
        pl.col("identifier").first(),  # Use first() or last() for non-numeric columns
        pl.col("level").max(),  # Use max() to get the highest value (fill missing)
        pl.col("volume").max(),
        pl.col("inches_to_esd").max(),
        pl.col("time_until_esd").max(),
        pl.col("capacity").max(),
        pl.col("percent_full").max(),
    ]
)



new_pp.collect()

In [None]:
new_pp = new_pp.sort("property_id", "tank_type", "tank_number")
new_pp.collect()