In [191]:
%matplotlib inline

In [192]:

from pathlib import Path
import os
from dotenv import load_dotenv
import pytz
import pandas as pd
from datetime import datetime
import json
from google.oauth2 import service_account 
from googleapiclient.discovery import build
import plotly.express as px
import traceback
import plotly.io as pio
pio.renderers.default = 'notebook'

# Load environment variables
load_dotenv()


False

# Section 1: Imports and Configuration

In [193]:

# Import configuration
from config import (
    SCOPES,
    GOOGLE_SHEET_ID,
    GOOGLE_SHEET_RANGE,
    GOOGLE_CREDENTIALS_PATH
)

# Shift Patterns Configuration
SHIFT_PATTERNS = {
    'standard': {'start': '08:00', 'end': '17:00'},
    'early': {'start': '06:00', 'end': '14:00'},
    'late': {'start': '14:00', 'end': '22:00'}
}

GROUP_SHIFTS = {
    'dalmuir': 'standard',
    'kilmalid': 'standard',
    'kb3': 'standard',  # Also handle 'NOPS'
    # Add lowercase versions
    'DALMUIR': 'standard',
    'KILMALID': 'standard',
    'KB3': 'standard',
    'NOPS': 'standard'
}

# Enhanced configuration for group mapping
GROUP_MAPPING = {
    'DALMUIR': ['dalmuir', 'Dalmuir'],
    'KILMALID': ['kilmalid', 'Kilmalid'],
    'KB3': ['kb3', 'KB3', 'NOPS'],
    'SUPERVISORS': ['dalm_sup', 'kilm_sup', 'KB3_sup']
}
# Update GROUP_SHIFTS with standardized names
GROUP_SHIFTS = {
    name: 'standard'
    for group_list in GROUP_MAPPING.values()
    for name in group_list
}

# Data quality thresholds
DATA_QUALITY_THRESHOLDS = {
    'min_heart_rate': 40,
    'max_heart_rate': 200,
    'min_records_per_day': 100,
    'max_missing_consecutive': 30  # minutes
}

# File paths configuration
DATA_DIR = Path(os.getenv('DATA_DIR', 'data'))
OUTPUT_DIR = Path(os.getenv('OUTPUT_DIR', 'outputs'))

# Create directories if they don't exist
DATA_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# File paths
BIOMETRIC_DATA_PATH = '/Users/andylow/Desktop/Data/prAnalysis/historic_processed_data_for_plotting.json'
CASK_DATA_PATH = '/Users/andylow/Desktop/Data/prAnalysis/cask_movements.csv'

DATA_QUALITY_CONFIG = {
    'sparse_data_threshold': 12,  # minimum records per hour
    'forward_fill_limit': 3,      # maximum consecutive values to fill
    'min_active_users': 3,        # minimum users for valid analysis
    'outlier_threshold': 3.0,     # z-score threshold for outliers
    'min_correlation': 0.7,       # threshold for strong correlations
    'significant_p_value': 0.05   # threshold for statistical significance
}

DEFAULT_TIMEZONE = pytz.timezone('Europe/London')  # Adjust as needed

In [194]:
# Add after imports section
print("Configuration Summary:")
print(f"SHIFT_PATTERNS: {SHIFT_PATTERNS}")
print(f"GROUP_MAPPING: {GROUP_MAPPING}")
print(f"DATA_QUALITY_THRESHOLDS: {DATA_QUALITY_THRESHOLDS}")

Configuration Summary:
SHIFT_PATTERNS: {'standard': {'start': '08:00', 'end': '17:00'}, 'early': {'start': '06:00', 'end': '14:00'}, 'late': {'start': '14:00', 'end': '22:00'}}
GROUP_MAPPING: {'DALMUIR': ['dalmuir', 'Dalmuir'], 'KILMALID': ['kilmalid', 'Kilmalid'], 'KB3': ['kb3', 'KB3', 'NOPS'], 'SUPERVISORS': ['dalm_sup', 'kilm_sup', 'KB3_sup']}
DATA_QUALITY_THRESHOLDS: {'min_heart_rate': 40, 'max_heart_rate': 200, 'min_records_per_day': 100, 'max_missing_consecutive': 30}


# Section 2: Data Loading Functions

In [195]:
def load_json_data(file_path):
    """
    Load and parse the JSON biometric data with enhanced data extraction
    """
    try:
        print(f"\nAttempting to load JSON data from: {file_path}")
        
        if not os.path.exists(file_path):
            print(f"Error: File not found at {file_path}")
            return None
            
        with open(file_path, 'r') as f:
            data = json.load(f)
        
        # Initialize lists for different types of data
        raw_data_records = []
        daily_metrics = []
        user_summary = []
        
        for user_id, user_data in data.items():
            try:
                # Extract raw time series data
                raw_data = user_data.get('historic_raw_data', [])
                for record in raw_data:
                    record['user_id'] = user_id
                    raw_data_records.append(record)
                
                # Extract daily metrics more comprehensively
                daily_data = {
                    'rhr': user_data.get('historic_daily_rhr', {}),
                    'sleep_scores': user_data.get('historic_daily_sleep_scores', {}),
                    'calm_scores': user_data.get('historic_daily_calm_scores', {}),
                    'activity_scores': user_data.get('historic_daily_activity_scores', {}),
                    'cs': user_data.get('historic_daily_cs', {}),
                    'sedentary_minutes': user_data.get('historic_sedentary_minutes_per_day', {}),
                    'liss_minutes': user_data.get('historic_liss_minutes_per_day', {}),
                    'moderate_minutes': user_data.get('historic_moderate_exercise_minutes_per_day', {}),
                    'intense_minutes': user_data.get('historic_intense_exercise_minutes_per_day', {}),
                    'consistency_score': user_data.get('historic_consistency_score', {})
                }
                
                # Convert daily metrics to records
                dates = set().union(*[d.keys() for d in daily_data.values() if d])
                for date in dates:
                    daily_metrics.append({
                        'user_id': user_id,
                        'date': date,
                        'rhr': daily_data['rhr'].get(date),
                        'sleep_score': daily_data['sleep_scores'].get(date),
                        'calm_score': daily_data['calm_scores'].get(date),
                        'activity_score': daily_data['activity_scores'].get(date),
                        'corescore': daily_data['cs'].get(date),
                        'sedentary_minutes': daily_data['sedentary_minutes'].get(date),
                        'liss_minutes': daily_data['liss_minutes'].get(date),
                        'moderate_minutes': daily_data['moderate_minutes'].get(date),
                        'intense_minutes': daily_data['intense_minutes'].get(date),
                        'consistency_score': daily_data['consistency_score'].get(date)
                    })
                
                # Extract user summary
                user_summary.append({
                    'user_id': user_id,
                    'name': user_data.get('historic_name'),
                    'earliest_date': user_data.get('historic_earliest_date'),
                    'latest_date': user_data.get('historic_latest_date'),
                    'avg_rhr': user_data.get('historic_resting_heart_rate'),
                    'avg_sleep_score': user_data.get('historic_average_sleep_scores'),
                    'avg_calm_score': user_data.get('historic_avg_calm_scores'),
                    'avg_sedentary_minutes': user_data.get('historic_avg_sedentary_minutes_per_day'),
                    'avg_liss_minutes': user_data.get('historic_avg_liss_minutes_per_day'),
                    'avg_moderate_minutes': user_data.get('historic_avg_moderate_exercise_minutes_per_day'),
                    'avg_intense_minutes': user_data.get('historic_avg_intense_exercise_minutes_per_day'),
                    'avg_consistency_score': user_data.get('historic_avg_consistency_score')
                })
                
            except Exception as e:
                print(f"Error processing user {user_id}: {str(e)}")
                continue

        # Convert to DataFrames with simplified column names
        raw_df = pd.DataFrame(raw_data_records)
        daily_df = pd.DataFrame(daily_metrics)
        user_df = pd.DataFrame(user_summary)
        
        return {
            'raw_data': raw_df,
            'daily_metrics': daily_df,
            'user_summary': user_df
        }
        
    except Exception as e:
        print(f"Error loading JSON data: {str(e)}")
        print(f"Traceback: {traceback.format_exc()}")
        return None

def load_google_sheet_data():
    """
    Load user mappings from Google Sheets with verbose logging
    """
    try:
        print("\nAttempting to load Google Sheets data...")
        
        # Verify credentials file exists
        if not os.path.exists(GOOGLE_CREDENTIALS_PATH):
            print(f"Credentials file not found at: {GOOGLE_CREDENTIALS_PATH}")
            return None

        print("Found credentials file")
        
        try:
            credentials = service_account.Credentials.from_service_account_file(
                GOOGLE_CREDENTIALS_PATH, 
                scopes=SCOPES
            )
            print("Successfully loaded credentials")
        except Exception as e:
            print(f"Error loading credentials: {e}")
            return None
        
        # Set up Google Sheets API
        try:
            service = build('sheets', 'v4', credentials=credentials)
            print("Successfully built sheets service")
        except Exception as e:
            print(f"Error building sheets service: {e}")
            return None
        
        # Call the Sheets API
        try:
            sheet = service.spreadsheets()
            result = sheet.values().get(
                spreadsheetId=GOOGLE_SHEET_ID,
                range=GOOGLE_SHEET_RANGE
            ).execute()
            print("Successfully retrieved sheet data")
        except Exception as e:
            print(f"Error retrieving sheet data: {e}")
            return None
        
        # Convert to DataFrame
        values = result.get('values', [])
        if not values:
            print('No data found in Google Sheet')
            return None
            
        # Print first few rows of raw data
        print("\nFirst few rows of raw sheet data:")
        for row in values[:3]:
            print(row)
            
        headers = [col.strip().lower() for col in values[0]]
        data = values[1:]
        df = pd.DataFrame(data, columns=headers)
        
        # Convert data types
        print("\nConverting data types...")
        try:
            df['age'] = pd.to_numeric(df['age'], errors='coerce')
            df['activity_level'] = pd.to_numeric(df['activity_level'], errors='coerce')
            df['progress_this_week'] = pd.to_numeric(df['progress_this_week'], errors='coerce')
            df['birthday'] = pd.to_datetime(df['birthday'], errors='coerce')
            print("Data type conversion successful")
        except Exception as e:
            print(f"Error converting data types: {e}")
        
        # Check for duplicates
        duplicates = df[df['user_id'].duplicated(keep=False)]
        if not duplicates.empty:
            print("\nFound duplicate user_ids:")
            print("Number of duplicates:", len(duplicates))
            print("\nDuplicate entries:")
            for user_id in duplicates['user_id'].unique():
                print(f"\nEntries for user_id: {user_id}")
                print(duplicates[duplicates['user_id'] == user_id])
        
        # Handle duplicate user_ids
        if df['user_id'].duplicated().any():
            print("\nHandling duplicate user_ids...")
            
            # Group by user_id and check for exact duplicates
            duplicates = df[df['user_id'].duplicated(keep=False)]
            
            # For exact duplicates, keep only one copy
            df = df.drop_duplicates()
            
            # For different entries with same user_id, keep the most complete record
            for user_id in df[df['user_id'].duplicated()]['user_id'].unique():
                user_records = df[df['user_id'] == user_id]
                # Keep the record with fewer null values
                best_record = user_records.isnull().sum(axis=1).idxmin()
                df = df[~(df['user_id'] == user_id)] # Remove all records for this user_id
                df = pd.concat([df, user_records.loc[[best_record]]])  # Add back the best record
            
            print(f"After handling duplicates: {len(df)} unique users")
        
        # Print final data info
        print("\nFinal DataFrame info:")
        print(df.info())
        print("\nNull values summary:")
        print(df.isnull().sum())
        
        return df
    
    except Exception as e:
        print(f"Unexpected error in load_google_sheet_data: {e}")
        return None

