# Survey Data Processor

This notebook processes survey data from SQL query files and generates combined outputs for analysis.

## Overview
- Reads SQL query files for each survey
- Executes queries to load survey data
- Applies custom transformations
- Outputs a combined file with LTR (NPS) and driver scores
- Generates a comprehensive dataset with all survey columns
- Writes a per-survey processing summary

In [None]:
import pandas as pd
import numpy as np
import sqlite3
import os
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# Add parent directory to path for imports
import sys
sys.path.append('../')

from shared.metrics.common_metrics import *
from shared.utils.data_validator import *

## Configuration

### Survey Configuration
Add your survey names here. Each survey must have a corresponding SQL file named `<survey name>.sql` (for example, `PRISM.sql`) in the `data/raw/` directory.

In [ ]:
# Survey Configuration - Update this list with your survey names.
# Every entry needs a matching "<name>.sql" file inside SQL_DIR.
SURVEYS = [
    "PRISM",
    "Customer_Satisfaction_Q1",
    "Employee_Engagement_2024",
    "Product_Feedback_Survey",
    "Brand_Perception_Study",
]

# Allowed Column Types
ALLOWED_COLUMN_TYPES = [
    "STANDARD",  # Required fields for identification and timestamps
    "LTR",       # Likelihood to Recommend (NPS) related columns
    "DRIVERS",   # Driver/satisfaction scores and metrics
    "METADATA",  # Demographics, comments, and supplementary data
]

# All Columns Dictionary - Column name mapped to column type
ALL_COLUMNS = {
    # Identification / timestamps
    "ResponseID": "STANDARD",
    "Timestamp": "STANDARD",
    # Likelihood to Recommend
    "NPS_Score": "LTR",
    "NPS_Comment": "LTR",
    # Drivers
    "Satisfaction_Rating": "DRIVERS",
    "Satisfaction_Comment": "DRIVERS",
    "Custom_Metric_1": "DRIVERS",
    "Custom_Metric_2": "DRIVERS",
    "Custom_Metric_3": "DRIVERS",
    "Custom_Metric_4": "DRIVERS",
    "Custom_Metric_5": "DRIVERS",
    # Supplementary data
    "Demographics_Age": "METADATA",
    "Demographics_Role": "METADATA",
    "Demographics_Location": "METADATA",
    "Free_Text_Feedback": "METADATA",
}

# Fail fast if the mapping above uses a type that is not declared as allowed
invalid_types = set(ALL_COLUMNS.values()).difference(ALLOWED_COLUMN_TYPES)
if invalid_types:
    raise ValueError(f"Invalid column types found: {invalid_types}. Must be one of: {ALLOWED_COLUMN_TYPES}")

# File paths (relative to the notebook's working directory)
SQL_DIR = "../data/raw/"
OUTPUT_DIR = "../data/processed/"
# Single run-level stamp shared by all output file names
TIMESTAMP = datetime.now().strftime("%Y%m%d_%H%M%S")

# Ensure output directory exists
os.makedirs(OUTPUT_DIR, exist_ok=True)

# --- Echo the configuration so the run is self-describing ---
print(f"Processing {len(SURVEYS)} surveys:")
for i, survey in enumerate(SURVEYS, start=1):
    print(f"  {i}. {survey}")

# Group the configured column names by type once; both listings below reuse it
columns_by_type = {
    wanted_type: [name for name, assigned in ALL_COLUMNS.items() if assigned == wanted_type]
    for wanted_type in ALLOWED_COLUMN_TYPES
}

print(f"\nColumn Types ({len(ALLOWED_COLUMN_TYPES)}):")
for i, col_type in enumerate(ALLOWED_COLUMN_TYPES, start=1):
    cols_of_type = columns_by_type[col_type]
    print(f"  {i}. {col_type}: {len(cols_of_type)} columns")

print(f"\nAll Columns ({len(ALL_COLUMNS)}):")
for col_type in ALLOWED_COLUMN_TYPES:
    print(f"  {col_type}:")
    for col in columns_by_type[col_type]:
        print(f"    - {col}")
    print()

## Data Loading Functions

