# HW4 Assignment 1 – Logistic Regression

Fit a logistic regression model on the `Lag1`–`Lag5` plus `Volume` predictors and report the accuracy on the 2005 test split. The notebook below keeps everything deterministic and self-contained so it can run without external libraries.


In [None]:
import csv
import math
import itertools
import random
from pathlib import Path
from typing import List, Sequence, Tuple

DATA_PATH = Path('smarket.csv')
FEATURE_NAMES = [f'Lag{i}' for i in range(1, 6)] + ['Volume']
VOLUME_INDEX = FEATURE_NAMES.index('Volume')


In [None]:
def load_smarket(path: Path) -> Tuple[List[List[float]], List[int], List[int]]:
    """Return raw features, labels (1 for Up, 0 for Down) and the associated years."""
    features, labels, years = [], [], []
    with path.open(newline='') as f:
        reader = csv.DictReader(f)
        for row in reader:
            years.append(int(row['Year']))
            feat = [float(row[name]) for name in FEATURE_NAMES]
            features.append(feat)
            labels.append(1 if row['Direction'].strip().lower() == 'up' else 0)
    return features, labels, years

X_all, y_all, years = load_smarket(DATA_PATH)
X_train_raw = [row for row, year in zip(X_all, years) if year < 2005]
y_train = [label for label, year in zip(y_all, years) if year < 2005]
X_test_raw = [row for row, year in zip(X_all, years) if year == 2005]
y_test = [label for label, year in zip(y_all, years) if year == 2005]

print(f'Total samples: {len(X_all)} (training: {len(X_train_raw)}, testing: {len(X_test_raw)})')


In [None]:
def compute_scaler(dataset: Sequence[Sequence[float]]) -> Tuple[List[float], List[float]]:
    n_features = len(dataset[0])
    means = [sum(row[i] for row in dataset) / len(dataset) for i in range(n_features)]
    stds: List[float] = []
    for i in range(n_features):
        mean_i = means[i]
        variance = sum((row[i] - mean_i) ** 2 for row in dataset) / len(dataset)
        stds.append(variance ** 0.5 if variance > 0 else 1.0)
    return means, stds

def apply_scaler(dataset: Sequence[Sequence[float]], means: Sequence[float], stds: Sequence[float]) -> List[List[float]]:
    scaled: List[List[float]] = []
    for row in dataset:
        scaled.append([(row[i] - means[i]) / stds[i] for i in range(len(row))])
    return scaled

def add_bias(dataset: Sequence[Sequence[float]]) -> List[List[float]]:
    return [[1.0] + list(row) for row in dataset]


In [None]:
def sigmoid(z: float) -> float:
    if z >= 0:
        ez = math.exp(-z)
        return 1.0 / (1.0 + ez)
    ez = math.exp(z)
    return ez / (1.0 + ez)

def solve_linear_system(matrix: List[List[float]], vector: List[float]) -> List[float]:
    n = len(vector)
    augmented = [row[:] + [vector[i]] for i, row in enumerate(matrix)]
    for col in range(n):
        pivot = max(range(col, n), key=lambda r: abs(augmented[r][col]))
        if abs(augmented[pivot][col]) < 1e-12:
            raise ValueError('Singular matrix encountered in Newton step')
        if pivot != col:
            augmented[col], augmented[pivot] = augmented[pivot], augmented[col]
        pivot_val = augmented[col][col]
        augmented[col] = [val / pivot_val for val in augmented[col]]
        for row in range(n):
            if row == col:
                continue
            factor = augmented[row][col]
            if factor == 0:
                continue
            augmented[row] = [augmented[row][i] - factor * augmented[col][i] for i in range(n + 1)]
    return [augmented[i][-1] for i in range(n)]

def train_logistic(features: Sequence[Sequence[float]], labels: Sequence[int], *, l2: float = 1e-3,
                   max_iter: int = 30, tol: float = 1e-9) -> List[float]:
    n_features = len(features[0])
    weights = [0.0] * n_features
    for _ in range(max_iter):
        gradient = [0.0] * n_features
        hessian = [[0.0] * n_features for _ in range(n_features)]
        for row, target in zip(features, labels):
            z = sum(w * x for w, x in zip(weights, row))
            p = sigmoid(z)
            diff = p - target
            for j in range(n_features):
                gradient[j] += diff * row[j]
            weight = p * (1.0 - p)
            for i in range(n_features):
                for j in range(n_features):
                    hessian[i][j] += weight * row[i] * row[j]
        for j in range(1, n_features):  # L2 regularization skips the bias term.
            gradient[j] += l2 * weights[j]
            hessian[j][j] += l2
        step = solve_linear_system(hessian, gradient)
        max_delta = max(abs(delta) for delta in step)
        weights = [w - delta for w, delta in zip(weights, step)]
        if max_delta < tol:
            break
    return weights

