In [1]:
!pip install pycryptodome



In [2]:
# Step 1: Imports & Setup
import os
import sys
import pandas as pd
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import pkcs1_15

# Locate the data folders: "data" sits one level above the current working directory
DATA_DIR = os.path.abspath(os.path.join(os.getcwd(), os.pardir, "data"))
RAW_DIR, INT_DIR = (os.path.join(DATA_DIR, sub) for sub in ("raw", "integrity"))

# Raw training input
TRAIN_RAW = os.path.join(RAW_DIR, "UNSW_NB15_training-set.csv")

# Integrity artefacts: hashed CSV, detached signature, and the RSA key files
TRAIN_HASHED, DATA_SIG, PRIVKEY_PATH, PUBKEY_PATH = (
    os.path.join(INT_DIR, artefact)
    for artefact in (
        "train_hashes.csv",
        "dataset_signature.sig",
        "private_key.pem",
        "public_key.pem",
    )
)

# Create the integrity folder up front (no error if it is already there)
os.makedirs(INT_DIR, exist_ok=True)


In [3]:
# Step 2: Read the raw training CSV, report its dimensions, and show the first rows
df_raw = pd.read_csv(TRAIN_RAW)
raw_row_count, raw_col_count = df_raw.shape
print(f"Raw data rows: {raw_row_count}, columns: {raw_col_count}")
display(df_raw.head(5))


Raw data rows: 175341, columns: 36


Unnamed: 0,dur,proto,service,state,spkts,dpkts,sbytes,dbytes,rate,sload,...,trans_depth,response_body_len,ct_src_dport_ltm,ct_dst_sport_ltm,is_ftp_login,ct_ftp_cmd,ct_flw_http_mthd,is_sm_ips_ports,attack_cat,label
0,0.121478,tcp,-,FIN,6,4,258,172,74.08749,14158.942,...,0,0,1,1,0,0,0,0,Normal,0
1,0.649902,tcp,-,FIN,14,38,734,42014,78.47337,8395.112,...,0,0,1,1,0,0,0,0,Normal,0
2,1.623129,tcp,-,FIN,8,16,364,13186,14.170161,1572.2719,...,0,0,1,1,0,0,0,0,Normal,0
3,1.681642,tcp,ftp,FIN,12,12,628,770,13.677108,2740.179,...,0,0,1,1,1,1,0,0,Normal,0
4,0.449454,tcp,-,FIN,10,6,534,268,33.373825,8561.499,...,0,0,2,1,0,0,0,0,Normal,0


In [4]:
# Step 3: Compute SHA-256 Row Hashes
def compute_row_hash(row_values):
    """
    Return the SHA-256 hex digest of one data row.

    Every value is converted with str(), UTF-8 encoded, and fed to the hash
    prefixed by its byte length ("<len>:"). The length prefix marks where each
    field ends, so rows such as ("1", "23") and ("12", "3") no longer produce
    the same digest, which plain concatenation allowed.

    Note: digests differ from those produced by un-delimited concatenation, so
    previously stored row hashes must be regenerated with this function.

    Args:
        row_values: iterable of the cell values of a single row.

    Returns:
        str: hexadecimal SHA-256 digest of the row.
    """
    hasher = SHA256.new()
    for v in row_values:
        encoded = str(v).encode('utf-8')
        # Length prefix keeps field boundaries unambiguous even if a value
        # itself contains digits or the ':' character.
        hasher.update(f"{len(encoded)}:".encode('ascii'))
        hasher.update(encoded)
    return hasher.hexdigest()

print("Computing row hashes...")
# Hash only the data columns. Dropping any existing 'row_hash' column first
# keeps this cell idempotent: without it, re-running the cell would fold the
# previous hash into the new one, and verify_row_hashes() (which recomputes
# the hash from the non-hash columns) would then report every row as tampered.
df_raw['row_hash'] = df_raw.drop(columns=['row_hash'], errors='ignore').apply(
    lambda row: compute_row_hash(row.values), axis=1
)

# Save hashed output (index=False: the row index is not written as a column)
df_raw.to_csv(TRAIN_HASHED, index=False)
print(f"Saved hashed rows to: {TRAIN_HASHED}")

Computing row hashes...
Saved hashed rows to: c:\Users\dorai\OneDrive\Documents\Documents\SEM6\Computer Security\Project_cs\csproject\IDS-binary-classification\data\integrity\train_hashes.csv


