In [4]:
# Standard Data Science Toolkit
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
sns.set_style('whitegrid')
import category_encoders as ce

# Machine Learning Preprocessing and Scoring Metrics
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import StandardScaler, MinMaxScaler, PolynomialFeatures, OneHotEncoder, LabelEncoder
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV, cross_val_score, RepeatedKFold, KFold
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay, accuracy_score, f1_score, mean_squared_error, r2_score, explained_variance_score, roc_curve, auc
from sklearn.feature_selection import SelectFromModel
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline, FeatureUnion
import time
import psutil
import os

# Machine Learning Algorithms
from sklearn.linear_model import LinearRegression, LogisticRegression, Lasso, Ridge
from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor
from sklearn.naive_bayes import GaussianNB
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor, GradientBoostingClassifier, GradientBoostingRegressor
from xgboost import XGBClassifier, XGBRegressor
from sklearn.svm import SVC, SVR
from tpot import TPOTClassifier

# Natural Language Processing
from nltk.stem.wordnet import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk import FreqDist
import string
import re

In [None]:
# One-hot encode the listed categorical columns of X.
# drop_first=True omits the first level of every encoded column and dtype=int
# stores the indicators as integers instead of booleans.
# TODO: replace the '' placeholder with the real categorical column name(s).
X = pd.get_dummies(
    data=X,
    columns=[''],
    dtype=int,
    drop_first=True,
)

In [None]:
# One-hot encoding
# Keep only the non-numeric columns (anything that is not int64 / float64)
X_train_categorical = X_train_fill_na.select_dtypes(exclude=["int64", "float64"]).copy()

# Dense output; categories unseen during fit are ignored at transform time.
# fit() returns the encoder itself, so it can be chained onto the constructor.
ohe = OneHotEncoder(sparse_output=False, handle_unknown="ignore").fit(X_train_categorical)

# NOTE(review): the column labels are the raw category values of every encoded
# column stacked together, so two columns sharing a value (e.g. "Yes") would
# produce duplicate labels -- ohe.get_feature_names_out() would avoid that.
X_train_ohe = pd.DataFrame(
    data=ohe.transform(X_train_categorical),
    # reuse the original index so the result can be concatenated with other columns
    index=X_train_categorical.index,
    # several columns are encoded at once, so flatten their category arrays
    columns=np.hstack(ohe.categories_),
)
X_train_ohe

In [None]:
# Label Encoder - maps the target column's values onto integers in range [0, n)
# Learn the label -> integer mapping from the training target only
encoder = LabelEncoder().fit(y_train)
# Apply that one mapping to both splits so the codes agree
y_train_encoded = encoder.transform(y_train)
y_test_encoded = encoder.transform(y_test)

In [None]:
# Column Transformer
# Template: every entry is (step name, transformer instance, columns it applies to).
# `transformer` and 'columns_to_apply_to' are placeholders to be replaced.
col_transformer = ColumnTransformer(
    transformers=[
        ('ohe', OneHotEncoder(handle_unknown='ignore', categories='auto'), ['category']),
        ('name_2', transformer(), ['columns_to_apply_to']),
    ],
    # columns not named in any entry above are passed through untouched
    remainder='passthrough',
)


In [None]:
# Resample the training split with SMOTE to balance the classes.
# random_state is pinned (42, as for the estimators elsewhere in this notebook)
# so the synthetic samples -- and every score computed from them -- are the
# same on each Restart & Run All.
smote = SMOTE(random_state=42)
X_train_resampled, y_train_resampled = smote.fit_resample(X_train, y_train)

In [None]:
# Visualize the Decision Tree
# `tree` is never imported by the notebook's import cell (only the estimator
# classes are), so the plotting helper is imported here explicitly.
from sklearn.tree import plot_tree

fig, axes = plt.subplots(nrows=1, ncols=1, figsize=(3, 3), dpi=300)
plot_tree(model,
          feature_names=X.columns.tolist(),
          # class labels are rendered as text, hence the cast to str
          class_names=np.unique(y).astype('str').tolist(),
          filled=True,
          # draw on the Axes created above instead of relying on the implicit current one
          ax=axes)
plt.show()