In [ ]:
def validate_columns(df, survey_name, all_columns_dict):
    """
    Check that a survey DataFrame only contains known columns.

    The bookkeeping columns `Survey_Name` and `Processed_Date` are ignored,
    because the pipeline adds them itself. The outcome is also printed.

    Args:
        df (pd.DataFrame): Survey data
        survey_name (str): Name of the survey (used in the printed report)
        all_columns_dict (dict): Dictionary of allowed column names and their types

    Returns:
        tuple: (is_valid, invalid_columns) - invalid_columns is the set of
            unexpected column names, empty when validation passes
    """
    allowed_columns = set(all_columns_dict)
    # Columns that are neither configured nor added automatically by the pipeline
    invalid_columns = {
        name
        for name in df.columns
        if name not in allowed_columns and name not in ('Survey_Name', 'Processed_Date')
    }

    if not invalid_columns:
        print(f"  ✓ Column validation passed for {survey_name}")
        return True, set()

    print(f"  ✗ VALIDATION FAILED for {survey_name}")
    print(f"    Invalid columns: {sorted(invalid_columns)}")
    print(f"    Valid columns must be from: {sorted(allowed_columns)}")
    return False, invalid_columns

def load_sql_query(survey_name):
    """
    Read the SQL text stored for a survey.

    The file is expected at `<SQL_DIR>/<survey_name>.sql` and is read as UTF-8.

    Args:
        survey_name (str): Name of the survey

    Returns:
        str: SQL query string with surrounding whitespace stripped

    Raises:
        FileNotFoundError: If no SQL file exists for the survey
    """
    sql_file_path = os.path.join(SQL_DIR, f"{survey_name}.sql")

    if os.path.exists(sql_file_path):
        with open(sql_file_path, 'r', encoding='utf-8') as handle:
            return handle.read().strip()

    raise FileNotFoundError(f"SQL file not found: {sql_file_path}")

def execute_query_and_load_data(survey_name, connection=None):
    """
    Run a survey's SQL query and return the validated result.

    Args:
        survey_name (str): Name of the survey
        connection: Database connection. If None, a throwaway in-memory
            SQLite connection is created and closed again by this function;
            a caller-supplied connection is left open.

    Returns:
        pd.DataFrame or None: Survey data with a `Survey_Name` column added.
            None is returned both when column validation fails and when
            running the query raises, so callers cannot tell the two apart
            from the return value alone.

    Raises:
        FileNotFoundError: Propagated from `load_sql_query` when the survey
            has no SQL file (this happens before the guarded section).
    """
    query = load_sql_query(survey_name)

    # Note: Update this section based on your actual database configuration
    owns_connection = connection is None
    if owns_connection:
        # For demonstration - replace with your actual database connection
        conn = sqlite3.connect(':memory:')
        print(f"Warning: Using in-memory SQLite for {survey_name}. Update connection for production.")
    else:
        conn = connection

    try:
        df = pd.read_sql_query(query, conn)

        # Reject the survey outright if it carries unknown columns
        is_valid, _ = validate_columns(df, survey_name, ALL_COLUMNS)
        if is_valid:
            df['Survey_Name'] = survey_name  # Add survey identifier
            print(f"✓ Loaded {len(df)} records from {survey_name}")
            return df

        print(f"  ✗ Skipping {survey_name} due to column validation failure")
        return None
    except Exception as e:
        print(f"✗ Error loading data for {survey_name}: {str(e)}")
        return None
    finally:
        # Only close what this function opened
        if owns_connection:
            conn.close()

## Data Processing Pipeline

In [None]:
# Accumulators filled by the processing loop:
#   all_survey_data    - one full DataFrame per successfully processed survey
#   nps_driver_data    - the LTR/driver subset of each survey that has one
#   processing_summary - one status record (dict) per survey
all_survey_data, nps_driver_data, processing_summary = [], [], []

print("Starting data processing pipeline...\n")

In [ ]:
# Process each survey
#
# NOTE(review): `extract_ltr_and_drivers` is defined in a later cell of this
# notebook. Unless one of the star imports already provides it, that cell must
# be run before this one; otherwise every survey with data is recorded as an
# error (NameError caught below).


def _summary_row(survey_name, status, column_validation, df=None):
    """Build one processing-summary record; counts and flags come from `df` when given."""
    columns = [] if df is None else list(df.columns)
    return {
        'Survey_Name': survey_name,
        'Records_Processed': 0 if df is None else len(df),
        'Columns_Count': len(columns),
        'Has_LTR_Data': any(ALL_COLUMNS.get(col) == "LTR" for col in columns),
        'Has_Driver_Data': any(ALL_COLUMNS.get(col) == "DRIVERS" for col in columns),
        'Column_Validation': column_validation,
        'Processing_Status': status,
        'Processed_Time': datetime.now(),
    }


