In [1]:
# Import libraries
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from dotenv import load_dotenv, find_dotenv
from redcap import Project #PyCap
import os

In [2]:
# Load the REDCap API URL and Key from the .env file
load_dotenv(find_dotenv())
api_url = os.getenv('REDCAP_API_URL')
api_key = os.getenv('REDCAP_API_KEY')

In [3]:
# Export the REDCap project using PyCap
project = Project(api_url, api_key)

In [7]:
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class REDCapConfig:
    """Configuration parameters for REDCap-compliant patient data generation"""
    sample_size: int = 10000
    seed: int = 42
    missing_data_ratio: float = 0.1
    alive_ratio: float = 0.8
    
    # Adjusted date ranges for more appropriate age distribution
    birth_year_range: tuple = (1930, 1965)  # Adjusted to ensure older population
    death_year_range: tuple = (2000, 2024)
    visit_year_range: tuple = (2000, 2024)  # Adjusted to ensure older visit ages
    
    # Diagnosis distribution (matching REDCap coding)
    diagnosis_weights: dict = None
    
    def __post_init__(self):
        if self.diagnosis_weights is None:
            self.diagnosis_weights = {
                1: 0.25,  # Alzheimer's Disease
                2: 0.15,  # Frontotemporal Dementia
                3: 0.15,  # Primary Progressive Aphasia
                4: 0.15,  # Mild Cognitive Impairment
                5: 0.30   # Control
            }

class REDCapDataGenerator:
    def __init__(self, config: REDCapConfig):
        self.config = config
        np.random.seed(config.seed)
    
    def _format_date(self, date: datetime) -> str:
        """Convert datetime to YYYY-MM-DD format"""
        return date.strftime('%Y-%m-%d')
    
    def _parse_date(self, date_str: str) -> datetime:
        """Parse YYYY-MM-DD formatted date string to datetime"""
        return datetime.strptime(date_str, '%Y-%m-%d')
    
    def _generate_birth_date(self) -> str:
        """
        Generate birth date with weighted distribution favoring earlier years
        to ensure older population
        """
        start_year, end_year = self.config.birth_year_range
        # Create weighted probabilities favoring earlier years
        years = np.arange(start_year, end_year + 1)
        weights = np.exp(-0.1 * (years - start_year))  # Exponential decay weights
        weights /= weights.sum()
        
        # Select year based on weights
        birth_year = np.random.choice(years, p=weights)
        
        # Generate random month and day
        birth_date = datetime(birth_year, 1, 1) + timedelta(days=np.random.randint(0, 365))
        return self._format_date(birth_date)

    def _generate_visit_date(self, birth_date: str, diagnosis: int) -> str:
        """
        Generate visit date ensuring appropriate age based on diagnosis
        """
        birth = self._parse_date(birth_date)
        start_date = datetime(self.config.visit_year_range[0], 1, 1)
        end_date = datetime(self.config.visit_year_range[1], 1, 1)
        
        # Adjust minimum age based on diagnosis
        min_age = 60 if diagnosis in [1, 2, 3] else 50  # Higher min age for dementia cases
        
        # Calculate earliest possible visit date based on minimum age
        earliest_visit = birth + timedelta(days=min_age * 365)
        
        # Adjust start_date if necessary
        start_date = max(start_date, earliest_visit)
        
        if start_date >= end_date:
            # If minimum age pushes us past end date, use end date and adjust birth date
            return self._format_date(end_date)
        
        # Generate random date between adjusted start and end
        days_between = (end_date - start_date).days
        random_days = np.random.randint(0, max(1, days_between))
        visit_date = start_date + timedelta(days=random_days)
        return self._format_date(visit_date)

    def _generate_death_date(self, birth_date: str, visit_date: str) -> str:
        """
        Generate death date ensuring minimum age of 70 at death
        """
        birth = self._parse_date(birth_date)
        visit = self._parse_date(visit_date)
        
        # Ensure death date is after visit date and person is at least 70
        min_death_date = max(
            visit + timedelta(days=1),
            birth + timedelta(days=70 * 365)
        )
        
        end_date = datetime(self.config.death_year_range[1], 1, 1)
        
        if min_death_date >= end_date:
            return self._format_date(end_date)
        
        days_between = (end_date - min_death_date).days
        random_days = np.random.randint(0, max(1, days_between))
        death_date = min_death_date + timedelta(days=random_days)
        return self._format_date(death_date)
    
    def _calculate_age(self, birth_date: str, reference_date: str) -> int:
        """Calculate age in years between two dates in YYYY-MM-DD format"""
        birth = self._parse_date(birth_date)
        ref = self._parse_date(reference_date)
        return (ref - birth).days // 365
    
    def _generate_mmse_score(self, diagnosis: int, age: int) -> int:
        """
        Generate MMSE score based on diagnosis and age
        """
        if diagnosis == 5:  # Control
            base_score = np.random.normal(29, 1)
            age_effect = (age - 60) * 0.05  # Slight decline with age
        elif diagnosis == 4:  # MCI
            base_score = np.random.normal(26, 1.5)
            age_effect = (age - 60) * 0.1
        else:  # Other conditions
            base_score = np.random.normal(
                loc=20 - (5 - diagnosis) * 3,
                scale=4
            )
            age_effect = (age - 60) * 0.15  # Stronger decline with age
        
        final_score = base_score - age_effect
        return int(np.clip(final_score, 0, 30))
    
    def generate_dataset(self) -> pd.DataFrame:
        """Generate REDCap-compliant patient dataset"""
        # Generate patient IDs
        patient_ids = np.random.choice(range(100000), size=self.config.sample_size, replace=False)
        
        # Generate diagnoses
        diagnoses = np.random.choice(
            list(self.config.diagnosis_weights.keys()),
            size=self.config.sample_size,
            p=list(self.config.diagnosis_weights.values())
        )
        
        # Generate birth dates
        birth_dates = [self._generate_birth_date() 
                      for _ in range(self.config.sample_size)]
        
        # Generate visit dates based on birth dates and diagnoses
        visit_dates = [self._generate_visit_date(bd, dx) 
                      for bd, dx in zip(birth_dates, diagnoses)]
        
        # Calculate visit ages first (needed for MMSE calculation)
        visit_ages = [self._calculate_age(bd, vd) 
                     for bd, vd in zip(birth_dates, visit_dates)]
        
        # Generate MMSE scores using age information
        mmse_scores = [self._generate_mmse_score(dx, age) 
                      for dx, age in zip(diagnoses, visit_ages)]
        
        # Generate death dates
        death_dates = [self._generate_death_date(bd, vd) 
                      for bd, vd in zip(birth_dates, visit_dates)]
        death_dates = np.where(
            np.random.rand(self.config.sample_size) < self.config.alive_ratio,
            "",
            death_dates
        )
        
        # Create initial dataframe
        df = pd.DataFrame({
            'patient_id': patient_ids,
            'birth_date': birth_dates,
            'diagnosis': diagnoses,
            'sex': np.random.choice([1, 2], size=self.config.sample_size),
            'handedness': np.random.choice([1, 2, 3], size=self.config.sample_size),
            'death_date': death_dates,
            'visit_date': visit_dates,
            'mmse_tot': mmse_scores
        })
        
        # Calculate ages
        df['death_age'] = df.apply(
            lambda row: self._calculate_age(row['birth_date'], row['death_date']) 
            if row['death_date'] != "" else "", 
            axis=1
        )
        
        df['visit_age'] = df.apply(
            lambda row: self._calculate_age(row['birth_date'], row['visit_date']), 
            axis=1
        )
        
        # Add missing values randomly (except for patient_id)
        for col in df.columns:
            if col != 'patient_id':
                mask = np.random.random(len(df)) < self.config.missing_data_ratio
                df.loc[mask, col] = ""
        
        # Convert all columns to strings (except patient_id)
        for col in df.columns:
            if col != 'patient_id':
                df[col] = df[col].astype(str)
        
        # Sort by patient_id
        return df.sort_values('patient_id').reset_index(drop=True)

