In [None]:
# Make Google Drive visible inside the Colab runtime; the dataset read by the
# cells below is stored under this mount point.
from google.colab import drive

mount_point = '/content/drive'
drive.mount(mount_point)


In [None]:
# Import libraries
import pandas as pd
from sklearn.model_selection import train_test_split
from matplotlib import pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import classification_report
import numpy as np

from sklearn import preprocessing
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import StandardScaler
from itertools import chain
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torchvision.datasets import ImageFolder


In [None]:
import pandas as pd

# Load the age/gender dataset from the mounted Drive folder.
df = pd.read_csv('/content/drive/MyDrive/IS675_data/DL Project/age_gender.csv')

# Check the structure and data types.
# DataFrame.info() writes its report straight to stdout and returns None, so it
# is called bare; wrapping it in print() would append a stray "None" line.
df.info()
# View the first few rows
print(df.head())

# 1. Check for missing values (only columns that actually have some are shown)
missing_values = df.isnull().sum()
print("Missing Values:\n", missing_values[missing_values > 0])

# 2. Check for duplicate rows
duplicates = df.duplicated().sum()
print(f"Number of duplicate rows: {duplicates}")

# 3. Check for data types and inconsistent data types
data_types = df.dtypes
print("Data Types:\n", data_types)


In [None]:
def clean_pixels(df):
    """Parse the space-separated strings in ``df['pixels']`` into lists of ints.

    The column is rewritten on ``df`` itself and the same DataFrame object is
    returned. Only entries that are still strings are parsed; anything else
    (for example lists produced by an earlier call) is passed through
    unchanged, so calling this function more than once is safe.

    Args:
        df: DataFrame containing a 'pixels' column.

    Returns:
        The same DataFrame, with every string entry of 'pixels' replaced by a
        list of ints.

    Raises:
        KeyError: if ``df`` has no 'pixels' column.
        ValueError: if a token in a pixel string is not a valid integer.
    """
    def _parse(value):
        # Parse per element rather than trusting the column dtype: an object
        # column may already hold lists, and string-typed columns are not
        # reported as 'object'.
        if isinstance(value, str):
            return [int(token) for token in value.split()]
        return value

    df['pixels'] = df['pixels'].apply(_parse)
    return df

# Re-read the raw CSV so the cleaning step starts from unparsed pixel strings.
df = pd.read_csv('/content/drive/MyDrive/IS675_data/DL Project/age_gender.csv')

# Apply the cleaning function. clean_pixels rewrites the column on the frame it
# is given and returns that same frame, so df_cleaned and df are one object.
df_cleaned = clean_pixels(df)

# Display the first few rows of the cleaned dataframe
print("Sample of cleaned data:")
print(df_cleaned.head())

# Check for any missing values.
# "\n" gives the blank line before the heading; the previous backslash followed
# by a line break was a line continuation and printed no blank line at all.
print("\nMissing values:")
print(df_cleaned.isnull().sum())


In [None]:
# Report the dataset dimensions: shape[0] is the row (record) count and
# shape[1] is the column (variable) count.
num_records = df.shape[0]
num_variables = df.shape[1]
print(f"Number of records (rows): {num_records}")
print(f"Number of variables (columns): {num_variables}")


In [None]:
# Descriptive statistics of the age column
age_summary = df['age'].describe()
print(age_summary)


In [None]:
# How often each code occurs in the gender and ethnicity columns
for column_name in ('gender', 'ethnicity'):
    print(df[column_name].value_counts())


In [None]:
# Let's do some basic analysis of the age, ethnicity, and gender distributions
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Create age distribution plot
plt.figure(figsize=(10, 6))
sns.histplot(data=df, x='age', bins=30)
plt.title('Age Distribution')
plt.xlabel('Age')
plt.ylabel('Count')
plt.show()

# Each heading below starts with "\n" so it is preceded by a blank line.
# The earlier spellings (a backslash before a line break, "\E", "\G") either
# printed no blank line or printed a literal backslash.

# Display summary statistics for age
print("\nAge Statistics:")
print(df['age'].describe())

# Display value counts for ethnicity and gender
print("\nEthnicity Distribution:")
print(df['ethnicity'].value_counts())

print("\nGender Distribution:")
print(df['gender'].value_counts())