In [5]:
# Step 4: Generate RSA Keypair
# Create the RSA keypair only when needed, and never replace an existing
# private key: signatures already written were made with it and would stop
# verifying if it were regenerated.
# NOTE(review): the private key is stored unencrypted in the same folder as the
# data it protects; consider export_key(passphrase=...) and a separate location.
if os.path.isfile(PRIVKEY_PATH):
    if os.path.isfile(PUBKEY_PATH):
        print("RSA keypair already exists.")
    else:
        # Only the public half is missing: derive it from the existing private key
        print("Public key missing; re-exporting it from the existing private key...")
        with open(PRIVKEY_PATH, 'rb') as f:
            key = RSA.import_key(f.read())
        with open(PUBKEY_PATH, 'wb') as f:
            f.write(key.publickey().export_key())
else:
    print("Generating RSA keypair...")
    key = RSA.generate(2048)
    with open(PRIVKEY_PATH, 'wb') as f:
        f.write(key.export_key())
    with open(PUBKEY_PATH, 'wb') as f:
        f.write(key.publickey().export_key())

Generating RSA keypair...


In [6]:
# Step 5: Sign the Hashed Dataset
# Define signing function
from Crypto.Signature import pkcs1_15

def sign_file(input_path, private_key_path, signature_path):
    """
    Create a detached RSA (PKCS#1 v1.5) signature over a file's SHA-256 digest.

    The private key is read from private_key_path, the file at input_path is
    hashed in 4096-byte chunks, and the resulting signature bytes are written
    to signature_path. A confirmation line is printed at the end.
    """
    # Private key first, so a missing or invalid key fails before any hashing
    with open(private_key_path, 'rb') as key_file:
        signing_key = RSA.import_key(key_file.read())

    # Stream the file through SHA-256 rather than loading it whole
    digest = SHA256.new()
    with open(input_path, 'rb') as data_file:
        while True:
            block = data_file.read(4096)
            if not block:
                break
            digest.update(block)

    # Sign the digest and persist the raw signature bytes
    signer = pkcs1_15.new(signing_key)
    raw_signature = signer.sign(digest)
    with open(signature_path, 'wb') as sig_file:
        sig_file.write(raw_signature)

    print(f"Dataset signed. Signature saved to: {signature_path}")

# Execute signing
print("Signing dataset...")
# Sign the hashed training CSV with the project private key
sign_file(
    input_path=TRAIN_HASHED,
    private_key_path=PRIVKEY_PATH,
    signature_path=DATA_SIG,
)

Signing dataset...
Dataset signed. Signature saved to: c:\Users\dorai\OneDrive\Documents\Documents\SEM6\Computer Security\Project_cs\csproject\IDS-binary-classification\data\integrity\dataset_signature.sig


In [7]:
# Step 6: Verify Integrity
def verify_row_hashes(hashed_csv, hash_column='row_hash'):
    """
    Recompute the hash of every row in a hashed CSV and compare it to the stored one.

    Args:
        hashed_csv: path to a CSV in which each row carries its own digest.
        hash_column: name of the column holding the stored hex digests.

    Returns:
        bool: True when every recomputed hash equals the stored value; False
        otherwise (the index labels of the mismatching rows are printed).

    Raises:
        KeyError: if hash_column is not a column of the CSV.
    """
    # The hash is computed over str(value), so floats must parse back to the
    # same value that was hashed before the file was written; the round-trip
    # converter is requested explicitly instead of relying on the default parser.
    df = pd.read_csv(hashed_csv, float_precision='round_trip')

    # Separate stored digests from the data once, instead of dropping the
    # hash column again for every single row.
    stored_hashes = df[hash_column]
    data_only = df.drop(columns=[hash_column])

    mismatches = [
        idx
        for (idx, row), expected in zip(data_only.iterrows(), stored_hashes)
        if compute_row_hash(row.values) != expected
    ]
    if mismatches:
        print(f"Row hash mismatches at indices: {mismatches}")
        return False
    print("All row hashes match.")
    return True

from Crypto.Hash import SHA256 as _SHA256

def verify_signature(file_path, public_key_path, sig_path):
    """
    Check a detached RSA (PKCS#1 v1.5) signature against a file's SHA-256 digest.

    Reads the public key from public_key_path and the signature bytes from
    sig_path, hashes the file at file_path in 4096-byte chunks, and verifies.
    Prints the outcome and returns True on success, False when verification
    raises ValueError or TypeError.
    """
    with open(public_key_path, 'rb') as key_file:
        verifying_key = RSA.import_key(key_file.read())

    with open(sig_path, 'rb') as sig_file:
        sig_bytes = sig_file.read()

    # Stream the file through SHA-256
    digest = _SHA256.new()
    with open(file_path, 'rb') as data_file:
        while True:
            block = data_file.read(4096)
            if not block:
                break
            digest.update(block)

    verifier = pkcs1_15.new(verifying_key)
    try:
        verifier.verify(digest, sig_bytes)
        print("Signature verification successful.")
        return True
    except (ValueError, TypeError):
        print("Signature verification FAILED!")
        return False

