In [None]:
import json
import sys
from pathlib import Path

import polars as pl
import pyarrow.dataset as ds

# Show which Python interpreter this kernel is running on.
print(sys.executable)

d:\SudhakarF\DocumentsF\SanDiego\Course_10_MLOps\FinalProject\ueba\.venv\Scripts\python.exe


In [3]:
# Directory layout, resolved from the parent of the current working directory.
ROOT_DIR = Path.cwd().parent
BASE_DIR = ROOT_DIR.joinpath("data")
RAW_DATASET_DIR = BASE_DIR.joinpath("raw", "r6.2")
PROCESSED_DATASET_DIR = BASE_DIR.joinpath("processed", "r6.2")
PARQUET_DATASET_DIR = PROCESSED_DATASET_DIR.joinpath("parquet")
FEATURES_DATASET_DIR = PROCESSED_DATASET_DIR.joinpath("features", "daily")

# Base names (without extension) of the datasets handled in this notebook.
DATASET_FILE_NAMES = [
    "logon",
    "device",
    "email",
    "file",
    "http",
    "decoy_file",
]

# Notebook-wide switches passed to the helper functions below.
force_recompute = True  # ignore cached summaries and rebuild them
verbose = True  # print the formatted reports

In [4]:
# Preview the first 5 rows of every raw CSV file.
# scan_csv builds a lazy query; collect() materialises the 5-row preview.
for filename in DATASET_FILE_NAMES:
    # The inputs are the raw .csv files, so label them as CSV (not parquet).
    print(f"Scanning CSV file: {filename}")
    lz = pl.scan_csv(str(RAW_DATASET_DIR / f"{filename}.csv"))
    print(lz.limit(5).collect())

Scanning parquet file: logon
shape: (5, 5)
┌──────────────────────────┬─────────────────────┬─────────┬─────────┬──────────┐
│ id                       ┆ date                ┆ user    ┆ pc      ┆ activity │
│ ---                      ┆ ---                 ┆ ---     ┆ ---     ┆ ---      │
│ str                      ┆ str                 ┆ str     ┆ str     ┆ str      │
╞══════════════════════════╪═════════════════════╪═════════╪═════════╪══════════╡
│ {F3X8-Y2GT43DR-4906OHBL} ┆ 01/02/2010 02:19:18 ┆ DNS1758 ┆ PC-0414 ┆ Logon    │
│ {B4Q0-D0GM24KN-3704MAII} ┆ 01/02/2010 02:31:12 ┆ DNS1758 ┆ PC-0414 ┆ Logoff   │
│ {T7J1-D4HK34KV-5476TCIJ} ┆ 01/02/2010 02:34:02 ┆ DNS1758 ┆ PC-5313 ┆ Logon    │
│ {S4Y6-D8MQ05SA-0759HLIS} ┆ 01/02/2010 02:53:30 ┆ DNS1758 ┆ PC-5313 ┆ Logoff   │
│ {F3P0-E7FH78CV-4874FRGZ} ┆ 01/02/2010 04:07:31 ┆ DNS1758 ┆ PC-0012 ┆ Logon    │
└──────────────────────────┴─────────────────────┴─────────┴─────────┴──────────┘
Scanning parquet file: device
shape: (5, 6)
┌──────────

In [5]:
def format_size(bytes_size: int) -> str:
    """
    Renders a byte count as a human-readable string with two decimals.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (PB) is reached, e.g. 1536 -> "1.50 KB".
    """
    units = ("B", "KB", "MB", "GB", "TB", "PB")
    last_idx = len(units) - 1

    size = bytes_size
    unit_idx = 0
    while unit_idx < last_idx:
        if size < 1024:
            break
        size /= 1024
        unit_idx += 1

    return f"{size:.2f} {units[unit_idx]}"


def get_storage_size(path: str | Path):
    """
    Calculates the total physical disk footprint of the dataset in bytes.
    """
    path = Path(path)
    if path.is_file():
        return path.stat().st_size
    else:
        return sum(f.stat().st_size for f in path.rglob("*") if f.is_file())

