In [None]:
from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable
import json
import logging
# Root logger at DEBUG: loggers of imported libraries (e.g. kafka) will also emit debug output.
logging.basicConfig(level=logging.DEBUG)
# Logger for this notebook's own messages.
logger = logging.getLogger(__name__)

In [None]:
# Kafka topic / consumer-group settings
topic, group_id = 'my_topic', 'mygroup'
bootstrap_servers = ['kafka_url:443']

# SSL files, passed to KafkaConsumer as ssl_cafile / ssl_certfile.
# NOTE(review): kafka-python also needs ssl_keyfile for client-cert auth unless
# the private key is bundled in the certificate file -- confirm.
ssl_cafile, ssl_certfile = './sslcert.pem', './sslcert.crt'

In [None]:
# Create the SSL-enabled consumer. On any failure `consumer` stays None so that
# later cells can tell initialization did not succeed.
consumer = None
try:
    consumer = KafkaConsumer(
        topic,
        group_id=group_id,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='earliest',
        # kafka-python stops iteration after this many ms without new records.
        consumer_timeout_ms=20000,
        # Identity deserializer: values stay raw bytes and are JSON-decoded later.
        value_deserializer=lambda v: v,
        security_protocol='SSL',
        ssl_cafile=ssl_cafile,
        ssl_certfile=ssl_certfile,
    )
    logger.info("Successfully connected to Kafka broker")
except NoBrokersAvailable as e:
    logger.error(f"No brokers available. Ensure the Kafka broker is running and accessible. Error: {e}")
except Exception as e:
    logger.error(f"Failed to create Kafka consumer. Error: {e}")

In [None]:
def consume_messages(consumer, max_messages=2):
    """Read JSON messages from a Kafka consumer, then close the consumer.

    Each record value is decoded as UTF-8 and parsed with ``json.loads``.
    Records with a null value (Kafka tombstones) or that are not valid UTF-8
    JSON are logged and skipped instead of aborting the batch.

    Consumption stops when ``max_messages`` messages have been collected, when
    the consumer's iterator ends (for kafka-python: after ``consumer_timeout_ms``
    with no new records), on Ctrl-C, or on an unexpected error (which is
    logged). The consumer is always closed before returning, so it cannot be
    iterated again afterwards.

    Args:
        consumer: Iterable of records exposing a bytes ``.value`` and a
            ``close()`` method (e.g. ``kafka.KafkaConsumer``), or ``None``.
        max_messages: Stop after this many parsed messages; ``None`` means no
            limit. Defaults to 2, matching the original behavior.

    Returns:
        list: Parsed JSON objects in consumption order (possibly empty).
    """
    messages = []
    if consumer is None:
        # Nothing to read, and nothing to close.
        logger.error("No consumer supplied; nothing to consume")
        return messages
    try:
        logger.info("Starting message consumption")
        for message in consumer:
            logger.debug(f"Consumed message: {message.value}")
            if message.value is None:
                logger.warning("Skipping message with null value")
                continue
            try:
                json_message = json.loads(message.value.decode('utf-8'))
            except (UnicodeDecodeError, json.JSONDecodeError) as e:
                # One malformed record should not discard the rest of the batch.
                logger.warning(f"Skipping message that is not valid UTF-8 JSON: {e}")
                continue
            messages.append(json_message)
            if max_messages is not None and len(messages) >= max_messages:
                # Leave the loop; the consumer is closed in `finally`.
                break
    except KeyboardInterrupt:
        logger.info("Stopping consumer...")
    except Exception as e:
        logger.error(f"An error occurred: {e}")
    finally:
        consumer.close()
        logger.info("Consumer closed")
    logger.info(f"Consumed {len(messages)} messages")
    return messages


# consume_messages() closes the consumer when it finishes, so it is invoked
# once, from the driver cell at the end of the notebook, rather than here.

In [None]:
import boto3
from botocore.client import Config
from botocore.exceptions import ClientError
from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import NamespaceAlreadyExistsError
from pyiceberg.schema import Schema, NestedField
from pyiceberg.types import StringType, LongType
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.table.sorting import SortOrder, SortField,IdentityTransform
from pyiceberg.transforms import DayTransform, IdentityTransform
import logging
import json

# Logger for this part of the notebook (getLogger returns the same object for the same name).
logger = logging.getLogger(__name__)
# NOTE: basicConfig only configures the root logger if it has no handlers yet.
# If logging was already configured earlier in the session, this INFO level has
# no effect -- pass force=True if INFO is really intended.
logging.basicConfig(level=logging.INFO)

