# 29.2.2 **Capstone Project Step 8: Scale Your ML Prototype**
---
**Bruce Walker, UCSD MLE/AI Bootcamp**

In this notebook, I build on the work from assignment 29.2.1 and refine my chosen model to work on larger datasets.

I will implement my chosen model as a Python class. The class will be able to process large data files in chunks: reading the input, pre-processing the data, making predictions, and writing the predictions to a file. For now I have chosen a chunk size of five million records; this should keep the RAM needed by any one instance of the class reasonable, allowing multiple instances to run simultaneously on a given system.
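
One way to sanity-check a chunk size is to measure the memory footprint of a small sample and extrapolate to a full chunk. The sketch below is only illustrative: `estimate_chunk_memory_mb` is a hypothetical helper, the sample rows are made up, and the estimate covers the raw columns only (one-hot encoding during pre-processing would widen each chunk further).

```python
import numpy as np
import pandas as pd


def estimate_chunk_memory_mb(sample_df: pd.DataFrame, chunk_rows: int) -> float:
    """Extrapolate the in-memory size of a chunk from a small sample (hypothetical helper)."""
    # deep=True counts the actual string payloads, not just the pointers
    bytes_per_row = sample_df.memory_usage(index=False, deep=True).sum() / len(sample_df)
    return bytes_per_row * chunk_rows / 1024**2


# Made-up rows standing in for a few records of a real input file
sample = pd.DataFrame({
    "uid": pd.Series(["uid-0001", "uid-0002"], dtype="string"),
    "id.resp_p": np.array([23, 8081], dtype="int32"),
    "proto": pd.Series(["tcp", "udp"], dtype="string"),
    "orig_ip_bytes": np.array([40, 60], dtype="int32"),
})

estimate = estimate_chunk_memory_mb(sample, 5_000_000)
print(f"Roughly {estimate:,.0f} MB per 5M-row chunk (raw columns only)")
```

Running the same estimate on a few thousand rows of the real input file would give a more representative per-row figure than this toy sample.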

As I move to deployment, I will look for additional scaling techniques to further improve throughput for even larger amounts of data.

---

Original Dataset source: https://www.kaggle.com/datasets/agungpambudi/network-malware-detection-connection-analysis/data?select=CTU-IoT-Malware-Capture-1-1conn.log.labeled.csv

Data files used in this notebook: https://github.com/bdwalker1/UCSD_MLE_Bootcamp_Capstone/tree/master/data/MalwareDetectionInNetworkTrafficData/combined

## Set-up Helper Classes/Functions

### Imports

In [1]:
import os
from pathlib import Path
import time
import numpy as np
import datetime as dt
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.model_selection import RepeatedStratifiedKFold
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import classification_report, accuracy_score
from sklearn.metrics import f1_score as sklf1
from sklearn.metrics import PrecisionRecallDisplay as PRDisp


### Some Global Variable Definitions

In [2]:
notebook_random_state = 42

### Helper Classes and Functions

In [3]:
class SimpleTimer:
    
    def __init__(self):
        self.reset()

    def __time_diff(self, s, e):
        return abs(e - s)

    def reset(self):
        self.__running = False
        self.__start = -1
        self.__end = -1
        self.__laptimes = []

    def start(self):
        if self.__running:
            raise Exception("Timer already running. Must stop before starting.")
        else:
            self.reset()
            self.__start = time.time()
            self.__running = True

    def stop(self, is_last_lap=True):
        if not self.__running:
            raise Exception("Timer not running. Must start before stopping.")
        else:
            if is_last_lap:
                _ = self.laptime()
            self.__end = time.time()
            self.__running = False
            return self.__time_diff(self.__start, self.__end)

    def elapsed(self):
        if self.__running:
            return self.__time_diff(self.__start, time.time())
        else:
            return self.__time_diff(self.__start, self.__end)

    def laptime(self):
        if not self.__running:
            raise Exception("Timer not running. Must start before getting lap times.")
        else:
            self.__laptimes.append(time.time())
            elapsed_time = 0
            if len(self.__laptimes) == 1:
                elapsed_time = self.__time_diff(self.__start, self.__laptimes[0])
            else:
                elapsed_time = self.__time_diff(self.__laptimes[-2], self.__laptimes[-1])
            return elapsed_time

    def show_laptimes(self):
        laptimes = self.__laptimes
        lap_count = len(laptimes)
        if lap_count > 0:
            for n in range(0,lap_count):
                span = 0
                if n == 0:
                    span = self.__time_diff(self.__start, laptimes[n])
                else:
                    span = self.__time_diff(laptimes[n-1], laptimes[n])
                print(f"Lap {n+1}: {self.sts(span)}")
            if not self.__running:
                if self.__time_diff(laptimes[-1], self.__end) > .0001:
                    span = self.__time_diff(laptimes[-1], self.__end)
                    print(f"After Last Lap: {self.sts(span)}")
            
    def sts(self, seconds): # Span to String
        
        # Extract days, hours, minutes, and seconds
        days, remainder = divmod(seconds, 86400)
        hours, remainder = divmod(remainder, 3600)
        minutes, seconds = divmod(remainder, 60)
    
        output = ""
        if days > 0:
            output += f"{int(days)}d "
        if (days > 0) or (hours > 0):
            output += f"{int(hours)}h "
        if (days > 0) or (hours > 0) or (minutes > 0):
            output += f"{int(minutes)}m "
        output += f"{seconds:.3f}s"
        return output.strip()


## Prepare Needed Data Files

### Prepare a Training Dataset File

In [None]:
# Pull subset of data
training_df = full_df.sample(n=2500000, random_state=notebook_random_state)
del full_df

In [None]:
training_df.shape

#### Remove columns we won't be using

In [None]:
# Dropping detailed label as we are only going to predict malicious or not...
training_df.drop("detailed-label",axis=1,inplace=True)

In [None]:
# Dropping filename column as it won't be predictive
training_df.drop("filename", axis=1, inplace=True)

In [None]:
# Dropping IP columns as we want the model to train on traffic patterns independent of originating or responding host
training_df.drop(['id.orig_h','id.resp_h'], axis=1, inplace=True)


In [None]:
# Dropping duration, missed_bytes, orig_bytes, and resp_bytes columns
# because they are not populated for a majority of the dataset...
training_df.drop(['duration','missed_bytes','orig_bytes','resp_bytes',], axis=1, inplace=True)


In [None]:
print('Dropping any columns with only one value...')
for col in training_df:
    if (training_df[col].unique().shape[0] == 1):
        training_df.drop([col], axis=1, inplace=True)
        print(f'\t{col} dropped.')


In [None]:
print('Dropping any columns with all unique values (except date_time)...')
for col in training_df:
    if (col != 'date_time'):
        if (training_df[col].unique().shape[0] == training_df[col].shape[0]):
            training_df.drop([col], axis=1, inplace=True)
            print(f'\t{col} dropped.')


#### Transform some data

In [None]:
# Pull potentially useful info from date stamp
print('\tParsing date stamp...')
training_df["day_of_week"] = training_df["date_time"].dt.dayofweek
training_df["day_of_week"] = training_df["day_of_week"].astype(np.uint8)
training_df["day_of_month"] = training_df["date_time"].dt.day
training_df["day_of_month"] = training_df["day_of_month"].astype(np.uint8)
training_df["hour_of_day"] =  training_df["date_time"].dt.hour
training_df["hour_of_day"] =  training_df["hour_of_day"].astype(np.uint8)

# Drop the date/time column now that its parts have been extracted
training_df.drop("date_time",axis=1,inplace=True)

In [None]:
print('Creating binary label target...')
training_df["target"] = training_df["label"].apply(lambda x : 0 if x=="Benign" else 1)
training_df.drop("label",axis=1,inplace=True)
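
The `apply` call above runs a Python lambda once per row, which can become noticeable at tens of millions of records. A vectorized comparison should produce the same binary target. This is a minimal sketch on made-up labels, assuming the column contains no missing values; the label strings other than "Benign" are placeholders.

```python
import numpy as np
import pandas as pd

# Made-up labels; only "Benign" is taken from the notebook
labels = pd.Series(["Benign", "Malicious", "Benign", "Malicious"], dtype="string")

# Row-by-row version, as used in the cell above
target_apply = labels.apply(lambda x: 0 if x == "Benign" else 1)

# Vectorized version: one comparison over the whole column
target_vectorized = labels.ne("Benign").astype(np.uint8)

print(target_vectorized.tolist())
```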


#### Remove outliers based on orig_ip_bytes


In [None]:
training_df[['orig_ip_bytes']].boxplot()
plt.show()

pct = 0.95
cut_off = training_df['orig_ip_bytes'].quantile(pct)
print(f'{(pct*100):.1f}th percentile cut-off: {cut_off}')

orig_count = training_df.shape[0]
records_removed = sum(training_df['orig_ip_bytes'] > cut_off)
keeper_mask = training_df['orig_ip_bytes'] <= cut_off
training_df = training_df[keeper_mask]
print(f"{records_removed} outliers eliminated.")
pct_kept = training_df.shape[0] / orig_count
print(f'Percent of records kept: {pct_kept}')

del orig_count, records_removed, keeper_mask, pct_kept

training_df[['orig_ip_bytes']].boxplot()
plt.show()

#### Limit training set to 2M records and save to file

In [None]:
training_df.head()

In [None]:
training_df = training_df.sample(n=2000000, random_state=notebook_random_state)

output_filename = datapath.replace("/combined/","/training/") + "NTAMalignantTrafficPredictor_Training.csv"

training_df.to_csv(path_or_buf=output_filename, sep="|", header=True, index=False, mode="w")


### Prepare a Test Input File

In [None]:
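# Seed the sample so the test file can be regenerated identically
test_df = full_df.sample(n=1000000, random_state=notebook_random_state)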

# Drop the columns that we don't want
for col in full_df.columns:
    if col not in NTAMalignantTrafficPredictor.INPUT_FILE_COLS.keys():
        print(f"Dropping {col}...")
        _ = test_df.drop(col, axis=1, inplace=True)

display(test_df.columns)

In [None]:
output_filename = datapath.replace("/combined/","/testing/") + "NTAMalignantTrafficPredictor_Testing.csv"

test_df.to_csv(path_or_buf=output_filename, sep="|", header=True, index=False, mode="w")


### Prepare An Input File With All The Data

In [None]:
test_df = full_df.copy()

# Drop the columns that we don't want
for col in full_df.columns:
    if col not in NTAMalignantTrafficPredictor.INPUT_FILE_COLS.keys():
        print(f"Dropping {col}...")
        _ = test_df.drop(col, axis=1, inplace=True)

display(test_df.columns)

In [None]:
output_filename = datapath.replace("/combined/","/testing/") + "NTAMalignantTrafficPredictor_Full.csv"

test_df.to_csv(path_or_buf=output_filename, sep="|", header=True, index=False, mode="w")


In [None]:
del test_df
del full_df


## The Meat of This Notebook

### Prepare Our Model Class

In [119]:
class NTAMalignantTrafficPredictor:
    INPUT_FILE_CHUNKSIZE = 5000000
    TRAINING_FILE_COLS = {
        "id.orig_p": "int32",
        "id.resp_p": "int32",
        "proto": "string",
        "service": "string",
        "conn_state": "string",
        "history": "string",
        "orig_pkts": "int32",
        "orig_ip_bytes": "int32",
        "resp_pkts": "int32",
        "resp_ip_bytes": "int32",
        "day_of_week": "int32",
        "day_of_month": "int32",
        "hour_of_day": "int32",
        "target": "int32",
    }
    INPUT_FILE_COLS = {
        "ts": "float64",
        "uid": "string",
        "id.orig_p": "int32",
        "id.resp_p": "int32",
        "proto": "string",
        "service": "string",
        "conn_state": "string",
        "history": "string",
        "orig_pkts": "int32",
        "orig_ip_bytes": "int32",
        "resp_pkts": "int32",
        "resp_ip_bytes": "int32",
    }
    ONEHOT_COLS = [
        "proto",
        "service",
        "conn_state",
        "history",
    ]
    SCALE_COLS = [
        "orig_ip_bytes",
        "orig_pkts",
        "resp_ip_bytes",
        "resp_pkts",
    ]
    
    def __init__(self, n_estimators=10, learning_rate=1.0, max_depth=4, random_state=None):
        # Set class parameters
        self.__n_estimators = n_estimators
        self.__learning_rate = learning_rate
        self.__max_depth = max_depth
        self.__random_state = random_state
        self.__training_df = pd.DataFrame(columns=NTAMalignantTrafficPredictor.TRAINING_FILE_COLS.keys())
        self.__predict_df = pd.DataFrame(columns=NTAMalignantTrafficPredictor.INPUT_FILE_COLS.keys())
        self.__encoders = dict()

        self.__predictor = GradientBoostingClassifier(n_estimators=self.__n_estimators,
                                                 learning_rate=self.__learning_rate, 
                                                 max_depth=self.__max_depth, 
                                                 random_state=self.__random_state)

    def __onehot_encode(self, colname, df, train=False):
        if colname not in df.columns:
            raise Exception(f"Column '{colname}' not in dataframe.")
        if train:
            self.__encoders[colname] = OneHotEncoder(sparse_output=False, dtype=np.uint8, handle_unknown='ignore').set_output(transform="pandas")
            _ = self.__encoders[colname].fit(df[[colname]])
            
        coded_df = self.__encoders[colname].transform(df[[colname]])
        df = pd.concat([df, coded_df], axis=1)
        df.drop(colname, axis=1, inplace=True)
        del coded_df

        return df
    
    def __scale_encode(self, colname, df, train=False):
        if colname not in df.columns:
            raise Exception(f"Column '{colname}' not in dataframe.")
        if train:
            self.__encoders[colname] = MinMaxScaler()
            _ = self.__encoders[colname].fit(df[[colname]])
            
        df[colname] = self.__encoders[colname].transform(df[[colname]])

        return df
    
    def __preprocess(self, df, train=False):
        for col in self.ONEHOT_COLS:
            df = self.__onehot_encode(col, df, train=train)
            
        for col in self.SCALE_COLS:
            df = self.__scale_encode(col, df, train=train)
            
        return df

    def __prepare_input(self, df):
        prep_df = df.drop("uid", axis=1)
        prep_df["date_time"] = prep_df["ts"].apply(dt.datetime.fromtimestamp)
        prep_df["day_of_week"] = prep_df["date_time"].dt.dayofweek
        prep_df["day_of_week"] = prep_df["day_of_week"].astype(np.uint8)
        prep_df["day_of_month"] = prep_df["date_time"].dt.day
        prep_df["day_of_month"] = prep_df["day_of_month"].astype(np.uint8)
        prep_df["hour_of_day"] =  prep_df["date_time"].dt.hour
        prep_df["hour_of_day"] =  prep_df["hour_of_day"].astype(np.uint8)
        
        # Drop the raw timestamp and date/time columns
        prep_df.drop("date_time",axis=1,inplace=True)
        prep_df.drop("ts",axis=1,inplace=True)

        return prep_df

    def __load_trainingfile(self, filepath):
        if not(os.path.exists(filepath)):
            raise FileNotFoundError(f"File not found: {filepath}")
        if not(os.path.isfile(filepath)):
            raise FileNotFoundError(f"Specified path is not a file: {filepath}")

        self.__training_df = pd.read_csv(filepath, sep="|",low_memory=False, dtype=self.TRAINING_FILE_COLS)
        print(f"Training data shape: {self.__training_df.shape}")
        return self.__training_df

    def __load_datafile(self, filepath):
        if not(os.path.exists(filepath)):
            raise FileNotFoundError(f"File not found: {filepath}")
        if not(os.path.isfile(filepath)):
            raise FileNotFoundError(f"Specified path is not a file: {filepath}")

        self.__predict_df = pd.read_csv(filepath, sep="|",low_memory=False, dtype=self.INPUT_FILE_COLS)
        print(f"Input data shape: {self.__predict_df.shape}")
        return self.__predict_df

    def train(self, filepath):
        self.__load_trainingfile(filepath)
        self.__training_df = self.__preprocess( self.__training_df, train=True)
        y = self.__training_df["target"]
        X = self.__training_df.drop("target", axis=1)
        self.__predictor.fit(X, y)
    
    def predict(self, filepath):
        # Load file(s) / Verify data format
        self.__input_df = self.__load_datafile(filepath)
        
        # Pre-process data
        self.__predict_df = self.__prepare_input(self.__input_df)
        self.__predict_df = self.__preprocess(self.__predict_df, train=False)

        y_pred = self.__predictor.predict(self.__predict_df)
        
        output_df = pd.concat([ self.__input_df[["uid"]], pd.DataFrame(y_pred, columns=["prediction"])], axis=1)
        return output_df

    def predict_to_file(self, filepath):
        if not(os.path.exists(filepath)):
            raise FileNotFoundError(f"File not found: {filepath}")
        if not(os.path.isfile(filepath)):
            raise FileNotFoundError(f"Specified path is not a file: {filepath}")

        input_filename = os.path.basename(filepath)
        file_ext = Path(input_filename).suffix
        # Path.stem also handles files without an extension (slicing by -0 would yield an empty name)
        output_filename = Path(input_filename).stem + '_predictions' + file_ext
        output_filepath = filepath.replace(input_filename, 'output/'+output_filename)
        if (os.path.isfile(output_filepath)):
            os.remove(output_filepath)
        print(output_filepath)
        first_pass = True
        chunk_tmr = SimpleTimer()
        chunk_tmr.start()
        for input_chunk_df in pd.read_csv(filepath, sep="|",low_memory=False, dtype=self.INPUT_FILE_COLS, chunksize=self.INPUT_FILE_CHUNKSIZE):

            print(f"Chunk read time: {chunk_tmr.sts(chunk_tmr.laptime())}")
            
            this_chunk = input_chunk_df.reset_index(drop=True)

            # Pre-process data
            predict_df = self.__prepare_input(this_chunk)
            predict_df = self.__preprocess(predict_df, train=False)
            print(f"Chunk process time: {chunk_tmr.sts(chunk_tmr.laptime())}")
    
            y_pred = self.__predictor.predict(predict_df)
            print(f"Chunk predict time: {chunk_tmr.sts(chunk_tmr.laptime())}")
            
            output_df = pd.concat([this_chunk[["uid"]], pd.DataFrame(y_pred, columns=["prediction"])], axis=1)
            del y_pred
            
            # Create the output folder if it does not exist yet
            os.makedirs(os.path.dirname(output_filepath), exist_ok=True)
            _ = output_df.to_csv(output_filepath, sep="|", mode="a", header=first_pass, index=False)
            del output_df
            print(f"Chunk output time: {chunk_tmr.sts(chunk_tmr.laptime())}")

            chunk_tmr.stop()
            chunk_tmr.start()
            first_pass = False
            
        return True

    def get_model(self):
        return self.__predictor
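
In `__prepare_input`, each timestamp is converted with `apply(dt.datetime.fromtimestamp)`, which calls Python once per row. A vectorized conversion with `pd.to_datetime` may reduce pre-processing time on large chunks. The following is a minimal sketch, not the method used in this notebook: the `add_time_features` helper and its `tz` parameter are hypothetical. Because `fromtimestamp` uses the machine's local time zone while `pd.to_datetime(..., unit="s", utc=True)` works in UTC, `tz` would need to match whatever zone was used when the training features were built.

```python
import datetime as dt

import numpy as np
import pandas as pd


def add_time_features(df: pd.DataFrame, ts_col: str = "ts", tz: str = "UTC") -> pd.DataFrame:
    """Derive day/hour features from epoch seconds without a per-row apply (hypothetical helper)."""
    # Convert the whole column at once, then shift to the requested time zone
    stamps = pd.to_datetime(df[ts_col], unit="s", utc=True).dt.tz_convert(tz)
    out = df.copy()
    out["day_of_week"] = stamps.dt.dayofweek.astype(np.uint8)
    out["day_of_month"] = stamps.dt.day.astype(np.uint8)
    out["hour_of_day"] = stamps.dt.hour.astype(np.uint8)
    # The raw timestamp is no longer needed once its parts are extracted
    return out.drop(columns=[ts_col])


# Made-up epoch timestamps for illustration
demo = pd.DataFrame({"ts": [1525879831.015811, 1545403816.962094]})
features = add_time_features(demo)
print(features)
```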


### Create a Class Instance and Train It

In [120]:
nta_predictor = NTAMalignantTrafficPredictor(n_estimators=10, learning_rate=1.0, max_depth=4, random_state=0)

In [121]:
display(nta_predictor.get_model())

In [122]:
nta_predictor.train("G:/My Drive/UCSD_MLE_Bootcamp_Capstone/data/MalwareDetectionInNetworkTrafficData/training/NTAMalignantTrafficPredictor_Training.csv")

Training data shape: (2000000, 14)


In [123]:
display(nta_predictor.get_model())

### Test Our Trained Model

#### Load the Full Dataset for Prediction Evaluation

In [11]:
datapath="G:/My Drive/UCSD_MLE_Bootcamp_Capstone/data/MalwareDetectionInNetworkTrafficData/combined/"
input_filename_prefix = "CTU-IoT-Malware-Capture_"

if not(os.path.exists(datapath)):
    datapath="/Users/bdwalker1/Library/CloudStorage/GoogleDrive-maritz.bruce@gmail.com/My Drive/UCSD_MLE_Bootcamp_Capstone/data/MalwareDetectionInNetworkTrafficData/combined/"
    if not(os.path.exists(datapath)):
        print("Data path does not exist!")

dtypes_dict = {'ts': 'float64', 'uid': 'string', 'id.orig_h': 'string', 'id.orig_p': 'int32', \
               'id.resp_h': 'string', 'id.resp_p': 'int32', 'proto': 'string', 'service': 'string', \
               'duration': 'O', 'orig_bytes': 'O', 'resp_bytes': 'O', \
               'conn_state': 'string', 'local_orig': 'string', 'local_resp': 'string', \
               'missed_bytes': 'int32', 'history': 'string', \
               'orig_pkts': 'int32', 'orig_ip_bytes': 'int32', 'resp_pkts': 'int32', 'resp_ip_bytes': 'int32', \
               'tunnel_parents': 'string', 'label': 'string', 'detailed-label': 'string', \
               'filename': 'string', 'date_time': 'string'}

chunks = []
chunksize = 1000000
recs_loaded = 0
for fileitem in os.scandir(datapath):
    if fileitem.is_file():
        if (fileitem.name.startswith(input_filename_prefix) and fileitem.name.endswith(".csv")):
            filepath = fileitem.path
            filename = fileitem.name
            with pd.read_csv(filepath, sep="|",low_memory=False, dtype=dtypes_dict, parse_dates=['date_time'], chunksize=chunksize) as reader:
                for df in reader:
                    # Collect chunks and concatenate once; concatenating inside the loop re-copies every loaded row each time
                    chunks.append(df)
                    recs_loaded += df.shape[0]
                    print(f"\r{recs_loaded:>10} records loaded",end='')

# pd.concat raises on an empty list, so fall back to an empty frame
full_df = pd.concat(chunks) if chunks else pd.DataFrame()
del chunks

print(f"\r{recs_loaded:>10} total records.     \n")

  25000363 total records.     



#### Test Model On A Test Input File

In [112]:
tmr = SimpleTimer()
tmr.start()
output_df = nta_predictor.predict(
    "G:/My Drive/UCSD_MLE_Bootcamp_Capstone/data/MalwareDetectionInNetworkTrafficData/testing/NTAMalignantTrafficPredictor_Testing.csv"
)
print(f"Load/Prediction time: {tmr.sts(tmr.laptime())}")

merge_df = pd.merge(full_df, output_df, on="uid")
merge_df["target"] = merge_df["label"].apply(lambda x : 0 if x=="Benign" else 1)
print(f"Merge time: {tmr.sts(tmr.laptime())}")

diff_df = merge_df[["uid", "label", "detailed-label", "prediction"]].loc[(merge_df["target"] != merge_df["prediction"])]
print(f"Diff time: {tmr.sts(tmr.laptime())}")

print(f"\nMis-predictions / Total records: {diff_df.shape[0]} / {output_df.shape[0]}")
print(f"Bad prediction rate:{(diff_df.shape[0]/output_df.shape[0]):0.2%}")

_ = tmr.stop()

Input data shape: (1000000, 12)
Load/Prediction time: 4.027s
Merge time: 18.471s
Diff time: 0.081s

Mis-predictions / Total records: 2743 / 1000000
Bad prediction rate:0.27%


#### Test Model on Full Dataset (25M+ records)

In [113]:
tmr = SimpleTimer()
_ = tmr.start()
output2_df = nta_predictor.predict("G:/My Drive/UCSD_MLE_Bootcamp_Capstone/data/MalwareDetectionInNetworkTrafficData/testing/NTAMalignantTrafficPredictor_Full.csv")
print(f"Load/Prediction time: {tmr.sts(tmr.laptime())}")

merge2_df = pd.merge(full_df, output2_df, on="uid")
merge2_df["target"] = merge2_df["label"].apply(lambda x : 0 if x=="Benign" else 1)
print(f"Merge time: {tmr.sts(tmr.laptime())}")

diff2_df = merge2_df[["uid", "label", "detailed-label", "prediction"]].loc[(merge2_df["target"] != merge2_df["prediction"])]
print(f"Diff time: {tmr.sts(tmr.laptime())}")

print(f"\nMis-predictions / Total records: {diff2_df.shape[0]} / {output2_df.shape[0]}")
print(f"Bad prediction rate:{(diff2_df.shape[0]/output2_df.shape[0]):0.2%}")

_ = tmr.stop()

Input data shape: (25000363, 12)
Load/Prediction time: 1m 42.771s
Merge time: 40.909s
Diff time: 1.313s

Mis-predictions / Total records: 67638 / 25000363
Bad prediction rate:0.27%


### Test Process to Write Predictions to File

In [125]:
tmr = SimpleTimer()
tmr.start()
output_df = nta_predictor.predict_to_file(
#    "G:/My Drive/UCSD_MLE_Bootcamp_Capstone/data/MalwareDetectionInNetworkTrafficData/testing/NTAMalignantTrafficPredictor_Testing.csv"
#    "G:/My Drive/UCSD_MLE_Bootcamp_Capstone/data/MalwareDetectionInNetworkTrafficData/testing/NTAMalignantTrafficPredictor_Testing.txt"
    "G:/My Drive/UCSD_MLE_Bootcamp_Capstone/data/MalwareDetectionInNetworkTrafficData/testing/NTAMalignantTrafficPredictor_Full.csv"
)
print(f"Load/Prediction time: {tmr.sts(tmr.laptime())}")
_ = tmr.stop()

G:/My Drive/UCSD_MLE_Bootcamp_Capstone/data/MalwareDetectionInNetworkTrafficData/testing/output/NTAMalignantTrafficPredictor_Full_predictions.csv
Chunk read time: 6.242s
Chunk process time: 10.953s
Chunk predict time: 2.692s
Chunk output time: 6.887s
Chunk read time: 6.028s
Chunk process time: 10.925s
Chunk predict time: 2.655s
Chunk output time: 4.790s
Chunk read time: 6.058s
Chunk process time: 10.760s
Chunk predict time: 2.796s
Chunk output time: 4.743s
Chunk read time: 8.548s
Chunk process time: 10.896s
Chunk predict time: 2.654s
Chunk output time: 4.895s
Chunk read time: 5.937s
Chunk process time: 10.424s
Chunk predict time: 2.590s
Chunk output time: 7.477s
Chunk read time: 0.364s
Chunk process time: 0.103s
Chunk predict time: 0.001s
Chunk output time: 0.012s
Load/Prediction time: 2m 9.446s


---

# Thoughts On The Scaling Process And Things To Consider As I Move To Deployment

While this notebook shows that my model can process over 25 million records in under three minutes, this pace may still not be fast enough for real-world use of the model. As I move to the deployment stage, I will look for further efficiencies in this code -- especially