In [None]:
from time import sleep
from json import dumps
from kafka import KafkaProducer
import random
import datetime as dt
import pandas as pd
import numpy as np


 
def publish_message(producer_instance, topic_name, key, data):
    """Send one keyed record to a Kafka topic and report the outcome on stdout.

    Args:
        producer_instance: Producer object exposing ``send`` and ``flush``;
            ``send`` must return a future with a ``get`` method, as
            ``KafkaProducer.send`` does.
        topic_name: Name of the topic to publish to.
        key: Message key as ``str``; it is UTF-8 encoded before sending.
        data: Message value, handed to the producer unchanged.

    Returns:
        None. Any error is printed rather than raised, so a calling loop
        keeps running after a failed publish.
    """
    try:
        key_bytes = bytes(key, encoding='utf-8')
        future = producer_instance.send(topic_name, key=key_bytes, value=data)
        producer_instance.flush()
        # send() only queues the record; a delivery failure is stored on the
        # returned future. Resolving it here makes such a failure reach the
        # except branch instead of being reported as a success.
        future.get(timeout=10)
        print('Message published successfully. ' + str(data))
    except Exception as ex:
        print('Exception in publishing message.')
        print(str(ex))
        
def connect_kafka_producer(bootstrap_servers=None):
    """Create a ``KafkaProducer`` that JSON-encodes message values.

    Args:
        bootstrap_servers: Optional list of ``host:port`` broker addresses.
            Defaults to ``['127.0.0.1:9092']`` when omitted.

    Returns:
        The connected producer, or ``None`` if creating it raised an
        ``Exception`` (the error is printed to stdout).
    """
    if bootstrap_servers is None:
        bootstrap_servers = ['127.0.0.1:9092']
    try:
        # Values are serialized with json.dumps and encoded as ASCII bytes.
        return KafkaProducer(bootstrap_servers=bootstrap_servers,
                             value_serializer=lambda x: dumps(x).encode('ascii'),
                             api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka.')
        print(str(ex))
        # Returning here rather than from a ``finally`` clause keeps
        # KeyboardInterrupt / SystemExit from being silently discarded.
        return None
    
if __name__ == '__main__':
    # Script entry point: publish one randomly chosen row of
    # climate_streaming.csv to the 'fire' topic every 5 seconds, up to
    # 10000 times.

    topic = 'fire'
    print('Publishing records..')
    producer1 = connect_kafka_producer()

    if producer1 is None:
        # connect_kafka_producer() has already printed the failure reason;
        # looping without a producer would only emit an error every 5 seconds.
        print('No Kafka producer available; nothing published.')
    else:
        # The CSV does not depend on the loop variable, so load it once
        # instead of re-reading it from disk on every iteration.
        df = pd.read_csv("climate_streaming.csv")
        try:
            for _ in range(10000):
                # Pick one row at random and turn it into a column -> value dict.
                record = dict(df.loc[np.random.choice(df.index)])
                record['sender_id'] = 'producer1'
                record['created_time'] = dt.datetime.now().strftime("%X")
                # NOTE(review): presumably this column is parsed as a numpy
                # integer, which json.dumps cannot serialize -- confirm
                # against the CSV before removing the cast.
                record['air_temperature_celcius'] = float(record['air_temperature_celcius'])
                publish_message(producer1, topic, 'jsondata', record)
                sleep(5)
        finally:
            # Release the broker connection even if the run is interrupted.
            producer1.close()

Publishing records..
Message published successfully. {'latitude': -36.942, 'longitude': 143.292, 'air_temperature_celcius': 18.0, 'relative_humidity': 51.1, 'windspeed_knots': 8.1, 'max_wind_speed': 15.0, 'precipitation ': ' 0.00I', 'sender_id': 'producer1', 'created_time': '21:12:56'}
Message published successfully. {'latitude': -36.575, 'longitude': 146.6668, 'air_temperature_celcius': 18.0, 'relative_humidity': 53.6, 'windspeed_knots': 7.9, 'max_wind_speed': 15.9, 'precipitation ': ' 0.00G', 'sender_id': 'producer1', 'created_time': '21:13:01'}
Message published successfully. {'latitude': -37.466, 'longitude': 148.143, 'air_temperature_celcius': 17.0, 'relative_humidity': 48.7, 'windspeed_knots': 10.5, 'max_wind_speed': 15.0, 'precipitation ': ' 0.00I', 'sender_id': 'producer1', 'created_time': '21:13:06'}
Message published successfully. {'latitude': -37.457, 'longitude': 148.143, 'air_temperature_celcius': 9.0, 'relative_humidity': 39.0, 'windspeed_knots': 5.6, 'max_wind_speed': 8.

Message published successfully. {'latitude': -37.375, 'longitude': 148.054, 'air_temperature_celcius': 12.0, 'relative_humidity': 47.0, 'windspeed_knots': 4.8, 'max_wind_speed': 8.9, 'precipitation ': ' 0.00I', 'sender_id': 'producer1', 'created_time': '21:15:31'}
Message published successfully. {'latitude': -36.4025, 'longitude': 142.5598, 'air_temperature_celcius': 21.0, 'relative_humidity': 57.4, 'windspeed_knots': 10.7, 'max_wind_speed': 22.9, 'precipitation ': ' 0.67G', 'sender_id': 'producer1', 'created_time': '21:15:37'}
Message published successfully. {'latitude': -37.368, 'longitude': 148.05, 'air_temperature_celcius': 10.0, 'relative_humidity': 41.4, 'windspeed_knots': 9.4, 'max_wind_speed': 14.0, 'precipitation ': ' 0.00I', 'sender_id': 'producer1', 'created_time': '21:15:42'}
Message published successfully. {'latitude': -37.391, 'longitude': 148.066, 'air_temperature_celcius': 14.0, 'relative_humidity': 44.6, 'windspeed_knots': 7.7, 'max_wind_speed': 15.9, 'precipitation ':

Message published successfully. {'latitude': -36.5775, 'longitude': 142.6076, 'air_temperature_celcius': 19.0, 'relative_humidity': 58.7, 'windspeed_knots': 5.5, 'max_wind_speed': 12.0, 'precipitation ': ' 0.28G', 'sender_id': 'producer1', 'created_time': '21:18:07'}
Message published successfully. {'latitude': -36.942, 'longitude': 143.292, 'air_temperature_celcius': 18.0, 'relative_humidity': 51.1, 'windspeed_knots': 8.1, 'max_wind_speed': 15.0, 'precipitation ': ' 0.00I', 'sender_id': 'producer1', 'created_time': '21:18:12'}
Message published successfully. {'latitude': -36.851, 'longitude': 148.11700000000002, 'air_temperature_celcius': 9.0, 'relative_humidity': 45.3, 'windspeed_knots': 2.5, 'max_wind_speed': 6.0, 'precipitation ': ' 0.00G', 'sender_id': 'producer1', 'created_time': '21:18:17'}
Message published successfully. {'latitude': -37.946, 'longitude': 144.342, 'air_temperature_celcius': 21.0, 'relative_humidity': 54.9, 'windspeed_knots': 7.4, 'max_wind_speed': 16.9, 'precip

Message published successfully. {'latitude': -36.836999999999996, 'longitude': 142.0391, 'air_temperature_celcius': 19.0, 'relative_humidity': 55.8, 'windspeed_knots': 6.0, 'max_wind_speed': 11.1, 'precipitation ': ' 0.00I', 'sender_id': 'producer1', 'created_time': '21:20:42'}
Message published successfully. {'latitude': -37.608000000000004, 'longitude': 149.282, 'air_temperature_celcius': 22.0, 'relative_humidity': 62.7, 'windspeed_knots': 7.6, 'max_wind_speed': 18.1, 'precipitation ': ' 0.00I', 'sender_id': 'producer1', 'created_time': '21:20:47'}
Message published successfully. {'latitude': -36.6516, 'longitude': 141.4943, 'air_temperature_celcius': 11.0, 'relative_humidity': 45.3, 'windspeed_knots': 8.5, 'max_wind_speed': 15.0, 'precipitation ': ' 0.00G', 'sender_id': 'producer1', 'created_time': '21:20:52'}
Message published successfully. {'latitude': -37.946, 'longitude': 144.342, 'air_temperature_celcius': 21.0, 'relative_humidity': 54.9, 'windspeed_knots': 7.4, 'max_wind_speed

Message published successfully. {'latitude': -37.621, 'longitude': 143.447, 'air_temperature_celcius': 23.0, 'relative_humidity': 53.4, 'windspeed_knots': 6.1, 'max_wind_speed': 11.1, 'precipitation ': ' 0.00I', 'sender_id': 'producer1', 'created_time': '21:23:18'}
Message published successfully. {'latitude': -37.375, 'longitude': 148.063, 'air_temperature_celcius': 10.0, 'relative_humidity': 43.5, 'windspeed_knots': 12.0, 'max_wind_speed': 16.9, 'precipitation ': ' 0.04G', 'sender_id': 'producer1', 'created_time': '21:23:23'}
Message published successfully. {'latitude': -36.358000000000004, 'longitude': 143.113, 'air_temperature_celcius': 21.0, 'relative_humidity': 58.8, 'windspeed_knots': 8.8, 'max_wind_speed': 22.9, 'precipitation ': ' 0.08G', 'sender_id': 'producer1', 'created_time': '21:23:28'}
Message published successfully. {'latitude': -36.5871, 'longitude': 144.961, 'air_temperature_celcius': 13.0, 'relative_humidity': 43.6, 'windspeed_knots': 9.0, 'max_wind_speed': 15.0, 'pre

Message published successfully. {'latitude': -37.485, 'longitude': 148.095, 'air_temperature_celcius': 11.0, 'relative_humidity': 45.4, 'windspeed_knots': 5.2, 'max_wind_speed': 8.9, 'precipitation ': ' 0.00A', 'sender_id': 'producer1', 'created_time': '21:25:53'}
Message published successfully. {'latitude': -37.453, 'longitude': 148.099, 'air_temperature_celcius': 10.0, 'relative_humidity': 45.7, 'windspeed_knots': 3.6, 'max_wind_speed': 7.0, 'precipitation ': ' 0.01G', 'sender_id': 'producer1', 'created_time': '21:25:58'}
Message published successfully. {'latitude': -37.453, 'longitude': 148.111, 'air_temperature_celcius': 11.0, 'relative_humidity': 45.3, 'windspeed_knots': 10.6, 'max_wind_speed': 16.9, 'precipitation ': ' 0.08G', 'sender_id': 'producer1', 'created_time': '21:26:03'}
Message published successfully. {'latitude': -36.1462, 'longitude': 145.2096, 'air_temperature_celcius': 10.0, 'relative_humidity': 43.6, 'windspeed_knots': 9.7, 'max_wind_speed': 14.0, 'precipitation ':