In [None]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
import functools
import time
import json
from typing import List, Optional, Dict, Any, Callable
from confluent_kafka import Producer, Consumer
from confluent_kafka.serialization import StringSerializer
import requests


class KafkaOAuthClient:
    """
    Handles OAuth authentication for Kafka connections.

    Implements the OAuth 2.0 client-credentials grant. ``get_token`` is meant to be
    plugged into the Kafka client's ``oauth_cb`` setting and returns
    ``(access_token, expiry_epoch_seconds)``.

    This is a separate class to avoid code duplication between producer and consumer.
    """

    # Default seconds to wait for the token endpoint before giving up.
    DEFAULT_TIMEOUT = 30.0

    def __init__(self, client_id: str, client_secret: str, token_url: str,
                 scopes: Optional[List[str]], timeout: Optional[float] = DEFAULT_TIMEOUT):
        """
        Args:
            client_id: OAuth client ID (sent as HTTP basic-auth username).
            client_secret: OAuth client secret (sent as HTTP basic-auth password).
            token_url: OAuth token endpoint URL.
            scopes: OAuth scopes to request; ``None`` or empty requests no explicit scope.
            timeout: Seconds to wait for the token endpoint. The token callback runs
                inside the Kafka client's poll/flush calls, so ``None`` (wait forever)
                can hang them if the endpoint stops responding.
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self.scopes = scopes if scopes is not None else []
        self.timeout = timeout
        self.logger = logging.getLogger(__name__)

    def _build_payload(self) -> Dict[str, str]:
        """Return the form body for the client-credentials token request."""
        payload = {'grant_type': 'client_credentials'}
        # An empty ``scope`` value is not a valid scope string (RFC 6749 §3.3);
        # omitting the parameter lets the server apply its configured default.
        if self.scopes:
            payload['scope'] = ' '.join(self.scopes)
        return payload

    def get_token(self, config: Any) -> tuple:
        """
        Retrieves OAuth access token using client credentials grant.

        Args:
            config: Value passed in by the Kafka client's ``oauth_cb`` hook (unused).

        Returns:
            tuple: (access_token, expiration_timestamp as epoch seconds)

        Raises:
            requests.exceptions.RequestException: network error, timeout, or a
                non-2xx response from the token endpoint.
            KeyError, TypeError, ValueError: the response body is not a JSON object
                with ``access_token`` and a numeric ``expires_in``.
        """
        try:
            resp = requests.post(
                self.token_url,
                auth=(self.client_id, self.client_secret),
                data=self._build_payload(),
                timeout=self.timeout,
            )
            resp.raise_for_status()  # Raise exception for bad status codes

            token = resp.json()
            access_token = token['access_token']
            expires_at = time.time() + float(token['expires_in'])

        except requests.exceptions.RequestException as e:
            self.logger.error(f"Failed to obtain OAuth token: {e}")
            raise
        except (KeyError, TypeError, ValueError) as e:
            # KeyError: missing field; TypeError: body is not a JSON object;
            # ValueError: body is not JSON or expires_in is not numeric.
            self.logger.error(f"Invalid token response format: {e}")
            raise

        self.logger.info("Successfully obtained OAuth token")
        return access_token, expires_at


class KafkaProducer:
    """
    Kafka Producer with OAuth authentication.

    This class handles sending messages to Kafka topics securely. Every ``send_*``
    call is synchronous: it flushes the producer and raises if any message it
    enqueued came back with a delivery error.
    """

    def __init__(self, bootstrap_servers: str, client_id: str, client_secret: str,
                 token_url: str, scopes: List[str], ssl_ca_location: str = 'kafka-dev.asd.pem'):
        """
        Initialize Kafka Producer with OAuth authentication.

        Args:
            bootstrap_servers: Comma-separated list of Kafka brokers
            client_id: OAuth client ID
            client_secret: OAuth client secret
            token_url: OAuth token endpoint URL
            scopes: List of OAuth scopes to request
            ssl_ca_location: Path to SSL certificate file
        """
        self.bootstrap_servers = bootstrap_servers
        self.ssl_ca_location = ssl_ca_location
        self.logger = logging.getLogger(__name__)

        # Create OAuth client
        self.oauth_client = KafkaOAuthClient(client_id, client_secret, token_url, scopes)

        # Create producer with configuration
        self.producer = Producer(self._get_producer_config())
        # Not used by this class; kept in case callers rely on the attribute.
        self.serializer = StringSerializer('utf8')

        self.logger.info("Kafka Producer initialized successfully")

    def _get_producer_config(self) -> Dict[str, Any]:
        """
        Generate producer configuration with OAuth settings.

        Returns:
            Dict: Producer configuration
        """
        return {
            'bootstrap.servers': self.bootstrap_servers,
            'security.protocol': 'sasl_ssl',
            'sasl.mechanisms': 'OAUTHBEARER',
            'ssl.ca.location': self.ssl_ca_location,
            # Called by the client whenever it needs a token; returns (token, expiry).
            'oauth_cb': self.oauth_client.get_token,
            'logger': self.logger,
        }

    @staticmethod
    def _serialize(message: Any) -> Any:
        """Return *message* unchanged if it is a ``str``, otherwise its JSON encoding."""
        return message if isinstance(message, str) else json.dumps(message)

    def _produce(self, topic: str, message: Any, key: Optional[str], failures: List[Any]) -> None:
        """
        Enqueue one message; its delivery error (if any) is appended to *failures*.

        If the local producer queue is full (``BufferError``), serve delivery reports
        with ``poll`` to free space and retry instead of failing the send.
        """
        value = self._serialize(message)

        def on_delivery(err, msg):
            self._delivery_callback(err, msg)
            if err is not None:
                failures.append(err)

        while True:
            try:
                self.producer.produce(topic=topic, value=value, key=key, callback=on_delivery)
                return
            except BufferError:
                self.logger.warning("Local producer queue is full; waiting for deliveries")
                self.producer.poll(1.0)

    def send_message(self, topic: str, message: Any, key: Optional[str] = None) -> None:
        """
        Send a message to a Kafka topic and wait for its delivery report.

        Args:
            topic: Topic name to send message to
            message: Message content (will be JSON serialized if not string)
            key: Optional message key for partitioning

        Raises:
            RuntimeError: the message was enqueued but its delivery report is an error.
            Exception: anything raised while serializing or producing is re-raised.
        """
        failures: List[Any] = []
        try:
            self._produce(topic, message, key, failures)

            # Flush blocks until the delivery report has been served, so any
            # delivery error is in ``failures`` by the time it returns.
            self.producer.flush()
            if failures:
                raise RuntimeError(f"Message delivery failed: {failures[0]}")

            self.logger.info(f"Message sent to topic '{topic}'")

        except Exception as e:
            self.logger.error(f"Failed to send message to topic '{topic}': {e}")
            raise

    def _delivery_callback(self, err, msg):
        """
        Callback function called when message delivery completes.

        Args:
            err: Error object if delivery failed
            msg: Message object if delivery succeeded
        """
        if err:
            self.logger.error(f"Message delivery failed: {err}")
        else:
            self.logger.debug(f"Message delivered to {msg.topic()} [{msg.partition()}]")

    def send_batch(self, topic: str, messages: List[Dict[str, Any]]) -> None:
        """
        Send multiple messages to a topic, then flush once.

        Args:
            topic: Topic name
            messages: List of message dictionaries with 'message' and optional 'key'

        Raises:
            RuntimeError: one or more messages came back with a delivery error.
            Exception: anything raised while serializing or producing is re-raised.
        """
        failures: List[Any] = []
        try:
            for msg_data in messages:
                self._produce(topic, msg_data.get('message'), msg_data.get('key'), failures)
                # Serve delivery callbacks as we go so the local queue keeps draining.
                self.producer.poll(0)

            # Flush all messages
            self.producer.flush()
            if failures:
                raise RuntimeError(
                    f"{len(failures)} of {len(messages)} messages were not delivered; "
                    f"first error: {failures[0]}"
                )
            self.logger.info(f"Batch of {len(messages)} messages sent to topic '{topic}'")

        except Exception as e:
            self.logger.error(f"Failed to send batch to topic '{topic}': {e}")
            raise

    def close(self) -> None:
        """Clean up producer resources by flushing anything still queued."""
        if hasattr(self, 'producer'):
            self.producer.flush()
            self.logger.info("Kafka Producer closed")


class KafkaConsumer:
    """
    Kafka Consumer with OAuth authentication.

    This class handles consuming messages from Kafka topics securely.
    """

    def __init__(self, bootstrap_servers: str, client_id: str, client_secret: str,
                 token_url: str, scopes: List[str], group_id: str,
                 ssl_ca_location: str = 'kafka-dev.asd.pem'):
        """
        Initialize Kafka Consumer with OAuth authentication.

        Args:
            bootstrap_servers: Comma-separated list of Kafka brokers
            client_id: OAuth client ID
            client_secret: OAuth client secret
            token_url: OAuth token endpoint URL
            scopes: List of OAuth scopes to request
            group_id: Consumer group ID
            ssl_ca_location: Path to SSL certificate file
        """
        self.bootstrap_servers = bootstrap_servers
        self.group_id = group_id
        self.ssl_ca_location = ssl_ca_location
        self.logger = logging.getLogger(__name__)

        # Create OAuth client
        self.oauth_client = KafkaOAuthClient(client_id, client_secret, token_url, scopes)

        # Create consumer with configuration
        self.consumer = Consumer(self._get_consumer_config())

        self.logger.info(f"Kafka Consumer initialized with group_id: {group_id}")

    def _get_consumer_config(self) -> Dict[str, Any]:
        """
        Generate consumer configuration with OAuth settings.

        Returns:
            Dict: Consumer configuration
        """
        return {
            'bootstrap.servers': self.bootstrap_servers,
            'security.protocol': 'sasl_ssl',
            'sasl.mechanisms': 'OAUTHBEARER',
            'group.id': self.group_id,
            'ssl.ca.location': self.ssl_ca_location,
            # Called by the client whenever it needs a token; returns (token, expiry).
            'oauth_cb': self.oauth_client.get_token,
            'logger': self.logger,
            'auto.offset.reset': 'earliest',  # Start from beginning if no offset
        }

    def _print_assignment(self, consumer, partitions):
        """Callback for partition assignment."""
        self.logger.info(f"Assigned partitions: {partitions}")

    def subscribe(self, topics: List[str]) -> None:
        """
        Subscribe to one or more topics.

        Args:
            topics: List of topic names to subscribe to
        """
        try:
            self.consumer.subscribe(topics, on_assign=self._print_assignment)
            self.logger.info(f"Subscribed to topics: {topics}")
        except Exception as e:
            self.logger.error(f"Failed to subscribe to topics {topics}: {e}")
            raise

    @staticmethod
    def _decode(raw: Optional[bytes]) -> Optional[str]:
        """
        Decode a raw key/value as UTF-8; an absent (``None``) payload stays ``None``.

        An empty payload decodes to ``''`` so callers can tell "empty" from "absent".
        """
        return raw.decode('utf-8') if raw is not None else None

    def poll_message(self, timeout: float = 1.0) -> Optional[Dict[str, Any]]:
        """
        Poll for a single message.

        Args:
            timeout: Timeout in seconds for polling

        Returns:
            Dict with message data, or None if no message arrived or the poll
            returned an error (the error is logged).
        """
        try:
            msg = self.consumer.poll(timeout)

            if msg is None:
                return None

            if msg.error():
                self.logger.error(f"Consumer error: {msg.error()}")
                return None

            # Return message data
            return {
                'topic': msg.topic(),
                'partition': msg.partition(),
                'offset': msg.offset(),
                'key': self._decode(msg.key()),
                'value': self._decode(msg.value()),
                'timestamp': msg.timestamp()
            }

        except Exception as e:
            self.logger.error(f"Error polling message: {e}")
            raise

    def consume_messages(self, message_handler: Callable[[Dict[str, Any]], None],
                        timeout: float = 1.0) -> None:
        """
        Continuously consume messages and process them with a handler function.

        Runs until interrupted (KeyboardInterrupt) or a polling error is raised;
        the consumer is closed on exit either way. Exceptions raised by
        ``message_handler`` are logged with a traceback and consumption continues.

        Args:
            message_handler: Function to process each message
            timeout: Timeout for each poll operation
        """
        self.logger.info("Starting message consumption...")

        try:
            while True:
                message = self.poll_message(timeout)

                if message:
                    try:
                        message_handler(message)
                    except Exception as e:
                        # Keep consuming, but record the full traceback for debugging.
                        self.logger.exception(f"Error in message handler: {e}")

        except KeyboardInterrupt:
            self.logger.info("Consumption interrupted by user")
        except Exception as e:
            self.logger.error(f"Error in message consumption: {e}")
            raise
        finally:
            self.close()

    def close(self) -> None:
        """Clean up consumer resources."""
        if hasattr(self, 'consumer'):
            self.consumer.close()
            self.logger.info("Kafka Consumer closed")


# Example usage
if __name__ == "__main__":
    # Make the INFO-level progress messages visible (basicConfig does nothing if
    # the root logger already has handlers, e.g. configured by the host).
    logging.basicConfig(level=logging.INFO)

    # Configuration
    config = {
        'bootstrap_servers': 'your-kafka-broker:9092',
        'client_id': 'your-client-id',
        'client_secret': 'your-client-secret',
        'token_url': 'https://your-oauth-server/token',
        'scopes': ['kafka.read', 'kafka.write'],
        'group_id': 'my-consumer-group'
    }

    # Example: Producer usage
    producer = KafkaProducer(
        bootstrap_servers=config['bootstrap_servers'],
        client_id=config['client_id'],
        client_secret=config['client_secret'],
        token_url=config['token_url'],
        scopes=config['scopes']
    )

    # Send a message; close (flush) the producer even if the send raises.
    try:
        producer.send_message('my-topic', {'message': 'Hello, Kafka!'})
    finally:
        producer.close()

    # Example: Consumer usage
    consumer = KafkaConsumer(
        bootstrap_servers=config['bootstrap_servers'],
        client_id=config['client_id'],
        client_secret=config['client_secret'],
        token_url=config['token_url'],
        scopes=config['scopes'],
        group_id=config['group_id']
    )

    # Subscribe to topics
    consumer.subscribe(['my-topic'])

    # Define message handler
    def handle_message(message):
        print(f"Received: {message['value']} from topic: {message['topic']}")

    # Start consuming; blocks until interrupted and closes the consumer on exit.
    consumer.consume_messages(handle_message)

In [None]:
import logging
import functools
import json
import time
from confluent_kafka import Producer, Consumer
from confluent_kafka.serialization import StringSerializer
import requests

class KafkaProducer:
    """
    A class to produce (write) messages to Kafka topics.
    Designed for insurance data engineering workflows.

    Sends are synchronous: ``send_message``/``send_batch`` flush the producer and
    count a message as sent only if its delivery report carries no error.
    """

    # Seconds to wait for the OAuth token endpoint. The token callback runs inside
    # the Kafka client's poll/flush calls, so an unbounded wait could hang them.
    TOKEN_REQUEST_TIMEOUT = 30

    def __init__(self, bootstrap_servers, client_id, client_secret, token_url, topic_name):
        """
        Initialize the Kafka producer.

        Args:
            bootstrap_servers (str): Kafka server addresses
            client_id (str): OAuth client ID for authentication
            client_secret (str): OAuth client secret
            token_url (str): URL to get OAuth tokens
            topic_name (str): Default topic to write to
        """
        self.bootstrap_servers = bootstrap_servers
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self.topic_name = topic_name
        self.logger = logging.getLogger(__name__)

        # Create producer with configuration
        self.producer = Producer(self._get_producer_config())
        self.serializer = StringSerializer('utf8')

    def _get_token(self, config):
        """
        Get OAuth token for authentication (client-credentials grant).

        Args:
            config: Value passed in by the Kafka client's ``oauth_cb`` hook (unused).

        Returns:
            tuple: (access_token, expiry as epoch seconds)

        Raises:
            requests.exceptions.RequestException: network error, timeout, or a
                non-2xx response from the token endpoint.
            KeyError: the response lacks ``access_token`` or ``expires_in``.
        """
        payload = {
            'grant_type': 'client_credentials',
            'scope': 'kafka'  # Adjust scope as needed
        }

        resp = requests.post(
            self.token_url,
            auth=(self.client_id, self.client_secret),
            data=payload,
            timeout=self.TOKEN_REQUEST_TIMEOUT,
        )
        # Surface 4xx/5xx as HTTPError instead of a confusing KeyError on an error body.
        resp.raise_for_status()

        token = resp.json()
        return token['access_token'], time.time() + float(token['expires_in'])

    def _get_producer_config(self):
        """Get producer configuration."""
        return {
            'bootstrap.servers': self.bootstrap_servers,
            'security.protocol': 'sasl_ssl',
            'sasl.mechanisms': 'OAUTHBEARER',
            'ssl.ca.location': 'kafka-dev.asd.pem',
            'oauth_cb': self._get_token,
            'logger': self.logger,
        }

    @staticmethod
    def _serialize(message):
        """Return *message* JSON-encoded if it is a dict or list, otherwise unchanged."""
        if isinstance(message, (dict, list)):
            return json.dumps(message)
        return message

    def _produce(self, target_topic, message, delivery_errors):
        """
        Enqueue one message; its delivery error (if any) is appended to *delivery_errors*.

        When the local queue is full (``BufferError``), serve delivery reports with
        ``poll`` to make room and retry rather than dropping the message.
        """
        def on_delivery(err, msg):
            if err is not None:
                delivery_errors.append(err)
                self.logger.error(f"Message delivery failed: {err}")

        while True:
            try:
                self.producer.produce(target_topic, value=message, callback=on_delivery)
                return
            except BufferError:
                self.producer.poll(1.0)

    def send_message(self, message, topic_name=None):
        """
        Send a message to the specified topic and wait for its delivery report.

        Args:
            message (dict, list or str): Message to send (dicts/lists are JSON-encoded)
            topic_name (str): Topic to send to (uses default if None)

        Returns:
            bool: True if the message was delivered, False if producing or delivery failed.
        """
        target_topic = topic_name or self.topic_name

        # Convert message to JSON string if it's a dict or list
        message = self._serialize(message)

        delivery_errors = []
        try:
            self._produce(target_topic, message, delivery_errors)
            self.producer.flush()  # Blocks until the delivery report is served
        except Exception as e:
            self.logger.error(f"Failed to send message: {e}")
            return False

        if delivery_errors:
            self.logger.error(f"Failed to send message: {delivery_errors[0]}")
            return False

        self.logger.info(f"Message sent to topic '{target_topic}': {message}")
        return True

    def send_batch(self, messages, topic_name=None):
        """
        Send multiple messages in batch.

        Args:
            messages (list): List of messages to send
            topic_name (str): Topic to send to

        Returns:
            int: Number of messages whose delivery report carried no error.
        """
        target_topic = topic_name or self.topic_name
        delivery_errors = []
        enqueued = 0

        for message in messages:
            message = self._serialize(message)

            try:
                self._produce(target_topic, message, delivery_errors)
                enqueued += 1
            except Exception as e:
                self.logger.error(f"Failed to send message in batch: {e}")
            # Serve delivery callbacks as we go so the local queue keeps draining.
            self.producer.poll(0)

        self.producer.flush()
        successful_sends = enqueued - len(delivery_errors)
        self.logger.info(f"Sent {successful_sends}/{len(messages)} messages to '{target_topic}'")
        return successful_sends

    def close(self):
        """Close the producer connection (flushes anything still queued)."""
        self.producer.flush()
        self.logger.info("Producer closed")


class KafkaConsumer:
    """
    A class to consume (read) messages from Kafka topics.
    Designed for insurance data engineering workflows.
    """

    # Seconds to wait for the OAuth token endpoint. The token callback runs inside
    # the Kafka client's poll calls, so an unbounded wait could hang them.
    TOKEN_REQUEST_TIMEOUT = 30

    def __init__(self, bootstrap_servers, client_id, client_secret, token_url, topic_name, group_id="insurance-data-eng"):
        """
        Initialize the Kafka consumer and subscribe to ``topic_name``.

        Args:
            bootstrap_servers (str): Kafka server addresses
            client_id (str): OAuth client ID for authentication
            client_secret (str): OAuth client secret
            token_url (str): URL to get OAuth tokens
            topic_name (str): Topic to read from
            group_id (str): Consumer group ID
        """
        self.bootstrap_servers = bootstrap_servers
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self.topic_name = topic_name
        self.group_id = group_id
        self.logger = logging.getLogger(__name__)

        # Create consumer with configuration
        self.consumer = Consumer(self._get_consumer_config())
        self.consumer.subscribe([topic_name], on_assign=self._print_assignment)

    def _get_token(self, config):
        """
        Get OAuth token for authentication (client-credentials grant).

        Args:
            config: Value passed in by the Kafka client's ``oauth_cb`` hook (unused).

        Returns:
            tuple: (access_token, expiry as epoch seconds)

        Raises:
            requests.exceptions.RequestException: network error, timeout, or a
                non-2xx response from the token endpoint.
            KeyError: the response lacks ``access_token`` or ``expires_in``.
        """
        payload = {
            'grant_type': 'client_credentials',
            'scope': 'kafka'
        }

        resp = requests.post(
            self.token_url,
            auth=(self.client_id, self.client_secret),
            data=payload,
            timeout=self.TOKEN_REQUEST_TIMEOUT,
        )
        # Surface 4xx/5xx as HTTPError instead of a confusing KeyError on an error body.
        resp.raise_for_status()

        token = resp.json()
        return token['access_token'], time.time() + float(token['expires_in'])

    def _get_consumer_config(self):
        """Get consumer configuration."""
        return {
            'bootstrap.servers': self.bootstrap_servers,
            'security.protocol': 'sasl_ssl',
            'sasl.mechanisms': 'OAUTHBEARER',
            'group.id': self.group_id,
            'ssl.ca.location': 'kafka-dev.asd.pem',
            'oauth_cb': self._get_token,
            'logger': self.logger,
        }

    def _print_assignment(self, consumer, partitions):
        """Callback for partition assignment."""
        self.logger.info(f"Assignment: {partitions}")

    @staticmethod
    def _decode_value(msg):
        """Return the message value as UTF-8 text, or ``None`` when the value is null."""
        raw = msg.value()
        return raw.decode('utf-8') if raw is not None else None

    @staticmethod
    def _parse_value(message_value):
        """
        Parse decoded message text: JSON becomes the parsed object, any other text
        is returned unchanged, and ``None`` (null value) stays ``None``.
        """
        if message_value is None:
            return None
        try:
            return json.loads(message_value)
        except json.JSONDecodeError:
            return message_value

    def consume_messages(self, timeout=1.0, max_messages=None):
        """
        Consume messages from the topic.

        Args:
            timeout (float): Poll timeout in seconds
            max_messages (int): Maximum number of messages to consume (None for unlimited;
                0 or less consumes nothing)

        Returns:
            list: List of consumed messages (parsed JSON, raw text, or None for null values)
        """
        messages = []
        if max_messages is not None and max_messages <= 0:
            return messages

        message_count = 0

        self.logger.info(f"Starting to consume from topic '{self.topic_name}'")

        try:
            while True:
                msg = self.consumer.poll(timeout)

                if msg is None:
                    continue

                if msg.error():
                    self.logger.error(f"Consumer error: {msg.error()}")
                    continue

                # Process the message (null values are kept as None instead of crashing)
                message_value = self._decode_value(msg)
                messages.append(self._parse_value(message_value))

                message_count += 1
                self.logger.info(f"Consumed message {message_count}: {message_value}")

                # Check if we've reached the maximum number of messages
                if max_messages is not None and message_count >= max_messages:
                    break

        except KeyboardInterrupt:
            self.logger.info("Consumption interrupted by user")

        return messages

    def consume_single_message(self, timeout=5.0):
        """
        Consume a single message from the topic.

        Args:
            timeout (float): Poll timeout in seconds

        Returns:
            dict or str or None: The consumed message, or None if no message arrived,
            the poll returned an error, or the message value is null.
        """
        msg = self.consumer.poll(timeout)

        if msg is None:
            return None

        if msg.error():
            self.logger.error(f"Consumer error: {msg.error()}")
            return None

        return self._parse_value(self._decode_value(msg))

    def close(self):
        """Close the consumer connection."""
        self.consumer.close()
        self.logger.info("Consumer closed")

Config class

In [None]:
import logging
import functools
import json
import time
from confluent_kafka import Producer, Consumer
from confluent_kafka.serialization import StringSerializer
import requests

class KafkaConfigurationService:
    """
    Central configuration service that manages environment-specific Kafka configurations.

    Configurations are stored as private copies: ``add_environment`` copies the
    dict it is given and ``get_configuration`` returns a (shallow) copy, so neither
    the caller's dict nor the returned dict can change the stored configuration.
    """

    def __init__(self):
        self._configurations = {
            "nonprod": {
                "bootstrap-servers": "nonprod-kafka-broker1:9092,nonprod-kafka-broker2:9092",
                "client-id-key": "nonprod-kafka-client-id",
                "client-secret-key": "nonprod-kafka-client-secret",
                "token-url": "https://nonprod-auth.company.com/oauth/token",
                "default-secret-scope": "nonprod-kafka-secrets",
                "ssl-ca-location": "kafka-nonprod.pem",
                "default-group-id": "nonprod-insurance-data-eng"
            },
            "prod": {
                "bootstrap-servers": "prod-kafka-broker1:9092,prod-kafka-broker2:9092",
                "client-id-key": "prod-kafka-client-id",
                "client-secret-key": "prod-kafka-client-secret",
                "token-url": "https://prod-auth.company.com/oauth/token",
                "default-secret-scope": "prod-kafka-secrets",
                "ssl-ca-location": "kafka-prod.pem",
                "default-group-id": "prod-insurance-data-eng"
            }
        }

    def get_configuration(self, environment):
        """
        Get configuration dictionary for a specific environment

        Args:
            environment (str): Environment name ('prod', 'nonprod', or one added
                via ``add_environment``)

        Returns:
            dict: A copy of the configuration dictionary for the environment

        Raises:
            ValueError: if the environment is unknown.
        """
        if environment not in self._configurations:
            raise ValueError(f"Environment '{environment}' not found in configurations")

        return self._configurations[environment].copy()

    def add_environment(self, environment, config_dict):
        """
        Add (or replace) an environment configuration.

        Args:
            environment (str): Environment name
            config_dict (dict): Configuration dictionary; it is copied, so later
                changes to the caller's dict do not affect the stored configuration.
        """
        self._configurations[environment] = dict(config_dict)

class KafkaProducer:
    """
    A class to produce (write) messages to Kafka topics.
    Designed for insurance data engineering workflows.

    Sends are synchronous: ``send_message``/``send_batch`` flush the producer and
    count a message as sent only if its delivery report carries no error.
    """

    # Seconds to wait for the OAuth token endpoint. The token callback runs inside
    # the Kafka client's poll/flush calls, so an unbounded wait could hang them.
    TOKEN_REQUEST_TIMEOUT = 30

    def __init__(self, environment='nonprod', topic_name=None, config_service=None, custom_secret_scope=None):
        """
        Initialize the Kafka producer.

        Args:
            environment (str): 'prod' or 'nonprod' to determine which config to use
            topic_name (str): Default topic to write to
            config_service (KafkaConfigurationService): Optional config service instance
            custom_secret_scope (str): Optional custom secret scope (uses default if not provided)
        """
        self.environment = environment
        self.config_service = config_service or KafkaConfigurationService()
        self.env_config = self.config_service.get_configuration(environment)
        self.topic_name = topic_name
        self.logger = logging.getLogger(__name__)

        # Use default secret scope from configuration if not provided
        self.secret_scope = custom_secret_scope or self.env_config['default-secret-scope']

        # Get credentials from configuration and secrets
        self._setup_credentials()

        # Create producer with configuration
        self.producer = Producer(self._get_producer_config())
        self.serializer = StringSerializer('utf8')

        print(f"Kafka Producer initialized for {self.environment} environment")
        print(f"  Bootstrap servers: {self.bootstrap_servers}")
        print(f"  Using secret scope: {self.secret_scope}")

    def _setup_credentials(self):
        """
        Setup credentials from environment configuration and secrets.

        Raises:
            Exception: wrapping (and chained to) any error from the secret lookup
                or a missing configuration key.
        """
        try:
            # Get credentials from Databricks secrets using configuration.
            # NOTE(review): ``dbutils`` is not defined in this file; presumably it is
            # the global injected by the Databricks runtime.
            self.client_id = dbutils.secrets.get(self.secret_scope, self.env_config['client-id-key'])
            self.client_secret = dbutils.secrets.get(self.secret_scope, self.env_config['client-secret-key'])

            # Get other config values
            self.bootstrap_servers = self.env_config['bootstrap-servers']
            self.token_url = self.env_config['token-url']
            self.ssl_ca_location = self.env_config['ssl-ca-location']

        except Exception as e:
            # Chain the original error so the traceback still shows the root cause.
            raise Exception(f"Failed to retrieve credentials from scope '{self.secret_scope}': {str(e)}") from e

    def _get_token(self, config):
        """
        Get OAuth token for authentication (client-credentials grant).

        Args:
            config: Value passed in by the Kafka client's ``oauth_cb`` hook (unused).

        Returns:
            tuple: (access_token, expiry as epoch seconds)

        Raises:
            requests.exceptions.RequestException: network error, timeout, or a
                non-2xx response from the token endpoint.
            KeyError: the response lacks ``access_token`` or ``expires_in``.
        """
        payload = {
            'grant_type': 'client_credentials',
            'scope': 'kafka'  # Adjust scope as needed
        }

        resp = requests.post(
            self.token_url,
            auth=(self.client_id, self.client_secret),
            data=payload,
            timeout=self.TOKEN_REQUEST_TIMEOUT,
        )
        # Surface 4xx/5xx as HTTPError instead of a confusing KeyError on an error body.
        resp.raise_for_status()

        token = resp.json()
        return token['access_token'], time.time() + float(token['expires_in'])

    def _get_producer_config(self):
        """Get producer configuration."""
        return {
            'bootstrap.servers': self.bootstrap_servers,
            'security.protocol': 'sasl_ssl',
            'sasl.mechanisms': 'OAUTHBEARER',
            'ssl.ca.location': self.ssl_ca_location,
            'oauth_cb': self._get_token,
            'logger': self.logger,
        }

    def _resolve_topic(self, topic_name):
        """
        Return *topic_name*, falling back to the default topic when it is falsy.

        Raises:
            ValueError: if neither a topic nor a default topic is set.
        """
        target_topic = topic_name or self.topic_name
        if not target_topic:
            raise ValueError("No topic specified. Provide topic_name parameter or set default topic.")
        return target_topic

    @staticmethod
    def _serialize(message):
        """Return *message* JSON-encoded if it is a dict or list, otherwise unchanged."""
        if isinstance(message, (dict, list)):
            return json.dumps(message)
        return message

    def _produce(self, target_topic, message, delivery_errors):
        """
        Enqueue one message; its delivery error (if any) is appended to *delivery_errors*.

        When the local queue is full (``BufferError``), serve delivery reports with
        ``poll`` to make room and retry rather than dropping the message.
        """
        def on_delivery(err, msg):
            if err is not None:
                delivery_errors.append(err)
                self.logger.error(f"Message delivery failed: {err}")

        while True:
            try:
                self.producer.produce(target_topic, value=message, callback=on_delivery)
                return
            except BufferError:
                self.producer.poll(1.0)

    def send_message(self, message, topic_name=None):
        """
        Send a message to the specified topic and wait for its delivery report.

        Args:
            message (dict, list or str): Message to send (dicts/lists are JSON-encoded)
            topic_name (str): Topic to send to (uses default if None)

        Returns:
            bool: True if the message was delivered, False if producing or delivery failed.

        Raises:
            ValueError: if no topic is given and no default topic is set.
        """
        target_topic = self._resolve_topic(topic_name)

        # Convert message to JSON string if it's a dict or list
        message = self._serialize(message)

        delivery_errors = []
        try:
            self._produce(target_topic, message, delivery_errors)
            self.producer.flush()  # Blocks until the delivery report is served
        except Exception as e:
            self.logger.error(f"Failed to send message: {e}")
            return False

        if delivery_errors:
            self.logger.error(f"Failed to send message: {delivery_errors[0]}")
            return False

        self.logger.info(f"Message sent to topic '{target_topic}': {message}")
        return True

    def send_batch(self, messages, topic_name=None):
        """
        Send multiple messages in batch.

        Args:
            messages (list): List of messages to send
            topic_name (str): Topic to send to

        Returns:
            int: Number of messages whose delivery report carried no error.

        Raises:
            ValueError: if no topic is given and no default topic is set.
        """
        target_topic = self._resolve_topic(topic_name)
        delivery_errors = []
        enqueued = 0

        for message in messages:
            message = self._serialize(message)

            try:
                self._produce(target_topic, message, delivery_errors)
                enqueued += 1
            except Exception as e:
                self.logger.error(f"Failed to send message in batch: {e}")
            # Serve delivery callbacks as we go so the local queue keeps draining.
            self.producer.poll(0)

        self.producer.flush()
        successful_sends = enqueued - len(delivery_errors)
        self.logger.info(f"Sent {successful_sends}/{len(messages)} messages to '{target_topic}'")
        return successful_sends

    def close(self):
        """Close the producer connection (flushes anything still queued)."""
        self.producer.flush()
        self.logger.info("Producer closed")


class KafkaConsumer:
    """
    A class to consume (read) messages from Kafka topics.
    Designed for insurance data engineering workflows.

    Authentication uses SASL/OAUTHBEARER. An access token is fetched from the
    configured token URL with the client-credentials grant, using a client
    id/secret read from Databricks secrets (``dbutils.secrets``).

    The consumer can be used as a context manager; the underlying Kafka
    consumer is closed on exit::

        with KafkaConsumer('nonprod', topic_name='my-topic') as consumer:
            records = consumer.consume_messages(max_messages=10)
    """

    # Seconds to wait for the OAuth token endpoint. Without a timeout, a hung
    # endpoint would block the client's token callback indefinitely.
    TOKEN_REQUEST_TIMEOUT = 30

    def __init__(self, environment='nonprod', topic_name=None, group_id=None, config_service=None, custom_secret_scope=None):
        """
        Initialize the Kafka consumer.

        Args:
            environment (str): 'prod' or 'nonprod' to determine which config to use
            topic_name (str): Topic to read from (subscribed immediately if given)
            group_id (str): Consumer group ID (uses default if not provided)
            config_service (KafkaConfigurationService): Optional config service instance
            custom_secret_scope (str): Optional custom secret scope (uses default if not provided)
        """
        self.environment = environment
        self.config_service = config_service or KafkaConfigurationService()
        self.env_config = self.config_service.get_configuration(environment)
        self.topic_name = topic_name
        self.logger = logging.getLogger(__name__)

        # Use default secret scope and group ID from configuration if not provided
        self.secret_scope = custom_secret_scope or self.env_config['default-secret-scope']
        self.group_id = group_id or self.env_config['default-group-id']

        # Get credentials from configuration and secrets
        self._setup_credentials()

        # Create consumer with configuration
        self.consumer = Consumer(self._get_consumer_config())

        if self.topic_name:
            self.consumer.subscribe([self.topic_name], on_assign=self._print_assignment)

        print(f"Kafka Consumer initialized for {self.environment} environment")
        print(f"  Bootstrap servers: {self.bootstrap_servers}")
        print(f"  Group ID: {self.group_id}")
        print(f"  Using secret scope: {self.secret_scope}")

    def __enter__(self):
        """Return this consumer for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Close the consumer when the ``with`` block exits; exceptions propagate."""
        self.close()
        return False

    def _setup_credentials(self):
        """
        Load OAuth credentials and connection settings.

        Reads the client id and secret from the Databricks secret scope
        ``self.secret_scope``, under the key names given in the environment
        config. Reads the bootstrap servers, token URL and CA location from
        ``self.env_config``.

        Raises:
            Exception: If any secret or config key cannot be read. The original
                error is chained as ``__cause__``.
        """
        try:
            # Get credentials from Databricks secrets using configuration
            self.client_id = dbutils.secrets.get(self.secret_scope, self.env_config['client-id-key'])
            self.client_secret = dbutils.secrets.get(self.secret_scope, self.env_config['client-secret-key'])

            # Get other config values
            self.bootstrap_servers = self.env_config['bootstrap-servers']
            self.token_url = self.env_config['token-url']
            self.ssl_ca_location = self.env_config['ssl-ca-location']

        except Exception as e:
            raise Exception(f"Failed to retrieve credentials from scope '{self.secret_scope}': {str(e)}") from e

    def _get_token(self, config):
        """
        Fetch an OAuth access token for SASL/OAUTHBEARER (client-credentials grant).

        Registered as the client's ``oauth_cb``.

        Args:
            config: Value of ``sasl.oauthbearer.config`` passed in by the client;
                not used.

        Returns:
            tuple: ``(access_token, expiry)``. ``expiry`` is an absolute time in
                seconds since the epoch.

        Raises:
            requests.exceptions.RequestException: On network failure, timeout,
                or a non-2xx HTTP response.
            KeyError: If the response lacks ``access_token`` or ``expires_in``.
        """
        payload = {
            'grant_type': 'client_credentials',
            'scope': 'kafka'
        }

        try:
            resp = requests.post(
                self.token_url,
                auth=(self.client_id, self.client_secret),
                data=payload,
                timeout=self.TOKEN_REQUEST_TIMEOUT,
            )
            # Surface HTTP errors (e.g. bad credentials) directly, instead of
            # failing later with a confusing KeyError on an error body.
            resp.raise_for_status()

            token = resp.json()
            return token['access_token'], time.time() + float(token['expires_in'])

        except requests.exceptions.RequestException as e:
            self.logger.error(f"Failed to obtain OAuth token: {e}")
            raise
        except KeyError as e:
            self.logger.error(f"Invalid token response format: {e}")
            raise

    def _get_consumer_config(self):
        """Build the client configuration dict (SASL_SSL + OAUTHBEARER)."""
        return {
            'bootstrap.servers': self.bootstrap_servers,
            'security.protocol': 'sasl_ssl',
            'sasl.mechanisms': 'OAUTHBEARER',
            'group.id': self.group_id,
            'ssl.ca.location': self.ssl_ca_location,
            'oauth_cb': self._get_token,
            'logger': self.logger,
        }

    def _print_assignment(self, consumer, partitions):
        """Callback for partition assignment; logs the assigned partitions."""
        self.logger.info(f"Assignment: {partitions}")

    @staticmethod
    def _decode_message(raw):
        """
        Convert a raw message value into a Python object.

        Args:
            raw (bytes or None): The value returned by ``msg.value()``.

        Returns:
            The result depends on the payload:

            - ``None`` for a null value (e.g. a tombstone record).
            - The parsed JSON value when the payload is UTF-8 JSON.
            - The decoded ``str`` when it is UTF-8 but not JSON.
            - The original ``bytes`` when it is not valid UTF-8.
        """
        if raw is None:
            return None
        try:
            text = raw.decode('utf-8')
        except UnicodeDecodeError:
            return raw
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            return text

    def subscribe_to_topic(self, topic_name):
        """
        Subscribe to a specific topic, replacing any previous subscription.

        Args:
            topic_name (str): Topic to subscribe to
        """
        self.topic_name = topic_name
        self.consumer.subscribe([topic_name], on_assign=self._print_assignment)
        self.logger.info(f"Subscribed to topic: {topic_name}")

    def consume_messages(self, timeout=1.0, max_messages=None):
        """
        Consume messages from the topic.

        Polling continues until ``max_messages`` messages have been collected.
        If ``max_messages`` is None, it runs until interrupted (KeyboardInterrupt),
        and the messages collected so far are returned. Error events are logged
        and skipped.

        Args:
            timeout (float): Poll timeout in seconds
            max_messages (int): Maximum number of messages to consume (None for
                unlimited; 0 or less returns an empty list immediately)

        Returns:
            list: Consumed messages, each decoded as described in ``_decode_message``.

        Raises:
            RuntimeError: If no topic has been subscribed.
        """
        if not self.topic_name:
            raise RuntimeError("No topic subscribed. Call subscribe_to_topic() first or provide topic in constructor.")

        messages = []

        self.logger.info(f"Starting to consume from topic '{self.topic_name}'")

        try:
            while max_messages is None or len(messages) < max_messages:
                msg = self.consumer.poll(timeout)

                if msg is None:
                    continue

                if msg.error():
                    self.logger.error(f"Consumer error: {msg.error()}")
                    continue

                value = self._decode_message(msg.value())
                messages.append(value)
                self.logger.info(f"Consumed message {len(messages)}: {value}")

        except KeyboardInterrupt:
            self.logger.info("Consumption interrupted by user")

        return messages

    def consume_single_message(self, timeout=5.0):
        """
        Consume a single message from the topic.

        Args:
            timeout (float): Poll timeout in seconds

        Returns:
            The decoded message (see ``_decode_message``). Returns None if no
            message arrived within ``timeout``, if the poll returned an error
            (which is logged), or if the message value is null.

        Raises:
            RuntimeError: If no topic has been subscribed.
        """
        if not self.topic_name:
            raise RuntimeError("No topic subscribed. Call subscribe_to_topic() first or provide topic in constructor.")

        msg = self.consumer.poll(timeout)

        if msg is None:
            return None

        if msg.error():
            self.logger.error(f"Consumer error: {msg.error()}")
            return None

        return self._decode_message(msg.value())

    def close(self):
        """Close the consumer connection."""
        self.consumer.close()
        self.logger.info("Consumer closed")


# Helper functions similar to ADLS connector
def create_kafka_producer(environment, topic_name, config_service=None, custom_secret_scope=None):
    """
    Build a KafkaProducer in a single call.

    This is a thin convenience wrapper: every argument is forwarded
    positionally to the ``KafkaProducer`` constructor.

    Args:
        environment (str): 'prod' or 'nonprod'
        topic_name (str): Default topic for messages sent by the producer
        config_service (KafkaConfigurationService): Optional config service instance
        custom_secret_scope (str): Optional custom secret scope

    Returns:
        KafkaProducer: The newly constructed producer.
    """
    producer = KafkaProducer(environment, topic_name, config_service, custom_secret_scope)
    return producer


def create_kafka_consumer(environment, topic_name, group_id=None, config_service=None, custom_secret_scope=None):
    """
    Build a KafkaConsumer in a single call.

    This is a thin convenience wrapper. Arguments are forwarded by keyword to
    the ``KafkaConsumer`` constructor, which subscribes to ``topic_name`` when
    it is given.

    Args:
        environment (str): 'prod' or 'nonprod'
        topic_name (str): Topic to subscribe to
        group_id (str): Consumer group ID (the configuration default is used if None)
        config_service (KafkaConfigurationService): Optional config service instance
        custom_secret_scope (str): Optional custom secret scope

    Returns:
        KafkaConsumer: The newly constructed consumer.
    """
    consumer = KafkaConsumer(
        environment=environment,
        topic_name=topic_name,
        group_id=group_id,
        config_service=config_service,
        custom_secret_scope=custom_secret_scope,
    )
    return consumer