# Data Integrity Check 

This notebook validates that the final `feature_engineered` table does not contain
invalid raw flow records.

## Goals:
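1. Identify and remove rows where any canonical raw field is NULL or negative, or where `duration` or `pkt_total` is 0 (these two fields are the denominators of the rate features; such values should not occur in valid flow data)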

In [85]:
!pip -q install "PyAthena[SQLAlchemy]" sqlalchemy s3fs

In [86]:
import boto3
import sagemaker
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, text

# Display settings: show up to 100 rows; leave columns, cell width and
# total width unrestricted (None)
for option_name, option_value in (
    ("display.max_rows", 100),
    ("display.max_columns", None),
    ("display.max_colwidth", None),
    ("display.width", None),
):
    pd.set_option(option_name, option_value)

## Connect to Athena

In [87]:
# Target Athena database for every query in this notebook
database_name = "aai540_eda"

# SageMaker session supplies the default bucket; boto3 supplies the region
sess = sagemaker.Session()
region = boto3.Session().region_name

# Prefix of the default bucket passed to Athena as its staging directory
results_bucket = sess.default_bucket()
athena_results_path = f"s3://{results_bucket}/athena/staging/"

# SQLAlchemy engine using the awsathena+rest dialect
engine = create_engine(
    f"awsathena+rest://@athena.{region}.amazonaws.com:443/{database_name}",
    connect_args=dict(s3_staging_dir=athena_results_path, region_name=region),
)

print("Region:", region)
print("Athena results:", athena_results_path)

Region: us-east-1
Athena results: s3://sagemaker-us-east-1-933747558592/athena/staging/


In [88]:
# Helper functions for queries
def exec_ddl(sql: str):
    """Execute one SQL statement (DDL / CTAS) on the shared Athena engine.

    The statement runs inside ``engine.begin()``; nothing is returned.
    """
    statement = text(sql)
    with engine.begin() as connection:
        connection.execute(statement)

def read_sql(sql: str) -> pd.DataFrame:
    """Run a query on the shared Athena engine and return the rows as a DataFrame."""
    frame = pd.read_sql(sql, con=engine)
    return frame

In [89]:
# Helper function to clean S3 paths before creating tables
import subprocess

def clean_s3_path(s3_path: str):
    """Best-effort recursive delete of everything under an S3 path.

    Runs ``aws s3 rm <s3_path> --recursive`` and prints the outcome. The
    function never raises, so a first run (nothing to delete yet) does not
    stop the notebook. When the command fails with error text, the exit code
    and stderr are printed so that real problems (for example missing
    permissions or credentials) are visible instead of being reported as
    "may not exist yet".

    Args:
        s3_path: S3 URI to empty, e.g. ``s3://bucket/prefix/``.

    Returns:
        None. The result is reported on stdout only.
    """
    try:
        result = subprocess.run(
            ['aws', 's3', 'rm', s3_path, '--recursive'],
            capture_output=True,
            text=True
        )
    except Exception as e:
        # The command could not be started at all (e.g. AWS CLI not on PATH)
        print(f"Note: Could not clean {s3_path}: {e}")
        return

    if result.returncode == 0:
        print(f"✓ Cleaned {s3_path}")
        return

    error_text = (result.stderr or "").strip()
    if error_text:
        # Surface the real reason instead of assuming the path is just missing
        print(
            f"Warning: cleaning {s3_path} failed "
            f"(exit code {result.returncode}): {error_text}"
        )
    else:
        # Non-zero exit without any error text: keep the original lenient note
        print(f"Note: {s3_path} may not exist yet (this is OK for first run)")

## Profile counts: NULL / negative / zero (per feature)

This creates a new table version, `merged_canonical_normalized_v1`, which adds `pkt_rate` (`pkt_total / duration`, NULL when `duration` is NULL or <= 0).

In [91]:
# drop table if it already exists
exec_ddl(f"DROP TABLE IF EXISTS {database_name}.merged_canonical_normalized_v1")

# clean S3 location before creating table
clean_s3_path(f"s3://{results_bucket}/aai540/processed/merged_canonical_normalized_v1/")

# create v1: every column of merged_canonical_normalized plus
# pkt_rate = pkt_total / duration (NULL when duration is NULL or <= 0)
exec_ddl(f"""
CREATE TABLE {database_name}.merged_canonical_normalized_v1
WITH (
  format = 'PARQUET',
  external_location = 's3://{results_bucket}/aai540/processed/merged_canonical_normalized_v1/',
  parquet_compression = 'SNAPPY'
) AS
SELECT
  *,
  CASE
    WHEN duration IS NULL OR duration <= 0 THEN NULL
    ELSE CAST(pkt_total AS DOUBLE) / CAST(duration AS DOUBLE)
  END AS pkt_rate
FROM {database_name}.merged_canonical_normalized
""")

✓ Cleaned s3://sagemaker-us-east-1-933747558592/aai540/processed/merged_canonical_normalized_v1/


In [92]:
# Preview pkt_rate beside its inputs; rows with duration <= 0 have a NULL pkt_rate.
# The table name is followed directly by the row limit (no table alias).
read_sql(f"""
SELECT duration, pkt_total, pkt_rate
FROM {database_name}.merged_canonical_normalized_v1
Limit 25
""")

Unnamed: 0,duration,pkt_total,pkt_rate
0,0.0,1,
1,0.0,1,
2,0.0,1,
3,0.0,1,
4,2e-06,2,1000000.0
5,0.0,1,
6,0.0,1,
7,0.0,1,
8,0.0,1,
9,0.000143,3,20979.020979


### Sanity check

In [93]:
# Sanity check on v1: count rows whose duration is NULL or <= 0, and count how
# many of those rows still carry a non-NULL pkt_rate (expected to be 0).
read_sql(f"""
SELECT
  COUNT(*) AS rows_total,
  SUM(CASE WHEN duration IS NULL OR duration <= 0 THEN 1 ELSE 0 END) AS bad_duration_rows,
  SUM(CASE WHEN (duration IS NULL OR duration <= 0) AND pkt_rate IS NOT NULL THEN 1 ELSE 0 END) AS pkt_rate_should_be_null_but_isnt
FROM {database_name}.merged_canonical_normalized_v1
""")


Unnamed: 0,rows_total,bad_duration_rows,pkt_rate_should_be_null_but_isnt
0,26708942,5325854,0


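This creates a new table version, `merged_canonical_normalized_v3`, which adds `bytes_per_pkt` (`bytes_total / pkt_total`, NULL when `pkt_total` is NULL or <= 0). It is built on `merged_canonical_normalized_v2`, which is not created in the cells shown here and must already exist in Athena.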

In [95]:
# drop v3 if it already exists
exec_ddl(f"DROP TABLE IF EXISTS {database_name}.merged_canonical_normalized_v3")

# clean S3 location before creating table
clean_s3_path(f"s3://{results_bucket}/aai540/processed/merged_canonical_normalized_v3/")

# create v3 with bytes_per_pkt (built on v2 so prior features are retained)
# bytes_per_pkt = bytes_total / pkt_total (NULL when pkt_total is NULL or <= 0)
# NOTE(review): merged_canonical_normalized_v2 is not created by any cell visible
# in this notebook, so this statement only works if v2 already exists in Athena.
# Presumably v2 is where byte_rate is added - TODO confirm and add or link that step.
exec_ddl(f"""
CREATE TABLE {database_name}.merged_canonical_normalized_v3
WITH (
  format = 'PARQUET',
  external_location = 's3://{results_bucket}/aai540/processed/merged_canonical_normalized_v3/',
  parquet_compression = 'SNAPPY'
) AS
SELECT
  *,
  CASE
    WHEN pkt_total IS NULL OR pkt_total <= 0 THEN NULL
    ELSE CAST(bytes_total AS DOUBLE) / CAST(pkt_total AS DOUBLE)
  END AS bytes_per_pkt
FROM {database_name}.merged_canonical_normalized_v2
""")

✓ Cleaned s3://sagemaker-us-east-1-933747558592/aai540/processed/merged_canonical_normalized_v3/


In [96]:
# Preview bytes_per_pkt beside its inputs, limited to rows with bytes_total > 0
read_sql(f"""
SELECT duration, pkt_total, bytes_total, bytes_per_pkt
FROM {database_name}.merged_canonical_normalized_v3
WHERE bytes_total > 0
LIMIT 25
""")

Unnamed: 0,duration,pkt_total,bytes_total,bytes_per_pkt
0,1.073386,30,11748,391.6
1,0.029492,106,6676,62.981132
2,0.016852,90,29262,325.133333
3,0.040876,146,65202,446.589041
4,0.025854,110,39014,354.672727
5,0.000997,4,324,81.0
6,0.026932,106,38380,362.075472
7,0.510419,58,7976,137.517241
8,0.036839,146,65202,446.589041
9,0.024871,110,39014,354.672727


### Sanity check

In [97]:
# Sanity check on v3: count rows whose pkt_total is NULL or <= 0, and count how
# many of those rows still carry a non-NULL bytes_per_pkt (expected to be 0).
read_sql(f"""
SELECT
  COUNT(*) AS rows_total,
  SUM(CASE WHEN pkt_total IS NULL OR pkt_total <= 0 THEN 1 ELSE 0 END) AS bad_pkt_total_rows,
  SUM(CASE WHEN (pkt_total IS NULL OR pkt_total <= 0) AND bytes_per_pkt IS NOT NULL THEN 1 ELSE 0 END) AS bytes_per_pkt_should_be_null_but_isnt
FROM {database_name}.merged_canonical_normalized_v3
""")

Unnamed: 0,rows_total,bad_pkt_total_rows,bytes_per_pkt_should_be_null_but_isnt
0,26708942,115260,0


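This creates a new table version, `merged_canonical_normalized_v4`, which adds `pkt_ratio = pkt_fwd / (pkt_bwd + 1)` (NULL when either input is NULL or negative); the `+ 1` keeps the ratio defined when `pkt_bwd` is 0.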

In [98]:
# drop v4 if it already exists
exec_ddl(f"DROP TABLE IF EXISTS {database_name}.merged_canonical_normalized_v4")

# clean S3 location before creating table
clean_s3_path(f"s3://{results_bucket}/aai540/processed/merged_canonical_normalized_v4/")

# create v4 with pkt_ratio (built on v3 so prior features are retained)
# pkt_ratio = pkt_fwd / (pkt_bwd + 1.0); NULL when either count is NULL or negative.
# The + 1.0 in the denominator avoids division by zero when pkt_bwd is 0.
exec_ddl(f"""
CREATE TABLE {database_name}.merged_canonical_normalized_v4
WITH (
  format = 'PARQUET',
  external_location = 's3://{results_bucket}/aai540/processed/merged_canonical_normalized_v4/',
  parquet_compression = 'SNAPPY'
) AS
SELECT
  *,
  CASE
    WHEN pkt_fwd IS NULL OR pkt_fwd < 0 THEN NULL
    WHEN pkt_bwd IS NULL OR pkt_bwd < 0 THEN NULL
    ELSE CAST(pkt_fwd AS DOUBLE) / (CAST(pkt_bwd AS DOUBLE) + 1.0)
  END AS pkt_ratio
FROM {database_name}.merged_canonical_normalized_v3
""")

✓ Cleaned s3://sagemaker-us-east-1-933747558592/aai540/processed/merged_canonical_normalized_v4/


In [99]:
# Preview pkt_ratio beside the forward/backward packet counts it is derived from
read_sql(f"""
SELECT duration, pkt_total, pkt_fwd, pkt_bwd, pkt_ratio
FROM {database_name}.merged_canonical_normalized_v4
LIMIT 25
""")

Unnamed: 0,duration,pkt_total,pkt_fwd,pkt_bwd,pkt_ratio
0,60.825176,5,3,2,1.0
1,0.0,1,1,0,1.0
2,60.661508,5,3,2,1.0
3,60.820598,5,3,2,1.0
4,60.061714,5,3,2,1.0
5,61.078555,23,7,16,0.411765
6,61.044184,5,3,2,1.0
7,60.094434,5,3,2,1.0
8,10.643116,8,5,3,1.25
9,0.001926,2,1,1,0.5


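This creates a new table version, `merged_canonical_normalized_v5`, which adds `byte_ratio = bytes_fwd / (bytes_bwd + 1)` (NULL when either input is NULL or negative); the `+ 1` keeps the ratio defined when `bytes_bwd` is 0.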

In [100]:
# drop v5 if it already exists
exec_ddl(f"DROP TABLE IF EXISTS {database_name}.merged_canonical_normalized_v5")

# clean S3 location before creating table
clean_s3_path(f"s3://{results_bucket}/aai540/processed/merged_canonical_normalized_v5/")

# Create v5 with byte_ratio (built on v4 so prior features are retained)
# byte_ratio = bytes_fwd / (bytes_bwd + 1.0); NULL when either value is NULL or negative.
# The + 1.0 in the denominator avoids division by zero when bytes_bwd is 0.
exec_ddl(f"""
CREATE TABLE {database_name}.merged_canonical_normalized_v5
WITH (
  format = 'PARQUET',
  external_location = 's3://{results_bucket}/aai540/processed/merged_canonical_normalized_v5/',
  parquet_compression = 'SNAPPY'
) AS
SELECT
  *,
  CASE
    WHEN bytes_fwd IS NULL OR bytes_fwd < 0 THEN NULL
    WHEN bytes_bwd IS NULL OR bytes_bwd < 0 THEN NULL
    ELSE CAST(bytes_fwd AS DOUBLE) / (CAST(bytes_bwd AS DOUBLE) + 1.0)
  END AS byte_ratio
FROM {database_name}.merged_canonical_normalized_v4
""")

✓ Cleaned s3://sagemaker-us-east-1-933747558592/aai540/processed/merged_canonical_normalized_v5/


In [101]:
# Preview byte_ratio beside the forward/backward byte counts it is derived from
read_sql(f"""
SELECT
  bytes_fwd,
  bytes_bwd,
  byte_ratio
FROM {database_name}.merged_canonical_normalized_v5
LIMIT 25
""")

Unnamed: 0,bytes_fwd,bytes_bwd,byte_ratio
0,43,0,43.0
1,43,0,43.0
2,0,0,0.0
3,6,6,0.857143
4,148,424,0.348235
5,72,114,0.626087
6,72,104,0.685714
7,90,172,0.520231
8,0,0,0.0
9,84,190,0.439791


## Finalize Feature Engineering Table

In [103]:
# drop final table if it already exists
exec_ddl(f"DROP TABLE IF EXISTS {database_name}.feature_engineered")

# clean S3 location before creating table
clean_s3_path(f"s3://{results_bucket}/aai540/processed/feature_engineered/")

# create final feature_engineered table from v5
exec_ddl(f"""
CREATE TABLE {database_name}.feature_engineered
WITH (
  format = 'PARQUET',
  external_location = 's3://{results_bucket}/aai540/processed/feature_engineered/',
  parquet_compression = 'SNAPPY'
) AS
SELECT *
FROM {database_name}.merged_canonical_normalized_v5
""")


✓ Cleaned s3://sagemaker-us-east-1-933747558592/aai540/processed/feature_engineered/


In [104]:
# Spot-check the engineered feature columns of the final table.
# NOTE(review): byte_rate is not derived in any cell visible in this notebook;
# presumably it comes from the v2 table - TODO confirm.
read_sql(f"""
SELECT
  pkt_rate,
  byte_rate,
  bytes_per_pkt,
  pkt_ratio,
  byte_ratio
FROM {database_name}.feature_engineered
LIMIT 10
""")

Unnamed: 0,pkt_rate,byte_rate,bytes_per_pkt,pkt_ratio,byte_ratio
0,,,0.0,1.0,0.0
1,7092.198582,0.0,0.0,1.0,0.0
2,,,0.0,1.0,0.0
3,203.894383,8767.458,43.0,0.5,0.9772727
4,1.971244,491374000.0,249271023.0,0.75,1495626000.0
5,77.697663,2680.569,34.5,1.2,2.203704
6,18181.818182,0.0,0.0,1.0,0.0
7,,,0.0,0.0,0.0
8,0.065175,0.0,0.0,0.666667,0.0
9,0.065496,0.0,0.0,0.666667,0.0


In [105]:
# Profile the raw canonical fields of feature_engineered in a single pass:
# per-column counts of NULL values, negative values, and (for duration,
# pkt_total and bytes_total only) exact zeros. Returns one row.
profile_invalid = read_sql(f"""
SELECT
  COUNT(*) AS total_rows,

  -- NULL counts
  SUM(CASE WHEN duration  IS NULL THEN 1 ELSE 0 END) AS duration_null,
  SUM(CASE WHEN pkt_total IS NULL THEN 1 ELSE 0 END) AS pkt_total_null,
  SUM(CASE WHEN bytes_total IS NULL THEN 1 ELSE 0 END) AS bytes_total_null,
  SUM(CASE WHEN pkt_fwd   IS NULL THEN 1 ELSE 0 END) AS pkt_fwd_null,
  SUM(CASE WHEN pkt_bwd   IS NULL THEN 1 ELSE 0 END) AS pkt_bwd_null,
  SUM(CASE WHEN bytes_fwd IS NULL THEN 1 ELSE 0 END) AS bytes_fwd_null,
  SUM(CASE WHEN bytes_bwd IS NULL THEN 1 ELSE 0 END) AS bytes_bwd_null,

  -- Negative counts
  SUM(CASE WHEN duration  < 0 THEN 1 ELSE 0 END) AS duration_neg,
  SUM(CASE WHEN pkt_total < 0 THEN 1 ELSE 0 END) AS pkt_total_neg,
  SUM(CASE WHEN bytes_total < 0 THEN 1 ELSE 0 END) AS bytes_total_neg,
  SUM(CASE WHEN pkt_fwd   < 0 THEN 1 ELSE 0 END) AS pkt_fwd_neg,
  SUM(CASE WHEN pkt_bwd   < 0 THEN 1 ELSE 0 END) AS pkt_bwd_neg,
  SUM(CASE WHEN bytes_fwd < 0 THEN 1 ELSE 0 END) AS bytes_fwd_neg,
  SUM(CASE WHEN bytes_bwd < 0 THEN 1 ELSE 0 END) AS bytes_bwd_neg,

  -- Zero counts
  SUM(CASE WHEN duration  = 0 THEN 1 ELSE 0 END) AS duration_zero,
  SUM(CASE WHEN pkt_total = 0 THEN 1 ELSE 0 END) AS pkt_total_zero,
  SUM(CASE WHEN bytes_total = 0 THEN 1 ELSE 0 END) AS bytes_total_zero

FROM {database_name}.feature_engineered
""")
profile_invalid

Unnamed: 0,total_rows,duration_null,pkt_total_null,bytes_total_null,pkt_fwd_null,pkt_bwd_null,bytes_fwd_null,bytes_bwd_null,duration_neg,pkt_total_neg,bytes_total_neg,pkt_fwd_neg,pkt_bwd_neg,bytes_fwd_neg,bytes_bwd_neg,duration_zero,pkt_total_zero,bytes_total_zero
0,26708942,0,0,0,0,0,0,0,115,0,0,0,0,0,0,5325739,115260,15981998


## Count rows to be dropped

In [106]:
# Count how many rows match the drop condition: any raw canonical field NULL
# or negative, or duration / pkt_total equal to 0. A row matching several
# conditions is counted once. bytes_total = 0 is not part of this condition.
invalid_row_summary = read_sql(f"""
SELECT
  COUNT(*) AS total_rows,
  SUM(
    CASE WHEN
      -- NULL raw features
      duration IS NULL OR pkt_total IS NULL OR bytes_total IS NULL OR
      pkt_fwd IS NULL OR pkt_bwd IS NULL OR bytes_fwd IS NULL OR bytes_bwd IS NULL OR

      -- negative raw features
      duration < 0 OR pkt_total < 0 OR bytes_total < 0 OR
      pkt_fwd < 0 OR pkt_bwd < 0 OR bytes_fwd < 0 OR bytes_bwd < 0 OR

      -- zeros 
      duration = 0 OR pkt_total = 0
    THEN 1 ELSE 0 END
  ) AS rows_to_drop
FROM {database_name}.feature_engineered
""")

invalid_row_summary


Unnamed: 0,total_rows,rows_to_drop
0,26708942,5325854


## Inspect a few invalid rows

In [107]:
# Pull up to 20 rows matching the drop condition (NULL or negative raw field,
# or zero duration / pkt_total), showing raw fields, engineered features,
# label and source_dataset for manual inspection.
invalid_rows_sample = read_sql(f"""
SELECT
  duration, pkt_total, bytes_total, pkt_fwd, pkt_bwd, bytes_fwd, bytes_bwd,
  pkt_rate, byte_rate, bytes_per_pkt, pkt_ratio, byte_ratio,
  label, source_dataset
FROM {database_name}.feature_engineered
WHERE
  duration IS NULL OR pkt_total IS NULL OR bytes_total IS NULL OR
  pkt_fwd IS NULL OR pkt_bwd IS NULL OR bytes_fwd IS NULL OR bytes_bwd IS NULL OR
  duration < 0 OR pkt_total < 0 OR bytes_total < 0 OR
  pkt_fwd < 0 OR pkt_bwd < 0 OR bytes_fwd < 0 OR bytes_bwd < 0 OR
  duration = 0 OR pkt_total = 0
LIMIT 20
""")

invalid_rows_sample


Unnamed: 0,duration,pkt_total,bytes_total,pkt_fwd,pkt_bwd,bytes_fwd,bytes_bwd,pkt_rate,byte_rate,bytes_per_pkt,pkt_ratio,byte_ratio,label,source_dataset
0,0.0,1,0,1,0,0,0,,,0.0,1.0,0.0,1,TON_IoT
1,0.0,1,0,1,0,0,0,,,0.0,1.0,0.0,1,TON_IoT
2,0.0,1,0,0,1,0,0,,,0.0,0.0,0.0,1,TON_IoT
3,0.0,1,0,1,0,0,0,,,0.0,1.0,0.0,1,TON_IoT
4,0.0,1,0,1,0,0,0,,,0.0,1.0,0.0,1,TON_IoT
5,0.0,1,0,0,1,0,0,,,0.0,0.0,0.0,1,TON_IoT
6,0.0,1,0,0,1,0,0,,,0.0,0.0,0.0,1,TON_IoT
7,0.0,1,0,0,1,0,0,,,0.0,0.0,0.0,1,TON_IoT
8,0.0,1,0,1,0,0,0,,,0.0,1.0,0.0,1,TON_IoT
9,0.0,1,0,1,0,0,0,,,0.0,1.0,0.0,1,TON_IoT


## Create cleaned table (drop invalid rows: NULL or negative raw fields, or zero `duration` / `pkt_total`)

In [108]:
# create a cleaned table
exec_ddl(f"DROP TABLE IF EXISTS {database_name}.feature_engineered_cleaned")

# clean S3 location before creating table: dropping the table does not
# remove the Parquet files of a previous run from the external location,
# and the CTAS below needs that location to be empty
clean_s3_path(f"s3://{results_bucket}/aai540/processed/feature_engineered_cleaned/")

# keep only rows whose raw canonical fields are all present and non-negative
# and whose duration and pkt_total are strictly positive
exec_ddl(f"""
CREATE TABLE {database_name}.feature_engineered_cleaned
WITH (
  format = 'PARQUET',
  external_location = 's3://{results_bucket}/aai540/processed/feature_engineered_cleaned/',
  parquet_compression = 'SNAPPY'
) AS
SELECT *
FROM {database_name}.feature_engineered
WHERE
  -- Raw canonical must exist
  duration IS NOT NULL
  AND pkt_total IS NOT NULL
  AND bytes_total IS NOT NULL
  AND pkt_fwd IS NOT NULL
  AND pkt_bwd IS NOT NULL
  AND bytes_fwd IS NOT NULL
  AND bytes_bwd IS NOT NULL

  -- Raw canonical must be non-negative
  AND duration >= 0
  AND pkt_total >= 0
  AND bytes_total >= 0
  AND pkt_fwd >= 0
  AND pkt_bwd >= 0
  AND bytes_fwd >= 0
  AND bytes_bwd >= 0

  -- Drop zero denominators
  AND duration > 0
  AND pkt_total > 0
""")

## Validate

In [109]:
# Compare row counts before and after cleaning; the difference should equal
# the rows_to_drop figure computed from the same drop condition.
read_sql(f"""
SELECT
  (SELECT COUNT(*) FROM {database_name}.feature_engineered) AS before_rows,
  (SELECT COUNT(*) FROM {database_name}.feature_engineered_cleaned) AS after_rows
""")

Unnamed: 0,before_rows,after_rows
0,26708942,21383088


In [110]:
# Class balance of the cleaned table: row count per label and its share of
# all rows (the window SUM over the grouped counts gives the grand total).
label_distribution = read_sql(f"""
SELECT
  label,
  COUNT(*) AS row_count,
  COUNT(*) * 100.0 / SUM(COUNT(*)) OVER () AS percentage
FROM {database_name}.feature_engineered_cleaned
GROUP BY label
ORDER BY label
""")

label_distribution

Unnamed: 0,label,row_count,percentage
0,0,4852041,22.691021
1,1,16531047,77.308979


In [111]:
# Row count and percentage share per attack_category in the cleaned table,
# largest category first.
attack_type_distribution = read_sql(f"""
SELECT
  attack_category,
  COUNT(*) AS row_count,
  COUNT(*) * 100.0 / SUM(COUNT(*)) OVER () AS percentage
FROM {database_name}.feature_engineered_cleaned
GROUP BY attack_category
ORDER BY row_count DESC
""")

attack_type_distribution

Unnamed: 0,attack_category,row_count,percentage
0,DoS/DDoS,9678799,45.263804
1,Normal,4852041,22.691021
2,Web Attack,2095484,9.799726
3,Reconnaissance,1862138,8.708462
4,Brute Force,1643527,7.686107
5,Backdoor,510444,2.387139
6,Injection,451709,2.112459
7,Generic Malware,215651,1.008512
8,Exploits,46032,0.215273
9,Fuzzing,24226,0.113295
