In [1]:
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler

# Load MNIST
mnist = fetch_openml('mnist_784', version=1)
X, y = mnist["data"], mnist["target"].astype(int)

# Normalize
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Train/Test Split
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=10000, random_state=42, stratify=y)

# Grid Search
param_grid = {
    'n_neighbors': [3, 4, 5],
    'weights': ['uniform', 'distance']
}
grid_search = GridSearchCV(KNeighborsClassifier(), param_grid, cv=3, verbose=1, n_jobs=-1)
grid_search.fit(X_train, y_train)

# Best Model Evaluation
best_knn = grid_search.best_estimator_
y_pred = best_knn.predict(X_test)
print("Test accuracy:", accuracy_score(y_test, y_pred))  # Should be >97%


Fitting 3 folds for each of 6 candidates, totalling 18 fits
Test accuracy: 0.9503


In [2]:
import numpy as np

def shift_image(image, direction):
    image = image.reshape(28, 28)
    shifted = np.zeros_like(image)

    if direction == 'left':
        shifted[:, :-1] = image[:, 1:]
    elif direction == 'right':
        shifted[:, 1:] = image[:, :-1]
    elif direction == 'up':
        shifted[:-1, :] = image[1:, :]
    elif direction == 'down':
        shifted[1:, :] = image[:-1, :]

    return shifted.reshape(-1)


In [3]:
X_augmented = []
y_augmented = []

for img, label in zip(X_train, y_train):
    X_augmented.append(img)
    y_augmented.append(label)
    for direction in ['left', 'right', 'up', 'down']:
        shifted = shift_image(img, direction)
        X_augmented.append(shifted)
        y_augmented.append(label)

X_augmented = np.array(X_augmented)
y_augmented = np.array(y_augmented)

# Train on augmented data
X_aug_scaled = scaler.fit_transform(X_augmented)
best_knn.fit(X_aug_scaled, y_augmented)
y_test_pred = best_knn.predict(X_test)
print("Test accuracy after augmentation:", accuracy_score(y_test, y_test_pred))


Test accuracy after augmentation: 0.9618


In [5]:
import seaborn as sns
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Load Titanic dataset from seaborn
df = sns.load_dataset('titanic')

# Drop rows with missing target (Survived)
df.dropna(subset=['survived'], inplace=True)

# Preprocess
df['sex'] = df['sex'].map({'male': 0, 'female': 1})
df['embarked'] = df['embarked'].map({'S': 0, 'C': 1, 'Q': 2})
df['embarked'].fillna(0, inplace=True)
df['age'].fillna(df['age'].median(), inplace=True)
df['fare'].fillna(df['fare'].median(), inplace=True)

# Define features and target
features = ['pclass', 'sex', 'age', 'sibsp', 'parch', 'fare', 'embarked']
X = df[features]
y = df['survived']

# Train/Test Split
X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, random_state=42)
clf = RandomForestClassifier(random_state=42)
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)

# Accuracy
print("Titanic accuracy:", accuracy_score(y_test, y_pred))


