## FIT3182 - Assignment 2
---
### Part B - Producer 1

**Information:**
- Filename: Assignment_PartB_Producer1.ipynb
- Student Name: Nicholas Mandylas
- Student Number: 27840328
- Student Email: nman48@student.monash.edu

In [4]:
from time import sleep
from json import dumps
from kafka import KafkaProducer
import random
import datetime as dt
import pandas

# Reading data from CSV
def readCSV():
    climate_streaming_data = pandas.read_csv('climate_streaming.csv') # Get data from CSV
    streaming_data = []
    for _, row in climate_streaming_data.iterrows(): # Iterate through each row in the CSV
        data_point = {} # Create dictionary & create key for each item from CSV data.
        data_point['latitude'] = float(row['latitude'])
        data_point['longitude'] = float(row['longitude'])
        data_point['air_temperature_celcius'] = float(
            row['air_temperature_celcius'])
        data_point['relative_humidity'] = float(row['relative_humidity'])
        data_point['windspeed_knots'] = float(row['windspeed_knots'])
        data_point['max_wind_speed'] = float(row['max_wind_speed'])

        # Unncessary space at beginning of value is removed.
        # We also split precipation type and amount, to make it easier for sorting/searching later.
        precipitation = str(row['precipitation ']).replace(" ", "")
        data_point['precipitation_type'] = precipitation[-1]
        data_point['precipitation'] = float(precipitation[0:-1])

        data_point['ghi'] = float(row['GHI_w/m2'])

        streaming_data.append(data_point)

    return streaming_data


def publish_message(producer_instance, topic_name, data):
    try:
        producer_instance.send(topic_name, value=data)
        producer_instance.flush()
        print('Message published successfully. Data: ' + str(data))
    except Exception as ex:
        print('Exception in publishing message.')
        print(str(ex))


def connect_kafka_producer():
    _producer = None
    try:
        _producer = KafkaProducer( # Added serializer on the producer, which will automatically serialize to JSON string format.
            bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('ascii'), api_version=(0, 10)) 
    except Exception as ex:
        print('Exception while connecting Kafka.')
        print(str(ex))
    finally:
        return _producer


if __name__ == '__main__':

    data = readCSV()
    topic = 'Climate'
    producer = connect_kafka_producer()
    created_date = dt.datetime(2018, 12, 31) # Last date from historic CSV

    while True:
        random_number = random.randrange(0, len(data))
        selected_data = data[random_number] # Pick a random climate data point.
        created_date += dt.timedelta(days=1) # Increase date from previous date.
        selected_data['created_date'] = created_date.isoformat() # Set date to string format (to be stored in JSON)
        selected_data['producer_id'] = 'producer_climate'

        publish_message(producer, topic, selected_data) # Publish message

        sleep(10)


Message published successfully. Data: {'latitude': -37.415, 'longitude': 148.105, 'air_temperature_celcius': 12.0, 'relative_humidity': 47.0, 'windspeed_knots': 7.7, 'max_wind_speed': 15.0, 'precipitation_type': 'G', 'precipitation': 0.08, 'ghi': 105.0, 'created_date': '2019-01-01T00:00:00', 'producer_id': 'producer_climate'}
Message published successfully. Data: {'latitude': -37.438, 'longitude': 148.09, 'air_temperature_celcius': 10.0, 'relative_humidity': 44.7, 'windspeed_knots': 8.0, 'max_wind_speed': 15.9, 'precipitation_type': 'G', 'precipitation': 0.08, 'ghi': 90.0, 'created_date': '2019-01-02T00:00:00', 'producer_id': 'producer_climate'}
Message published successfully. Data: {'latitude': -37.605, 'longitude': 149.308, 'air_temperature_celcius': 13.0, 'relative_humidity': 48.5, 'windspeed_knots': 10.7, 'max_wind_speed': 14.0, 'precipitation_type': 'G', 'precipitation': 0.71, 'ghi': 113.0, 'created_date': '2019-01-03T00:00:00', 'producer_id': 'producer_climate'}
Message published

KeyboardInterrupt: 