def load_cask_data(file_path):
    """
    Load cask movement data from CSV with verbose logging
    """
    try:
        print(f"\nAttempting to load cask data from: {file_path}")
        
        if not os.path.exists(file_path):
            print(f"Error: File not found at {file_path}")
            return None
            
        df = pd.read_csv(file_path)
        print(f"Successfully loaded CSV with {len(df)} rows")
        print("\nColumns:", df.columns.tolist())
        print("\nFirst few rows:")
        print(df.head())
        
        # Convert KB3 naming
        df['site'] = df['site'].replace('KB3', 'NOPS')
        
        # Create proper date from month and year
        df['date'] = pd.to_datetime(df[['year', 'month']].assign(day=1))
        
        # Ensure numeric columns are numeric
        numeric_cols = ['receipts', 'dispatches', 'receipts_mom_var', 'dispatches_mom_var', 
                       'ytd_receipts', 'ytd_dispatches']
        for col in numeric_cols:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')
                print(f"\nConverted {col} to numeric. Sample values:")
                print(df[col].head())
        
        # Convert percentage values if needed
        for col in ['receipts_mom_var', 'dispatches_mom_var']:
            if col in df.columns:
                if (df[col].abs() > 100).any():
                    df[col] = df[col] / 100
                    print(f"\nConverted {col} to decimal format")
        
        return df
    
    except Exception as e:
        print(f"Error loading cask data: {e}")
        traceback.print_exc()
        return None

def validate_data_load(biometric_df, user_df, cask_df):
    """
    Test data loading functions and print detailed summaries
    """
    print("\n=== Data Loading Validation ===")
    
    print("\nBiometric Data Summary:")
    if biometric_df is not None:
        print(f"Number of records: {len(biometric_df)}")
        print(f"Columns: {biometric_df.columns.tolist()}")
        print(f"Number of unique users: {biometric_df['user_id'].nunique()}")
        if 'timestamp' in biometric_df.columns:
            print(f"Date range: {biometric_df['timestamp'].min()} to {biometric_df['timestamp'].max()}")
        print("\nSample data:")
        print(biometric_df.head())
    else:
        print("No biometric data loaded")
        
    print("\nUser Mapping Summary:")
    if user_df is not None:
        print(f"Number of users: {len(user_df)}")
        print(f"Columns: {user_df.columns.tolist()}")
        if 'group' in user_df.columns:
            print("Groups represented:", user_df['group'].unique())
        print("\nSample data:")
        print(user_df.head())
    else:
        print("No user mapping data loaded")
        
    print("\nCask Movement Summary:")
    if cask_df is not None:
        print(f"Number of records: {len(cask_df)}")
        print(f"Columns: {cask_df.columns.tolist()}")
        if 'site' in cask_df.columns:
            print("Sites represented:", cask_df['site'].unique())
        print("\nSample data:")
        print(cask_df.head())
    else:
        print("No cask movement data loaded")

In [196]:
# Add after data loading functions
# Test data loading
# Test data loading
biometric_sample = load_json_data(BIOMETRIC_DATA_PATH)
if biometric_sample is not None:
    print("\nSample of loaded raw biometric data:")
    print(biometric_sample['raw_data'].head())
    
    print("\nSample of daily metrics:")
    print(biometric_sample['daily_metrics'].head())
    
    print("\nSample of user summary:")
    print(biometric_sample['user_summary'].head())

user_sample = load_google_sheet_data()
if user_sample is not None:
    print("\nSample of loaded user data:")
    print(user_sample.head())
    
cask_sample = load_cask_data(CASK_DATA_PATH)
if cask_sample is not None:
    print("\nSample of loaded cask data:")
    print(cask_sample.head())


Attempting to load JSON data from: /Users/andylow/Desktop/Data/prAnalysis/historic_processed_data_for_plotting.json

Sample of loaded raw biometric data:
   corescore  heart_rate                  timestamp  \
0        NaN        67.2  2024-08-13T23:00:00+00:00   
1        NaN        65.6  2024-08-13T23:05:00+00:00   
2        NaN        64.4  2024-08-13T23:10:00+00:00   
3        NaN        66.4  2024-08-13T23:15:00+00:00   
4        NaN        64.6  2024-08-13T23:20:00+00:00   

                                user_id  
0  00e0b767-8e3c-481f-aa37-04d329388bc6  
1  00e0b767-8e3c-481f-aa37-04d329388bc6  
2  00e0b767-8e3c-481f-aa37-04d329388bc6  
3  00e0b767-8e3c-481f-aa37-04d329388bc6  
4  00e0b767-8e3c-481f-aa37-04d329388bc6  

Sample of daily metrics:
                                user_id        date   rhr  sleep_score  \
0  00e0b767-8e3c-481f-aa37-04d329388bc6  2024-09-18  None    28.277075   
1  00e0b767-8e3c-481f-aa37-04d329388bc6  2024-08-16  None    25.169716   
2  00e0b767-8e

# Section 3: Data Cleaning and Processing Functions

In [197]:
# Section 3: Data Cleaning and Processing Functions


def convert_to_timezone(df, timezone=DEFAULT_TIMEZONE):
    """
    Convert timestamp column to specified timezone
    """
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    if df['timestamp'].dt.tz is None:
        df['timestamp'] = df['timestamp'].dt.tz_localize('UTC')
    df['timestamp'] = df['timestamp'].dt.tz_convert(timezone)
    return df

def handle_missing_heart_rates(df):
    """
    Handle missing heart rate values through interpolation where appropriate
    """
    print("\nHandling missing heart rate values...")
    
    # Group by user and timestamp hour to handle missing values
    df = df.copy()
    df['hour'] = df['timestamp'].dt.hour
    
    def interpolate_group(group):
        # Set timestamp as index for time-based interpolation
        group = group.set_index('timestamp')
        
        # Only interpolate if we have enough good data points
        if group['heart_rate'].notna().sum() > len(group) * 0.3:  # At least 30% good data
            group['heart_rate'] = group['heart_rate'].interpolate(
                method='time', 
                limit=30  # Max 30 minutes of interpolation
            )
        
        # Reset index to get timestamp back as a column
        return group.reset_index()
    
    # Process each user separately
    result_dfs = []
    for user_id in df['user_id'].unique():
        user_data = df[df['user_id'] == user_id].copy()
        user_data = interpolate_group(user_data)
        result_dfs.append(user_data)
    
    # Combine all processed data
    df_processed = pd.concat(result_dfs, ignore_index=True)
    
    # Report remaining missing values
    missing_after = df_processed['heart_rate'].isnull().sum()
    print(f"Remaining missing heart rate values: {missing_after}")
    
    return df_processed


def clean_biometric_data(biometric_dict, user_mapping_df):
    try:
        print("\nStarting biometric data cleaning...")
        
        # Extract the data from the dictionary and validate input
        if not isinstance(biometric_dict, dict):
            print("Error: Expected dictionary input for biometric data")
            return None
            
        cleaned_df = biometric_dict['raw_data'].copy()
        daily_metrics = biometric_dict['daily_metrics'].copy()
        
        # Standardize group names in user mapping first
        user_mapping_df = user_mapping_df.copy()
        user_mapping_df['group'] = user_mapping_df['group'].fillna('Unknown')
        
        def standardize_group(group):
            if pd.isna(group):
                return 'Unknown'
            group = str(group).upper()
            
            # First check if it's a supervisor group
            if '_SUP' in group:
                return 'SUPERVISORS'
                
            # Then check other groups
            for std_name, variants in GROUP_MAPPING.items():
                if group.upper() in [v.upper() for v in variants]:
                    return std_name
            return 'Other'
        
        user_mapping_df['standardized_group'] = user_mapping_df['group'].apply(standardize_group)
        
        print(f"Standardized groups: {user_mapping_df['standardized_group'].unique().tolist()}")
        
        # Merge with user mapping
        cleaned_df = cleaned_df.merge(
            user_mapping_df[['user_id', 'standardized_group', 'org']],
            on='user_id',
            how='left'
        )
        
        # Merge with daily metrics
        cleaned_df = cleaned_df.merge(
            daily_metrics,
            on='user_id',
            how='left'
        )
        
        # Fill any remaining nulls in standardized_group
        cleaned_df['standardized_group'] = cleaned_df['standardized_group'].fillna('Unknown')
        
        print(f"Data shape after merges: {cleaned_df.shape}")
        
        # Add validation for metrics
        numeric_columns = [
            'rhr', 'sleep_score', 'calm_score', 'activity_score',
            'corescore', 'sedentary_minutes', 'liss_minutes',
            'moderate_minutes', 'intense_minutes', 'consistency_score'
        ]
        
        for col in numeric_columns:
            if col in cleaned_df.columns:
                cleaned_df[col] = pd.to_numeric(cleaned_df[col], errors='coerce')
                print(f"Converted {col} to numeric type")
        
        # Add activity ratio calculations using new column names
        cleaned_df['active_ratio'] = (
            cleaned_df['liss_minutes'].fillna(0) + 
            cleaned_df['moderate_minutes'].fillna(0) + 
            cleaned_df['intense_minutes'].fillna(0)
        ) / (cleaned_df['sedentary_minutes'].fillna(1))  # Add 1 to avoid division by zero
        
        # Apply working hours filter
        def is_working_hours(row):
            try:
                if pd.isna(row['standardized_group']):
                    return False
                
                shift = SHIFT_PATTERNS.get(GROUP_SHIFTS.get(row['standardized_group']))
                if not shift:
                    return False
                
                time = row['timestamp'].time()
                start = datetime.strptime(shift['start'], '%H:%M').time()
                end = datetime.strptime(shift['end'], '%H:%M').time()
                
                return start <= time <= end
                
            except Exception as e:
                print(f"Error processing row: {row}")
                print(f"Error details: {str(e)}")
                return False
        
        cleaned_df['is_working_hours'] = cleaned_df.apply(is_working_hours, axis=1)
        
        # Add quality flags
        cleaned_df['data_quality'] = 'good'
        cleaned_df.loc[cleaned_df['heart_rate'].isnull(), 'data_quality'] = 'missing_heart_rate'
        cleaned_df.loc[~cleaned_df['is_working_hours'], 'data_quality'] = 'outside_hours'
        
        # Add flags for missing wellbeing metrics
        for metric in ['sleep_score', 'calm_score', 'corescore']:
            if metric in cleaned_df.columns:
                cleaned_df.loc[cleaned_df[metric].isnull(), 'data_quality'] = f'missing_{metric}'
        
        print("\nCleaning summary:")
        print(f"Total records: {len(cleaned_df)}")
        print(f"Working hours records: {cleaned_df['is_working_hours'].sum()}")
        print("Data quality distribution:")
        print(cleaned_df['data_quality'].value_counts())
        
        # Handle missing values in the metrics
        for col in numeric_columns:
            if col in cleaned_df.columns:
                cleaned_df[col] = cleaned_df.groupby('user_id')[col].ffill().bfill()
                print(f"Filled missing values in {col}")
        
        cleaned_df = handle_missing_heart_rates(cleaned_df)
        
        return cleaned_df
        
    except Exception as e:
        print(f"Error in clean_biometric_data: {str(e)}")
        print(f"Traceback: {traceback.format_exc()}")
        return None

