In [1]:
#needed libraries
import os, gzip, json, uuid
import pandas as pd
import numpy as np
import pyarrow.parquet as pq
import pyarrow as pa
from collections import Counter
from sklearn.ensemble import IsolationForest
import matplotlib.pyplot as plt
import tensorflow 
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras import regularizers
import warnings
warnings.filterwarnings('ignore')

**Data Loading**

In [2]:
# Source: gzipped CloudTrail JSON exports.  Destination: batched Parquet files.
log_dir = "../extracted/flaws_cloudtrail_logs"
output_dir = "cloudtrail_parquet"
os.makedirs(output_dir, exist_ok=True)

# Flattened columns starting with any of these prefixes are discarded before writing.
DROP_PREFIXES = ("requestParameters.", "responseElements.", "additionalEventData.")

# Number of records accumulated before one Parquet file is written.
batch_size = 50_000
records_batch = []

def sanitize_for_parquet(df):
    """Stringify object columns that contain dicts or lists so pyarrow can write them.

    A column is converted when *any* non-null cell is a ``dict`` or ``list``, not just
    the first one: a single nested value in an otherwise scalar column is enough to
    make pyarrow's type inference fail in ``to_parquet``.  In a converted column every
    non-null cell becomes ``str(value)``; null cells stay null instead of turning into
    the literal strings ``'None'`` / ``'nan'``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to sanitize; modified in place.

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object, for chaining.
    """
    for col in df.columns:
        values = df[col]
        # dict/list cells can only live in object-dtype columns.
        if values.dtype != object:
            continue
        has_nested = values.dropna().map(lambda v: isinstance(v, (dict, list))).any()
        if has_nested:
            # where() keeps the str() form for non-null cells and NaN for null ones.
            df[col] = values.astype(str).where(values.notna())
    return df

def _write_batch(records, part_idx):
    """Flatten one batch of CloudTrail records and write it as a single Parquet part file.

    Columns starting with ``DROP_PREFIXES`` are removed, remaining dict/list cells are
    stringified via ``sanitize_for_parquet``, and the file name is derived from
    ``part_idx`` so that re-running the cell overwrites instead of duplicating data.
    """
    batch_df = pd.json_normalize(records)
    batch_df = batch_df.loc[:, [c for c in batch_df.columns if not c.startswith(DROP_PREFIXES)]]
    batch_df = sanitize_for_parquet(batch_df)
    out_file = os.path.join(output_dir, f"part-{part_idx:05d}.parquet")
    batch_df.to_parquet(out_file, engine="pyarrow", index=False)


log_files = sorted(f for f in os.listdir(log_dir) if f.endswith(".json.gz"))

# Start from an empty output directory: every later stage reads *all* .parquet files in
# it, so parts left over from a previous run would silently duplicate records.
for old in os.listdir(output_dir):
    if old.endswith(".parquet"):
        os.remove(os.path.join(output_dir, old))

records_batch = []
part_idx = 0

for file in log_files:
    file_path = os.path.join(log_dir, file)

    # errors="replace" keeps one undecodable byte from aborting the whole run.
    with gzip.open(file_path, "rt", encoding="utf-8", errors="replace") as f:
        data = json.load(f)

    for record in data.get("Records", []):
        records_batch.append(record)
        if len(records_batch) >= batch_size:
            _write_batch(records_batch, part_idx)
            part_idx += 1
            records_batch = []

# Flush the final, partially filled batch.
if records_batch:
    _write_batch(records_batch, part_idx)
    part_idx += 1
    records_batch = []


**Data Preprocessing**

In [4]:
# Input: raw batched Parquet from the loading step.  Output: the same files minus the
# columns listed below.
parquet_dir = "cloudtrail_parquet"
output_dir = "clean_cloudtrail_parquet"
os.makedirs(output_dir, exist_ok=True)

# NOTE(review): 'userIdentity.arn', 'requestParameters', 'userIdentity.userName' and
# 'userIdentity.accessKeyId' were noted here previously but are NOT dropped — confirm
# that keeping them is intentional.

columns_to_drop = [
    'serviceEventDetails.snapshotId',
    'managementEvent',
    'readOnly',
    'vpcEndpointId',
    'userIdentity.sessionContext.sessionIssuer.userName',
    'userIdentity.sessionContext.sessionIssuer.accountId',
    'userIdentity.sessionContext.sessionIssuer.principalId',
    'apiVersion',
    'userIdentity.sessionContext.attributes.creationDate',
    'userIdentity.sessionContext.sessionIssuer.arn',
    'userIdentity.sessionContext.sessionIssuer.type',
    'sharedEventID',
    'resources',
    'userIdentity.accountId',
    'userIdentity.principalId',
    'eventVersion',
    'responseElements',
    'eventCategory',
    'requestID',
    'eventID',
    'recipientAccountId',
]

