In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
%matplotlib inline

In [3]:
from IPython.display import display

In [4]:
import os
import time
import logging
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
%matplotlib inline

In [None]:
import torch
import torch.nn as nn

In [None]:
from sklearn.metrics import mean_squared_error
from skimage.metrics import structural_similarity as ssim

In [5]:
from datascifuncs.tidbit_tools import load_json, write_json, print_json, check_directory_name

In [6]:
# Name of the repository root directory this notebook expects to work from.
main_dir = 'EmotionFaceClassifier'
# Project helper; per the recorded output it reports the directory in use and
# returns True when it matches `main_dir`.
# NOTE(review): the "Directory set to ..." message suggests it may also change
# the working directory -- confirm in datascifuncs.tidbit_tools.
check_directory_name(main_dir)

Directory set to /Users/dsl/Documents/GitHub/EmotionFaceClassifier, matches target dir string EmotionFaceClassifier.


True

In [7]:
from utils.decomposition_feature_extract import create_X_y

In [8]:
# Load the shared mapping/config dictionaries from JSON (path is relative to
# the current working directory).
common_dicts = load_json('./configs/input_mappings.json')

In [9]:
# Colour entry configured for the 'Training' split under 'plotly_styles'.
# NOTE(review): not referenced again in the cells visible here.
emotion_colors = common_dicts['plotly_styles']['Training']['color']

In [10]:
# Read in the FER 2013 metadata table (one row per image; the preview below
# shows label, split and image-path columns alongside the raw pixel strings).
fer2013_path = 'data/fer2013_paths.csv'
fer2013 = pd.read_csv(fer2013_path)

In [11]:
# Preview the first rows to sanity-check the columns that were loaded.
fer2013.head()

