# PoC: 操作ログと状態イベントの初期分析

このノートブックでは、`equipment_control_logs` と `equipment_status_events` の両データセットを用いて、操作ログの傾向把握と状態イベントとの関係性を検証します。分析要件は `PLAN.md` に基づき、再現性を重視した手順を記録します。

In [None]:
# WHAT: 分析で利用する主要ライブラリの読み込みと表示設定の初期化
# WHY : 再現性の高い環境を準備し結果の読みやすさを確保するため
# CHECK: pandas と matplotlib/seaborn のバージョンが表示されること
from pathlib import Path
import warnings

import pandas as pd

try:
    import matplotlib.pyplot as plt  # noqa: F401
    HAS_MPL = True
except ImportError:
    plt = None
    HAS_MPL = False

try:
    import seaborn as sns  # noqa: F401
except ImportError:
    sns = None

pd.set_option("display.max_columns", 40)
pd.set_option("display.max_rows", 20)
pd.set_option("display.float_format", "{:.3f}".format)

print(f"pandas={pd.__version__}")
if HAS_MPL:
    import matplotlib as mpl  # noqa: WPS433

    print(f"matplotlib={mpl.__version__}")
else:
    print("matplotlib=NOT INSTALLED")
if sns is not None:
    print(f"seaborn={sns.__version__}")
else:
    print("seaborn=NOT INSTALLED")


In [None]:
import sys

# 表示済みライブラリのバージョンを出力する

print(f"python={sys.version.split()[0]}")
print(f"pandas={pd.__version__}")

if HAS_MPL:
    print(f"matplotlib={getattr(mpl, '__version__', 'unknown')}")
else:
    print("matplotlib=NOT INSTALLED")

if sns is not None:
    print(f"seaborn={sns.__version__}")
else:
    print("seaborn=NOT INSTALLED")

In [None]:
# WHAT: データファイルのパスと読み込みロジックを定義する
# WHY : データソースを一元管理し前処理を再利用できるようにするため
# CHECK: 読み込み関数を呼ぶと DataFrame の基本情報が得られること
from typing import List

DATA_DIR = Path("data")
CONTROL_PATH = DATA_DIR / "equipment_control_logs.csv"
STATUS_PATH = DATA_DIR / "equipment_status_events.csv"
NA_TOKENS = ["NULL"]
BOOL_MAP = {"TRUE": True, "FALSE": False, "true": True, "false": False}


def parse_datetimes(df: pd.DataFrame, columns: List[str], *, utc: bool = True) -> pd.DataFrame:
    for column in columns:
        if column not in df.columns:
            continue
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=UserWarning)
            df[column] = pd.to_datetime(df[column], errors="coerce", utc=utc)
    return df


def load_control_logs() -> pd.DataFrame:
    df = pd.read_csv(CONTROL_PATH, encoding="utf-8-sig", na_values=NA_TOKENS)
    datetime_cols = [
        "OrderReceiptDate",
        "TimerSetDate",
        "CompleteDate",
        "CreatedDate",
        "UpdatedDate",
        "DeletedDate",
    ]
    df = parse_datetimes(df, datetime_cols)
    df["IsDelete"] = df["IsDelete"].fillna(0).astype(bool)
    df["ControlResult"] = df["ControlResult"].astype("category")
    df["EquipmentTypeId"] = df["EquipmentTypeId"].astype("category")
    df["duration_to_complete_min"] = (
        df["CompleteDate"] - df["OrderReceiptDate"]
    ).dt.total_seconds() / 60
    df["has_error"] = df[["ErrorCode", "ErrorReason"]].notna().any(axis=1)
    return df


def load_status_events() -> pd.DataFrame:
    df = pd.read_csv(STATUS_PATH, encoding="utf-8-sig", na_values=NA_TOKENS)
    datetime_cols = [
        "ReportedDate",
        "DetectionDate",
        "ErrorDate",
        "CreatedDate",
        "UpdatedDate",
        "DeletedDate",
    ]
    df = parse_datetimes(df, datetime_cols)
    df["IsDelete"] = df["IsDelete"].fillna(0).astype(bool)
    if "MessageName" in df.columns:
        df["MessageName"] = df["MessageName"].astype("category")
    if "AliveStatus" in df.columns:
        df["AliveStatus"] = df["AliveStatus"].astype("category")
    return df


def extract_status_properties(df: pd.DataFrame, prefix: str, limit: int) -> pd.DataFrame:
    records = []
    for ordinal in range(1, limit + 1):
        code_col = f"{prefix}Code{ordinal}"
        name_col = f"{prefix}Name{ordinal}"
        value_col = f"{prefix}Value{ordinal}"
        desc_col = f"{prefix}Description{ordinal}"
        available = [
            col
            for col in (code_col, name_col, value_col, desc_col)
            if col in df.columns
        ]
        if not available:
            continue
        subset = df[["Id"] + available].copy()
        subset.rename(
            columns={
                "Id": "event_id",
                code_col: "code",
                name_col: "name",
                value_col: "value",
                desc_col: "description",
            },
            inplace=True,
        )
        subset["ordinal"] = ordinal
        subset.dropna(subset=["code", "name", "value"], how="all", inplace=True)
        subset["value"] = subset["value"].replace(BOOL_MAP)
        records.append(subset)
    if not records:
        return pd.DataFrame(
            columns=["event_id", "code", "name", "value", "description", "ordinal"]
        )
    return pd.concat(records, ignore_index=True)


