In [3]:
#!/usr/bin/env python3
"""
Debug script to test DLT pipeline components one by one
"""
import os
import sys
import time
import json
import logging
from kafka import KafkaConsumer
from dlt.destinations import postgres

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('debug')

pipeline_name = 'voters'
table_name = 'voters'

def get_destination_config():
    """Get destination configuration based on storage preference"""
    storage_preference = os.environ.get('STORAGE_PREFERENCE', 'postgres').upper()
    
    if storage_preference == 'GCP':
        # Check if GCP credentials file path is provided
        gcp_creds_path = os.environ.get('GCP_CREDENTIALS_PATH')
        
        if gcp_creds_path and os.path.exists(gcp_creds_path):
            # Load GCP credentials from file
            with open(gcp_creds_path, 'r') as f:
                gcp_creds = json.load(f)
            return 'bigquery', gcp_creds
        else:
            # If credentials file is not provided, try to use application default credentials
            print("GCP credentials file not found, using application default credentials")
            return 'bigquery', None
    else:
        # PostgreSQL connection
        pg_host = os.environ.get('POSTGRES_HOST', 'localhost')
        pg_port = os.environ.get('POSTGRES_PORT', '5432')
        pg_db = os.environ.get('POSTGRES_DB', 'voting_db')
        pg_user = os.environ.get('POSTGRES_USER', 'postgres')
        pg_password = os.environ.get('POSTGRES_PASSWORD', 'postgres')
        
        connection_string = f"postgresql://{pg_user}:{pg_password}@{pg_host}:{pg_port}/{pg_db}"
        return 'postgres', connection_string

from kafka import KafkaConsumer
import json
import time
import os
import dlt

def kafka_voters_source():
    """Source function that reads voters from Kafka"""
    consumer = KafkaConsumer(
        'voters',
        bootstrap_servers=['localhost:29092'],
        auto_offset_reset='earliest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        group_id='dlt-voter-group'
    )
    
    for message in consumer:
        logger.info(f"Received voter message: {message.value}")
        yield message.value

def kafka_votes_source():
    """Source function that reads votes from Kafka"""
    consumer = KafkaConsumer(
        'votes',
        bootstrap_servers=['localhost:29092'],
        auto_offset_reset='earliest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        group_id='dlt-votes-group'
    )
    
    for message in consumer:
        yield message.value

def test_kafka_connection():
    """Test connection to Kafka"""
    bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:29092')
    logger.info(f"Testing Kafka connection to {bootstrap_servers}")
    
    try:
        logger.info("Creating test consumer...")
        consumer = KafkaConsumer(
            bootstrap_servers=[bootstrap_servers],
            group_id='debug-group',
            auto_offset_reset='earliest',
            consumer_timeout_ms=3000
        )
        
        logger.info("Listing topics...")
        topics = consumer.topics()
        logger.info(f"Available topics: {topics}")
        
        consumer.close()
        logger.info("Kafka connection test successful")
        return True
    except Exception as e:
        logger.error(f"Kafka connection failed: {str(e)}")
        return False

def test_voter_topic():
    """Test the voters topic in Kafka"""
    bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:29092')
    logger.info(f"Testing voter topic in Kafka at {bootstrap_servers}")
    
    try:
        logger.info("Creating voter consumer...")
        consumer = KafkaConsumer(
            'voters',
            bootstrap_servers=[bootstrap_servers],
            auto_offset_reset='earliest',
            value_deserializer=lambda x: json.loads(x.decode('utf-8')),
            group_id='debug-voter-group',
            consumer_timeout_ms=3000  # 30 seconds timeout
        )
        
        logger.info("Attempting to poll messages from voter topic...")
        messages = consumer.poll(timeout_ms=30000, max_records=10)
        
        if messages:
            logger.info(f"Found {len(messages)} message partitions")
            for tp, msgs in messages.items():
                logger.info(f"Topic-partition {tp} has {len(msgs)} messages")
                for msg in msgs[:3]:  # Show up to 3 messages
                    logger.info(f"Sample message: {msg.value}")
        else:
            logger.warning("No messages found in voter topic")
        
        consumer.close()
        return True
    except Exception as e:
        logger.error(f"Voter topic test failed: {str(e)}")
        return False

def test_postgres_connection():
    """Test connection to PostgreSQL"""
    import psycopg2
    
    pg_host = os.environ.get('POSTGRES_HOST', 'localhost')
    pg_port = os.environ.get('POSTGRES_PORT', '5432')
    pg_db = os.environ.get('POSTGRES_DB', 'voting_db')
    pg_user = os.environ.get('POSTGRES_USER', 'postgres')
    pg_password = os.environ.get('POSTGRES_PASSWORD', 'postgres')
    
    logger.info(f"Testing PostgreSQL connection to {pg_host}:{pg_port}/{pg_db}")
    
    try:
        conn = psycopg2.connect(
            host=pg_host,
            port=pg_port,
            dbname=pg_db,
            user=pg_user,
            password=pg_password
        )
        
        cursor = conn.cursor()
        cursor.execute("SELECT 1")
        result = cursor.fetchone()
        
        logger.info(f"PostgreSQL connection test successful: {result}")
        
        # Check if tables exist
        cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'voters')")
        voters_table_exists = cursor.fetchone()[0]
        
        cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'votes')")
        votes_table_exists = cursor.fetchone()[0]
        
        logger.info(f"Tables exist check - voters: {voters_table_exists}, votes: {votes_table_exists}")
        
        cursor.close()
        conn.close()
        return True
    except Exception as e:
        logger.error(f"PostgreSQL connection failed: {str(e)}")
        return False

def test_dlt_installation():
    """Test if DLT is installed and working"""
    try:
        import dlt
        logger.info(f"DLT version: {dlt.__version__}")
        
        # Test basic pipeline creation
        # pipeline = dlt.pipeline(
        #     pipeline_name='debug_pipeline',
        #     destination='dummy'
        # )
        # pipeline = dlt.pipeline(
        #     pipeline_name="ny_taxi_pipeline",
        #     destination="duckdb",
        #     dataset_name="ny_taxi_data"
        # )
        destination_type, destination_config = get_destination_config()
        pipeline = dlt.pipeline(
            pipeline_name=pipeline_name,
            destination=postgres(destination_config),
            dataset_name='public',  # Use 'public' schema or whatever schema your tables are in
            dev_mode=True  # This forces DLT to recreate all internal state tables
        )

        info = pipeline.run(
                kafka_voters_source,
                table_name=table_name,
                write_disposition='append',
                #merge_key=None,  # Set this to your primary key if you want upsert behavior
                #if_exists='append'  # 'append' will add to existing tables, 'replace' would drop and recreate
            )
        logger.info("DLT installation test successful")
        return True
    except Exception as e:
        logger.error(f"DLT installation test failed: {str(e)}")
        return False

def main():
    """Run all tests"""
    logger.info("Starting debug tests...")
    
    # Print environment variables
    # logger.info("Environment variables:")
    # for key, value in os.environ.items():
    #     if 'PASSWORD' not in key:  # Don't log passwords
    #         logger.info(f"{key}={value}")
    
    # Test components
    kafka_ok = test_kafka_connection()
    voter_topic_ok = test_voter_topic() if kafka_ok else False
    postgres_ok = test_postgres_connection()
    dlt_ok = test_dlt_installation()
    
    logger.info("\n----- TEST RESULTS -----")
    logger.info(f"Kafka Connection: {'✅' if kafka_ok else '❌'}")
    logger.info(f"Voter Topic: {'✅' if voter_topic_ok else '❌'}")
    logger.info(f"PostgreSQL Connection: {'✅' if postgres_ok else '❌'}")
    logger.info(f"DLT Installation: {'✅' if dlt_ok else '❌'}")
    
    # Wait to keep the container running
    logger.info("Debug tests completed. Container will remain running for 10 minutes.")
    for i in range(10):
        time.sleep(60)
        logger.info(f"Debug container alive - {i+1}/10 minutes")

