# Create the Kafka topics using python-kafka

In the following code blocks, I create **Kafka Topics** using the library ```kafka-python```

In [32]:
from kafka.admin import KafkaAdminClient, NewTopic
import uuid

admin_client = KafkaAdminClient(
    bootstrap_servers = 'localhost:9092',
    client_id = 'climatechange'
)

In [33]:
topic_list = [
    NewTopic(name='ghg_data', num_partitions=1, replication_factor=1),
    NewTopic(name='temperature', num_partitions=1, replication_factor=1)
]

admin_client.create_topics(new_topics=topic_list, validate_only=False)

CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='ghg_data', error_code=0, error_message=None), (topic='temperature', error_code=0, error_message=None)])

In [34]:
from kafka import KafkaProducer
from csv import DictReader
import json

# Producers

In [35]:
uuid.uuid4().bytes

b'w\r\x8cJ_\xc9L#\x80t^\xcb\x96+k7'

In [36]:
with open('../Data/historical_emissions_cleaned.csv') as read_csv:
    csv_obj = DictReader(read_csv)
    for row in csv_obj:
        print(row)

{'Country': 'Afghanistan', 'Year': '1990', 'Value': '9.58', 'Pollutant': 'GHG', 'Sector': 'Total including LUCF'}
{'Country': 'Afghanistan', 'Year': '1991', 'Value': '9.81', 'Pollutant': 'GHG', 'Sector': 'Total including LUCF'}
{'Country': 'Afghanistan', 'Year': '1992', 'Value': '9.03', 'Pollutant': 'GHG', 'Sector': 'Total including LUCF'}
{'Country': 'Afghanistan', 'Year': '1993', 'Value': '9.11', 'Pollutant': 'GHG', 'Sector': 'Total including LUCF'}
{'Country': 'Afghanistan', 'Year': '1994', 'Value': '9.15', 'Pollutant': 'GHG', 'Sector': 'Total including LUCF'}
{'Country': 'Afghanistan', 'Year': '1995', 'Value': '9.58', 'Pollutant': 'GHG', 'Sector': 'Total including LUCF'}
{'Country': 'Afghanistan', 'Year': '1996', 'Value': '10.61', 'Pollutant': 'GHG', 'Sector': 'Total including LUCF'}
{'Country': 'Afghanistan', 'Year': '1997', 'Value': '11.58', 'Pollutant': 'GHG', 'Sector': 'Total including LUCF'}
{'Country': 'Afghanistan', 'Year': '1998', 'Value': '12.4', 'Pollutant': 'GHG', 'Secto

In [37]:
# We create the producer here
producer = KafkaProducer(bootstrap_servers = ['localhost:9092'])

with open('../Data/historical_emissions_cleaned.csv') as read_csv:
    csv_obj = DictReader(read_csv)
    for row in csv_obj:
        ack = producer.send(topic='ghg_data',value=json.dumps(row).encode('utf-8'), key=uuid.uuid4().bytes)
        metadata = ack.get()
        print(metadata.topic, metadata.partition)

ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0
ghg_data 0

In [38]:
with open('../Data/temperature.csv') as read_csv:
    csv_obj = DictReader(read_csv)
    for row in csv_obj:
        print(row)

{'REF_AREA': 'Yemen', 'Measure': 'TEMP_C: Annual temperature change', 'UNIT_MEASURE': 'C: Degrees celsius', 'TIME_PERIOD': '1979', 'OBS_VALUE': '-0.55', 'REF_CODE': 'YEM'}
{'REF_AREA': 'Yemen', 'Measure': 'TEMP_C: Annual temperature change', 'UNIT_MEASURE': 'C: Degrees celsius', 'TIME_PERIOD': '1980', 'OBS_VALUE': '-0.17', 'REF_CODE': 'YEM'}
{'REF_AREA': 'Yemen', 'Measure': 'TEMP_C: Annual temperature change', 'UNIT_MEASURE': 'C: Degrees celsius', 'TIME_PERIOD': '1981', 'OBS_VALUE': '-0.48', 'REF_CODE': 'YEM'}
{'REF_AREA': 'Yemen', 'Measure': 'TEMP_C: Annual temperature change', 'UNIT_MEASURE': 'C: Degrees celsius', 'TIME_PERIOD': '1982', 'OBS_VALUE': '-0.53', 'REF_CODE': 'YEM'}
{'REF_AREA': 'Yemen', 'Measure': 'TEMP_C: Annual temperature change', 'UNIT_MEASURE': 'C: Degrees celsius', 'TIME_PERIOD': '1983', 'OBS_VALUE': '-0.89', 'REF_CODE': 'YEM'}
{'REF_AREA': 'Yemen', 'Measure': 'TEMP_C: Annual temperature change', 'UNIT_MEASURE': 'C: Degrees celsius', 'TIME_PERIOD': '1984', 'OBS_VALU

In [39]:
# We create the producer here
producer = KafkaProducer(bootstrap_servers = ['localhost:9092'])

with open('../Data/temperature.csv') as read_csv:
    csv_obj = DictReader(read_csv)
    for row in csv_obj:
        ack = producer.send(topic='temperature',value=json.dumps(row).encode('utf-8'), key=uuid.uuid4().bytes)
        metadata = ack.get()
        print(metadata.topic, metadata.partition)

temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temperature 0
temper

# Consumers

In [40]:
# from kafka.admin import KafkaAdminClient, NewTopic
# import uuid

# admin_client = KafkaAdminClient(
#     bootstrap_servers = 'localhost:9092',
#     client_id = 'climatechange'
# )
# admin_client.delete_topics(topics=['temperature', 'ghg_data'])