# FIT3182 Major Assignment Part B Task 1a (Event Producer 1)
## George Tan Juan Sheng (30884128)
### Part B Task 1a
#### Write a Python program that loads all the data from climate_streaming.csv and randomly (with replacement) feeds it to the stream every 10 seconds. You will need to append additional information, such as producer information (to identify the producer) and the created date.

First, we connect to MongoDB with a MongoClient and access the collection created in Part A, so that we can retrieve the latest date.

In [3]:
import pymongo
from pymongo import MongoClient
from pprint import pprint
import pandas as pd
from datetime import datetime,timedelta

# Connect to MongoDB with the client's default connection settings and open
# the Part A collection, whose newest date the stream continues from.
client = MongoClient()
db = client['fit3182_assignment_db']
collection = db['partA']

Next, we write the program that feeds our data to Kafka.

In [None]:
from time import sleep
from json import dumps
from kafka import KafkaProducer
import random

# Load climate_streaming.csv into a list of plain-Python documents (one per row).
def read_climate_streaming(path='climate_streaming.csv'):
    """Read the climate streaming CSV and return one document per row.

    The raw ``precipitation`` cell holds a number immediately followed by a
    one-character flag (e.g. ``"0.01G"``). It is split into a float
    ``precipitation`` and a string ``precipitation_flag``. Numeric values are
    converted to built-in ``int``/``float`` so the documents are
    JSON-serializable.

    Args:
        path: CSV file to read. Defaults to ``climate_streaming.csv`` in the
            current working directory.

    Returns:
        A list of dicts, one per CSV row, in file order.
    """
    climate_streaming = pd.read_csv(path)
    # Header names in the file can carry stray whitespace (e.g. 'precipitation ').
    climate_streaming.columns = climate_streaming.columns.str.strip()

    # Strip the cell *before* splitting. Otherwise a trailing space would be
    # taken as the flag character, and the real flag would stay glued to the
    # number.
    precipitation = climate_streaming['precipitation'].str.strip()
    climate_streaming['precipitation_flag'] = precipitation.str[-1]
    climate_streaming['precipitation'] = precipitation.str[:-1]

    return [
        {
            'latitude': float(row['latitude']),
            'longitude': float(row['longitude']),
            'air_temperature_celcius': int(row['air_temperature_celcius']),
            'relative_humidity': float(row['relative_humidity']),
            'windspeed_knots': float(row['windspeed_knots']),
            'max_wind_speed': float(row['max_wind_speed']),
            'precipitation': float(row['precipitation']),
            'precipitation_flag': row['precipitation_flag'],
            'GHI_w/m2': int(row['GHI_w/m2']),
        }
        for row in climate_streaming.to_dict('records')
    ]

# Gets the latest date in our collection
def get_latest_date():
    """Return the newest ``date`` value stored in the Part A collection.

    Documents are sorted on ``date`` in descending order and only the first
    one is fetched.

    Returns:
        The ``date`` field of the newest document, or ``None`` if the
        collection has no documents. The caller adds a ``timedelta`` to the
        result, so it presumably holds datetimes (confirm against Part A).
    """
    cursor = collection.aggregate([
        {"$sort": {"date": -1}},
        {"$project": {"_id": 0, "date": 1}},
        {"$limit": 1},
    ])
    # The pipeline yields at most one document. Use a default instead of a
    # loop, so an empty collection gives None rather than the cursor object.
    newest = next(cursor, None)
    return None if newest is None else newest['date']
    