def create_s3_bucket(bucket_name, minio_endpoint, access_key, secret_key):
    """Create a bucket on an S3-compatible endpoint such as MinIO.

    Best-effort: nothing is raised. A bucket that already exists is logged at
    INFO; access-denied, any other ``ClientError`` code, and unexpected
    exceptions are logged at ERROR.

    Args:
        bucket_name: Name of the bucket to create.
        minio_endpoint: Endpoint URL handed to boto3 as ``endpoint_url``.
        access_key: Access key id.
        secret_key: Secret access key.
    """
    try:
        s3_client = boto3.client(
            's3',
            endpoint_url=minio_endpoint,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            config=Config(signature_version='s3v4'),
        )
        s3_client.create_bucket(Bucket=bucket_name)
        logger.info(f"Bucket '{bucket_name}' created successfully.")
    except ClientError as err:
        error_code = err.response['Error']['Code']
        # Known S3 error codes -> (log function, message); anything else is an error.
        known_outcomes = {
            'BucketAlreadyOwnedByYou': (logger.info, f"Bucket '{bucket_name}' already exists and is owned by you."),
            'BucketAlreadyExists': (logger.info, f"Bucket '{bucket_name}' already exists."),
            'AccessDenied': (logger.error, f"Access denied to create bucket '{bucket_name}'."),
        }
        log_fn, text = known_outcomes.get(error_code, (logger.error, f"An error occurred: {err}"))
        log_fn(text)
    except Exception as err:
        logger.error(f"An unexpected error occurred: {err}")

def setup_iceberg_table():
    """Ensure the bucket, namespace and ``sample_table`` exist, and return the table.

    Steps:
      1. Best-effort creation of the MinIO/S3 bucket.
      2. Connect to the Hive catalog.
      3. Create the ``my_namespace_name`` namespace if it is missing.
      4. Load ``my_namespace_name.sample_table``, creating it with the schema,
         partition spec and sort order below if it does not exist yet.

    Connection settings can be overridden with the environment variables
    ``ICEBERG_S3_BUCKET``, ``MINIO_ENDPOINT``, ``MINIO_ACCESS_KEY``,
    ``MINIO_SECRET_KEY`` and ``HIVE_METASTORE_URI``. The defaults are the
    placeholders this notebook shipped with.

    Returns:
        The pyiceberg ``Table`` on success, or ``None`` if a catalog operation
        failed (the error is logged).
    """
    import os
    from pyiceberg.exceptions import NoSuchTableError

    # NOTE(review): S3/MinIO bucket names may not contain underscores, so the
    # default placeholder will be rejected by the server -- set ICEBERG_S3_BUCKET.
    s3_bucket_name = os.environ.get("ICEBERG_S3_BUCKET", "my_buket_name")
    minio_endpoint = os.environ.get("MINIO_ENDPOINT", 'http://minio-url:8080')
    # Prefer supplying real credentials via the environment rather than code.
    access_key = os.environ.get("MINIO_ACCESS_KEY", 'minio123')
    secret_key = os.environ.get("MINIO_SECRET_KEY", 'minio123')
    hive_uri = os.environ.get("HIVE_METASTORE_URI", "thrift://hive_url:9083")
    namespace = "my_namespace_name"
    identifier = f"{namespace}.sample_table"

    # Create the bucket first
    create_s3_bucket(s3_bucket_name, minio_endpoint, access_key, secret_key)

    try:
        catalog = load_catalog(
            "hive",
            **{
                "uri": hive_uri,
                "s3.endpoint": minio_endpoint,
                "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
                "s3.access-key-id": access_key,
                "s3.secret-access-key": secret_key,
            },
        )

        # Create-or-ignore avoids comparing against list_namespaces(), whose
        # entries are identifier tuples rather than plain strings.
        try:
            catalog.create_namespace(namespace)
            logger.info(f"Namespace '{namespace}' created successfully.")
        except NamespaceAlreadyExistsError:
            logger.info(f"Namespace '{namespace}' already exists.")

        # Reuse the table if it already exists (list_tables() returns identifier
        # tuples, so a plain-name membership test never matches).
        try:
            table = catalog.load_table(identifier)
            logger.info("Table 'sample_table' already exists in namespace 'my_namespace_name'.")
            return table
        except NoSuchTableError:
            pass

        schema = Schema(
            NestedField(field_id=1, name="resourceMetrics", field_type=StringType(), required=True),
            NestedField(field_id=2, name="container_id", field_type=StringType(), required=True),
            NestedField(field_id=3, name="container_name", field_type=StringType(), required=True),
            NestedField(field_id=4, name="container_namespace", field_type=StringType(), required=True),
            NestedField(field_id=5, name="pod_name", field_type=StringType(), required=False),
            NestedField(field_id=6, name="timeUnixNano", field_type=LongType(), required=False),
            NestedField(field_id=7, name="asDouble", field_type=StringType(), required=False),
        )

        # Partition by container_id as-is. The day transform only accepts
        # date/timestamp sources, so it cannot be applied to this string column.
        partition_spec = PartitionSpec(
            PartitionField(
                source_id=2, field_id=1000, transform=IdentityTransform(), name="container_id"
            )
        )
        # NOTE(review): this sorts on the raw JSON payload column (field 1);
        # sorting on timeUnixNano (field 6) may have been intended -- confirm.
        sort_order = SortOrder(SortField(source_id=1, transform=IdentityTransform()))

        table = catalog.create_table(
            identifier=identifier,
            schema=schema,
            location=f"s3a://{s3_bucket_name}/sample",
            partition_spec=partition_spec,
            sort_order=sort_order,
        )
        logger.info("Iceberg table 'sample_table' created successfully.")
        return table

    except Exception as e:
        logger.error(f"An unexpected error occurred while setting up the Iceberg table: {e}")
        return None