In [None]:
import matplotlib.pyplot as plt

# Gender distribution plot
plt.figure(figsize=(8, 5))

# Create a bar plot for gender distribution.
# value_counts() is indexed by the gender codes and ordered by count, and the
# bars are drawn in that same order.
gender_counts = df['gender'].value_counts()
bars = gender_counts.plot(kind='bar', edgecolor='black', width=0.10)  # Adjust width for thinner bars

# Apply a colormap to the bars: the shade varies with the bar's position
for i, bar in enumerate(bars.patches):
    shade = i / len(bars.patches)
    bar.set_facecolor(plt.cm.plasma(shade))

# Calculate the percentage of each gender
total_count = gender_counts.sum()
percentages = (gender_counts / total_count) * 100

# Add the percentage text on top of the bars.
# Bars and percentages are paired by POSITION. Indexing the Series with the
# loop counter (percentages[i]) would look up the gender code equal to i,
# which mislabels the bars whenever the count order differs from the code order.
for bar, percentage in zip(bars.patches, percentages.to_numpy()):
    plt.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 1,
             f'{percentage:.2f}%',
             ha='center', va='bottom', fontsize=12, color='black')

plt.title('Gender Distribution', fontsize=14, color='darkblue')
plt.xlabel('Gender (0=Male, 1=Female)', fontsize=12, color='darkgreen')
plt.ylabel('Count', fontsize=12, color='darkgreen')
plt.grid(False)  # Remove grid for a cleaner look
plt.show()


In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Parse the pixel strings into float32 arrays, but only when the column still
# holds raw strings (judged from its first entry).
first_entry = df['pixels'].iloc[0]
if isinstance(first_entry, str):
    df['pixels'] = df['pixels'].apply(lambda x: np.array(x.split(), dtype="float32"))

# Whatever the entries are at this point (e.g. Python lists), make each one a NumPy array
df['pixels'] = df['pixels'].apply(np.array)

# Show the first five faces side by side
fig, axes = plt.subplots(1, 5, figsize=(15, 5))
for idx in range(len(axes)):
    # Each entry is a flat vector that is viewed here as a 48x48 image
    image = df['pixels'].iloc[idx].reshape(48, 48)
    axes[idx].imshow(image, cmap='gray')
    axes[idx].axis('off')
plt.show()


In [None]:
# Preprocessing of the age and pixel columns.
# NOTE: both columns are rescaled in place, so running this cell a second time
# on the same frame divides the values again.
mean_age = df['age'].mean()
df['age'] = df['age'].fillna(mean_age)  # replace missing ages with the column mean
df['pixels'] = df['pixels'].apply(lambda x: np.array(x) / 255.0)  # pixel values divided by 255
df['age'] = df['age'] / 100.0  # age divided by 100


In [None]:
# Keep only the rows whose pixel array is a flat vector of 48*48 values
expected_shape = (48 * 48,)
has_expected_shape = df['pixels'].apply(lambda x: x.shape == expected_shape)
df = df[has_expected_shape]

# Confirm which shapes are left after filtering
shapes = df['pixels'].apply(lambda x: x.shape)
print("Remaining unique shapes in 'pixels':", shapes.unique())


In [None]:
# Stack the per-row pixel vectors into one matrix, then view every row as a
# 48x48 image: X has shape (num_samples, 48, 48)
pixel_matrix = np.stack(df['pixels'].values)
X = pixel_matrix.reshape(-1, 48, 48)

# The regression target is the age column
y = df['age'].values

# Sanity-check the shapes
print(f"Features (X): {X.shape}, Target (y): {y.shape}")


In [None]:
from sklearn.model_selection import train_test_split

# First cut: 80% for training, the remaining 20% held back
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.2, random_state=42)

# Second cut: the held-back part is halved into validation and test sets
# (10% of the full data each)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

# Report the size of every split
for split_name, features, labels in (
    ("Training", X_train, y_train),
    ("Validation", X_val, y_val),
    ("Test", X_test, y_test),
):
    print(f"{split_name} set: {features.shape}, {labels.shape}")


In [None]:
import torch
from torch.utils.data import DataLoader, TensorDataset