In [6]:
def compare_dataset_sizes(
    raw_dataset_path: str | Path,
    processed_dataset_path: str | Path,
    verbose: bool = False,
) -> None:
    """
    Inspects parquet file or partitioned parquet directory safely.

    - Prints total size on disk
    - Prints schema (lazy)
    - Prints first 5 rows (lazy)
    """

    # --------------------------------------------------
    # Comparison of raw vs processed dataset sizes
    # --------------------------------------------------

    if not raw_dataset_path.exists():
        print(f"[ERROR] Raw dataset path does not exist: {raw_dataset_path}")
        return
    if not processed_dataset_path.exists():
        print(
            f"[ERROR] Processed dataset path does not exist: {processed_dataset_path}"
        )
        return

    total_size_raw = get_storage_size(raw_dataset_path)
    total_size_processed = get_storage_size(processed_dataset_path)

    total_size_raw_formatted = format_size(total_size_raw)
    total_size_processed_formatted = format_size(total_size_processed)

    if verbose:
        print("\n==============================")
        print("Dataset Size Comparison:")
        print(f"Raw Dataset Path      : {Path(*raw_dataset_path.parts[6:])}")
        print(f"Processed Dataset Path: {Path(*processed_dataset_path.parts[6:])}")
        print(f"Raw Dataset Size      : {total_size_raw_formatted}")
        print(f"Processed Dataset Size: {total_size_processed_formatted}")
        if total_size_raw > 0:
            reduction_pct = (1 - (total_size_processed / total_size_raw)) * 100

            print(
                f"Size Reduction        : "
                f"{total_size_raw_formatted} -> {total_size_processed_formatted} "
                f"({reduction_pct:.2f}% reduction)"
            )

        else:
            print(
                f"Size Reduction        : {total_size_raw_formatted} -> {total_size_processed_formatted} (N/A - raw size is zero)"
            )
        print("==============================\n")

    return {
        "raw_size": total_size_raw,
        "processed_size": total_size_processed,
        "reduction_pct": reduction_pct if total_size_raw > 0 else 0,
    }

In [7]:
# Compare every raw CSV with the parquet dataset converted from it.
for filename in DATASET_FILE_NAMES:
    raw_dataset_path = RAW_DATASET_DIR.joinpath(f"{filename}.csv")
    processed_dataset_path = PARQUET_DATASET_DIR.joinpath(f"{filename}.parquet")
    dataset_sizes = compare_dataset_sizes(
        raw_dataset_path=raw_dataset_path,
        processed_dataset_path=processed_dataset_path,
        verbose=verbose,
    )


Dataset Size Comparison:
Raw Dataset Path      : ueba\data\raw\r6.2\logon.csv
Processed Dataset Path: ueba\data\processed\r6.2\parquet\logon.parquet
Raw Dataset Size      : 230.45 MB
Processed Dataset Size: 90.86 MB
Size Reduction        : 230.45 MB -> 90.86 MB (60.57% reduction)


Dataset Size Comparison:
Raw Dataset Path      : ueba\data\raw\r6.2\device.csv
Processed Dataset Path: ueba\data\processed\r6.2\parquet\device.parquet
Raw Dataset Size      : 133.08 MB
Processed Dataset Size: 43.47 MB
Size Reduction        : 133.08 MB -> 43.47 MB (67.34% reduction)


Dataset Size Comparison:
Raw Dataset Path      : ueba\data\raw\r6.2\email.csv
Processed Dataset Path: ueba\data\processed\r6.2\parquet\email.parquet
Raw Dataset Size      : 7.53 GB
Processed Dataset Size: 3.04 GB
Size Reduction        : 7.53 GB -> 3.04 GB (59.59% reduction)


Dataset Size Comparison:
Raw Dataset Path      : ueba\data\raw\r6.2\file.csv
Processed Dataset Path: ueba\data\processed\r6.2\parquet\file.parquet
Raw Dat

#### Dataset summary for RAW PARQUET FILES

In [8]:
def _collect_parquet_stats(dataset) -> tuple[int, int, dict]:
    """
    Aggregates parquet footer statistics over every fragment of `dataset`.

    Returns `(total_files, total_rows, col_metadata)`, where `col_metadata`
    maps a column name to its summed `not_null_count` / `null_count` and its
    overall `min` / `max`. Column chunks that carry no statistics are skipped.
    """
    total_files = 0
    total_rows = 0
    col_metadata = {}

    for fragment in dataset.get_fragments():
        total_files += 1
        fragment_meta = fragment.metadata
        total_rows += fragment_meta.num_rows

        for row_group_idx in range(fragment_meta.num_row_groups):
            row_group_meta = fragment_meta.row_group(row_group_idx)

            for col_idx in range(row_group_meta.num_columns):
                col_chunk = row_group_meta.column(col_idx)
                col_stats = col_chunk.statistics
                if col_stats is None:
                    continue

                entry = col_metadata.setdefault(
                    col_chunk.path_in_schema,
                    {"not_null_count": 0, "null_count": 0, "min": None, "max": None},
                )

                # Counts may be missing from the footer; treat them as 0.
                entry["not_null_count"] += col_stats.num_values or 0
                entry["null_count"] += col_stats.null_count or 0

                if not col_stats.has_min_max:
                    continue
                if col_stats.min is not None:
                    entry["min"] = (
                        col_stats.min
                        if entry["min"] is None
                        else min(entry["min"], col_stats.min)
                    )
                if col_stats.max is not None:
                    entry["max"] = (
                        col_stats.max
                        if entry["max"] is None
                        else max(entry["max"], col_stats.max)
                    )

    return total_files, total_rows, col_metadata


