In [None]:
"""
CSIRO Image2Biomass - Tools for EDA, data quality checks, and model validation
"""

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from typing import Dict, List, Tuple
import warnings
warnings.filterwarnings('ignore')

In [None]:
class DataValidator:
    """Validate training-data quality and surface anomalies.

    The training CSV is read once and pivoted from long format (one row per
    target value) to wide format (one column per target name). Every check
    works on a copy of that wide table, prints a short summary and returns
    its findings.
    """

    # Target columns the checks expect to find in the wide table.
    _TARGETS = ['Dry_Green_g', 'Dry_Dead_g', 'Dry_Clover_g', 'GDM_g', 'Dry_Total_g']

    def __init__(self, train_csv_path: str):
        """Load the training CSV and build the wide table.

        Args:
            train_csv_path: Path to the long-format CSV. It must contain the
                metadata columns used as the pivot index plus the
                ``target_name`` and ``target`` columns.
        """
        # Raw long-format rows exactly as read from disk.
        self.df = pd.read_csv(train_csv_path)
        # Wide table: metadata columns plus one column per target name.
        self.df_pivot = self._pivot_to_wide()

    def _pivot_to_wide(self) -> pd.DataFrame:
        """Convert the long-format table to wide format.

        Returns:
            DataFrame with one row per unique combination of the metadata
            columns and one column per distinct ``target_name``.
        """
        # NOTE: pivot_table aggregates duplicate (index, target_name) pairs
        # with its default aggfunc (mean) instead of raising.
        return self.df.pivot_table(
            index=['image_path', 'Sampling_Date', 'State', 'Species',
                   'Pre_GSHH_NDVI', 'Height_Ave_cm'],
            columns='target_name',
            values='target'
        ).reset_index()

    def check_hierarchical_consistency(self, threshold: float = 1.0) -> pd.DataFrame:
        """Check that GDM = Green + Clover and Total = GDM + Dead.

        Args:
            threshold: Absolute tolerance; a row is flagged when either
                discrepancy exceeds it. Defaults to 1.0.

        Returns:
            The rows whose GDM or Total discrepancy exceeds ``threshold``,
            including the added ``*_expected`` and ``*_error`` columns.
        """
        df = self.df_pivot.copy()

        # Values implied by the component targets
        df['GDM_expected'] = df['Dry_Green_g'] + df['Dry_Clover_g']
        df['Total_expected'] = df['GDM_g'] + df['Dry_Dead_g']

        # Absolute gap between reported and implied values
        df['GDM_error'] = np.abs(df['GDM_g'] - df['GDM_expected'])
        df['Total_error'] = np.abs(df['Dry_Total_g'] - df['Total_expected'])

        is_inconsistent = (df['GDM_error'] > threshold) | (df['Total_error'] > threshold)
        inconsistent = df[is_inconsistent]

        n_total = len(df)
        # Guard the percentage so an empty table does not raise ZeroDivisionError.
        pct = len(inconsistent) / n_total * 100 if n_total else 0.0

        print(f"Total samples: {n_total}")
        print(f"Inconsistent samples: {len(inconsistent)} ({pct:.2f}%)")
        print(f"Mean GDM error: {df['GDM_error'].mean():.3f}g")
        print(f"Mean Total error: {df['Total_error'].mean():.3f}g")

        return inconsistent

    def analyze_state_distribution(self) -> pd.DataFrame:
        """Summarise biomass distributions per state.

        Prints the per-state table and, when rows with ``State == 'WA'``
        exist, a separate summary of their dead-biomass values.

        Returns:
            DataFrame indexed by state with mean/std/min/max per target, plus
            a zero count for ``Dry_Dead_g`` and ``Dry_Clover_g``.
        """
        df = self.df_pivot.copy()

        state_stats = df.groupby('State').agg({
            'Dry_Green_g': ['mean', 'std', 'min', 'max'],
            'Dry_Dead_g': ['mean', 'std', 'min', 'max', lambda x: (x == 0).sum()],
            'Dry_Clover_g': ['mean', 'std', 'min', 'max', lambda x: (x == 0).sum()],
            'Dry_Total_g': ['mean', 'std', 'min', 'max']
        })

        # The header used to be printed with no table after it.
        print('\nState-wise statistics:')
        print(state_stats.to_string())

        # WA dead-biomass summary (mean and share of exact zeros)
        wa_dead = df.loc[df['State'] == 'WA', 'Dry_Dead_g']
        if len(wa_dead) > 0:
            n_zero = (wa_dead == 0).sum()
            print("\nWA Dead Biomass Analysis:")
            print(f"  Mean: {wa_dead.mean():.3f}g")
            print(f"  Zeros: {n_zero}/{len(wa_dead)} ({n_zero/len(wa_dead)*100:.1f}%)")

        return state_stats

    # Plural alias kept for callers that use this spelling of the method name.
    analyze_state_distributions = analyze_state_distribution

    def analyze_zero_inflation(self) -> Dict[str, float]:
        """Report the share of exact zeros in each target.

        Returns:
            Mapping of target name to the percentage of rows equal to zero
            (0.0 for every target when the table is empty).
        """
        df = self.df_pivot.copy()
        n_rows = len(df)
        zero_ratios = {}

        print("\nZero-Inflation Analysis:")
        for target in self._TARGETS:
            n_zeros = (df[target] == 0).sum()
            ratio = n_zeros / n_rows * 100 if n_rows else 0.0
            zero_ratios[target] = ratio
            print(f"  {target}: {n_zeros}/{n_rows} ({ratio:.2f}% zeros)")

            # Spread of the strictly positive values only
            non_zero = df.loc[df[target] > 0, target]
            if len(non_zero) > 0:
                print(f"Non-zero: mean={non_zero.mean():.2f}, std={non_zero.std():.2f}, cv={non_zero.std()/non_zero.mean():.2f}")

        return zero_ratios

    def check_correlation_with_metadata(self, save_path: str = 'correlation_matrix.png') -> pd.DataFrame:
        """Correlate the targets with the NDVI and height metadata.

        Saves a heatmap of the correlation matrix and prints the NDVI and
        height columns sorted in descending order.

        Args:
            save_path: Where the heatmap image is written.

        Returns:
            The correlation matrix over the two metadata columns and the
            five targets.
        """
        df = self.df_pivot.copy()

        numeric_cols = ['Pre_GSHH_NDVI', 'Height_Ave_cm'] + list(self._TARGETS)

        # Keep only rows where every one of these columns is present
        df_clean = df[numeric_cols].dropna()
        corr_matrix = df_clean.corr()

        # Explicit figure/axes so the plot does not depend on pyplot's
        # implicit "current figure".
        fig, ax = plt.subplots(figsize=(10, 8))
        sns.heatmap(corr_matrix, annot=True, fmt='.2f', cmap='coolwarm',
                    center=0, square=True, linewidths=1, ax=ax)
        ax.set_title('Correlation Matrix: Metadata vs Targets')
        fig.tight_layout()
        fig.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.close(fig)

        print("\nCorrelation Analysis:")
        print("NDVI correlations:")
        print(corr_matrix['Pre_GSHH_NDVI'].sort_values(ascending=False))
        print("\nHeight correlations:")
        print(corr_matrix['Height_Ave_cm'].sort_values(ascending=False))

        return corr_matrix

    def detect_outliers(self, n_std: float = 3.0) -> pd.DataFrame:
        """Detect outliers in each target using the z-score method.

        Args:
            n_std: A row is an outlier for a target when its absolute
                z-score exceeds this value.

        Returns:
            DataFrame with one row per target and the columns ``target``,
            ``n_outliers`` and ``outlier_indices`` (index labels of the wide
            table).
        """
        df = self.df_pivot.copy()
        outliers = []

        for target in self._TARGETS:
            mean = df[target].mean()
            std = df[target].std()
            # A zero or undefined std yields NaN scores, which never compare
            # greater than n_std, so such a target reports no outliers.
            z_scores = np.abs((df[target] - mean) / std)

            target_outliers = df[z_scores > n_std]
            outliers.append({
                'target': target,
                'n_outliers': len(target_outliers),
                'outlier_indices': target_outliers.index.tolist()
            })

            print(f"{target}: {len(target_outliers)} outliers (>{n_std}σ)")

        return pd.DataFrame(outliers)

