In [2]:
# Imports
import polars as pl
import subprocess
import math
from datetime import timedelta


# 1. Cấu hình file & lazy load

In [3]:
FILEPATH = "/kaggle/input/merged-timeseries/merged_timeseries_cleaned_total.csv"

# Nếu CSV (lazy)
df = pl.scan_csv(FILEPATH, try_parse_dates=False)  # lazy, không tốn RAM




# 2. Thông tin cơ bản

In [6]:
# Lấy tên cột nhanh (đọc 5 rows bằng read_csv - rất nhẹ)
cols = pl.read_csv(FILEPATH, n_rows=5).columns
print("Columns (sample):", cols)

# Row count nhanh bằng wc -l (đối với CSV)
import subprocess
lines = int(subprocess.check_output(["wc", "-l", FILEPATH]).split()[0])
total_rows = max(0, lines - 1)  # trừ header
print(f"Estimated total rows (from wc -l): {total_rows:,}")

# Schema inference (lazy->collect small sample to inspect dtypes)
sample = pl.read_csv(FILEPATH, n_rows=1000)  
print("Inferred schema (sample):")
print(sample.dtypes)


Columns (sample): ['timestamp', 'temp', 'humid', 'co2', 'light', 'class', 'trip_id', 'fruit_cate', 'latitude', 'longitude', 'Route', 'temperature_C', 'humidity_%', 'dew_point_C', 'pressure_hPa', 'wind_speed_kmh', 'precipitation_mm', 'trip_id_raw', 'expected_delay_min', 'weight', 'unit_quantity', 'is_timestamp_dup']
Estimated total rows (from wc -l): 43,184,857
Inferred schema (sample):
[String, Float64, Float64, Float64, Float64, String, String, String, Float64, Float64, String, Float64, Float64, Float64, Float64, Float64, Float64, Float64, Int64, Float64, Int64, Boolean]


# 3. COMPLETENESS: missing count & % cho từng cột

In [6]:
# Completeness: số giá trị null / % null cho mỗi cột (lazy aggregated)

# Lấy danh sách cột từ sample (đã có từ cell trước)
cols = cols  

# Tạo biểu thức tính null_count
null_exprs = [pl.col(c).null_count().alias(c) for c in cols]

# Chạy tính null count
null_counts = df.select(null_exprs).collect().transpose(include_header=True)
null_df = null_counts.to_pandas()
null_df.columns = ["column", "null_count"]

# Tính % null
null_df["pct_null"] = null_df["null_count"] / total_rows * 100

# Lấy số lượng giá trị unique cho mỗi cột
unique_exprs = [pl.col(c).n_unique().alias(c) for c in cols]
unique_counts = df.select(unique_exprs).collect().transpose(include_header=True)
unique_df = unique_counts.to_pandas()
unique_df.columns = ["column", "n_unique"]

# Gộp thành 1 bảng
dq_completeness = null_df.merge(unique_df, on="column")

dq_completeness.sort_values("pct_null", ascending=False).head(20)

Unnamed: 0,column,null_count,pct_null,n_unique
0,timestamp,0,0.0,10803484
1,temp,0,0.0,43178474
20,unit_quantity,0,0.0,1971
19,weight,0,0.0,4421
18,expected_delay_min,0,0.0,11
17,trip_id_raw,0,0.0,6474
16,precipitation_mm,0,0.0,61337
15,wind_speed_kmh,0,0.0,637808
14,pressure_hPa,0,0.0,742085
13,dew_point_C,0,0.0,491701


# 4. Uniqueness (kiểm tra trùng lặp)

In [None]:
# ---- 4.1. Trùng theo cặp khóa quan trọng: trip_id + timestamp ----
dup_keys = (
    df
    .group_by(["trip_id", "timestamp"])
    .agg(pl.count().alias("cnt"))
    .filter(pl.col("cnt") > 1)
    .select(["trip_id", "timestamp", "cnt"])
    .collect()
)