In [None]:
def test_regression_models(X_train, y_train):
    # Adjustments
    CV = 10
    SCORING = 'neg_mean_squared_error' 
    
    # Define the models to be tested
    models = {
        "Linear Regression": LinearRegression(),
        "K-Nearest Neighbors": KNeighborsRegressor(),
        "Decision Tree": DecisionTreeRegressor(random_state=42),
        "Random Forest": RandomForestRegressor(random_state=42),
        "Gradient Boosting": GradientBoostingRegressor(random_state=42),
        "XGBoost": XGBRegressor(random_state=42),
        "Support Vector Regressor": SVR()
    }
    
    # Initialize results dictionary to store metrics for each model
    results = {
        "Model": [],
        "Mean Squared Error": [],
        "Spread (std)": [],
        "Train Time (s)": [],
        "Memory Usage (MB)": []
    }
    
    # Perform model evaluation for each model
    for model_name, model in models.items():
        # Measure training time
        start_train_time = time.time()

        # Perform k-fold cross-validation to evaluate the model on the training data
        cv_scores = cross_val_score(model, X_train, y_train, cv=CV, scoring=SCORING)

        end_train_time = time.time()
        train_time = end_train_time - start_train_time

        # Measure memory usage (in MB)
        memory_usage = psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)
        
        # Convert MSE scores to positive values
        mse_scores = -cv_scores

        # Store the metrics in the results dictionary
        results["Model"].append(model_name)
        results["Mean Squared Error"].append(round(np.mean(mse_scores), 4))
        results["Spread (std)"].append(round(np.std(mse_scores), 4))
        results["Train Time (s)"].append(round(train_time, 4))
        results["Memory Usage (MB)"].append(round(memory_usage, 0))
        
    # Create a DataFrame to display the results
    results_df = pd.DataFrame(results)
    
    # Print the results
    return results_df


In [None]:
def test_classification_models(X_train, y_train):
    # Adjustments
    CV = 10
    SCORING = 'accuracy'
    
    
    # Define the models to be tested
    models = {
        "Logistic Regression": LogisticRegression(random_state=42),
        "K-Nearest Neighbors": KNeighborsClassifier(),
        "Naive Bayes": GaussianNB(),
        "Decision Tree": DecisionTreeClassifier(random_state=42),
        "Random Forest": RandomForestClassifier(random_state=42),
        "Gradient Boosting": GradientBoostingClassifier(random_state=42),
        "XGBoost": XGBClassifier(random_state=42),
        "Support Vector Machine": SVC(random_state=42)
    }
    
    # Initialize results dictionary to store metrics for each model
    results = {
        "Model": [],
        "Accuracy (%)": [],
        "Spread (std)": [],
        "Train Time (s)": [],
        "Memory Usage (MB)": []
    }
    
    # Perform model evaluation for each model
    for model_name, model in models.items():
        # Measure training time
        start_train_time = time.time()

        # Perform k-fold cross-validation to evaluate the model on the training data
        cv_scores = cross_val_score(model, X_train, y_train, cv=CV, scoring=SCORING)

        end_train_time = time.time()
        train_time = end_train_time - start_train_time

        # Measure memory usage (in MB)
        memory_usage = psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)
        
        # Store the metrics in the results dictionary
        results["Model"].append(model_name)
        results["Accuracy (%)"].append(round(np.mean(cv_scores) * 100, 2))
        results["Spread (std)"].append(round(np.std(cv_scores), 4))
        results["Train Time (s)"].append(round(train_time, 4))
        results["Memory Usage (MB)"].append(round(memory_usage, 0))
        
    # Create a DataFrame to display the results
    results_df = pd.DataFrame(results)
    
    # Print the results
    return results_df

In [None]:
# Example of a BaggingClassifier
from sklearn.ensemble import BaggingClassifier

# 20 bootstrap-trained gini trees, each limited to depth 5.
# Both the base tree and the ensemble are seeded (42, as elsewhere in this
# notebook) so the bootstrap draws and tree splits are reproducible.
bagged_tree = BaggingClassifier(
    DecisionTreeClassifier(criterion='gini', max_depth=5, random_state=42),
    n_estimators=20,
    random_state=42,
)

In [None]:
# Feature Importances + Visualization
# Placeholder: replace '' with a fitted estimator that exposes feature_importances_
model = ''

features = model.feature_importances_

def plot_feature_importances(model, feature_names=None):
    """
    Draw a horizontal bar chart of a fitted model's feature importances.

    Parameters
    ----------
    model : fitted estimator exposing ``feature_importances_``.
    feature_names : sequence of labels for the y axis, optional.
        Defaults to the columns of the notebook-level ``X_train`` frame.
    """
    if feature_names is None:
        # Falls back to the global training frame, as the original version did
        feature_names = X_train.columns.values
    positions = np.arange(len(feature_names))
    plt.figure(figsize=(8, 8))
    plt.barh(positions, model.feature_importances_, align='center')
    plt.yticks(positions, feature_names)
    plt.xlabel('Feature importance')
    plt.ylabel('Feature')

