In [None]:
import os
import time
from pathlib import Path

import pandas as pd


In [None]:
# Root of the project checkout. Set the SOC_DASHBOARD_ROOT environment variable
# to run this notebook from a different location; when it is unset the original
# workstation path is used, so existing setups behave exactly as before.
PROJECT_ROOT = Path(
    os.environ.get(
        "SOC_DASHBOARD_ROOT",
        r"D:\soc-dashboard-suite-main\soc-dashboard-suite-main",
    )
)

# Raw event stream that the enrichment loop polls for new rows.
STREAM_PATH = PROJECT_ROOT / "data" / "stream" / "email_event_stream.csv"
# Output file that enriched rows are appended to.
ENRICHED_STREAM = PROJECT_ROOT / "data" / "stream" / "email_event_stream_enriched.csv"


In [None]:
# to_csv does not create missing parent directories, so make sure the output
# folder exists before the first write instead of failing with OSError.
ENRICHED_STREAM.parent.mkdir(parents=True, exist_ok=True)

# Overwrite any enriched output left over from a previous run.
pd.DataFrame().to_csv(ENRICHED_STREAM, index=False)

# Number of valid stream rows that have already been enriched and written out;
# the polling loop slices the stream from this position onwards.
processed = 0


In [None]:
# Column order of the enriched output file. Only the header row is written
# here; data rows are appended later without a header.
ENRICHED_COLUMNS = [
    "event_time",
    "sender_email",
    "sender_domain",
    "recipient_email",
    "recipient_domain",
    "subject",
    "message_id",
    "event_type",
    "ingested_at",
    "first_seen_time",
    "is_first_seen_day",
]

# to_csv does not create missing parent directories.
ENRICHED_STREAM.parent.mkdir(parents=True, exist_ok=True)

# Overwrite the enriched file with a header-only CSV.
pd.DataFrame(columns=ENRICHED_COLUMNS).to_csv(ENRICHED_STREAM, index=False)

# The output was just truncated, so no stream row has been enriched yet.
# Resetting the counter in the same cell keeps file and counter in sync when
# this cell is re-run on its own (otherwise a stale count would make the
# polling loop skip rows that are no longer present in the output).
processed = 0

print("Initialized enriched stream")


In [None]:
# Seconds to wait between polls of the stream file.
POLL_SECONDS = 2

# Rows are appended to the enriched file with header=False, so they must be
# written in the same column order as the header that is already on disk.
# Read that header once; if there is no usable header, fall back to writing
# each batch in its own column order.
try:
    enriched_columns = pd.read_csv(ENRICHED_STREAM, nrows=0).columns.tolist()
except (FileNotFoundError, pd.errors.EmptyDataError):
    enriched_columns = []

# Poll the raw stream forever, enriching and appending rows that have not been
# processed yet. Runs until the kernel is interrupted.
while True:
    try:
        # Wait until stream file exists and has some content
        if not STREAM_PATH.exists() or STREAM_PATH.stat().st_size == 0:
            time.sleep(POLL_SECONDS)
            continue

        stream_df = pd.read_csv(STREAM_PATH)

        # Ensure header has been written
        if "event_time" not in stream_df.columns:
            print("Waiting for valid stream header...")
            time.sleep(POLL_SECONDS)
            continue

        # Convert event_time safely: unparseable values become NaT
        stream_df["event_time"] = pd.to_datetime(
            stream_df["event_time"],
            errors="coerce",
            utc=True
        )

        # Drop rows without a valid timestamp (e.g. partially written rows)
        stream_df = stream_df.dropna(subset=["event_time"])

        # Only process new events: `processed` is the count of valid rows
        # handled so far, so everything after that position is new.
        if len(stream_df) > processed:
            new_events = stream_df.iloc[processed:].copy()

            # Domain is the text after the last "@". Rows with a missing
            # sender stay missing instead of becoming the literal string
            # "nan" that astype(str) would otherwise produce.
            sender = new_events["sender_email"]
            new_events["sender_domain"] = (
                sender
                .astype(str)
                .str.split("@")
                .str[-1]
                .where(sender.notna())
            )

            # Earliest event time per sender, computed over the whole stream
            # read in this poll (not just the new rows).
            first_seen = (
                stream_df.groupby("sender_email")["event_time"]
                .min()
                .reset_index()
                .rename(columns={"event_time": "first_seen_time"})
            )

            new_events = new_events.merge(first_seen, on="sender_email", how="left")

            # True when the event falls on the same UTC calendar day as the
            # sender's earliest event.
            new_events["is_first_seen_day"] = (
                new_events["event_time"].dt.normalize() ==
                new_events["first_seen_time"].dt.normalize()
            )

            # Align with the on-disk header: columns missing from the stream
            # are written as empty fields, columns not in the header are
            # dropped, so every appended row has the header's shape.
            if enriched_columns:
                new_events = new_events.reindex(columns=enriched_columns)

            new_events.to_csv(
                ENRICHED_STREAM,
                mode="a",
                header=False,
                index=False
            )

            processed = len(stream_df)
            print(f"Enriched up to {processed} events")

        time.sleep(POLL_SECONDS)

    except Exception as e:
        # Best-effort: the stream is written by another process, so a failed
        # read/parse is reported and retried on the next poll.
        print("Waiting for enrichment...", e)
        time.sleep(POLL_SECONDS)