# Run both checks before deciding, so each one reports its own result
print("Verifying row hashes...")
rows_ok = verify_row_hashes(hashed_csv=TRAIN_HASHED)
print("Verifying dataset signature...")
sig_ok = verify_signature(
    file_path=TRAIN_HASHED,
    public_key_path=PUBKEY_PATH,
    sig_path=DATA_SIG,
)

if rows_ok and sig_ok:
    print("Integrity check PASSED. You may proceed.")
else:
    print("Integrity check FAILED. Exiting...")
    sys.exit(1)


Verifying row hashes...
All row hashes match.
Verifying dataset signature...
Signature verification successful.
Integrity check PASSED. You may proceed.


In [8]:
# ────────────────────────────────────────────────────────────────────────────────
# Module 1 (cont’d): Ingest & Integrity for Test Data
# ────────────────────────────────────────────────────────────────────────────────

# Test-set locations: raw input plus its hashed CSV and detached signature
TEST_RAW = os.path.join(RAW_DIR, "UNSW_NB15_testing-set.csv")
TEST_HASHED, TEST_SIG = (
    os.path.join(INT_DIR, artefact)
    for artefact in ("test_hashes.csv", "test_dataset_signature.sig")
)

# 1) Read the test CSV, report its dimensions, and show the first rows
df_test = pd.read_csv(TEST_RAW)
test_row_count, test_col_count = df_test.shape
print(f"Test raw data rows: {test_row_count}, cols: {test_col_count}")
display(df_test.head(5))

Test raw data rows: 82332, cols: 36


Unnamed: 0,dur,proto,service,state,spkts,dpkts,sbytes,dbytes,rate,sload,...,trans_depth,response_body_len,ct_src_dport_ltm,ct_dst_sport_ltm,is_ftp_login,ct_ftp_cmd,ct_flw_http_mthd,is_sm_ips_ports,attack_cat,label
0,1.1e-05,udp,-,INT,2,0,496,0,90909.09,180363630.0,...,0,0,1,1,0,0,0,0,Normal,0
1,8e-06,udp,-,INT,2,0,1762,0,125000.0,881000000.0,...,0,0,1,1,0,0,0,0,Normal,0
2,5e-06,udp,-,INT,2,0,1068,0,200000.0,854400000.0,...,0,0,1,1,0,0,0,0,Normal,0
3,6e-06,udp,-,INT,2,0,900,0,166666.66,600000000.0,...,0,0,2,1,0,0,0,0,Normal,0
4,1e-05,udp,-,INT,2,0,2126,0,100000.0,850400000.0,...,0,0,2,1,0,0,0,0,Normal,0


In [9]:
# 2) Compute SHA-256 row hashes
print("Computing SHA-256 row hashes for test set...")
# Exclude any existing 'row_hash' column so re-running this cell does not hash
# the previous hash into the new one (which would make verification fail).
df_test['row_hash'] = df_test.drop(columns=['row_hash'], errors='ignore').apply(
    lambda row: compute_row_hash(row.values), axis=1
)


Computing SHA-256 row hashes for test set...


In [10]:
# 3) Write the test frame (including its row_hash column) to the integrity folder
df_test.to_csv(path_or_buf=TEST_HASHED, index=False)
print(f"Saved test hashes to: {TEST_HASHED}")


Saved test hashes to: c:\Users\dorai\OneDrive\Documents\Documents\SEM6\Computer Security\Project_cs\csproject\IDS-binary-classification\data\integrity\test_hashes.csv


In [11]:

# 4) Produce a detached signature for the hashed test CSV
print("Signing test dataset...")
sign_file(
    input_path=TEST_HASHED,
    private_key_path=PRIVKEY_PATH,
    signature_path=TEST_SIG,
)


Signing test dataset...
Dataset signed. Signature saved to: c:\Users\dorai\OneDrive\Documents\Documents\SEM6\Computer Security\Project_cs\csproject\IDS-binary-classification\data\integrity\test_dataset_signature.sig


In [12]:

# 5) Recompute and compare the per-row hashes of the test set; stop on any mismatch
print("Verifying test row hashes...")
test_rows_ok = verify_row_hashes(hashed_csv=TEST_HASHED)
if not test_rows_ok:
    print("Test-row hash mismatch! Halting.")
    sys.exit(1)


Verifying test row hashes...
All row hashes match.


In [13]:

# 6) Check the detached signature of the hashed test CSV; stop if it does not verify
print("Verifying test dataset signature...")
test_sig_ok = verify_signature(
    file_path=TEST_HASHED,
    public_key_path=PUBKEY_PATH,
    sig_path=TEST_SIG,
)
if test_sig_ok:
    print("Test data integrity check PASSED.")
else:
    print("Test-signature verification FAILED! Halting.")
    sys.exit(1)


Verifying test dataset signature...
Signature verification successful.
Test data integrity check PASSED.