print("Số nhóm (trip_id, timestamp) bị trùng:", dup_keys.height)
print("Ví dụ 10 dòng bị trùng:")
print(dup_keys.head(10))


# ---- 4.2. Trùng toàn bộ dòng (full-row duplicates) ----
unique_rows = df.unique().select(pl.count()).collect()[0, 0]

print("\nTổng số dòng duy nhất:", unique_rows)
print("Tổng số dòng trùng:", total_rows - unique_rows)

  .agg(pl.count().alias("cnt"))


Số nhóm (trip_id, timestamp) bị trùng: 0
Ví dụ 10 dòng bị trùng:
shape: (0, 3)
┌─────────┬───────────┬─────┐
│ trip_id ┆ timestamp ┆ cnt │
│ ---     ┆ ---       ┆ --- │
│ str     ┆ str       ┆ u32 │
╞═════════╪═══════════╪═════╡
└─────────┴───────────┴─────┘


  unique_rows = df.unique().select(pl.count()).collect()[0, 0]


# 5. Validity (kiểm tra giá trị hợp lệ theo domain rules)

In [4]:
# Tập các quy tắc hợp lệ (bạn có thể thêm/sửa)
rules = {
    "humidity_%": (pl.col("humidity_%").is_null() |
                   ((pl.col("humidity_%") >= 0) & (pl.col("humidity_%") <= 100))),

    "latitude": (pl.col("latitude").is_null() |
                 ((pl.col("latitude") >= -90) & (pl.col("latitude") <= 90))),

    "longitude": (pl.col("longitude").is_null() |
                  ((pl.col("longitude") >= -180) & (pl.col("longitude") <= 180))),

    "temp": (pl.col("temp").is_null() |
             ((pl.col("temp") >= -40) & (pl.col("temp") <= 60))),

    "co2": (pl.col("co2").is_null() | (pl.col("co2") >= 0)),

    "is_timestamp_dup": (
        pl.col("is_timestamp_dup").is_null() |
        pl.col("is_timestamp_dup").is_in([True, False])
    )
}

# Tính số dòng vi phạm rule
invalid_rows = {}
for col, expr in rules.items():
    cnt = df.filter(~expr).select(pl.count()).collect()[0,0]
    invalid_rows[col] = cnt

import pandas as pd
pd.DataFrame(list(invalid_rows.items()),
             columns=["column", "invalid_count"]).sort_values("invalid_count", ascending=False)


  cnt = df.filter(~expr).select(pl.count()).collect()[0,0]


Unnamed: 0,column,invalid_count
0,humidity_%,0
1,latitude,0
2,longitude,0
3,temp,0
4,co2,0
5,is_timestamp_dup,0


# 6. Accuracy (phát hiện outlier bằng IQR cho cột số)

In [7]:
# 6. ACCURACY — PHÁT HIỆN OUTLIER BẰNG IQR

# Chọn các cột dạng số 
numeric_cols = ["temp", "humid", "co2", "light",
                "weight", "expected_delay_min", "unit_quantity"]

# Lọc ra các cột có tồn tại trong dataset
numeric_cols = [c for c in numeric_cols if c in cols]

outlier_summary = []

for c in numeric_cols:
    # Tính Q1, Q3
    q = df.select([
        pl.col(c).quantile(0.25).alias("q1"),
        pl.col(c).quantile(0.75).alias("q3")
    ]).collect()

    q1 = q[0,0]
    q3 = q[0,1]

    if q1 is None or q3 is None:
        outlier_summary.append((c, None, None, None, None))
        continue

    iqr = q3 - q1
    lower = q1 - 1.5 * iqr
    upper = q3 + 1.5 * iqr

    # Đếm số outlier
    cnt = df.filter((pl.col(c) < lower) | (pl.col(c) > upper)) \
            .select(pl.count()).collect()[0,0]

    outlier_summary.append((c, q1, q3, cnt, (lower, upper)))

pd.DataFrame(outlier_summary,
             columns=["column", "q1", "q3", "outlier_count", "range"]).sort_values("outlier_count", ascending=False)


  .select(pl.count()).collect()[0,0]