In [None]:
class PredictionAnalyzer:
    """Analyze model predictions and calculate metrics.

    Column ``i`` of ``predictions`` and ``targets`` and entry ``i`` of the
    weights all refer to ``target_names[i]``.
    """

    def __init__(self,
                 predictions: np.ndarray,
                 targets: np.ndarray,
                 competition_weights: np.ndarray):
        """
        Args:
            predictions: (N, 5) array of predictions
            targets: (N, 5) array of ground truth
            competition_weights: (5,) array of target weights

        Raises:
            ValueError: If the arrays are not 2-D with one column per target
                name, if their shapes differ, or if the number of weights
                does not match the number of targets.
        """
        self.target_names = ['Dry_Green_g', 'Dry_Dead_g', 'Dry_Clover_g', 'GDM_g', 'Dry_Total_g']

        # Accept any array-like (lists, DataFrames) by normalising up front.
        self.predictions = np.asarray(predictions, dtype=float)
        self.targets = np.asarray(targets, dtype=float)
        self.weights = np.asarray(competition_weights, dtype=float).ravel()

        n_targets = len(self.target_names)
        if self.predictions.ndim != 2 or self.predictions.shape[1] != n_targets:
            raise ValueError(
                f"predictions must have shape (N, {n_targets}), got {self.predictions.shape}"
            )
        if self.targets.shape != self.predictions.shape:
            raise ValueError(
                f"targets shape {self.targets.shape} does not match "
                f"predictions shape {self.predictions.shape}"
            )
        if self.weights.size != n_targets:
            raise ValueError(
                f"competition_weights must have {n_targets} entries, got {self.weights.size}"
            )

    @staticmethod
    def _r2(y_true: np.ndarray, y_pred: np.ndarray, sample_weight: np.ndarray = None) -> float:
        """Return the (optionally weighted) R² of ``y_pred`` against ``y_true``.

        A small epsilon is added to the total sum of squares so a constant
        ``y_true`` does not cause a division by zero.
        """
        w = np.ones_like(y_true) if sample_weight is None else sample_weight
        mean = np.sum(w * y_true) / np.sum(w)
        ss_res = np.sum(w * (y_true - y_pred) ** 2)
        ss_tot = np.sum(w * (y_true - mean) ** 2)
        return 1 - (ss_res / (ss_tot + 1e-8))

    def calculate_r2_score(self) -> Dict[str, float]:
        """Calculate the R2 score for each target and the weighted global R2.

        Returns:
            Dict with one ``r2_<target>`` entry per target plus
            ``r2_weighted_global``, computed over every (sample, target)
            element with each element weighted by its target's weight.
        """
        results = {}

        # Per-target R2 scores
        for i, name in enumerate(self.target_names):
            results[f'r2_{name}'] = self._r2(self.targets[:, i], self.predictions[:, i])

        # Weighted global R2. Broadcasting the (5,) weights across the (N, 5)
        # arrays keeps each weight on its own target column. The earlier
        # np.repeat(weights, N) laid weights out target-major while the
        # flattened data was sample-major, so the two were misaligned.
        element_weights = np.broadcast_to(self.weights, self.targets.shape)
        results['r2_weighted_global'] = self._r2(
            self.targets, self.predictions, element_weights
        )

        return results

    def calculate_mae_rmse(self) -> Dict[str, float]:
        """Calculate MAE and RMSE for each target.

        Returns:
            Dict with ``mae_<target>`` and ``rmse_<target>`` entries.
        """
        results = {}

        for i, name in enumerate(self.target_names):
            err = self.targets[:, i] - self.predictions[:, i]
            results[f'mae_{name}'] = np.mean(np.abs(err))
            results[f'rmse_{name}'] = np.sqrt(np.mean(err ** 2))

        return results

    def plot_predictions_vs_actual(self, save_path: str = 'pred_vs_actual.png'):
        """Create scatter plots of predictions vs actual values.

        One panel per target on a 2x3 grid, each with the y = x reference
        line and the target's R² in the title.

        Args:
            save_path: Where the figure is written.
        """
        fig, axes = plt.subplots(2, 3, figsize=(15, 10))
        axes = axes.flatten()

        for i, name in enumerate(self.target_names):
            ax = axes[i]
            y_true = self.targets[:, i]
            y_pred = self.predictions[:, i]

            ax.scatter(y_true, y_pred, alpha=0.5, s=20)

            # Ideal line (perfect prediction), drawn from the origin
            max_val = max(y_true.max(), y_pred.max())
            ax.plot([0, max_val], [0, max_val], 'r--', lw=2, label='Ideal')

            r2 = self._r2(y_true, y_pred)

            ax.set_xlabel('Actual (g)', fontsize=10)
            ax.set_ylabel('Predicted (g)', fontsize=10)
            ax.set_title(f'{name}\nR² = {r2:.4f}', fontsize=11)
            ax.legend()
            ax.grid(True, alpha=0.3)

        # Remove the grid cells that have no target to show
        for spare_ax in axes[len(self.target_names):]:
            fig.delaxes(spare_ax)

        fig.tight_layout()
        fig.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.close(fig)
        print(f"Plot saved to {save_path}")

    def analyze_residuals(self) -> pd.DataFrame:
        """Analyze prediction residuals (``targets - predictions``).

        Returns:
            DataFrame with one row per target and the columns ``target``,
            ``mean_residual``, ``std_residual``, ``max_abs_residual`` and
            ``median_abs_residual``.
        """
        residuals = self.targets - self.predictions

        results = []
        for i, name in enumerate(self.target_names):
            res = residuals[:, i]
            abs_res = np.abs(res)
            results.append({
                'target': name,
                'mean_residual': res.mean(),
                'std_residual': res.std(),
                'max_abs_residual': abs_res.max(),
                'median_abs_residual': np.median(abs_res)
            })

        return pd.DataFrame(results)
    