for survey_name in SURVEYS:
    print(f"Processing survey: {survey_name}")
    print("-" * 50)

    try:
        # Load data from SQL query
        df = execute_query_and_load_data(survey_name)

        if df is not None and len(df) > 0:
            # Store original data structure info
            original_shape = df.shape
            original_columns = list(df.columns)

            # === CUSTOM TRANSFORMATIONS SECTION ===
            # TODO: Add your custom transformations here
            # Examples:
            # - Data cleaning and validation
            # - Column renaming/standardization
            # - Derived field calculations
            # - Data type conversions

            print(f"  Original shape: {original_shape}")
            print(f"  Columns: {len(original_columns)}")

            # Add processing timestamp
            df['Processed_Date'] = datetime.now()

            # Run every step that can fail BEFORE touching the accumulators,
            # so a survey that ends up in the summary as an error never
            # leaves a partial entry in the combined outputs.
            ltr_driver_subset = extract_ltr_and_drivers(df, survey_name, ALL_COLUMNS)
            success_row = _summary_row(survey_name, 'Success', 'Passed', df)

            # Store full dataset and, if available, the LTR/driver subset
            all_survey_data.append(df.copy())
            if ltr_driver_subset is not None:
                nps_driver_data.append(ltr_driver_subset)
            processing_summary.append(success_row)

            print(f"  ✓ Successfully processed {len(df)} records")

        else:
            # NOTE(review): execute_query_and_load_data returns None both for
            # a column validation failure and for a query/load error, so the
            # 'Column Validation Failed' label may also cover load errors.
            status = 'Column Validation Failed' if df is None else 'No Data'
            print(f"  ✗ {status} for {survey_name}")
            processing_summary.append(
                _summary_row(survey_name, status, 'Failed' if df is None else 'N/A')
            )

    except Exception as e:
        # Boundary of the per-survey pipeline: record the failure and move on
        print(f"  ✗ Error processing {survey_name}: {str(e)}")
        processing_summary.append(
            _summary_row(survey_name, f'Error: {str(e)}', 'Error')
        )

    print("\n")

print(f"Data processing complete. Processed {len(all_survey_data)} surveys successfully.")

In [ ]:
def extract_ltr_and_drivers(df, survey_name, all_columns_dict):
    """
    Extract LTR (Likelihood to Recommend) and driver scores from survey data.

    The subset keeps, in this fixed order, the STANDARD columns, the LTR
    columns, the DRIVERS columns (each in the order they appear in
    `all_columns_dict`) and finally the automatically added columns
    `Survey_Name` / `Processed_Date`. Only columns actually present in `df`
    are included. The order is deterministic so that the combined output
    has a stable column layout from run to run.

    Args:
        df (pd.DataFrame): Survey data
        survey_name (str): Name of the survey (used in printed messages)
        all_columns_dict (dict): Dictionary mapping column names to types

    Returns:
        pd.DataFrame or None: Copy of the relevant columns, or None when the
            survey has neither LTR nor driver columns
    """
    def columns_of(wanted_type):
        # Configured columns of the given type that exist in this survey
        return [col for col, col_type in all_columns_dict.items()
                if col_type == wanted_type and col in df.columns]

    ltr_columns = columns_of("LTR")
    driver_columns = columns_of("DRIVERS")

    if not ltr_columns and not driver_columns:
        print(f"    No LTR or driver columns found in {survey_name}")
        return None

    standard_columns = columns_of("STANDARD")
    available_auto_added = [col for col in ('Survey_Name', 'Processed_Date')
                            if col in df.columns]

    # dict.fromkeys removes duplicates while keeping first-seen order
    # (a plain set would make the column order vary between runs)
    relevant_columns = list(dict.fromkeys(
        standard_columns + ltr_columns + driver_columns + available_auto_added
    ))

    subset_df = df[relevant_columns].copy()
    print(f"    Extracted LTR/Driver data: {len(ltr_columns)} LTR columns, {len(driver_columns)} driver columns")
    print(f"    LTR columns: {ltr_columns}")
    print(f"    Driver columns: {driver_columns}")
    return subset_df

## Output Generation

In [ ]:
# Writes up to three CSV files into OUTPUT_DIR, all stamped with TIMESTAMP:
# the combined LTR/driver subset, the combined full data, and the summary.
print("Generating output files...\n")

# Output 1: Combined LTR and Driver Scores
if nps_driver_data:
    # sort=False keeps columns in first-seen order; surveys lacking a column get NaN
    combined_ltr_drivers = pd.concat(nps_driver_data, ignore_index=True, sort=False)
    ltr_output_file = os.path.join(OUTPUT_DIR, f"Combined_LTR_Drivers_{TIMESTAMP}.csv")
    combined_ltr_drivers.to_csv(ltr_output_file, index=False)
    print(f"✓ LTR & Driver Scores: {ltr_output_file}")
    print(f"  - Total records: {len(combined_ltr_drivers)}")
    print(f"  - Surveys included: {combined_ltr_drivers['Survey_Name'].nunique()}")
    print(f"  - Columns: {len(combined_ltr_drivers.columns)}")
