# CSV File Analyzer

**Overview**

This script analyzes large CSV files and generates comprehensive metadata and insights that can be shared with AI platforms (like Claude) for further analysis.

**Features**

* Dataset Overview: Rows, columns, memory usage, duplicates
* Column Information: Data types, missing values, unique counts, statistical summaries
* Data Quality Metrics: Missing data analysis, completeness scores
* Statistical Summaries: Descriptive statistics for numeric columns
* Insights Detection: Identifies ID columns, constant columns, high cardinality, missing data patterns
* Multiple Output Formats: JSON (machine-readable), TXT (human-readable), Column list
    

In [4]:
"""
CSV File Analyzer - Generate comprehensive metadata and insights from large CSV files
"""

import pandas as pd
import numpy as np
from pathlib import Path
import json
from datetime import datetime
from collections import Counter

def _pct(part, whole):
    """Return part/whole as a percentage rounded to 2 decimals (0.0 if whole is 0)."""
    if not whole:
        return 0.0
    return round(float(part) / whole * 100, 2)


def _json_safe(value):
    """Recursively convert a value into something strict JSON can represent.

    NumPy scalars become plain Python scalars; NaN, +/-inf, pd.NA and pd.NaT
    become None (JSON null). Dicts and lists/tuples are converted element-wise.
    """
    if isinstance(value, dict):
        return {str(key): _json_safe(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_json_safe(item) for item in value]
    if value is pd.NA or value is pd.NaT:
        return None
    if isinstance(value, np.generic):
        value = value.item()
    if isinstance(value, float) and not np.isfinite(value):
        return None
    return value


def analyze_csv(file_path, output_dir=None, sample_size=10):
    """
    Analyze a CSV file and generate comprehensive metadata and insights.

    Prints a multi-section report to stdout and writes three files into
    output_dir: "<stem>_metadata.json", "<stem>_report.txt" and
    "<stem>_columns.txt".

    Parameters:
    - file_path: Path to the CSV file
    - output_dir: Directory to save output files (defaults to same dir as CSV)
    - sample_size: Number of sample rows to include in the report

    Returns:
    - The metadata dictionary that was written to the JSON file (NaN/inf
      values are replaced by None so the file is strict JSON), or None when
      the file does not exist or cannot be read.
    """

    file_path = Path(file_path)

    if not file_path.exists():
        print(f"‚ùå Error: File not found at {file_path}")
        return None

    if output_dir is None:
        output_dir = file_path.parent
    else:
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

    size_mb = file_path.stat().st_size / (1024*1024)

    print("=" * 80)
    print(f"CSV ANALYZER - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("=" * 80)
    print(f"üìÅ File: {file_path.name}")
    print(f"üìä Size: {size_mb:.2f} MB")
    print()

    # Initialize metadata dictionary
    metadata = {
        "file_info": {
            "filename": file_path.name,
            "file_path": str(file_path),
            "file_size_mb": round(size_mb, 2),
            "analysis_date": datetime.now().isoformat()
        }
    }

    # Read CSV
    print("üìñ Reading CSV file...")
    try:
        df = pd.read_csv(file_path, low_memory=False)
    except Exception as e:
        # Best-effort tool: report any read/parse failure and stop.
        print(f"‚ùå Error reading CSV: {e}")
        return None
    print(f"‚úÖ Successfully loaded {len(df):,} rows √ó {len(df.columns)} columns\n")

    n_rows = len(df)
    n_cols = len(df.columns)

    # Scan every column once; these counts are reused by several sections
    # below instead of being recomputed per use.
    null_counts = {col: int(df[col].isna().sum()) for col in df.columns}
    unique_counts = {col: int(df[col].nunique()) for col in df.columns}

    # Basic dataset information
    print("=" * 80)
    print("1. DATASET OVERVIEW")
    print("=" * 80)

    metadata["dataset_overview"] = {
        "total_rows": int(n_rows),
        "total_columns": int(n_cols),
        "memory_usage_mb": round(df.memory_usage(deep=True).sum() / (1024*1024), 2),
        "total_cells": int(n_rows * n_cols),
        "duplicate_rows": int(df.duplicated().sum())
    }

    for key, value in metadata["dataset_overview"].items():
        print(f"  ‚Ä¢ {key.replace('_', ' ').title()}: {value:,}")

    # Column information
    print("\n" + "=" * 80)
    print("2. COLUMN INFORMATION")
    print("=" * 80)

    columns_info = []

    for col in df.columns:
        series = df[col]
        n_null = null_counts[col]
        n_unique = unique_counts[col]

        col_info = {
            "name": col,
            "dtype": str(series.dtype),
            "non_null_count": n_rows - n_null,
            "null_count": n_null,
            # _pct guards the zero-row case (header-only CSV).
            "null_percentage": _pct(n_null, n_rows),
            "unique_values": n_unique,
            "unique_percentage": _pct(n_unique, n_rows)
        }

        # Add data type specific information
        if pd.api.types.is_numeric_dtype(series):
            # Statistics are undefined when every value is missing.
            has_data = n_null < n_rows
            for stat in ("min", "max", "mean", "median", "std"):
                col_info[stat] = float(getattr(series, stat)()) if has_data else None
        else:
            # For non-numeric, get top values
            top_values = series.value_counts().head(5).to_dict()
            col_info["top_5_values"] = {str(k): int(v) for k, v in top_values.items()}

        columns_info.append(col_info)

        # Print summary
        print(f"\n  üìä {col}")
        print(f"     Type: {col_info['dtype']}")
        print(f"     Non-null: {col_info['non_null_count']:,} ({100-col_info['null_percentage']:.1f}%)")
        print(f"     Unique: {col_info['unique_values']:,} ({col_info['unique_percentage']:.1f}%)")

        if 'mean' in col_info:
            if col_info['min'] is not None:
                print(f"     Range: [{col_info['min']:.2f}, {col_info['max']:.2f}]")
                print(f"     Mean: {col_info['mean']:.2f}, Median: {col_info['median']:.2f}")
            else:
                print("     Range: [No data - all null values]")

    metadata["columns"] = columns_info

    # Data quality summary
    print("\n" + "=" * 80)
    print("3. DATA QUALITY METRICS")
    print("=" * 80)

    total_cells = n_rows * n_cols
    total_missing = sum(null_counts.values())
    # Count rows without any missing cell from a boolean mask rather than
    # materialising a full dropna() copy of the frame.
    complete_rows = int(df.notna().all(axis=1).sum())

    metadata["data_quality"] = {
        "total_missing_values": int(total_missing),
        "missing_percentage": _pct(total_missing, total_cells),
        "columns_with_missing": sum(1 for count in null_counts.values() if count > 0),
        "complete_rows": complete_rows,
        "complete_rows_percentage": _pct(complete_rows, n_rows)
    }

    print(f"  ‚Ä¢ Total Missing Values: {total_missing:,} ({metadata['data_quality']['missing_percentage']}%)")
    print(f"  ‚Ä¢ Columns with Missing Data: {metadata['data_quality']['columns_with_missing']}")
    print(f"  ‚Ä¢ Complete Rows (no missing): {metadata['data_quality']['complete_rows']:,} ({metadata['data_quality']['complete_rows_percentage']:.1f}%)")
    print(f"  ‚Ä¢ Duplicate Rows: {metadata['dataset_overview']['duplicate_rows']:,}")

    # Column types distribution
    print("\n" + "=" * 80)
    print("4. DATA TYPES DISTRIBUTION")
    print("=" * 80)

    dtype_counts = df.dtypes.value_counts().to_dict()
    metadata["data_types"] = {str(k): int(v) for k, v in dtype_counts.items()}

    for dtype, count in dtype_counts.items():
        print(f"  ‚Ä¢ {dtype}: {count} columns")

    # Numeric columns statistics
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 0:
        print("\n" + "=" * 80)
        print("5. NUMERIC COLUMNS - STATISTICAL SUMMARY")
        print("=" * 80)

        numeric_summary = df[numeric_cols].describe().to_dict()
        # Convert to regular python types, handling NaN values
        metadata["numeric_summary"] = {
            col: {k: (None if pd.isna(v) else float(v)) for k, v in stats.items()}
            for col, stats in numeric_summary.items()
        }

        for col in numeric_cols[:5]:  # Show first 5 numeric columns
            print(f"\n  üìà {col}")
            # Reuse the summary computed above instead of describing again.
            for stat_name, stat_value in numeric_summary[col].items():
                if pd.notna(stat_value):
                    print(f"     {stat_name}: {stat_value:.2f}")
                else:
                    print(f"     {stat_name}: N/A")

    # Sample data
    print("\n" + "=" * 80)
    print(f"6. SAMPLE DATA (First {sample_size} rows)")
    print("=" * 80)

    sample_df = df.head(sample_size)
    metadata["sample_data"] = sample_df.to_dict('records')

    print(sample_df.to_string())

    # Potential insights
    print("\n" + "=" * 80)
    print("7. POTENTIAL INSIGHTS")
    print("=" * 80)

    insights = []

    # Check for ID columns (an empty table has no meaningful ID columns)
    potential_ids = [col for col in df.columns
                     if n_rows > 0 and unique_counts[col] == n_rows]
    if potential_ids:
        insight = f"Potential ID columns (100% unique): {', '.join(potential_ids)}"
        insights.append(insight)
        print(f"  üîç {insight}")

    # Check for constant columns
    constant_cols = [col for col in df.columns if unique_counts[col] == 1]
    if constant_cols:
        insight = f"Constant columns (only 1 unique value): {', '.join(constant_cols)}"
        insights.append(insight)
        print(f"  üîç {insight}")

    # Check for high cardinality columns
    high_cardinality = [col for col in df.columns
                        if unique_counts[col] > n_rows * 0.9 and col not in potential_ids]
    if high_cardinality:
        insight = f"High cardinality columns (>90% unique): {', '.join(high_cardinality)}"
        insights.append(insight)
        print(f"  üîç {insight}")

    # Check for columns with high missing rate
    high_missing = [col for col in df.columns
                    if n_rows > 0 and null_counts[col] / n_rows > 0.5]
    if high_missing:
        insight = f"Columns with >50% missing data: {', '.join(high_missing)}"
        insights.append(insight)
        print(f"  üîç {insight}")

    # Check for numeric columns with zeros
    for col in numeric_cols:
        zero_pct = (df[col] == 0).sum() / n_rows * 100 if n_rows else 0.0
        if zero_pct > 50:
            insight = f"{col} has {zero_pct:.1f}% zero values"
            insights.append(insight)
            print(f"  üîç {insight}")

    metadata["insights"] = insights

    # Save outputs
    print("\n" + "=" * 80)
    print("8. SAVING OUTPUTS")
    print("=" * 80)

    # Save JSON metadata. NaN/inf are not valid JSON, so convert them (and any
    # NumPy scalars) first; allow_nan=False makes a leftover NaN an error
    # instead of silently writing an unparseable file.
    metadata = _json_safe(metadata)
    json_output = output_dir / f"{file_path.stem}_metadata.json"
    with open(json_output, 'w', encoding='utf-8') as f:
        json.dump(metadata, f, indent=2, ensure_ascii=False, allow_nan=False)
    print(f"  ‚úÖ JSON metadata saved: {json_output}")

    # Save text report
    txt_output = output_dir / f"{file_path.stem}_report.txt"
    with open(txt_output, 'w', encoding='utf-8') as f:
        f.write("=" * 80 + "\n")
        f.write(f"CSV ANALYSIS REPORT - {file_path.name}\n")
        f.write(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write("=" * 80 + "\n\n")

        f.write("DATASET OVERVIEW\n")
        f.write("-" * 80 + "\n")
        for key, value in metadata["dataset_overview"].items():
            f.write(f"{key.replace('_', ' ').title()}: {value:,}\n")

        f.write("\n\nCOLUMN SUMMARY\n")
        f.write("-" * 80 + "\n")
        for col_info in columns_info:
            f.write(f"\n{col_info['name']}\n")
            f.write(f"  Type: {col_info['dtype']}\n")
            f.write(f"  Non-null: {col_info['non_null_count']:,} ({100-col_info['null_percentage']:.1f}%)\n")
            f.write(f"  Unique: {col_info['unique_values']:,} ({col_info['unique_percentage']:.1f}%)\n")

        f.write("\n\nINSIGHTS\n")
        f.write("-" * 80 + "\n")
        for insight in insights:
            f.write(f"‚Ä¢ {insight}\n")

    print(f"  ‚úÖ Text report saved: {txt_output}")

    # Save column list
    col_list_output = output_dir / f"{file_path.stem}_columns.txt"
    with open(col_list_output, 'w', encoding='utf-8') as f:
        f.write("COLUMN LIST\n")
        f.write("=" * 80 + "\n\n")
        for i, col in enumerate(df.columns, 1):
            f.write(f"{i}. {col} ({df[col].dtype})\n")
    print(f"  ‚úÖ Column list saved: {col_list_output}")

    print("\n" + "=" * 80)
    print("‚ú® ANALYSIS COMPLETE!")
    print("=" * 80)
    print(f"\nOutput files saved to: {output_dir}")
    print("\nYou can now share the generated files with Claude AI for further analysis!")

    return metadata


if __name__ == "__main__":
    # Input table and destination folder, both resolved against the project
    # checkout below.
    PROJECT_ROOT = Path("/mnt/c/Users/benny/OneDrive/Documents/Github/ago-lobitocorridor-analysis")
    CSV_FILE = PROJECT_ROOT / "outputs" / "tables" / "huambo_municipality_profiles.csv"
    OUTPUT_DIR = PROJECT_ROOT / "outputs" / "analysis_summary"  # summary files are written here

    # Profile the table; the report embeds the first 10 rows as sample data.
    analyze_csv(CSV_FILE, output_dir=OUTPUT_DIR, sample_size=10)

CSV ANALYZER - 2025-11-01 10:14:11
üìÅ File: huambo_municipality_profiles.csv
üìä Size: 142.87 MB

üìñ Reading CSV file...
‚úÖ Successfully loaded 904,604 rows √ó 104 columns

1. DATASET OVERVIEW
  ‚Ä¢ Total Rows: 904,604
  ‚Ä¢ Total Columns: 104
  ‚Ä¢ Memory Usage Mb: 845.11
  ‚Ä¢ Total Cells: 94,078,816
  ‚Ä¢ Duplicate Rows: 904,561

2. COLUMN INFORMATION

  üìä ADM2CD_c
     Type: object
     Non-null: 904,604 (100.0%)
     Unique: 39 (0.0%)

  üìä NAM_1
     Type: object
     Non-null: 904,604 (100.0%)
     Unique: 5 (0.0%)

  üìä NAM_2
     Type: object
     Non-null: 904,604 (100.0%)
     Unique: 39 (0.0%)

  üìä poverty__rivers
     Type: float64
     Non-null: 0 (0.0%)
     Unique: 0 (0.0%)
     Range: [No data - all null values]

  üìä poverty__streams
     Type: float64
     Non-null: 0 (0.0%)
     Unique: 0 (0.0%)
     Range: [No data - all null values]

  üìä poverty__lakes
     Type: float64
     Non-null: 0 (0.0%)
     Unique: 0 (0.0%)
     Range: [No data - all 