def validate_data_consistency(biometric_df, user_df, cask_df):
    """
    Validate consistency between different data sources
    """
    consistency_report = {
        'status': True,
        'issues': [],
        'warnings': []
    }
    
    try:
        # Check user coverage
        biometric_users = set(biometric_df['user_id'].unique())
        mapped_users = set(user_df['user_id'].unique())
        
        missing_users = biometric_users - mapped_users
        if missing_users:
            consistency_report['warnings'].append(
                f"Found {len(missing_users)} users in biometric data without mapping"
            )
            
        # Check group consistency
        cask_sites = set(cask_df['site'].str.upper())
        user_groups = set(user_df['group'].dropna().str.upper())
        
        # Standardize groups for comparison
        standardized_sites = {
            std_name
            for site in cask_sites
            for std_name, variants in GROUP_MAPPING.items()
            if site in [v.upper() for v in variants]
        }
        
        standardized_groups = {
            std_name
            for group in user_groups
            for std_name, variants in GROUP_MAPPING.items()
            if group in [v.upper() for v in variants]
        }
        
        missing_groups = standardized_sites - standardized_groups
        if missing_groups:
            consistency_report['warnings'].append(
                f"Sites without matching user groups: {missing_groups}"
            )
            
        extra_groups = standardized_groups - standardized_sites
        if extra_groups:
            consistency_report['warnings'].append(
                f"User groups without matching sites: {extra_groups}"
            )
            
    except Exception as e:
        consistency_report['status'] = False
        consistency_report['issues'].append(f"Error in consistency validation: {str(e)}")
        
    return consistency_report
    
def validate_group_assignments(user_df, cask_df):
    """
    Validate group assignments between user mapping and cask data
    """
    user_groups = set(user_df['group'].dropna().str.lower())
    cask_sites = set(cask_df['site'].str.lower())
    
    print("\nGroup Assignment Validation:")
    print(f"User groups: {user_groups}")
    print(f"Cask sites: {cask_sites}")
    
    # Check for mismatches
    missing_groups = cask_sites - user_groups
    if missing_groups:
        print(f"\nWarning: Sites without matching user groups: {missing_groups}")
    
    extra_groups = user_groups - cask_sites
    if extra_groups:
        print(f"\nWarning: User groups without matching sites: {extra_groups}")

def validate_data_quality(biometric_df, user_df, cask_df):
    """
    Comprehensive data quality validation
    """
    quality_report = {
        'status': True,
        'issues': [],
        'warnings': [],
        'metrics': {}
    }
    
    # Biometric data validation
    try:
        # Check heart rate ranges
        invalid_hr = biometric_df[
            (biometric_df['heart_rate'].notna()) & 
            ((biometric_df['heart_rate'] < DATA_QUALITY_THRESHOLDS['min_heart_rate']) |
             (biometric_df['heart_rate'] > DATA_QUALITY_THRESHOLDS['max_heart_rate']))
        ]
        
        if not invalid_hr.empty:
            quality_report['warnings'].append(
                f"Found {len(invalid_hr)} records with unusual heart rate values"
            )
            
        # Check data completeness
        records_per_user = biometric_df.groupby('user_id').size()
        low_data_users = records_per_user[
            records_per_user < DATA_QUALITY_THRESHOLDS['min_records_per_day']
        ]
        
        if not low_data_users.empty:
            quality_report['warnings'].append(
                f"Found {len(low_data_users)} users with insufficient data"
            )
            
        # Calculate data coverage
        total_users = user_df['user_id'].nunique()
        users_with_data = biometric_df['user_id'].nunique()
        coverage = (users_with_data / total_users) * 100
        
        quality_report['metrics']['user_coverage'] = coverage
        
    except Exception as e:
        quality_report['status'] = False
        quality_report['issues'].append(f"Error in biometric validation: {str(e)}")

    return quality_report
    
def summarize_data_quality(biometric_df, user_df, cask_df):
    """
    Generate comprehensive data quality summary
    """
    print("\nData Quality Summary:")
    
    # Biometric data quality
    print("\nBiometric Data:")
    print(f"Total records: {len(biometric_df)}")
    print(f"Users with data: {biometric_df['user_id'].nunique()}")
    print("\nMissing data rates:")
    for col in ['heart_rate', 'corescore']:
        missing_rate = (biometric_df[col].isnull().sum() / len(biometric_df)) * 100
        print(f"{col}: {missing_rate:.1f}% missing")
    
    # User mapping quality
    print("\nUser Mapping:")
    print(f"Total users: {len(user_df)}")
    print("\nGroup distribution:")
    print(user_df['group'].value_counts(dropna=False))
    
    # Cask data quality
    print("\nCask Movement Data:")
    print(f"Total records: {len(cask_df)}")
    print("\nSite distribution:")
    print(cask_df['site'].value_counts())
    
def process_cask_data(df):
    """
    Process cask movement data for analysis
    Args:
        df: Raw cask movement DataFrame
    Returns:
        Processed DataFrame with additional metrics
    """
    processed_df = df.copy()
    
    # Calculate efficiency metrics
    processed_df['dispatch_receipt_ratio'] = processed_df['dispatches'] / processed_df['receipts']
    
    # Calculate rolling averages
    processed_df['rolling_receipts'] = processed_df.groupby('site')['receipts'].rolling(window=3, min_periods=1).mean().reset_index(0, drop=True)
    processed_df['rolling_dispatches'] = processed_df.groupby('site')['dispatches'].rolling(window=3, min_periods=1).mean().reset_index(0, drop=True)
    
    return processed_df

def create_analysis_dataset(biometric_df, cask_df):
    """
    Combine biometric and cask data for analysis
    """
    try:
        print("\nCreating analysis dataset...")
        
        # First, aggregate biometric data to daily level
        daily_biometric = biometric_df[biometric_df['is_working_hours']].groupby(
            ['standardized_group', biometric_df['timestamp'].dt.date]
        ).agg({
            'heart_rate': ['mean', 'std', 'min', 'max'],
            'user_id': 'nunique'  # Number of users contributing data
        }).reset_index()
        
        # Flatten column names
        daily_biometric.columns = ['group', 'date', 'heart_rate_mean', 'heart_rate_std', 
                                 'heart_rate_min', 'heart_rate_max', 'active_users']
        
        # Convert date to datetime for merging
        daily_biometric['date'] = pd.to_datetime(daily_biometric['date'])
        
        # Add hour from timestamp
        daily_biometric['hour'] = daily_biometric['date'].dt.hour
        daily_biometric['day_of_week'] = daily_biometric['date'].dt.day_name()
        daily_biometric['week'] = daily_biometric['date'].dt.isocalendar().week
        daily_biometric['month'] = daily_biometric['date'].dt.month
        
        # Prepare cask data for merging
        cask_df['date'] = pd.to_datetime(cask_df['date'])
        
        print("\nBefore merge:")
        print("Daily biometric shape:", daily_biometric.shape)
        print("Daily biometric date range:", daily_biometric['date'].min(), "to", daily_biometric['date'].max())
        print("Cask data shape:", cask_df.shape)
        print("Cask data date range:", cask_df['date'].min(), "to", cask_df['date'].max())
        
        # Merge on group and month
        analysis_df = pd.merge(
            daily_biometric,
            cask_df,
            left_on=['group', daily_biometric['date'].dt.to_period('M')],
            right_on=['site', cask_df['date'].dt.to_period('M')],
            how='inner'
        )
        
        return analysis_df
        
    except Exception as e:
        print(f"Error in create_analysis_dataset: {str(e)}")
        print(f"Traceback: {traceback.format_exc()}")
        return None

In [198]:
# Test cleaning on sample data
if biometric_sample is not None and user_sample is not None:
    cleaned_bio = clean_biometric_data(biometric_sample, user_sample)
    if cleaned_bio is not None:
        print("\nSample of cleaned biometric data:")
        print(cleaned_bio.head())
        print("\nCleaning summary:")
        print(cleaned_bio['data_quality'].value_counts())


Starting biometric data cleaning...
Standardized groups: ['Other', 'SUPERVISORS', 'DALMUIR', 'KILMALID', 'KB3']
Data shape after merges: (218863436, 17)
Converted rhr to numeric type
Converted sleep_score to numeric type
Converted calm_score to numeric type
Converted activity_score to numeric type
Converted sedentary_minutes to numeric type
Converted liss_minutes to numeric type
Converted moderate_minutes to numeric type
Converted intense_minutes to numeric type
Converted consistency_score to numeric type