# Pass the estimator itself: the function reads .feature_importances_ from its
# argument, so handing it the `features` array would raise AttributeError.
plot_feature_importances(model)

In [None]:
# Using TPOT
# Build and fit the TPOT classifier, timing construction + fit together
start_time = time.time()
tpot = TPOTClassifier(
    generations=5,
    population_size=10,
    verbosity=2,
    n_jobs=-1,
    random_state=42,
)
tpot.fit(X_train, y_train)
end_time = time.time()

# Report the elapsed wall-clock time and the score on the held-out test split
print('TPOT classifier finished in %s seconds' % (end_time - start_time))
print('Best pipeline test accuracy: %.3f' % tpot.score(X_test, y_test))

# Show the winning pipeline together with its hyperparameters
best_model = tpot.fitted_pipeline_
print(best_model)

In [None]:
# Cross Validation
# Placeholder: replace '' with the estimator to evaluate
clf = ''
# Average the per-fold scores returned by cross_val_score (default fold count and scorer)
mean_clf_cv = np.mean(cross_val_score(clf, X_train, y_train))
print(f"Mean Cross Validation Score for Random Forest Classifier: {mean_clf_cv :.2%}")

In [None]:
# GridSearchCV
# Hyperparameter grid -- the names and values below are placeholders
param_grid = {
    'hyperparameter_1': ['name_1', 'name_2'],
    'hyperparameter_2': [None, 1, 2, 3, 4, 5]
}

# 3-fold search over the grid; train scores are only recorded when requested
gs = GridSearchCV(
    estimator=clf,
    param_grid=param_grid,
    cv=3,
    return_train_score=True,
)
gs.fit(X_train, y_train)

# cv_results_ holds one mean score per parameter combination, so these are
# averages over the whole grid (not the score of the best combination).
# Fit/score timings are available in cv_results_ as well.
gs_train_score = np.mean(gs.cv_results_['mean_train_score'])
gs_test_score = np.mean(gs.cv_results_['mean_test_score'])
gs_best_params = gs.best_params_

print(f"Mean Training Score: {gs_train_score :.2%}")
print(f"Mean Test Score: {gs_test_score :.2%}")
print(f"Best Parameter Combination Found During Grid Search: {gs_best_params}")

In [None]:
# Pipelines with GridSearch
# Create pipeline. step_1_action, step_2_action and clf are placeholders for
# the real transformers and the estimator class.
pipe = Pipeline([
    ('preprocessing_step_1', step_1_action),
    ('preprocessing_step_2', step_2_action),
    ('clf', clf())
])

# Grid keys must be '<pipeline step name>__<hyperparameter>'. The estimator
# step above is named 'clf', so the prefix is 'clf__' (a prefix that matches
# no step makes GridSearchCV.fit raise an invalid-parameter error).
param_grid = [{'clf__max_depth': [None, 2, 6, 10],
               'clf__min_samples_split': [5, 10]}]

# Create the grid, with "pipe" as the estimator
gridsearch = GridSearchCV(estimator=pipe,
                          param_grid=param_grid,
                          scoring='accuracy',
                          cv=5)

# Fit pipeline
gridsearch.fit(X_train, y_train)

# If not using grid search can also use:
#   pipe.fit(X_train, y_train)
#   pipe.score(X_test, y_test)

# Test-set accuracy. Kept as the last expression of the cell so the notebook
# displays it (a trailing string literal used to be shown instead).
gridsearch.score(X_test, y_test)

In [None]:
# More complex pipeline

