In [None]:
import http.client
import json
import logging
import os
from dotenv import load_dotenv

load_dotenv()

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

REPORT_NAME = "CA_DEV_REPORT"
HEADERS = { 'Authorization': f"Basic {os.getenv("HEADER")}" }
CLUSTER_ID = os.getenv("CLUSTER_ID")
ENDPOINT = os.getenv("ENDPOINT")


if not all([HEADERS['Authorization'], CLUSTER_ID, ENDPOINT]):
    logging.error("Missing required environment variables.")
    exit(1)

def make_request(method: str, endpoint: str) -> dict:
    try:
        conn = http.client.HTTPSConnection(ENDPOINT)
        conn.request(method, endpoint, headers=HEADERS)
        response = conn.getresponse()
        data_str = response.read().decode("utf-8")
        return json.loads(data_str)
    except json.JSONDecodeError as e:
        logging.error(f"Error decoding JSON from {endpoint}: {e}")
    except Exception as e:
        logging.error(f"Error making request to {endpoint}: {e}")
    finally:
        conn.close()
    return {}


In [None]:
import pandas as pd

df =pd.read_csv("consumer_group_states.csv")

pivot_df = df.groupby(['consumer_group_id', 'state']).size().unstack(fill_value=0).reset_index()

# Step 2: Rename columns to match desired format
pivot_df.columns = ['consumer_group_id', 'preparing_rebalance', 'stable']  # Ensure column order matches the final output

# Step 3: Reorder columns as necessary
pivot_df = pivot_df[['consumer_group_id', 'stable', 'preparing_rebalance']]

sorted = pivot_df.
sorted.head()

ValueError: Length mismatch: Expected axis has 4 elements, new values have 3 elements

In [15]:
import pandas as pd


df =pd.read_csv("consumer_group_states.csv")

# Step 1: Create dummy variables for each state
dummies = pd.get_dummies(df['state'], prefix='', prefix_sep='')

# Step 2: Concatenate the dummy variables with the original DataFrame
df_with_dummies = pd.concat([df[['consumer_group_id']], dummies], axis=1)

# Step 3: Group by consumer_group_id and sum the counts
result = df_with_dummies.groupby('consumer_group_id').sum().reset_index()

# Step 4: (Optional) Rename the columns to have a clearer structure
result.columns.name = None  # Remove the name of the index
result = result.rename(columns=lambda x: x if x == 'consumer_group_id' else f"{x.lower()}")

new = result.sort_values(by=['preparing_rebalance'], ascending=False)
new.head()


Unnamed: 0,consumer_group_id,preparing_rebalance,stable
132,vitality-manage-health-attributes-service-1.ca...,1,2
0,PIMS-va.pims.inboundendpoint.56.apple.appstore...,0,1
2,WSO2_ESB-va.ndc.inboundendpoint.accountstatusc...,0,1
3,WSO2_ESB-va.ndc.inboundendpoint.activerewardno...,0,1
4,WSO2_ESB-va.ndc.inboundendpoint.activityremind...,0,1


In [None]:
def get_consumer_group_ids() -> list[str]:
    logging.info(f"Fetching consumer group IDs for cluster {CLUSTER_ID}...")
    json_data = make_request("GET", f"/kafka/v3/clusters/{CLUSTER_ID}/consumer-groups")
    consumer_groups = [item["consumer_group_id"] for item in json_data.get("data", [])]
    logging.info(f"Found {len(consumer_groups)} consumer groups")
    return consumer_groups

In [None]:
def get_consumer_ids(consumer_group_id) -> list[str]:
    logging.info(f"Fetching consumers for group: {consumer_group_id}.")
    json_data = make_request("GET", f"/kafka/v3/clusters/{CLUSTER_ID}/consumer-groups/{consumer_group_id}/consumers")
    
    if 'data' in json_data and isinstance(json_data['data'], list):
        return [consumer.get('consumer_id') for consumer in json_data['data'] if consumer.get('consumer_id')]
    
    logging.error("No consumer data found in the consumer group.")
    return []


In [None]:
def get_consumer_info(consumer_group_id, consumer_id):   
    json_data = make_request("GET", f"/kafka/v3/clusters/{CLUSTER_ID}/consumer-groups/{consumer_group_id}/consumers/{consumer_id}")

    if 'consumer_id' in json_data:
        return {
            "consumer_id": json_data.get('consumer_id'),
            "instance_id": json_data.get('instance_id'),
            "client_id": json_data.get('client_id'),
            "assignments_url": json_data.get('assignments', {}).get('related')
        }
    
    logging.error("No valid consumer data found in the response.")
    return None

In [None]:
consumer_group_ids = get_consumer_group_ids()
for consumer_group_id in consumer_group_ids:
    consumer_ids = get_consumer_ids(consumer_group_id)
    for i, consumer_id in enumerate(consumer_ids, start=1):
        consumer_info = get_consumer_info(consumer_group_id, consumer_id)
        if consumer_info:
            logging.info(f"Consumer {i}/{len(consumer_ids)} in group: {consumer_group_id}:")
            print(json.dumps(consumer_info, indent=2))
