In [9]:
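# Each producer publishes records whose value is a (producer_id, row_values) pair,
# e.g. ("producer1", row_values), where row_values is one row of the streaming file.
# All producers write to the same topic; in this notebook the Kafka message key is 'parsed',
# and the producer_id inside the value is what tells the producers apart.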

In [10]:
# import statements
from time import sleep
from json import dumps
from json import loads
from kafka import KafkaProducer
import random
import datetime as dt
import pandas as pd

# Producer 1
This program writes records to the Kafka topic `assignment`. Each record has `key = "parsed"` and a value equal to a (key, value) pair, where the key identifies the producer and the value is a dictionary holding the data of a random row. The row is chosen by drawing a random integer from the half-open interval $[0, N)$, where $N$ = `df.shape[0]` is the number of rows in the dataset (the right endpoint is excluded).

### Reading in climate streaming data and creating a function to extract row values

In [11]:
# Streaming climate observations; the producer below samples random rows from this frame.
climate_data = pd.read_csv(filepath_or_buffer="climate_streaming.csv")

In [12]:
# Preview the first five rows to check column names and value formats.
climate_data.head(n=5)

Unnamed: 0,latitude,longitude,air_temperature_celcius,relative_humidity,windspeed_knots,max_wind_speed,precipitation,GHI_w/m2
0,-37.623,149.323,19,56.8,7.9,11.1,0.00I,154
1,-38.038,142.986,15,50.7,9.2,13.0,0.02G,128
2,-37.95,142.366,16,53.6,8.1,15.0,0.00G,133
3,-38.231,147.172,24,61.6,7.7,14.0,0.00I,186
4,-37.903,145.25,24,62.3,7.0,13.0,0.00I,185


In [13]:
def extract_row_data(data, row_number, date, producer_id="producer1"):
    """Build one climate record from a row of ``data`` in the required data model format.

    Parameters
    ----------
    data : pandas.DataFrame
        Climate streaming data; must contain the columns read below.
    row_number : int
        Positional (``iloc``) index of the row to extract.
    date : datetime.date or datetime.datetime
        Date attached to the record. It is stored as a ``dd/mm/YYYY`` string
        because ``json.dumps`` cannot serialise date objects.
    producer_id : str, default "producer1"
        Identifier of the producer sending the record.

    Returns
    -------
    list
        ``[producer_id, row_data]`` where ``row_data`` is a dict of built-in
        Python values ready for ``json.dumps``.

    Notes
    -----
    ``station`` is fixed at 1 because the streaming dataset has no station column.
    """
    # Look the row up once instead of once per field.
    row = data.iloc[row_number]

    # The CSV header for this column carries a trailing space ("precipitation ");
    # accept the clean name as well so a tidied file does not raise KeyError.
    precipitation_col = "precipitation " if "precipitation " in row.index else "precipitation"

    # Cast to built-in int/float so every record is JSON-serialisable and typed the
    # same way regardless of how pandas parsed each column (json.dumps rejects numpy.int64).
    row_data = {
        "station": 1,
        "date": date.strftime("%d/%m/%Y"),
        "air_temperature_celcius": int(row["air_temperature_celcius"]),
        "relative_humidity": float(row["relative_humidity"]),
        "windspeed_knots": float(row["windspeed_knots"]),
        "max_wind_speed": float(row["max_wind_speed"]),
        "precipitation": row[precipitation_col],
        "GHI_w/m2": int(row["GHI_w/m2"]),
        "latitude": float(row["latitude"]),
        "longitude": float(row["longitude"]),
    }
    return [producer_id, row_data]

In [14]:
def publish_message(producer_instance, topic_name, key, value):
    """Send one UTF-8 encoded key/value message to a Kafka topic and flush it.

    Parameters
    ----------
    producer_instance : KafkaProducer
        Connected producer, e.g. from ``connect_kafka_producer``.
    topic_name : str
        Topic to publish to.
    key : str
        Message key; sent as UTF-8 bytes.
    value : str
        Message payload (here a JSON string); sent as UTF-8 bytes.

    Errors are printed rather than raised so that one failed send does not stop
    the caller's streaming loop.
    """
    try:
        key_bytes = key.encode('utf-8')
        value_bytes = value.encode('utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        # Block until the buffered record has been sent, so each message goes out immediately.
        producer_instance.flush()
        # Report the payload that was actually sent. The original printed a global
        # variable, which only happened to exist when called from the __main__ loop.
        print('Message published successfully. Data: ' + value)
    except Exception as ex:
        print('Exception in publishing message.')
        print(str(ex))
        
def connect_kafka_producer(bootstrap_servers=None, api_version=(0, 10)):
    """Create a KafkaProducer, returning None if construction fails.

    Parameters
    ----------
    bootstrap_servers : list of str, optional
        Broker addresses; defaults to ``['localhost:9092']``.
    api_version : tuple, default (0, 10)
        Passed straight through to ``KafkaProducer``.

    Returns
    -------
    KafkaProducer or None
        ``None`` when ``KafkaProducer`` raised an ``Exception``; the error is printed.
    """
    if bootstrap_servers is None:
        bootstrap_servers = ['localhost:9092']
    try:
        return KafkaProducer(bootstrap_servers=bootstrap_servers,
                             api_version=api_version)
    except Exception as ex:
        print('Exception while connecting Kafka.')
        print(str(ex))
        # Returning here rather than from a `finally` clause lets KeyboardInterrupt /
        # SystemExit propagate instead of being silently swallowed.
        return None
    
if __name__ == '__main__':

    topic = 'assignment'
    # Number of records to publish and the pause between them, in seconds.
    n_records = 100
    send_interval_seconds = 10

    # Latest date in climate_historic (extracted at the end of Part A);
    # each published record advances it by one day.
    date = dt.datetime(2021, 12, 31, 0, 0)

    print('Publishing records..')
    producer = connect_kafka_producer()
    if producer is None:
        # The connection error has already been printed; stop instead of attempting
        # every send against a missing producer.
        raise RuntimeError('Could not connect to Kafka; no records were published.')

    try:
        for _ in range(n_records):
            date += dt.timedelta(days=1)

            # randrange excludes its upper bound, so every row index in [0, n_rows) is possible.
            row_number = random.randrange(climate_data.shape[0])

            record = extract_row_data(climate_data, row_number, date, producer_id="producer1")

            # Serialise the record to a JSON string for publishing.
            data = dumps(record)

            publish_message(producer, topic, 'parsed', data)
            sleep(send_interval_seconds)
    finally:
        # Close the producer even if the loop is interrupted.
        producer.close()

Publishing records..
Message published successfully. Data: ["producer1", {"station": 1, "date": "01/01/2022", "air_temperature_celcius": 8, "relative_humidity": 42.6, "windspeed_knots": 2.0, "max_wind_speed": 6.0, "precipitation": " 0.00I", "GHI_w/m2": 73, "latitude": -37.477, "longitude": 148.097}]
Message published successfully. Data: ["producer1", {"station": 1, "date": "02/01/2022", "air_temperature_celcius": 8, "relative_humidity": 43.3, "windspeed_knots": 3.9, "max_wind_speed": 8.0, "precipitation": " 0.00I", "GHI_w/m2": 72, "latitude": -37.385999999999996, "longitude": 148.043}]
Message published successfully. Data: ["producer1", {"station": 1, "date": "03/01/2022", "air_temperature_celcius": 14, "relative_humidity": 41.7, "windspeed_knots": 12.3, "max_wind_speed": 18.1, "precipitation": " 0.00G", "GHI_w/m2": 128, "latitude": -36.9364, "longitude": 143.4996}]
Message published successfully. Data: ["producer1", {"station": 1, "date": "04/01/2022", "air_temperature_celcius": 16, "

Message published successfully. Data: ["producer1", {"station": 1, "date": "30/01/2022", "air_temperature_celcius": 10, "relative_humidity": 43.5, "windspeed_knots": 12.0, "max_wind_speed": 16.9, "precipitation": " 0.04G", "GHI_w/m2": 90, "latitude": -37.375, "longitude": 148.063}]
Message published successfully. Data: ["producer1", {"station": 1, "date": "31/01/2022", "air_temperature_celcius": 24, "relative_humidity": 51.8, "windspeed_knots": 7.9, "max_wind_speed": 15.0, "precipitation": " 0.00I", "GHI_w/m2": 203, "latitude": -36.5794, "longitude": 142.5959}]
Message published successfully. Data: ["producer1", {"station": 1, "date": "01/02/2022", "air_temperature_celcius": 13, "relative_humidity": 46.2, "windspeed_knots": 5.6, "max_wind_speed": 12.0, "precipitation": " 0.00I", "GHI_w/m2": 115, "latitude": -36.6511, "longitude": 143.915}]
Message published successfully. Data: ["producer1", {"station": 1, "date": "02/02/2022", "air_temperature_celcius": 8, "relative_humidity": 41.0, "w

Message published successfully. Data: ["producer1", {"station": 1, "date": "28/02/2022", "air_temperature_celcius": 17, "relative_humidity": 52.0, "windspeed_knots": 8.8, "max_wind_speed": 15.0, "precipitation": " 0.00I", "GHI_w/m2": 143, "latitude": -37.5915, "longitude": 143.0015}]
Message published successfully. Data: ["producer1", {"station": 1, "date": "01/03/2022", "air_temperature_celcius": 8, "relative_humidity": 38.6, "windspeed_knots": 12.8, "max_wind_speed": 18.1, "precipitation": " 0.31G", "GHI_w/m2": 75, "latitude": -36.5548, "longitude": 142.5237}]
Message published successfully. Data: ["producer1", {"station": 1, "date": "02/03/2022", "air_temperature_celcius": 18, "relative_humidity": 53.3, "windspeed_knots": 7.9, "max_wind_speed": 14.0, "precipitation": " 0.00I", "GHI_w/m2": 150, "latitude": -36.275, "longitude": 146.154}]
Message published successfully. Data: ["producer1", {"station": 1, "date": "03/03/2022", "air_temperature_celcius": 12, "relative_humidity": 47.2, "

Message published successfully. Data: ["producer1", {"station": 1, "date": "29/03/2022", "air_temperature_celcius": 13, "relative_humidity": 50.7, "windspeed_knots": 6.1, "max_wind_speed": 13.0, "precipitation": " 0.00I", "GHI_w/m2": 111, "latitude": -37.4437, "longitude": 143.4924}]
Message published successfully. Data: ["producer1", {"station": 1, "date": "30/03/2022", "air_temperature_celcius": 13, "relative_humidity": 50.1, "windspeed_knots": 8.5, "max_wind_speed": 12.0, "precipitation": " 0.08G", "GHI_w/m2": 111, "latitude": -36.369, "longitude": 143.7132}]
Message published successfully. Data: ["producer1", {"station": 1, "date": "31/03/2022", "air_temperature_celcius": 16, "relative_humidity": 53.6, "windspeed_knots": 8.1, "max_wind_speed": 15.0, "precipitation": " 0.00G", "GHI_w/m2": 133, "latitude": -37.95, "longitude": 142.366}]
Message published successfully. Data: ["producer1", {"station": 1, "date": "01/04/2022", "air_temperature_celcius": 21, "relative_humidity": 47.0, "w