def _as_dataset(features, labels):
    """Wrap arrays as a float32 TensorDataset, inserting a channel axis at dim 1 of the features."""
    images = torch.tensor(features, dtype=torch.float32).unsqueeze(1)
    ages = torch.tensor(labels, dtype=torch.float32)
    return TensorDataset(images, ages)


# One dataset per split
train_dataset = _as_dataset(X_train, y_train)
val_dataset = _as_dataset(X_val, y_val)
test_dataset = _as_dataset(X_test, y_test)

# Only the training loader shuffles; validation and test keep their order
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Peek at the first training batch to confirm the tensor shapes
for inputs, targets in train_loader:
    print(f"Inputs shape: {inputs.shape}, Targets shape: {targets.shape}")
    break


In [None]:
from torchvision.models import resnet18, ResNet18_Weights
import torch.nn as nn

# Start from ResNet-18 with its default pre-trained weights
model = resnet18(weights=ResNet18_Weights.DEFAULT)

# Swap the stem convolution for one that takes a single (grayscale) channel.
# This is a newly constructed layer, so it does not carry pre-trained weights.
model.conv1 = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=7,
                        stride=2, padding=3, bias=False)

# Swap the classifier head for a single-output linear layer (age regression)
head_inputs = model.fc.in_features
model.fc = nn.Linear(head_inputs, 1)

# Run on the GPU when one is available, otherwise on the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)


In [None]:
import torch.optim as optim

# Mean-squared-error loss for the regression target
criterion = nn.MSELoss()

# Adam over all model parameters
learning_rate = 1e-4
optimizer = optim.Adam(model.parameters(), lr=learning_rate)


In [None]:
num_epochs = 10  # You can adjust this depending on the dataset size and performance

for epoch in range(num_epochs):
    model.train()  # Set model to training mode
    running_loss = 0.0

    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass.
        # squeeze(1) drops only the output dimension; a bare squeeze() would
        # also drop the batch dimension of a size-1 batch and leave a 0-d
        # tensor that no longer matches the targets' shape.
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(1), targets)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        # criterion returns the batch mean; weight it by the batch size so a
        # short final batch is not over-represented in the epoch average.
        running_loss += loss.item() * inputs.size(0)

    # Per-sample average training loss for the epoch
    train_loss = running_loss / len(train_loader.dataset)
    print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {train_loss:.4f}")

    # Validation step
    model.eval()  # Set model to evaluation mode
    val_loss = 0.0
    with torch.no_grad():
        for val_inputs, val_targets in val_loader:
            val_inputs, val_targets = val_inputs.to(device), val_targets.to(device)
            val_outputs = model(val_inputs)
            batch_loss = criterion(val_outputs.squeeze(1), val_targets).item()
            val_loss += batch_loss * val_inputs.size(0)

    # Per-sample average validation loss
    val_loss /= len(val_loader.dataset)
    print(f"Validation Loss: {val_loss:.4f}")


In [None]:
# Evaluate the current model on the held-out test set.
model.eval()
test_loss = 0.0
absolute_error = 0.0

with torch.no_grad():
    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        # squeeze(1) keeps the batch dimension even for a size-1 batch
        predictions = model(inputs).squeeze(1)
        # criterion returns the batch mean; scale it back to a batch sum so the
        # final figure is a true per-sample MSE rather than a mean of batch means
        test_loss += criterion(predictions, targets).item() * targets.size(0)
        absolute_error += torch.sum(torch.abs(predictions - targets)).item()

# Per-sample test MSE and Mean Absolute Error (MAE).
# Both are measured on the targets as stored in test_dataset.
test_loss /= len(test_dataset)
mae = absolute_error / len(test_dataset)

print(f"Test Loss (MSE): {test_loss:.4f}")
print(f"Mean Absolute Error (MAE): {mae:.4f}")


In [None]:
import matplotlib.pyplot as plt

# Take the first batch served by the test loader
inputs, targets = next(iter(test_loader))
inputs = inputs.to(device)
targets = targets.to(device)

# Predict without tracking gradients
model.eval()
with torch.no_grad():
    predictions = model(inputs)

# Multiply by 100 to undo the division by 100 applied to the age column earlier
predictions = predictions.squeeze().cpu().numpy() * 100
targets = targets.cpu().numpy() * 100

