In [None]:
# ! pip install pyiceberg pyarrow

In [1]:
# from pyiceberg.catalog import load_catalog
# from pyiceberg.schema import Schema
# from pyiceberg.types import StringType, IntegerType, NestedField
# from pyiceberg.partitioning import PartitionSpec
from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable
import json
import logging
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (
    TimestampType,
    LongType,
    IntegerType,
    StringType,
    NestedField,
    StructType,
)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform
from pyiceberg.table.sorting import SortOrder, SortField
from pyiceberg.transforms import IdentityTransform
# Configure logging
# logging.basicConfig(level=logging.DEBUG)
# logger = logging.getLogger(__name__)  # Corrected __name__
logger = logging.getLogger(__name__)
# Set the logging level to INFO to suppress DEBUG messages
logging.basicConfig(level=logging.DEBUG)

# # Explicitly set the logging level for Kafka loggers
# logging.getLogger('kafka').setLevel(logging.WARNING)
# logging.getLogger('kafka.coordinator').setLevel(logging.WARNING)
# logging.getLogger('kafka.protocol.parser').setLevel(logging.WARNING)
# logging.getLogger('kafka.conn').setLevel(logging.WARNING)
# logging.getLogger('kafka.client').setLevel(logging.WARNING)
# logging.getLogger('kafka.metrics.metrics').setLevel(logging.WARNING)

In [2]:
topic = 'powerlevel'
bootstrap_servers = ['telemetry-kafka-external-bootstrap-observability-kafka.apps.zagaopenshift.zagaopensource.com:443']
group_id = 'sustaingroup01'
# SSL Configuration
ssl_certfile = './telemetry_zagaopenshift.crt'
ssl_cafile = './telemetry_zagaopenshift.pem'
# security_protocol = 'SSL'
  # Replace with your consumer group ID

In [3]:
consumer = None
try:
        # Create KafkaConsumer
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            value_deserializer=lambda v: v,  # Keep value as bytes for JSON decoding
            group_id=group_id,
            auto_offset_reset='earliest',
            consumer_timeout_ms=20000,
            security_protocol='SSL',  # Add SSL configuration if needed
            ssl_cafile=ssl_cafile,
            ssl_certfile=ssl_certfile
        )
        logger.info("Successfully connected to Kafka broker")
except NoBrokersAvailable as e:
    logger.error(f"No brokers available. Ensure the Kafka broker is running and accessible. Error: {e}")

except Exception as e:
    logger.error(f"Failed to create Kafka consumer. Error: {e}")

DEBUG:kafka.metrics.metrics:Added sensor with name connections-closed
DEBUG:kafka.metrics.metrics:Added sensor with name connections-created
DEBUG:kafka.metrics.metrics:Added sensor with name select-time
DEBUG:kafka.metrics.metrics:Added sensor with name io-time
DEBUG:kafka.client:Initiating connection to node bootstrap-0 at telemetry-kafka-external-bootstrap-observability-kafka.apps.zagaopenshift.zagaopensource.com:443
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-sent-received
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-sent
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-received
DEBUG:kafka.metrics.metrics:Added sensor with name request-latency
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap-0.bytes-sent
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap-0.bytes-received
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap-0.latency
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=telemetry-ka

In [None]:
def consume_messages(consumer):
    messages = []
    try:
        logger.info("Starting message consumption")
        for message in consumer:
            logger.debug(f"Consumed message: {message.value}")
            json_message = json.loads(message.value.decode('utf-8'))  # Decode and parse JSON
            messages.append(json_message)
    except KeyboardInterrupt:
        logger.info("Stopping consumer...")
    except Exception as e:
        logger.error(f"An error occurred: {e}")
    finally:
        # consumer.close()
        logger.info("Consumer closed")
    return messages
consume_messages(consumer)

In [None]:
def setup_iceberg_table():
    # S3 bucket name
    s3_bucket_name = "powerlevel-pod"
    catalog = load_catalog(
        "hive",
    **{
        "uri": "thrift://192.168.31.102:9083", # For other networks use this URL: "hive-ms-trino.apps.zagaopenshift.zagaopensource.com 9888"
        "s3.endpoint": "http://minio-api-dev.apps.zagaopenshift.zagaopensource.com",
        "s3.access-key-id": "minio",
        "s3.secret-access-key": "minio123$",
    }
    )
    catalog.create_namespace("power_pod")
    
    schema = Schema(
        NestedField(field_id=1, name="resourceMetrics", field_type=StringType(), required=True),        
        NestedField(field_id=2, name="container_id", field_type=StringType(), required=True),
        NestedField(field_id=3, name="container_name", field_type=StringType(), required=True),
        NestedField(field_id=4, name="container_namespace", field_type=StringType(), required=True),
        NestedField(field_id=5, name="pod_name", field_type=StringType(), required=False),
        NestedField(field_id=6, name="timeUnixNano", field_type=LongType(), required=False),
        NestedField(field_id=7, name="asDouble", field_type=StringType(), required=False),        
    )

    partition_spec = PartitionSpec(
            PartitionField(
                source_id=2, field_id=1000, transform=DayTransform(), name="container_id"
            )
        )

    sort_order = SortOrder(SortField(source_id=1, transform=IdentityTransform()))

    table = catalog.create_table(
            identifier="power_pod",
            schema=schema,
            location=f"s3a://{s3_bucket_name}/sample",  # Specify the S3 bucket name here
            partition_spec=partition_spec,
            sort_order=sort_order,
        )
        
    return table

In [None]:
def write_to_iceberg(table, messages):
    import pyarrow as pa
    from pyiceberg.io.pyarrow import PyArrowFileIO

    # Convert messages to PyArrow Table
    data = {
        "resourceMetrics": [json.dumps(msg) for msg in messages],  # Store the entire message as a string
        "container_id": [msg.get("container_id") for msg in messages],
        "container_name": [msg.get("container_name") for msg in messages],
        "container_namespace": [msg.get("container_namespace") for msg in messages],
        "pod_name": [msg.get("pod_name") for msg in messages],
        "timeUnixNano": [msg.get("timeUnixNano") for msg in messages],
        "asDouble": [msg.get("asDouble") for msg in messages],
    }

    arrow_table = pa.Table.from_pydict(data)
    
    # Write the data to the Iceberg table
    file_io = PyArrowFileIO()
    with table.new_append(file_io) as writer:
        writer.add(arrow_table)
        writer.commit()

In [None]:
if __name__ == "__main__":
    # Check if the consumer is correctly created and not None
    if consumer is not None:
        try:
            # Consume messages
            for json_response in consume_messages(consumer):
                print("success")
                print(json.dumps(json_response, indent=2))  # Pretty-print JSON response
        except Exception as e:
            logger.error(f"Error consuming messages: {e}")
        finally:
            consumer.close()
            logger.info("Consumer closed")
    else:
        logger.error("Consumer initialization failed. Exiting...")