if __name__ == "__main__":
    main()

2025-04-13 22:06:24,260 - INFO - Starting debug tests...
2025-04-13 22:06:24,262 - INFO - Testing Kafka connection to localhost:29092
2025-04-13 22:06:24,263 - INFO - Creating test consumer...
2025-04-13 22:06:24,266 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <connecting> [IPv6 ('::1', 29092, 0, 0)]>: connecting to localhost:29092 [('::1', 29092, 0, 0) IPv6]
2025-04-13 22:06:24,267 - ERROR - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <connecting> [IPv6 ('::1', 29092, 0, 0)]>: Connect attempt returned error 111. Disconnecting.
2025-04-13 22:06:24,268 - ERROR - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <connecting> [IPv6 ('::1', 29092, 0, 0)]>: Closing connection. KafkaConnectionError: 111 ECONNREFUSED
2025-04-13 22:06:24,269 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <connecting> [IPv4 (

2025-04-13 22:06:24,323 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <connecting> [IPv6 ('::1', 29092, 0, 0)]>: connecting to localhost:29092 [('::1', 29092, 0, 0) IPv6]
2025-04-13 22:06:24,323 - ERROR - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <connecting> [IPv6 ('::1', 29092, 0, 0)]>: Connect attempt returned error 111. Disconnecting.
2025-04-13 22:06:24,324 - ERROR - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <connecting> [IPv6 ('::1', 29092, 0, 0)]>: Closing connection. KafkaConnectionError: 111 ECONNREFUSED
2025-04-13 22:06:24,325 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <connecting> [IPv4 ('127.0.0.1', 29092)]>: connecting to localhost:29092 [('127.0.0.1', 29092) IPv4]
2025-04-13 22:06:24,325 - ERROR - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=local

KeyboardInterrupt: 

In [4]:
#!/usr/bin/env python3
"""
Debug script to test DLT pipeline components one by one
"""
import os
import sys
import time
import json
import logging
import traceback
from kafka import KafkaConsumer
from dlt.destinations import postgres
from datetime import datetime
from uuid import uuid4

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('debug')

pipeline_name = 'voters'
table_name = 'voters'

def test_source(limit=5):
    for i in range(limit):
        yield {
            "voter_id": f"voter_{i}",
            "name": f"Test Voter {i}",
            "age": 25 + i,
            "gender": "female" if i % 2 == 0 else "male",
            "state": "TestState",
            "county": "TestCounty",
            "registration_date": datetime.utcnow().isoformat()
        }

def get_destination_config():
    """Get destination configuration based on storage preference"""
    storage_preference = os.environ.get('STORAGE_PREFERENCE', 'postgres').upper()
    
    if storage_preference == 'GCP':
        # Check if GCP credentials file path is provided
        gcp_creds_path = os.environ.get('GCP_CREDENTIALS_PATH')
        
        if gcp_creds_path and os.path.exists(gcp_creds_path):
            # Load GCP credentials from file
            with open(gcp_creds_path, 'r') as f:
                gcp_creds = json.load(f)
            return 'bigquery', gcp_creds
        else:
            # If credentials file is not provided, try to use application default credentials
            logger.info("GCP credentials file not found, using application default credentials")
            return 'bigquery', None
    else:
        # PostgreSQL connection
        pg_host = os.environ.get('POSTGRES_HOST', 'localhost')
        pg_port = os.environ.get('POSTGRES_PORT', '5432')
        pg_db = os.environ.get('POSTGRES_DB', 'voting_db')
        pg_user = os.environ.get('POSTGRES_USER', 'postgres')
        pg_password = os.environ.get('POSTGRES_PASSWORD', 'postgres')
        
        connection_string = f"postgresql://{pg_user}:{pg_password}@{pg_host}:{pg_port}/{pg_db}"
        return 'postgres', connection_string

def kafka_voters_source(limit=None):
    """Source function that reads voters from Kafka"""
    bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:29092')
    topic = os.environ.get('KAFKA_VOTERS_TOPIC', 'voters')
    
    logger.info(f"Starting Kafka consumer for topic '{topic}' at {bootstrap_servers}")
    
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=[bootstrap_servers],
        auto_offset_reset='earliest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        group_id=f'dlt-voter-group-{uuid4()}', #'dlt-voter-group' #,
        #consumer_timeout_ms=3000  # 30 seconds timeout
    )
    
    count = 0
    for message in consumer:
        voter_data = message.value
        logger.info(f"Processing voter record: {voter_data}")
        yield voter_data
        
        count += 1
        if limit is not None and count >= limit:
            logger.info(f"Reached limit of {limit} records, stopping")
            break
    
    logger.info("Finished reading from Kafka, closing consumer")
    consumer.close()

def kafka_voters_source2(limit=None):
    """Source function that reads voters from Kafka"""
    bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:29092')
    topic = os.environ.get('KAFKA_VOTERS_TOPIC', 'voters')
    
    logger.info(f"Starting Kafka consumer for topic '{topic}' at {bootstrap_servers}")
    
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=[bootstrap_servers],
        auto_offset_reset='earliest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        group_id=f'dlt-voter-group-test',
        consumer_timeout_ms=5000
    )

    logger.info("Polling messages (test mode)...")
    polled = consumer.poll(timeout_ms=5000, max_records=limit or 10)
    
    if polled:
        for tp, msgs in polled.items():
            for msg in msgs:
                logger.info(f"Polled voter record: {msg.value}")
                yield msg.value
    else:
        logger.warning("No messages were polled from Kafka.")
    
    consumer.close()

def kafka_voters_source3(limit=None):
    bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:29092')
    topic = os.environ.get('KAFKA_VOTERS_TOPIC', 'voters')
    logger.info(f"Starting Kafka consumer for topic '{topic}' at {bootstrap_servers}")
    
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=[bootstrap_servers],
        auto_offset_reset='earliest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        group_id='dlt-voter-group',
        #consumer_timeout_ms=10000
        enable_auto_commit=True
    )

    count = 0
    while True:
        messages = consumer.poll(timeout_ms=10000, max_records=10)
        if not messages:
            break  # end iteration if nothing found (for one-off runs)

        for tp, msgs in messages.items():
            for message in msgs:
                yield message.value
                count += 1
                if limit is not None and count >= limit:
                    consumer.close()
                    return

    logger.info("Finished reading from Kafka, closing consumer")
    consumer.close()



def kafka_votes_source(limit=None):
    """Source function that reads votes from Kafka"""
    bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:29092')
    topic = os.environ.get('KAFKA_VOTES_TOPIC', 'votes')
    
    logger.info(f"Starting Kafka consumer for topic '{topic}' at {bootstrap_servers}")
    
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=[bootstrap_servers],
        auto_offset_reset='earliest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        group_id= f'dlt-voter-group-{uuid4()}', #'dlt-votes-group',
        consumer_timeout_ms=30000  # 30 seconds timeout
    )
    
    count = 0
    for message in consumer:
        vote_data = message.value
        logger.info(f"Processing vote record: {vote_data}")
        yield vote_data
        
        count += 1
        if limit is not None and count >= limit:
            logger.info(f"Reached limit of {limit} records, stopping")
            break
    
    logger.info("Finished reading from Kafka, closing consumer")
    consumer.close()