# Section 4: Analysis Functions

In [None]:

def flag_sparse_data(group_df):
    """
    Flag periods with sparse data
    Args:
        group_df: DataFrame slice for a specific time period
    Returns:
        DataFrame with sparse data flag added
    """
    expected_records = DATA_QUALITY_CONFIG['sparse_data_threshold']
    actual_records = len(group_df)
    group_df['data_sparse'] = actual_records < expected_records
    return group_df

def create_data_quality_plots(quality_results):
    """
    Create visualizations for data quality metrics
    Args:
        quality_results: Results from analyze_data_quality
    Returns:
        Dictionary of plotly figures
    """
    figures = {}
    
    # Coverage by group
    coverage_data = quality_results['group_coverage']
    fig_quality = px.bar(
        coverage_data.reset_index(),
        x='group',
        y=['sparse_hours_sum', 'active_users_mean'],
        title='Data Quality Metrics by Group',
        barmode='group',
        labels={
            'value': 'Count',
            'variable': 'Metric'
        }
    )
    
    figures['data_quality'] = fig_quality
    
    # Quality timeline
    low_quality = quality_results['low_quality_periods']
    fig_timeline = px.scatter(
        low_quality,
        x='date',
        y='group',
        size='sparse_hours',
        color='active_users',
        title='Quality Issues Timeline',
        labels={
            'sparse_hours': 'Hours with Sparse Data',
            'active_users': 'Active Users'
        }
    )
    
    figures['quality_timeline'] = fig_timeline
    
    return figures

def generate_quality_summary(quality_results):
    """
    Generate HTML summary of data quality analysis
    Args:
        quality_results: Dictionary containing quality analysis results
    Returns:
        HTML formatted string
    """
    coverage = quality_results['group_coverage']
    low_quality = quality_results['low_quality_periods']
    
    summary = f"""
    <h4>Data Coverage Summary</h4>
    <p>Total groups analyzed: {len(coverage)}</p>
    <p>Average active users per group: {coverage['active_users_mean'].mean():.1f}</p>
    <p>Total periods with quality issues: {len(low_quality)}</p>
    
    <h4>Quality Issues Breakdown</h4>
    <ul>
        <li>Periods with sparse data: {(low_quality['sparse_hours'] > 0).sum()}</li>
        <li>Periods with low user count: {(low_quality['active_users'] < DATA_QUALITY_CONFIG['min_active_users']).sum()}</li>
    </ul>
    """
    
    return summary

def generate_group_analysis(pattern_results):
    """
    Generate HTML summary of group pattern analysis
    Args:
        pattern_results: Dictionary containing pattern analysis results
    Returns:
        HTML formatted string
    """
    daily_patterns = pattern_results['daily_patterns']
    weekly_patterns = pattern_results['weekly_patterns']
    
    summary = f"""
    <h4>Daily Patterns</h4>
    <p>Peak heart rate hours identified for each group:</p>
    <ul>
    """
    
    # Add peak hours for each group
    for group in daily_patterns['mean_patterns'].index.get_level_values('group').unique():
        group_data = daily_patterns['mean_patterns'].xs(group, level='group')
        peak_hour = group_data['heart_rate_mean'].idxmax()
        summary += f"<li>{group}: Hour {peak_hour}</li>"
    
    summary += """
    </ul>
    
    <h4>Weekly Patterns</h4>
    <p>Key observations:</p>
    <ul>
    """
    
    # Add weekly pattern observations
    for group in weekly_patterns.index.get_level_values('group').unique():
        group_data = weekly_patterns.xs(group, level='group')
        peak_day = group_data['heart_rate_mean'].idxmax()
        summary += f"<li>{group}: Peak activity on {peak_day}</li>"
    
    summary += "</ul>"
    
    return summary

def analyze_group_patterns(analysis_df):
    """
    Analyze patterns within and between groups
    """
    results = {}
    
    # Define metric groups
    wellbeing_metrics = [
        'heart_rate_mean', 'historic_daily_sleep_scores', 
        'historic_daily_calm_scores', 'historic_daily_cs'
    ]
    
    activity_metrics = [
        'historic_sedentary_minutes_per_day',
        'historic_liss_minutes_per_day',
        'historic_moderate_exercise_minutes_per_day',
        'historic_intense_exercise_minutes_per_day'
    ]
    
    # Daily patterns for each metric group
    for metrics in [wellbeing_metrics, activity_metrics]:
        patterns = analysis_df.groupby(['group', 'hour'])[metrics].mean()
        variation = analysis_df.groupby(['group', 'hour'])[metrics].std()
        
        results[f'{metrics[0]}_patterns'] = {
            'mean_patterns': patterns,
            'variations': variation
        }
    
    # Add weekly aggregations
    weekly_patterns = analysis_df.groupby(['group', 'day_of_week'])[
        wellbeing_metrics + activity_metrics + ['receipts', 'dispatches']
    ].mean()
    
    results['weekly_patterns'] = weekly_patterns
    
    return results
    
    # Function for pairwise group comparisons
def analyze_group_differences(analysis_df):
    """
    Statistical analysis of differences between groups
    Args:
        analysis_df: Combined analysis DataFrame containing biometric and productivity data
    Returns:
        Dictionary containing statistical test results and outlier analyses
    """
    from scipy import stats
    import numpy as np
    
    results = {}
    
    # Define metrics to analyze
    metrics = ['heart_rate_mean', 'receipts', 'dispatches', 'dispatch_receipt_ratio']
    
    def pairwise_comparison(data, metric):
        """
        Perform pairwise statistical tests between groups
        Args:
            data: DataFrame containing the data
            metric: Column name to analyze
        Returns:
            List of dictionaries containing comparison results
        """
        groups = data['group'].unique()
        comparisons = []
        
        for i in range(len(groups)):
            for j in range(i+1, len(groups)):
                group1, group2 = groups[i], groups[j]
                group1_data = data[data['group'] == group1][metric].dropna()
                group2_data = data[data['group'] == group2][metric].dropna()
                
                # Check for sufficient data
                if len(group1_data) < 2 or len(group2_data) < 2:
                    continue
                
                # Check normality
                _, p_val1 = stats.normaltest(group1_data)
                _, p_val2 = stats.normaltest(group2_data)
                
                # Choose appropriate test based on normality
                if p_val1 > 0.05 and p_val2 > 0.05:
                    # Use t-test for normal distributions
                    stat, pval = stats.ttest_ind(group1_data, group2_data)
                    test_type = 't-test'
                else:
                    # Use Mann-Whitney U test for non-normal distributions
                    stat, pval = stats.mannwhitneyu(group1_data, group2_data, alternative='two-sided')
                    test_type = 'Mann-Whitney U'
                
                # Calculate effect size (Cohen's d)
                effect_size = (group1_data.mean() - group2_data.mean()) / np.sqrt(
                    ((group1_data.std() ** 2) + (group2_data.std() ** 2)) / 2
                )
                
                comparisons.append({
                    'group1': group1,
                    'group2': group2,
                    'metric': metric,
                    'test_type': test_type,
                    'statistic': stat,
                    'p_value': pval,
                    'effect_size': effect_size,
                    'significant': pval < DATA_QUALITY_CONFIG['significant_p_value']
                })
        
        return comparisons
    
    def identify_outliers(data, metric):
        """
        Identify outliers using robust statistical methods
        Args:
            data: DataFrame containing the data
            metric: Column name to analyze
        Returns:
            DataFrame with outlier flags and z-scores
        """
        group_stats = data.groupby('group')[metric].agg(['median', 'std']).reset_index()
        
        # Calculate robust z-scores using median and MAD
        mad = lambda x: np.median(np.abs(x - np.median(x)))
        
        outliers = []
        for group in data['group'].unique():
            group_data = data[data['group'] == group]
            median = group_data[metric].median()
            mad_value = mad(group_data[metric])
            
            # Calculate modified z-scores
            z_scores = 0.6745 * (group_data[metric] - median) / mad_value
            
            outliers.extend([{
                'group': group,
                'timestamp': row['timestamp'],
                'value': row[metric],
                'z_score': z_score,
                'is_outlier': abs(z_score) > DATA_QUALITY_CONFIG['outlier_threshold']
            } for row, z_score in zip(group_data.itertuples(), z_scores)])
        
        return pd.DataFrame(outliers)
    
    # Perform comparisons for each metric
    for metric in metrics:
        results[f'{metric}_comparisons'] = pairwise_comparison(analysis_df, metric)
        results[f'{metric}_outliers'] = identify_outliers(analysis_df, metric)
    
    # Add summary statistics
    summary_stats = analysis_df.groupby('group')[metrics].agg([
        'count', 'mean', 'std', 'min', 'max', 'median'
    ]).round(2)
    
    results['summary_statistics'] = summary_stats
    
    return results


