### Exploring Data Generation from File

In [44]:
import pandas as pd
import datetime
import time
from confluent_kafka import Producer

class MaxIterationsException(Exception):
    '''Exception thrown when the max iterations limit is reached.

    The limit that was hit is kept on the instance as ``max_iterations`` and
    is included in the string form of the exception.
    '''

    def __init__(self, max_iterations) -> None:
        # Remember the limit so that __str__ can report it.
        self.max_iterations = max_iterations

    def __str__(self) -> str:
        limit = self.max_iterations
        return "Reached max iterations ({})".format(limit)

class KafkaProducerFileStreamer:
    '''Wrapper class to turn a file into a stream for Kafka Producer.

    A CSV file is loaded with load_file and then replayed by start_stream in
    consecutive time windows. Every value found in a window is passed to each
    of the process handlers.
    '''
    def __init__(self, process_handlers, step_frequency:int, accelerator:int = 1) -> None:
        '''Configure the streamer.
            params:
            - process_handlers - collection of callables; each one is called with
              the value of every streamed row.
            - step_frequency - number of seconds slept between two iterations.
            - accelerator - multiplier applied to step_frequency to obtain the
              width (in seconds of file time) of the window read per iteration.
        '''
        self._process_handlers = process_handlers
        self._step_frequency = step_frequency
        self._running = True
        self._accelerator = accelerator
        # Populated by load_file. They start as None so that start_stream can
        # detect a missing load instead of failing with an AttributeError.
        self._df = None
        self._time_col = None
        self._value_col = None

    def load_file ( self, file_path, time_col = "start_time", value_col = "value"):
        '''Load a file for stream production.
            params:
            - file_path - CSV source, passed as-is to pandas.read_csv.
            - time_col - name of the column holding the record timestamps; it is
              converted to datetimes with pandas.to_datetime.
            - value_col - name of the column whose values are handed to the
              process handlers.
        '''
        self._df = pd.read_csv(file_path)
        self._time_col = time_col
        self._df [ self._time_col ] = pd.to_datetime( self._df [ self._time_col] )
        self._value_col = value_col

    def stop_stream(self):
        '''Stop the stream.

        The flag is checked at the top of the streaming loop, so an iteration
        that is already running (including its sleep) completes first.
        '''
        self._running = False

    def start_stream(self, polling_start_dt:datetime.datetime, curr_iteration = 0, max_iterations = None):
        '''Starts Kafka Producer stream.
            params:
            - polling_start_dt - start for polling
            - curr_iteration - offset iteration to start from.
            - max_iterations - number of time iterations to execute; None (or 0)
              means no limit.
            raises:
            - RuntimeError - when load_file has not been called yet.
            - MaxIterationsException - when curr_iteration reaches max_iterations.
        '''
        if self._df is None or self._time_col is None:
            raise RuntimeError("load_file needs to be called before starting stream.")

        df = self._df
        # Width of one window in file time; it is the same for every iteration.
        window = datetime.timedelta(0, self._step_frequency * self._accelerator)

        while self._running:
            # Guard Statement:
            if max_iterations and curr_iteration >= max_iterations:
                raise MaxIterationsException(max_iterations)

            # Time bound filtering (half-open window: start inclusive, end exclusive):
            start_time = polling_start_dt + datetime.timedelta(0, curr_iteration * self._step_frequency * self._accelerator)
            end_time = start_time + window

            res_df = df [
                (df[self._time_col] >= start_time) &
                (df[self._time_col] < end_time)
            ]

            print( f"{curr_iteration} -- start: {start_time} to end: {end_time}")
            print (f"Found {len(res_df)} records.")

            # Process Values:
            for _, row in res_df.iterrows():
                for p in self._process_handlers:
                    p(row[self._value_col])

            curr_iteration = curr_iteration + 1
            # Wall-clock pacing between windows.
            time.sleep( self._step_frequency )
    

In [45]:
# Handlers called with every streamed value; this one only echoes it.
handlers = [lambda x: print(f"Processing value: {x}.")]

# step_frequency=5 sleeps 5 seconds between iterations; accelerator=60 makes
# each iteration cover 5 * 60 = 300 seconds of file time.
producer_stream = KafkaProducerFileStreamer(
    process_handlers=handlers,
    step_frequency=5,
    accelerator=60,
)
producer_stream.load_file(
    file_path="../data/data_stream.csv",
    time_col="usage_datetime_start_eastern_time",
    value_col="usage_kw",
)

In [47]:
# Replay ten windows starting at 2023-01-13 00:00; the streamer ends a bounded
# run by raising MaxIterationsException, which is the expected exit here.
try:
    producer_stream.start_stream(
        polling_start_dt=datetime.datetime(2023, 1, 13),
        max_iterations=10,
    )
except MaxIterationsException:
    print ("Completed max iterations")

0 -- start: 2023-01-13 00:00:00 to end: 2023-01-13 00:05:00
Found 1 records.
Processing value: 820.8.
1 -- start: 2023-01-13 00:05:00 to end: 2023-01-13 00:10:00
Found 1 records.
Processing value: 806.4.
2 -- start: 2023-01-13 00:10:00 to end: 2023-01-13 00:15:00
Found 1 records.
Processing value: 792.0.
3 -- start: 2023-01-13 00:15:00 to end: 2023-01-13 00:20:00
Found 1 records.
Processing value: 763.2.
4 -- start: 2023-01-13 00:20:00 to end: 2023-01-13 00:25:00
Found 1 records.
Processing value: 748.8.
5 -- start: 2023-01-13 00:25:00 to end: 2023-01-13 00:30:00
Found 1 records.
Processing value: 748.8.
6 -- start: 2023-01-13 00:30:00 to end: 2023-01-13 00:35:00
Found 1 records.
Processing value: 756.0.
7 -- start: 2023-01-13 00:35:00 to end: 2023-01-13 00:40:00
Found 1 records.
Processing value: 763.2.
8 -- start: 2023-01-13 00:40:00 to end: 2023-01-13 00:45:00
Found 1 records.
Processing value: 777.6.
9 -- start: 2023-01-13 00:45:00 to end: 2023-01-13 00:50:00
Found 1 records.
Proce