In [1]:
# S3 Cell 1: Imports and S3 Setup
"""
NYTD S3 Data Integration - Cell 1: Imports and S3 Configuration
"""

import io
import json
import os
import re
import warnings
from datetime import datetime
from pathlib import Path

import boto3
import numpy as np
import pandas as pd
warnings.filterwarnings('ignore')

# Initialize S3 client
# Module-level client shared by every S3 helper defined later in the notebook.
s3 = boto3.client('s3')

# Status banner.
# NOTE(review): these messages are printed unconditionally; nothing in this
# cell verifies that credentials exist or that any bucket is reachable.
print("üîß Libraries and S3 client initialized!")
print("üìÖ Setup date:", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
print("üîë AWS credentials loaded from ~/.aws/credentials")
print("‚úÖ Ready to access S3 buckets!")

üîß Libraries and S3 client initialized!
üìÖ Setup date: 2025-06-26 11:53:47
üîë AWS credentials loaded from ~/.aws/credentials
‚úÖ Ready to access S3 buckets!


In [2]:
# S3 Cell 2: S3 Helper Functions
"""
NYTD S3 Data Integration - Cell 2: S3 Data Access Functions
"""

def list_s3_files(bucket, prefix="", suffix=None):
    """Return the object keys found under ``prefix`` in ``bucket``.

    Args:
        bucket: Name of the S3 bucket to list.
        prefix: Key prefix to restrict the listing to ("" lists everything).
        suffix: Optional key ending; compared case-insensitively. A falsy
            value disables the filter.

    Returns:
        A list of matching keys. Any exception is reported with ``print`` and
        an empty list is returned instead.
    """
    try:
        pages = s3.get_paginator('list_objects_v2').paginate(Bucket=bucket, Prefix=prefix)
        keys = [
            entry['Key']
            for page in pages
            # Pages without any objects carry no 'Contents' entry.
            for entry in page.get('Contents', [])
            if not suffix or entry['Key'].lower().endswith(suffix.lower())
        ]
        print(f"Found {len(keys)} files in s3://{bucket}/{prefix}")
        return keys
    except Exception as err:
        print(f"Error listing files: {err}")
        return []

def load_csv_from_s3(bucket, key):
    """Download one S3 object and parse it as CSV.

    Args:
        bucket: Name of the S3 bucket.
        key: Object key of the CSV file.

    Returns:
        The parsed DataFrame, or ``None`` when the download or the parse
        fails (the error is reported with ``print``).
    """
    try:
        payload = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
        frame = pd.read_csv(io.BytesIO(payload))
        n_rows, n_cols = frame.shape
        print(f"Loaded {n_rows} rows, {n_cols} columns from {key}")
        return frame
    except Exception as err:
        print(f"Error loading {key}: {err}")
        return None

def load_excel_from_s3(bucket, key, sheet_name=None):
    """Download one S3 object and parse it as an Excel workbook.

    Args:
        bucket: Name of the S3 bucket.
        key: Object key of the workbook.
        sheet_name: Sheet selector forwarded to ``pd.read_excel``. ``None``
            (or an empty string) loads every sheet. Any other value,
            including the index ``0``, is honoured as given.

    Returns:
        A DataFrame when a single sheet was selected, a dict mapping sheet
        name to DataFrame when several sheets were read, or ``None`` when the
        download or the parse fails (the error is reported with ``print``).
    """
    try:
        obj = s3.get_object(Bucket=bucket, Key=key)
        excel_data = io.BytesIO(obj['Body'].read())

        # Test for "not provided" explicitly. A plain truthiness check treats
        # sheet_name=0 (the first sheet) as missing and loads all sheets.
        read_all_sheets = sheet_name is None or (isinstance(sheet_name, str) and sheet_name == "")
        if read_all_sheets:
            # Read all sheets and return as dict
            df = pd.read_excel(excel_data, sheet_name=None)
        else:
            df = pd.read_excel(excel_data, sheet_name=sheet_name)

        if isinstance(df, dict):
            total_rows = sum(sheet_df.shape[0] for sheet_df in df.values())
            print(f"Loaded Excel file {key} with {len(df)} sheets, {total_rows} total rows")
        else:
            print(f"Loaded {df.shape[0]} rows, {df.shape[1]} columns from {key}")

        return df
    except Exception as e:
        print(f"Error loading {key}: {e}")
        return None

def load_compressed_from_s3(bucket, key):
    """Download a gzip-compressed, tab-separated S3 object into a DataFrame.

    Every column is read as ``str``.

    Args:
        bucket: Name of the S3 bucket.
        key: Object key of the ``.tab.gz`` file.

    Returns:
        The parsed DataFrame, or ``None`` when the download or the parse
        fails (the error is reported with ``print``).
    """
    try:
        raw_bytes = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
        frame = pd.read_csv(
            io.BytesIO(raw_bytes),
            sep='\t',
            compression='gzip',
            dtype=str,
        )
        n_rows, n_cols = frame.shape
        print(f"Loaded {n_rows} rows, {n_cols} columns from {key}")
        return frame
    except Exception as err:
        print(f"Error loading {key}: {err}")
        return None

# Confirmation banner; printed unconditionally once this cell has executed.
print("‚úÖ S3 helper functions defined!")
print("üìÅ Can now load: CSV, Excel (.xlsx/.xls), and compressed (.tab.gz) files from S3")

‚úÖ S3 helper functions defined!
üìÅ Can now load: CSV, Excel (.xlsx/.xls), and compressed (.tab.gz) files from S3


In [3]:
# S3 Cell 3: NYTD S3 Integrator Class
"""
NYTD S3 Data Integration - Cell 3: Main Class Definition
"""

class NYTDS3Integrator:
    """Holds configuration and a running log for NYTD integration from S3.

    Attributes set by the constructor:
        curated_bucket / raw_bucket: bucket names to read from.
        start_time: timestamp taken when the instance is created.
        log: list of formatted log lines appended by ``_log``.
        quality_checks: initially empty dict.
        cohorts: cohort id -> {'year', 'label'}.
        outcome_vars: variable names pivoted from long to wide format.
        demo_vars: per-person variable names that are not pivoted.
    """

    def __init__(self, curated_bucket='bdc-public-curated', raw_bucket='bdc-public-raw'):
        """Store the bucket names, build the lookup tables and print a summary."""
        self.curated_bucket = curated_bucket
        self.raw_bucket = raw_bucket
        self.start_time = datetime.now()
        self.log = []
        self.quality_checks = {}

        # Cohort definitions: (cohort id, fiscal year, label)
        cohort_rows = (
            ('202', 2011, 'FY11'),
            ('228', 2014, 'FY14'),
            ('266', 2017, 'FY17'),
            ('297', 2020, 'FY20'),
        )
        self.cohorts = {
            cohort_id: {'year': fiscal_year, 'label': label}
            for cohort_id, fiscal_year, label in cohort_rows
        }

        # Variables to pivot from long to wide format
        self.outcome_vars = (
            "CurrFTE CurrPTE EmplySklls SocSecrty EducAid "
            "PubFinAs PubFoodAs PubHousAs OthrFinAs HighEdCert "
            "CurrenRoll CnctAdult Homeless SubAbuse Incarc "
            "Children Marriage Medicaid OthrHlthIn MedicalIn "
            "MentlHlthIn PrescripIn OutcmRpt OutcmFCS"
        ).split()

        # Demographic vars (one per person, don't pivot)
        self.demo_vars = (
            "StFCID StFIPS St RecNumbr DOB Sex "
            "AmIAKN Asian BlkAfrAm HawaiiPI White "
            "RaceUnkn RaceDcln HisOrgin Baseline "
            "Elig19 Elig21 SampleState InSample"
        ).split()

        banner_lines = [
            "üöÄ NYTD S3 Integration System Initialized",
            f"ü™£ Curated Bucket: {curated_bucket}",
            f"ü™£ Raw Bucket: {raw_bucket}",
            f"üéØ Target Cohorts: {list(self.cohorts.keys())}",
            f"üìä Outcome Variables to Pivot: {len(self.outcome_vars)}",
            f"üë• Demographic Variables: {len(self.demo_vars)}",
        ]
        for banner_line in banner_lines:
            print(banner_line)

    def _log(self, message, level="INFO"):
        """Record ``message`` with an HH:MM:SS stamp and echo it to stdout."""
        stamp = datetime.now().strftime("%H:%M:%S")
        self.log.append(f"[{stamp}] {level}: {message}")
        print(f"  {self.log[-1]}")

# Confirmation banner; printed unconditionally once this cell has executed.
print("‚úÖ NYTDS3Integrator class defined!")
print("üîß Ready to discover and load NYTD data from S3")

‚úÖ NYTDS3Integrator class defined!
üîß Ready to discover and load NYTD data from S3


In [4]:
# S3 Cell 4: S3 Data Discovery Functions
"""
NYTD S3 Data Integration - Cell 4: Smart S3 File Discovery
"""

def discover_nytd_files(self):
    """Discover NYTD files in S3 buckets and group them by file type.

    Both buckets are listed under several candidate prefixes. The last
    candidate is the empty prefix, which lists the whole bucket and therefore
    repeats every object already found under a more specific prefix. Each
    (bucket, key) pair is inventoried once, under the first prefix that
    produced it.

    Returns:
        dict with the keys 'excel_files', 'csv_files', 'tab_gz_files' and
        'other_files'. Each value is a list of dicts with the keys 'bucket',
        'key', 'type' ('curated' or 'raw') and 'prefix'.
    """
    self._log("üîç Discovering NYTD files in S3...")

    # (bucket type, bucket name, prefixes to try in order)
    search_plan = (
        ('curated', self.curated_bucket, ['nytd/', 'ndacan/nytd/', 'NYTD/', '']),
        ('raw', self.raw_bucket, ['ndacan/nytd/', 'nytd/', '']),
    )

    discovered_files = {'curated': {}, 'raw': {}}
    for bucket_type, bucket_name, prefixes in search_plan:
        for prefix in prefixes:
            self._log(f"üîç Searching {bucket_type} bucket: s3://{bucket_name}/{prefix}")
            files = list_s3_files(bucket_name, prefix)
            if files:
                discovered_files[bucket_type][prefix] = files

    # Categorize files by type
    file_inventory = {
        'excel_files': [],
        'csv_files': [],
        'tab_gz_files': [],
        'other_files': []
    }

    # Remembers objects already inventoried so overlapping prefixes do not
    # produce duplicate entries.
    seen_objects = set()

    for bucket_type, prefixes in discovered_files.items():
        bucket_name = self.curated_bucket if bucket_type == 'curated' else self.raw_bucket
        for prefix, files in prefixes.items():
            for file in files:
                object_id = (bucket_name, file)
                if object_id in seen_objects:
                    continue
                seen_objects.add(object_id)

                file_info = {
                    'bucket': bucket_name,
                    'key': file,
                    'type': bucket_type,
                    'prefix': prefix
                }

                # Extension checks are case-insensitive.
                key_lower = file.lower()
                if key_lower.endswith(('.xlsx', '.xls')):
                    file_inventory['excel_files'].append(file_info)
                elif key_lower.endswith('.csv'):
                    file_inventory['csv_files'].append(file_info)
                elif key_lower.endswith('.tab.gz'):
                    file_inventory['tab_gz_files'].append(file_info)
                else:
                    file_inventory['other_files'].append(file_info)

    # Log discovery results
    self._log(f"üìä File Discovery Results:")
    for file_type, files in file_inventory.items():
        self._log(f"   {file_type}: {len(files)} files")
        for file_info in files[:3]:  # Show first 3 of each type
            self._log(f"     ‚Ä¢ {file_info['key']}")
        if len(files) > 3:
            self._log(f"     ... and {len(files)-3} more")

    return file_inventory

def identify_cohort_from_filename(self, filename):
    """Guess which cohort a file belongs to from its name.

    Three strategies are tried in order and the first hit wins:

    1. The cohort number appears in the name as a standalone number, i.e. not
       embedded in a longer run of digits. A plain substring test would make
       cohort '202' match every name containing '2020' or '2021'.
    2. A fiscal-year spelling of the cohort year appears in the name after
       '-' and '_' are removed ('fy2014', 'fy14', '2014', 'c14').
    3. The name contains one of a few known raw-file name fragments.

    Args:
        filename: File name (or key) to inspect; matching is case-insensitive.

    Returns:
        The cohort id (a key of ``self.cohorts``) or ``None`` if nothing
        matched.
    """
    filename_lower = filename.lower()

    # Strategy 1: Direct cohort number match (digit-bounded)
    for cohort in self.cohorts.keys():
        if re.search(rf"(?<!\d){re.escape(cohort)}(?!\d)", filename_lower):
            return cohort

    # Strategy 2: Fiscal year match
    compact_name = filename_lower.replace('-', '').replace('_', '')
    for cohort, info in self.cohorts.items():
        year_text = str(info['year'])
        fy_patterns = (
            f"fy{year_text}",
            f"fy{year_text[-2:]}",
            year_text,
            f"c{year_text[-2:]}"  # C14, C17, etc.
        )
        if any(pattern in compact_name for pattern in fy_patterns):
            return cohort

    # Strategy 3: Pattern matching for known file patterns
    # NOTE(review): a name containing 'outcomes_c14' is already resolved by
    # Strategy 2 (via 'c14'), so the first entry below is only reached when
    # self.cohorts lacks a 2014 cohort - confirm which mapping is intended.
    patterns = {
        'outcomes_c14': '202',  # FY11 cohort uses C14
        'outcomes_c17': '266',  # FY17 cohort uses C17
        'outcomes20': '297'     # FY20 cohort
    }

    for pattern, cohort in patterns.items():
        if pattern in filename_lower:
            return cohort

    return None

# Add methods to class
# The functions in this cell are written at module level with an explicit
# `self` parameter; assigning them as class attributes makes them callable as
# methods on NYTDS3Integrator instances.
NYTDS3Integrator.discover_nytd_files = discover_nytd_files
NYTDS3Integrator.identify_cohort_from_filename = identify_cohort_from_filename

# Confirmation banner; printed unconditionally once this cell has executed.
print("‚úÖ S3 data discovery functions added!")
print("üéØ Can now intelligently discover and categorize NYTD files in S3")

‚úÖ S3 data discovery functions added!
üéØ Can now intelligently discover and categorize NYTD files in S3


In [5]:
# S3 Cell 5: S3 Data Loading Functions
"""
NYTD S3 Data Integration - Cell 5: Intelligent Data Loading from S3
"""

def load_datasets_from_s3(self):
    """Load NYTD datasets from S3 with intelligent file prioritization.

    Sources are tried in priority order and a cohort, once loaded, is never
    replaced by a later source:

    1. Excel files whose name identifies a cohort (for multi-sheet workbooks
       the sheet with the most rows is used).
    2. CSV files whose name identifies a still-missing cohort.
    3. Known ``.tab.gz`` file names located under a ``/<cohort>/`` folder.
    4. Any remaining Excel/CSV/``.tab.gz`` file whose name mentions the
       cohort number or year.

    Steps 2-4 only run while fewer cohorts are loaded than are defined in
    ``self.cohorts``.

    Returns:
        dict mapping cohort id to DataFrame, or ``None`` when nothing could
        be loaded.
    """
    self._log("üîÑ Loading NYTD datasets from S3...")

    # First discover what's available
    file_inventory = self.discover_nytd_files()

    datasets = {}
    # Derived from the cohort table so the fallbacks keep working if a cohort
    # is added or removed.
    expected_cohorts = len(self.cohorts)

    # Strategy 1: Look for curated Excel files first (highest priority)
    excel_files = file_inventory['excel_files']
    if excel_files:
        self._log(f"üìä Found {len(excel_files)} Excel files, processing...")

        for file_info in excel_files:
            filename = os.path.basename(file_info['key'])
            cohort_found = self.identify_cohort_from_filename(filename)

            if cohort_found and cohort_found not in datasets:
                self._log(f"üéØ Loading cohort {cohort_found} from Excel: {file_info['key']}")
                df = load_excel_from_s3(file_info['bucket'], file_info['key'])

                if isinstance(df, dict):
                    # Multiple sheets - combine or pick the largest
                    largest_sheet = max(df.keys(), key=lambda x: df[x].shape[0])
                    self._log(f"   Using largest sheet: {largest_sheet}")
                    datasets[cohort_found] = df[largest_sheet]
                elif df is not None:
                    datasets[cohort_found] = df
            elif cohort_found:
                self._log(f"   Skipping {file_info['key']} - cohort {cohort_found} already loaded")
            else:
                self._log(f"‚ùì Could not identify cohort for {file_info['key']}")

    # Strategy 2: Look for CSV files for missing cohorts
    if len(datasets) < expected_cohorts:
        csv_files = file_inventory['csv_files']
        if csv_files:
            self._log(f"üìÑ Checking {len(csv_files)} CSV files for missing cohorts...")

            for file_info in csv_files:
                filename = os.path.basename(file_info['key'])
                cohort_found = self.identify_cohort_from_filename(filename)

                if cohort_found and cohort_found not in datasets:
                    self._log(f"üéØ Loading cohort {cohort_found} from CSV: {file_info['key']}")
                    df = load_csv_from_s3(file_info['bucket'], file_info['key'])
                    if df is not None:
                        datasets[cohort_found] = df

    # Strategy 3: Fall back to compressed files from raw bucket
    if len(datasets) < expected_cohorts:
        tab_gz_files = file_inventory['tab_gz_files']
        if tab_gz_files:
            self._log(f"üì¶ Checking {len(tab_gz_files)} compressed files for missing cohorts...")

            # Known file patterns for raw data. Two cohorts share a file
            # name, so the "/<cohort>/" path segment is what tells them apart.
            file_patterns = {
                '202': 'outcomes_C14.tab.gz',
                '228': 'outcomes_C14.tab.gz',
                '266': 'outcomes_C17.tab.gz',
                '297': 'Outcomes20_w3.tab.gz'
            }

            for cohort, pattern in file_patterns.items():
                if cohort not in datasets:
                    matching_files = [f for f in tab_gz_files
                                    if pattern in f['key'] and f"/{cohort}/" in f['key']]

                    if matching_files:
                        file_info = matching_files[0]  # Take first match
                        self._log(f"üéØ Loading cohort {cohort} from compressed: {file_info['key']}")
                        df = load_compressed_from_s3(file_info['bucket'], file_info['key'])
                        if df is not None:
                            datasets[cohort] = df

    # Strategy 4: Flexible pattern matching for any remaining files
    if len(datasets) < expected_cohorts:
        self._log("üîç Attempting flexible pattern matching for remaining files...")

        all_files = (file_inventory['excel_files'] +
                    file_inventory['csv_files'] +
                    file_inventory['tab_gz_files'])

        for file_info in all_files:
            filename = os.path.basename(file_info['key'])
            # Lower-cased so that e.g. ".XLSX" keys reach the right loader.
            key_lower = file_info['key'].lower()

            # Try multiple identification strategies
            for cohort in self.cohorts.keys():
                if cohort not in datasets:
                    # Check for any mention of cohort year or number
                    year = self.cohorts[cohort]['year']
                    # The cohort number must stand alone: a plain substring
                    # test lets '202' match a file named after the year 2020.
                    cohort_in_name = re.search(
                        rf"(?<!\d){re.escape(cohort)}(?!\d)", filename
                    ) is not None
                    if (str(year) in filename or
                        cohort_in_name or
                        f"fy{str(year)[-2:]}" in filename.lower()):

                        self._log(f"üéØ Flexible match: Loading cohort {cohort} from {file_info['key']}")

                        if key_lower.endswith(('.xlsx', '.xls')):
                            df = load_excel_from_s3(file_info['bucket'], file_info['key'])
                        elif key_lower.endswith('.csv'):
                            df = load_csv_from_s3(file_info['bucket'], file_info['key'])
                        elif key_lower.endswith('.tab.gz'):
                            df = load_compressed_from_s3(file_info['bucket'], file_info['key'])
                        else:
                            continue

                        if isinstance(df, dict):
                            largest_sheet = max(df.keys(), key=lambda x: df[x].shape[0])
                            df = df[largest_sheet]

                        if df is not None:
                            datasets[cohort] = df
                            # One file feeds at most one cohort.
                            break

    if not datasets:
        self._log("‚ùå No NYTD datasets could be loaded from S3!", "ERROR")
        return None

    # Log final results
    self._log(f"‚úÖ Successfully loaded {len(datasets)} cohorts from S3:")
    for cohort, df in datasets.items():
        cohort_year = self.cohorts[cohort]['year']
        self._log(f"   ‚Ä¢ Cohort {cohort} (FY{cohort_year}): {df.shape[0]:,} rows √ó {df.shape[1]} cols")

    missing_cohorts = [c for c in self.cohorts.keys() if c not in datasets]
    if missing_cohorts:
        self._log(f"‚ö†Ô∏è  Missing cohorts: {missing_cohorts}", "WARNING")

    return datasets

# Add method to class
# The function in this cell is written at module level with an explicit
# `self` parameter; assigning it as a class attribute makes it callable as a
# method on NYTDS3Integrator instances.
NYTDS3Integrator.load_datasets_from_s3 = load_datasets_from_s3

# Confirmation banner; printed unconditionally once this cell has executed.
print("‚úÖ S3 data loading functions added!")
print("üéØ Can now intelligently load NYTD data from various S3 file formats")

‚úÖ S3 data loading functions added!
üéØ Can now intelligently load NYTD data from various S3 file formats


In [6]:
# S3 Cell 6: Data Processing Functions
"""
NYTD S3 Data Integration - Cell 6: Data Standardization and Processing
"""

def standardize_s3_datasets(self, datasets):
    """Standardize datasets loaded from S3.

    For every cohort a copy of its frame is returned with:

    * 'cohort_id', 'cohort_year', 'cohort_label' and 'data_source' columns;
    * every column that exists in any other cohort added as NaN, in a
      deterministic (name-sorted) order;
    * 'Wave' and the outcome variables coerced to numeric;
    * 'RepDate', 'DOB' and 'OutcmDte' converted to datetimes;
    * a string 'person_id' taken from 'StFCID'. When 'StFCID' is absent the
      row index is used, prefixed with the cohort id so that ids from
      different cohorts cannot collide after the cohorts are stacked.

    Args:
        datasets: dict mapping cohort id (a key of ``self.cohorts``) to
            DataFrame.

    Returns:
        dict mapping cohort id to the standardized DataFrame. The input
        frames are not modified.
    """
    self._log("üîß Standardizing S3 datasets...")

    standardized = {}
    bookkeeping_cols = ['cohort_id', 'cohort_year', 'cohort_label', 'data_source']

    # Get all unique columns across datasets
    all_columns = set()
    for df in datasets.values():
        all_columns.update(df.columns)

    self._log(f"   Found {len(all_columns)} unique columns across all datasets")

    for cohort, df in datasets.items():
        df_std = df.copy()

        # Add cohort identifiers
        df_std['cohort_id'] = cohort
        df_std['cohort_year'] = self.cohorts[cohort]['year']
        df_std['cohort_label'] = self.cohorts[cohort]['label']
        df_std['data_source'] = 'S3'

        # Add missing columns with NaN. Sorted so the resulting column order
        # does not depend on set iteration order.
        missing_cols = all_columns - set(df.columns)
        for col in sorted(missing_cols, key=str):
            if col not in bookkeeping_cols:
                df_std[col] = np.nan

        # Standardize data types
        if 'Wave' in df_std.columns:
            df_std['Wave'] = pd.to_numeric(df_std['Wave'], errors='coerce')

        # Convert dates with S3-specific handling
        date_cols = ['RepDate', 'DOB', 'OutcmDte']
        for col in date_cols:
            if col in df_std.columns:
                if col == 'RepDate':
                    # Handle YYYYMM.0 format common in S3 data
                    df_std[col] = df_std[col].apply(_parse_repdate)
                else:
                    df_std[col] = pd.to_datetime(df_std[col], errors='coerce')

        # Create unique person ID for linking
        if 'StFCID' in df_std.columns:
            df_std['person_id'] = df_std['StFCID'].astype(str)
        else:
            self._log(f"‚ö†Ô∏è  No StFCID column found in cohort {cohort}", "WARNING")
            # A bare row index repeats in every cohort ("0", "1", ...), which
            # would merge unrelated people once cohorts are combined.
            df_std['person_id'] = [f"{cohort}_{row_label}" for row_label in df_std.index]

        # Clean up outcome variables
        for var in self.outcome_vars:
            if var in df_std.columns:
                # Convert to numeric where possible
                df_std[var] = pd.to_numeric(df_std[var], errors='coerce')

        standardized[cohort] = df_std
        self._log(f"‚úÖ Standardized {cohort}: {df_std.shape}")

    return standardized


def _parse_repdate(date_val):
    """Parse one RepDate cell, accepting YYYYMM / YYYYMM.0 as well as dates.

    Returns ``pd.NaT`` for missing or unparseable values.
    """
    if pd.isna(date_val):
        return pd.NaT
    try:
        date_str = str(date_val)
        # Drop only a trailing ".0" (float rendering of an integer), not any
        # ".0" that happens to occur inside the value.
        if date_str.endswith('.0'):
            date_str = date_str[:-2]
        if len(date_str) == 6 and date_str.isdigit():  # YYYYMM format
            return pd.to_datetime(f"{date_str[:4]}-{date_str[4:6]}-01")
        return pd.to_datetime(date_val, errors='coerce')
    except Exception:
        # e.g. an impossible month such as "201113"
        return pd.NaT

def combine_cohorts(self, datasets):
    """Stack the per-cohort frames into one long-format frame.

    Each frame is tagged with an 'original_cohort' column before stacking.
    When both 'person_id' and 'Wave' are present the result is ordered by
    them and re-indexed. Summary figures are written to the log.

    Args:
        datasets: dict mapping cohort id to DataFrame.

    Returns:
        The concatenated DataFrame.
    """
    self._log("üîó Combining cohorts into integrated dataset...")

    # Tag each cohort's rows with their origin; the inputs stay untouched.
    tagged_frames = [
        frame.assign(original_cohort=cohort_key)
        for cohort_key, frame in datasets.items()
    ]
    integrated = pd.concat(tagged_frames, ignore_index=True, sort=False)

    has_wave = 'Wave' in integrated.columns
    if 'person_id' in integrated.columns and has_wave:
        integrated = integrated.sort_values(['person_id', 'Wave']).reset_index(drop=True)

    n_rows, n_cols = integrated.shape
    self._log(f"‚úÖ Integrated dataset: {n_rows:,} rows √ó {n_cols} cols")
    self._log(f"   Total individuals: {integrated['person_id'].nunique():,}")
    self._log(f"   Cohorts: {integrated['cohort_id'].nunique()}")

    if has_wave:
        waves = sorted(w for w in integrated['Wave'].unique() if pd.notna(w))
        self._log(f"   Waves: {waves}")

    return integrated

def convert_to_wide_format(self, long_df):
    """Convert from long format to wide format (pivot waves).

    The result has one row per 'person_id': identifier and demographic
    columns first, followed by one ``<var>_<wave>`` column per outcome
    variable and wave.

    Demographics are taken from each person's earliest available wave. For
    anyone with a Wave 1 record that is the Wave 1 row; people who only
    appear in later waves are kept rather than silently dropped.

    Args:
        long_df: long-format DataFrame with a 'person_id' column and,
            normally, a 'Wave' column.

    Returns:
        The wide DataFrame, or ``long_df`` unchanged when it has no 'Wave'
        column.
    """
    self._log("üîÑ Converting to wide format...")

    # Identify variables present in the data
    available_outcome_vars = [var for var in self.outcome_vars if var in long_df.columns]
    available_demo_vars = [var for var in self.demo_vars if var in long_df.columns]

    self._log(f"   Pivoting {len(available_outcome_vars)} outcome variables")
    self._log(f"   Keeping {len(available_demo_vars)} demographic variables")

    if 'Wave' not in long_df.columns:
        self._log("‚ö†Ô∏è  No Wave column found - cannot pivot by wave", "WARNING")
        return long_df

    # Rows without a usable wave number cannot be placed in any wave column.
    observed = long_df[long_df['Wave'].notna()]

    # One demographic record per person, taken from their earliest wave. The
    # stable sort keeps the original row order within a wave, so "first"
    # picks the same Wave 1 row a direct Wave == 1 filter would.
    demo_data = (
        observed
        .sort_values('Wave', kind='stable')
        .drop_duplicates(subset=['person_id'], keep='first')
    )

    without_wave_1 = int((demo_data['Wave'] != 1).sum())
    if without_wave_1:
        self._log(
            f"   {without_wave_1:,} individuals have no Wave 1 record; "
            "using their earliest available wave for demographics"
        )

    # Keep demographic variables + identifiers
    demo_cols = (['person_id', 'cohort_id', 'cohort_year', 'cohort_label', 'data_source', 'original_cohort'] +
                 available_demo_vars)
    demo_cols = [col for col in demo_cols if col in demo_data.columns]
    wide_df = demo_data[demo_cols].copy()

    # Pivot outcome variables by wave and attach them one wave at a time
    wave_cols = ['person_id'] + available_outcome_vars
    available_waves = sorted(observed['Wave'].unique())

    for wave in available_waves:
        wave_subset = (
            long_df.loc[long_df['Wave'] == wave, wave_cols]
            # Keep first occurrence per person within the wave
            .drop_duplicates(subset=['person_id'], keep='first')
            # Rename outcome variables with wave suffix
            .rename(columns={var: f"{var}_{int(wave)}" for var in available_outcome_vars})
        )
        wide_df = wide_df.merge(wave_subset, on='person_id', how='left')

    self._log(f"‚úÖ Wide format: {wide_df.shape[0]:,} rows √ó {wide_df.shape[1]} cols")

    return wide_df

# Add methods to class
# The functions in this cell are written at module level with an explicit
# `self` parameter; assigning them as class attributes makes them callable as
# methods on NYTDS3Integrator instances.
NYTDS3Integrator.standardize_s3_datasets = standardize_s3_datasets
NYTDS3Integrator.combine_cohorts = combine_cohorts
NYTDS3Integrator.convert_to_wide_format = convert_to_wide_format

# Confirmation banner; printed unconditionally once this cell has executed.
print("‚úÖ Data processing functions added!")
print("üîß Can now standardize, combine, and pivot S3 data into wide format")

‚úÖ Data processing functions added!
üîß Can now standardize, combine, and pivot S3 data into wide format


In [7]:
# S3 Cell 7: Main S3 Integration Function
"""
NYTD S3 Data Integration - Cell 7: Complete Integration Workflow
"""

def run_s3_integration(self):
    """Run the whole pipeline: load, standardize, combine, pivot, check, save.

    Returns:
        ``None`` when no datasets could be loaded; otherwise a dict with the
        keys 'long_format', 'wide_format', 'quality_assurance', 'files' and
        'processing_log'.

    Raises:
        Any exception raised by a pipeline step, after it has been logged
        together with its traceback.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("üöÄ NYTD S3 DATA INTEGRATION & WIDE FORMAT CONVERSION")
    print(rule)

    try:
        # Step 1: Load datasets from S3
        self._log("üéØ Step 1: Loading datasets from S3...")
        raw_by_cohort = self.load_datasets_from_s3()
        if not raw_by_cohort:
            self._log("‚ùå No datasets loaded from S3!", "ERROR")
            return None

        # Step 2: Standardize datasets
        self._log("üéØ Step 2: Standardizing datasets...")
        clean_by_cohort = self.standardize_s3_datasets(raw_by_cohort)

        # Step 3: Combine cohorts
        self._log("üéØ Step 3: Combining cohorts...")
        long_frame = self.combine_cohorts(clean_by_cohort)

        # Step 4: Convert to wide format
        self._log("üéØ Step 4: Converting to wide format...")
        wide_frame = self.convert_to_wide_format(long_frame)

        # Step 5: Quality checks
        self._log("üéØ Step 5: Running quality checks...")
        qa_report = self.run_quality_checks(long_frame, wide_frame)

        # Step 6: Save results
        self._log("üéØ Step 6: Saving results...")
        written = self.save_s3_results(long_frame, wide_frame, qa_report)

        # Final summary. The clock starts at self.start_time, which is read
        # from the instance rather than taken at the start of this call.
        elapsed_minutes = (datetime.now() - self.start_time).total_seconds() / 60
        n_people = len(wide_frame)
        n_variables = len(wide_frame.columns)
        print(f"\nüéâ S3 INTEGRATION COMPLETE!")
        print(f"‚è±Ô∏è  Duration: {elapsed_minutes:.1f} minutes")
        print(f"üìä Analytical Dataset: {n_people:,} individuals √ó {n_variables} variables")
        print(f"üìÅ Files Created:")
        # Falsy entries mark outputs that could not be written.
        for created in filter(None, written.values()):
            print(f"   ‚Ä¢ {created}")

        return dict(
            long_format=long_frame,
            wide_format=wide_frame,
            quality_assurance=qa_report,
            files=written,
            processing_log=self.log,
        )

    except Exception as err:
        self._log(f"‚ùå S3 Integration failed: {err}", "ERROR")
        import traceback
        self._log(f"   Full error: {traceback.format_exc()}", "ERROR")
        raise err

def run_quality_checks(self, long_df, wide_df):
    """Run quality assurance checks on the integrated data.

    All counts and percentages are stored as built-in ``int`` / ``float`` /
    ``bool`` values rather than numpy scalars, so the result can be passed
    to ``json.dumps`` directly.

    Args:
        long_df: long-format DataFrame.
        wide_df: wide-format DataFrame.

    Returns:
        dict with four sections:
        'record_counts' - row counts and distinct 'person_id' counts (0 when
            the column is absent) for both frames;
        'data_integrity' - 'one_record_per_person' for the wide frame, when
            it has a 'person_id' column;
        'wave_analysis' - for waves 1-3, the number of columns ending in
            ``_<wave>`` and non-null counts for the first ten of them;
        'coverage_analysis' - rows per cohort and the percentage of missing
            values for a fixed list of key variables.
    """
    self._log("üîç Running quality assurance checks...")

    def _count_people(frame):
        # Distinct person ids, or 0 when the frame has no such column.
        if 'person_id' not in frame.columns:
            return 0
        return int(frame['person_id'].nunique())

    qa_results = {
        'record_counts': {
            'long_format_records': len(long_df),
            'wide_format_records': len(wide_df),
            'unique_individuals_long': _count_people(long_df),
            'unique_individuals_wide': _count_people(wide_df)
        },
        'data_integrity': {},
        'coverage_analysis': {},
        'wave_analysis': {}
    }

    # Check if wide format has one record per person
    if 'person_id' in wide_df.columns:
        qa_results['data_integrity']['one_record_per_person'] = bool(
            len(wide_df) == wide_df['person_id'].nunique()
        )

    # Check wave coverage
    for wave in [1, 2, 3]:
        wave_vars = [col for col in wide_df.columns if col.endswith(f'_{wave}')]
        if wave_vars:
            # Check first 10 variables; int() turns the numpy sum into a
            # plain integer.
            non_null_counts = {
                var: int(wide_df[var].notna().sum()) for var in wave_vars[:10]
            }
            qa_results['wave_analysis'][f'wave_{wave}'] = {
                'variables': len(wave_vars),
                'sample_coverage': non_null_counts
            }

    # Cohort distribution
    if 'cohort_id' in wide_df.columns:
        cohort_dist = {
            cohort: int(count)
            for cohort, count in wide_df['cohort_id'].value_counts().items()
        }
        qa_results['coverage_analysis']['cohort_distribution'] = cohort_dist

    # Missing data analysis for key variables
    key_vars = ['Homeless_1', 'Homeless_2', 'Homeless_3', 'CurrFTE_1', 'CurrFTE_2', 'CurrFTE_3']
    missing_analysis = {}
    for var in key_vars:
        if var in wide_df.columns:
            missing_pct = (wide_df[var].isnull().sum() / len(wide_df)) * 100
            missing_analysis[var] = round(float(missing_pct), 1)
    qa_results['coverage_analysis']['key_variables_missing_pct'] = missing_analysis

    self._log("‚úÖ Quality assurance complete")
    return qa_results

def save_s3_results(self, long_df, wide_df, qa_results):
    """Save final datasets and documentation to the current working directory.

    Four artefacts are written, each in its own best-effort step so that a
    failure in one does not prevent the others from being attempted:

    * ``NYTD_S3_Integrated_Long_Format.csv``     - ``long_df``
    * ``NYTD_S3_Analytical_Wide_Format.csv``     - ``wide_df``
    * ``NYTD_S3_Dataset_Summary.csv``            - record / column / person
      counts for both datasets
    * ``NYTD_S3_Integration_Documentation.json`` - metadata, QA results,
      processing log, variable catalog and file inventory

    Args:
        long_df: DataFrame written as the long-format CSV.
        wide_df: DataFrame written as the wide-format CSV; also the source of
            the variable catalog and individual counts in the JSON document.
        qa_results: QA structure embedded under ``quality_assurance`` in the
            JSON document.

    Returns:
        dict: keys ``long_format``, ``wide_format``, ``summary`` and
        ``documentation``. Each value is the filename that was written, or
        ``None`` when that step raised (the error is logged at ``"ERROR"``
        level instead of being propagated).
    """
    self._log("üíæ Saving S3 integration results...")

    saved_files = {}

    def _json_fallback(value):
        """Encode objects the json module does not understand natively."""
        # NumPy integer/float/bool scalars (e.g. counts coming from
        # ``Series.sum()``) are not JSON-native. Unwrap them so they are
        # stored as real numbers/booleans rather than quoted strings.
        if isinstance(value, (np.integer, np.floating, np.bool_)):
            return value.item()
        return str(value)

    def _unique_people(frame):
        """Number of distinct ``person_id`` values, or 0 without that column."""
        return frame['person_id'].nunique() if 'person_id' in frame.columns else 0

    def _save_dataset(key, label, frame, filename):
        """Write one dataset CSV and record the outcome under ``key``."""
        try:
            frame.to_csv(filename, index=False)
            saved_files[key] = filename
            self._log(f"‚úÖ Saved: {filename} ({len(frame):,} rows)")
        except Exception as e:
            self._log(f"‚ùå Error saving {label}: {e}", "ERROR")
            saved_files[key] = None

    # Long format (integrated) and wide format (analytical dataset)
    _save_dataset('long_format', 'long format', long_df,
                  'NYTD_S3_Integrated_Long_Format.csv')
    _save_dataset('wide_format', 'wide format', wide_df,
                  'NYTD_S3_Analytical_Wide_Format.csv')

    try:
        # Create and save summary
        summary = pd.DataFrame({
            'Dataset': ['Long Format', 'Wide Format'],
            'Records': [len(long_df), len(wide_df)],
            'Variables': [len(long_df.columns), len(wide_df.columns)],
            'Individuals': [_unique_people(long_df), _unique_people(wide_df)],
            # A failed save leaves None under the key; show it as blank.
            'Filename': [saved_files.get('long_format') or '',
                         saved_files.get('wide_format') or '']
        })

        summary_filename = 'NYTD_S3_Dataset_Summary.csv'
        summary.to_csv(summary_filename, index=False)
        saved_files['summary'] = summary_filename
        self._log(f"‚úÖ Saved: {summary_filename}")
    except Exception as e:
        self._log(f"‚ùå Error saving summary: {e}", "ERROR")
        saved_files['summary'] = None

    try:
        # Save comprehensive documentation
        wave_suffixes = ('_1', '_2', '_3')
        all_columns = list(wide_df.columns)
        documentation = {
            'integration_metadata': {
                'processing_date': datetime.now().isoformat(),
                'data_source': 'S3 bdc-public-curated and bdc-public-raw',
                'cohorts_processed': list(self.cohorts.keys()),
                'total_individuals': _unique_people(wide_df),
                'total_variables': len(all_columns)
            },
            'quality_assurance': qa_results,
            'processing_log': self.log,
            'variable_catalog': {
                # Anything without a wave suffix is treated as demographic.
                'demographic_variables': [col for col in all_columns
                                          if not col.endswith(wave_suffixes)],
                'wave_1_variables': [col for col in all_columns if col.endswith('_1')],
                'wave_2_variables': [col for col in all_columns if col.endswith('_2')],
                'wave_3_variables': [col for col in all_columns if col.endswith('_3')]
            },
            # Snapshot taken before the documentation entry itself is added.
            'file_inventory': saved_files
        }

        doc_filename = 'NYTD_S3_Integration_Documentation.json'
        with open(doc_filename, 'w') as f:
            json.dump(documentation, f, indent=2, default=_json_fallback)
        saved_files['documentation'] = doc_filename
        self._log(f"‚úÖ Saved: {doc_filename}")
    except Exception as e:
        self._log(f"‚ùå Error saving documentation: {e}", "ERROR")
        saved_files['documentation'] = None

    return saved_files

# Bind the module-level workflow functions onto the integrator class so they
# can be called as ordinary instance methods (e.g. integrator.save_s3_results).
setattr(NYTDS3Integrator, 'run_s3_integration', run_s3_integration)
setattr(NYTDS3Integrator, 'run_quality_checks', run_quality_checks)
setattr(NYTDS3Integrator, 'save_s3_results', save_s3_results)

# Confirmation banner for the notebook reader.
print(
    "‚úÖ Main S3 integration workflow function added!",
    "üéØ Ready to execute complete S3 integration pipeline",
    sep="\n",
)

‚úÖ Main S3 integration workflow function added!
üéØ Ready to execute complete S3 integration pipeline


In [8]:
# S3 Cell 8: Usage and Data Preview
"""
NYTD S3 Data Integration - Cell 8: Execute Integration and Preview Results
Run this cell AFTER running all previous cells (1-7)
"""


def _format_count(value):
    """Return ``value`` with thousands separators, or its plain text form.

    The QA lookups below fall back to the string 'Unknown' when a count is
    missing; applying the ``:,`` format spec to a string (or None) raises, so
    non-numeric values are passed through ``str`` instead.
    """
    try:
        return f"{value:,}"
    except (TypeError, ValueError):
        return str(value)


# Initialize the S3 integrator
print("üöÄ Initializing NYTD S3 Integration...")
s3_integrator = NYTDS3Integrator()

# Run the complete S3 integration
print("\nüîÑ Starting S3 integration process...")
results = s3_integrator.run_s3_integration()

# Preview the results
if results:
    print("\n" + "="*60)
    print("üìä S3 INTEGRATION RESULTS PREVIEW")
    print("="*60)
    
    wide_df = results['wide_format']
    long_df = results['long_format']
    
    print(f"\nüéØ ANALYTICAL DATASET (Wide Format)")
    print(f"üìÅ Source: S3 bdc-public-curated")
    print(f"üìä Shape: {wide_df.shape[0]:,} rows √ó {wide_df.shape[1]} columns")
    print(f"üë• Each row = 1 individual across all survey waves")
    
    print(f"\nüìã FIRST 5 RECORDS:")
    print(wide_df.head())
    
    print(f"\nüè∑Ô∏è COLUMN CATEGORIES:")
    
    # Show demographic columns (anything without a _1/_2/_3 wave suffix)
    demo_cols = [col for col in wide_df.columns 
                 if not any(col.endswith(f'_{w}') for w in [1,2,3])]
    print(f"\nüë• Demographic Variables ({len(demo_cols)}):")
    print("   ", demo_cols[:12])
    if len(demo_cols) > 12:
        print(f"   ... and {len(demo_cols)-12} more")
    
    # Show wave-specific columns
    for wave in [1, 2, 3]:
        wave_cols = [col for col in wide_df.columns if col.endswith(f'_{wave}')]
        if wave_cols:
            print(f"\nüìä Wave {wave} Outcome Variables ({len(wave_cols)}):")
            print("   ", wave_cols[:10])
            if len(wave_cols) > 10:
                print(f"   ... and {len(wave_cols)-10} more")
    
    print(f"\nüìà SAMPLE WAVE COMPARISON:")
    sample_vars = ['person_id', 'cohort_id', 'Homeless_1', 'Homeless_2', 'Homeless_3', 
                   'CurrFTE_1', 'CurrFTE_2', 'CurrFTE_3']
    # Only preview the columns that actually made it into the wide dataset
    available_vars = [var for var in sample_vars if var in wide_df.columns]
    if available_vars:
        print("First 3 individuals showing wave progression:")
        print(wide_df[available_vars].head(3))
    
    print(f"\nüìä COHORT DISTRIBUTION:")
    if 'cohort_id' in wide_df.columns:
        cohort_counts = wide_df['cohort_id'].value_counts().sort_index()
        for cohort, count in cohort_counts.items():
            year = s3_integrator.cohorts.get(cohort, {}).get('year', 'Unknown')
            print(f"   ‚Ä¢ Cohort {cohort} (FY{year}): {count:,} individuals")
    
    print(f"\nüìÅ FILES CREATED:")
    files = results.get('files', {})
    for file_type, filename in files.items():
        # Entries are None when the corresponding save step failed
        if filename:
            size = os.path.getsize(filename) if os.path.exists(filename) else 0
            print(f"   ‚Ä¢ {filename} ({size:,} bytes)")
    
    print(f"\nüéâ S3 INTEGRATION SUCCESSFUL!")
    # `or` rather than a .get() default: a failed save stores None under the
    # key, which would otherwise be printed as "None" instead of "Not saved".
    print(f"üìÑ Main analytical file: {files.get('wide_format') or 'Not saved'}")
    print(f"üìö Documentation: {files.get('documentation') or 'Not saved'}")
    
    # Quick data quality summary
    qa = results.get('quality_assurance', {})
    if qa:
        print(f"\n‚úÖ QUALITY CHECKS:")
        record_counts = qa.get('record_counts', {})
        print(f"   ‚Ä¢ Wide format has one record per person: {qa.get('data_integrity', {}).get('one_record_per_person', 'Unknown')}")
        print(f"   ‚Ä¢ Total individuals: {_format_count(record_counts.get('unique_individuals_wide', 'Unknown'))}")
        print(f"   ‚Ä¢ Total observations: {_format_count(record_counts.get('long_format_records', 'Unknown'))}")
    
else:
    print("\n‚ùå S3 Integration failed!")
    print("üìã Check the error messages above for troubleshooting.")
    print("\nüîß Troubleshooting tips:")
    print("   1. Verify AWS credentials are configured")
    print("   2. Check S3 bucket permissions")
    print("   3. Confirm NYTD files exist in the specified buckets")
    print("   4. Run individual cells to identify the specific issue")

# Optional: Quick file verification
# Lists every NYTD_S3_* file in the working directory, so files left over from
# earlier runs appear here too.
print(f"\nüìÇ LOCAL FILES CREATED:")
local_files = [f for f in os.listdir('.') if f.startswith('NYTD_S3_')]
for file in local_files:
    size = os.path.getsize(file)
    print(f"   ‚úÖ {file} ({size:,} bytes)")

if not local_files:
    print("   ‚ùå No S3 integration files found locally")

üöÄ Initializing NYTD S3 Integration...
üöÄ NYTD S3 Integration System Initialized
ü™£ Curated Bucket: bdc-public-curated
ü™£ Raw Bucket: bdc-public-raw
üéØ Target Cohorts: ['202', '228', '266', '297']
üìä Outcome Variables to Pivot: 24
üë• Demographic Variables: 19

üîÑ Starting S3 integration process...

üöÄ NYTD S3 DATA INTEGRATION & WIDE FORMAT CONVERSION
  [11:53:47] INFO: üéØ Step 1: Loading datasets from S3...
  [11:53:47] INFO: üîÑ Loading NYTD datasets from S3...
  [11:53:47] INFO: üîç Discovering NYTD files in S3...
  [11:53:47] INFO: üîç Searching curated bucket: s3://bdc-public-curated/nytd/
Found 1 files in s3://bdc-public-curated/nytd/
  [11:53:48] INFO: üîç Searching curated bucket: s3://bdc-public-curated/ndacan/nytd/
Found 11 files in s3://bdc-public-curated/ndacan/nytd/
  [11:53:48] INFO: üîç Searching curated bucket: s3://bdc-public-curated/NYTD/
Found 0 files in s3://bdc-public-curated/NYTD/
  [11:53:48] INFO: üîç Searching curated bucket: s3://bdc-p