def analyze_productivity_correlations(analysis_df):
    """
    Analyze correlations between biometric data and productivity metrics
    Args:
        analysis_df: Combined analysis DataFrame
    Returns:
        Dictionary containing correlation analyses and time-lagged correlations
    """
    import numpy as np
    from scipy import stats
    
    results = {}
    
    # Define metric groups
    biometric_cols = [
        'heart_rate_mean', 'historic_daily_sleep_scores',
        'historic_daily_calm_scores', 'historic_daily_cs',
        'historic_daily_activity_scores'
    ]
    
    activity_cols = [
        'historic_sedentary_minutes_per_day',
        'historic_liss_minutes_per_day',
        'historic_moderate_exercise_minutes_per_day',
        'historic_intense_exercise_minutes_per_day',
        'active_ratio'
    ]
    
    productivity_cols = ['receipts', 'dispatches', 'dispatch_receipt_ratio']
    
    def calculate_correlation_matrix(data, method='pearson'):
        """
        Calculate correlation matrix with confidence intervals
        """
        # Calculate correlation matrix
        corr_matrix = data[biometric_cols + activity_cols + productivity_cols].corr(method=method)
        
        # Calculate confidence intervals using Fisher z-transformation
        n = len(data)
        z_score = stats.norm.ppf(0.975)  # 95% confidence interval
        
        # Initialize confidence interval matrices
        ci_lower = pd.DataFrame(np.zeros_like(corr_matrix), 
                              index=corr_matrix.index, 
                              columns=corr_matrix.columns)
        ci_upper = ci_lower.copy()
        
        for i in corr_matrix.index:
            for j in corr_matrix.columns:
                if i != j:
                    r = corr_matrix.loc[i,j]
                    z = np.arctanh(r)
                    se = 1/np.sqrt(n-3)
                    z_lower = z - z_score*se
                    z_upper = z + z_score*se
                    ci_lower.loc[i,j] = np.tanh(z_lower)
                    ci_upper.loc[i,j] = np.tanh(z_upper)
        
        return {
            'correlation_matrix': corr_matrix,
            'ci_lower': ci_lower,
            'ci_upper': ci_upper
        }
    
    def calculate_lagged_correlation(data, col1, col2, max_lag=3):
        """
        Calculate time-lagged correlations between two variables
        """
        lagged_corrs = []
        
        for lag in range(max_lag + 1):
            # Calculate correlation
            corr = data[col1].corr(data[col2].shift(lag))
            
            # Calculate confidence interval
            n = len(data) - lag
            z = np.arctanh(corr)
            se = 1/np.sqrt(n-3)
            z_score = stats.norm.ppf(0.975)
            ci_lower = np.tanh(z - z_score*se)
            ci_upper = np.tanh(z + z_score*se)
            
            lagged_corrs.append({
                'lag': lag,
                'correlation': corr,
                'ci_lower': ci_lower,
                'ci_upper': ci_upper,
                'n_observations': n
            })
        
        return lagged_corrs
    
    # Calculate overall correlations
    results['overall_correlations'] = calculate_correlation_matrix(analysis_df, method='pearson')
    results['rank_correlations'] = calculate_correlation_matrix(analysis_df, method='spearman')
    
    # Calculate group-specific correlations
    group_correlations = {}
    for group in analysis_df['group'].unique():
        group_data = analysis_df[analysis_df['group'] == group]
        group_correlations[group] = calculate_correlation_matrix(group_data)
    
    results['group_correlations'] = group_correlations
    
    # Calculate lagged correlations for wellbeing metrics
    lagged_correlations = {}
    for bio_col in biometric_cols + activity_cols:
        for prod_col in productivity_cols:
            key = f'{bio_col}_vs_{prod_col}'
            lagged_correlations[key] = calculate_lagged_correlation(
                analysis_df, bio_col, prod_col
            )
    
    results['lagged_correlations'] = lagged_correlations
    
    # Add statistical significance tests
    significance_tests = {}
    for metric in biometric_cols + activity_cols:
        for prod_metric in productivity_cols:
            correlation = stats.pearsonr(
                analysis_df[metric].dropna(),
                analysis_df[prod_metric].dropna()
            )
            significance_tests[f'{metric}_vs_{prod_metric}'] = {
                'correlation': correlation[0],
                'p_value': correlation[1]
            }
    
    results['significance_tests'] = significance_tests
    
    return results

In [None]:
# Add after analysis functions
# Test analysis on sample data
if cleaned_bio is not None and cask_sample is not None:
    analysis_df_sample = create_analysis_dataset(cleaned_bio, cask_sample)
    if analysis_df_sample is not None:
        print("\nSample of analysis dataset:")
        print(analysis_df_sample.head())
        
        pattern_results = analyze_group_patterns(analysis_df_sample)
        print("\nSample of pattern analysis results:")
        print(pattern_results['daily_patterns']['mean_patterns'].head())

# Section 5: Visualization Functions

In [None]:


def create_daily_pattern_plots(analysis_results):
    """
    Create visualizations of daily patterns
    Args:
        analysis_results: Results from analyze_group_patterns
    Returns:
        Dictionary of plotly figures
    """
    figures = {}
    
    # Heart rate patterns throughout the day
    daily_patterns = analysis_results['daily_patterns']['mean_patterns'].reset_index()
    
    fig_heart_rate = px.line(
        daily_patterns,
        x='hour',
        y='heart_rate_mean',
        color='group',
        title='Average Heart Rate Throughout Working Hours',
        labels={'heart_rate_mean': 'Average Heart Rate', 'hour': 'Hour of Day'}
    )
    fig_heart_rate.update_layout(
        hovermode='x unified',
        showlegend=True
    )
    
    figures['daily_heart_rate'] = fig_heart_rate
    
    return figures

def create_productivity_plots(analysis_df):
    """
    Create visualizations of productivity metrics
    Args:
        analysis_df: Combined analysis DataFrame
    Returns:
        Dictionary of plotly figures
    """
    figures = {}
    
    # Monthly productivity trends
    fig_productivity = px.line(
        analysis_df,
        x='date',
        y=['receipts', 'dispatches'],
        color='group',
        title='Monthly Productivity Metrics',
        facet_col='variable',
        labels={'value': 'Count', 'date': 'Month'}
    )
    fig_productivity.update_layout(
        hovermode='x unified',
        showlegend=True
    )
    
    figures['monthly_productivity'] = fig_productivity
    
    # Efficiency ratio plot
    fig_efficiency = px.line(
        analysis_df,
        x='date',
        y='dispatch_receipt_ratio',
        color='group',
        title='Dispatch/Receipt Efficiency Ratio',
        labels={'dispatch_receipt_ratio': 'Efficiency Ratio', 'date': 'Month'}
    )
    
    figures['efficiency_ratio'] = fig_efficiency
    
    return figures

def create_correlation_plots(analysis_results):
    """
    Create correlation visualizations
    Args:
        analysis_results: Results from analyze_productivity_correlations
    Returns:
        Dictionary of plotly figures
    """
    figures = {}
    
    # Overall correlation heatmap
    corr_matrix = analysis_results['overall_correlations']
    fig_corr = px.imshow(
        corr_matrix,
        title='Correlation Matrix: Biometrics vs Productivity',
        labels=dict(color="Correlation"),
        color_continuous_scale='RdBu'
    )
    
    figures['correlation_matrix'] = fig_corr
    
    # Group-specific correlation comparison
    group_corrs = analysis_results['group_correlations']
    # Create subplot for each group
    from plotly.subplots import make_subplots
    fig_group_corr = make_subplots(
        rows=1, 
        cols=len(group_corrs),
        subplot_titles=list(group_corrs.keys())
    )
    
    for i, (group, corr) in enumerate(group_corrs.items(), 1):
        fig_group_corr.add_trace(
            px.imshow(corr).data[0],
            row=1, col=i
        )
    
    figures['group_correlations'] = fig_group_corr
    
    return figures

def analyze_data_quality(analysis_df):
    """
    Analyze data quality and coverage
    Args:
        analysis_df: Combined analysis DataFrame
    Returns:
        Dictionary containing data quality metrics and analyses
    """
    import numpy as np
    from scipy import stats
    
    results = {}
    
    # Data coverage by group
    coverage = analysis_df.groupby('group').agg({
        'sparse_hours': ['sum', 'mean'],
        'active_users': ['mean', 'min', 'max', 'std'],
        'heart_rate_mean': ['count', 'isnull', lambda x: x.isnull().sum()/len(x)],
        'receipts': ['count', 'isnull', lambda x: x.isnull().sum()/len(x)],
        'dispatches': ['count', 'isnull', lambda x: x.isnull().sum()/len(x)]
    })
    
    # Rename columns for clarity
    coverage.columns = [
        f"{col[0]}_{col[1]}" if isinstance(col, tuple) else col 
        for col in coverage.columns
    ]
    
    results['group_coverage'] = coverage
    
    # Identify periods of low data quality
    def identify_low_quality_periods(data):
        """
        Identify periods with data quality issues
        Args:
            data: DataFrame slice to analyze
        Returns:
            DataFrame with quality flags
        """
        return pd.DataFrame({
            'date': data.index,
            'sparse_data': data['sparse_hours'] > DATA_QUALITY_CONFIG['sparse_data_threshold'],
            'low_users': data['active_users'] < DATA_QUALITY_CONFIG['min_active_users'],
            'missing_biometric': data['heart_rate_mean'].isnull(),
            'missing_productivity': data['receipts'].isnull() | data['dispatches'].isnull()
        })
    
    # Analyze quality by time period
    daily_quality = analysis_df.resample('D', on='timestamp').apply(identify_low_quality_periods)
    weekly_quality = analysis_df.resample('W', on='timestamp').apply(identify_low_quality_periods)
    monthly_quality = analysis_df.resample('M', on='timestamp').apply(identify_low_quality_periods)
    
    results['quality_by_period'] = {
        'daily': daily_quality,
        'weekly': weekly_quality,
        'monthly': monthly_quality
    }
    
    # Analyze data consistency
    def check_data_consistency(data):
        """
        Check for data consistency issues
        Args:
            data: DataFrame to analyze
        Returns:
            Dictionary of consistency checks
        """
        checks = {
            'value_ranges': {
                'heart_rate': {
                    'min': data['heart_rate_mean'].min(),
                    'max': data['heart_rate_mean'].max(),
                    'outliers': stats.zscore(data['heart_rate_mean']) > 3
                },
                'productivity': {
                    'min_receipts': data['receipts'].min(),
                    'max_receipts': data['receipts'].max(),
                    'min_dispatches': data['dispatches'].min(),
                    'max_dispatches': data['dispatches'].max(),
                    'outliers_receipts': stats.zscore(data['receipts']) > 3,
                    'outliers_dispatches': stats.zscore(data['dispatches']) > 3
                }
            },
            'timestamps': {
                'gaps': data['timestamp'].diff() > pd.Timedelta(hours=1),
                'duplicates': data['timestamp'].duplicated().sum(),
                'future_dates': data['timestamp'] > pd.Timestamp.now(),
                'past_dates': data['timestamp'] < pd.Timestamp.now() - pd.Timedelta(days=365)
            }
        }
        return checks
    
    results['data_consistency'] = check_data_consistency(analysis_df)
    
    return results