Unnamed: 0,column,q1,q3,outlier_count,range
6,unit_quantity,329.0,966.0,6021187,"(-626.5, 1921.5)"
4,weight,1.495706,13.4,4724334,"(-16.360735038904714, 31.25644102334283)"
3,light,9.208954,19.246443,2313511,"(-5.847280196096774, 34.30267768199781)"
0,temp,22.166133,24.865782,1997295,"(18.11665946097506, 28.915255326834373)"
1,humid,85.792357,91.721655,1215863,"(76.89840831967032, 100.61560378916485)"
2,co2,304.832816,345.009603,252,"(244.5676354580289, 405.2747841953328)"
5,expected_delay_min,2.0,8.0,0,"(-7.0, 17.0)"


# 7. Timeliness (timestamp hợp lệ + check độ trễ, gap thời gian)

In [9]:
# 7) TIMELINESS — kiểm tra tính kịp thời của timestamp

# 7.0. Parse timestamp → Datetime
df_ts = df.with_columns(
    pl.col("timestamp").str.strptime(pl.Datetime, format="%Y-%m-%d %H:%M:%S", strict=False)
                         .alias("ts_parsed")
)

# 7.1. Số dòng parse lỗi
unparsed = (
    df_ts.filter(pl.col("ts_parsed").is_null())
         .select(pl.len())
         .collect()[0, 0]
)

print("Số dòng timestamp parse lỗi:", unparsed)


# 7.2 Min / Max timestamp
tmin = df_ts.select(pl.col("ts_parsed").min()).collect()[0,0]
tmax = df_ts.select(pl.col("ts_parsed").max()).collect()[0,0]

print("Timestamp nhỏ nhất:", tmin)
print("Timestamp lớn nhất:", tmax)


# 7.3. Tính độ trễ giữa các record theo trip_id
gaps = (
    df_ts
    .sort(["trip_id", "ts_parsed"])
    .with_columns(
        pl.col("ts_parsed").cast(pl.Int64).alias("ts_int")
    )
    .with_columns(
        pl.col("ts_int").diff().alias("delta_ns")
    )
    .group_by("trip_id")
    .agg([
        pl.col("delta_ns").max().alias("max_gap_ns"),
        pl.col("delta_ns").mean().alias("avg_gap_ns"),
        pl.col("delta_ns").median().alias("median_gap_ns"),
        pl.len().alias("n_rows")
    ])
    .with_columns([
        (pl.col("max_gap_ns") / 1e9).alias("max_gap_s"),
        (pl.col("avg_gap_ns") / 1e9).alias("avg_gap_s"),
        (pl.col("median_gap_ns") / 1e9).alias("median_gap_s")
    ])
    .select(["trip_id", "n_rows", "max_gap_s", "avg_gap_s", "median_gap_s"])
    .collect()
)

# 7.4. Hiển thị các trip có gap bất thường
gaps.sort("max_gap_s", descending=True).head(10)


Số dòng timestamp parse lỗi: 0
Timestamp nhỏ nhất: 2024-01-01 00:10:25
Timestamp lớn nhất: 2024-06-01 01:47:00


trip_id,n_rows,max_gap_s,avg_gap_s,median_gap_s
str,u32,f64,f64,f64
"""TRIP_20605""",1201,7166.116,6.566291,0.6
"""TRIP_71926""",1177,7139.203,6.665083,0.6
"""TRIP_07055""",6127,7063.295,1.752717,0.6
"""TRIP_109475""",1045,7050.526,7.346341,0.6
"""TRIP_43089""",181,6940.964,38.944552,0.6
"""TRIP_106092""",9157,6830.954,1.345916,0.6
"""TRIP_68509""",6283,6735.12,1.671864,0.6
"""TRIP_18321""",6301,6723.344,1.666933,0.6
"""TRIP_39539""",811,6664.777,8.817234,0.6
"""TRIP_58977""",9175,6632.122,1.322782,0.6
