In [3]:
from pymongo import MongoClient
import pandas as pd
from datetime import datetime

# Kết nối MongoDB
client = MongoClient("mongodb://localhost:27017")
db = client["traffic"]

# Chọn collection gốc
raw_coll = db["traffic1"]


In [4]:
# Lấy mẫu nhỏ (ví dụ 50.000 document để thử pipeline)
cursor = raw_coll.find().limit(4337136)
df = pd.DataFrame(list(cursor))

In [5]:
# Chuyển trường 'count_date' sang datetime
df["count_date"] = pd.to_datetime(df["count_date"], errors="coerce")

# Tạo các feature thời gian
df["day_of_week"] = df["count_date"].dt.day_name()
df["is_weekend"] = df["count_date"].dt.dayofweek >= 5
df["month"] = df["count_date"].dt.month

# Ví dụ tạo biến trễ (lag feature) cho "all_motor_vehicles"
df = df.sort_values(["count_point_id", "count_date", "hour"])
df["all_motor_vehicles_lag1"] = df.groupby("count_point_id")["all_motor_vehicles"].shift(1)
df["all_motor_vehicles_lag2"] = df.groupby("count_point_id")["all_motor_vehicles"].shift(2)

# Loại bỏ giá trị NaN sinh ra từ lag
df = df.dropna(subset=["all_motor_vehicles_lag1", "all_motor_vehicles_lag2"])

# Ghi sang collection mới
features_coll = db["traffic_features"]
features_coll.delete_many({})  # xóa dữ liệu cũ nếu có
features_coll.insert_many(df.to_dict("records"))

print("✅ Đã tạo collection 'traffic_features' thành công!")


✅ Đã tạo collection 'traffic_features' thành công!


In [6]:
print("Số dòng sau xử lý:", len(df))


Số dòng sau xử lý: 4253812


In [9]:
# --- Tính toán các thống kê ---
hourly_avg = (
    df.groupby("hour")["all_motor_vehicles"]
    .mean()
    .reset_index()
    .rename(columns={"all_motor_vehicles": "avg_traffic"})
)

daily_avg = (
    df.groupby("day_of_week")["all_motor_vehicles"]
    .mean()
    .reset_index()
    .rename(columns={"all_motor_vehicles": "avg_traffic"})
    .sort_values("day_of_week")
)

monthly_avg = (
    df.groupby("month")["all_motor_vehicles"]
    .mean()
    .reset_index()
    .rename(columns={"all_motor_vehicles": "avg_traffic"})
    .sort_values("month")
)

region_avg = (
    df.groupby("region_name")["all_motor_vehicles"]
    .mean()
    .reset_index()
    .rename(columns={"all_motor_vehicles": "avg_traffic"})
    .sort_values("avg_traffic", ascending=False)
)

weekend_avg = (
    df.groupby("is_weekend")["all_motor_vehicles"]
    .mean()
    .reset_index()
    .rename(columns={"all_motor_vehicles": "avg_traffic"})
)
weekend_avg["day_type"] = weekend_avg["is_weekend"].map({True: "Weekend", False: "Weekday"})
weekend_avg = weekend_avg[["day_type", "avg_traffic"]]

top_roads = (
    df.groupby("road_name")["all_motor_vehicles"]
    .mean()
    .reset_index()
    .sort_values("all_motor_vehicles", ascending=False)
    .head(10)
    .rename(columns={"all_motor_vehicles": "avg_traffic"})
)

# --- Tạo danh sách document ---
summary_docs = [
    {"type": "hourly_avg", "timestamp": datetime.now(), "data": hourly_avg.to_dict("records")},
    {"type": "daily_avg", "timestamp": datetime.now(), "data": daily_avg.to_dict("records")},
    {"type": "monthly_avg", "timestamp": datetime.now(), "data": monthly_avg.to_dict("records")},
    {"type": "region_avg", "timestamp": datetime.now(), "data": region_avg.to_dict("records")},
    {"type": "weekend_avg", "timestamp": datetime.now(), "data": weekend_avg.to_dict("records")},
    {"type": "top_roads", "timestamp": datetime.now(), "data": top_roads.to_dict("records")},
]
db_new = db["analytical_summary"]
# --- Ghi vào MongoDB ---
db_new.delete_many({})  # Xóa bản cũ
db_new.insert_many(summary_docs)

print("✅ Đã tạo collection 'analytical_summary' với nhiều document riêng biệt!")

✅ Đã tạo collection 'analytical_summary' với nhiều document riêng biệt!