def create_quality_assessment_plots(analysis_df, quality_results):
    """
    Create comprehensive data quality visualization plots
    Args:
        analysis_df: Combined analysis DataFrame
        quality_results: Results from analyze_data_quality
    Returns:
        Dictionary containing plotly figures for data quality assessment
    """
    import plotly.express as px
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots
    
    figures = {}
    
    # Data coverage heatmap
    coverage_data = quality_results['group_coverage']
    fig_coverage = px.imshow(
        coverage_data,
        title='Data Coverage by Group',
        labels=dict(x='Metric', y='Group', color='Value'),
        aspect='auto'
    )
    figures['coverage_heatmap'] = fig_coverage
    
    # Time series of data quality metrics
    fig_quality_timeline = make_subplots(
        rows=3, cols=1,
        subplot_titles=('Daily Quality', 'Weekly Quality', 'Monthly Quality')
    )
    
    quality_periods = quality_results['quality_by_period']
    
    for i, (period, data) in enumerate(quality_periods.items(), 1):
        for col in ['sparse_data', 'low_users', 'missing_biometric', 'missing_productivity']:
            fig_quality_timeline.add_trace(
                go.Scatter(
                    x=data.index,
                    y=data[col],
                    name=f'{period}_{col}',
                    mode='lines+markers'
                ),
                row=i, col=1
            )
    
    fig_quality_timeline.update_layout(
        height=900,
        title_text="Data Quality Timeline",
        showlegend=True
    )
    figures['quality_timeline'] = fig_quality_timeline
    
    # Data consistency plots
    consistency_data = quality_results['data_consistency']
    
    # Value range analysis
    fig_ranges = make_subplots(
        rows=2, cols=2,
        subplot_titles=('Heart Rate Distribution', 'Heart Rate Outliers',
                       'Productivity Distribution', 'Productivity Outliers')
    )
    
    # Heart rate distribution
    fig_ranges.add_trace(
        go.Histogram(
            x=analysis_df['heart_rate_mean'],
            name='Heart Rate'
        ),
        row=1, col=1
    )
    
    # Heart rate outliers
    fig_ranges.add_trace(
        go.Scatter(
            x=analysis_df['timestamp'],
            y=analysis_df['heart_rate_mean'],
            mode='markers',
            marker=dict(
                color=consistency_data['value_ranges']['heart_rate']['outliers'],
                colorscale='Viridis'
            ),
            name='Heart Rate Outliers'
        ),
        row=1, col=2
    )
    
    # Productivity distributions
    fig_ranges.add_trace(
        go.Histogram(
            x=analysis_df['receipts'],
            name='Receipts'
        ),
        row=2, col=1
    )
    fig_ranges.add_trace(
        go.Histogram(
            x=analysis_df['dispatches'],
            name='Dispatches'
        ),
        row=2, col=1
    )
    
    # Productivity outliers
    fig_ranges.add_trace(
        go.Scatter(
            x=analysis_df['timestamp'],
            y=analysis_df['receipts'],
            mode='markers',
            marker=dict(
                color=consistency_data['value_ranges']['productivity']['outliers_receipts'],
                colorscale='Viridis'
            ),
            name='Receipt Outliers'
        ),
        row=2, col=2
    )
    
    fig_ranges.update_layout(
        height=800,
        title_text="Data Consistency Analysis",
        showlegend=True
    )
    figures['consistency_analysis'] = fig_ranges
    
    return figures


In [None]:
def create_interactive_dashboard(analysis_df, analysis_results):
    """
    Create an interactive dashboard combining key visualizations
    """
    from plotly.subplots import make_subplots
    import plotly.graph_objects as go
    import plotly.express as px
    
    # Create main dashboard figure with subplots
    fig = make_subplots(
        rows=4, cols=2,
        subplot_titles=(
            'Daily Heart Rate Patterns',
            'Sleep & Calm Scores',
            'Activity Distribution',
            'Monthly Productivity',
            'Wellbeing Metrics',
            'Exercise Minutes',
            'Correlation Matrix',
            'Group Performance'
        ),
        specs=[[{"type": "scatter"}, {"type": "scatter"}],
               [{"type": "bar"}, {"type": "scatter"}],
               [{"type": "heatmap"}, {"type": "scatter"}],
               [{"type": "scatter"}, {"type": "scatter"}]]
    )
    
    # Daily heart rate patterns
    daily_patterns = analysis_results['patterns']['heart_rate_mean_patterns']['mean_patterns'].reset_index()
    for group in daily_patterns['group'].unique():
        group_data = daily_patterns[daily_patterns['group'] == group]
        fig.add_trace(
            go.Scatter(
                x=group_data['hour'],
                y=group_data['heart_rate_mean'],
                name=f'{group} - Heart Rate',
                mode='lines+markers'
            ),
            row=1, col=1
        )
    
    # Sleep and calm scores
    for metric in ['historic_daily_sleep_scores', 'historic_daily_calm_scores']:
        for group in analysis_df['group'].unique():
            group_data = analysis_df[analysis_df['group'] == group]
            fig.add_trace(
                go.Scatter(
                    x=group_data['date'],
                    y=group_data[metric],
                    name=f'{group} - {metric}',
                    mode='lines'
                ),
                row=1, col=2
            )
    
    # Activity distribution
    activity_data = analysis_df.groupby('group')[
        ['historic_sedentary_minutes_per_day', 'historic_liss_minutes_per_day',
         'historic_moderate_exercise_minutes_per_day', 'historic_intense_exercise_minutes_per_day']
    ].mean().reset_index()
    
    for col in ['historic_sedentary_minutes_per_day', 'historic_liss_minutes_per_day',
                'historic_moderate_exercise_minutes_per_day', 'historic_intense_exercise_minutes_per_day']:
        fig.add_trace(
            go.Bar(
                x=activity_data['group'],
                y=activity_data[col],
                name=col.replace('historic_', '').replace('_per_day', '')
            ),
            row=2, col=1
        )
    
    # Monthly productivity
    for group in analysis_df['group'].unique():
        group_data = analysis_df[analysis_df['group'] == group]
        fig.add_trace(
            go.Scatter(
                x=group_data['date'],
                y=group_data['dispatch_receipt_ratio'],
                name=f'{group} - Efficiency',
                mode='lines'
            ),
            row=2, col=2
        )
    
    # Correlation heatmap
    corr_matrix = analysis_results['correlations']['overall_correlations']['correlation_matrix']
    fig.add_trace(
        go.Heatmap(
            z=corr_matrix.values,
            x=corr_matrix.columns,
            y=corr_matrix.index,
            colorscale='RdBu',
            zmin=-1,
            zmax=1
        ),
        row=3, col=1
    )
    
    # Wellbeing metrics over time
    for metric in ['historic_daily_cs', 'historic_daily_activity_scores']:
        for group in analysis_df['group'].unique():
            group_data = analysis_df[analysis_df['group'] == group]
            fig.add_trace(
                go.Scatter(
                    x=group_data['date'],
                    y=group_data[metric],
                    name=f'{group} - {metric}',
                    mode='lines'
                ),
                row=3, col=2
            )
    
    # Exercise minutes trends
    exercise_trends = analysis_df.groupby('date')[
        ['historic_liss_minutes_per_day', 'historic_moderate_exercise_minutes_per_day',
         'historic_intense_exercise_minutes_per_day']
    ].mean()
    
    for col in exercise_trends.columns:
        fig.add_trace(
            go.Scatter(
                x=exercise_trends.index,
                y=exercise_trends[col],
                name=col.replace('historic_', '').replace('_per_day', ''),
                mode='lines'
            ),
            row=4, col=1
        )
    
    # Group performance comparison
    performance_metrics = analysis_df.groupby('group')[
        ['historic_daily_cs', 'historic_daily_activity_scores', 'dispatch_receipt_ratio']
    ].mean().reset_index()
    
    for metric in ['historic_daily_cs', 'historic_daily_activity_scores', 'dispatch_receipt_ratio']:
        fig.add_trace(
            go.Bar(
                x=performance_metrics['group'],
                y=performance_metrics[metric],
                name=metric.replace('historic_', '').replace('_', ' '),
            ),
            row=4, col=2
        )
    
    # Update layout
    fig.update_layout(
        height=1600,  # Increased height to accommodate new plots
        width=1600,
        title_text="Group Analysis Dashboard",
        showlegend=True,
        hovermode='closest',
        barmode='group'
    )
    
    # Update axes labels
    fig.update_xaxes(title_text="Hour of Day", row=1, col=1)
    fig.update_xaxes(title_text="Date", row=1, col=2)
    fig.update_xaxes(title_text="Group", row=2, col=1)
    fig.update_xaxes(title_text="Date", row=2, col=2)
    fig.update_xaxes(title_text="Metric", row=3, col=1)
    fig.update_xaxes(title_text="Date", row=3, col=2)
    fig.update_xaxes(title_text="Date", row=4, col=1)
    fig.update_xaxes(title_text="Group", row=4, col=2)
    
    fig.update_yaxes(title_text="Heart Rate", row=1, col=1)
    fig.update_yaxes(title_text="Score", row=1, col=2)
    fig.update_yaxes(title_text="Minutes", row=2, col=1)
    fig.update_yaxes(title_text="Efficiency Ratio", row=2, col=2)
    fig.update_yaxes(title_text="Metric", row=3, col=1)
    fig.update_yaxes(title_text="Score", row=3, col=2)
    fig.update_yaxes(title_text="Minutes", row=4, col=1)
    fig.update_yaxes(title_text="Score", row=4, col=2)
    
    return fig

