In [1]:
from confluent_kafka.admin import AdminClient, NewTopic
import json

### Delete topics

In [3]:
# Kafka configuration
kafka_config = {
    'bootstrap.servers': 'localhost:9092'
}

# Create AdminClient
admin_client = AdminClient(kafka_config)


In [14]:

# Fetch metadata
metadata = admin_client.list_topics(timeout=10)

# Get all topics
topics = metadata.topics
print("Topics to be deleted:")
for topic in topics:
    print(topic)

# Delete topics
fs = admin_client.delete_topics(list(topics.keys()), operation_timeout=30)

# Wait for each deletion to finish
for topic, f in fs.items():
    try:
        f.result()  # The result itself is None
        print(f"Topic {topic} deleted")
    except Exception as e:
        print(f"Failed to delete topic {topic}: {e}")


Topics to be deleted:
Electricity
Oil-GasProducers_EOD
Chemicals_EOD
Electricity_EOD
Travel-Leisure
FoodProducers
RealEstateInvestment-Services
LifeInsurance_EOD
FoodProducers_EOD
IndustrialMetals-Mining
GeneralRetailers_EOD
Oil-GasProducers
Banks_EOD
Banks
IndustrialMetals-Mining_EOD
Beverages_EOD
Travel-Leisure_EOD
FinancialServices_EOD
Software-ComputerServices
Software-ComputerServices_EOD
Chemicals
FinancialServices
Gas-Water-Multi-utilities
Gas-Water-Multi-utilities_EOD
RealEstateInvestment-Services_EOD
__consumer_offsets
GeneralRetailers
Beverages
LifeInsurance
Topic Electricity deleted
Topic Oil-GasProducers_EOD deleted
Topic Chemicals_EOD deleted
Topic Electricity_EOD deleted
Topic Travel-Leisure deleted
Topic FoodProducers deleted
Topic RealEstateInvestment-Services deleted
Topic LifeInsurance_EOD deleted
Topic FoodProducers_EOD deleted
Topic IndustrialMetals-Mining deleted
Topic GeneralRetailers_EOD deleted
Topic Oil-GasProducers deleted
Topic Banks_EOD deleted
Topic Banks d

### Print current topics

In [4]:
# Fetch metadata
metadata = admin_client.list_topics(timeout=10)

# Get and print all topics
topics = metadata.topics
print("Current topics in Kafka cluster:")
for topic in topics:
    print(topic)


Current topics in Kafka cluster:
Electricity
Software-ComputerServices
Gas-Water-Multi-utilities
FoodProducers
__consumer_offsets
RealEstateInvestment-Services
IndustrialMetals-Mining
Oil-GasProducers
Banks
Chemicals
FinancialServices
EOD
Travel-Leisure
GeneralRetailers
Beverages
LifeInsurance


### Create topics

In [10]:
def create_kafka_topic(admin_client, topic_name, num_partitions=1, replication_factor=1):
    topic = NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
    fs = admin_client.create_topics([topic])

    for topic, f in fs.items():
        try:
            f.result()  # The result itself is None
            print(f"Topic {topic} created")
        except Exception as e:
            print(f"Failed to create topic {topic}: {e}")

In [17]:
icb_symbol = json.loads(open('assets/icb_symbol.json', 'r').read())

In [18]:
topic_names = list(icb_symbol.keys())
topic_names.append('EOD')

In [19]:
for topic_name in topic_names:
    create_kafka_topic(admin_client, topic_name)

Topic Banks created
Topic Beverages created
Topic Chemicals created
Topic Electricity created
Topic FinancialServices created
Topic FoodProducers created
Topic Gas-Water-Multi-utilities created
Topic GeneralRetailers created
Topic IndustrialMetals-Mining created
Topic LifeInsurance created
Topic Oil-GasProducers created
Topic RealEstateInvestment-Services created
Topic Software-ComputerServices created
Topic Travel-Leisure created
Topic EOD created