In [None]:
# WHAT: 操作ログと状態イベントを読み込み、件数や欠損の概況を確認する
# WHY : 以降の集計で前提とするデータ品質と量を把控するため
# CHECK: 各データセットの形状と主な欠損率が出力されること

control_df = load_control_logs()
status_df = load_status_events()

summary = (
    pd.DataFrame(
        {
            "rows": [len(control_df), len(status_df)],
            "columns": [control_df.shape[1], status_df.shape[1]],
        },
        index=["equipment_control_logs", "equipment_status_events"],
    ).assign(
        non_null_ratio=[
            1 - control_df.isna().mean().mean(),
            1 - status_df.isna().mean().mean(),
        ]
    )
)
summary


In [None]:
# 概要探索: control_df の要点を素早く把握するセル
# 出力: 形状・dtypes・欠損率上位・主要カテゴリの頻度・duration 分布・時刻範囲・サンプル

# 基本情報
print("shape:", control_df.shape)
print("\nデータ型 (dtypes):")
print(control_df.dtypes)

# 欠損率 (上位)
missing_ratio = control_df.isna().mean().sort_values(ascending=False)
print("\n欠損率 (上位 15):")
print(missing_ratio.head(15))

# 一意値数 (上位)
unique_counts = control_df.nunique(dropna=False).sort_values(ascending=False)
print("\n一意値数 (上位 15):")
print(unique_counts.head(15))

# 主要カテゴリ列の頻度
cat_cols = [c for c in control_df.columns if str(control_df[c].dtype) in ("category", "object")]
print("\n主要カテゴリ列のトップ頻度 (各最大10):")
for c in ["ControlResult", "EquipmentTypeId", "PropertyName", "EdgeManagedEquipmentId"]:
    if c in control_df.columns:
        print(f"\n-- {c} --")
        print(control_df[c].value_counts(dropna=False).head(10))

# duration_to_complete_min の統計と外れ値指標
if "duration_to_complete_min" in control_df.columns:
    dur = control_df["duration_to_complete_min"].dropna()
    print("\n'duration_to_complete_min' summary:")
    print(dur.describe(percentiles=[0.01, 0.05, 0.25, 0.5, 0.75, 0.90, 0.95, 0.99]))
    # 正負/極端値チェック
    print("\n<0 分 の件数:", (dur < 0).sum())
    print(">= 1440 分 (1日) の件数:", (dur >= 1440).sum())

# 時刻列の範囲
for tcol in ["OrderReceiptDate", "TimerSetDate", "CompleteDate", "CreatedDate", "UpdatedDate"]:
    if tcol in control_df.columns:
        col = control_df[tcol].dropna()
        if not col.empty:
            print(f"\n{tcol}: min={col.min()}, max={col.max()}")

# has_error / IsDelete の分布
for bcol in ["has_error", "IsDelete"]:
    if bcol in control_df.columns:
        print(f"\n{bcol} distribution:")
        print(control_df[bcol].value_counts(dropna=False))

# サンプル表示（重要列）
display_cols = [
    "Id",
    "EdgeManagedEquipmentId",
    "EquipmentTypeId",
    "OrderReceiptDate",
    "CompleteDate",
    "ControlResult",
    "duration_to_complete_min",
    "has_error",
]
display_cols = [c for c in display_cols if c in control_df.columns]
print("\nサンプル (head 10):")
print(control_df[display_cols].head(10).to_string(index=False))

# 可視化: ヒストグラムとカテゴリ別カウント（matplotlib / seaborn が利用可能なら）
if HAS_MPL and (globals().get("sns") is not None):
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    if "duration_to_complete_min" in control_df.columns:
        sns.histplot(control_df["duration_to_complete_min"].dropna(), bins=50, kde=False)
        plt.xlabel("duration_to_complete_min")
        plt.title("操作完了までの分数 分布")
    else:
        plt.text(0.5, 0.5, "duration_to_complete_min がありません", ha="center")

    plt.subplot(1, 2, 2)
    if "EquipmentTypeId" in control_df.columns:
        top = control_df["EquipmentTypeId"].value_counts().head(15)
        sns.barplot(x=top.values, y=top.index)
        plt.xlabel("count")
        plt.title("上位設備種別 (EquipmentTypeId)")
    else:
        plt.text(0.5, 0.5, "EquipmentTypeId がありません", ha="center")

    plt.tight_layout()