def preprocess_data_with_pipeline(X):
    """
    Encode, feature-engineer and scale ``X`` with one fitted pipeline.

    Steps:
      1. one-hot encode the "category" column and pass every other column through;
      2. derive an odd/even flag from the "number" column;
      3. place both results side by side and standard-scale all of them.

    Parameters
    ----------
    X : pandas.DataFrame containing "category" and "number" columns.
        NOTE(review): the column labels applied at the end assume these are
        the only two columns in X -- confirm against the caller.

    Returns
    -------
    (pandas.DataFrame, Pipeline)
        The transformed data (indexed like ``X``) and the fitted pipeline.
    """
    # FunctionTransformer is not among the notebook's top-level imports, so it
    # is imported here to keep the function runnable on a fresh kernel.
    from sklearn.preprocessing import FunctionTransformer

    ### Encoding categorical data ###
    original_features_encoded = ColumnTransformer(transformers=[
        ("ohe", OneHotEncoder(categories="auto", handle_unknown="ignore"), ["category"])
    ], remainder="passthrough")

    ### Feature engineering ###
    def is_odd(data):
        """
        Helper function that returns 1 if odd, 0 if even
        """
        return data % 2

    feature_eng = ColumnTransformer(transformers=[
        ("add_number_odd", FunctionTransformer(is_odd), ["number"])
    ], remainder="drop")

    ### Combine encoded and engineered features ###
    feature_union = FeatureUnion(transformer_list=[
        ("encoded_features", original_features_encoded),
        ("engineered_features", feature_eng)
    ])

    ### Pipeline (including scaling) ###
    pipe = Pipeline(steps=[
        ("feature_union", feature_union),
        ("scale", StandardScaler())
    ])

    transformed_data = pipe.fit_transform(X)

    ### Re-apply labels (optional step for readability) ###
    # The fitted encoder is read back from the column transformer to recover
    # the category values, which become the one-hot column labels.
    encoder = original_features_encoded.named_transformers_["ohe"]
    category_labels = encoder.categories_[0]
    all_cols = list(category_labels) + ["number", "number_odd"]
    return pd.DataFrame(transformed_data, columns=all_cols, index=X.index), pipe
    
# Rebuild example_X from the raw example data, leaving out the target column
example_X = example_data.drop(columns="target")
# Run the preprocessing function: it returns the transformed frame and the fitted pipeline
result, pipe = preprocess_data_with_pipeline(example_X)
result

In [None]:
# Using autosklearn.classification
import autosklearn.classification

# Fit an auto-sklearn classifier with its default settings
model = autosklearn.classification.AutoSklearnClassifier()
model.fit(X_train, y_train)
# Predict with the fitted `model`; the previous `cls` name was never defined
y_pred = model.predict(X_test)

In [None]:
# Recommender Systems
# Imports (the Surprise API resembles sklearn's but is used differently)
from surprise import Reader, Dataset
from surprise.model_selection import cross_validate
from surprise.prediction_algorithms import SVD
from surprise.prediction_algorithms import KNNWithMeans, KNNBasic, KNNBaseline
# NOTE(review): this rebinds GridSearchCV, which the notebook's first cell
# imported from sklearn.model_selection. Any cell executed after this one gets
# Surprise's class under that name -- consider importing it under an alias.
from surprise.model_selection import GridSearchCV

# Read in values as a Surprise dataset
# rating_scale is passed as (0, 5) here; adjust it to the data's actual rating range
reader = Reader(rating_scale=(0, 5))    #Customize to rating range
# NOTE(review): `df` is not defined in the visible notebook; presumably it holds
# user / item / rating columns in that order -- confirm against the Surprise docs.
data = Dataset.load_from_df(df, reader)

In [None]:
# Natural Language Processing

def clean_string(list_of_words):
    """
    Collapse a sequence of strings into one cleaned, lower-case string.

    The pieces are concatenated without a separator, newline characters are
    replaced by spaces, every character in ``string.punctuation`` is removed
    and the result is lower-cased.

    Parameters
    ----------
    list_of_words : iterable of str

    Returns
    -------
    str
    """
    # Join the *argument*; the previous code joined the builtin `list` type,
    # which raises TypeError on every call.
    one_string = ''.join(list_of_words).replace('\n', ' ')
    # Translation table that deletes every punctuation character
    translator = str.maketrans('', '', string.punctuation)
    return one_string.translate(translator).lower()

# Tokenize and remove 'stop words'
# NOTE(review): `cleaned_string` is not assigned anywhere in the visible
# notebook; presumably it is the output of clean_string(...) -- confirm.
tokenized_string = word_tokenize(cleaned_string)
stop_words = stopwords.words('english')
# Set copy for constant-time membership checks; `stop_words` itself stays a list
stop_word_lookup = set(stop_words)
clean_tokens = [word for word in tokenized_string if word not in stop_word_lookup]

# Lemmatization
# WordNetLemmatizer.lemmatize works on a single word, so it is applied to each
# token in turn rather than to the whole list at once.
lemmatizer = WordNetLemmatizer()
lemmatized_tokens = [lemmatizer.lemmatize(word) for word in clean_tokens]

