In [1]:
import logging
import os
import time
from datetime import datetime, timedelta
from getpass import getpass

import psycopg2
import schedule
from psycopg2.extras import DictCursor, execute_batch

In [2]:
# Module-level logger shared by the pipeline code in this notebook.
logger = logging.getLogger(__name__)
# Root handler at INFO so the pipeline's progress messages are shown.
logging.basicConfig(level=logging.INFO)

In [None]:
class HealthcareDataPipeline():
    """Early draft of the pipeline that only opens the database connection.

    A later cell in this notebook redefines this class with the full
    extract/transform/load logic.
    """

    def __init__(self, dbname="healthcare_data", user="your_user",
                 password=None, host="localhost", port="5432"):
        """Open the database connection and a dict-returning cursor.

        Args:
            dbname, user, host, port: connection settings; the defaults are
                the values this notebook used before they were parameters.
            password: database password. When omitted, it is read from the
                ``PGPASSWORD`` environment variable. Only if that is unset is
                the user prompted, and the prompt does not echo the input.
        """
        if password is None:
            # getpass (not input) so the secret is not echoed to the terminal.
            password = os.environ.get("PGPASSWORD") or getpass("Enter password key: ")
        self.conn = psycopg2.connect(
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port
        )
        self.cursor = self.conn.cursor(cursor_factory=DictCursor)
    

clone