def comprehensive_validation_report(
    train_csv_path: str,
    predictions_path: str = None,
    targets_path: str = None
):
    """Generate comprehensive validation report.

    Section 1 always runs every ``DataValidator`` check on the training CSV.
    Section 2 scores saved predictions with ``PredictionAnalyzer`` and only
    runs when both ``predictions_path`` and ``targets_path`` are given.

    Args:
        train_csv_path: Path to the long-format training CSV.
        predictions_path: Optional path to a file readable by ``np.load``
            holding the predictions array.
        targets_path: Optional path to a file readable by ``np.load``
            holding the ground-truth array.
    """
    banner = "=" * 60

    print(banner)
    print("CSIRO Biomass Prediction - Validation Report")
    print(banner)

    # Data validation
    print("\n" + banner)
    print("1. DATA QUALITY VALIDATION")
    print(banner)

    validator = DataValidator(train_csv_path)

    # Each check prints its own summary; the return values are not needed here.
    print("\n--- Hierarchical Consistency ---")
    validator.check_hierarchical_consistency()

    print("\n--- State Distributions ---")
    # The method is named analyze_state_distribution (singular); the plural
    # spelling used here before raised AttributeError.
    validator.analyze_state_distribution()

    print("\n--- Zero Inflation ---")
    validator.analyze_zero_inflation()

    print("\n--- Correlation Analysis ---")
    validator.check_correlation_with_metadata()

    print("\n--- Outlier Detection ---")
    validator.detect_outliers(n_std=3.0)

    # Model predictions analysis (if provided)
    if predictions_path and targets_path:
        print("\n" + banner)
        print("2. PREDICTION ANALYSIS")
        print(banner)

        predictions = np.load(predictions_path)
        targets = np.load(targets_path)
        # Per-target weights, in the order of PredictionAnalyzer.target_names:
        # Dry_Green_g, Dry_Dead_g, Dry_Clover_g, GDM_g, Dry_Total_g.
        weights = np.array([0.1, 0.1, 0.1, 0.2, 0.5])

        analyzer = PredictionAnalyzer(predictions, targets, weights)

        # Calculate metrics
        print("\n--- R² Scores ---")
        for metric, value in analyzer.calculate_r2_score().items():
            print(f"{metric}: {value:.4f}")

        print("\n--- MAE/RMSE ---")
        for metric, value in analyzer.calculate_mae_rmse().items():
            print(f"{metric}: {value:.3f}")

        # Residual analysis
        print("\n--- Residual Analysis ---")
        print(analyzer.analyze_residuals().to_string(index=False))

        # Generate plots
        print("\n--- Generating Plots ---")
        analyzer.plot_predictions_vs_actual()
    elif predictions_path or targets_path:
        # Say why section 2 is missing instead of skipping it silently.
        print("\nSkipping prediction analysis: both predictions_path and "
              "targets_path are required.")

    print("\n" + banner)
    print("Validation report complete!")
    print(banner)