def _print_dataset_summary(summary: dict) -> None:
    """Prints the formatted report for a summary produced by `dataset_summary`."""
    print("\n" + "=" * 60)
    print("DATASET METADATA SUMMARY")
    print("=" * 60)

    print(f"Dataset Name : {summary['dataset_name']}")
    print(f"Dataset Path : {summary['dataset_path']}")
    print(f"Memory Size : {summary['dataset_memory_size']}")

    print("\nSCHEMA")
    print("-" * 60)
    for name, dtype in summary["schema"]:
        print(f"{name:<25} {dtype}")

    print("\nDATASET OVERVIEW")
    print("-" * 60)
    print(f"Total Files : {summary['total_files']}")
    print(f"Total Rows  : {summary['total_rows']:,}")

    print("\nCOLUMN STATISTICS")
    print("-" * 60)
    for col, stats in summary["columns"].items():
        print(f"\n{col}")
        print(f"  Non-Null Count : {stats['not_null_count']:,}")
        print(f"  Null Count     : {stats['null_count']:,}")
        print(f"  Min            : {stats['min']}")
        print(f"  Max            : {stats['max']}")

    print("\nSAMPLE (First 5 Rows)")
    print("-" * 60)
    print(summary["sample"])

    print("\n" + "=" * 60 + "\n")


def dataset_summary(
    path: str | Path, force_recompute: bool = False, verbose: bool = False
) -> dict:
    """
    Builds (or loads from cache) a metadata summary of a parquet dataset.

    The summary is cached as JSON under
    `PROCESSED_DATASET_DIR / "profiling" / "<stem>_summary.json"`. It holds the
    dataset name and path, its formatted on-disk size (`dataset_memory_size`),
    the schema, file and row counts, per-column statistics taken from the
    parquet footers, and a 5-row sample.

    Args:
        path: Parquet file or directory of parquet files.
        force_recompute: When True, ignore any cached summary and rebuild it.
        verbose: When True, print a formatted report of the summary.

    Returns:
        The summary dict in its JSON form (the sample and any non-JSON values
        such as timestamps are strings, the schema is a list of
        `[name, type]` pairs), so a fresh result and a cached result have the
        same shape.
    """
    path = Path(path)
    summary_path = PROCESSED_DATASET_DIR / "profiling" / f"{path.stem}_summary.json"

    if summary_path.exists() and not force_recompute:
        print(f"Loading cached summary for {path.name}...\n")
        with open(summary_path, "r", encoding="utf-8") as f:
            summary = json.load(f)
    else:
        # Only open the dataset when it is needed; the cached path above does
        # not have to pay for file discovery.
        dataset = ds.dataset(str(path), format="parquet")
        total_files, total_rows, col_metadata = _collect_parquet_stats(dataset)

        sample = pl.scan_parquet(str(path)).limit(5).collect()

        # NOTE(review): dropping the first 6 path components matches the depth of
        # the original author's project directory; paths that are not deeper than
        # that are kept in full instead of collapsing to ".".
        display_path = Path(*path.parts[6:]) if len(path.parts) > 6 else path

        summary = {
            "dataset_name": path.name,
            "dataset_path": str(display_path),
            "dataset_memory_size": format_size(get_storage_size(path)),
            "schema": [(field.name, str(field.type)) for field in dataset.schema],
            "total_files": total_files,
            "total_rows": total_rows,
            "columns": col_metadata,
            "sample": sample,
        }

        # Serialise once, write it, and return the parsed-back form so the
        # freshly computed result matches what a later cache hit will return.
        serialized = json.dumps(summary, indent=2, default=str)
        summary_path.parent.mkdir(parents=True, exist_ok=True)
        with open(summary_path, "w", encoding="utf-8") as f:
            f.write(serialized)
        summary = json.loads(serialized)

    if verbose:
        _print_dataset_summary(summary)

    return summary

In [9]:
# Summarise the logon parquet dataset (schema, counts, column stats, sample).
logon_summary = dataset_summary(
    path=PARQUET_DATASET_DIR.joinpath("logon.parquet"),
    verbose=verbose,
    force_recompute=force_recompute,
)