# Usage example
if __name__ == "__main__":
    config = REDCapConfig(
        sample_size=10000,
        seed=42,
        missing_data_ratio=0.1,
        alive_ratio=0.8
    )
    
    generator = REDCapDataGenerator(config)
    df = generator.generate_dataset()
    display(df)

  df.loc[mask, col] = ""
  df.loc[mask, col] = ""
  df.loc[mask, col] = ""
  df.loc[mask, col] = ""
  df.loc[mask, col] = ""


Unnamed: 0,patient_id,birth_date,diagnosis,sex,handedness,death_date,visit_date,mmse_tot,death_age,visit_age
0,23,1941-02-14,2,2,1,,2011-11-29,,,70
1,53,1930-04-12,1,2,3,,2012-01-25,6,,81
2,70,1942-04-26,1,1,3,,,12,,
3,87,1933-05-09,4,1,3,,2022-12-22,18,,
4,97,1945-02-12,2,2,2,,2017-10-31,9,,72
...,...,...,...,...,...,...,...,...,...,...
9995,99943,,5,1,2,,2009-07-08,29,,79
9996,99954,1932-02-10,1,2,2,,2010-06-24,1,,78
9997,99971,1930-09-28,2,2,2,,2022-03-29,6,,91
9998,99991,1942-11-23,5,2,,,2005-02-11,28,,62


In [9]:
# Import the generated data into REDCap:
import_data = df.to_dict(orient='records')
response = project.import_records(import_data, overwrite='overwrite')
print(response)

{'count': 10000}
