In [None]:
# t-SNE Analysis of Human Activity Recognition Dataset

This notebook implements a t-SNE analysis pipeline that follows these methodological steps:

1. **Dataset Selection**: Human Activity Recognition dataset (~7K samples)
2. **Hyperparameter Testing**: perplexity=[5,30,50,100], early_exaggeration=[4,12]
3. **PCA Preprocessing**: Dimensionality reduction to 50 components
4. **t-SNE Implementation**: 2D embedding with various parameters
5. **Quantitative Evaluation**: Continuity, Mean Local Error, Shepard Correlations


In [None]:
# Import required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.metrics import pairwise_distances
from scipy.spatial.distance import pdist, squareform
from scipy.stats import spearmanr
import warnings
warnings.filterwarnings('ignore')

# Set style for better plots
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")


In [None]:
# TSNEAnalysis Class Definition
class TSNEAnalysis:
    """Holds the dataset path and the intermediate state of a t-SNE analysis."""

    def __init__(self, data_path):
        """Remember where the dataset lives and start with empty pipeline state.

        Args:
            data_path: Location of the dataset; stored unchanged on the instance.
        """
        self.data_path = data_path
        # Raw table, feature table and target column: nothing loaded yet.
        self.data = self.X = self.y = None
        # Scaled features and their PCA projection: nothing computed yet.
        self.X_scaled = self.X_pca = None
        # Per-instance dictionary for experiment results.
        self.results = {}


In [None]:
# Data Loading and Exploration Method
def load_and_explore_data(self):
    """Read the CSV at ``self.data_path`` and print a first-look summary.

    The loaded table is stored in ``self.data``. The printed summary covers
    the shape, the first rows, ``DataFrame.info()``, the column names, the
    missing-value counts and ``DataFrame.describe()``.

    Returns:
        The loaded ``pandas.DataFrame`` (the same object as ``self.data``).
    """
    banner = "=" * 50
    print(banner)
    print("LOADING AND EXPLORING DATA")
    print(banner)

    # Load data
    self.data = pd.read_csv(self.data_path)
    print(f"Dataset shape: {self.data.shape}")
    print(f"Dataset size: {self.data.size:,} total values")

    # Basic info
    print("\nFirst few rows:")
    print(self.data.head())

    print("\nDataset Info:")
    # DataFrame.info() writes its report itself and returns None, so it must
    # not be wrapped in print() (that would emit a stray "None" line).
    self.data.info()

    print("\nColumn names:")
    print(self.data.columns.tolist())

    # Check for missing values; list the affected columns only when any exist.
    missing_values = self.data.isnull().sum()
    total_missing = missing_values.sum()
    print(f"\nMissing values: {total_missing}")
    if total_missing > 0:
        print(missing_values[missing_values > 0])

    # Statistical summary
    print("\nStatistical Summary:")
    print(self.data.describe())

    return self.data

# Add method to class
TSNEAnalysis.load_and_explore_data = load_and_explore_data


In [None]:
# Data Preparation Method
def prepare_data(self):
    """Split ``self.data`` into a feature table and an encoded target vector.

    Target selection is heuristic: the last object-dtype column if there is
    one, otherwise the last column of the table. Every other column becomes a
    feature, except remaining object-dtype columns, which are dropped (with a
    printed notice) because the later scaling step cannot handle text.

    Sets ``self.X``, ``self.y``, ``self.y_encoded`` and ``self.label_names``.
    ``self.y_encoded`` is a NumPy array in both cases, so positional indexing
    behaves the same for string and numeric targets. ``self.label_names`` is
    the array of class names for an encoded target and ``None`` for a numeric
    target.

    Returns:
        Tuple ``(self.X, self.y_encoded)``.
    """
    print("\n" + "="*50)
    print("PREPARING DATA")
    print("="*50)

    categorical_cols = self.data.select_dtypes(include=['object']).columns
    numerical_cols = self.data.select_dtypes(include=[np.number]).columns

    print(f"Categorical columns: {list(categorical_cols)}")
    print(f"Numerical columns: {list(numerical_cols)}")

    # Prefer the last text column as target; fall back to the last column.
    if len(categorical_cols) > 0:
        target_col = categorical_cols[-1]
    else:
        target_col = self.data.columns[-1]

    # Text columns other than the target cannot be standardized downstream.
    skipped_cols = [col for col in categorical_cols if col != target_col]
    if skipped_cols:
        print(f"Dropping non-numeric feature columns: {skipped_cols}")
    feature_cols = [col for col in self.data.columns
                    if col != target_col and col not in skipped_cols]

    print(f"Target column: {target_col}")
    print(f"Number of features: {len(feature_cols)}")

    # Separate features and target
    self.X = self.data[feature_cols]
    self.y = self.data[target_col]

    if pd.api.types.is_numeric_dtype(self.y):
        # Numeric targets are used as they are, but as a plain array so that
        # y_encoded[indices] is positional rather than label-based.
        self.y_encoded = self.y.to_numpy()
        self.label_names = None
    else:
        le = LabelEncoder()
        self.y_encoded = le.fit_transform(self.y)
        self.label_names = le.classes_
        print(f"Target classes: {self.label_names}")
        print(f"Class distribution:\n{self.y.value_counts()}")

    print(f"Feature matrix shape: {self.X.shape}")
    print(f"Target vector shape: {self.y_encoded.shape}")

    return self.X, self.y_encoded

# Add method to class
TSNEAnalysis.prepare_data = prepare_data


In [None]:
# Helper function for counting high correlations
def _count_high_correlations(self):
    """Count highly correlated feature pairs"""
    if self.X.shape[1] > 100:  # Sample for large datasets
        sample_X = self.X.sample(n=100, axis=1)
    else:
        sample_X = self.X
        
    corr_matrix = sample_X.corr()
    high_corr = (corr_matrix.abs() > 0.9) & (corr_matrix.abs() < 1.0)
    return high_corr.sum().sum() // 2  # Divide by 2 to avoid double counting

# Add helper method to class
TSNEAnalysis._count_high_correlations = _count_high_correlations


In [None]:
# Exploratory Data Analysis Method
def perform_eda(self):
    """Draw a 2x2 overview figure of the prepared data and print feature stats.

    Panels: target distribution (pie), correlation heatmap (first 20 features
    when there are more than 20), the 20 largest feature variances, and
    overlaid histograms of the first two features. The figure is saved to
    ``eda_plots.png`` and shown.

    Expects ``prepare_data`` to have set ``self.X``, ``self.y``,
    ``self.y_encoded`` and ``self.label_names``.
    """
    print("\n" + "="*50)
    print("EXPLORATORY DATA ANALYSIS")
    print("="*50)

    # Create figure for EDA plots
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Exploratory Data Analysis', fontsize=16)

    # 1. Target distribution
    # value_counts() orders by frequency, so the wedge labels must come from
    # its own index; pairing the counts with the alphabetically sorted
    # label_names would mislabel the wedges.
    target_values = self.y if self.label_names is not None else self.y_encoded
    counts = pd.Series(target_values).value_counts()
    axes[0, 0].pie(counts.values, labels=counts.index, autopct='%1.1f%%')
    axes[0, 0].set_title('Target Distribution')

    # 2. Feature correlation heatmap (sample of features if too many)
    if self.X.shape[1] > 20:
        sample_features = self.X.iloc[:, :20]
        axes[0, 1].set_title('Correlation Heatmap (First 20 features)')
    else:
        sample_features = self.X
        axes[0, 1].set_title('Feature Correlation Heatmap')

    corr_matrix = sample_features.corr()
    sns.heatmap(corr_matrix, ax=axes[0, 1], cmap='coolwarm', center=0,
                square=True, cbar_kws={"shrink": .8})

    # 3. Feature variance (computed once and reused for the zero-variance count)
    feature_vars = self.X.var().sort_values(ascending=False)
    top_vars = feature_vars.head(20)
    axes[1, 0].bar(range(len(top_vars)), top_vars.values)
    axes[1, 0].set_title('Top 20 Features by Variance')
    axes[1, 0].set_xlabel('Feature Index')
    axes[1, 0].set_ylabel('Variance')

    # 4. Sample feature distributions: the first two features share one panel.
    for i in range(min(2, self.X.shape[1])):
        axes[1, 1].hist(self.X.iloc[:, i], bins=30, alpha=0.7, label=f'Feature {i+1}')

    axes[1, 1].set_title('Sample Feature Distributions')
    axes[1, 1].legend()

    plt.tight_layout()
    plt.savefig('eda_plots.png', dpi=300, bbox_inches='tight')
    plt.show()

    # Print feature statistics
    print(f"\nFeature Statistics:")
    print(f"Number of features: {self.X.shape[1]}")
    print(f"Features with zero variance: {(feature_vars == 0).sum()}")
    print(f"Highly correlated feature pairs (>0.9): {self._count_high_correlations()}")

# Add method to class
TSNEAnalysis.perform_eda = perform_eda


In [None]:
# Data Normalization Method
def normalize_data(self):
    """Standardize ``self.X`` with ``StandardScaler`` and keep the result.

    The scaled matrix is stored in ``self.X_scaled``; its shape, overall mean
    and overall standard deviation are printed next to the original shape.

    Returns:
        The scaled feature matrix (the same object as ``self.X_scaled``).
    """
    rule = "=" * 50
    for heading_line in ("\n" + rule, "DATA NORMALIZATION", rule):
        print(heading_line)

    self.X_scaled = StandardScaler().fit_transform(self.X)
    scaled = self.X_scaled

    summary_lines = (
        "Data normalized using StandardScaler (mean=0, std=1)",
        f"Original data shape: {self.X.shape}",
        f"Normalized data shape: {scaled.shape}",
        f"Normalized data mean: {np.mean(scaled):.6f}",
        f"Normalized data std: {np.std(scaled):.6f}",
    )
    for summary_line in summary_lines:
        print(summary_line)

    return scaled

# Add method to class
TSNEAnalysis.normalize_data = normalize_data


In [None]:
# PCA Dimensionality Reduction Method
def apply_pca(self, n_components=50, random_state=42):
    """Project the scaled features onto their leading principal components.

    The projection is stored in ``self.X_pca``. A two-panel figure (per
    component and cumulative explained variance) is saved to
    ``pca_analysis.png`` and shown.

    Args:
        n_components: Requested number of components; capped at
            ``min(n_samples, n_features)`` of the scaled data.
        random_state: Seed passed to ``PCA`` so that runs are repeatable when
            scikit-learn selects a randomized solver.

    Returns:
        The PCA-transformed data (the same object as ``self.X_pca``).

    Raises:
        RuntimeError: If ``self.X_scaled`` has not been computed yet.
    """
    if self.X_scaled is None:
        raise RuntimeError("normalize_data() must be called before apply_pca()")

    print("\n" + "="*50)
    print("PCA DIMENSIONALITY REDUCTION")
    print("="*50)

    # Adjust n_components if necessary
    max_components = min(self.X_scaled.shape[0], self.X_scaled.shape[1])
    n_components = min(n_components, max_components)

    pca = PCA(n_components=n_components, random_state=random_state)
    self.X_pca = pca.fit_transform(self.X_scaled)

    variance_ratio = pca.explained_variance_ratio_
    cumulative_ratio = variance_ratio.cumsum()

    print(f"PCA applied: {self.X_scaled.shape[1]} -> {n_components} dimensions")
    print(f"Explained variance ratio: {variance_ratio[:10]}")
    print(f"Cumulative explained variance: {cumulative_ratio[:10]}")
    print(f"Total explained variance: {variance_ratio.sum():.4f}")

    # Plot explained variance
    fig, (ax_single, ax_cumulative) = plt.subplots(1, 2, figsize=(12, 5))

    # Left panel: at most the first 20 components, so the curve stays readable.
    leading = variance_ratio[:20]
    ax_single.plot(range(1, len(leading) + 1), leading, 'bo-')
    ax_single.set_title('Explained Variance by Component')
    ax_single.set_xlabel('Principal Component')
    ax_single.set_ylabel('Explained Variance Ratio')
    ax_single.grid(True)

    ax_cumulative.plot(range(1, len(variance_ratio) + 1), cumulative_ratio, 'ro-')
    ax_cumulative.set_title('Cumulative Explained Variance')
    ax_cumulative.set_xlabel('Number of Components')
    ax_cumulative.set_ylabel('Cumulative Explained Variance')
    ax_cumulative.grid(True)

    fig.tight_layout()
    fig.savefig('pca_analysis.png', dpi=300, bbox_inches='tight')
    plt.show()

    return self.X_pca

# Add method to class
TSNEAnalysis.apply_pca = apply_pca


In [None]:
# t-SNE Experiments Method
def run_tsne_experiments(self, perplexities=(5, 30, 50, 100),
                       early_exaggerations=(4, 12), use_pca=True,
                       max_samples=5000, random_state=42):
    """Run t-SNE for every (early_exaggeration, perplexity) combination.

    Each 2-D embedding is stored in ``self.results`` under the key
    ``'perp_<perplexity>_exag_<early_exaggeration>'`` together with its
    parameters and KL divergence. A grid of scatter plots coloured by
    ``self.y_encoded`` is saved to ``tsne_experiments.png`` and shown.

    When the input has more than ``max_samples`` rows, a seeded random subset
    is embedded instead. The row indices that were embedded are kept in
    ``self._tsne_indices`` so that the evaluation step can align the original
    data with the embeddings.

    Args:
        perplexities: Perplexity values to try (one grid column each).
        early_exaggerations: Early-exaggeration values to try (one grid row
            each).
        use_pca: Embed ``self.X_pca`` when it is available; otherwise embed
            ``self.X_scaled``.
        max_samples: Largest number of rows passed to t-SNE.
        random_state: Seed used for both the subsampling and t-SNE itself.

    Returns:
        The ``self.results`` dictionary.

    Raises:
        ValueError: If ``perplexities`` or ``early_exaggerations`` is empty.
    """
    perplexities = list(perplexities)
    early_exaggerations = list(early_exaggerations)
    if not perplexities or not early_exaggerations:
        raise ValueError("perplexities and early_exaggerations must not be empty")

    print("\n" + "="*50)
    print("T-SNE EXPERIMENTS")
    print("="*50)

    # Choose input data
    if use_pca and self.X_pca is not None:
        input_data = self.X_pca
        print(f"Using PCA-reduced data: {input_data.shape}")
    else:
        input_data = self.X_scaled
        print(f"Using normalized data: {input_data.shape}")

    # Plain array so that the label lookup below is positional even when the
    # target is held as a pandas Series.
    labels = np.asarray(self.y_encoded)

    # Limit data size if too large (t-SNE is computationally expensive)
    n_total = input_data.shape[0]
    if n_total > max_samples:
        # Seeded sampler: repeated runs embed the same rows.
        sampler = np.random.RandomState(random_state)
        indices = sampler.choice(n_total, max_samples, replace=False)
        input_data = input_data[indices]
        y_sample = labels[indices]
        self._tsne_indices = indices  # Store indices for evaluation
        print(f"Sampled data for t-SNE: {input_data.shape}")
    else:
        y_sample = labels
        self._tsne_indices = np.arange(n_total)  # Store all indices

    # squeeze=False always yields a 2-D axes array, including the 1x1 grid
    # (where plt.subplots would otherwise return a single Axes object).
    fig, axes = plt.subplots(len(early_exaggerations), len(perplexities),
                             figsize=(5*len(perplexities), 5*len(early_exaggerations)),
                             squeeze=False)

    for i, early_exag in enumerate(early_exaggerations):
        for j, perplexity in enumerate(perplexities):
            print(f"\nRunning t-SNE: perplexity={perplexity}, early_exaggeration={early_exag}")

            # NOTE(review): the `max_iter` keyword is the newer scikit-learn
            # spelling of the iteration limit; confirm it matches the
            # installed version (older releases call it `n_iter`).
            tsne = TSNE(n_components=2, perplexity=perplexity,
                        early_exaggeration=early_exag, random_state=random_state,
                        max_iter=1000, verbose=0)

            tsne_result = tsne.fit_transform(input_data)

            # Store results
            self.results[f'perp_{perplexity}_exag_{early_exag}'] = {
                'embedding': tsne_result,
                'perplexity': perplexity,
                'early_exaggeration': early_exag,
                'kl_divergence': tsne.kl_divergence_
            }

            # Plot
            ax = axes[i, j]
            ax.scatter(tsne_result[:, 0], tsne_result[:, 1],
                       c=y_sample, cmap='tab10', alpha=0.7, s=10)
            ax.set_title(f'Perplexity={perplexity}, Early Exag={early_exag}')
            ax.set_xlabel('t-SNE 1')
            ax.set_ylabel('t-SNE 2')

            print(f"KL divergence: {tsne.kl_divergence_:.4f}")

    plt.tight_layout()
    plt.savefig('tsne_experiments.png', dpi=300, bbox_inches='tight')
    plt.show()

    return self.results

# Add method to class
TSNEAnalysis.run_tsne_experiments = run_tsne_experiments


In [None]:
# Helper Methods for Evaluation Metrics
def _calculate_continuity(self, original_distances, embedding_distances, k=7):
    """Calculate continuity metric"""
    n = original_distances.shape[0]
    continuity_scores = []
    
    for i in range(n):
        # Find k nearest neighbors in original space
        orig_neighbors = np.argsort(original_distances[i])[1:k+1]
        
        # Find k nearest neighbors in embedding space
        embed_neighbors = np.argsort(embedding_distances[i])[1:k+1]
        
        # Calculate overlap
        overlap = len(set(orig_neighbors) & set(embed_neighbors))
        continuity_scores.append(overlap / k)
    
    return np.mean(continuity_scores)

def _calculate_local_error(self, original_distances, embedding_distances, k=7):
    """Calculate mean local error"""
    n = original_distances.shape[0]
    local_errors = []
    
    for i in range(n):
        # Find k nearest neighbors in original space
        orig_neighbors = np.argsort(original_distances[i])[1:k+1]
        
        # Calculate rank errors in embedding space
        embed_ranks = np.argsort(np.argsort(embedding_distances[i]))
        
        rank_errors = []
        for neighbor in orig_neighbors:
            original_rank = np.where(np.argsort(original_distances[i]) == neighbor)[0][0]
            embedding_rank = embed_ranks[neighbor]
            rank_errors.append(abs(original_rank - embedding_rank))
        
        local_errors.append(np.mean(rank_errors))
    
    return np.mean(local_errors)

# Add helper methods to class
TSNEAnalysis._calculate_continuity = _calculate_continuity
TSNEAnalysis._calculate_local_error = _calculate_local_error


In [None]:
# Evaluation Summary Plotting Method
def _plot_evaluation_summary(self, evaluation_results):
    """Draw one bar chart per evaluation metric and save the figure.

    Args:
        evaluation_results: Mapping from configuration key to a dict holding
            'shepard_correlation', 'continuity', 'local_error' and
            'kl_divergence'.

    The 2x2 figure is written to ``evaluation_summary.png`` and shown.
    """
    metric_names = ('shepard_correlation', 'continuity', 'local_error', 'kl_divergence')
    config_labels = list(evaluation_results.keys())
    positions = range(len(config_labels))

    fig, grid = plt.subplots(2, 2, figsize=(15, 12))

    for ax, metric in zip(grid.flatten(), metric_names):
        heights = [entry[metric] for entry in evaluation_results.values()]

        ax.bar(positions, heights)
        ax.set_title(f'{metric.replace("_", " ").title()}')
        ax.set_xticks(positions)
        ax.set_xticklabels(config_labels, rotation=45, ha='right')

        # Write each value just above its bar.
        for x_pos, height in zip(positions, heights):
            ax.text(x_pos, height, f'{height:.3f}', ha='center', va='bottom')

    plt.tight_layout()
    plt.savefig('evaluation_summary.png', dpi=300, bbox_inches='tight')
    plt.show()

# Add method to class
TSNEAnalysis._plot_evaluation_summary = _plot_evaluation_summary


In [None]:
# Embedding Evaluation Method
def evaluate_embeddings(self, use_pca=True, n_eval_samples=1000, k=7):
    """Score every stored embedding against the original data.

    For each entry of ``self.results`` this computes, on the first
    ``n_eval_samples`` embedded points:

    * Shepard correlation: Spearman correlation between the pairwise
      distances in the original space and in the embedding,
    * continuity and local error: via ``_calculate_continuity`` and
      ``_calculate_local_error`` with neighbourhood size ``k``,

    and copies the stored KL divergence. The scores are printed, plotted via
    ``_plot_evaluation_summary``, stored in ``self.evaluation_results`` and
    returned.

    Args:
        use_pca: Compare against ``self.X_pca`` when it is available;
            otherwise compare against ``self.X_scaled``. This should match
            the setting used when the embeddings were computed.
        n_eval_samples: Upper bound on the number of points evaluated (the
            pairwise distance matrices grow quadratically with it).
        k: Neighbourhood size for continuity and local error.

    Returns:
        Dict mapping each configuration key to its metric dict.

    Raises:
        ValueError: If an embedding has fewer rows than the evaluation sample.
    """
    print("\n" + "="*50)
    print("EMBEDDING EVALUATION")
    print("="*50)

    # Choose original data
    if use_pca and self.X_pca is not None:
        original_data = self.X_pca
    else:
        original_data = self.X_scaled

    n_eval = min(n_eval_samples, original_data.shape[0])

    # Embedding row r corresponds to data row _tsne_indices[r]. Without
    # recorded indices, the only assumption that keeps both sides aligned is
    # that the embedding covers the data rows in their original order.
    tsne_indices = getattr(self, '_tsne_indices', None)
    if tsne_indices is not None:
        eval_indices = np.asarray(tsne_indices)[:n_eval]
    else:
        eval_indices = np.arange(n_eval)
    n_eval = len(eval_indices)
    original_sample = original_data[eval_indices]

    print(f"Evaluating with {original_sample.shape[0]} samples")

    # The original-space distances are the same for every embedding.
    original_distances = pairwise_distances(original_sample)
    upper = np.triu_indices(n_eval, k=1)
    orig_dist_flat = original_distances[upper]

    evaluation_results = {}

    for key, result in self.results.items():
        embedding = result['embedding']
        if embedding.shape[0] < n_eval:
            raise ValueError(
                f"Embedding '{key}' has {embedding.shape[0]} rows but "
                f"{n_eval} are needed for evaluation"
            )

        # Same leading rows as the original sample
        embedding_distances = pairwise_distances(embedding[:n_eval])
        embed_dist_flat = embedding_distances[upper]

        # 1. Spearman correlation (Shepard correlation)
        shepard_corr, _ = spearmanr(orig_dist_flat, embed_dist_flat)

        # 2. Continuity (preservation of neighborhoods)
        continuity = self._calculate_continuity(original_distances, embedding_distances, k=k)

        # 3. Mean Local Error
        local_error = self._calculate_local_error(original_distances, embedding_distances, k=k)

        evaluation_results[key] = {
            'shepard_correlation': shepard_corr,
            'continuity': continuity,
            'local_error': local_error,
            'kl_divergence': result['kl_divergence']
        }

        print(f"\n{key}:")
        print(f"  Shepard Correlation: {shepard_corr:.4f}")
        print(f"  Continuity: {continuity:.4f}")
        print(f"  Local Error: {local_error:.4f}")
        print(f"  KL Divergence: {result['kl_divergence']:.4f}")

    # Create evaluation summary plot
    self._plot_evaluation_summary(evaluation_results)

    # Keep the scores on the instance so the report step can find them even
    # when the caller does not assign the return value.
    self.evaluation_results = evaluation_results
    return evaluation_results

# Add method to class
TSNEAnalysis.evaluate_embeddings = evaluate_embeddings


In [None]:
# Best Configuration Finding Method
def _find_best_configuration(self):
    """Find best t-SNE configuration based on multiple metrics"""
    scores = {}
    
    for key, metrics in self.evaluation_results.items():
        # Normalize metrics (higher is better for shepard_correlation and continuity)
        # Lower is better for local_error and kl_divergence
        score = (metrics['shepard_correlation'] + metrics['continuity'] - 
                metrics['local_error']/10 - metrics['kl_divergence']/100)
        scores[key] = score
    
    return max(scores.items(), key=lambda x: x[1])

# Add method to class
TSNEAnalysis._find_best_configuration = _find_best_configuration


In [None]:
# Report Generation Method
def generate_report(self):
    """Print a plain-text summary of the analysis state.

    Reports the dataset path, the feature-table shape, the number of distinct
    encoded classes, the PCA shape (when a projection exists), the number of
    stored t-SNE results and, when ``evaluation_results`` is present on the
    instance, the best configuration. Ends with the list of image files the
    pipeline writes.
    """
    rule = "=" * 50
    for heading_line in ("\n" + rule, "ANALYSIS REPORT", rule):
        print(heading_line)

    print(f"Dataset: {self.data_path}")
    print(f"Original dimensions: {self.X.shape}")
    class_count = len(np.unique(self.y_encoded))
    print(f"Number of classes: {class_count}")

    # The PCA line appears only when a projection has actually been computed.
    pca_projection = getattr(self, 'X_pca', None)
    if pca_projection is not None:
        print(f"PCA dimensions: {pca_projection.shape}")

    experiment_count = len(self.results)
    print(f"\nt-SNE Experiments conducted: {experiment_count}")

    # The best configuration can only be named after an evaluation was stored.
    if hasattr(self, 'evaluation_results'):
        best_config = self._find_best_configuration()
        print(f"\nBest configuration based on combined metrics: {best_config}")

    generated_files = (
        "- eda_plots.png: Exploratory data analysis",
        "- pca_analysis.png: PCA variance analysis",
        "- tsne_experiments.png: t-SNE results with different parameters",
        "- evaluation_summary.png: Evaluation metrics comparison",
    )
    print("\nFiles generated:")
    for file_line in generated_files:
        print(file_line)

# Add method to class
TSNEAnalysis.generate_report = generate_report


In [None]:
# Complete Analysis Pipeline Method
def run_complete_analysis(self):
    """Execute every pipeline stage in order and return the instance.

    Stages: load, prepare, EDA, normalization, PCA (50 components), the t-SNE
    grid (perplexity 5/30/50/100 x early exaggeration 4/12 on the PCA data),
    evaluation (stored in ``self.evaluation_results``) and the final report.

    Returns:
        ``self``, so the populated analysis object can be inspected afterwards.
    """
    print("Starting Complete t-SNE Analysis Pipeline")
    print("=" * 60)

    # Steps 1-4: load, prepare, explore, normalize (none takes arguments).
    for stage_name in ("load_and_explore_data", "prepare_data",
                       "perform_eda", "normalize_data"):
        getattr(self, stage_name)()

    # Step 5: PCA (optional)
    self.apply_pca(n_components=50)

    # Step 6: t-SNE experiments
    tsne_settings = {
        "perplexities": [5, 30, 50, 100],
        "early_exaggerations": [4, 12],
        "use_pca": True,
    }
    self.run_tsne_experiments(**tsne_settings)

    # Step 7: Evaluation (kept on the instance for the report)
    self.evaluation_results = self.evaluate_embeddings(use_pca=True)

    # Step 8: Generate report
    self.generate_report()

    print("\nAnalysis completed successfully!")
    return self

# Add method to class
TSNEAnalysis.run_complete_analysis = run_complete_analysis


In [None]:
## Initialize Analysis

Create the analysis object and run the complete pipeline. Run the next cell to execute the full analysis in one go, or skip it and use the individual step cells further below for a step-by-step analysis.


In [None]:
# Initialize Analysis and Run Complete Pipeline
# "train.csv" is a relative path, so the file must be reachable from the
# notebook's working directory.
analysis = TSNEAnalysis("train.csv")

# Run complete analysis: executes every pipeline step in order. The call
# returns the analysis object itself; as the last expression of the cell it
# is also displayed as the cell's output.
analysis.run_complete_analysis()


In [None]:
## Alternative: Step-by-Step Analysis

If you prefer to run the analysis step by step, you can use the cells below instead of the complete pipeline above. This allows for more interactive exploration and customization of parameters.


In [None]:
# Step 1: Initialize and Load Data
# analysis = TSNEAnalysis("train.csv")
# analysis.load_and_explore_data()


In [None]:
# Step 2: Prepare Data
# analysis.prepare_data()


In [None]:
# Step 3: Exploratory Data Analysis
# analysis.perform_eda()


In [None]:
# Step 4: Data Normalization
# analysis.normalize_data()


In [None]:
# Step 5: PCA Dimensionality Reduction
# analysis.apply_pca(n_components=50)


In [None]:
# Step 6: t-SNE Experiments
# analysis.run_tsne_experiments(
#     perplexities=[5, 30, 50, 100],
#     early_exaggerations=[4, 12],
#     use_pca=True
# )


In [None]:
# Step 7: Evaluate Embeddings
# analysis.evaluation_results = analysis.evaluate_embeddings(use_pca=True)


In [None]:
# Step 8: Generate Report
# analysis.generate_report()


In [None]:
## Notes

### Usage Options:

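1. **Complete Pipeline**: Run the code cell in the "Initialize Analysis" section (cell 17) to execute the entire analysis at once
2. **Step-by-Step**: Uncomment and run the eight step cells in the "Alternative: Step-by-Step Analysis" section (cells 19-26) individually, in order, for interactive analysis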

### Customization:

- Modify `perplexities` and `early_exaggerations` parameters in the t-SNE experiments
- Adjust `n_components` for PCA to test different dimensionalities
- Change evaluation sample size in `evaluate_embeddings()` method

### Generated Files:

- `eda_plots.png`: Exploratory data analysis visualizations
- `pca_analysis.png`: PCA variance explanation plots
- `tsne_experiments.png`: t-SNE results with all parameter combinations
- `evaluation_summary.png`: Quantitative evaluation metrics comparison

### Evaluation Metrics:

- **Shepard Correlation**: Measures preservation of pairwise distances (higher is better)
- **Continuity**: Fraction of k-nearest neighbors preserved (higher is better)
- **Local Error**: Average ranking error in neighborhoods (lower is better)
- **KL Divergence**: t-SNE optimization objective (lower is better)