DATASET METADATA SUMMARY
Dataset Name : logon.parquet
Dataset Path : ueba\data\processed\r6.2\parquet\logon.parquet
Memory Size : 90.86 MB

SCHEMA
------------------------------------------------------------
id                        large_string
user                      large_string
pc                        large_string
activity                  large_string
timestamp                 timestamp[us]

DATASET OVERVIEW
------------------------------------------------------------
Total Files : 520
Total Rows  : 3,530,285

COLUMN STATISTICS
------------------------------------------------------------

id
  Non-Null Count : 3,530,285
  Null Count     : 0
  Min            : {A0A0-A2DT76JF-3051XWBA}
  Max            : {Z9Z9-Z3EM53KV-0163FIEO}

user
  Non-Null Count : 3,530,285
  Null Count     : 0
  Min            : AAB0162
  Max            : ZZO2997

pc
  Non-Null Count : 3,530,285
  Null Count     : 0
  Min            : PC-0001
  Max            : PC-9996

activity
  Non-Null Count : 3,530

In [10]:
# Summarise the device parquet dataset (schema, counts, column stats, sample).
device_summary = dataset_summary(
    path=PARQUET_DATASET_DIR.joinpath("device.parquet"),
    verbose=verbose,
    force_recompute=force_recompute,
)


DATASET METADATA SUMMARY
Dataset Name : device.parquet
Dataset Path : ueba\data\processed\r6.2\parquet\device.parquet
Memory Size : 43.47 MB

SCHEMA
------------------------------------------------------------
id                        large_string
user                      large_string
pc                        large_string
file_tree                 large_string
activity                  large_string
timestamp                 timestamp[us]

DATASET OVERVIEW
------------------------------------------------------------
Total Files : 516
Total Rows  : 1,551,828

COLUMN STATISTICS
------------------------------------------------------------

id
  Non-Null Count : 1,551,828
  Null Count     : 0
  Min            : {A0A0-A2LF00HC-9260HFAU}
  Max            : {Z9Z9-Z2FL65FI-5646FSCT}

user
  Non-Null Count : 1,551,828
  Null Count     : 0
  Min            : AAC0610
  Max            : ZRM0694

pc
  Non-Null Count : 1,551,828
  Null Count     : 0
  Min            : PC-0001
  Max            : P

In [11]:
# Summarise the email parquet dataset (schema, counts, column stats, sample).
email_summary = dataset_summary(
    path=PARQUET_DATASET_DIR.joinpath("email.parquet"),
    verbose=verbose,
    force_recompute=force_recompute,
)


DATASET METADATA SUMMARY
Dataset Name : email.parquet
Dataset Path : ueba\data\processed\r6.2\parquet\email.parquet
Memory Size : 3.04 GB

SCHEMA
------------------------------------------------------------
id                        large_string
user                      large_string
pc                        large_string
to                        large_string
cc                        large_string
bcc                       large_string
from                      large_string
activity                  large_string
size                      int64
attachments               large_string
content                   large_string
timestamp                 timestamp[us]

DATASET OVERVIEW
------------------------------------------------------------
Total Files : 528
Total Rows  : 10,994,957

COLUMN STATISTICS
------------------------------------------------------------