if __name__ == "__main__":
    # Demo run: with only the training CSV supplied, the report covers the
    # data-quality section and skips the prediction analysis.
    # To score a model as well, also pass
    #   predictions_path="val_predictions.npy", targets_path="val_targets.npy"
    # once those files are available.
    comprehensive_validation_report("train.csv")

In [4]:
from pathlib import Path
from PIL import Image

# Inspect one training image with Pillow: print its pixel dimensions and mode.
# NOTE(review): absolute, machine-specific path — this cell only runs where
# this exact file exists; consider deriving it from a configurable data dir.
image_input = Path("D:/Learn/Kaggle Competition/CSIRO---Image2Biomass-Prediction/data/train/ID2131261930.jpg")

# Open the image; the `with` block closes the file when the cell finishes
with Image.open(image_input) as img:
    # img.size returns (width, height)
    width, height = img.size
    print(f"Dimensions (PIL): {width}x{height}")
    print(f"Mode: {img.mode}") # e.g., 'RGB', 'L' (grayscale)

Dimensions (PIL): 2000x1000
Mode: RGB


In [None]:
import cv2
from pathlib import Path

# Inspect one training image with OpenCV: print its array shape.
# NOTE(review): absolute, machine-specific path — this cell only runs where
# this exact file exists; consider deriving it from a configurable data dir.
# This cell rebinds `image_input` and `img`.
image_input = Path("D:/Learn/Kaggle Competition/CSIRO---Image2Biomass-Prediction/data/train/ID4464212.jpg")

# Read the image (cv2 reads as numpy array)
# Note: str() is needed because cv2.imread doesn't always accept Path objects directly in older versions
img = cv2.imread(str(image_input))

# A None result is treated as a failed load and reported instead of unpacked
if img is not None:
    # img.shape returns (height, width, channels)
    h, w, c = img.shape
    print(f"Shape (OpenCV): Height={h}, Width={w}, Channels={c}")
else:
    print("Could not load image. Check the path.")