# Start from an empty output directory so files written by earlier runs (e.g. under old
# part names) are not picked up by later stages alongside the fresh ones.
for old in os.listdir(output_dir):
    if old.endswith(".parquet"):
        os.remove(os.path.join(output_dir, old))

for file in sorted(os.listdir(parquet_dir)):
    if not file.endswith(".parquet"):
        continue
    input_path = os.path.join(parquet_dir, file)
    output_path = os.path.join(output_dir, file)

    # Read the whole Parquet file (not chunked) into pandas.
    df = pq.read_table(input_path).to_pandas()

    # Drop unwanted columns; a file may lack some of them, which is not an error.
    df = df.drop(columns=columns_to_drop, errors="ignore")

    # Write back under the same file name.
    df.to_parquet(output_path, engine="pyarrow", index=False)

**Feature Engineering**

In [5]:
# Reads the column-pruned batches, writes per-file feature tables.
input_dir = "clean_cloudtrail_parquet"
output_dir = "features_parquet"
os.makedirs(output_dir, exist_ok=True)

# Raw columns turned into 0/1 "field is present" flags named has_<last path segment>.
presence_cols = [
    'userIdentity.sessionContext.attributes.mfaAuthenticated',
]

def _engineer_features(df):
    """Add time, error and presence features to one CloudTrail batch and drop the raw inputs.

    Adds ``hour``, ``day_of_week`` (Monday=0), ``isWeekend``, ``isNight`` (hours 0–6
    inclusive), ``hasError`` and one ``has_<name>`` flag per entry of ``presence_cols``.
    Then it drops ``eventTime``, ``errorMessage``, ``errorCode`` and the presence source
    columns.  Unparseable timestamps become NaT, giving NaN ``hour``/``day_of_week`` and
    0 for both calendar flags.
    """
    # ---- TIME FEATURES ----
    # NOTE(review): hours are used exactly as parsed; CloudTrail timestamps appear to be
    # UTC, so isNight is UTC night, not local night — confirm that is intended.
    event_time = pd.to_datetime(df['eventTime'], errors='coerce')
    df['hour'] = event_time.dt.hour
    df['day_of_week'] = event_time.dt.dayofweek
    df['isWeekend'] = df['day_of_week'].isin([5, 6]).astype(int)
    df['isNight'] = df['hour'].between(0, 6).astype(int)

    # ---- ERROR FEATURES ----
    # json_normalize only creates errorCode if some record in the batch had one, so a
    # batch with no failed calls has no such column.
    if 'errorCode' in df.columns:
        df['hasError'] = df['errorCode'].notna().astype(int)
    else:
        df['hasError'] = 0

    # ---- PRESENCE FEATURES ----
    # NOTE(review): this flags that the field exists, not its value — e.g. a
    # mfaAuthenticated value of "false" still yields 1.
    for col in presence_cols:
        flag_name = f'has_{col.split(".")[-1]}'
        df[flag_name] = df[col].notna().astype(int) if col in df.columns else 0

    # ---- CLEANUP ----
    return df.drop(columns=['eventTime', 'errorMessage', 'errorCode', *presence_cols],
                   errors='ignore')


# Start from an empty output directory so stale files from earlier runs do not linger.
for old in os.listdir(output_dir):
    if old.endswith(".parquet"):
        os.remove(os.path.join(output_dir, old))

for file in sorted(os.listdir(input_dir)):
    if file.endswith(".parquet"):
        df = _engineer_features(pq.read_table(os.path.join(input_dir, file)).to_pandas())
        # ---- SAVE ----
        df.to_parquet(os.path.join(output_dir, file), index=False)


In [None]:
input_dir = "features_parquet"

# Columns that the next cell replaces with their global relative frequency.
freq_cols = [
    'userAgent',
    'sourceIPAddress',
    'eventName',
    'eventSource'
]

global_counts = {col: Counter() for col in freq_cols}
total_rows = 0

for file in sorted(os.listdir(input_dir)):
    if not file.endswith(".parquet"):
        continue
    path = os.path.join(input_dir, file)

    # Row count from the Parquet footer: correct even when a file has none of freq_cols.
    total_rows += pq.read_metadata(path).num_rows

    # Read only the counted columns instead of the whole (wide) file.
    present = [col for col in freq_cols if col in pq.read_schema(path).names]
    if not present:
        continue
    df = pd.read_parquet(path, columns=present)
    for col in present:
        global_counts[col].update(df[col].dropna().astype(str))