In [None]:
def validate_data_structures(biometric_df, user_df, cask_df):
    """
    Validate the structure and content of input DataFrames
    Args:
        biometric_df: Biometric data DataFrame
        user_df: User mapping DataFrame
        cask_df: Cask movement DataFrame
    Returns:
        Dictionary containing validation results and any issues found
    """
    validation = {
        'status': True,
        'issues': [],
        'warnings': [],
        'schema_validation': {}
    }
    
    # Expected columns for each DataFrame
    expected_columns = {
        'biometric': ['user_id', 'timestamp', 'heart_rate'],
        'user': ['user_id', 'group', 'org'],
        'cask': ['site', 'date', 'receipts', 'dispatches', 'receipts_mom_var', 'dispatches_mom_var']
    }
    
    # Validate biometric data
    if biometric_df is not None:
        bio_issues = []
        
        # Check columns
        missing_cols = set(expected_columns['biometric']) - set(biometric_df.columns)
        if missing_cols:
            bio_issues.append(f"Missing columns in biometric data: {missing_cols}")
        
        # Check data types
        if not pd.api.types.is_datetime64_any_dtype(biometric_df['timestamp']):
            bio_issues.append("timestamp column is not datetime type")
        
        if not pd.api.types.is_numeric_dtype(biometric_df['heart_rate']):
            bio_issues.append("heart_rate column is not numeric type")
        
        validation['schema_validation']['biometric'] = bio_issues
        
    else:
        validation['issues'].append("Biometric DataFrame is None")
    
    # Validate user mapping data
    if user_df is not None:
        user_issues = []
        
        # Check columns
        missing_cols = set(expected_columns['user']) - set(user_df.columns)
        if missing_cols:
            user_issues.append(f"Missing columns in user mapping: {missing_cols}")
        
        # Check for duplicate user_ids
        if user_df['user_id'].duplicated().any():
            user_issues.append("Duplicate user_ids found in user mapping")
        
        validation['schema_validation']['user'] = user_issues
        
    else:
        validation['issues'].append("User mapping DataFrame is None")
    
    # Validate cask data
    if cask_df is not None:
        cask_issues = []
        
        # Check columns
        missing_cols = set(expected_columns['cask']) - set(cask_df.columns)
        if missing_cols:
            cask_issues.append(f"Missing columns in cask data: {missing_cols}")
        
        # Check numeric columns
        numeric_cols = ['receipts', 'dispatches', 'receipts_mom_var', 'dispatches_mom_var']
        for col in numeric_cols:
            if col in cask_df.columns and not pd.api.types.is_numeric_dtype(cask_df[col]):
                cask_issues.append(f"{col} column is not numeric type")
        
        validation['schema_validation']['cask'] = cask_issues
        
    else:
        validation['issues'].append("Cask DataFrame is None")
    
    # Update overall status
    validation['status'] = not (validation['issues'] or any(validation['schema_validation'].values()))
    
    # Add detailed validation output
    print("\nDetailed Validation Results:")
    
    if biometric_df is not None:
        print("\nBiometric Data Types:")
        print(biometric_df.dtypes)
        print("\nBiometric nulls:")
        print(biometric_df.isnull().sum())
        print("\nBiometric sample timestamps:")
        print(biometric_df['timestamp'].head())
    
    if user_df is not None:
        print("\nUser Mapping Data Types:")
        print(user_df.dtypes)
        print("\nUser mapping nulls:")
        print(user_df.isnull().sum())
        if user_df['user_id'].duplicated().any():
            duplicates = user_df[user_df['user_id'].duplicated(keep=False)]
            print("\nDuplicate user_ids found:")
            for user_id in duplicates['user_id'].unique():
                print(f"\nEntries for user_id: {user_id}")
                print(duplicates[duplicates['user_id'] == user_id])
    
    if cask_df is not None:
        print("\nCask Data Types:")
        print(cask_df.dtypes)
        print("\nCask data nulls:")
        print(cask_df.isnull().sum())
    
    return validation