# setup_iceberg_table()


In [None]:
import pyarrow as pa
from pyiceberg.io.pyarrow import PyArrowFileIO

def write_to_iceberg(table, messages):
    """Append consumed JSON messages to an Iceberg table, one row per message.

    The whole message is stored as a JSON string in ``resourceMetrics``. The
    top-level keys ``container_id``, ``container_name``,
    ``container_namespace``, ``pod_name``, ``timeUnixNano`` and ``asDouble``
    are copied into their own columns; missing keys become nulls.

    Args:
        table: pyiceberg ``Table`` with the columns listed above.
        messages: List of dicts, e.g. as returned by ``consume_messages``.
            An empty list is a no-op.
    """
    if not messages:
        logger.info("No messages to write to Iceberg.")
        return

    def _as_str(value):
        # String-typed columns: serialize non-string scalars (e.g. a numeric asDouble).
        return value if value is None or isinstance(value, str) else str(value)

    def _as_long(value):
        # timeUnixNano is a long column. If the messages use protobuf/OTLP JSON
        # encoding, 64-bit ints arrive as decimal strings -- int() handles both.
        return None if value is None else int(value)

    columns = {
        "resourceMetrics": [json.dumps(msg) for msg in messages],  # Store the entire message as a string
        "container_id": [_as_str(msg.get("container_id")) for msg in messages],
        "container_name": [_as_str(msg.get("container_name")) for msg in messages],
        "container_namespace": [_as_str(msg.get("container_namespace")) for msg in messages],
        "pod_name": [_as_str(msg.get("pod_name")) for msg in messages],
        "timeUnixNano": [_as_long(msg.get("timeUnixNano")) for msg in messages],
        "asDouble": [_as_str(msg.get("asDouble")) for msg in messages],
    }

    # Build the Arrow table against the Iceberg table's own schema so that column
    # types and required/optional flags match what `append` checks.
    # NOTE(review): messages missing a required column are expected to be
    # rejected by the write.
    arrow_table = pa.Table.from_pydict(columns, schema=table.schema().as_arrow())

    # pyiceberg writes the data files and commits a new snapshot in one call.
    table.append(arrow_table)
    logger.info(f"Appended {arrow_table.num_rows} rows to Iceberg table.")

# json_responses = consume_messages(consumer)
# table = setup_iceberg_table()        
# write_to_iceberg(table, json_responses)

In [None]:
if __name__ == "__main__":
    # The consumer cell leaves `consumer` as None when it could not connect.
    if consumer is None:
        logger.error("Consumer initialization failed. Exiting...")
    else:
        json_responses = consume_messages(consumer)
        if not json_responses:
            logger.info("No messages consumed.")
        else:
            for json_response in json_responses:
                print(json.dumps(json_response, indent=2))  # Pretty-print JSON response
            table = setup_iceberg_table()
            if table is None:
                # setup_iceberg_table() may return None (e.g. when catalog setup fails).
                logger.error("Iceberg table is not available; skipping write.")
            else:
                write_to_iceberg(table, json_responses)