# Vectorize
def count_vectorize(lemmatized_tokens):
    """
    Count how often each token occurs.

    Returns a plain dict mapping token -> number of occurrences, with keys in
    the order the tokens were first seen.
    """
    counts = {}
    for token in lemmatized_tokens:
        # Start from 0 for a token not seen before, then add this occurrence
        counts[token] = counts.get(token, 0) + 1
    return counts

# Alternative vectorization: let NLTK's FreqDist do the counting,
# then keep the 200 most frequent (token, count) pairs
freqdist = FreqDist(lemmatized_tokens)
most_common = freqdist.most_common(n=200)

In [None]:
# Deep Learning
# Print Historical Summary of Training & Validation Accuracy by Epoch
def display_historical_summaries(history):
    """
    Plot training and validation accuracy per epoch on one chart.

    `history` must expose a `.history` mapping with the keys "accuracy" and
    "val_accuracy"; each series is drawn as one line, followed by a legend.
    """
    curves = (
        ("accuracy", "training"),
        ("val_accuracy", "validation"),
    )
    for metric_key, legend_label in curves:
        plt.plot(history.history[metric_key], label=legend_label)
    plt.legend()
    plt.show()

# Plot the accuracy curves for the notebook-level `history` object
display_historical_summaries(history)

In [None]:
# Transfer Learning Example

# Pull in source model: flatten the output of its last layer and wrap the
# result in a new Model that reuses the source model's input
output = Flatten()(vgg.layers[-1].output)
base_model = Model(vgg.input, output)

# Freeze the model as a whole and, explicitly, each of its layers
base_model.trainable = False
for layer in base_model.layers:
    layer.trainable = False

# Tabulate every layer with its name and trainable flag to verify the freeze
layers = [(layer, layer.name, layer.trainable) for layer in base_model.layers]
pd.DataFrame(
    data=layers,
    columns=["Layer Type", "Layer Name", "Is Layer Trainable?"],
)

In [None]:
# Image Augmentation to reduce chances of learning from noise (overfitting)

# Short alias for the Keras generator class
ImageDataGenerator = tf.keras.preprocessing.image.ImageDataGenerator

# Create image augmentation engine as generator-like object
generator = ImageDataGenerator(
    # enabled augmentations: rotation, zoom and horizontal/vertical shifts
    rotation_range=5,
    zoom_range=0.1,
    width_shift_range=0.1,
    height_shift_range=0.1,
    # flips are explicitly switched off
    horizontal_flip=False,
    vertical_flip=False,
    # centering, std-normalization and whitening are explicitly switched off
    featurewise_center=False,
    samplewise_center=False,
    featurewise_std_normalization=False,
    samplewise_std_normalization=False,
    zca_whitening=False,
)

# Fit training data to augmentation generator
generator.fit(X_train)

In [None]:
# Visualize Accuracy and Loss by Epochs
def plot_training_results(history):
    """
    Visualize results of the model training using `matplotlib`.

    The visualization includes two side-by-side charts, accuracy and loss,
    each showing the training and the validation series.

    INPUTS:
        history(tf.keras.callbacks.History):
            Contains data on how the model metrics changed
            over the course of training. Its `.history` mapping must
            provide 'accuracy', 'val_accuracy', 'loss' and 'val_loss'.

    OUTPUTS:
        None.
    """
    # Get accuracy for training and validation sets
    accuracy = history.history['accuracy']
    validation_accuracy = history.history['val_accuracy']

    # Get loss for training and validation sets
    loss = history.history['loss']
    validation_loss = history.history['val_loss']

    # Common x axis: one point per recorded epoch. Derived from the history
    # itself -- the function previously read an undefined global `epochs`,
    # which raised NameError (or mismatched the data when a stale value existed).
    epochs_range = range(len(accuracy))

    # Instantiate plotting figure space
    plt.figure(figsize=(20, 8))

    # Create training/validation accuracy subplot
    plt.subplot(1, 2, 1)
    plt.plot(epochs_range, accuracy, label='Training Accuracy')
    plt.plot(epochs_range, validation_accuracy, label='Validation Accuracy')
    plt.legend(loc='lower right')
    plt.title('Training and Validation Accuracy')

    # Create training/validation loss subplot
    plt.subplot(1, 2, 2)
    plt.plot(epochs_range, loss, label='Training Loss')
    plt.plot(epochs_range, validation_loss, label='Validation Loss')
    plt.legend(loc='upper right')
    plt.title('Training and Validation Loss')

    # Render visualization
    plt.show()

# Draw the accuracy and loss charts for the notebook-level `history` object
plot_training_results(history=history)