In [3]:
class HealthcareDataPipeline():
    """Copy recent rows from ``private_data.master_data`` into ``healthcare.*`` tables.

    Each source row is split into billing, diagnosis, encounter, follow-up
    and financial records (``transform_data``). Those records are upserted in
    one transaction per source row (``load_data``).
    """

    # Default look-back window for "new" rows when no explicit bound is given.
    _LOOKBACK = timedelta(hours=1)

    # Column order of the encounters INSERT. Values are looked up by key, so
    # the result does not depend on the record dict's insertion order.
    _ENCOUNTER_COLUMNS = (
        'encounter_id', 'patient_id', 'admission_date', 'release_date',
        'primary_diagnosis_code', 'died', 'cause_of_death', 'death_date',
        'transferred', 'transfer_hospitals', 'transfer_date',
    )

    # Column order of the patient_financial_info INSERT (updated_at is
    # appended separately).
    _FINANCIAL_COLUMNS = (
        'patient_id', 'credit_card_number', 'credit_card_expiry',
        'credit_card_cvv', 'insurance_provider', 'insurance_id',
    )

    def __init__(self, dbname="healthcare_data", user="your_user",
                 password=None, host="localhost", port="5432"):
        """Open the database connection and a dict-returning cursor.

        Args:
            dbname, user, host, port: connection settings; the defaults are
                the values this notebook used before they were parameters.
            password: database password. When omitted, it is read from the
                ``PGPASSWORD`` environment variable. Only if that is unset is
                the user prompted, and the prompt does not echo the input.
                With the variable set, scheduled runs never block on a prompt.
        """
        if password is None:
            # getpass (not input) so the secret is not echoed to the terminal.
            password = os.environ.get("PGPASSWORD") or getpass("Enter password key: ")
        self.conn = psycopg2.connect(
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port
        )
        self.cursor = self.conn.cursor(cursor_factory=DictCursor)

    def extract_new_data(self, batch_size=100, since=None, offset=0):
        """Fetch one page of master rows created or updated after ``since``.

        Args:
            batch_size: maximum number of rows to return (SQL ``LIMIT``).
            since: exclusive lower bound compared against ``created_at`` and
                ``updated_at``. Defaults to ``_LOOKBACK`` (one hour) before now.
            offset: number of matching rows to skip (SQL ``OFFSET``). This lets
                callers page through a window larger than ``batch_size``.

        Returns:
            The rows returned by the cursor's ``fetchall()``, possibly empty.
        """
        # encounter_id breaks ties between equal created_at values so OFFSET
        # paging is stable (assumes encounter_id is unique in master_data --
        # TODO confirm).
        query = """
            SELECT * FROM private_data.master_data 
            WHERE created_at > %s OR updated_at > %s
            ORDER BY created_at, encounter_id
            LIMIT %s OFFSET %s
        """

        if since is None:
            # NOTE(review): a fixed look-back only approximates "since the last
            # run". A delayed or long-running run can still miss rows;
            # persisting a watermark would be more reliable.
            since = datetime.now() - self._LOOKBACK

        self.cursor.execute(query, (since, since, batch_size, offset))
        return self.cursor.fetchall()

    def transform_data(self, row):
        """Split one master_data row into per-target-table record dicts.

        Args:
            row: mapping containing the master_data columns read below.

        Returns:
            Dict keyed by target table ('billing', 'diagnosis', 'encounters',
            'follow_up_appointments', 'patient_financial_info'). Each value
            maps target column names to values copied from ``row``.

        Raises:
            KeyError: if ``row`` lacks one of the expected source columns.
        """
        transformed = {
            'billing': {
                'patient_id': row['patient_id'],
                'total_cost': row['total_cost'],
                'insurance_coverage': row['insurance_coverage'],
                'out_of_pocket': row['out_of_pocket']
            },
            'diagnosis': {
                'patient_id': row['patient_id'],
                'icd_10_code': row['icd_10_code'],
                'diagnosis_name': row['primary_diagnosis'],
                'diagnosis_category': row['diagnosis_category']
            },
            'encounters': {
                'encounter_id': row['encounter_id'],
                'patient_id': row['patient_id'],
                'admission_date': row['admission_date'],
                'release_date': row['release_date'],
                'primary_diagnosis_code': row['icd_10_code'],
                'died': row['died'],
                'cause_of_death': row['cause_of_death'],
                'death_date': row['death_date'],
                'transferred': row['transferred'],
                'transfer_hospitals': row['transfer_hospitals'],
                'transfer_date': row['transfer_date']
            },
            'follow_up_appointments': {
                'patient_id': row['patient_id'],
                'follow_up_date': row['follow_up_appointment'],
                'attending_physician': row['attending_physician']
            },
            'patient_financial_info': {
                'patient_id': row['patient_id'],
                'credit_card_number': row['patient_credit_card'],
                'credit_card_expiry': row['patient_credit_card_expiry'],
                # NOTE(review): PCI DSS forbids storing card verification codes
                # after authorization -- confirm this column should exist.
                'credit_card_cvv': row['patient_credit_card_cvv'],
                'insurance_provider': row['insurance_provider'],
                'insurance_id': row['insurance_id']
            }
        }
        return transformed

    def load_data(self, data_dict):
        """Upsert one transformed record set in a single transaction.

        billing, diagnosis and encounters are always written. The follow-up
        row is written only when ``follow_up_date`` is truthy. The financial
        row is written only when ``credit_card_number`` is truthy.

        Args:
            data_dict: output of ``transform_data``.

        Returns:
            True if every statement executed and the transaction committed.
            False if executing or committing failed; in that case the
            transaction is rolled back and the error is logged.
        """
        current_time = datetime.now()

        # Load billing data
        billing_query = """
            INSERT INTO healthcare.billing 
            (patient_id, total_cost, insurance_coverage, out_of_pocket, updated_at)
            VALUES (%s, %s, %s, %s, %s)
            ON CONFLICT (patient_id) DO UPDATE SET
            total_cost = EXCLUDED.total_cost,
            insurance_coverage = EXCLUDED.insurance_coverage,
            out_of_pocket = EXCLUDED.out_of_pocket,
            updated_at = EXCLUDED.updated_at
        """
        billing = data_dict['billing']
        billing_values = (
            billing['patient_id'],
            billing['total_cost'],
            billing['insurance_coverage'],
            billing['out_of_pocket'],
            current_time
        )

        # Load diagnosis data
        diagnosis_query = """
            INSERT INTO healthcare.diagnosis 
            (patient_id, icd_10_code, diagnosis_name, diagnosis_category)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (patient_id, icd_10_code) DO UPDATE SET
            diagnosis_name = EXCLUDED.diagnosis_name,
            diagnosis_category = EXCLUDED.diagnosis_category
        """
        diagnosis = data_dict['diagnosis']
        diagnosis_values = (
            diagnosis['patient_id'],
            diagnosis['icd_10_code'],
            diagnosis['diagnosis_name'],
            diagnosis['diagnosis_category']
        )

        # Load encounters data
        encounters_query = """
            INSERT INTO healthcare.encounters 
            (encounter_id, patient_id, admission_date, release_date, 
             primary_diagnosis_code, died, cause_of_death, death_date,
             transferred, transfer_hospitals, transfer_date)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (encounter_id) DO UPDATE SET
            patient_id = EXCLUDED.patient_id,
            admission_date = EXCLUDED.admission_date,
            release_date = EXCLUDED.release_date,
            primary_diagnosis_code = EXCLUDED.primary_diagnosis_code,
            died = EXCLUDED.died,
            cause_of_death = EXCLUDED.cause_of_death,
            death_date = EXCLUDED.death_date,
            transferred = EXCLUDED.transferred,
            transfer_hospitals = EXCLUDED.transfer_hospitals,
            transfer_date = EXCLUDED.transfer_date
        """
        encounters = data_dict['encounters']
        # Explicit column order instead of dict.values() ordering.
        encounters_values = tuple(encounters[col] for col in self._ENCOUNTER_COLUMNS)

        queries = [
            (billing_query, billing_values),
            (diagnosis_query, diagnosis_values),
            (encounters_query, encounters_values)
        ]

        follow_up = data_dict['follow_up_appointments']
        if follow_up['follow_up_date']:
            follow_up_query = """
                INSERT INTO healthcare.follow_up_appointments 
                (patient_id, follow_up_date, attending_physician, updated_at)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (patient_id) DO UPDATE SET
                follow_up_date = EXCLUDED.follow_up_date,
                attending_physician = EXCLUDED.attending_physician,
                updated_at = EXCLUDED.updated_at
            """
            follow_up_values = (
                follow_up['patient_id'],
                follow_up['follow_up_date'],
                follow_up['attending_physician'],
                current_time
            )
            queries.append((follow_up_query, follow_up_values))

        financial = data_dict['patient_financial_info']
        # NOTE(review): gating on the card number means insurance_provider /
        # insurance_id are never loaded for patients without a card -- confirm
        # this is intended.
        if financial['credit_card_number']:
            financial_query = """
                INSERT INTO healthcare.patient_financial_info 
                (patient_id, credit_card_number, credit_card_expiry, 
                 credit_card_cvv, insurance_provider, insurance_id, updated_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (patient_id) DO UPDATE SET
                credit_card_number = EXCLUDED.credit_card_number,
                credit_card_expiry = EXCLUDED.credit_card_expiry,
                credit_card_cvv = EXCLUDED.credit_card_cvv,
                insurance_provider = EXCLUDED.insurance_provider,
                insurance_id = EXCLUDED.insurance_id,
                updated_at = EXCLUDED.updated_at
            """
            financial_values = (
                tuple(financial[col] for col in self._FINANCIAL_COLUMNS)
                + (current_time,)
            )
            queries.append((financial_query, financial_values))

        # Execute all queries in one transaction: either every table is
        # updated for this row or none is.
        try:
            for query, values in queries:
                self.cursor.execute(query, values)
            self.conn.commit()
        except Exception as e:
            self.conn.rollback()
            logger.exception("Error loading data: %s", e)
            return False
        logger.info("Successfully loaded data for patient %s", billing['patient_id'])
        return True

    def run_pipeline(self, batch_size=100):
        """Extract, transform and load every row in the current look-back window.

        The window start is computed once, and ``extract_new_data`` is paged
        until a short (or empty) page comes back. Windows larger than
        ``batch_size`` are therefore processed completely. A row that cannot
        be transformed is logged and skipped, so the remaining rows still
        load. Any other error is logged and the run ends.

        Args:
            batch_size: rows fetched per page.
        """
        logger.info("Starting healthcare data pipeline...")

        try:
            # Fix the window once so every page queries the same range.
            since = datetime.now() - self._LOOKBACK
            processed_count = 0
            offset = 0

            while True:
                batch = self.extract_new_data(
                    batch_size=batch_size, since=since, offset=offset
                )
                for row in batch:
                    try:
                        transformed_data = self.transform_data(row)
                    except (KeyError, TypeError):
                        logger.exception("Skipping row that could not be transformed")
                        continue
                    if self.load_data(transformed_data):
                        processed_count += 1

                # A short or empty page means the window is exhausted; the
                # empty check also prevents looping forever when batch_size <= 0.
                if not batch or len(batch) < batch_size:
                    break
                offset += batch_size

            logger.info(f"Pipeline completed. Processed {processed_count} records.")
        except Exception as e:
            logger.exception(f"Pipeline failed: {e}")

    def close(self):
        """Close the cursor, then the database connection."""
        self.cursor.close()
        self.conn.close()

In [None]:
def scheduled_pipeline():
    """Run one pipeline pass without letting a failure escape into the scheduler.

    ``schedule.run_pending()`` does not catch job exceptions. An error here,
    such as the database being unreachable at connect time, would otherwise
    propagate out of the ``while True`` loop below and stop all future runs.
    """
    try:
        pipeline = HealthcareDataPipeline()
    except Exception:
        logger.exception("Could not start healthcare data pipeline")
        return
    try:
        pipeline.run_pipeline()
    finally:
        # Release the cursor and connection even if the run raises.
        pipeline.close()


if __name__ == "__main__":
    # For scheduled execution (first run happens one hour after start).
    schedule.every().hour.do(scheduled_pipeline)

    # For one-time execution
    # scheduled_pipeline()

    # Keep script running for scheduled execution
    while True:
        schedule.run_pending()
        time.sleep(1)