In [None]:
pip install -r ../etl/requirements.txt 

Defaulting to user installation because normal site-packages is not writeable
Collecting polars (from -r ../etl/requirements.txt (line 1))
  Downloading polars-1.32.2-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (15 kB)
Collecting connectorx (from -r ../etl/requirements.txt (line 2))
  Downloading connectorx-0.4.3-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (26 kB)
Downloading polars-1.32.2-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (38.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m38.3/38.3 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading connectorx-0.4.3-cp312-cp312-manylinux_2_28_x86_64.whl (41.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m41.7/41.7 MB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: polars, connectorx
Successfully installed connectorx-0.4.3 polars-1.32.2
Note: you may need to restart the kernel to use up

In [1]:
import polars as pl
import os
from dotenv import load_dotenv
from datetime import datetime as dt

In [29]:
uri = "mysql://etl_user:etlpassword123@localhost:3306/coal_mining"
query_load_data = """
    SELECT 
        date, 
        mine_id, 
        shift, 
        tons_extracted, 
        quality_grade
    FROM 
        production_logs
"""

sql_data = pl.read_database_uri(query=query_load_data, uri=uri)

In [3]:
iot_data = pl.read_csv(
    "../../data/iot/equipment_sensors.csv",
    use_pyarrow=True,
)

In [4]:
anomaly_data = sql_data.filter(pl.col("tons_extracted") < 0)

In [32]:
# daily_production =
sql_data.group_by("date").agg(
    pl.when(~(pl.col("tons_extracted") >= 0))
    .then(pl.col("tons_extracted"))
    .otherwise(0)
    .sum()
    .alias("total_production_daily"),
    pl.col("quality_grade").mean().alias("average_quality_grade"),
)

date,total_production_daily,average_quality_grade
date,f64,f64
2025-04-26,0.0,4.85
2025-05-08,-207.24,4.45
2024-12-19,0.0,4.816667
2024-12-07,0.0,3.8
2024-12-13,-947.68,5.3
…,…,…
2025-04-02,0.0,5.1
2025-03-09,-185.93,5.366667
2024-11-07,0.0,4.65
2025-03-15,0.0,4.583333


In [4]:
total_equipment = iot_data["equipment_id"].n_unique()

iot_data_daily = iot_data.group_by(pl.col("timestamp").dt.date()).agg(
    ((pl.col("status") == "active").sum() / (24 * total_equipment)).alias(
        "equipment_utilization"
    ),
    pl.col("fuel_consumption").sum().alias("total_fuel_consumption"),
)

In [10]:
count_equipment = (
    iot_data.sort(["timestamp", "equipment_id"])
    .group_by("timestamp")
    .agg(
        pl.col("equipment_id").n_unique().alias("unique_equipment_count"),
    )
    .filter(pl.col("unique_equipment_count") < iot_data["equipment_id"].n_unique())
)

In [16]:
count_equipment["timestamp"].to_numpy()

array([], dtype='datetime64[ms]')

In [26]:
incomplete_data = (
    iot_data.filter(
        iot_data["timestamp"].is_in(count_equipment["timestamp"].to_numpy())
    )["timestamp", "equipment_id"]
    .group_by("timestamp")
    .agg(
        pl.col("equipment_id").sum().alias("list_equipment_id"),
    )
)

In [34]:
missing_equipment = incomplete_data.with_columns(
    pl.when(~(pl.col("list_equipment_id").str.contains("TR001")))
    .then(1)
    .otherwise(0)
    .alias("TR001"),
    pl.when(~(pl.col("list_equipment_id").str.contains("TR002")))
    .then(1)
    .otherwise(0)
    .alias("TR002"),
    pl.when(~(pl.col("list_equipment_id").str.contains("TR003")))
    .then(1)
    .otherwise(0)
    .alias("TR003"),
    pl.when(~(pl.col("list_equipment_id").str.contains("TR004")))
    .then(1)
    .otherwise(0)
    .alias("TR004"),
    pl.when(~(pl.col("list_equipment_id").str.contains("TR005")))
    .then(1)
    .otherwise(0)
    .alias("TR005"),
).drop("list_equipment_id")

In [53]:
missing_value = (
    missing_equipment.unpivot(index=["timestamp"])
    .filter(pl.col("value") == 1)
    .drop("value")
    .rename({"variable": "equipment_id"})
    .with_columns(pl.col("timestamp").dt.date().dt.offset_by("-1d").alias("yesterday"))
    .join(
        iot_data.with_columns(pl.col("timestamp").dt.date().alias("yesterday")),
        how="inner",
        left_on=["yesterday", "equipment_id"],
        right_on=["yesterday", "equipment_id"],
    )
    .group_by(["timestamp", "yesterday", "equipment_id"])
    .agg(pl.col("fuel_consumption").mean())
    .with_columns(
        pl.lit("active").alias("status"),
        pl.lit(False).alias("maintenance_alert"),
    )["timestamp", "equipment_id", "status", "fuel_consumption", "maintenance_alert"]
)

In [52]:
iot_data

timestamp,equipment_id,status,fuel_consumption,maintenance_alert
datetime[ms],str,str,f64,bool
2024-07-01 00:00:00,"""TR001""","""maintenance""",0.0,false
2024-07-01 00:00:00,"""TR002""","""idle""",0.0,false
2024-07-01 00:00:00,"""TR003""","""maintenance""",0.0,false
2024-07-01 00:00:00,"""TR004""","""idle""",0.0,false
2024-07-01 00:00:00,"""TR005""","""active""",8.45,false
…,…,…,…,…
2025-06-30 23:00:00,"""TR001""","""idle""",0.0,false
2025-06-30 23:00:00,"""TR002""","""idle""",0.0,false
2025-06-30 23:00:00,"""TR003""","""maintenance""",0.0,false
2025-06-30 23:00:00,"""TR004""","""idle""",0.0,false


In [23]:
import polars as pl

df = pl.DataFrame({"ID": [1, 2], "type1": [1, 0], "type2": [0, 0], "type3": [1, 1]})

# Melt into long format
df_long = df.unpivot(index=["ID"])

# Filter only rows where flag == 1 and drop flag column
# df_long = df_long.filter(pl.col("flag") == 1).select(["ID", "type"])

print(df_long)

shape: (6, 3)
┌─────┬──────────┬───────┐
│ ID  ┆ variable ┆ value │
│ --- ┆ ---      ┆ ---   │
│ i64 ┆ str      ┆ i64   │
╞═════╪══════════╪═══════╡
│ 1   ┆ type1    ┆ 1     │
│ 2   ┆ type1    ┆ 0     │
│ 1   ┆ type2    ┆ 0     │
│ 2   ┆ type2    ┆ 0     │
│ 1   ┆ type3    ┆ 1     │
│ 2   ┆ type3    ┆ 1     │
└─────┴──────────┴───────┘


In [21]:
import requests

In [30]:
response = requests.get(
    "https://archive-api.open-meteo.com/v1/archive?latitude=2.0167&longitude=117.3000&start_date=2024-07-01&end_date=2025-05-09&daily=temperature_2m_mean,precipitation_sum&timezone=Asia/Jakarta"
)

In [46]:
weather_historical_data = pl.from_dict(response.json()["daily"])

In [49]:
weather_historical_data = weather_historical_data.with_columns(
    pl.col("time").str.to_date().alias("date"),
).drop("time")

In [50]:
weather_historical_data

temperature_2m_mean,precipitation_sum,date
f64,f64,date
26.0,1.7,2024-07-01
25.3,29.9,2024-07-02
24.6,9.5,2024-07-03
25.9,0.2,2024-07-04
26.3,6.2,2024-07-05
…,…,…
26.8,9.8,2025-05-05
26.5,3.3,2025-05-06
26.8,9.8,2025-05-07
26.5,5.1,2025-05-08


In [None]:
##### BREAKING POINT #####

In [16]:
from dotenv import load_dotenv

load_dotenv()

True

In [21]:
db_user = os.getenv("MYSQL_USER", "")
db_password = os.getenv("MYSQL_PASSWORD", "")
db_host = os.getenv("MYSQL_HOST", "")
db_name = os.getenv("MYSQL_DATABASE", "")


def load_sql_data(
    host: str, user: str, password: str, database: str, query: str
) -> pl.DataFrame:
    """
    Load data from a SQL database using Polars.

    Args:
        host (str): Database host.
        user (str): Database user.
        password (str): Database password.
        database (str): Database name.
        query (str): SQL query to execute.

    Returns:dock
        pl.DataFrame: DataFrame containing the result of the query.
    """
    return pl.read_database_uri(
        query, f"mysql://{user}:{password}@{host}:3306/{database}"
    )

In [22]:
load_sql_data(
    host=db_host,
    user=db_user,
    password=db_password,
    database=db_name,
    query="SELECT * FROM mines",
)

mine_id,mine_code,mine_name,location,operational_status
i64,str,str,str,str
1,"""MINE001""","""Bukit Bara""","""Berau, Kalimantan""","""Active"""
2,"""MINE002""","""Gunung Hitam""","""Berau, Kalimantan""","""Active"""
3,"""MINE003""","""Sumber Jaya""","""Berau, Kalimantan""","""Maintenance"""


In [None]:
"mysql://etl_user:etlpassword123@localhost:3306/coal_mining"

In [12]:
os.environ.get("MYSQL_USER", "")

''

In [33]:
(dt.strptime("2025-05-08", "%Y-%m-%d") - dt.now()).days < -90

True