# Draw actual and predicted ages against the sample's position in the batch
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(targets, label="Actual Values", marker='o', linestyle='--')
ax.plot(predictions, label="Predicted Values", marker='o', linestyle='-')
ax.set_xlabel("Sample Index")
ax.set_ylabel("Age")
ax.set_title("Actual vs Predicted Ages (Original Scale)")
ax.legend()
plt.show()


In [None]:
# Define the learning rate scheduler
scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=2, factor=0.5)

# Training loop with learning rate logging
num_epochs = 10
best_val_loss = float('inf')

for epoch in range(num_epochs):
    # Training step
    model.train()
    train_loss = 0.0
    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(), targets)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    train_loss /= len(train_loader)

    # Validation step
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            val_loss += criterion(outputs.squeeze(), targets).item()

    val_loss /= len(val_loader)

    # Update the learning rate based on validation loss
    scheduler.step(val_loss)

    # Get current learning rate
    current_lr = scheduler.optimizer.param_groups[0]['lr']

    # Save the best model
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'best_tuned_model_with_scheduler.pth')

    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}, Learning Rate: {current_lr:.6f}")

print("Training complete. Best Validation Loss: {:.4f}".format(best_val_loss))


In [None]:
# Load the model weights only.
# map_location places the tensors on the device in use, so a checkpoint written
# on a GPU runtime still loads when the notebook is running on CPU.
state_dict = torch.load('best_tuned_model_with_scheduler.pth',
                        map_location=device, weights_only=True)
model.load_state_dict(state_dict)
model.eval()

test_loss = 0.0
absolute_error = 0.0

with torch.no_grad():
    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        # squeeze(1) keeps the batch dimension even for a size-1 batch
        predictions = model(inputs).squeeze(1)
        # Batch mean scaled back to a batch sum, giving a true per-sample MSE below
        test_loss += criterion(predictions, targets).item() * targets.size(0)
        absolute_error += torch.sum(torch.abs(predictions - targets)).item()

# Per-sample test MSE and Mean Absolute Error (MAE).
# Both are measured on the targets as stored in test_dataset.
test_loss /= len(test_dataset)
mae = absolute_error / len(test_dataset)

print(f"Test Loss (MSE): {test_loss:.4f}")
print(f"Mean Absolute Error (MAE): {mae:.4f}")


In [None]:
!pip install optuna optuna-integration[pytorch_lightning]


In [None]:
import optuna
from optuna.integration import PyTorchLightningPruningCallback

def train_and_evaluate(model, train_loader, val_loader, criterion, optimizer, num_epochs=10):
    """Train ``model`` for ``num_epochs`` epochs and return its final validation loss.

    After every epoch the model is scored on ``val_loader`` and both losses are
    printed. Losses are per-sample averages: each batch loss is weighted by its
    batch size, which assumes ``criterion`` returns a batch mean (as
    ``nn.MSELoss()`` does by default).

    Args:
        model: network producing one value per sample, shaped (batch, 1).
        train_loader: iterable of (inputs, targets) batches used for training.
        val_loader: iterable of (inputs, targets) batches used for validation.
        criterion: loss function called as ``criterion(predictions, targets)``.
        optimizer: optimizer built over ``model``'s parameters.
        num_epochs: number of passes over ``train_loader``.

    Returns:
        float: validation loss measured after the last epoch
        (``float('inf')`` when ``num_epochs`` is 0).
    """
    # Run on whatever device the model already lives on
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    val_loss = float('inf')
    for epoch in range(num_epochs):
        # Training pass
        model.train()
        train_total, train_seen = 0.0, 0
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(run_device), targets.to(run_device)
            optimizer.zero_grad()
            # squeeze(1) drops only the output dimension, keeping the batch axis
            loss = criterion(model(inputs).squeeze(1), targets)
            loss.backward()
            optimizer.step()
            train_total += loss.item() * inputs.size(0)
            train_seen += inputs.size(0)
        train_loss = train_total / max(train_seen, 1)

        # Validation pass
        model.eval()
        val_total, val_seen = 0.0, 0
        with torch.no_grad():
            for inputs, targets in val_loader:
                inputs, targets = inputs.to(run_device), targets.to(run_device)
                batch_loss = criterion(model(inputs).squeeze(1), targets).item()
                val_total += batch_loss * inputs.size(0)
                val_seen += inputs.size(0)
        val_loss = val_total / max(val_seen, 1)

        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}")

    return val_loss


