# Survey Data Harmonization Pipeline



A comprehensive notebook for harmonizing multiple survey datasets
while maintaining full transparency and reproducibility.

SETUP INSTRUCTIONS:
1. Run the Google Drive mount cell (for Colab users)
2. Configure your surveys in SURVEYS_CONFIG
3. Define your mapping rules in MAPPINGS
4. Run the processing pipeline
5. Export your analysis-ready dataset

For detailed guidance, see the configuration examples below.

### Google Colab Users

Run the following cells to:

1. Install dependencies
2. Mount Google Drive and set the working directory.

In [None]:
!pip -q install pandas numpy openpyxl matplotlib

In [None]:
# Mount Google Drive in Colab (skipped automatically when running locally)
import os

# Set your working directory - CUSTOMIZE THIS PATH
project_folder = "/content/drive/MyDrive/Colab Notebooks"

# Optional: Mount Google Drive (for Colab users)
try:
    # Import inside the try block so the cell also runs outside Colab
    from google.colab import drive
    drive.mount('/content/drive')
    os.chdir(project_folder)
    print("Google Drive mounted successfully")
except ImportError:
    print("ℹ️  Running locally - Google Drive mount not needed")



### Import Libraries

In [None]:
import pandas as pd
import numpy as np
import os
from pathlib import Path
import glob
from typing import Dict, List, Union, Optional, Tuple, Any
import warnings
warnings.filterwarnings('ignore')

### Configuration Section - Customize this for your data.

Each survey entry supports:

1. **path**: File path (supports wildcards such as `*.csv` for multiple files).
2. **instrument**: Name of the survey instrument; must match a key in MAPPINGS.
3. **id_col**: Column name containing participant IDs.
4. **item_prefixes**: List of prefixes for item columns (optional).
5. **sheet_name**: Sheet name for Excel files (optional).
6. **skip_rows**: Number of rows to skip at the top of the file (optional).
7. **reverse_items**: List of item column names to reverse-score (optional).
8. **missing_values**: Custom missing-value codes (optional).
9. **force_auto_detect**: If `True`, ignore `item_prefixes` and auto-detect item columns (optional).

#### FLEXIBLE PATH OPTIONS:
- Single file: "data/survey1.csv"
- Multiple files: "data/wave*.csv" (uses glob pattern)
- Different folders: ["folder1/survey.csv", "folder2/survey.xlsx"]
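All three path forms reduce to a list of concrete file paths. A minimal sketch of that expansion with the standard-library `glob` module, run in a temporary folder so it is self-contained; the helper name `expand_paths` and the file names are illustrative, not part of the pipeline. Unlike the notebook's own `expand_file_paths()`, it sorts wildcard matches, because `glob` returns them in an OS-dependent order and wave order usually matters.

```python
import glob
import os
import tempfile
from typing import List, Union

def expand_paths(path: Union[str, List[str]]) -> List[str]:
    """Turn a single path, a wildcard pattern, or a list of either into file paths."""
    patterns = path if isinstance(path, list) else [path]
    files: List[str] = []
    for p in patterns:
        if any(ch in p for ch in '*?['):
            # sorted() makes the order of matched files deterministic
            files.extend(sorted(glob.glob(p)))
        else:
            files.append(p)  # plain paths are passed through unchanged
    return files

with tempfile.TemporaryDirectory() as tmp:
    for name in ['wave2.csv', 'wave1.csv', 'notes.txt']:
        open(os.path.join(tmp, name), 'w').close()  # create empty demo files
    waves = expand_paths(os.path.join(tmp, 'wave*.csv'))
    wave_names = [os.path.basename(f) for f in waves]

print(wave_names)
```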

In [None]:
SURVEYS_CONFIG = [
    {
        'path': 'data/baselinefile.csv',  # Single baseline file
        'instrument': 'SCARED',
        'id_col': 'participant_id',
        'item_prefixes': ['SCARED_'],
        'reverse_items': [],  # Item column names (not numbers) to reverse-score, if any
        'missing_values': [-999, 'N/A', '']
    },
    {
        'path': 'data/multiple_files*.xlsx',  # Multiple waves using wildcard
        'instrument': 'GAD7',
        'id_col': 'ID',
        'sheet_name': 'Data',  # Specific sheet in Excel
        'item_prefixes': ['GAD7_', 'gad_'],
        'skip_rows': 1,  # Skip one title row above the real header row
        'missing_values': [-999, 99]
    },
    {
        'path': ['data/specificOne.csv', 'data/specificTwo.csv'],  # Explicit file list
        'instrument': 'PHQ9',
        'id_col': 'subject_id',
        'item_prefixes': ['PHQ9_', 'phq_'],
        'reverse_items': [],
        'missing_values': [-999]
    }
]

#### Mappings

**MAPPING INSTRUCTIONS:**

Define how your survey responses should be converted to binary (0/1) format.

**STRUCTURE:**
```
{instrument_name: {item: {original_value: new_value}}}
```
- Use 'DEFAULT' for instrument-wide rules
- Add specific items to override defaults
- Values can be strings, numbers, or mixed
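The lookup order can be sketched in plain Python. The pipeline's `apply_mapping()` merges each item's rules in three layers, later layers winning: `GLOBAL_DEFAULT_MAPPING` (defined in the mappings cell), then the instrument's `'DEFAULT'`, then an entry named after the item itself. This sketch copies the `Survey_C` rules from the example cell; `resolve_mapping` is a hypothetical helper name. It also shows a Python detail relevant to mixed keys: `True == 1` and both hash the same, so they collide as dict keys.

```python
from typing import Any, Dict

GLOBAL_DEFAULT = {'Yes': 1, 'No': 0, 'True': 1, 'False': 0, 1: 1, 0: 0}
SURVEY_C = {
    'DEFAULT': {0: 0, '0': 0, 'No': 0, 'False': 0, 1: 1, '1': 1, 'Yes': 1, 'True': 1},
    'scale_10': {0: 0, 1: 0, 2: 0, 3: 1, 4: 1, 5: 1},
}

def resolve_mapping(instrument_rules: Dict[str, Dict[Any, int]], item: str) -> Dict[Any, int]:
    """Merge global fallback < instrument DEFAULT < item override (last wins)."""
    return {
        **GLOBAL_DEFAULT,
        **instrument_rules.get('DEFAULT', {}),
        **instrument_rules.get(item, {}),
    }

generic = resolve_mapping(SURVEY_C, 'scale_3')    # no override: DEFAULT applies
override = resolve_mapping(SURVEY_C, 'scale_10')  # override maps 1 to 0 instead of 1
print(generic[1], override[1])   # 1 0
print(override['Yes'])           # 1 (still inherited from DEFAULT)
print(len({1: 'a', True: 'b'}))  # 1: True and 1 are the same dict key
```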

**COMMON MAPPING PATTERNS:**

PATTERN 1 - Likert scales (0-3 scale):

```
'DEFAULT': {0: 0, 1: 0, 2: 1, 3: 1}  # Split at midpoint
```

PATTERN 2 - Yes/No responses:

```
'DEFAULT': {'Yes': 1, 'No': 0, 'Y': 1, 'N': 0}
```

PATTERN 3 - Frequency scales:

```
'DEFAULT': {
    'Never': 0, 'Rarely': 0,
    'Sometimes': 1, 'Often': 1, 'Always': 1
}
```

PATTERN 4 - Conservative threshold (only highest = 1):
```
'DEFAULT': {0: 0, 1: 0, 2: 0, 3: 1}
```
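Whichever pattern you use, a value with no entry needs attention. In pandas, `Series.map(dict)` turns such values into `NaN`, which then looks exactly like a genuinely missing response; looking each value up with a default keeps it visible so it can be reported. A small sketch on made-up responses, using PATTERN 1:

```python
import pandas as pd

likert_split = {0: 0, 1: 0, 2: 1, 3: 1}       # PATTERN 1
responses = pd.Series([0, 1, 2, 3, 4, None])  # 4 lies outside the 0-3 scale

# Series.map with a dict: the unmapped 4 becomes NaN, indistinguishable from missing
as_nan = responses.map(likert_split)

# Lookup with a default: unmapped values survive so they can be reported later
kept = responses.map(lambda v: likert_split.get(v, v))
unmapped = sorted(set(responses.dropna()) - set(likert_split))

print(as_nan.tolist())
print(kept.tolist())
print(unmapped)
```

This is why running `print_unmapped_values()` before the pipeline is worthwhile.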

***SETUP GUIDE FOR MAPPINGS:***
1. Replace 'Survey_A', 'Survey_B' etc. with YOUR instrument names (must match SURVEYS_CONFIG)
2. Update the DEFAULT mappings to match YOUR response scales
3. Add item-specific overrides where needed

Use the diagnostic function `print_unmapped_values()` to see which values in your data still need a mapping.

In [None]:
MAPPINGS = {
    # Survey A: Generic Likert scale (0-3)
    'Survey_A': {
        'DEFAULT': {
            0: 0, '0': 0,  # Not at all → 0
            1: 0, '1': 0,  # A little → 0
            2: 1, '2': 1,  # Moderately → 1
            3: 1, '3': 1   # A lot → 1
        },
        # Override for specific items if needed:
        # 'Q5': {0: 0, 1: 0, 2: 0, 3: 1}  # More conservative threshold
    },

    # Survey B: Frequency scale
    'Survey_B': {
        'DEFAULT': {
            0: 0, '0': 0, 'Never': 0,
            1: 0, '1': 0, 'Rarely': 0,
            2: 1, '2': 1, 'Sometimes': 1,
            3: 1, '3': 1, 'Often': 1,
            4: 1, '4': 1, 'Always': 1
        }
    },

    # Survey C: Mixed response types
    'Survey_C': {
        'DEFAULT': {
            0: 0, '0': 0, 'No': 0, 'False': 0,
            1: 1, '1': 1, 'Yes': 1, 'True': 1
        },
        # Specific overrides for certain items:
        'scale_10': {  # Different threshold for this item
            0: 0, 1: 0, 2: 0, 3: 1, 4: 1, 5: 1
        }
    }
}

# Global fallback for unmapped values (applied when no specific mapping exists)
GLOBAL_DEFAULT_MAPPING = {
    'Yes': 1, 'No': 0,
    'True': 1, 'False': 0,
    1: 1, 0: 0
}
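
Values that match neither the instrument rules nor `GLOBAL_DEFAULT_MAPPING` are left to `auto_binarize_fallback()` in the utilities further down, which sorts the distinct values and splits them at the midpoint: lower half to 0, upper half to 1, with the middle value of an odd count going to 0 because the test is `i < len / 2`. A standalone sketch of that rule (the helper name `midpoint_split` is hypothetical) also shows why text labels should always be mapped explicitly: they sort alphabetically, not by meaning.

```python
from typing import Dict, List

def midpoint_split(sorted_values: List) -> Dict:
    """Map the lower half of sorted distinct values to 0 and the upper half to 1."""
    midpoint = len(sorted_values) / 2
    return {val: (0 if i < midpoint else 1) for i, val in enumerate(sorted_values)}

print(midpoint_split([1, 2, 3, 4, 5]))  # {1: 0, 2: 0, 3: 0, 4: 1, 5: 1}

labels = sorted(['Never', 'Rarely', 'Sometimes', 'Often'])
print(labels)                  # ['Never', 'Often', 'Rarely', 'Sometimes']
print(midpoint_split(labels))  # {'Never': 0, 'Often': 0, 'Rarely': 1, 'Sometimes': 1}
```

Here "Often" lands in 0 and "Rarely" in 1, the opposite of PATTERN 3.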

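The following cell defines `validate_config()`, which checks that each survey has the required fields and that its files exist. `process_all_surveys()` calls it automatically; you can also run `validate_config()` on its own after editing the configuration.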

In [None]:
def validate_config():
    """Validate the survey configuration and provide helpful error messages."""
    print(" Validating survey configuration...")

    errors = []
    warnings_list = []

    for i, survey in enumerate(SURVEYS_CONFIG):
        survey_name = survey.get('instrument', f'Survey {i+1}')

        # Check required fields
        if 'path' not in survey:
            errors.append(f" {survey_name}: Missing 'path' field")
        if 'instrument' not in survey:
            errors.append(f" {survey_name}: Missing 'instrument' field")
        if 'id_col' not in survey:
            errors.append(f" {survey_name}: Missing 'id_col' field")

        # Check if files exist
        if 'path' in survey:
            path = survey['path']
            if isinstance(path, list):
                for p in path:
                    if not any(Path(p).parent.glob(Path(p).name)):
                        warnings_list.append(f"⚠️ {survey_name}: File not found: {p}")
            elif isinstance(path, str):
                if '*' in path:
                    matches = glob.glob(path)
                    if not matches:
                        warnings_list.append(f"⚠️ {survey_name}: No files match pattern: {path}")
                    else:
                        print(f"✅ {survey_name}: Found {len(matches)} files matching: {path}")
                else:
                    if not Path(path).exists():
                        warnings_list.append(f"⚠️ {survey_name}: File not found: {path}")

    if errors:
        print("\n CONFIGURATION ERRORS (must fix):")
        for error in errors:
            print(error)
        return False

    if warnings_list:
        print("\n WARNINGS (check these):")
        for warning in warnings_list:
            print(warning)

    print("\n Configuration validation complete!")
    return True
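
`validate_config()` checks paths and required fields but not whether each `instrument` has an entry in `MAPPINGS`. With the example configuration in this notebook, none of `SCARED`, `GAD7`, or `PHQ9` has one (the mapping examples use `Survey_A` to `Survey_C`), so every item would fall through to `GLOBAL_DEFAULT_MAPPING` and the automatic fallback. A hypothetical extra check, written to take both structures as arguments so it can be tested on its own:

```python
from typing import Any, Dict, List

def check_mapping_coverage(surveys: List[Dict[str, Any]],
                           mappings: Dict[str, Dict]) -> List[str]:
    """Return warnings for instruments without mappings and mappings without surveys."""
    warnings_out = []
    configured = set()
    for survey in surveys:
        name = survey.get('instrument')
        configured.add(name)
        if name and name not in mappings:
            warnings_out.append(
                f"⚠️ {name}: no entry in MAPPINGS; only GLOBAL_DEFAULT_MAPPING "
                "and auto-binarization will apply"
            )
    # A MAPPINGS entry that no survey uses is often a typo in an instrument name
    for name in mappings:
        if name not in configured:
            warnings_out.append(f"⚠️ MAPPINGS['{name}'] is not used by any survey")
    return warnings_out

demo = check_mapping_coverage(
    [{'instrument': 'GAD7'}, {'instrument': 'Survey_A'}],
    {'Survey_A': {'DEFAULT': {}}, 'Survey_B': {'DEFAULT': {}}},
)
for line in demo:
    print(line)
```

In the notebook, the result of `check_mapping_coverage(SURVEYS_CONFIG, MAPPINGS)` could be appended to `warnings_list` inside `validate_config()`.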

## Utility Functions
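Among the utilities below, `reverse_score_items()` flips the items listed in `reverse_items`. For a response scale running from min to max, the standard reverse-scoring rule is x′ = (min + max) − x, which maps the scale onto itself; subtracting from the maximum alone gives the same result only when the scale starts at 0. A quick check with invented values, passing the scale bounds explicitly rather than inferring them from data (the helper name `reverse_score` is hypothetical):

```python
import pandas as pd

def reverse_score(values: pd.Series, scale_min: int, scale_max: int) -> pd.Series:
    """Reverse-score so that scale_min <-> scale_max and the scale maps onto itself."""
    return (scale_min + scale_max) - values

one_to_five = pd.Series([1, 2, 3, 4, 5])
print(reverse_score(one_to_five, 1, 5).tolist())  # [5, 4, 3, 2, 1]
print((5 - one_to_five).tolist())                 # [4, 3, 2, 1, 0]: max - x shifts a 1-5 scale
```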

In [None]:
def expand_file_paths(path: Union[str, List[str]]) -> List[str]:
    """Expand file paths, handling wildcards and lists."""
    if isinstance(path, list):
        all_files = []
        for p in path:
            if '*' in p:
                all_files.extend(glob.glob(p))
            else:
                all_files.append(p)
        return all_files
    elif '*' in path:
        return glob.glob(path)
    else:
        return [path]

def read_table(file_path: str, **kwargs) -> pd.DataFrame:
    """Universal file reader supporting CSV and Excel with robust error handling."""
    path = Path(file_path)

    try:
        if path.suffix.lower() in ['.xlsx', '.xls']:
            # .xlsx is read with openpyxl; legacy .xls files also need the xlrd package
            df = pd.read_excel(file_path, **kwargs)
        elif path.suffix.lower() == '.csv':
            kwargs.pop('sheet_name', None)  # sheet_name only applies to Excel files
            # latin-1 can decode any byte sequence, so it must be the last fallback
            encodings = ['utf-8', 'cp1252', 'latin-1']
            separators = [',', ';', '\t']

            df = None
            for encoding in encodings:
                for sep in separators:
                    try:
                        df = pd.read_csv(file_path, encoding=encoding, sep=sep, **kwargs)
                    except (UnicodeDecodeError, pd.errors.ParserError):
                        continue  # Wrong encoding or separator: try the next combination
                    if df.shape[1] > 1:  # Successfully parsed multiple columns
                        break
                if df is not None and df.shape[1] > 1:
                    break

            if df is None:
                raise ValueError(f"Could not parse CSV file: {file_path}")
        else:
            raise ValueError(f"Unsupported file format: {path.suffix}")

        print(f" Loaded {file_path}: {df.shape[0]} rows × {df.shape[1]} columns")
        return df

    except Exception as e:
        print(f" Error loading {file_path}: {str(e)}")
        raise

def detect_item_columns(df: pd.DataFrame,
                       prefixes: Optional[List[str]] = None,
                       force_auto_detect: bool = False) -> List[str]:
    """
    Intelligently detect survey item columns.

    Args:
        df: DataFrame to analyze
        prefixes: List of column prefixes to look for
        force_auto_detect: If True, ignore prefixes and auto-detect based on data patterns
    """
    if prefixes and not force_auto_detect:
        # Look for columns matching any prefix
        item_cols = []
        for col in df.columns:
            if any(str(col).startswith(prefix) for prefix in prefixes):
                item_cols.append(col)

        if item_cols:
            print(f" Found {len(item_cols)} columns with specified prefixes")
            return item_cols

    # Auto-detection fallback
    print(" Auto-detecting item columns...")
    item_cols = []

    for col in df.columns:
        # Skip obvious non-item columns
        if str(col).lower() in ['id', 'participant_id', 'subject_id', 'age', 'gender', 'date']:
            continue

        # Look for columns with limited unique values (likely Likert scales)
        unique_vals = df[col].dropna().nunique()
        if 2 <= unique_vals <= 10:  # Typical range for survey items
            item_cols.append(col)

    print(f" Auto-detected {len(item_cols)} potential item columns")
    if len(item_cols) > 50:
        print(" Warning: Many columns detected - you may want to specify prefixes")

    return item_cols

def handle_missing_values(df: pd.DataFrame, missing_values: List = None) -> pd.DataFrame:
    """Replace custom missing value codes with NaN."""
    if missing_values:
        df = df.replace(missing_values, np.nan)
    return df

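def reverse_score_items(df: pd.DataFrame, reverse_items: List[str],
                        max_value: Optional[float] = None,
                        min_value: Optional[float] = None) -> pd.DataFrame:
    """Reverse-score numeric items as (min + max) - value.

    Scale bounds that are not supplied are inferred from the observed data, which is
    only correct if some respondent chose each end of the scale.
    """
    if not reverse_items:
        return df

    df_copy = df.copy()
    for item in reverse_items:
        if item not in df_copy.columns:
            print(f" ⚠️ Reverse-score item '{item}' not found; skipped")
            continue

        # Fall back to the observed range when the scale bounds are not supplied
        max_val = df_copy[item].max() if max_value is None else max_value
        min_val = df_copy[item].min() if min_value is None else min_value

        # (min + max) - x maps the scale onto itself, e.g. 1-5 becomes 5-1
        df_copy[item] = (min_val + max_val) - df_copy[item]
        print(f" Reverse scored {item} (scale {min_val}-{max_val})")
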
    return df_copy

def apply_mapping(df: pd.DataFrame, item_cols: List[str],
                 instrument_mappings: Dict, instrument: str) -> Tuple[pd.DataFrame, Dict]:
    """Apply mapping rules to convert responses to binary.

    Values with no matching rule are left unchanged (not set to NaN) so that
    auto_binarize_fallback() can still find and recode them.
    """
    df_mapped = df.copy()
    mapping_log = {}

    default_mapping = instrument_mappings.get('DEFAULT', {})

    for col in item_cols:
        # Precedence: global fallback < instrument DEFAULT < item-specific override
        item_mapping = instrument_mappings.get(col, {})
        combined_mapping = {**GLOBAL_DEFAULT_MAPPING, **default_mapping, **item_mapping}

        original_values = set(df[col].dropna().unique())
        # Recode known values; keep unknown values as-is for the fallback step
        df_mapped[col] = df[col].map(lambda v: combined_mapping.get(v, v))

        # Log which rules were used and which values had no rule
        mapped_values = {k: v for k, v in combined_mapping.items() if k in original_values}
        mapping_log[col] = {
            'mapped_explicitly': bool(mapped_values),
            'mapping': mapped_values,
            'unmapped_values': list(original_values - set(combined_mapping.keys()))
        }

    return df_mapped, mapping_log

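def auto_binarize_fallback(df: pd.DataFrame, item_cols: List[str], mapping_log: Dict) -> Tuple[pd.DataFrame, Dict]:
    """Apply automatic binarization for values that no mapping rule covered.

    The unmapped values of each column are sorted (numbers before strings) and split
    at their midpoint: lower half -> 0, upper half -> 1. A single unmapped value
    cannot be split, so it is set to NaN and reported.
    """
    df_binary = df.copy()

    for col in item_cols:
        col_log = mapping_log.setdefault(col, {})
        unmapped_values = col_log.get('unmapped_values', [])
        if not unmapped_values:
            continue

        print(f" Auto-binarizing {col}: {unmapped_values}")

        # Sort numbers before strings so mixed types do not raise a TypeError
        unique_vals = sorted(unmapped_values, key=lambda v: (isinstance(v, str), v))

        if len(unique_vals) > 1:
            midpoint = len(unique_vals) / 2
            auto_mapping = {val: (0 if i < midpoint else 1) for i, val in enumerate(unique_vals)}
        else:
            print(f" ⚠️ {col}: one unmapped value cannot be split; set to NaN (add it to MAPPINGS)")
            auto_mapping = {unique_vals[0]: np.nan}

        # One dict lookup per value, so already-recoded 0/1 values are never re-matched
        df_binary[col] = df_binary[col].map(lambda v: auto_mapping.get(v, v))
        col_log['auto_binarized'] = True
        col_log['auto_mapping'] = auto_mapping
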
    return df_binary, mapping_log

def create_transparency_catalog(all_mapping_logs: Dict) -> pd.DataFrame:
    """Create a transparency catalog showing all mapping decisions."""
    catalog_data = []

    for instrument, mapping_log in all_mapping_logs.items():
        for item, log in mapping_log.items():
            explicit = log.get('mapping', {})
            auto = log.get('auto_mapping', {})
            # An item can have both explicit rules and auto-binarized values
            all_rules = {**explicit, **auto}
            if explicit and auto:
                mapping_type = 'explicit + auto'
            elif explicit:
                mapping_type = 'explicit'
            elif auto:
                mapping_type = 'auto'
            else:
                mapping_type = 'none'

            catalog_data.append({
                'instrument': instrument,
                'item': item,
                'mapping_type': mapping_type,
                'original_values': ', '.join(str(k) for k in all_rules),
                'binary_mapping': ', '.join(f"{k} → {v}" for k, v in all_rules.items()),
                'had_unmapped_values': bool(log.get('unmapped_values'))
            })

    return pd.DataFrame(catalog_data)

def to_long_format(df: pd.DataFrame, item_cols: List[str],
                  id_col: str, instrument: str) -> pd.DataFrame:
    """Convert survey data to long format."""
    # Melt to long format
    long_df = df.melt(
        id_vars=[id_col],
        value_vars=item_cols,
        var_name='item',
        value_name='value'
    )

    # Add instrument column
    long_df['instrument'] = instrument

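    # Extract trailing item numbers where present (names without one get <NA>)
    item_digits = long_df['item'].astype(str).str.extract(r'(\d+)$', expand=False)
    long_df['item_no'] = pd.to_numeric(item_digits, errors='coerce').astype('Int64')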

    # Rename id column to standard name
    long_df = long_df.rename(columns={id_col: 'participant_id'})

    # Remove missing values
    long_df = long_df.dropna(subset=['value'])

    return long_df[['participant_id', 'item', 'item_no', 'value', 'instrument']]
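
A minimal illustration of the wide-to-long reshape with `DataFrame.melt` and item-number extraction, on invented data. This sketch uses `Series.str.extract(..., expand=False)` so a Series (not a one-column DataFrame) comes back, and `pd.to_numeric` before the nullable `Int64` cast so that names without a trailing number, such as `phq_total`, become `<NA>`.

```python
import pandas as pd

wide = pd.DataFrame({
    'subject_id': ['S1', 'S2'],
    'PHQ9_1': [0, 1],
    'PHQ9_2': [1, None],  # S2 skipped this item
    'phq_total': [1, 1],
})

long_df = wide.melt(id_vars=['subject_id'],
                    value_vars=['PHQ9_1', 'PHQ9_2', 'phq_total'],
                    var_name='item', value_name='value')
digits = long_df['item'].str.extract(r'(\d+)$', expand=False)  # no match -> NaN
long_df['item_no'] = pd.to_numeric(digits, errors='coerce').astype('Int64')
long_df = long_df.dropna(subset=['value']).rename(columns={'subject_id': 'participant_id'})
print(long_df)
```

Note that a prefix such as `phq_` would also match a total-score column like this one, so prefixes are worth checking against the real column names.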


# Main Processing Pipeline
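After loading a file, the first step inside `process_all_surveys()` is `handle_missing_values()`, which calls `DataFrame.replace` on the whole frame. A missing-value code therefore disappears from every column, not only from item columns: with the `GAD7` entry above, a genuine value of `99` in an ID or age column would also become `NaN`. A small demonstration on invented data, alongside a variant restricted to item columns (a suggestion, not what the notebook currently does):

```python
import numpy as np
import pandas as pd

# Invented data: 99 is a real ID here, while 99 and -999 in GAD7_1 are missing codes
df = pd.DataFrame({'ID': [101, 99, 102], 'GAD7_1': [2, 99, -999]})
codes = [-999, 99]

# What the notebook does: replace the codes everywhere in the frame
whole_frame = df.replace(codes, np.nan)

# Variant: replace the codes only in the item columns
item_cols = ['GAD7_1']
items_only = df.copy()
items_only[item_cols] = df[item_cols].replace(codes, np.nan)

print(whole_frame)
print(items_only)
```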

In [None]:
def process_all_surveys():
    """Main processing function that orchestrates the entire pipeline."""

    if not validate_config():
        print(" Please fix configuration errors before proceeding.")
        return None, None

    all_long_data = []
    all_mapping_logs = {}

    print("\n Starting survey processing pipeline...\n")

    for survey_config in SURVEYS_CONFIG:
        instrument = survey_config['instrument']
        print(f"\n{'='*50}")
        print(f"Processing: {instrument}")
        print(f"{'='*50}")

        # Expand file paths
        file_paths = expand_file_paths(survey_config['path'])

        if not file_paths:
            print(f" No files found for {instrument}")
            continue

        # Process each file
        instrument_data = []
        for file_path in file_paths:
            print(f"\n Processing file: {file_path}")

            # Load data
            read_kwargs = {}
            if 'sheet_name' in survey_config:
                read_kwargs['sheet_name'] = survey_config['sheet_name']
            if 'skip_rows' in survey_config:
                read_kwargs['skiprows'] = survey_config['skip_rows']

            df = read_table(file_path, **read_kwargs)

            # Handle missing values
            df = handle_missing_values(df, survey_config.get('missing_values', []))

            # Verify ID column exists
            id_col = survey_config['id_col']
            if id_col not in df.columns:
                print(f" ID column '{id_col}' not found in {file_path}")
                print(f"Available columns: {list(df.columns)}")
                continue

            # Detect item columns
            item_cols = detect_item_columns(
                df,
                survey_config.get('item_prefixes'),
                survey_config.get('force_auto_detect', False)
            )

            if not item_cols:
                print(f" No item columns detected in {file_path}")
                continue

            print(f" Processing {len(item_cols)} items")

            # Reverse score items if needed
            df = reverse_score_items(df, survey_config.get('reverse_items', []))

            # Apply mappings
            instrument_mappings = MAPPINGS.get(instrument, {})
            df_mapped, mapping_log = apply_mapping(df, item_cols, instrument_mappings, instrument)

            # Auto-binarize fallback
            df_binary, mapping_log = auto_binarize_fallback(df_mapped, item_cols, mapping_log)

            # Keep mapping logs from every file (a later file overwrites same-named items)
            all_mapping_logs.setdefault(instrument, {}).update(mapping_log)

            # Convert to long format
            long_data = to_long_format(df_binary, item_cols, id_col, instrument)
            instrument_data.append(long_data)

            print(f" Processed {len(long_data)} participant×item observations")

        # Combine all files for this instrument
        if instrument_data:
            combined_instrument_data = pd.concat(instrument_data, ignore_index=True)
            all_long_data.append(combined_instrument_data)

            print(f" Total for {instrument}: {len(combined_instrument_data)} observations")

    # Combine all instruments
    if all_long_data:
        combined_long = pd.concat(all_long_data, ignore_index=True)
        catalog = create_transparency_catalog(all_mapping_logs)

        print(f"\n PIPELINE COMPLETE!")
        print(f" Final dataset: {len(combined_long)} total observations")
        print(f" Participants: {combined_long['participant_id'].nunique()}")
        print(f" Instruments: {combined_long['instrument'].nunique()}")
        print(f" Items: {combined_long['item'].nunique()}")

        return combined_long, catalog
    else:
        print(" No data processed successfully")
        return None, None

def export_results(combined_long: pd.DataFrame, catalog: pd.DataFrame,
                  output_dir: str = "outputs"):
    """Export the final results with multiple format options."""

    # Create output directory
    Path(output_dir).mkdir(exist_ok=True)

    # Primary CSV export
    csv_path = Path(output_dir) / "analysis_ready_long.csv"
    combined_long.to_csv(csv_path, index=False)
    print(f" Exported primary dataset: {csv_path}")

    # Excel workbook with multiple sheets
    excel_path = Path(output_dir) / "analysis_ready_outputs.xlsx"
    with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
        combined_long.to_excel(writer, sheet_name='long', index=False)
        catalog.to_excel(writer, sheet_name='catalog', index=False)

        # Summary statistics
        summary = combined_long.groupby('instrument')['value'].agg([
            'count', 'sum', 'mean'
        ]).round(3)
        summary['prevalence_%'] = (summary['mean'] * 100).round(1)
        summary.to_excel(writer, sheet_name='summary')

    print(f" Exported Excel workbook: {excel_path}")

    return csv_path, excel_path
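
The `summary` sheet written by `export_results()` reduces the long data to one row per instrument. Because values are 0/1, `sum` counts endorsed responses, `mean` is the proportion endorsed, and `prevalence_%` is that proportion × 100. The unit is participant×item observations, not participants. A worked example with invented values:

```python
import pandas as pd

long_demo = pd.DataFrame({
    'instrument': ['GAD7', 'GAD7', 'GAD7', 'GAD7', 'PHQ9', 'PHQ9'],
    'value': [1, 0, 1, 1, 0, 1],
})

# Same aggregation as the summary sheet
summary = long_demo.groupby('instrument')['value'].agg(['count', 'sum', 'mean']).round(3)
summary['prevalence_%'] = (summary['mean'] * 100).round(1)
print(summary)
```

Here GAD7 has 3 endorsements out of 4 observations (75.0%) and PHQ9 has 1 out of 2 (50.0%).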

### Diagnostic and Helper Functions
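`print_unmapped_values()` below, like the pipeline, relies on `detect_item_columns()`. Its two rules can be seen on a toy frame: prefix matching is exact and case-sensitive, while the auto-detect fallback keeps any column with 2 to 10 distinct non-missing values, which can also pick up non-item columns such as a `site` code. The data is invented for illustration:

```python
import pandas as pd

toy = pd.DataFrame({
    'participant_id': ['P01', 'P02', 'P03'],
    'GAD7_1': [0, 1, 3],
    'gad_2': [1, 1, 2],
    'site': ['A', 'B', 'A'],
    'notes': ['ok', 'ok', 'ok'],
})

# Rule 1: prefix matching (str.startswith is case-sensitive)
prefixes = ['GAD7_', 'gad_']
by_prefix = [c for c in toy.columns if any(str(c).startswith(p) for p in prefixes)]

# Rule 2: auto-detect heuristic, 2-10 distinct non-missing values, known ID/demographic names skipped
skip = {'id', 'participant_id', 'subject_id', 'age', 'gender', 'date'}
by_heuristic = [c for c in toy.columns
                if str(c).lower() not in skip and 2 <= toy[c].dropna().nunique() <= 10]

print(by_prefix)
print(by_heuristic)  # 'site' is picked up although it is not a survey item
```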

In [None]:
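def print_unmapped_values(max_items: int = 5):
    """Print values that no mapping rule covers - helpful for configuration.

    Only the first file of each survey and the first `max_items` item columns are checked.
    """
    print(" Scanning for unmapped values in your data...\n")

    for survey_config in SURVEYS_CONFIG:
        instrument = survey_config['instrument']
        print(f"\n{instrument}:")
        print("-" * 30)

        instrument_mappings = MAPPINGS.get(instrument, {})
        default_mapping = instrument_mappings.get('DEFAULT', {})

        # Read files with the same sheet/skip-row options as the pipeline
        read_kwargs = {}
        if 'sheet_name' in survey_config:
            read_kwargs['sheet_name'] = survey_config['sheet_name']
        if 'skip_rows' in survey_config:
            read_kwargs['skiprows'] = survey_config['skip_rows']

        file_paths = expand_file_paths(survey_config['path'])
        for file_path in file_paths[:1]:  # Just check first file
            try:
                df = read_table(file_path, **read_kwargs)
                df = handle_missing_values(df, survey_config.get('missing_values', []))
                item_cols = detect_item_columns(df, survey_config.get('item_prefixes'))

                for col in item_cols[:max_items]:
                    known = {**GLOBAL_DEFAULT_MAPPING, **default_mapping,
                             **instrument_mappings.get(col, {})}
                    unmapped = [v for v in df[col].dropna().unique() if v not in known]
                    # key=str avoids a TypeError when numbers and strings are mixed
                    shown = ', '.join(str(v) for v in sorted(unmapped, key=str))
                    print(f"  {col}: {shown or 'all values mapped'}")
            except Exception as e:
                print(f"  Error reading {file_path}: {e}")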

def quick_data_preview():
    """Show a quick preview of your data structure."""
    print(" Quick data preview:\n")

    for survey_config in SURVEYS_CONFIG:
        instrument = survey_config['instrument']
        print(f"{instrument}:")

        file_paths = expand_file_paths(survey_config['path'])
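        if file_paths:
            # Use the same sheet/skip-row options as the pipeline
            read_kwargs = {}
            if 'sheet_name' in survey_config:
                read_kwargs['sheet_name'] = survey_config['sheet_name']
            if 'skip_rows' in survey_config:
                read_kwargs['skiprows'] = survey_config['skip_rows']
            try:
                df = read_table(file_paths[0], **read_kwargs)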
                print(f"  Shape: {df.shape}")
                print(f"  Columns: {list(df.columns)}")
                print(f"  ID column '{survey_config['id_col']}' exists: {survey_config['id_col'] in df.columns}")
                print()
            except Exception as e:
                print(f"  Error: {e}\n")

# Final Execution

In [None]:
# Run validation and preview
print(" Survey Data Harmonization Pipeline")
print("=" * 50)

# Uncomment these lines for diagnostics:
# quick_data_preview()
# print_unmapped_values()

# Process all surveys
combined_long, catalog = process_all_surveys()

# Export results
if combined_long is not None:
    csv_path, excel_path = export_results(combined_long, catalog)

    print(f"\n SUCCESS! Your harmonized dataset is ready:")
    print(f" Primary file: {csv_path}")
    print(f" Full workbook: {excel_path}")
    print(f"\nThe 'long' format dataset contains:")
    print(f"- participant_id: Unique participant identifier")
    print(f"- item: Original survey item name")
    print(f"- item_no: Item number (if extractable)")
    print(f"- value: Harmonized binary value (0/1)")
    print(f"- instrument: Survey instrument name")

    print(f"\n Check the 'catalog' sheet for transparency on all mapping decisions!")
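
Many analyses need one row per participant. Assuming the exported long file has the five columns listed above, a sketch of pivoting it back to wide form with `DataFrame.pivot_table`; invented rows stand in for `outputs/analysis_ready_long.csv` so the block runs on its own. `aggfunc='first'` is a design choice: if the same participant answered an item in several files or waves, only the first value is kept, so you may prefer to add a wave column to the index instead.

```python
import pandas as pd

long_demo = pd.DataFrame({
    'participant_id': ['P01', 'P01', 'P02', 'P02'],
    'item': ['GAD7_1', 'PHQ9_1', 'GAD7_1', 'PHQ9_1'],
    'item_no': [1, 1, 1, 1],
    'value': [1, 0, 0, 1],
    'instrument': ['GAD7', 'PHQ9', 'GAD7', 'PHQ9'],
})
# In the notebook this could instead be: pd.read_csv("outputs/analysis_ready_long.csv")

wide = long_demo.pivot_table(index='participant_id', columns='item',
                             values='value', aggfunc='first')
print(wide)
```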