In [1]:
# import statements
from time import sleep
from json import dumps
from kafka import KafkaProducer
import random
from pprint import pprint
from datetime import datetime, timedelta

In [2]:
def read_climate_streaming_data(path='climate_streaming.csv'):
    """Load the streaming climate CSV into a list of JSON-ready dicts.

    Each data row is split on commas and read positionally:
    latitude, longitude, air temperature (int), relative humidity,
    wind speed, max wind speed, precipitation, GHI. The precipitation
    column holds a number immediately followed by a one-character form
    code (e.g. ``0.02G``), which is split into two fields. The header row
    and blank lines are skipped.

    Args:
        path: CSV file to read; defaults to ``climate_streaming.csv`` in the
            current working directory.

    Returns:
        list[dict]: one record per data row, in file order. ``date`` holds a
        placeholder (``"27/12/2021"``) that the producer overwrites before
        publishing, ``GHI_w/m2`` is kept as a string, ``key`` is always
        ``"climate"`` and ``hotspot`` is a fresh empty list per record.
    """
    climate_streaming_data_lst = []
    with open(path) as read_file:
        read_file.readline()  # skip header
        for line in read_file:
            line = line.strip()
            if not line:
                # a blank (e.g. trailing) line would otherwise crash float()
                continue
            fields = line.split(',')
            # precipitation value and its form code share one column, e.g. "0.02G"
            precipitation = fields[6].strip()
            # key order is kept stable so the published JSON looks the same
            climate_streaming_data_lst.append({
                "latitude": float(fields[0]),
                "date": "27/12/2021",  # placeholder; replaced when published
                "longitude": float(fields[1]),
                "air_temperature_celcius": int(fields[2]),
                "relative_humidity": float(fields[3]),
                "windspeed_knots": float(fields[4]),
                "max_wind_speed": float(fields[5]),
                "precipitation": float(precipitation[:-1]),
                "preciptation_form": precipitation[-1],
                "GHI_w/m2": fields[7].strip(),
                "key": "climate",
                "hotspot": [],
            })
    return climate_streaming_data_lst

In [3]:
# sanity check: how many streaming records were loaded from the CSV
streaming_records = read_climate_streaming_data()
print(len(streaming_records))

366


In [4]:
def publish_message(producer_instance, topic_name, key, value, timeout=10):
    """Send one UTF-8 encoded key/value record to a Kafka topic and report it.

    Errors are printed rather than raised, so one failed record does not stop
    the streaming loop. This includes a ``None`` producer left behind by a
    failed connection.

    Args:
        producer_instance: a ``KafkaProducer`` (or ``None``).
        topic_name: destination topic name.
        key: record key as a ``str``.
        value: record payload as a ``str`` (e.g. a JSON document).
        timeout: seconds to wait for the broker to acknowledge the record.
    """
    try:
        key_bytes = key.encode('utf-8')
        value_bytes = value.encode('utf-8')
        future = producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        producer_instance.flush()
        # flush() only waits for the request to complete; a delivery failure
        # is stored on the future, so check it before claiming success.
        future.get(timeout=timeout)
        print('Message published successfully. Data: ' + str(value) + '\n')
    except Exception as ex:
        print('Exception in publishing message.')
        print(str(ex))

In [5]:
def connect_kafka_producer(bootstrap_servers=('localhost:9092',), api_version=(0, 10)):
    """Create a ``KafkaProducer``, or return ``None`` if that fails.

    Any ``Exception`` raised while constructing the producer is printed and
    turned into a ``None`` return. ``KeyboardInterrupt``/``SystemExit`` still
    propagate: the old ``return`` inside ``finally`` silently swallowed them.

    Args:
        bootstrap_servers: iterable of ``host:port`` broker addresses.
        api_version: Kafka API version tuple passed to the producer.

    Returns:
        The connected producer, or ``None`` on failure.
    """
    try:
        return KafkaProducer(bootstrap_servers=list(bootstrap_servers),
                             api_version=api_version)
    except Exception as ex:
        print('Exception while connecting Kafka.')
        print(str(ex))
        return None

In [6]:
def get_latest_date(path='climate_historic.csv'):
    """Return the most recent date in the historic climate CSV and its row count.

    Column 1 (0-based) of each data row is parsed as ``dd/mm/yy`` (two-digit
    year). The header row and blank lines are skipped.

    Args:
        path: CSV file to read; defaults to ``climate_historic.csv``.

    Returns:
        tuple[datetime.date, int]: the latest date found and the number of
        data rows parsed.

    Raises:
        ValueError: if the file has no data rows, or a date field does not
            match ``dd/mm/yy``.
    """
    dates = []
    with open(path) as read_file:
        read_file.readline()  # skip header
        for line in read_file:
            if not line.strip():
                # blank (e.g. trailing) lines have no date column
                continue
            fields = line.split(',')
            dates.append(datetime.strptime(fields[1].strip(), '%d/%m/%y').date())
    if not dates:
        raise ValueError('no data rows found in %r' % (path,))
    return max(dates), len(dates)