else:
    print("\nINFO: グラフ描画は matplotlib/seaborn が必要です (pip install matplotlib seaborn)。")

In [None]:
# WHAT: データサンプルと追加指標を可視化前に点検する
# WHY : カテゴリ値や活用指標の妥当性を手速く確認するため
# CHECK: サンプル表示と活用列の統計量が確認できること

control_sample = control_df[
    [
        "Id",
        "BuildingId",
        "EquipmentTypeId",
        "OrderReceiptDate",
        "CompleteDate",
        "ControlResult",
        "duration_to_complete_min",
        "has_error",
    ]
].head()
status_sample = status_df[
    ["Id", "MessageName", "AliveStatus", "ReportedDate", "EquipmentTypeId", "EquipmentId"]
].head()

control_stats = control_df["duration_to_complete_min"].describe(percentiles=[0.5, 0.9])
control_sample, status_sample, control_stats


In [None]:
# WHAT: 状態イベントのプロパティ列を正規化し、詳細項目の把控を容易にする
# WHY : 可変長のプロパティ情報を集計や結合に活用できるようにするため
# CHECK: 抽出したプロパティの件数とサンプルが確認できること

status_props = extract_status_properties(status_df, prefix="Property", limit=10)
status_error_props = extract_status_properties(status_df, prefix="ErrorProperty", limit=5)

props_summary = {
    "property_records": len(status_props),
    "error_property_records": len(status_error_props),
}
status_props.head(), status_error_props.head(), props_summary


In [None]:
# WHAT: 操作ログの件数推移と制御結果の内訳を集計・可視化する
# WHY : 操作活動のピークや成功率を把控し重点的に調査すべき領域を抽出するため
# CHECK: 日次件数テーブルと制御結果の集計、および可視化または代替出力が得られること

control_daily = (
    control_df.dropna(subset=["CompleteDate"])
    .assign(complete_local=lambda df: df["CompleteDate"].dt.tz_convert("Asia/Tokyo"))
    .groupby([pd.Grouper(key="complete_local", freq="D"), "EquipmentTypeId"], observed=True)
    .size()
    .reset_index(name="count")
)

result_by_type = (
    control_df.groupby(["EquipmentTypeId", "ControlResult"], observed=True)
    .size()
    .unstack(fill_value=0)
    .assign(total=lambda df: df.sum(axis=1))
    .sort_values("total", ascending=False)
)

if HAS_MPL and sns is not None:
    plt.figure(figsize=(8, 4))
    sns.lineplot(data=control_daily, x="complete_local", y="count", hue="EquipmentTypeId", marker="o")
    plt.title("日次の操作件数（設備種別別）")
    plt.xlabel("完了日 (JST)")
    plt.ylabel("件数")
    plt.xticks(rotation=45)
    plt.tight_layout()
else:
    print("INFO: matplotlib / seaborn が未インストールのためグラフ描画をスキップしました。")
    print("以下の集計結果を参考にしてください。")

control_daily, result_by_type


In [None]:
# WHAT: 操作ログと状態イベントを機器単位・時間近部で突き合わせる
# WHY : 操作後の状態通知の有無や時差を検証し、連携状況を把控するため
# CHECK: マッチング件数・遅延分数・代表サンプルが確認できること

join_keys = ["EdgeManagedEquipmentId"]
matchable_controls = (
    control_df.dropna(subset=["CompleteDate"] + join_keys).sort_values("CompleteDate")
)
matchable_status = (
    status_df.dropna(subset=["ReportedDate"] + join_keys).sort_values("ReportedDate")
)

matched = pd.merge_asof(
    matchable_controls,
    matchable_status,
    left_on="CompleteDate",
    right_on="ReportedDate",
    by="EdgeManagedEquipmentId",
    direction="nearest",
    tolerance=pd.Timedelta("1h"),
    suffixes=("_control", "_status"),
)

matched["lag_minutes"] = (
    matched["ReportedDate"] - matched["CompleteDate"]
).dt.total_seconds() / 60
match_summary = matched["Id_status"].notna().value_counts().rename(
    index={True: "matched", False: "unmatched"}
)
lag_stats = (
    matched.loc[matched["Id_status"].notna(), "lag_minutes"].describe()
    if matched["Id_status"].notna().any()
    else "状態イベントとのマッチがありません"
)

matched_sample = matched.loc[
    matched["Id_status"].notna(),
    [
        "Id_control",
        "Id_status",
        "EdgeManagedEquipmentId",
        "CompleteDate",
        "ReportedDate",
        "lag_minutes",
        "ControlResult",
        "MessageName",
    ],
].head()

match_summary, lag_stats, matched_sample


## メモ

- グラフ描画がスキップされた場合は、`pip install matplotlib seaborn` を実行して再度セルを実行してください。
- 追加で確認したいメトリクスがあれば、`control_df` や `status_df` を基にセルを追記してください。