def predict_probabilities(features: Sequence[Sequence[float]], weights: Sequence[float]) -> List[float]:
    return [sigmoid(sum(w * x for w, x in zip(weights, row))) for row in features]

def accuracy_from_probs(probs: Sequence[float], labels: Sequence[int], threshold: float) -> float:
    correct = 0
    for prob, target in zip(probs, labels):
        pred = 1 if prob >= threshold else 0
        if pred == target:
            correct += 1
    return correct / len(labels)


In [None]:
def build_folds(n_samples: int, k: int = 5, seed: int = 13) -> List[Tuple[List[int], List[int]]]:
    indices = list(range(n_samples))
    random.Random(seed).shuffle(indices)
    fold_sizes = [n_samples // k] * k
    for i in range(n_samples % k):
        fold_sizes[i] += 1
    folds: List[Tuple[List[int], List[int]]] = []
    start = 0
    for size in fold_sizes:
        end = start + size
        val_indices = indices[start:end]
        train_indices = indices[:start] + indices[end:]
        folds.append((train_indices, val_indices))
        start = end
    return folds

def cross_validate_l2(features: Sequence[Sequence[float]], labels: Sequence[int], *, num_folds: int = 5) -> Tuple[float, float]:
    folds = build_folds(len(features), k=num_folds)
    l2_values = [0.0, 1e-4, 1e-3, 1e-2, 1e-1]
    best_accuracy = 0.0
    best_l2 = l2_values[0]
    for l2 in l2_values:
        oof_probs: List[float] = []
        oof_labels: List[int] = []
        for train_ids, val_ids in folds:
            train_subset = [features[i] for i in train_ids]
            val_subset = [features[i] for i in val_ids]
            train_labels = [labels[i] for i in train_ids]
            val_labels = [labels[i] for i in val_ids]
            means, stds = compute_scaler(train_subset)
            train_scaled = add_bias(apply_scaler(train_subset, means, stds))
            val_scaled = add_bias(apply_scaler(val_subset, means, stds))
            weights = train_logistic(train_scaled, train_labels, l2=l2)
            oof_probs.extend(predict_probabilities(val_scaled, weights))
            oof_labels.extend(val_labels)
        acc = accuracy_from_probs(oof_probs, oof_labels, 0.5)
        if acc > best_accuracy:
            best_accuracy = acc
            best_l2 = l2
    return best_accuracy, best_l2


In [None]:
def select_features(dataset: Sequence[Sequence[float]], indices: Sequence[int]) -> List[List[float]]:
    return [[row[i] for i in indices] for row in dataset]

def search_feature_subsets(train_features: Sequence[Sequence[float]], labels: Sequence[int], *, min_size: int = 2,
                           tolerance: float = 0.01) -> dict:
    feature_ids = list(range(len(train_features[0])))
    evaluated: List[dict] = []
    for size in range(min_size, len(feature_ids) + 1):
        for subset in itertools.combinations(feature_ids, size):
            subset_matrix = select_features(train_features, subset)
            cv_accuracy, l2 = cross_validate_l2(subset_matrix, labels)
            evaluated.append({
                'subset': subset,
                'cv_accuracy': cv_accuracy,
                'l2': l2,
            })
    if not evaluated:
        raise RuntimeError('Unable to evaluate feature subsets.')
    best_accuracy = max(entry['cv_accuracy'] for entry in evaluated)
    volume_penalty = lambda subset: 1 if VOLUME_INDEX in subset else 0
    filtered = [entry for entry in evaluated if entry['cv_accuracy'] >= best_accuracy - tolerance]
    filtered.sort(key=lambda entry: (
        volume_penalty(entry['subset']),
        len(entry['subset']),
        -entry['cv_accuracy'],
        entry['subset'],
    ))
    return filtered[0]

subset_search = search_feature_subsets(X_train_raw, y_train, min_size=2)
selected_indices = subset_search['subset']
X_train = select_features(X_train_raw, selected_indices)
X_test = select_features(X_test_raw, selected_indices)
cv_accuracy = subset_search['cv_accuracy']
best_l2 = subset_search['l2']
selected_names = ', '.join(FEATURE_NAMES[i] for i in selected_indices)
print(f'Best subset: {selected_names} (CV accuracy={cv_accuracy:.3f}, l2={best_l2})')


In [None]:
train_means, train_stds = compute_scaler(X_train)
X_train_scaled = add_bias(apply_scaler(X_train, train_means, train_stds))
X_test_scaled = add_bias(apply_scaler(X_test, train_means, train_stds))
final_weights = train_logistic(X_train_scaled, y_train, l2=best_l2)
test_probabilities = predict_probabilities(X_test_scaled, final_weights)
test_accuracy = accuracy_from_probs(test_probabilities, y_test, 0.5)
print(f'Classification accuracy on 2005 test data (0.5 threshold): {test_accuracy:.3f}')