id
  Non-Null Count : 10,994,957
  Null Count     : 0
  Min            : {A0A0-A1OS05HH-5522FYZC}
  Max            : {Z9Z9-Z9HS

In [12]:
# Summarise the file parquet dataset (schema, counts, column stats, sample).
file_summary = dataset_summary(
    path=PARQUET_DATASET_DIR.joinpath("file.parquet"),
    verbose=verbose,
    force_recompute=force_recompute,
)


DATASET METADATA SUMMARY
Dataset Name : file.parquet
Dataset Path : ueba\data\processed\r6.2\parquet\file.parquet
Memory Size : 358.52 MB

SCHEMA
------------------------------------------------------------
id                        large_string
user                      large_string
pc                        large_string
filename                  large_string
activity                  large_string
to_removable_media        bool
from_removable_media      bool
content                   large_string
timestamp                 timestamp[us]

DATASET OVERVIEW
------------------------------------------------------------
Total Files : 517
Total Rows  : 2,014,883

COLUMN STATISTICS
------------------------------------------------------------

id
  Non-Null Count : 2,014,883
  Null Count     : 0
  Min            : {A0A0-A0EU53QX-9893ZLWH}
  Max            : {Z9Z9-Y9MF65DD-0026DQPL}

user
  Non-Null Count : 2,014,883
  Null Count     : 0
  Min            : AAB0162
  Max            : ZZO2997

pc

In [13]:
# Summarise the http parquet dataset (schema, counts, column stats, sample).
http_summary = dataset_summary(
    path=PARQUET_DATASET_DIR.joinpath("http.parquet"),
    verbose=verbose,
    force_recompute=force_recompute,
)


DATASET METADATA SUMMARY
Dataset Name : http.parquet
Dataset Path : ueba\data\processed\r6.2\parquet\http.parquet
Memory Size : 23.59 GB

SCHEMA
------------------------------------------------------------
id                        large_string
user                      large_string
pc                        large_string
url                       large_string
activity                  large_string
content                   large_string
timestamp                 timestamp[us]

DATASET OVERVIEW
------------------------------------------------------------
Total Files : 661
Total Rows  : 117,025,216

COLUMN STATISTICS
------------------------------------------------------------

id
  Non-Null Count : 117,025,216
  Null Count     : 0
  Min            : {A0A0-A0BE95BB-9636BWMU}
  Max            : {Z9Z9-Z9KR22UZ-1017APZF}

user
  Non-Null Count : 117,025,216
  Null Count     : 0
  Min            : AAB0162
  Max            : ZZO2997

pc
  Non-Null Count : 117,025,216
  Null Count     : 0
  Mi

In [14]:
# Summarise the decoy_file parquet dataset (schema, counts, column stats, sample).
decoy_file_summary = dataset_summary(
    path=PARQUET_DATASET_DIR.joinpath("decoy_file.parquet"),
    verbose=verbose,
    force_recompute=force_recompute,
)


DATASET METADATA SUMMARY
Dataset Name : decoy_file.parquet
Dataset Path : ueba\data\processed\r6.2\parquet\decoy_file.parquet
Memory Size : 413.15 KB

SCHEMA
------------------------------------------------------------
decoy_filename            large_string
pc                        large_string

DATASET OVERVIEW
------------------------------------------------------------
Total Files : 1
Total Rows  : 31,095

COLUMN STATISTICS
------------------------------------------------------------

decoy_filename
  Non-Null Count : 31,095
  Null Count     : 0
  Min            : C:\002O8QH7.doc
  Max            : C:\ZZSAETN6.doc

pc
  Non-Null Count : 31,095
  Null Count     : 0
  Min            : PC-0001
  Max            : PC-9996

SAMPLE (First 5 Rows)
------------------------------------------------------------
shape: (5, 2)
┌─────────────────────────┬─────────┐
│ decoy_filename          ┆ pc      │
│ ---                     ┆ ---     │
│ str                     ┆ str     │
╞═════════════════

#### Dataset summary for User-Day Aggregated parquet files

In [None]:
# Compare each parquet dataset with its counterpart under the daily features
# directory; decoy_file is excluded from this comparison.
# NOTE(review): the saved output lists ...\features\daily\<name>.parquet (no
# "_features" suffix) — confirm which file naming is current.
for filename in DATASET_FILE_NAMES:
    if filename != "decoy_file":
        raw_dataset_path = PARQUET_DATASET_DIR.joinpath(f"{filename}.parquet")
        processed_dataset_path = FEATURES_DATASET_DIR.joinpath(
            f"{filename}_features.parquet"
        )
        dataset_sizes = compare_dataset_sizes(
            raw_dataset_path=raw_dataset_path,
            processed_dataset_path=processed_dataset_path,
            verbose=verbose,
        )


Dataset Size Comparison:
Raw Dataset Path      : ueba\data\processed\r6.2\parquet\logon.parquet
Processed Dataset Path: ueba\data\processed\r6.2\features\daily\logon.parquet
Raw Dataset Size      : 90.86 MB
Processed Dataset Size: 7.83 MB
Size Reduction        : 90.86 MB -> 7.83 MB (91.38% reduction)


Dataset Size Comparison:
Raw Dataset Path      : ueba\data\processed\r6.2\parquet\device.parquet
Processed Dataset Path: ueba\data\processed\r6.2\features\daily\device.parquet
Raw Dataset Size      : 43.47 MB
Processed Dataset Size: 480.34 KB
Size Reduction        : 43.47 MB -> 480.34 KB (98.92% reduction)


Dataset Size Comparison:
Raw Dataset Path      : ueba\data\processed\r6.2\parquet\email.parquet
Processed Dataset Path: ueba\data\processed\r6.2\features\daily\email.parquet
Raw Dataset Size      : 3.04 GB
Processed Dataset Size: 17.27 MB
Size Reduction        : 3.04 GB -> 17.27 MB (99.45% reduction)


Dataset Size Comparison:
Raw Dataset Path      : ueba\data\processed\r6.2\parquet