Unnamed: 0,emotion_id,pixels,Usage,emotion,image,usage,emo_count_id,img_path,color
0,0,70 80 82 72 58 58 60 63 54 58 60 48 89 115 121...,Training,Angry,[[ 70 80 82 ... 52 43 41]\n [ 65 61 58 ...,Training,1,data/Training/Angry/Angry-1.jpg,red
1,0,151 150 147 155 148 133 111 140 170 174 182 15...,Training,Angry,[[151 150 147 ... 129 140 120]\n [151 149 149 ...,Training,2,data/Training/Angry/Angry-2.jpg,red
2,2,231 212 156 164 174 138 161 173 182 200 106 38...,Training,Fear,[[231 212 156 ... 44 27 16]\n [229 175 148 ...,Training,1,data/Training/Fear/Fear-1.jpg,slategray
3,4,24 32 36 30 32 23 19 20 30 41 21 22 32 34 21 1...,Training,Sad,[[ 24 32 36 ... 173 172 173]\n [ 25 34 29 ...,Training,1,data/Training/Sad/Sad-1.jpg,blue
4,6,4 0 0 0 0 0 0 0 0 0 0 0 3 15 23 28 48 50 58 84...,Training,Neutral,[[ 4 0 0 ... 27 24 25]\n [ 1 0 0 ... 26 23...,Training,1,data/Training/Neutral/Neutral-1.jpg,sienna


In [12]:
# Keep only the rows assigned to the training split, printing the frame shape
# before and after the filter.
print(fer2013.shape)
is_training_row = fer2013['usage'].eq('Training')
train_df = fer2013.loc[is_training_row]
print(train_df.shape)

(35887, 9)
(28709, 9)


In [13]:
# List the distinct emotion labels present in the training split.
train_df['emotion'].unique()

array(['Angry', 'Fear', 'Sad', 'Neutral', 'Happy', 'Surprise', 'Disgust'],
      dtype=object)

In [17]:
# Build the feature matrix and label vector from the training rows using the
# project helper, keyed on the image-path and emotion columns.
# NOTE(review): presumably X holds one flattened image per row and y the
# emotion labels -- confirm against utils.decomposition_feature_extract.
X, y = create_X_y(train_df, 'img_path', 'emotion')

In [None]:
# Configure root logging for the notebook.
# force=True replaces any handlers already attached to the root logger;
# without it basicConfig silently does nothing when the cell is re-run or
# when an earlier import has already configured logging, so the INFO
# messages emitted by the analysis would never be shown.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True,
)

In [None]:
# Convert the feature matrix and labels to tensors on the compute device.
# `device` has to be defined here: elsewhere in the notebook it only exists
# as a local inside run_single_analysis, so this cell raised NameError on a
# fresh kernel.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# as_tensor accepts both arrays and tensors, so re-running the cell is safe.
X = torch.as_tensor(X, dtype=torch.float32).to(device)

if not torch.is_tensor(y):
    y_array = np.asarray(y)
    if y_array.dtype.kind in 'OUS':
        # Tensors cannot hold strings. If the labels are emotion names,
        # replace them with integer codes; emotion_labels[code] recovers
        # the original name.
        # NOTE(review): create_X_y's return type is not visible here --
        # this branch is skipped when y is already numeric.
        emotion_labels, y_array = np.unique(y_array, return_inverse=True)
    y = torch.as_tensor(y_array)
y = y.to(device)

In [None]:
class PCA(nn.Module):
    """Principal component analysis backed by ``torch.pca_lowrank``.

    The fitted state (``mean_`` and ``components_``) is registered as module
    buffers, so ``.to(device)`` and ``state_dict()`` carry it along with the
    module.

    Args:
        n_components: Number of principal components to estimate; passed to
            ``torch.pca_lowrank`` as ``q``.

    Attributes:
        mean_: Per-feature mean of the data given to ``fit`` (``None`` until
            fitted).
        components_: Tensor with one principal axis per row, i.e. shape
            ``(n_components, n_features)`` (``None`` until fitted).
    """

    def __init__(self, n_components):
        super().__init__()
        self.n_components = n_components
        # Placeholders that fit() overwrites. Registering them as buffers
        # (instead of plain attributes) is what makes nn.Module device moves
        # and serialisation see the fitted tensors.
        self.register_buffer('mean_', None)
        self.register_buffer('components_', None)

    def _check_is_fitted(self):
        """Raise a descriptive error when the model is used before ``fit``."""
        if self.mean_ is None or self.components_ is None:
            raise AttributeError(
                "This PCA instance is not fitted yet; call fit(X) first."
            )

    def fit(self, X):
        """Estimate the feature mean and principal axes of ``X``.

        Args:
            X: 2-D tensor of shape ``(n_samples, n_features)``.

        Returns:
            The fitted instance, so calls can be chained.
        """
        self.mean_ = torch.mean(X, dim=0)
        X_centered = X - self.mean_
        # The data is already centred above, so tell pca_lowrank not to
        # centre it a second time. The decomposition is a randomized
        # approximation; seed torch beforehand for reproducible axes.
        _, _, V = torch.pca_lowrank(X_centered, q=self.n_components, center=False)
        self.components_ = V.T
        return self

    def transform(self, X):
        """Project ``X`` onto the fitted principal axes.

        Returns:
            Tensor of shape ``(n_samples, n_components)``.

        Raises:
            AttributeError: If the model has not been fitted.
        """
        self._check_is_fitted()
        X_centered = X - self.mean_
        return torch.matmul(X_centered, self.components_.T)

    def inverse_transform(self, X_transformed):
        """Map component scores back to the original feature space.

        Raises:
            AttributeError: If the model has not been fitted.
        """
        self._check_is_fitted()
        return torch.matmul(X_transformed, self.components_) + self.mean_

In [None]:
def calculate_metrics(X_true, X_pred):
    """Compare reconstructed images against the originals.

    Args:
        X_true: Tensor of shape ``(n_images, n_pixels)`` with the reference
            images flattened row-wise; ``n_pixels`` must be a perfect square.
        X_pred: Tensor of the same shape holding the reconstructions.

    Returns:
        dict with keys ``'MSE'``, ``'PSNR'`` and ``'SSIM'``. PSNR assumes a
        peak pixel value of 255 and is ``inf`` for a perfect reconstruction;
        SSIM is the mean of the per-image scores.

    Raises:
        ValueError: If the images are not square.
    """
    # detach() so tensors that track gradients can still be converted.
    X_true_np = X_true.detach().cpu().numpy()
    X_pred_np = X_pred.detach().cpu().numpy()

    mse = mean_squared_error(X_true_np, X_pred_np)
    # Assuming pixel values are in [0, 255]. Guard the division so an exact
    # reconstruction reports infinite PSNR instead of dividing by zero.
    psnr = 10 * np.log10((255**2) / mse) if mse > 0 else float('inf')

    # Images are stored flattened; recover the side length and fail loudly
    # when the pixel count is not a perfect square.
    n_pixels = X_true_np.shape[1]
    img_size = int(round(np.sqrt(n_pixels)))
    if img_size * img_size != n_pixels:
        raise ValueError(
            f"Expected flattened square images, got {n_pixels} pixels per image"
        )
    X_true_2d = X_true_np.reshape(-1, img_size, img_size)
    X_pred_2d = X_pred_np.reshape(-1, img_size, img_size)

    # structural_similarity compares ONE image pair per call. Passing the
    # whole (N, H, W) stack with the deprecated `multichannel` flag made it
    # treat the last axis (image columns) as channels and mix pixels from
    # different images, so score each image on its own and average.
    data_range = X_true_2d.max() - X_true_2d.min()
    per_image_ssim = [
        ssim(true_img, pred_img, data_range=data_range)
        for true_img, pred_img in zip(X_true_2d, X_pred_2d)
    ]
    ssim_value = float(np.mean(per_image_ssim))

    return {
        'MSE': mse,
        'PSNR': psnr,
        'SSIM': ssim_value
    }

In [None]:
def _encode_categories(y, device):
    """Return ``(codes, labels)``: an integer code per sample placed on
    ``device`` and the sorted list of distinct labels those codes index."""
    if torch.is_tensor(y):
        unique_values, codes = torch.unique(y, return_inverse=True)
        labels = [value.item() for value in unique_values]
        return codes.to(device), labels

    # Array-likes (including string labels, which tensors cannot hold).
    unique_values, codes = np.unique(np.asarray(y), return_inverse=True)
    labels = [
        value.item() if hasattr(value, 'item') else value
        for value in unique_values
    ]
    return torch.as_tensor(codes.reshape(-1), device=device), labels


def run_single_analysis(X, y, analysis_config):
    """Fit PCA on ``X`` and evaluate per-category reconstructions.

    Args:
        X: Feature matrix of shape ``(n_samples, n_features)``, as a tensor
            or an array-like (array-likes are converted to float32).
        y: One category label per sample, as a tensor or an array-like.
        analysis_config: Mapping with
            ``'total_components'`` -- number of components to fit, and
            ``'components_for_reconstruction'`` -- iterable of component
            counts to reconstruct with.

    Returns:
        ``(results, total_time)`` where ``results`` is a list of dicts with
        keys ``'category'``, ``'components'``, ``'avg_image'`` (numpy array)
        and ``'metrics'`` (output of ``calculate_metrics``), and
        ``total_time`` is the elapsed wall-clock time in seconds.
    """
    start_time = time.perf_counter()

    logging.info("Starting analysis")

    # Check for GPU
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logging.info(f"Using device: {device}")

    # Actually place the data on the selected device; previously the device
    # was chosen and logged but the inputs were left wherever they came from.
    if torch.is_tensor(X):
        X = X.to(device)
    else:
        X = torch.as_tensor(X, dtype=torch.float32, device=device)
    category_codes, category_labels = _encode_categories(y, device)

    n_components = analysis_config['total_components']
    model = PCA(n_components).to(device)

    logging.info("Fitting PCA model")
    model.fit(X)

    logging.info("Transforming data")
    features = model.transform(X)

    results = []
    for code, category in enumerate(category_labels):
        # Build the boolean mask once per category and reuse it.
        in_category = category_codes == code
        X_category = X[in_category]
        features_category = features[in_category]

        for recon_components in analysis_config['components_for_reconstruction']:
            if recon_components > n_components:
                # Slicing past the fitted width silently uses every
                # component, which would mislabel the result.
                logging.warning(
                    f"Requested {recon_components} components but only "
                    f"{n_components} were fitted; using all fitted components"
                )
            logging.info(f"Processing category {category} with {recon_components} components")

            # Zero every score beyond the first `recon_components` so the
            # inverse transform only uses the leading components.
            partial_features = torch.zeros_like(features_category)
            partial_features[:, :recon_components] = features_category[:, :recon_components]

            recon_images = model.inverse_transform(partial_features)
            avg_image = torch.mean(recon_images, dim=0)

            metrics = calculate_metrics(X_category, recon_images)

            results.append({
                'category': category,
                'components': recon_components,
                'avg_image': avg_image.cpu().numpy(),
                'metrics': metrics
            })

    total_time = time.perf_counter() - start_time
    logging.info(f"Analysis completed in {total_time:.2f} seconds")

    return results, total_time

In [None]:
# Settings consumed by run_single_analysis: how many principal components to
# fit, and which leading-component counts to reconstruct images with.
analysis_config = dict(
    total_components=100,
    components_for_reconstruction=[1, 10, 30, 50, 100],
)

In [None]:
# Fit PCA on the full training set and collect the per-category
# reconstruction results together with the elapsed time in seconds.
results, total_time = run_single_analysis(X, y, analysis_config)

In [None]:
# Saving results
save_dir = os.path.join('models', 'unsupervised')
# The keyword is `exist_ok`; the previous `exists_ok` spelling raised
# TypeError before anything was written.
os.makedirs(save_dir, exist_ok=True)
save_file_name = 'pca_results.pt'
save_path = os.path.join(save_dir, save_file_name)
torch.save({
    'results': results,
    'total_time': total_time,
    'config': analysis_config
}, save_path)
logging.info(f"Results saved to {save_path}")

# If you prefer numpy compressed format:
# `results` and `config` are stored as object arrays (pickled), so reading
# the archive back requires np.load(..., allow_pickle=True).
npz_file_name = 'pca_results.npz'
npz_save_path = os.path.join(save_dir, npz_file_name)
np_results = np.array(results, dtype=object)
np.savez_compressed(npz_save_path, results=np_results, total_time=total_time, config=analysis_config)
logging.info("Results also saved in numpy compressed format")