def test_kafka_connection():
    """Test connection to Kafka"""
    bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:29092')
    logger.info(f"Testing Kafka connection to {bootstrap_servers}")
    
    try:
        logger.info("Creating test consumer...")
        consumer = KafkaConsumer(
            bootstrap_servers=[bootstrap_servers],
            group_id='debug-group',
            auto_offset_reset='earliest',
            consumer_timeout_ms=3000
        )
        
        logger.info("Listing topics...")
        topics = consumer.topics()
        logger.info(f"Available topics: {topics}")
        
        consumer.close()
        logger.info("Kafka connection test successful")
        return True
    except Exception as e:
        logger.error(f"Kafka connection failed: {str(e)}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        return False

def test_voter_topic():
    """Test the voters topic in Kafka"""
    bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:29092')
    topic = os.environ.get('KAFKA_VOTERS_TOPIC', 'voters')
    logger.info(f"Testing '{topic}' topic in Kafka at {bootstrap_servers}")
    
    try:
        logger.info(f"Creating consumer for topic '{topic}'...")
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=[bootstrap_servers],
            auto_offset_reset='earliest',
            value_deserializer=lambda x: json.loads(x.decode('utf-8')),
            group_id='debug-voter-group',
            consumer_timeout_ms=10000  # 10 seconds timeout
        )
        
        logger.info(f"Attempting to poll messages from '{topic}' topic...")
        messages = consumer.poll(timeout_ms=10000, max_records=10)
        
        if messages:
            logger.info(f"Found {len(messages)} message partitions")
            for tp, msgs in messages.items():
                logger.info(f"Topic-partition {tp} has {len(msgs)} messages")
                for msg in msgs[:3]:  # Show up to 3 messages
                    logger.info(f"Sample message: {msg.value}")
        else:
            logger.warning(f"No messages found in '{topic}' topic")
        
        consumer.close()
        return True
    except Exception as e:
        logger.error(f"Topic '{topic}' test failed: {str(e)}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        return False

def test_postgres_connection():
    """Test connection to PostgreSQL"""
    import psycopg2
    
    pg_host = os.environ.get('POSTGRES_HOST', 'localhost')
    pg_port = os.environ.get('POSTGRES_PORT', '5432')
    pg_db = os.environ.get('POSTGRES_DB', 'voting_db')
    pg_user = os.environ.get('POSTGRES_USER', 'postgres')
    pg_password = os.environ.get('POSTGRES_PASSWORD', 'postgres')
    
    logger.info(f"Testing PostgreSQL connection to {pg_host}:{pg_port}/{pg_db}")
    
    try:
        conn = psycopg2.connect(
            host=pg_host,
            port=pg_port,
            dbname=pg_db,
            user=pg_user,
            password=pg_password
        )
        
        cursor = conn.cursor()
        cursor.execute("SELECT 1")
        result = cursor.fetchone()
        
        logger.info(f"PostgreSQL connection test successful: {result}")
        
        # Check if tables exist
        cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'voters')")
        voters_table_exists = cursor.fetchone()[0]
        
        cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'votes')")
        votes_table_exists = cursor.fetchone()[0]
        
        logger.info(f"Tables exist check - voters: {voters_table_exists}, votes: {votes_table_exists}")
        
        # If voters table exists, inspect its schema
        if voters_table_exists:
            inspect_postgres_table(conn, 'voters')
        
        # If votes table exists, inspect its schema
        if votes_table_exists:
            inspect_postgres_table(conn, 'votes')
        
        cursor.close()
        conn.close()
        return True
    except Exception as e:
        logger.error(f"PostgreSQL connection failed: {str(e)}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        return False

def inspect_postgres_table(conn, table_name):
    """Inspect the structure of a table in PostgreSQL"""
    try:
        cursor = conn.cursor()
        cursor.execute(f"""
            SELECT column_name, data_type 
            FROM information_schema.columns 
            WHERE table_name = '{table_name}'
            ORDER BY ordinal_position
        """)
        columns = cursor.fetchall()
        
        logger.info(f"PostgreSQL '{table_name}' table schema:")
        for column in columns:
            logger.info(f"Column: {column[0]}, Type: {column[1]}")
            
        # Get row count
        cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
        row_count = cursor.fetchone()[0]
        logger.info(f"Table '{table_name}' has {row_count} rows")
        
        # Get sample data if table has rows
        if row_count > 0:
            cursor.execute(f"SELECT * FROM {table_name} LIMIT 3")
            sample_rows = cursor.fetchall()
            logger.info(f"Sample data from '{table_name}':")
            for i, row in enumerate(sample_rows):
                logger.info(f"Row {i+1}: {row}")
        
        cursor.close()
    except Exception as e:
        logger.error(f"Error inspecting PostgreSQL table '{table_name}': {str(e)}")
        logger.error(f"Traceback: {traceback.format_exc()}")

def test_dlt_installation():
    """Test if DLT is installed and working"""
    try:
        import dlt
        logger.info(f"DLT version: {dlt.__version__}")
        
        # Test basic pipeline creation
        destination_type, destination_config = get_destination_config()
        logger.info(f"Creating DLT pipeline with destination type: {destination_type}")
        
        try:
            # Initialize pipeline
            pipeline = dlt.pipeline(
                pipeline_name=pipeline_name,
                destination=postgres(destination_config),
                dataset_name='public',  # Use 'public' schema or whatever schema your tables are in
               # dev_mode=True  # This forces DLT to recreate all internal state tables
            )
            logger.info("Pipeline initialized successfully")
            
            # Load a limited number of records for testing
            logger.info("Running pipeline with limited data (5 records max)")
            try:
                # logger.info(f"test if we have data")
                # for msg in kafka_voters_source(3):
                #     logger.info(f"Sample Kafka record to dlt: {msg}")
                #     break

                info = pipeline.run(
                    kafka_voters_source(limit=1000),
                    #kafka_voters_source3(limit=None),
                    #test_source(limit=5),
                    table_name='voters',
                    write_disposition='append'
                )
                logger.info(f"Pipeline run completed successfully. Load info: {info}")
                #logger.info(f"Loaded {info.load_packages.normalized_rows_count} rows")
                
                # Check table after loading
                if destination_type == 'postgres':
                    import psycopg2
                    pg_host = os.environ.get('POSTGRES_HOST', 'localhost')
                    pg_port = os.environ.get('POSTGRES_PORT', '5432')
                    pg_db = os.environ.get('POSTGRES_DB', 'voting_db')
                    pg_user = os.environ.get('POSTGRES_USER', 'postgres')
                    pg_password = os.environ.get('POSTGRES_PASSWORD', 'postgres')
                    
                    try:
                        conn = psycopg2.connect(
                            host=pg_host,
                            port=pg_port,
                            dbname=pg_db,
                            user=pg_user,
                            password=pg_password
                        )
                        logger.info("Checking table after DLT load")
                        inspect_postgres_table(conn, table_name)
                        conn.close()
                    except Exception as e:
                        logger.error(f"Failed to check table after load: {str(e)}")
                
                return True
            except Exception as e:
                logger.error(f"Pipeline run failed: {str(e)}")
                logger.error(f"Traceback: {traceback.format_exc()}")
                return False
        except Exception as e:
            logger.error(f"Pipeline creation failed: {str(e)}")
            logger.error(f"Traceback: {traceback.format_exc()}")
            return False
    except Exception as e:
        logger.error(f"DLT installation test failed: {str(e)}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        return False

def main():
    """Run all tests"""
    logger.info("Starting debug tests...")
    
    # Print environment variables
    # logger.info("Environment variables:")
    # for key, value in os.environ.items():
    #     if 'PASSWORD' not in key.upper():  # Don't log passwords
    #         logger.info(f"{key}={value}")
    
    # Test components
    kafka_ok = test_kafka_connection()
    voter_topic_ok = test_voter_topic() if kafka_ok else False
    postgres_ok = test_postgres_connection()
    dlt_ok = test_dlt_installation() if kafka_ok and postgres_ok else False
    
    logger.info("\n----- TEST RESULTS -----")
    logger.info(f"Kafka Connection: {'✅' if kafka_ok else '❌'}")
    logger.info(f"Voter Topic: {'✅' if voter_topic_ok else '❌'}")
    logger.info(f"PostgreSQL Connection: {'✅' if postgres_ok else '❌'}")
    logger.info(f"DLT Installation: {'✅' if dlt_ok else '❌'}")
    
    if not dlt_ok:
        logger.info("\n----- TROUBLESHOOTING RECOMMENDATIONS -----")
        if not kafka_ok:
            logger.info("- Fix Kafka connection issues before proceeding")
        elif not voter_topic_ok:
            logger.info("- Ensure the 'voters' topic exists and has data")
        elif not postgres_ok:
            logger.info("- Fix PostgreSQL connection issues before proceeding")
        else:
            logger.info("- Check DLT pipeline configuration")
            logger.info("- Verify data format compatibility between Kafka and PostgreSQL")
            logger.info("- Examine error logs for specific DLT errors")
    
    # Wait to keep the container running
    logger.info("Debug tests completed. Container will remain running for 10 minutes.")
    for i in range(10):
        time.sleep(60)
        logger.info(f"Debug container alive - {i+1}/10 minutes")

if __name__ == "__main__":
    main()

2025-04-13 22:06:56,566 - INFO - Starting debug tests...
2025-04-13 22:06:56,567 - INFO - Testing Kafka connection to localhost:29092


2025-04-13 22:06:56,568 - INFO - Creating test consumer...
2025-04-13 22:06:56,571 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <connecting> [IPv6 ('::1', 29092, 0, 0)]>: connecting to localhost:29092 [('::1', 29092, 0, 0) IPv6]
2025-04-13 22:06:56,572 - ERROR - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <connecting> [IPv6 ('::1', 29092, 0, 0)]>: Connect attempt returned error 111. Disconnecting.
2025-04-13 22:06:56,572 - ERROR - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <connecting> [IPv6 ('::1', 29092, 0, 0)]>: Closing connection. KafkaConnectionError: 111 ECONNREFUSED
2025-04-13 22:06:56,573 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <connecting> [IPv4 ('127.0.0.1', 29092)]>: connecting to localhost:29092 [('127.0.0.1', 29092) IPv4]
2025-04-13 22:06:56,574 - ERROR - <BrokerConnection c

KeyboardInterrupt: 

### Final code

In [None]:
#!/usr/bin/env python3
"""
Debug script to test DLT pipeline components one by one
"""
import os
import sys
import time
import json
import logging
import traceback
from kafka import KafkaConsumer
from dlt.destinations import postgres
from datetime import datetime
from uuid import uuid4

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('debug')

pipeline_name = 'voters'
table_name = 'voters'

def get_destination_config():
    """Get destination configuration based on storage preference"""
    storage_preference = os.environ.get('STORAGE_PREFERENCE', 'postgres').upper()
    
    if storage_preference == 'GCP':
        # Check if GCP credentials file path is provided
        gcp_creds_path = os.environ.get('GCP_CREDENTIALS_PATH')
        
        if gcp_creds_path and os.path.exists(gcp_creds_path):
            # Load GCP credentials from file
            with open(gcp_creds_path, 'r') as f:
                gcp_creds = json.load(f)
            return 'bigquery', gcp_creds
        else:
            # If credentials file is not provided, try to use application default credentials
            logger.info("GCP credentials file not found, using application default credentials")
            return 'bigquery', None
    else:
        # PostgreSQL connection
        pg_host = os.environ.get('POSTGRES_HOST', 'localhost')
        pg_port = os.environ.get('POSTGRES_PORT', '5432')
        pg_db = os.environ.get('POSTGRES_DB', 'voting_db')
        pg_user = os.environ.get('POSTGRES_USER', 'postgres')
        pg_password = os.environ.get('POSTGRES_PASSWORD', 'postgres')
        
        connection_string = f"postgresql://{pg_user}:{pg_password}@{pg_host}:{pg_port}/{pg_db}"
        return 'postgres', connection_string

def kafka_voters_source(limit=None):
    """Source function that reads voters from Kafka"""
    bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:29092')
    topic = os.environ.get('KAFKA_VOTERS_TOPIC', 'voters')
    
    logger.info(f"Starting Kafka consumer for topic '{topic}' at {bootstrap_servers}")
    
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=[bootstrap_servers],
        auto_offset_reset='earliest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        group_id=f'dlt-voter-group-{uuid4()}', #'dlt-voter-group' #,
        consumer_timeout_ms=3000  # 30 seconds timeout
    )
    
    count = 0
    for message in consumer:
        voter_data = message.value
        logger.info(f"Processing voter record: {voter_data}")
        yield voter_data
        
        count += 1
        if limit is not None and count >= limit:
            logger.info(f"Reached limit of {limit} records, stopping")
            break
    
    logger.info("Finished reading from Kafka, closing consumer")
    consumer.close()

def kafka_votes_source(limit=None):
    """Source function that reads votes from Kafka"""
    bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:29092')
    topic = os.environ.get('KAFKA_VOTES_TOPIC', 'votes')
    
    logger.info(f"Starting Kafka consumer for topic '{topic}' at {bootstrap_servers}")
    
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=[bootstrap_servers],
        auto_offset_reset='earliest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        group_id= f'dlt-voter-group-{uuid4()}', #'dlt-votes-group',
        consumer_timeout_ms=30000  # 30 seconds timeout
    )
    
    count = 0
    for message in consumer:
        vote_data = message.value
        logger.info(f"Processing vote record: {vote_data}")
        yield vote_data
        
        count += 1
        if limit is not None and count >= limit:
            logger.info(f"Reached limit of {limit} records, stopping")
            break
    
    logger.info("Finished reading from Kafka, closing consumer")
    consumer.close()

