# Notebook 04: Make pipeline

### Introduction

In this notebook, we will create a machine learning pipeline using scikit-learn. The pipeline will include data preprocessing steps and a decision tree regressor as the base model. The pipeline will be trained and evaluated on the password strength prediction task.

### Setup

Let's start by importing the necessary libraries.


In [54]:
import time

import joblib
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.tree import DecisionTreeRegressor


### Data Loading

Let's load the stratified sample data from the previous notebook.


In [55]:
df = pd.read_csv("./data/stratified_sample_data.csv")
df.head()

Unnamed: 0,password,strength
0,csillik,0.180594
1,huniihuu,0.177778
2,chaipy,0.172331
3,876876b,0.155556
4,miiwhy,0.154795


### Data Splitting
Let's split the data into training and testing sets.

In [56]:
X = df["password"]
y = df["strength"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)
X_train_df = X_train.to_frame()
X_test_df = X_test.to_frame()
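
The `to_frame()` calls matter because the transformers and the column transformer defined later select their input by column name, which requires a DataFrame rather than a Series. A minimal sketch of what the conversion does, using made-up sample values rather than the real dataset:

```python
import pandas as pd

# Hypothetical toy data standing in for the "password" column.
passwords = pd.Series(["csillik", "876876b"], name="password")

# A Series has no columns; to_frame() wraps it in a one-column DataFrame
# whose column is named after the Series.
as_frame = passwords.to_frame()

print(type(as_frame).__name__)  # DataFrame
print(list(as_frame.columns))   # ['password']
print(as_frame.shape)           # (2, 1)
```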

### Custom Transformers

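We will define custom transformers that each turn the raw `password` string into a single numeric feature: length, character-class counts, repeated and unique characters, and sequence counts. Each transformer returns an array of shape `(n_samples, 1)` so that the features can be stacked into one matrix later.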
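
As a rough illustration of the pattern the transformer cells follow, the sketch below defines one stand-alone transformer and runs it on a toy frame. The class name `DigitCountSketch` and the sample passwords are made up for this example; it is not one of the notebook's transformers.

```python
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin


class DigitCountSketch(BaseEstimator, TransformerMixin):
    """Hypothetical example: count the decimal digits in each password."""

    def fit(self, X, y=None):
        # Nothing is learned; the feature is a pure function of the string.
        return self

    def transform(self, X):
        counts = X["password"].map(lambda s: sum(ch.isdecimal() for ch in s))
        # Return a 2-D column, shape (n_samples, 1).
        return counts.to_numpy().reshape(-1, 1)


demo = pd.DataFrame({"password": ["abc123", "hunter2", "qwerty"]})
features = DigitCountSketch().fit_transform(demo)

print(features.shape)             # (3, 1)
print(features.ravel().tolist())  # [3, 1, 0]
print(list(demo.columns))         # ['password'] (input left unmodified)
```

Returning a 2-D array with one column per feature is what lets the outputs of several transformers be stacked side by side.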


In [57]:
class LenTransform(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # Compute the feature without adding a column to the caller's DataFrame.
        values = X["password"].apply(self._lenTransform)
        return values.to_numpy().reshape(-1, 1)

    def _lenTransform(self, text: str) -> int:
        return len(text)

In [58]:
class AlphaUCTransform(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # Compute the feature without adding a column to the caller's DataFrame.
        values = X["password"].apply(self._alphaUCTransform)
        return values.to_numpy().reshape(-1, 1)

    def _alphaUCTransform(self, text: str) -> int:
        return sum(1 for a in text if a.isupper())

In [59]:
class AlphaLCTransform(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # Compute the feature without adding a column to the caller's DataFrame.
        values = X["password"].apply(self._alphaLCTransform)
        return values.to_numpy().reshape(-1, 1)

    def _alphaLCTransform(self, text: str) -> int:
        return sum(1 for a in text if a.islower())

In [60]:
class NumberTransform(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # Compute the feature without adding a column to the caller's DataFrame.
        values = X["password"].apply(self._numberTransform)
        return values.to_numpy().reshape(-1, 1)

    def _numberTransform(self, text: str) -> int:
        return sum(1 for a in text if a.isdecimal())

In [61]:
class SymbolTransform(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # Compute the feature without adding a column to the caller's DataFrame.
        values = X["password"].apply(self._symbolTransform)
        return values.to_numpy().reshape(-1, 1)

    def _symbolTransform(self, text: str) -> int:
        return sum(a in set("!@#$%^&*") for a in text)

In [62]:
class MidCharTransform(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # Compute the feature without adding a column to the caller's DataFrame.
        values = X["password"].apply(self._midCharTransform)
        return values.to_numpy().reshape(-1, 1)

    def _midCharTransform(self, text: str) -> int:
        return sum(
            bool(
                (a.isdecimal() or (a in set("!@#$%^&*")))
                and ix > 0
                and ix < len(text) - 1
            )
            for ix, a in enumerate(text)
        )

In [63]:
class RepCharTransform(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # Compute the feature without adding a column to the caller's DataFrame.
        values = X["password"].apply(self._repCharTransform)
        return values.to_numpy().reshape(-1, 1)

    def _repCharTransform(self, text: str) -> int:
        return len(text) - len(set(text))

In [64]:
class UniqueCharTransform(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # Compute the feature without adding a column to the caller's DataFrame.
        values = X["password"].apply(self._uniqueCharTransform)
        return values.to_numpy().reshape(-1, 1)

    def _uniqueCharTransform(self, text: str) -> int:
        return len(set(text))

In [65]:
class ConsecAlphaUCTransform(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # Compute the feature without adding a column to the caller's DataFrame.
        values = X["password"].apply(self._consecAlphaUCTransform)
        return values.to_numpy().reshape(-1, 1)

    def _consecAlphaUCTransform(self, text: str) -> int:
        temp = ""
        nConsecAlphaUC = 0
        for a in text:
            if a.isupper():
                if temp and temp[-1] == a:
                    nConsecAlphaUC += 1
                temp = a
        return nConsecAlphaUC

In [66]:
class ConsecAlphaLCTransform(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # Compute the feature without adding a column to the caller's DataFrame.
        values = X["password"].apply(self._consecAlphaLCTransform)
        return values.to_numpy().reshape(-1, 1)

    def _consecAlphaLCTransform(self, text: str) -> int:
        temp = ""
        nConsecAlphaLC = 0
        for a in text:
            if a.islower():
                if temp and temp[-1] == a:
                    nConsecAlphaLC += 1
                temp = a
        return nConsecAlphaLC

In [67]:
class ConsecNumberTransform(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # Compute the feature without adding a column to the caller's DataFrame.
        values = X["password"].apply(self._consecNumberTransform)
        return values.to_numpy().reshape(-1, 1)

    def _consecNumberTransform(self, text: str) -> int:
        temp = ""
        nConsecNumber = 0
        for a in text:
            if a.isdecimal():
                if temp and temp[-1] == a:
                    nConsecNumber += 1
                temp = a
        return nConsecNumber

In [68]:
class ConsecSymbolTransform(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # Compute the feature without adding a column to the caller's DataFrame.
        values = X["password"].apply(self._consecSymbolTransform)
        return values.to_numpy().reshape(-1, 1)

    def _consecSymbolTransform(self, text: str) -> int:
        temp = ""
        nConsecSymbol = 0
        for a in text:
            if a in set("!@#$%^&*"):
                if temp and temp[-1] == a:
                    nConsecSymbol += 1
                temp = a
        return nConsecSymbol
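
The four `Consec*` transformers share one loop. Read literally, it counts how often a character of the chosen class equals the previous character of that same class, and characters outside the class do not reset the comparison. The sketch below restates that loop as a standalone helper so the behavior can be checked on small inputs. The name `count_class_repeats` and its `belongs` parameter are hypothetical, not part of the notebook.

```python
from typing import Callable


def count_class_repeats(text: str, belongs: Callable[[str], bool]) -> int:
    """Count characters of a class that repeat the previous character of that class."""
    previous = ""
    count = 0
    for ch in text:
        if belongs(ch):
            if previous and previous == ch:
                count += 1
            # Only characters inside the class update `previous`.
            previous = ch
    return count


def is_symbol(ch: str) -> bool:
    return ch in "!@#$%^&*"


print(count_class_repeats("aab", str.islower))  # 1
print(count_class_repeats("abc", str.islower))  # 0
print(count_class_repeats("a1a", str.islower))  # 1
print(count_class_repeats("!!a!", is_symbol))   # 2
```

Note that `a1a` scores 1 for the lowercase class: the digit in between is skipped rather than treated as a break.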

In [69]:
class SeqAlphaTransform(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # Compute the feature without adding a column to the caller's DataFrame.
        values = X["password"].apply(self._seqAlphaTransform)
        return values.to_numpy().reshape(-1, 1)

    def _seqAlphaTransform(self, text: str) -> int:
        sAlphas = "abcdefghijklmnopqrstuvwxyz"
        nSeqAlpha = 0
        for s in range(len(sAlphas) - 2):
            sFwd = sAlphas[s : s + 3]
            sRev = sFwd[::-1]
            if sFwd in text.lower() or sRev in text.lower():
                nSeqAlpha += 1
        return nSeqAlpha

In [70]:
class SeqNumberTransform(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # Compute the feature without adding a column to the caller's DataFrame.
        values = X["password"].apply(self._seqNumberTransform)
        return values.to_numpy().reshape(-1, 1)

    def _seqNumberTransform(self, text: str) -> int:
        sNumerics = "01234567890"
        nSeqNumber = 0
        for s in range(len(sNumerics) - 2):
            sFwd = sNumerics[s : s + 3]
            sRev = sFwd[::-1]
            if sFwd in text.lower() or sRev in text.lower():
                nSeqNumber += 1
        return nSeqNumber

In [71]:
class SeqKeyboardTransform(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # Compute the feature without adding a column to the caller's DataFrame.
        values = X["password"].apply(self._seqKeyboardTransform)
        return values.to_numpy().reshape(-1, 1)

    def _seqKeyboardTransform(self, text: str) -> int:
        sTopRow = "qwertyuiop"
        sHomeRow = "asdfghjkl"
        sBottomRow = "zxcvbnm"
        nKeyboard = 0
        sRows = [sTopRow, sHomeRow, sBottomRow]

        for sRow in sRows:
            for s in range(len(sRow) - 2):
                sFwd = sRow[s : s + 3]
                sRev = sFwd[::-1]
                if sFwd in text.lower() or sRev in text.lower():
                    nKeyboard += 1

        return nKeyboard
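
The three `Seq*` transformers all slide a three-character window over a reference string and check whether the window, or its reverse, appears in the lowercased password. A minimal sketch of that shared logic as one helper follows. The function name `count_sequences` and the `width` parameter are assumptions for illustration.

```python
def count_sequences(text: str, reference: str, width: int = 3) -> int:
    """Count reference windows that occur in text, forwards or reversed."""
    lowered = text.lower()
    count = 0
    for start in range(len(reference) - width + 1):
        forward = reference[start : start + width]
        if forward in lowered or forward[::-1] in lowered:
            # Each window is counted at most once, however often it occurs.
            count += 1
    return count


print(count_sequences("abcd", "abcdefghijklmnopqrstuvwxyz"))  # 2 ("abc", "bcd")
print(count_sequences("x321y", "01234567890"))                # 1 ("123" reversed)
print(count_sequences("Qwerty", "qwertyuiop"))                # 4
```

Because each window contributes at most 1, a password that repeats the same run several times scores the same as one that contains it once.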

### Data Preprocessing

We will define a column transformer that applies every custom transformer to the `password` column and stacks the 15 resulting features side by side, in the order listed.


In [72]:
feature = ["password"]

preprocess = ColumnTransformer(
    [
        ("len", LenTransform(), feature),
        ("alpha_uc", AlphaUCTransform(), feature),
        ("alpha_lc", AlphaLCTransform(), feature),
        ("number", NumberTransform(), feature),
        ("symbol", SymbolTransform(), feature),
        ("mid_char", MidCharTransform(), feature),
        ("rep_char", RepCharTransform(), feature),
        ("unique_char", UniqueCharTransform(), feature),
        ("consec_alpha_uc", ConsecAlphaUCTransform(), feature),
        ("consec_alpha_lc", ConsecAlphaLCTransform(), feature),
        ("consec_number", ConsecNumberTransform(), feature),
        ("consec_symbol", ConsecSymbolTransform(), feature),
        ("seq_alpha", SeqAlphaTransform(), feature),
        ("seq_number", SeqNumberTransform(), feature),
        ("seq_keyboard", SeqKeyboardTransform(), feature),
    ]
)
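
To make the stacking behavior concrete, here is a small sketch with two features instead of fifteen. It uses `FunctionTransformer` with two hypothetical helper functions (`length_column`, `upper_column`) in place of the notebook's classes, and made-up passwords.

```python
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import FunctionTransformer


def length_column(frame: pd.DataFrame):
    # One feature per row, returned as a column of shape (n_samples, 1).
    return frame["password"].str.len().to_numpy().reshape(-1, 1)


def upper_column(frame: pd.DataFrame):
    counts = frame["password"].map(lambda s: sum(ch.isupper() for ch in s))
    return counts.to_numpy().reshape(-1, 1)


demo = pd.DataFrame({"password": ["Abc", "HELLO1", "xy"]})

demo_preprocess = ColumnTransformer(
    [
        ("len", FunctionTransformer(length_column), ["password"]),
        ("alpha_uc", FunctionTransformer(upper_column), ["password"]),
    ]
)
stacked = demo_preprocess.fit_transform(demo)

print(stacked.shape)     # (3, 2)
print(stacked.tolist())  # [[3, 1], [6, 5], [2, 0]]
```

Each transformer contributes one column, and the columns appear in the order the transformers are listed.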

### Create Pipeline
We will create a pipeline that includes the data preprocessing steps and a decision tree regressor as the base model.

In [73]:
pipeline = Pipeline(
    [("preprocess", preprocess), ("regressor", DecisionTreeRegressor())]
)
pipeline
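
The following sketch shows how a two-step pipeline of this shape behaves on toy numeric data. The identity `FunctionTransformer` stands in for the real preprocessing, and `random_state=42` is an assumption added here for repeatability; the notebook's regressor uses the defaults.

```python
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer
from sklearn.tree import DecisionTreeRegressor

# Hypothetical toy data: one numeric feature, four samples.
X_demo = np.array([[1.0], [2.0], [3.0], [4.0]])
y_demo = np.array([0.1, 0.2, 0.3, 0.4])

demo_pipeline = Pipeline(
    [
        ("preprocess", FunctionTransformer()),  # identity placeholder
        ("regressor", DecisionTreeRegressor(random_state=42)),
    ]
)
demo_pipeline.fit(X_demo, y_demo)

# Steps can be looked up by the names given above.
print(list(demo_pipeline.named_steps))  # ['preprocess', 'regressor']

# An unconstrained tree can reproduce its training targets.
print(np.allclose(demo_pipeline.predict(X_demo), y_demo))  # True
```

Since a fully grown tree can memorize its training set, the scores that matter are the ones computed on held-out data.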

### Train and Test the Pipeline
Let's train the pipeline on the training data and evaluate its performance on the testing data.

In [74]:
print("Train model")

start_time = time.time()
pipeline.fit(X_train_df, y_train)
end_time = time.time()

train_time = round(end_time - start_time, 4)
print(f"Training time: {train_time:.4f}")

print("\nTest model")
start_time = time.time()
y_pred = pipeline.predict(X_test_df)
end_time = time.time()

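mae = mean_absolute_error(y_test, y_pred)
mse = mean_squared_error(y_test, y_pred)
# Take the square root directly; the `squared=False` argument was deprecated
# in scikit-learn 1.4 and later removed.
rmse = np.sqrt(mse)
r2 = r2_score(y_test, y_pred)
test_time = end_time - start_time

print(
    f"MAE: {mae:.4f}\tMSE: {mse:.4f}\tRMSE: {rmse:.4f}\t"
    f"R2: {r2:.4f}\tTesting time: {test_time:.4f}"
)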

Train model
Training time: 1.3186

Test model
MAE: 0.0003	MSE: 0.0000	RMSE: 0.0039	R2: 0.9998	Testing time: 0.1735
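
For reference, the four reported metrics can be reproduced by hand with NumPy. The sketch below uses made-up targets and predictions, not the notebook's data, to show how they relate to each other.

```python
import numpy as np

# Hypothetical true values and predictions.
y_true = np.array([0.10, 0.20, 0.30, 0.40])
y_hat = np.array([0.10, 0.25, 0.30, 0.35])

errors = y_true - y_hat

mae = np.mean(np.abs(errors))   # mean absolute error
mse = np.mean(errors**2)        # mean squared error
rmse = np.sqrt(mse)             # root of the MSE, in the target's units

# R2 compares the residual error with the variance around the mean.
ss_res = np.sum(errors**2)
ss_tot = np.sum((y_true - y_true.mean()) ** 2)
r2 = 1.0 - ss_res / ss_tot

print(f"MAE: {mae:.4f}  MSE: {mse:.5f}  RMSE: {rmse:.4f}  R2: {r2:.4f}")
```

Because the MSE is a squared quantity, it can print as `0.0000` at four decimals while the RMSE is still visibly non-zero, as in the output above.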


### Save the Model
Let's save the trained pipeline for future use.

In [75]:
joblib.dump(pipeline, "sample_model.joblib")

['sample_model.joblib']
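
Loading works the same way in reverse. The sketch below round-trips a small stand-in model through a temporary file and checks that the restored copy predicts identically. The toy data and the file name `demo_model.joblib` are assumptions for illustration. For the real pipeline, keep in mind that joblib relies on pickle, which stores custom classes by reference, so the transformer classes must be defined or importable in the process that loads the file.

```python
import tempfile
from pathlib import Path

import joblib
import numpy as np
from sklearn.tree import DecisionTreeRegressor

# Hypothetical toy model standing in for the trained pipeline.
X_demo = np.array([[1.0], [2.0], [3.0], [4.0]])
y_demo = np.array([0.1, 0.2, 0.3, 0.4])
model = DecisionTreeRegressor(random_state=0).fit(X_demo, y_demo)

with tempfile.TemporaryDirectory() as tmp_dir:
    path = Path(tmp_dir) / "demo_model.joblib"
    joblib.dump(model, path)
    restored = joblib.load(path)

same = np.array_equal(model.predict(X_demo), restored.predict(X_demo))
print(same)  # True
```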

🎉 Congratulations! We have created a machine learning pipeline for password strength prediction. The pipeline combines the feature-extraction steps with a decision tree regressor as the base model. It has been trained on the training split, evaluated on the held-out test split, and saved for future use.