# Convert counts → frequencies.  The denominator is every row (nulls included), so a
# column's frequencies sum to its non-null share rather than to exactly 1.
freq_maps = {
    col: {k: v / total_rows for k, v in counter.items()}
    for col, counter in global_counts.items()
}


In [None]:
output_dir = "clean_features_parquet"
os.makedirs(output_dir, exist_ok=True)

categorical_cols = ['eventType', 'awsRegion', 'userIdentity.type', 'userIdentity.invokedBy']

feature_files = sorted(f for f in os.listdir(input_dir) if f.endswith(".parquet"))

# ---- PASS 1: GLOBAL CATEGORY LEVELS ----
# One-hot encoding file by file gives each file only the dummy columns for the categories
# it happens to contain.  When the files are concatenated, the missing dummies become NaN
# instead of 0.  Collecting the levels across all files first gives every file the same
# dummy columns.
category_levels = {col: set() for col in categorical_cols}
for file in feature_files:
    path = os.path.join(input_dir, file)
    present = [col for col in categorical_cols if col in pq.read_schema(path).names]
    if present:
        levels_df = pd.read_parquet(path, columns=present)
        for col in present:
            category_levels[col].update(levels_df[col].dropna().astype(str))
category_levels = {col: sorted(levels) for col, levels in category_levels.items()}

# Start from an empty output directory so stale files from earlier runs do not linger.
for old in os.listdir(output_dir):
    if old.endswith(".parquet"):
        os.remove(os.path.join(output_dir, old))

# ---- PASS 2: ENCODE AND SAVE ----
for file in feature_files:
    df = pd.read_parquet(os.path.join(input_dir, file))

    # ---- ONE-HOT ----
    for col in categorical_cols:
        # A file may lack a categorical column entirely; treat it as all-null.
        raw = df[col] if col in df.columns else pd.Series(None, index=df.index, dtype=object)
        # Nulls stay NaN, which get_dummies encodes as an all-zero row.
        df[col] = pd.Categorical(raw.astype(str).where(raw.notna()),
                                 categories=category_levels[col])
    df = pd.get_dummies(df, columns=categorical_cols, drop_first=False)

    # ---- FREQUENCY ENCODING ----
    # Values never counted (including nulls, which stringify to 'None'/'nan') map to 0.
    for col in freq_cols:
        if col in df.columns:
            df[col + "_freq"] = df[col].astype(str).map(freq_maps[col]).fillna(0)
            df = df.drop(columns=[col])

    # ---- SAVE ----
    df.to_parquet(os.path.join(output_dir, file), index=False)

**Modelling**

Isolation Forest

In [None]:
input_dir = "clean_features_parquet"

# Concatenate straight from a generator so the per-file frames are not kept alive in a
# list afterwards; otherwise the whole dataset would stay in memory twice.
data = pd.concat(
    (
        pd.read_parquet(os.path.join(input_dir, file))
        for file in sorted(os.listdir(input_dir))
        if file.endswith(".parquet")
    ),
    ignore_index=True,
)
print(f"Dataset size: {data.shape}")

Dataset size: (1939207, 53)


In [None]:
# Isolation Forest over every engineered feature.  contamination=0.01 sets the score
# threshold so that about 1% of training rows are labelled anomalous.
iso = IsolationForest(n_estimators=300,
                      max_samples='auto',
                      contamination=0.01,
                      random_state=42,
                      n_jobs=-1)

# Exclude the label/score columns that later cells add to `data`.  Otherwise re-running
# this cell after them would train on the model's own output.
iso.fit(data.drop(columns=['iso_anomaly', 'iso_score'], errors='ignore'))

In [None]:
# Score only the columns the forest was fitted on, in fit order.  Columns added to `data`
# later (iso_anomaly, iso_score) therefore cannot cause a feature-name mismatch on re-runs.
iso_features = data[iso.feature_names_in_]
iso_scores = iso.decision_function(iso_features)
# IsolationForest.predict is exactly "-1 where decision_function < 0, else 1".  Deriving
# it from the scores avoids a second full pass of 300 trees over every row.
iso_pred = np.where(iso_scores < 0, -1, 1)

In [None]:
# convert predictions
# IsolationForest labels outliers -1 and inliers 1; store them as a 0/1 anomaly flag.
data['iso_anomaly'] = np.where(iso_pred == -1, 1, 0)

In [None]:
# Share of rows flagged normal (0) vs anomalous (1); contamination=0.01 targets ~1%.
data.loc[:, 'iso_anomaly'].value_counts(normalize=True)

iso_anomaly
0    0.990006
1    0.009994
Name: proportion, dtype: float64

In [None]:
# Keep the raw decision_function score per row (lower = more anomalous).
data.loc[:, 'iso_score'] = iso_scores