else:
    print("✗ No LTR/Driver data found across surveys")

print()

# Output 2: All Survey Data Combined
if all_survey_data:
    combined_all_data = pd.concat(all_survey_data, ignore_index=True, sort=False)
    all_data_output_file = os.path.join(OUTPUT_DIR, f"Combined_All_Surveys_{TIMESTAMP}.csv")
    combined_all_data.to_csv(all_data_output_file, index=False)
    print(f"✓ All Survey Data: {all_data_output_file}")
    print(f"  - Total records: {len(combined_all_data)}")
    print(f"  - Surveys included: {combined_all_data['Survey_Name'].nunique()}")
    print(f"  - Total columns: {len(combined_all_data.columns)}")
else:
    print("✗ No survey data processed successfully")

print()

# Output 3: Processing Summary (written even when nothing succeeded)
summary_df = pd.DataFrame(processing_summary)
summary_output_file = os.path.join(OUTPUT_DIR, f"Processing_Summary_{TIMESTAMP}.csv")
summary_df.to_csv(summary_output_file, index=False)

# With an empty summary the DataFrame has no columns at all, so look the
# status column up defensively instead of indexing it directly.
statuses = summary_df.get('Processing_Status', pd.Series(dtype=object))
success_count = int((statuses == 'Success').sum())

print(f"✓ Processing Summary: {summary_output_file}")
print(f"  - Surveys processed: {len(summary_df)}")
print(f"  - Successful: {success_count}")
print(f"  - Failed: {len(summary_df) - success_count}")

## Summary and Validation

In [None]:
# Display processing summary: one line per survey plus overall totals
print("\n" + "="*60)
print("PROCESSING SUMMARY")
print("="*60)

summary_df = pd.DataFrame(processing_summary)

if summary_df.empty:
    # No surveys ran: the frame has no columns, and the success rate below
    # would divide by zero, so report that explicitly instead.
    print("No surveys were processed.")
else:
    success_count = int((summary_df['Processing_Status'] == 'Success').sum())

    print(summary_df[['Survey_Name', 'Records_Processed', 'Processing_Status']].to_string(index=False))

    print(f"\nTotal records processed: {summary_df['Records_Processed'].sum()}")
    print(f"Average records per survey: {summary_df['Records_Processed'].mean():.1f}")
    print(f"Success rate: {success_count / len(summary_df) * 100:.1f}%")

In [None]:
# Data quality checks on the combined data of all successfully processed surveys
if all_survey_data:
    print("\n" + "="*60)
    print("DATA QUALITY CHECKS")
    print("="*60)

    combined_data = pd.concat(all_survey_data, ignore_index=True, sort=False)

    # Check for missing values (columns absent from a survey count as missing
    # for that survey's rows after the concat)
    missing_summary = combined_data.isnull().sum()
    print(f"Columns with missing values: {len(missing_summary[missing_summary > 0])}")

    # Check for duplicate ResponseIDs within surveys.
    # The column check must happen before the groupby selection: selecting a
    # missing column raises KeyError before any per-group function runs.
    if 'ResponseID' in combined_data.columns:
        duplicate_check = combined_data.groupby('Survey_Name')['ResponseID'].apply(
            lambda ids: ids.duplicated().sum()
        )
    else:
        duplicate_check = pd.Series(dtype=int)
    print(f"Surveys with duplicate ResponseIDs: {len(duplicate_check[duplicate_check > 0])}")

    # Data type summary
    print(f"Total unique columns across all surveys: {len(combined_data.columns)}")
    print(f"Numeric columns: {len(combined_data.select_dtypes(include=[np.number]).columns)}")
    print(f"Text columns: {len(combined_data.select_dtypes(include=['object']).columns)}")

## Next Steps

### Custom Transformations
Update the "CUSTOM TRANSFORMATIONS SECTION" in the processing loop above with:
- Data cleaning rules specific to your surveys
- Column standardization and renaming
- Calculated fields and derived metrics
- Data validation and quality checks

### Database Configuration
Update the `execute_query_and_load_data` function with your actual database connection details:
- Database type (PostgreSQL, SQL Server, MySQL, etc.)
- Connection parameters
- Authentication details

### Output Customization
Modify the output generation section to:
- Add additional output formats (Excel, JSON, etc.)
- Create survey-specific outputs
- Generate summary reports and visualizations

### File Organization
Ensure your SQL files are organized as:
```
data/raw/
├── PRISM.sql
├── Customer_Satisfaction_Q1.sql
├── Employee_Engagement_2024.sql
└── ...
```