The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df['embarked'].fillna(0, inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df['age'].fillna(df['age'].median(), inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values alw

Titanic accuracy: 0.7623318385650224


In [7]:
import urllib.request
import tarfile
import os

# URLs for spam and ham data
dataset_url = "https://spamassassin.apache.org/old/publiccorpus/20030228_spam.tar.bz2"
ham_url = "https://spamassassin.apache.org/old/publiccorpus/20030228_easy_ham.tar.bz2"

# Create dataset directory
os.makedirs("datasets", exist_ok=True)

# Download and extract function
def download_and_extract(url, extract_path):
    archive_path = os.path.join("datasets", os.path.basename(url))
    # Download if not already exists
    if not os.path.exists(archive_path):
        print(f"Downloading {url}...")
        urllib.request.urlretrieve(url, archive_path)
    # Extract
    with tarfile.open(archive_path, "r:bz2") as tar:
        tar.extractall(path=extract_path)
    print(f"Extracted to {extract_path}")

# Download and extract spam and ham
download_and_extract(dataset_url, "datasets")
download_and_extract(ham_url, "datasets")


Downloading https://spamassassin.apache.org/old/publiccorpus/20030228_spam.tar.bz2...
Extracted to datasets
Downloading https://spamassassin.apache.org/old/publiccorpus/20030228_easy_ham.tar.bz2...
Extracted to datasets


In [8]:
from email import policy
from email.parser import BytesParser

def load_emails(folder):
    emails = []
    for filename in os.listdir(folder):
        filepath = os.path.join(folder, filename)
        if os.path.isfile(filepath):  # Avoid subfolders like CVS
            with open(filepath, 'rb') as f:
                try:
                    email = BytesParser(policy=policy.default).parse(f)
                    emails.append(email)
                except:
                    continue  # skip corrupted files
    return emails

# Load the emails from extracted folders
spam_emails = load_emails('datasets/spam')
ham_emails = load_emails('datasets/easy_ham')

print(f"Loaded {len(spam_emails)} spam and {len(ham_emails)} ham emails.")


Loaded 501 spam and 2501 ham emails.


In [10]:
import os
import tarfile
import urllib.request
import re
from email import policy
from email.parser import BytesParser
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score

# --- Step 1: Download and Extract SpamAssassin Dataset ---
def download_and_extract(url, extract_to):
    filename = os.path.basename(url)
    archive_path = os.path.join("datasets", filename)
    os.makedirs("datasets", exist_ok=True)
    if not os.path.exists(archive_path):
        urllib.request.urlretrieve(url, archive_path)
    with tarfile.open(archive_path, "r:bz2") as tar:
        tar.extractall(path=extract_to)

download_and_extract("https://spamassassin.apache.org/old/publiccorpus/20030228_spam.tar.bz2", "datasets")
download_and_extract("https://spamassassin.apache.org/old/publiccorpus/20030228_easy_ham.tar.bz2", "datasets")

# --- Step 2: Load Emails ---
def load_emails(folder):
    emails = []
    for filename in os.listdir(folder):
        filepath = os.path.join(folder, filename)
        if os.path.isfile(filepath):
            with open(filepath, 'rb') as f:
                try:
                    email = BytesParser(policy=policy.default).parse(f)
                    emails.append(email)
                except:
                    continue
    return emails

spam_emails = load_emails("datasets/spam")
ham_emails = load_emails("datasets/easy_ham")

# --- Step 3: Email to Text Transformer ---
class EmailToTextTransformer(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None): return self
    def transform(self, X):
        return [self.email_to_text(email) for email in X]
    def email_to_text(self, email):
        try:
            return email.get_content()
        except:
            payload = email.get_payload(decode=True)
            if isinstance(payload, bytes):
                return payload.decode('utf-8', errors='replace')
            return str(payload)

# --- Step 4: Text Preprocessing ---
def preprocess(text):
    text = text.lower()
    text = re.sub(r'\d+', 'number', text)
    text = re.sub(r'http\S+', 'url', text)
    return text

# --- Step 5: Combine Labels and Emails ---
emails = spam_emails + ham_emails
labels = [1] * len(spam_emails) + [0] * len(ham_emails)

# --- Step 6: Build and Train Pipeline ---
pipeline = Pipeline([
    ('email_text', EmailToTextTransformer()),
    ('vectorizer', CountVectorizer(preprocessor=preprocess)),
    ('clf', LogisticRegression(max_iter=1000))
])

X_train, X_test, y_train, y_test = train_test_split(emails, labels, test_size=0.2, random_state=42)
pipeline.fit(X_train, y_train)

# --- Step 7: Evaluate ---
y_pred = pipeline.predict(X_test)
print("Precision:", precision_score(y_test, y_pred))
print("Recall:", recall_score(y_test, y_pred))


Precision: 0.8294573643410853
Recall: 0.8991596638655462