# Publishes message to Kafka
def publish_message(producer_instance, topic_name, data, timeout=10):
    """Send ``data`` to ``topic_name`` and wait for the broker to acknowledge it.

    ``KafkaProducer.send`` is asynchronous. Without waiting on the future it
    returns, the success line would be printed even when delivery later
    fails. Errors are printed rather than raised, so one failed send does not
    stop the caller's streaming loop.

    Args:
        producer_instance: the KafkaProducer used to send the message.
        topic_name: destination topic.
        data: message value, passed through the producer's value serializer.
        timeout: seconds to wait for the acknowledgement; ``None`` waits
            without a limit.
    """
    try:
        future = producer_instance.send(topic_name, value=data)
        # Block until the record is acknowledged (or raise on failure/timeout).
        future.get(timeout=timeout)
        print('Message published successfully. Data: ' + str(data))
    except Exception as ex:
        print('Exception in publishing message.')
        print(str(ex))
        
def connect_kafka_producer(bootstrap_servers=None):
    """Create a KafkaProducer whose message values are JSON-encoded.

    Args:
        bootstrap_servers: list of ``host:port`` broker addresses. Defaults to
            ``['localhost:9092']``.

    Returns:
        The producer, or ``None`` if constructing it raised an ``Exception``
        (the error is printed). ``BaseException``s such as KeyboardInterrupt
        are no longer swallowed by a ``return`` inside ``finally``.
    """
    if bootstrap_servers is None:
        bootstrap_servers = ['localhost:9092']
    try:
        return KafkaProducer(bootstrap_servers=bootstrap_servers,
                             # json.dumps escapes non-ASCII characters by
                             # default, so ASCII encoding cannot fail here.
                             value_serializer=lambda x: dumps(x).encode('ascii'),
                             api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka.')
        print(str(ex))
        return None
    
if __name__ == '__main__':

    topic = 'PartB'
    print('Publishing records..')
    producer01 = connect_kafka_producer()
    # connect_kafka_producer returns None on failure. Without this check, every
    # publish would fail and print an exception every 10 seconds forever.
    if producer01 is None:
        raise SystemExit('Could not connect to Kafka; aborting.')
    data = read_climate_streaming()  # Gets all the documents produced from climate_streaming.csv
    # The stream starts the day after the newest date stored in Part A.
    latest_date = get_latest_date() + timedelta(days=1)
    daysPassed = 0  # Number of days to add to latest_date for the next message

    try:
        while True:
            # Sample with replacement. Copy the row so the metadata added
            # below never mutates the shared entry in `data`.
            chosenData = dict(random.choice(data))
            curr_date = latest_date + timedelta(days=daysPassed)
            chosenData['producer'] = "climate_streaming"
            chosenData["created_date"] = curr_date.strftime("%d/%m/%Y")
            publish_message(producer01, topic, chosenData)
            daysPassed += 1  # Each message is stamped one day after the previous one
            sleep(10)
    finally:
        # Deliver any buffered records and release the connection when the
        # loop is interrupted (e.g. Ctrl+C / kernel interrupt).
        producer01.close(timeout=10)



Publishing records..
Message published successfully. Data: {'latitude': -37.641999999999996, 'longitude': 149.263, 'air_temperature_celcius': 20, 'relative_humidity': 55.8, 'windspeed_knots': 10.5, 'max_wind_speed': 15.9, 'precipitation': 0.01, 'precipitation_flag': 'G', 'GHI_w/m2': 163, 'producer': 'climate_streaming', 'created_date': '01/01/2022'}
Message published successfully. Data: {'latitude': -36.779, 'longitude': 146.108, 'air_temperature_celcius': 13, 'relative_humidity': 42.0, 'windspeed_knots': 11.4, 'max_wind_speed': 16.9, 'precipitation': 0.0, 'precipitation_flag': 'G', 'GHI_w/m2': 119, 'producer': 'climate_streaming', 'created_date': '02/01/2022'}
Message published successfully. Data: {'latitude': -37.375, 'longitude': 148.063, 'air_temperature_celcius': 10, 'relative_humidity': 43.5, 'windspeed_knots': 12.0, 'max_wind_speed': 16.9, 'precipitation': 0.04, 'precipitation_flag': 'G', 'GHI_w/m2': 90, 'producer': 'climate_streaming', 'created_date': '03/01/2022'}
Message publ