def objective(trial):
    """Optuna objective: train a fresh ResNet-18 regressor and return its validation loss.

    Samples a learning rate, a batch size and an optimizer type from ``trial``,
    builds new DataLoaders over the notebook-level ``train_dataset`` and
    ``val_dataset``, trains for 3 epochs on the notebook-level ``device`` and
    returns the validation loss reported by ``train_and_evaluate``.

    Args:
        trial: the ``optuna`` trial object supplying the hyper-parameter suggestions.

    Returns:
        float: validation loss after the short training run.

    Raises:
        ValueError: if the suggested optimizer name is not one of the handled choices.
    """
    # Define the hyperparameter space
    lr = trial.suggest_float('lr', 1e-4, 1e-2, log=True)
    batch_size = trial.suggest_categorical('batch_size', [32, 64, 128])
    optimizer_name = trial.suggest_categorical('optimizer', ['adam', 'sgd', 'rmsprop'])

    # Set up DataLoaders (local to this trial; the notebook-level loaders are untouched)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)

    # Load and configure the model: 1-channel stem, single-output head
    model = resnet18(weights=ResNet18_Weights.DEFAULT)
    model.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
    model.fc = nn.Linear(model.fc.in_features, 1)
    model = model.to(device)

    # Define optimizer
    if optimizer_name == 'adam':
        optimizer = optim.Adam(model.parameters(), lr=lr)
    elif optimizer_name == 'sgd':
        optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
    elif optimizer_name == 'rmsprop':
        optimizer = optim.RMSprop(model.parameters(), lr=lr)
    else:
        raise ValueError(f"Unsupported optimizer: {optimizer_name}")

    # Define loss function
    criterion = nn.MSELoss()

    # Train and evaluate
    val_loss = train_and_evaluate(model, train_loader, val_loader, criterion, optimizer, num_epochs=3)
    return val_loss

# Run Bayesian Optimization
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=20)

# Best hyperparameters
print("Best hyperparameters:", study.best_params)
print("Best validation loss:", study.best_value)


In [None]:
# Hyper-parameters for the final run. They are fixed literals, not read from
# `study` — presumably copied from an earlier search; verify against study.best_params.
final_batch_size = 64
final_lr = 0.00042021477048710334
num_epochs = 10  # Adjust based on computational resources

# Fresh loaders for the final run
train_loader = DataLoader(train_dataset, batch_size=final_batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=final_batch_size)

# Fresh ResNet-18 with a 1-channel stem and a single-output head
model = resnet18(weights=ResNet18_Weights.DEFAULT)
model.conv1 = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=7,
                        stride=2, padding=3, bias=False)
model.fc = nn.Linear(model.fc.in_features, 1)
model = model.to(device)

# Loss and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=final_lr)

# Train with the chosen hyperparameters
train_and_evaluate(model, train_loader, val_loader, criterion, optimizer, num_epochs=num_epochs)


In [None]:
# Test evaluation of the current model
test_loss = 0.0
absolute_error = 0.0

model.eval()
with torch.no_grad():
    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        # squeeze(1) keeps the batch dimension even for a size-1 batch
        predictions = model(inputs).squeeze(1)
        # Batch mean scaled back to a batch sum, giving a true per-sample MSE below
        test_loss += criterion(predictions, targets).item() * targets.size(0)
        absolute_error += torch.sum(torch.abs(predictions - targets)).item()

# Per-sample test MSE and Mean Absolute Error (MAE).
# Both are measured on the targets as stored in test_dataset.
test_loss /= len(test_dataset)
mae = absolute_error / len(test_dataset)

print(f"Test Loss (MSE): {test_loss:.4f}")
print(f"Mean Absolute Error (MAE): {mae:.4f}")


In [None]:
# Export this notebook to HTML with nbconvert. The leading "!" runs the line as
# a shell command; the notebook path is quoted because it contains a space.
!jupyter nbconvert --to html "/content/drive/MyDrive/IS675_data/DL Project/Final_DL.ipynb"