def test_kafka_connection():
    """Test connection to Kafka"""
    bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:29092')
    logger.info(f"Testing Kafka connection to {bootstrap_servers}")
    
    try:
        logger.info("Creating test consumer...")
        consumer = KafkaConsumer(
            bootstrap_servers=[bootstrap_servers],
            group_id='debug-group',
            auto_offset_reset='earliest',
            consumer_timeout_ms=3000
        )
        
        logger.info("Listing topics...")
        topics = consumer.topics()
        logger.info(f"Available topics: {topics}")
        
        consumer.close()
        logger.info("Kafka connection test successful")
        return True
    except Exception as e:
        logger.error(f"Kafka connection failed: {str(e)}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        return False

def test_voter_topic():
    """Test the voters topic in Kafka"""
    bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:29092')
    topic = os.environ.get('KAFKA_VOTERS_TOPIC', 'voters')
    logger.info(f"Testing '{topic}' topic in Kafka at {bootstrap_servers}")
    
    try:
        logger.info(f"Creating consumer for topic '{topic}'...")
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=[bootstrap_servers],
            auto_offset_reset='earliest',
            value_deserializer=lambda x: json.loads(x.decode('utf-8')),
            group_id='debug-voter-group',
            consumer_timeout_ms=10000  # 10 seconds timeout
        )
        
        logger.info(f"Attempting to poll messages from '{topic}' topic...")
        messages = consumer.poll(timeout_ms=10000, max_records=10)
        
        if messages:
            logger.info(f"Found {len(messages)} message partitions")
            for tp, msgs in messages.items():
                logger.info(f"Topic-partition {tp} has {len(msgs)} messages")
                for msg in msgs[:3]:  # Show up to 3 messages
                    logger.info(f"Sample message: {msg.value}")
        else:
            logger.warning(f"No messages found in '{topic}' topic")
        
        consumer.close()
        return True
    except Exception as e:
        logger.error(f"Topic '{topic}' test failed: {str(e)}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        return False

def test_postgres_connection():
    """Test connection to PostgreSQL"""
    import psycopg2
    
    pg_host = os.environ.get('POSTGRES_HOST', 'localhost')
    pg_port = os.environ.get('POSTGRES_PORT', '5432')
    pg_db = os.environ.get('POSTGRES_DB', 'voting_db')
    pg_user = os.environ.get('POSTGRES_USER', 'postgres')
    pg_password = os.environ.get('POSTGRES_PASSWORD', 'postgres')
    
    logger.info(f"Testing PostgreSQL connection to {pg_host}:{pg_port}/{pg_db}")
    
    try:
        conn = psycopg2.connect(
            host=pg_host,
            port=pg_port,
            dbname=pg_db,
            user=pg_user,
            password=pg_password
        )
        
        cursor = conn.cursor()
        cursor.execute("SELECT 1")
        result = cursor.fetchone()
        
        logger.info(f"PostgreSQL connection test successful: {result}")
        
        # Check if tables exist
        cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'voters')")
        voters_table_exists = cursor.fetchone()[0]
        
        cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'votes')")
        votes_table_exists = cursor.fetchone()[0]
        
        logger.info(f"Tables exist check - voters: {voters_table_exists}, votes: {votes_table_exists}")
        
        # If voters table exists, inspect its schema
        if voters_table_exists:
            inspect_postgres_table(conn, 'voters')
        
        # If votes table exists, inspect its schema
        if votes_table_exists:
            inspect_postgres_table(conn, 'votes')
        
        cursor.close()
        conn.close()
        return True
    except Exception as e:
        logger.error(f"PostgreSQL connection failed: {str(e)}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        return False

def inspect_postgres_table(conn, table_name):
    """Inspect the structure of a table in PostgreSQL"""
    try:
        cursor = conn.cursor()
        cursor.execute(f"""
            SELECT column_name, data_type 
            FROM information_schema.columns 
            WHERE table_name = '{table_name}'
            ORDER BY ordinal_position
        """)
        columns = cursor.fetchall()
        
        logger.info(f"PostgreSQL '{table_name}' table schema:")
        for column in columns:
            logger.info(f"Column: {column[0]}, Type: {column[1]}")
            
        # Get row count
        cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
        row_count = cursor.fetchone()[0]
        logger.info(f"Table '{table_name}' has {row_count} rows")
        
        # Get sample data if table has rows
        if row_count > 0:
            cursor.execute(f"SELECT * FROM {table_name} LIMIT 3")
            sample_rows = cursor.fetchall()
            logger.info(f"Sample data from '{table_name}':")
            for i, row in enumerate(sample_rows):
                logger.info(f"Row {i+1}: {row}")
        
        cursor.close()
    except Exception as e:
        logger.error(f"Error inspecting PostgreSQL table '{table_name}': {str(e)}")
        logger.error(f"Traceback: {traceback.format_exc()}")

def test_dlt_installation():
    """Test if DLT is installed and working"""
    try:
        import dlt
        logger.info(f"DLT version: {dlt.__version__}")
        
        # Test basic pipeline creation
        destination_type, destination_config = get_destination_config()
        logger.info(f"Creating DLT pipeline with destination type: {destination_type}")
        
        try:
            # Initialize pipeline
            pipeline = dlt.pipeline(
                pipeline_name=pipeline_name,
                destination=postgres(destination_config),
                dataset_name='public',  # Use 'public' schema or whatever schema your tables are in
               # dev_mode=True  # This forces DLT to recreate all internal state tables
            )
            logger.info("Pipeline initialized successfully")
            
            # Load a limited number of records for testing
            logger.info("Running pipeline with limited data (5 records max)")
            try:
                info = pipeline.run(
                    kafka_voters_source(limit=1000),
                    table_name='voters',
                    write_disposition='append'
                )
                logger.info(f"Pipeline run completed successfully. Load info: {info}")
                #logger.info(f"Loaded {info.load_packages.normalized_rows_count} rows")
                
                # Check table after loading
                if destination_type == 'postgres':
                    import psycopg2
                    pg_host = os.environ.get('POSTGRES_HOST', 'localhost')
                    pg_port = os.environ.get('POSTGRES_PORT', '5432')
                    pg_db = os.environ.get('POSTGRES_DB', 'voting_db')
                    pg_user = os.environ.get('POSTGRES_USER', 'postgres')
                    pg_password = os.environ.get('POSTGRES_PASSWORD', 'postgres')
                    
                    try:
                        conn = psycopg2.connect(
                            host=pg_host,
                            port=pg_port,
                            dbname=pg_db,
                            user=pg_user,
                            password=pg_password
                        )
                        logger.info("Checking table after DLT load")
                        inspect_postgres_table(conn, table_name)
                        conn.close()
                    except Exception as e:
                        logger.error(f"Failed to check table after load: {str(e)}")
                
                return True
            except Exception as e:
                logger.error(f"Pipeline run failed: {str(e)}")
                logger.error(f"Traceback: {traceback.format_exc()}")
                return False
        except Exception as e:
            logger.error(f"Pipeline creation failed: {str(e)}")
            logger.error(f"Traceback: {traceback.format_exc()}")
            return False
    except Exception as e:
        logger.error(f"DLT installation test failed: {str(e)}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        return False

def main():
    """Run all tests"""
    logger.info("Starting debug tests...")
    
    # Print environment variables
    # logger.info("Environment variables:")
    # for key, value in os.environ.items():
    #     if 'PASSWORD' not in key.upper():  # Don't log passwords
    #         logger.info(f"{key}={value}")
    
    # Test components
    kafka_ok = test_kafka_connection()
    voter_topic_ok = test_voter_topic() if kafka_ok else False
    postgres_ok = test_postgres_connection()
    dlt_ok = test_dlt_installation() if kafka_ok and postgres_ok else False
    
    logger.info("\n----- TEST RESULTS -----")
    logger.info(f"Kafka Connection: {'✅' if kafka_ok else '❌'}")
    logger.info(f"Voter Topic: {'✅' if voter_topic_ok else '❌'}")
    logger.info(f"PostgreSQL Connection: {'✅' if postgres_ok else '❌'}")
    logger.info(f"DLT Installation: {'✅' if dlt_ok else '❌'}")
    
    if not dlt_ok:
        logger.info("\n----- TROUBLESHOOTING RECOMMENDATIONS -----")
        if not kafka_ok:
            logger.info("- Fix Kafka connection issues before proceeding")
        elif not voter_topic_ok:
            logger.info("- Ensure the 'voters' topic exists and has data")
        elif not postgres_ok:
            logger.info("- Fix PostgreSQL connection issues before proceeding")
        else:
            logger.info("- Check DLT pipeline configuration")
            logger.info("- Verify data format compatibility between Kafka and PostgreSQL")
            logger.info("- Examine error logs for specific DLT errors")
    
    # Wait to keep the container running
    logger.info("Debug tests completed. Container will remain running for 10 minutes.")
    for i in range(10):
        time.sleep(60)
        logger.info(f"Debug container alive - {i+1}/10 minutes")

if __name__ == "__main__":
    main()

In [1]:
pwd

'/workspaces/Realtime-Voting/Debug'

In [None]:
#!/usr/bin/env python3
"""
Real-time DLT pipeline for continuous data ingestion from Kafka to Postgres/BigQuery
"""
import os
import sys
import time
import json
import logging
import signal
import traceback
from kafka import KafkaConsumer, TopicPartition
from dlt.destinations import postgres, bigquery
from datetime import datetime
from uuid import uuid4

#os.environ['STORAGE_PREFERENCE'] = 'GCP'
#os.environ['GCP_CREDENTIALS_PATH'] = 
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('dlt-pipeline')

# Global flag for graceful shutdown
running = True

def signal_handler(sig, frame):
    """Handle shutdown signals"""
    global running
    logger.info("Shutdown signal received, stopping pipeline...")
    running = False

# Register signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

def get_destination_config():
    """Get destination configuration based on storage preference"""
    storage_preference = os.environ.get('STORAGE_PREFERENCE', 'postgres').upper()
    
    if storage_preference == 'GCP':
        # Check if GCP credentials file path is provided
        gcp_creds_path = os.environ.get('GCP_CREDENTIALS_PATH')
        
        if gcp_creds_path and os.path.exists(gcp_creds_path):
            # Load GCP credentials from file
            with open(gcp_creds_path, 'r') as f:
                gcp_creds = json.load(f)
            return 'bigquery', gcp_creds
        else:
            # If credentials file is not provided, try to use application default credentials
            logger.info("GCP credentials file not found, using application default credentials")
            return 'bigquery', None
    else:
        # PostgreSQL connection
        pg_host = os.environ.get('POSTGRES_HOST', 'localhost')
        pg_port = os.environ.get('POSTGRES_PORT', '5432')
        pg_db = os.environ.get('POSTGRES_DB', 'voting_db')
        pg_user = os.environ.get('POSTGRES_USER', 'postgres')
        pg_password = os.environ.get('POSTGRES_PASSWORD', 'postgres')
        
        connection_string = f"postgresql://{pg_user}:{pg_password}@{pg_host}:{pg_port}/{pg_db}"
        return 'postgres', connection_string

def create_pipeline(pipeline_name, destination_type, destination_config):
    """Create and configure DLT pipeline"""
    import dlt
    
    # Set the appropriate destination
    if destination_type == 'bigquery':
        dest = bigquery(destination_config)
    else:
        dest = postgres(destination_config)
    
    # Create pipeline with incremental loading
    pipeline = dlt.pipeline(
        pipeline_name=pipeline_name,
        destination=dest,
        dataset_name='public',
    )
    
    return pipeline

def continuous_ingest(pipeline_name, table_name, group_id_prefix):
    """Continuously ingest data from Kafka to the destination"""
    global running
    
    # Get destination configuration
    destination_type, destination_config = get_destination_config()
    logger.info(f"Setting up continuous ingestion with destination: {destination_type}")
    
    # Create DLT pipeline
    import dlt
    pipeline = create_pipeline(pipeline_name, destination_type, destination_config)
    
    # Kafka configuration
    bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:29092')
    topic_type = "VOTES" if table_name.lower() == "votes" else "VOTERS"
    topic = os.environ.get(f'KAFKA_{topic_type}_TOPIC', topic_type.lower())
    #topic = os.environ.get(f'KAFKA_{"VOTES" if "vote" in table_name.lower() else "VOTERS"}_TOPIC', 
    #                      'votes' if 'vote' in table_name.lower() else 'voters')
    
    # Use a consistent group_id for offset tracking between restarts
    # But allow multiple instances with different IDs if needed
    instance_id = os.environ.get('INSTANCE_ID', '')
    group_id = f"{group_id_prefix}-{instance_id}" if instance_id else group_id_prefix
    
    logger.info(f"Starting Kafka consumer for topic '{topic}' with group '{group_id}'")
    
    # Create consumer without timeout to enable continuous processing
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=[bootstrap_servers],
        auto_offset_reset='earliest',  # Start from earliest unprocessed message
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        group_id=group_id,
        enable_auto_commit=True,  # Automatically commit offsets
        auto_commit_interval_ms=15000,  # Commit every 15 seconds
    )
    
    # Batch size configuration
    batch_size = int(os.environ.get('BATCH_SIZE', '10')) #from 100
    max_batch_interval_seconds = int(os.environ.get('MAX_BATCH_INTERVAL_SECONDS', '100')) #30
    
    logger.info(f"Batch configuration: size={batch_size}, interval={max_batch_interval_seconds}s")
    
    batch_buffer = []
    last_flush_time = time.time()
    
    # Main ingestion loop
    try:
        while running:
            # Poll for messages with a timeout to avoid blocking indefinitely
            messages = consumer.poll(timeout_ms=1000, max_records=batch_size)
            
            current_time = time.time()
            time_since_last_flush = current_time - last_flush_time
            
            if messages:
                # Process received messages
                for tp, msgs in messages.items():
                    for msg in msgs:
                        # Add message to the batch buffer
                        batch_buffer.append(msg.value)
                        
                        # If batch is full, process it
                        if len(batch_buffer) >= batch_size:
                            process_batch(pipeline, batch_buffer, table_name)
                            batch_buffer = []
                            last_flush_time = time.time()
            
            # Also flush if max time has passed since last flush
            if batch_buffer and time_since_last_flush >= max_batch_interval_seconds:
                logger.info(f"Time-based flush after {time_since_last_flush:.1f}s with {len(batch_buffer)} records")
                process_batch(pipeline, batch_buffer, table_name)
                batch_buffer = []
                last_flush_time = time.time()
                
            # Small sleep to prevent CPU spinning when idle
            if not messages:
                time.sleep(0.1)
                
    except Exception as e:
        logger.error(f"Error in continuous ingestion: {str(e)}")
        logger.error(traceback.format_exc())
    finally:
        # Process any remaining records in the buffer
        if batch_buffer:
            logger.info(f"Processing {len(batch_buffer)} remaining records before shutdown")
            process_batch(pipeline, batch_buffer, table_name)
            
        # Clean up resources
        logger.info("Closing Kafka consumer")
        consumer.close()

def process_batch(pipeline, batch, table_name):
    """Process a batch of records with DLT"""
    if not batch:
        return
    
    logger.info(f"Processing batch of {len(batch)} records for table '{table_name}'")
    
    try:
        # Run the pipeline with the batch data
        info = pipeline.run(
            batch,
            table_name=table_name,
            write_disposition='append'
        )
        
        logger.info(f"Successfully loaded batch. Load info: {info}")
        
        # Check for errors
        if hasattr(info, 'load_packages') and info.load_packages:
            if hasattr(info.load_packages, 'failed_rows_count') and info.load_packages.failed_rows_count > 0:
                logger.warning(f"Failed to load {info.load_packages.failed_rows_count} rows")
    except Exception as e:
        logger.error(f"Error processing batch: {str(e)}")
        logger.error(traceback.format_exc())

def test_components():
    """Test basic components before starting continuous ingestion"""
    # Test Kafka connection
    bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:29092')
    logger.info(f"Testing Kafka connection to {bootstrap_servers}")
    
    try:
        consumer = KafkaConsumer(
            bootstrap_servers=[bootstrap_servers],
            group_id='test-group',
            consumer_timeout_ms=5000
        )
        
        topics = consumer.topics()
        logger.info(f"Available Kafka topics: {topics}")
        consumer.close()
        
        # Test database connection
        destination_type, destination_config = get_destination_config()
        if destination_type == 'postgres':
            import psycopg2
            pg_host = os.environ.get('POSTGRES_HOST', 'localhost')
            pg_port = os.environ.get('POSTGRES_PORT', '5432')
            pg_db = os.environ.get('POSTGRES_DB', 'voting_db')
            pg_user = os.environ.get('POSTGRES_USER', 'postgres')
            pg_password = os.environ.get('POSTGRES_PASSWORD', 'postgres')
            
            conn = psycopg2.connect(
                host=pg_host,
                port=pg_port,
                dbname=pg_db,
                user=pg_user,
                password=pg_password
            )
            cursor = conn.cursor()
            cursor.execute("SELECT 1")
            cursor.close()
            conn.close()
            logger.info("PostgreSQL connection test successful")
        
        # Test DLT installation
        import dlt
        logger.info(f"DLT version: {dlt.__version__}")
        
        return True
    except Exception as e:
        logger.error(f"Component tests failed: {str(e)}")
        logger.error(traceback.format_exc())
        return False

def main():
    """Main entry point for the continuous ingestion service"""
    logger.info("Starting real-time DLT ingestion service")
    
    # Test components first
    if not test_components():
        logger.error("Component tests failed. Exiting.")
        return
    
    # Get configuration for which data to ingest
    pipeline_name = os.environ.get('PIPELINE_NAME', 'voting_data')
    ingest_voters = os.environ.get('INGEST_VOTERS', 'true').lower() == 'true'
    ingest_votes = os.environ.get('INGEST_VOTES', 'true').lower() == 'true'
    
    # Start ingestion processes
    if ingest_voters:
        import threading
        pipeline_name = 'voters'
        voters_thread = threading.Thread(
            target=continuous_ingest,
            args=('voters_pipeline', 'voters', 'dlt-voters-group'),
            daemon=True
        )
        voters_thread.start()
        logger.info("Started voters ingestion thread")
    
    if ingest_votes:
        import threading
        pipeline_name = 'votes'
        votes_thread = threading.Thread(
            target=continuous_ingest,
            args=('votes_pipeline', 'votes', 'dlt-votes-group'),
            daemon=True
        )
        votes_thread.start()
        logger.info("Started votes ingestion thread")
    
    # Keep main thread alive to allow the ingestion to continue
    logger.info("Ingestion service is running. Press Ctrl+C to stop.")
    try:
        while running:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received, stopping service...")
    
    # Wait for a clean shutdown
    logger.info("Waiting for ingestion to complete (max 30 seconds)...")
    shutdown_start = time.time()
    while time.time() - shutdown_start < 30 and any(t.is_alive() for t in threading.enumerate() if t != threading.current_thread()):
        time.sleep(1)
    
    logger.info("DLT ingestion service stopped")

if __name__ == "__main__":
    main()

2025-04-13 18:33:40,252 - INFO - Starting real-time DLT ingestion service
2025-04-13 18:33:40,253 - INFO - Testing Kafka connection to localhost:29092
2025-04-13 18:33:40,257 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <connecting> [IPv6 ('::1', 29092, 0, 0)]>: connecting to localhost:29092 [('::1', 29092, 0, 0) IPv6]
2025-04-13 18:33:40,261 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <checking_api_versions_recv> [IPv6 ('::1', 29092, 0, 0)]>: Broker version identified as 2.6
2025-04-13 18:33:40,262 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <connected> [IPv6 ('::1', 29092, 0, 0)]>: Connection complete.
2025-04-13 18:33:40,268 - INFO - Available Kafka topics: {'voters', 'votes'}
2025-04-13 18:33:40,269 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <connected> [IPv6 ('::1', 29

In [None]:
data-generator:
    build:
      context: .
      dockerfile: docker/Dockerfile.generator
    volumes:
      - ./data_generator:/app/data_generator
    environment:
      - VOTERS_COUNT=1000
      - VOTES_PER_MINUTE=100
      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
    networks:
      - voting-network
    depends_on:
      - kafka
    command: python -m data_generator.voter_gen2

  vote-simulator:
    build:
      context: .
      dockerfile: docker/Dockerfile.generator
    volumes:
      - ./data_generator:/app/data_generator
    environment:
      - VOTES_PER_MINUTE=100
      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
    networks:
      - voting-network
    depends_on:
      - kafka
      - data-generator
    #command: python -m data_generator.vote_sim2
    command: python -m data_generator.real_vote_simulator
    #command: python -m data_generator.real_vote_sim2

  # Kafka infrastructure
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      KAFKA_HEAP_OPTS: "-Xmx512M -Xms256M"
    networks:
      - voting-network
    healthcheck:
      test: ["CMD", "nc", "-z", "localhost", "2181"]
      interval: 5s
      timeout: 10s
      retries: 5

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,HOST://0.0.0.0:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CREATE_TOPICS: "votes:4:1,voters:1:1"
    networks:
      - voting-network
    depends_on:
      - zookeeper

  # DLT pipeline for data ingestion
  dlt-pipeline:
    build:
      context: .
      dockerfile: docker/Dockerfile.ingestion
    volumes:
      - ./ingestion:/app/ingestion
      - ~/.dlt:/root/.dlt
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
      - STORAGE_PREFERENCE=GCP  # or GCP
      - POSTGRES_HOST=postgres
      - POSTGRES_PORT=5432
      - POSTGRES_DB=voting_db
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      # Add for GCP if needed
      - GCP_CREDENTIALS_PATH=/workspaces/Realtime-Voting/dprof-dezoomfinal-b4d188529d18.json
    networks:
      - voting-network
    depends_on:
      - kafka
      - postgres
    #command: python -m ingestion.dlt_pipeline.voter_pipeline2
    #command: python ingestion/dlt_pipeline/voter_pipeline.py
    command: python ingestion/dlt_pipeline/real_dlt.py
postgres:
    image: postgres:14
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_MULTIPLE_DATABASES: "voting_db,kestra"
      POSTGRES_KESTRA_USER: kestra
      POSTGRES_KESTRA_PASSWORD: k3str4
    volumes:
      - postgres-data:/var/lib/postgresql/data
      - ./docker/postgres-init.sh:/docker-entrypoint-initdb.d/postgres-init.sh
    networks:
      - voting-network

  # pgweb - PostgreSQL web interface
  pgweb:
    image: sosedoff/pgweb
    ports:
      - "8085:8081"
    environment:
      - DATABASE_URL=postgres://postgres:postgres@postgres:5432/voting_db?sslmode=disable
    networks:
      - voting-network
    depends_on:
      - postgres
    restart: unless-stopped

networks:
  voting-network:
    driver: bridge


In [19]:
from sqlalchemy import create_engine

In [3]:
engine = create_engine('postgresql://postgres:postgres@localhost:5432/voting_db')

In [4]:
engine.connect()

<sqlalchemy.engine.base.Connection at 0x704bcd5a0b90>

In [5]:
#os.environ.get('KAFKA_BOOTSTRAP_SERVERS', '...')

In [11]:
bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:29092')
logger.info("Creating test consumer...")
consumer = KafkaConsumer(
    bootstrap_servers=[bootstrap_servers],
    group_id='debug-group',
    auto_offset_reset='earliest',
    consumer_timeout_ms=10000
)

logger.info("Listing topics...")
topics = consumer.topics()
logger.info(f"Available topics: {topics}")

consumer.close()
logger.info("Kafka connection test successful")

2025-04-12 19:33:16,371 - INFO - Creating test consumer...
2025-04-12 19:33:16,374 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <connecting> [IPv6 ('::1', 29092, 0, 0)]>: connecting to localhost:29092 [('::1', 29092, 0, 0) IPv6]
2025-04-12 19:33:16,378 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <checking_api_versions_recv> [IPv6 ('::1', 29092, 0, 0)]>: Broker version identified as 2.6
2025-04-12 19:33:16,379 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <connected> [IPv6 ('::1', 29092, 0, 0)]>: Connection complete.
2025-04-12 19:33:16,380 - INFO - Listing topics...
2025-04-12 19:33:16,384 - INFO - Available topics: {'voters', 'votes'}
2025-04-12 19:33:16,385 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <connected> [IPv6 ('::1', 29092, 0, 0)]>: Closing connection. 
2025-04-12 1

In [1]:
import dlt

In [2]:
pipeline = dlt.pipeline(
    pipeline_name="ny_taxi_pipeline",
    destination="duckdb",
    dataset_name="ny_taxi_data"
)

In [20]:
import duckdb

# A database '.duckdb' was created in working directory so just connect to it

# Connect to the DuckDB database
conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")

# Set search path to the dataset
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")

# Describe the dataset
conn.sql("DESCRIBE").df()
     

CatalogException: Catalog Error: SET search_path: No catalog + schema named "ny_taxi_data" found.

In [4]:
import dlt
#from dlt.sources.helpers import requests_get
import pandas as pd

# Create the pipeline
# pipeline = dlt.pipeline(
#     pipeline_name="ny_taxi_pipeline",
#     destination="duckdb",
#     dataset_name="ny_taxi_data"
# )

# Let's get a small sample of NYC taxi data
def get_taxi_data():
    url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet"
    # Limit to a small sample for demonstration
    df = pd.read_parquet(url, engine="pyarrow")
    return df.head(1000).to_dict(orient="records")

# Run the pipeline to load the data
load_info = pipeline.run(get_taxi_data(), table_name="yellow_taxi_trips")

# Print the load information
print(f"Loaded {load_info.load_package.count} rows to table {load_info.load_package.table_name}")

AttributeError: 'LoadInfo' object has no attribute 'load_package'

In [6]:
import duckdb

# Connect to the DuckDB database created by DLT
# By default, it will be in .dlt/duckdb/ny_taxi_data.duckdb
conn = duckdb.connect('/workspaces/Realtime-Voting/Debug/ny_taxi_pipeline.duckdb')

# View the available tables
print("Available tables:")
print(conn.execute("SELECT table_name FROM information_schema.tables WHERE table_schema='ny_taxi_data'").fetchall())

# Query the data
print("\nSample data:")
result = conn.execute("""
    SELECT * FROM ny_taxi_data.yellow_taxi_trips LIMIT 5
""").fetchall()

# Get column names
columns = conn.execute("""
    SELECT column_name 
    FROM information_schema.columns 
    WHERE table_schema='ny_taxi_data' 
    AND table_name='yellow_taxi_trips'
""").fetchall()
column_names = [col[0] for col in columns]

# Create a DataFrame to display the results nicely
result_df = pd.DataFrame(result, columns=column_names)
print(result_df)

# Run a simple aggregation query
print("\nAverage trip distance by payment type:")
agg_result = conn.execute("""
    SELECT 
        payment_type, 
        AVG(trip_distance) as avg_distance,
        COUNT(*) as trip_count
    FROM ny_taxi_data.yellow_taxi_trips
    GROUP BY payment_type
    ORDER BY avg_distance DESC
""").fetchdf()
print(agg_result)

# Close the connection
conn.close()

Available tables:
[('yellow_taxi_trips',), ('_dlt_loads',), ('_dlt_pipeline_state',), ('_dlt_version',)]

Sample data:
   vendor_id      tpep_pickup_datetime     tpep_dropoff_datetime  \
0          2 2023-01-01 00:32:10+00:00 2023-01-01 00:40:36+00:00   
1          2 2023-01-01 00:55:08+00:00 2023-01-01 01:01:27+00:00   
2          2 2023-01-01 00:25:04+00:00 2023-01-01 00:37:49+00:00   
3          1 2023-01-01 00:03:48+00:00 2023-01-01 00:13:25+00:00   
4          2 2023-01-01 00:10:29+00:00 2023-01-01 00:21:19+00:00   

   passenger_count  trip_distance  ratecode_id store_and_fwd_flag  \
0              1.0           0.97          1.0                  N   
1              1.0           1.10          1.0                  N   
2              1.0           2.51          1.0                  N   
3              0.0           1.90          1.0                  N   
4              1.0           1.43          1.0                  N   

   pu_location_id  do_location_id  payment_type  ...  ext

In [8]:
from kafka import KafkaConsumer
import json
import time
import os
import dlt

def kafka_voters_source():
    """Source function that reads voters from Kafka"""
    consumer = KafkaConsumer(
        'voters',
        bootstrap_servers=['localhost:29092'],
        auto_offset_reset='earliest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        group_id='dlt-voter-group'
    )
    
    for message in consumer:
        yield message.value

def kafka_votes_source():
    """Source function that reads votes from Kafka"""
    consumer = KafkaConsumer(
        'votes',
        bootstrap_servers=['localhost:29092'],
        auto_offset_reset='earliest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        group_id='dlt-votes-group'
    )
    
    for message in consumer:
        yield message.value

In [9]:
def get_destination_config():
    """Get destination configuration based on storage preference"""
    storage_preference = os.environ.get('STORAGE_PREFERENCE', 'postgres').upper()
    
    if storage_preference == 'GCP':
        # Check if GCP credentials file path is provided
        gcp_creds_path = os.environ.get('GCP_CREDENTIALS_PATH')
        
        if gcp_creds_path and os.path.exists(gcp_creds_path):
            # Load GCP credentials from file
            with open(gcp_creds_path, 'r') as f:
                gcp_creds = json.load(f)
            return 'bigquery', gcp_creds
        else:
            # If credentials file is not provided, try to use application default credentials
            print("GCP credentials file not found, using application default credentials")
            return 'bigquery', None
    else:
        # PostgreSQL connection
        pg_host = os.environ.get('POSTGRES_HOST', 'localhost')
        pg_port = os.environ.get('POSTGRES_PORT', '5432')
        pg_db = os.environ.get('POSTGRES_DB', 'voting_db')
        pg_user = os.environ.get('POSTGRES_USER', 'postgres')
        pg_password = os.environ.get('POSTGRES_PASSWORD', 'postgres')
        
        connection_string = f"postgresql://{pg_user}:{pg_password}@{pg_host}:{pg_port}/{pg_db}"
        return 'postgres', connection_string

In [10]:
pipeline_name = 'voters'
table_name = 'voters'

In [11]:
from dlt.destinations import postgres
destination_type, destination_config = get_destination_config()
pipeline = dlt.pipeline(
    pipeline_name=pipeline_name,
    destination=postgres(destination_config),
    dataset_name='public',  # Use 'public' schema or whatever schema your tables are in
    dev_mode=True  # This forces DLT to recreate all internal state tables
)

info = pipeline.run(
        kafka_voters_source,
        table_name=table_name,
        write_disposition='append',
        #merge_key=None,  # Set this to your primary key if you want upsert behavior
        #if_exists='append'  # 'append' will add to existing tables, 'replace' would drop and recreate
    )

2025-04-12 20:11:32,251 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <connecting> [IPv6 ('::1', 29092, 0, 0)]>: connecting to localhost:29092 [('::1', 29092, 0, 0) IPv6]
2025-04-12 20:11:32,255 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <checking_api_versions_recv> [IPv6 ('::1', 29092, 0, 0)]>: Broker version identified as 2.6


2025-04-12 20:11:32,256 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <connected> [IPv6 ('::1', 29092, 0, 0)]>: Connection complete.
2025-04-12 20:11:32,258 - INFO - Updating subscribed topics to: ('voters',)
2025-04-12 20:11:32,263 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=1 host=localhost:29092 <connecting> [IPv6 ('::1', 29092, 0, 0)]>: connecting to localhost:29092 [('::1', 29092, 0, 0) IPv6]
2025-04-12 20:11:32,265 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=1 host=localhost:29092 <connected> [IPv6 ('::1', 29092, 0, 0)]>: Connection complete.
2025-04-12 20:11:32,265 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:29092 <connected> [IPv6 ('::1', 29092, 0, 0)]>: Closing connection. 
2025-04-12 20:11:32,368 - INFO - Group coordinator for dlt-voter-group is BrokerMetadata(nodeId='coordinator-1', host='localhost', port=29092, rack=None)
2025-04-12 2

KeyboardInterrupt: 