In [7]:
if __name__ == '__main__':

    topic = 'Assgn'

    # Wall-clock pause between messages. Each message also advances the
    # simulated date by one day, so SEND_INTERVAL_SECONDS of real time
    # stands for one day of climate data.
    SEND_INTERVAL_SECONDS = 10

    # read csv file from climate data
    climate_streaming_data_lst = read_climate_streaming_data()

    # streamed dates continue from the latest date in climate_historic.csv
    date_val, size = get_latest_date()

    print('Publishing records..')
    producer_climate = connect_kafka_producer()

    if producer_climate is None:
        # connect_kafka_producer already printed the reason; don't spend an
        # hour sleeping through sends that can only fail.
        print('No Kafka producer available; nothing was published.')
    else:
        # Visit every record exactly once, in random order. The permutation
        # is sized from the data itself (the old code sampled 365 indices but
        # looped 366 times), so no index runs past the end and no record is
        # skipped.
        n_records = len(climate_streaming_data_lst)
        random_lst = random.sample(range(n_records), n_records)
        try:
            for record_index in random_lst:
                # one message == one simulated day
                date_val = date_val + timedelta(days=1)

                # stamp the chosen record with its simulated date, then publish it
                data = climate_streaming_data_lst[record_index]
                data['date'] = str(date_val)
                publish_message(producer_climate, topic, 'climate', dumps(data))

                # pause to simulate real-time streaming data
                sleep(SEND_INTERVAL_SECONDS)
        finally:
            # flush pending records and release the connection, even when
            # the stream is interrupted (e.g. KeyboardInterrupt)
            producer_climate.close()

Publishing records..
Message published successfully. Data: {"latitude": -36.6859, "date": "2022-01-01", "longitude": 146.8907, "air_temperature_celcius": 16, "relative_humidity": 51.5, "windspeed_knots": 8.7, "max_wind_speed": 15.0, "precipitation": 0.02, "preciptation_form": "G", "GHI_w/m2": "135", "key": "climate", "hotspot": []}

Message published successfully. Data: {"latitude": -37.376, "date": "2022-01-02", "longitude": 148.042, "air_temperature_celcius": 11, "relative_humidity": 43.2, "windspeed_knots": 5.5, "max_wind_speed": 8.0, "precipitation": 0.0, "preciptation_form": "G", "GHI_w/m2": "100", "key": "climate", "hotspot": []}

Message published successfully. Data: {"latitude": -35.963, "date": "2022-01-03", "longitude": 141.078, "air_temperature_celcius": 5, "relative_humidity": 33.1, "windspeed_knots": 5.8, "max_wind_speed": 14.0, "precipitation": 0.0, "preciptation_form": "I", "GHI_w/m2": "49", "key": "climate", "hotspot": []}

Message published successfully. Data: {"latitu

Message published successfully. Data: {"latitude": -36.4553, "date": "2022-01-28", "longitude": 142.8786, "air_temperature_celcius": 19, "relative_humidity": 55.3, "windspeed_knots": 6.2, "max_wind_speed": 12.0, "precipitation": 0.0, "preciptation_form": "I", "GHI_w/m2": "156", "key": "climate", "hotspot": []}

Message published successfully. Data: {"latitude": -37.0899, "date": "2022-01-29", "longitude": 141.0238, "air_temperature_celcius": 9, "relative_humidity": 42.2, "windspeed_knots": 6.4, "max_wind_speed": 9.9, "precipitation": 0.01, "preciptation_form": "G", "GHI_w/m2": "82", "key": "climate", "hotspot": []}

Message published successfully. Data: {"latitude": -37.456, "date": "2022-01-30", "longitude": 148.11, "air_temperature_celcius": 10, "relative_humidity": 47.9, "windspeed_knots": 4.1, "max_wind_speed": 7.0, "precipitation": 0.0, "preciptation_form": "I", "GHI_w/m2": "87", "key": "climate", "hotspot": []}

Message published successfully. Data: {"latitude": -37.478, "date": 

Message published successfully. Data: {"latitude": -37.7052, "date": "2022-02-24", "longitude": 144.6926, "air_temperature_celcius": 17, "relative_humidity": 52.7, "windspeed_knots": 7.7, "max_wind_speed": 14.0, "precipitation": 0.0, "preciptation_form": "G", "GHI_w/m2": "142", "key": "climate", "hotspot": []}

Message published successfully. Data: {"latitude": -38.226, "date": "2022-02-25", "longitude": 147.167, "air_temperature_celcius": 10, "relative_humidity": 43.7, "windspeed_knots": 7.2, "max_wind_speed": 11.1, "precipitation": 0.0, "preciptation_form": "I", "GHI_w/m2": "90", "key": "climate", "hotspot": []}

Message published successfully. Data: {"latitude": -36.7685, "date": "2022-02-26", "longitude": 142.7134, "air_temperature_celcius": 14, "relative_humidity": 48.2, "windspeed_knots": 12.5, "max_wind_speed": 19.0, "precipitation": 0.03, "preciptation_form": "G", "GHI_w/m2": "122", "key": "climate", "hotspot": []}

Message published successfully. Data: {"latitude": -37.614, "d

KeyboardInterrupt: 