References:
https://www.w3schools.com/python/pandas/ref_df_iterrows.asp

In [2]:
# import statements
from time import sleep
from json import dumps
from kafka import KafkaProducer
import random
import pandas as pd
import datetime as dt
import json

def read_file(path="climate_streaming.csv"):
    """Load the streaming climate CSV and turn every row into a message dict.

    Parameters
    ----------
    path : str, optional
        CSV file to read. Defaults to ``"climate_streaming.csv"`` (relative to
        the current working directory), which is what the script uses.

    Returns
    -------
    list of dict
        One dict per CSV row, in file order, with keys:
        ``latitude`` / ``longitude`` (floats rounded to the nearest 4 decimal
        places), ``air_temperature_celcius``, ``relative_humidity``,
        ``windspeed_knots``, ``max_wind_speed`` (floats), ``precipitation``
        (float parsed from all but the last character of the precipitation
        field), ``precipitation_type`` (the last character of that field) and
        ``GHI_w`` (the ``GHI_w/m2`` value, passed through unchanged).
    """
    climate = pd.read_csv(path)
    # The source CSV's header contains "precipitation " with a trailing space;
    # strip every header name so lookups work whether or not it is present.
    climate.columns = climate.columns.str.strip()

    records = []
    # iterrows() yields (index, row-Series) pairs; only the row is needed.
    for _, each in climate.iterrows():
        # The precipitation field is expected to be a number followed by a
        # one-letter type code (e.g. "0.00I" -> 0.0 and "I").
        precip = str(each['precipitation']).strip()
        records.append({
            'latitude': round(float(each['latitude']), 4),    # nearest, 4 d.p.
            'longitude': round(float(each['longitude']), 4),  # nearest, 4 d.p.
            'air_temperature_celcius': float(each['air_temperature_celcius']),
            'relative_humidity': float(each['relative_humidity']),
            'windspeed_knots': float(each['windspeed_knots']),
            'max_wind_speed': float(each['max_wind_speed']),
            'precipitation': float(precip[:-1]),  # numeric part
            'precipitation_type': precip[-1],     # trailing type letter
            'GHI_w': each['GHI_w/m2'],            # renamed column, value as-is
        })
    return records

def publish_message(producer_instance, topic_name, key, value):
    """Serialise ``value`` as JSON and send it to a Kafka topic (best effort).

    Failures are printed rather than raised so that a caller's publishing
    loop keeps running.

    Parameters
    ----------
    producer_instance :
        Producer exposing ``send(topic, key=..., value=...)`` and ``flush()``
        (e.g. the result of ``connect_kafka_producer()``), or ``None`` when
        the connection attempt failed.
    topic_name : str
        Destination topic.
    key : str or None
        Message key, UTF-8 encoded before sending; ``None`` sends the message
        without a key.
    value :
        JSON-serialisable payload (a dict in this script).

    Returns
    -------
    bool
        True if ``send`` and ``flush`` completed without raising, else False.
    """
    if producer_instance is None:
        # connect_kafka_producer() returns None on failure; report it clearly
        # instead of surfacing an AttributeError on None.send.
        print('Exception in publishing message.')
        print('No Kafka producer available (connection failed?).')
        return False
    try:
        key_bytes = None if key is None else key.encode('utf-8')
        # json.dumps turns the dict into a key-sorted, indented JSON string.
        value_bytes = json.dumps(value, sort_keys=True, indent=4).encode('utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        # flush() waits for buffered records to be sent before reporting.
        # NOTE(review): delivery errors are reported on the future returned by
        # send(), not raised here — consider checking it if exactness matters.
        producer_instance.flush()
    except Exception as ex:
        print('Exception in publishing message.')
        print(str(ex))
        return False
    else:
        print('Message published successfully. Data: ' + str(value))
        return True

def connect_kafka_producer(bootstrap_servers=None, api_version=(0, 10)):
    """Create a KafkaProducer, returning None if construction fails.

    Parameters
    ----------
    bootstrap_servers : list of str, optional
        Broker addresses. ``None`` (the default) means ``['localhost:9092']``;
        a sentinel is used to avoid a shared mutable default list.
    api_version : tuple, optional
        Broker API version passed to ``KafkaProducer``; defaults to ``(0, 10)``.

    Returns
    -------
    KafkaProducer or None
        The producer, or None after printing the error if construction raised
        an ``Exception``.
    """
    if bootstrap_servers is None:
        bootstrap_servers = ['localhost:9092']
    try:
        return KafkaProducer(bootstrap_servers=bootstrap_servers,
                             api_version=api_version)
    except Exception as ex:
        # Best effort: report and let the caller decide what to do with None.
        # (No return-in-finally, so KeyboardInterrupt/SystemExit propagate.)
        print('Exception while connecting Kafka.')
        print(str(ex))
        return None

if __name__ == '__main__':

    topic = 'Asgn_PartB'
    data = read_file()
    print('Publishing records..')
    producer = connect_kafka_producer()
    latest_date = dt.datetime(2021, 12, 31)  # Last date from historic CSV

    if producer is None:
        # connect_kafka_producer() already printed the reason; every publish
        # would fail anyway (with a 10 s sleep each), so stop here.
        print('No Kafka producer available; nothing was published.')
    else:
        try:
            # Publish len(data) records, each drawn uniformly at random from
            # the CSV rows (with replacement).
            for _ in range(len(data)):
                # Copy the row so stamping created_date does not mutate the
                # shared source records held in `data`.
                random_data = dict(random.choice(data))
                latest_date += dt.timedelta(days=1)  # one simulated day per message
                random_data['created_date'] = latest_date.isoformat()  # date as string

                # key = 'producer1' lets consumers identify this producer
                publish_message(producer, topic, "producer1", random_data)
                sleep(10)
        finally:
            # Release the producer's resources even when interrupted (Ctrl+C).
            producer.close()

Publishing records..
Message published successfully. Data: {'latitude': -38.498, 'longitude': 146.95, 'air_temperature_celcius': 5.0, 'relative_humidity': 38.6, 'windspeed_knots': 1.8, 'max_wind_speed': 5.1, 'precipitation': 0.0, 'precipitation_type': 'I', 'GHI_w': 47, 'created_date': '2022-01-01T00:00:00'}
Message published successfully. Data: {'latitude': -37.614, 'longitude': 143.451, 'air_temperature_celcius': 19.0, 'relative_humidity': 60.2, 'windspeed_knots': 6.4, 'max_wind_speed': 12.0, 'precipitation': 0.0, 'precipitation_type': 'I', 'GHI_w': 149, 'created_date': '2022-01-02T00:00:00'}


KeyboardInterrupt: 