def generate_analysis_report(analysis_df, analysis_results, output_path):
    """
    Generate a comprehensive analysis report in HTML format
    Args:
        analysis_df: Combined analysis DataFrame
        analysis_results: Dictionary containing all analysis results
        output_path: Path to save the HTML report
    """
    import plotly.io as pio
    from jinja2 import Template
    import json
    
    # Create report sections
    sections = {
        'summary': {
            'title': 'Executive Summary',
            'content': generate_executive_summary(analysis_results)
        },
        'data_quality': {
            'title': 'Data Quality Assessment',
            'content': generate_quality_summary(analysis_results['quality'])
        },
        'group_analysis': {
            'title': 'Group Analysis',
            'content': generate_group_analysis(analysis_results['patterns'])
        },
        'correlations': {
            'title': 'Correlation Analysis',
            'content': generate_correlation_summary(analysis_results['correlations'])
        }
    }
    
    # Convert plots to HTML
    plots_html = {}
    for name, fig in analysis_results['plots'].items():
        plots_html[name] = pio.to_html(fig, full_html=False)
    
    # Load report template
    report_template = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>Analysis Report</title>
        <style>
            body { font-family: Arial, sans-serif; margin: 40px; }
            .section { margin-bottom: 30px; }
            .plot { margin: 20px 0; }
            table { border-collapse: collapse; width: 100%; }
            th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
            th { background-color: #f2f2f2; }
        </style>
    </head>
    <body>
        <h1>Analysis Report</h1>
        {% for section in sections.values() %}
            <div class="section">
                <h2>{{ section.title }}</h2>
                {{ section.content }}
            </div>
        {% endfor %}
        
        <div class="section">
            <h2>Visualizations</h2>
            {% for name, plot in plots_html.items() %}
                <div class="plot">
                    <h3>{{ name }}</h3>
                    {{ plot }}
                </div>
            {% endfor %}
        </div>
    </body>
    </html>
    """
    
    # Render template
    template = Template(report_template)
    report_html = template.render(sections=sections, plots_html=plots_html)
    
    # Save report
    with open(output_path, 'w') as f:
        f.write(report_html)

def generate_executive_summary(analysis_results):
    """
    Generate executive summary of analysis results
    Args:
        analysis_results: Dictionary containing all analysis results
    Returns:
        HTML string containing executive summary
    """
    # Extract key metrics and findings
    quality_metrics = analysis_results['quality']['group_coverage']
    correlations = analysis_results['correlations']['overall_correlations']
    patterns = analysis_results['patterns']
    
    # Format summary text
    summary = f"""
    <h3>Key Findings</h3>
    <ul>
        <li>Data Quality: {format_quality_summary(quality_metrics)}</li>
        <li>Group Patterns: {format_pattern_summary(patterns)}</li>
        <li>Correlations: {format_correlation_summary(correlations)}</li>
    </ul>
    """
    
    return summary

def format_quality_summary(quality_metrics):
    """Format data quality metrics into readable text"""
    return f"Analysis covers {len(quality_metrics)} groups with average data completeness of {quality_metrics['heart_rate_mean_count'].mean():.1f}%"

def format_pattern_summary(patterns):
    """Format pattern analysis into readable text"""
    return "Key patterns identified in daily and weekly cycles across groups"

def format_correlation_summary(correlations):
    """Format correlation analysis into readable text"""
    return f"Strongest correlation observed: {correlations['correlation_matrix'].max().max():.2f}"

def validate_data_completeness(biometric_df, cask_df):
    """Validate data completeness before merging"""
    completeness = {
        'status': True,
        'issues': []
    }
    
    # Check for required columns
    required_bio = ['standardized_group', 'timestamp', 'heart_rate', 'is_working_hours']
    required_cask = ['site', 'date', 'receipts', 'dispatches']
    
    missing_bio = [col for col in required_bio if col not in biometric_df.columns]
    missing_cask = [col for col in required_cask if col not in cask_df.columns]
    
    if missing_bio:
        completeness['status'] = False
        completeness['issues'].append(f"Missing biometric columns: {missing_bio}")
    
    if missing_cask:
        completeness['status'] = False
        completeness['issues'].append(f"Missing cask columns: {missing_cask}")
    
    return completeness

In [None]:
# Add after visualization functions
# Test visualizations on sample data
if analysis_df_sample is not None and pattern_results is not None:
    import matplotlib.pyplot as plt
    %matplotlib inline  # Enable inline plotting
    
    daily_plots = create_daily_pattern_plots(pattern_results)
    if daily_plots:
        for name, fig in daily_plots.items():
            fig.show()
    
    productivity_plots = create_productivity_plots(analysis_df_sample)
    if productivity_plots:
        for name, fig in productivity_plots.items():
            fig.show()

NameError: name 'pattern_results' is not defined

# Section 6: Main Execution


In [None]:
def log_data_shape(df, step_name):
    """Log the shape and columns of a DataFrame"""
    print(f"\n{step_name}:")
    print(f"Shape: {df.shape}")
    print("Columns:", df.columns.tolist())
    print("Sample rows:")
    print(df.head())

In [None]:
def validate_group_matching(biometric_df, cask_df):
    """
    Validate group names match between datasets
    """
    try:
        bio_groups = set(biometric_df['standardized_group'].dropna().unique())
        cask_sites = set(cask_df['site'].unique())
        
        print("\nGroup matching validation:")
        print(f"Biometric groups: {bio_groups}")
        print(f"Cask sites: {cask_sites}")
        
        mismatches = bio_groups.symmetric_difference(cask_sites)
        if mismatches:
            print(f"\nWarning: Group mismatches found: {mismatches}")
            return False
        return True
        
    except Exception as e:
        print(f"Error in validate_group_matching: {str(e)}")
        print(f"Traceback: {traceback.format_exc()}")
        return False

def validate_data_completeness(biometric_df, cask_df):
    """
    Validate data completeness before merging
    """
    completeness = {
        'status': True,
        'issues': []
    }
    
    try:
        # Check for required columns
        required_bio = ['standardized_group', 'timestamp', 'heart_rate', 'is_working_hours']
        required_cask = ['site', 'date', 'receipts', 'dispatches']
        
        missing_bio = [col for col in required_bio if col not in biometric_df.columns]
        missing_cask = [col for col in required_cask if col not in cask_df.columns]
        
        if missing_bio:
            completeness['status'] = False
            completeness['issues'].append(f"Missing biometric columns: {missing_bio}")
        
        if missing_cask:
            completeness['status'] = False
            completeness['issues'].append(f"Missing cask columns: {missing_cask}")
        
        # Check for null values in key columns
        if biometric_df['standardized_group'].isnull().any():
            completeness['issues'].append(
                f"Found {biometric_df['standardized_group'].isnull().sum()} null values in standardized_group"
            )
        
        if biometric_df['heart_rate'].isnull().any():
            completeness['issues'].append(
                f"Found {biometric_df['heart_rate'].isnull().sum()} null values in heart_rate"
            )
            
        # Check date ranges
        bio_date_range = pd.date_range(
            start=biometric_df['timestamp'].min(),
            end=biometric_df['timestamp'].max(),
            freq='D'
        )
        cask_date_range = pd.date_range(
            start=cask_df['date'].min(),
            end=cask_df['date'].max(),
            freq='D'
        )
        
        date_overlap = set(bio_date_range).intersection(set(cask_date_range))
        if not date_overlap:
            completeness['status'] = False
            completeness['issues'].append("No date overlap between biometric and cask data")
        
        bio_dates = pd.to_datetime(biometric_df['timestamp']).dt.date
        cask_dates = pd.to_datetime(cask_df['date']).dt.date
        
        print("\nDate ranges:")
        print(f"Biometric data: {bio_dates.min()} to {bio_dates.max()}")
        print(f"Cask data: {cask_dates.min()} to {cask_dates.max()}")
        
        # Check for any overlap
        bio_date_set = set(bio_dates)
        cask_date_set = set(cask_dates)
        date_overlap = bio_date_set.intersection(cask_date_set)
        
        if not date_overlap:
            completeness['status'] = False
            completeness['issues'].append(
                "No date overlap between biometric and cask data\n" +
                f"Biometric: {bio_dates.min()} to {bio_dates.max()}\n" +
                f"Cask: {cask_dates.min()} to {cask_dates.max()}"
            )
        else:
            print(f"\nFound {len(date_overlap)} days of overlapping data")
            
        return completeness
        
    except Exception as e:
        print(f"Error in validate_data_completeness: {str(e)}")
        print(f"Traceback: {traceback.format_exc()}")
        completeness['status'] = False
        completeness['issues'].append(f"Error during validation: {str(e)}")
        return completeness

def summarize_validation_results(biometric_df, cask_df):
    """
    Print summary statistics of the validation
    """
    print("\nValidation Summary:")
    
    print("\nBiometric Data:")
    print(f"Total records: {len(biometric_df)}")
    print(f"Date range: {biometric_df['timestamp'].min()} to {biometric_df['timestamp'].max()}")
    print("\nGroup distribution:")
    print(biometric_df['standardized_group'].value_counts(dropna=False))
    
    print("\nCask Data:")
    print(f"Total records: {len(cask_df)}")
    print(f"Date range: {cask_df['date'].min()} to {cask_df['date'].max()}")
    print("\nSite distribution:")
    print(cask_df['site'].value_counts())

def filter_to_valid_dates(biometric_df, cask_df):
    """
    Filter both datasets to overlapping date ranges
    """
    print("\nFiltering to overlapping date ranges...")
    
    # Convert timestamps to date
    biometric_df = biometric_df.copy()
    cask_df = cask_df.copy()
    
    biometric_df['date'] = pd.to_datetime(biometric_df['timestamp']).dt.date
    cask_df['date'] = pd.to_datetime(cask_df['date']).dt.date
    
    # Get overlapping date range
    start_date = max(biometric_df['date'].min(), cask_df['date'].min())
    end_date = min(biometric_df['date'].max(), cask_df['date'].max())
    
    print(f"Filtering to date range: {start_date} to {end_date}")
    
    # Filter both datasets
    bio_filtered = biometric_df[
        (biometric_df['date'] >= start_date) & 
        (biometric_df['date'] <= end_date)
    ]
    cask_filtered = cask_df[
        (cask_df['date'] >= start_date) & 
        (cask_df['date'] <= end_date)
    ]
    
    print(f"Filtered biometric records: {len(bio_filtered)} (was {len(biometric_df)})")
    print(f"Filtered cask records: {len(cask_filtered)} (was {len(cask_df)})")
    
    return bio_filtered, cask_filtered



In [None]:
def main():
    """
    Enhanced main execution flow with comprehensive error handling and validation
    """
    try:
        print("Starting analysis pipeline...")
        
        # 1. Load Data
        print("\n=== Loading Data Sources ===")
        biometric_df = load_json_data(BIOMETRIC_DATA_PATH)
        user_df = load_google_sheet_data()
        cask_df = load_cask_data(CASK_DATA_PATH)
        
        if any(df is None for df in [biometric_df, user_df, cask_df]):
            print("Error: Failed to load one or more data sources")
            return None, None, None
        
        # 2. Validate Data Structures
        print("\n=== Validating Data Structures ===")
        structure_validation = validate_data_structures(biometric_df, user_df, cask_df)
        if not structure_validation['status']:
            print("Data structure validation failed:")
            for issue in structure_validation['issues']:
                print(f"- {issue}")
            return None, None, None
        
        # 3. Clean and Process Data
        print("\n=== Processing Data ===")
        
        # Clean biometric data
        cleaned_biometric = clean_biometric_data(biometric_df, user_df)
        if cleaned_biometric is None:
            print("Error: Failed to clean biometric data")
            return None, None, None
        log_data_shape(cleaned_biometric, "Cleaned biometric data")
        
        # Process cask data
        processed_cask = process_cask_data(cask_df)
        if processed_cask is None:
            print("Error: Failed to process cask data")
            return None, None, None
        log_data_shape(processed_cask, "Processed cask data")
        
        # 4. Validate Data Quality
        print("\n=== Validating Data Quality ===")
        quality_report = validate_data_quality(cleaned_biometric, user_df, processed_cask)
        if not quality_report['status']:
            print("Data quality validation failed:")
            for issue in quality_report['issues']:
                print(f"- {issue}")
            return None, None, None
        
        # 5. Check Data Consistency
        print("\n=== Checking Data Consistency ===")
        consistency_report = validate_data_consistency(cleaned_biometric, user_df, processed_cask)
        if not consistency_report['status']:
            print("Data consistency validation failed:")
            for issue in consistency_report['issues']:
                print(f"- {issue}")
            return None, None, None
        
        # 6. Validate Data Completeness
        print("\n=== Validating Data Completeness ===")
        completeness_check = validate_data_completeness(cleaned_biometric, processed_cask)
        if not completeness_check['status']:
            print("Data completeness check failed:")
            for issue in completeness_check['issues']:
                print(f"- {issue}")
            return None, None, None
        
        # 7. Validate Group Matching
        print("\n=== Validating Group Matching ===")
        if not validate_group_matching(cleaned_biometric, processed_cask):
            print("Warning: Proceeding with analysis despite group mismatches")
        
        print("\n=== Validation Summary ===")
        summarize_validation_results(cleaned_biometric, processed_cask)

        cleaned_biometric, processed_cask = filter_to_valid_dates(cleaned_biometric, processed_cask)

        # 8. Create Analysis Dataset
        print("\n=== Creating Analysis Dataset ===")
        analysis_df = create_analysis_dataset(cleaned_biometric, processed_cask)
        if analysis_df is None:
            print("Error: Failed to create analysis dataset")
            return None, None, None
        log_data_shape(analysis_df, "Combined analysis dataset")

        # 9. Run Analyses
        print("\n=== Running Analyses ===")
        try:
            print("\nAnalyzing group patterns...")
            pattern_results = analyze_group_patterns(analysis_df)
            
            print("\nAnalyzing group differences...")
            difference_results = analyze_group_differences(analysis_df)
            
            print("\nAnalyzing productivity correlations...")
            correlation_results = analyze_productivity_correlations(analysis_df)
            
            print("\nAnalyzing data quality...")
            quality_results = analyze_data_quality(analysis_df)
        except Exception as e:
            print(f"Error during analysis: {str(e)}")
            print(f"Traceback: {traceback.format_exc()}")
            return None, None, None
        
        # 10. Generate Visualizations
        print("\n=== Generating Visualizations ===")
        try:
            daily_plots = create_daily_pattern_plots(pattern_results)
            productivity_plots = create_productivity_plots(analysis_df)
            correlation_plots = create_correlation_plots(correlation_results)
            quality_plots = create_quality_assessment_plots(analysis_df, quality_results)
        except Exception as e:
            print(f"Error generating visualizations: {str(e)}")
            print(f"Traceback: {traceback.format_exc()}")
            return None, None, None
        
        # 11. Combine Results
        analysis_results = {
            'patterns': pattern_results,
            'differences': difference_results,
            'correlations': correlation_results,
            'quality': quality_results,
            'plots': {
                'daily': daily_plots,
                'productivity': productivity_plots,
                'correlation': correlation_plots,
                'quality': quality_plots
            }
        }
        
        # 12. Create Dashboard
        print("\n=== Creating Dashboard ===")
        try:
            dashboard = create_interactive_dashboard(analysis_df, analysis_results)
        except Exception as e:
            print(f"Error creating dashboard: {str(e)}")
            print(f"Traceback: {traceback.format_exc()}")
            return None, None, None
        
        # 13. Save Results
        print("\n=== Saving Results ===")
        try:
            # Create output directories
            output_dir = Path('outputs/plots')
            output_dir.mkdir(parents=True, exist_ok=True)
            
            # Save processed data
            analysis_df.to_csv('outputs/processed_analysis_data.csv', index=False)
            
            # Save plots
            for category, plots in analysis_results['plots'].items():
                for name, fig in plots.items():
                    plot_file = output_dir / f'{category}_{name}.html'
                    fig.write_html(str(plot_file))
            
            # Save dashboard
            dashboard.write_html('outputs/dashboard.html')
            
            # Generate reports
            generate_summary_report(analysis_results, analysis_df)
            generate_analysis_report(analysis_df, analysis_results, 'outputs/analysis_report.html')
            
        except Exception as e:
            print(f"Error saving results: {str(e)}")
            print(f"Traceback: {traceback.format_exc()}")
            return None, None, None
        
        print("\n=== Analysis Pipeline Complete! ===")
        print("All results saved in 'outputs' directory")
        
        return analysis_df, analysis_results, dashboard
        
    except Exception as e:
        print(f"Unexpected error in main execution: {str(e)}")
        print(f"Traceback: {traceback.format_exc()}")
        return None, None, None

def generate_summary_report(analysis_results, analysis_df):
    """
    Generate a summary report of key findings
    """
    with open('outputs/summary_report.txt', 'w') as f:
        f.write("Analysis Summary Report\n")
        f.write("=====================\n\n")
        
        # Data coverage
        f.write("Data Coverage:\n")
        f.write("--------------\n")
        coverage = analysis_results['quality']['group_coverage']
        f.write(f"{coverage.to_string()}\n\n")
        
        # Key findings
        f.write("Key Findings:\n")
        f.write("-------------\n")
        
        # Group differences
        differences = analysis_results['differences']
        for metric, comparisons in differences.items():
            if metric.endswith('_comparisons'):
                f.write(f"\n{metric.replace('_comparisons', '').title()} Comparisons:\n")
                for comp in comparisons:
                    if comp['p_value'] < 0.05:
                        f.write(f"- Significant difference between {comp['group1']} and {comp['group2']}\n")
        
        # Correlations
        f.write("\nStrong Correlations:\n")
        corr_matrix = analysis_results['correlations']['overall_correlations']
        strong_corrs = [(i, j, corr_matrix.loc[i,j]) 
                       for i in corr_matrix.index 
                       for j in corr_matrix.columns 
                       if abs(corr_matrix.loc[i,j]) > 0.7 and i != j]
        for i, j, corr in strong_corrs:
            f.write(f"- {i} vs {j}: {corr:.2f}\n")

In [None]:


if __name__ == "__main__":
    try:
        # Create outputs directory if it doesn't exist
        os.makedirs('outputs/plots', exist_ok=True)
        
        # Run main analysis
        analysis_df, analysis_results, dashboard = main()
    except Exception as e:
        print(f"Error in main execution: {e}")
        raise