In [2]:
pip install kafka-python

Note: you may need to restart the kernel to use updated packages.


## Import libraries

In [1]:
#producer
from time import sleep
from json import dumps, loads
from kafka import KafkaProducer, KafkaConsumer
from time import sleep
from json import dumps
import pandas as pd
import csv
import shutil
import time
from os import path, listdir, makedirs

## Function to handle Path creation

In [2]:
def path_creator(folder_path):
    """Create ``folder_path`` (and any missing parent folders) if it does not exist yet.

    ``makedirs(..., exist_ok=True)`` replaces a separate existence check, so a
    folder created by someone else between the check and the creation no
    longer makes this raise.

    Args:
        folder_path: directory to create.

    Raises:
        FileExistsError: if ``folder_path`` exists but is not a directory.
    """
    makedirs(folder_path, exist_ok=True)

## Kafka producer

In [3]:
# Producer for the local broker: each value is JSON-serialised, then encoded to UTF-8 bytes.
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda value: dumps(value).encode("utf-8"),
)

### Initialize paths for data processed by Kafka

In [4]:
# Folders used by the pipeline: files waiting to be sent, files already sent,
# and the consumer's partitioned output.
source_path, processed_data_path, consumer_destination_path = (
    "../Fake_Data/events/",
    "../Fake_Data/processed_data/",
    "../Fake_Data/consumer_sink_folder/",
)

### Some useful functions for the producer

In [5]:
def file_mover(source_path, processed_data_path, file_name):
    '''Move a file the producer has finished sending into the processed folder.

    Moving it out of ``source_path`` keeps the producer from sending the same
    file twice, and so from writing the same rows twice at the destination
    (local or HDFS).

    Args:
        source_path: folder the file currently lives in.
        processed_data_path: folder to move it to (created if missing).
        file_name: name of the file inside ``source_path``.

    Returns:
        None. Prints the file's new location, or "File does not exist." when
        the file is not found in ``source_path``.
    '''
    path_creator(processed_data_path)
    # path.join works whether or not the folder paths end with a separator.
    source_file = path.join(source_path, file_name)
    if path.exists(source_file):
        new_location = shutil.move(source_file, path.join(processed_data_path, file_name))
        # Report the moved file itself, not just the folder it came from.
        print(f"The {source_file} is moved to the location {new_location}")
    else:
        print("File does not exist.")
        
def send_data(source_path, processed_data_path, file_name):
    '''Send every row of a CSV file to the Kafka ``test`` topic, then archive the file.

    Each row, including the header as row 0, is re-joined with commas and sent
    as a one-entry dict ``{"<file_name>_row<n>": "<row>"}``. Once all rows are
    handed to the producer, it is flushed and the file is moved to
    ``processed_data_path`` by ``file_mover``.

    Args:
        source_path: folder containing the file.
        processed_data_path: folder the file is moved to once it has been sent.
        file_name: name of the CSV file inside ``source_path``.
    '''
    source_file = path.join(source_path, file_name)
    if not path.exists(source_file):
        print("File does not exist.")
        return
    # newline='' is the mode the csv module expects for files it reads.
    with open(source_file, 'r', newline='') as file:
        for row_number, row in enumerate(csv.reader(file)):
            data = {f"{file_name}_row{row_number}": ",".join(row)}
            producer.send('test', value=data)
            print(data)
    # send() only buffers records. Block until the pending requests complete
    # before archiving the file; otherwise records still buffered when the
    # kernel stops would be lost while the file is already marked processed.
    producer.flush()
    file_mover(source_path, processed_data_path, file_name)

def manage_data_sending(source_path, processed_data_path, max_files=None):
    '''Main function for the producer: send the files in ``source_path`` to Kafka.

    Regular files are processed in sorted name order; sub-directories are
    skipped. For dated names such as ``event_20170103.csv``, sorted order is
    also chronological. Each file is sent with ``send_data``, which then moves
    it to ``processed_data_path``.

    Args:
        source_path: folder holding the files to send.
        processed_data_path: folder the sent files are moved to.
        max_files: optional cap on how many files are sent in this call.
            ``None`` (default) sends them all; ``1`` sends only the first.
    '''
    file_names = sorted(
        name for name in listdir(source_path)
        if path.isfile(path.join(source_path, name))
    )
    if max_files is not None:
        file_names = file_names[:max_files]
    for file_name in file_names:
        send_data(source_path, processed_data_path, file_name)
              
            

## Apply producer

In [6]:
# Send every waiting file from source_path, archiving each into processed_data_path.
manage_data_sending(source_path=source_path, processed_data_path=processed_data_path)

{'event_20170103.csv_row0': 'id_evt,date_occur_evt,date_system_evt,type_evt,type_equipement,id_equipement,info_div'}
{'event_20170103.csv_row1': '20170103000000,2017-01-03 00:00:00,2017-01-03 12:00:00,ev_K,K,4,'}
{'event_20170103.csv_row2': '20170103000010,2017-01-03 00:00:10,2017-01-03 12:00:00,ev_C,C,7,'}
{'event_20170103.csv_row3': '20170103000020,2017-01-03 00:00:20,2017-01-03 12:00:00,ev_C,C,11,'}
{'event_20170103.csv_row4': '20170103000030,2017-01-03 00:00:30,2017-01-03 12:00:00,ev_C,C,3,'}
{'event_20170103.csv_row5': '20170103000040,2017-01-03 00:00:40,2017-01-03 12:00:00,ev_C,C,11,'}
{'event_20170103.csv_row6': '20170103000050,2017-01-03 00:00:50,2017-01-03 12:00:00,ev_N,K,4,'}
{'event_20170103.csv_row7': '20170103000100,2017-01-03 00:01:00,2017-01-03 12:00:00,ev_C,C,4,'}
{'event_20170103.csv_row8': '20170103000110,2017-01-03 00:01:10,2017-01-03 12:00:00,ev_C,C,19,'}
{'event_20170103.csv_row9': '20170103000120,2017-01-03 00:01:20,2017-01-03 12:00:00,ev_C,C,5,'}
{'event_20170103

{'event_20170103.csv_row1034': '20170103025210,2017-01-03 02:52:10,2017-01-03 12:00:00,ev_C,C,17,'}
{'event_20170103.csv_row1035': '20170103025220,2017-01-03 02:52:20,2017-01-03 12:00:00,ev_C,C,14,'}
{'event_20170103.csv_row1036': '20170103025230,2017-01-03 02:52:30,2017-01-03 12:00:00,ev_C,C,3,'}
{'event_20170103.csv_row1037': '20170103025240,2017-01-03 02:52:40,2017-01-03 12:00:00,ev_C,C,13,'}
{'event_20170103.csv_row1038': '20170103025250,2017-01-03 02:52:50,2017-01-03 12:00:00,ev_N,K,3,'}
{'event_20170103.csv_row1039': '20170103025300,2017-01-03 02:53:00,2017-01-03 12:00:00,ev_N,K,2,'}
{'event_20170103.csv_row1040': '20170103025310,2017-01-03 02:53:10,2017-01-03 12:00:00,ev_C,C,5,'}
{'event_20170103.csv_row1041': '20170103025320,2017-01-03 02:53:20,2017-01-03 12:00:00,ev_C,C,7,'}
{'event_20170103.csv_row1042': '20170103025330,2017-01-03 02:53:30,2017-01-03 12:00:00,ev_C,C,20,'}
{'event_20170103.csv_row1043': '20170103025340,2017-01-03 02:53:40,2017-01-03 12:00:00,ev_N,K,5,'}
{'even

{'event_20170103.csv_row2052': '20170103054150,2017-01-03 05:41:50,2017-01-03 12:00:00,ev_C,C,2,'}
{'event_20170103.csv_row2053': '20170103054200,2017-01-03 05:42:00,2017-01-03 12:00:00,ev_C,C,11,'}
{'event_20170103.csv_row2054': '20170103054210,2017-01-03 05:42:10,2017-01-03 12:00:00,ev_C,C,12,'}
{'event_20170103.csv_row2055': '20170103054220,2017-01-03 05:42:20,2017-01-03 12:00:00,ev_C,C,10,'}
{'event_20170103.csv_row2056': '20170103054230,2017-01-03 05:42:30,2017-01-03 12:00:00,ev_C,C,13,'}
{'event_20170103.csv_row2057': '20170103054240,2017-01-03 05:42:40,2017-01-03 12:00:00,ev_K,K,2,'}
{'event_20170103.csv_row2058': '20170103054250,2017-01-03 05:42:50,2017-01-03 12:00:00,ev_C,C,7,'}
{'event_20170103.csv_row2059': '20170103054300,2017-01-03 05:43:00,2017-01-03 12:00:00,ev_C,C,3,'}
{'event_20170103.csv_row2060': '20170103054310,2017-01-03 05:43:10,2017-01-03 12:00:00,ev_C,C,16,'}
{'event_20170103.csv_row2061': '20170103054320,2017-01-03 05:43:20,2017-01-03 12:00:00,ev_N,K,5,'}
{'eve

{'event_20170103.csv_row2859': '20170103075620,2017-01-03 07:56:20,2017-01-03 12:00:00,ev_C,C,15,'}
{'event_20170103.csv_row2860': '20170103075630,2017-01-03 07:56:30,2017-01-03 12:00:00,ev_C,C,1,'}
{'event_20170103.csv_row2861': '20170103075640,2017-01-03 07:56:40,2017-01-03 12:00:00,ev_C,C,2,'}
{'event_20170103.csv_row2862': '20170103075650,2017-01-03 07:56:50,2017-01-03 12:00:00,ev_C,C,19,'}
{'event_20170103.csv_row2863': '20170103075700,2017-01-03 07:57:00,2017-01-03 12:00:00,ev_K,K,1,'}
{'event_20170103.csv_row2864': '20170103075710,2017-01-03 07:57:10,2017-01-03 12:00:00,ev_C,C,12,'}
{'event_20170103.csv_row2865': '20170103075720,2017-01-03 07:57:20,2017-01-03 12:00:00,ev_C,C,12,'}
{'event_20170103.csv_row2866': '20170103075730,2017-01-03 07:57:30,2017-01-03 12:00:00,ev_C,C,9,'}
{'event_20170103.csv_row2867': '20170103075740,2017-01-03 07:57:40,2017-01-03 12:00:00,ev_K,K,3,'}
{'event_20170103.csv_row2868': '20170103075750,2017-01-03 07:57:50,2017-01-03 12:00:00,ev_C,C,11,'}
{'eve

{'event_20170103.csv_row3585': '20170103095720,2017-01-03 09:57:20,2017-01-03 12:00:00,ev_K,K,2,'}
{'event_20170103.csv_row3586': '20170103095730,2017-01-03 09:57:30,2017-01-03 12:00:00,ev_C,C,5,'}
{'event_20170103.csv_row3587': '20170103095740,2017-01-03 09:57:40,2017-01-03 12:00:00,ev_C,C,17,'}
{'event_20170103.csv_row3588': '20170103095750,2017-01-03 09:57:50,2017-01-03 12:00:00,ev_K,K,5,'}
{'event_20170103.csv_row3589': '20170103095800,2017-01-03 09:58:00,2017-01-03 12:00:00,ev_K,K,1,'}
{'event_20170103.csv_row3590': '20170103095810,2017-01-03 09:58:10,2017-01-03 12:00:00,ev_K,K,3,'}
{'event_20170103.csv_row3591': '20170103095820,2017-01-03 09:58:20,2017-01-03 12:00:00,ev_C,C,2,'}
{'event_20170103.csv_row3592': '20170103095830,2017-01-03 09:58:30,2017-01-03 12:00:00,ev_K,K,5,'}
{'event_20170103.csv_row3593': '20170103095840,2017-01-03 09:58:40,2017-01-03 12:00:00,ev_N,K,4,'}
{'event_20170103.csv_row3594': '20170103095850,2017-01-03 09:58:50,2017-01-03 12:00:00,ev_C,C,13,'}
{'event_

{'event_20170103.csv_row4654': '20170103125530,2017-01-03 12:55:30,2017-01-03 23:59:59,ev_C,C,8,'}
{'event_20170103.csv_row4655': '20170103125540,2017-01-03 12:55:40,2017-01-03 23:59:59,ev_C,C,3,'}
{'event_20170103.csv_row4656': '20170103125550,2017-01-03 12:55:50,2017-01-03 23:59:59,ev_C,C,20,'}
{'event_20170103.csv_row4657': '20170103125600,2017-01-03 12:56:00,2017-01-03 23:59:59,ev_C,C,20,'}
{'event_20170103.csv_row4658': '20170103125610,2017-01-03 12:56:10,2017-01-03 23:59:59,ev_C,C,4,'}
{'event_20170103.csv_row4659': '20170103125620,2017-01-03 12:56:20,2017-01-03 23:59:59,ev_C,C,15,'}
{'event_20170103.csv_row4660': '20170103125630,2017-01-03 12:56:30,2017-01-03 23:59:59,ev_C,C,2,'}
{'event_20170103.csv_row4661': '20170103125640,2017-01-03 12:56:40,2017-01-03 23:59:59,ev_K,K,4,'}
{'event_20170103.csv_row4662': '20170103125650,2017-01-03 12:56:50,2017-01-03 23:59:59,ev_C,C,10,'}
{'event_20170103.csv_row4663': '20170103125700,2017-01-03 12:57:00,2017-01-03 23:59:59,ev_C,C,1,'}
{'even

{'event_20170103.csv_row5561': '20170103152640,2017-01-03 15:26:40,2017-01-03 23:59:59,ev_C,C,9,'}
{'event_20170103.csv_row5562': '20170103152650,2017-01-03 15:26:50,2017-01-03 23:59:59,ev_C,C,16,'}
{'event_20170103.csv_row5563': '20170103152700,2017-01-03 15:27:00,2017-01-03 23:59:59,ev_C,C,18,'}
{'event_20170103.csv_row5564': '20170103152710,2017-01-03 15:27:10,2017-01-03 23:59:59,ev_K,K,1,'}
{'event_20170103.csv_row5565': '20170103152720,2017-01-03 15:27:20,2017-01-03 23:59:59,ev_C,C,4,'}
{'event_20170103.csv_row5566': '20170103152730,2017-01-03 15:27:30,2017-01-03 23:59:59,ev_C,C,1,'}
{'event_20170103.csv_row5567': '20170103152740,2017-01-03 15:27:40,2017-01-03 23:59:59,ev_C,C,4,'}
{'event_20170103.csv_row5568': '20170103152750,2017-01-03 15:27:50,2017-01-03 23:59:59,ev_C,C,15,'}
{'event_20170103.csv_row5569': '20170103152800,2017-01-03 15:28:00,2017-01-03 23:59:59,ev_N,K,4,'}
{'event_20170103.csv_row5570': '20170103152810,2017-01-03 15:28:10,2017-01-03 23:59:59,ev_C,C,14,'}
{'even

{'event_20170103.csv_row6387': '20170103174420,2017-01-03 17:44:20,2017-01-03 23:59:59,ev_C,C,7,'}
{'event_20170103.csv_row6388': '20170103174430,2017-01-03 17:44:30,2017-01-03 23:59:59,ev_C,C,17,'}
{'event_20170103.csv_row6389': '20170103174440,2017-01-03 17:44:40,2017-01-03 23:59:59,ev_C,C,14,'}
{'event_20170103.csv_row6390': '20170103174450,2017-01-03 17:44:50,2017-01-03 23:59:59,ev_K,K,5,'}
{'event_20170103.csv_row6391': '20170103174500,2017-01-03 17:45:00,2017-01-03 23:59:59,ev_N,K,1,'}
{'event_20170103.csv_row6392': '20170103174510,2017-01-03 17:45:10,2017-01-03 23:59:59,ev_N,K,5,'}
{'event_20170103.csv_row6393': '20170103174520,2017-01-03 17:45:20,2017-01-03 23:59:59,ev_C,C,7,'}
{'event_20170103.csv_row6394': '20170103174530,2017-01-03 17:45:30,2017-01-03 23:59:59,ev_K,K,2,'}
{'event_20170103.csv_row6395': '20170103174540,2017-01-03 17:45:40,2017-01-03 23:59:59,ev_C,C,2,'}
{'event_20170103.csv_row6396': '20170103174550,2017-01-03 17:45:50,2017-01-03 23:59:59,ev_C,C,11,'}
{'event

{'event_20170103.csv_row7402': '20170103203330,2017-01-03 20:33:30,2017-01-03 23:59:59,ev_C,C,10,'}
{'event_20170103.csv_row7403': '20170103203340,2017-01-03 20:33:40,2017-01-03 23:59:59,ev_C,C,12,'}
{'event_20170103.csv_row7404': '20170103203350,2017-01-03 20:33:50,2017-01-03 23:59:59,ev_K,K,2,'}
{'event_20170103.csv_row7405': '20170103203400,2017-01-03 20:34:00,2017-01-03 23:59:59,ev_C,C,13,'}
{'event_20170103.csv_row7406': '20170103203410,2017-01-03 20:34:10,2017-01-03 23:59:59,ev_C,C,19,'}
{'event_20170103.csv_row7407': '20170103203420,2017-01-03 20:34:20,2017-01-03 23:59:59,ev_C,C,3,'}
{'event_20170103.csv_row7408': '20170103203430,2017-01-03 20:34:30,2017-01-03 23:59:59,ev_C,C,5,'}
{'event_20170103.csv_row7409': '20170103203440,2017-01-03 20:34:40,2017-01-03 23:59:59,ev_C,C,19,'}
{'event_20170103.csv_row7410': '20170103203450,2017-01-03 20:34:50,2017-01-03 23:59:59,ev_K,K,4,'}
{'event_20170103.csv_row7411': '20170103203500,2017-01-03 20:35:00,2017-01-03 23:59:59,ev_K,K,3,'}
{'eve

{'event_20170103.csv_row8210': '20170103224810,2017-01-03 22:48:10,2017-01-03 23:59:59,ev_C,C,6,'}
{'event_20170103.csv_row8211': '20170103224820,2017-01-03 22:48:20,2017-01-03 23:59:59,ev_C,C,16,'}
{'event_20170103.csv_row8212': '20170103224830,2017-01-03 22:48:30,2017-01-03 23:59:59,ev_K,K,4,'}
{'event_20170103.csv_row8213': '20170103224840,2017-01-03 22:48:40,2017-01-03 23:59:59,ev_C,C,4,'}
{'event_20170103.csv_row8214': '20170103224850,2017-01-03 22:48:50,2017-01-03 23:59:59,ev_C,C,5,'}
{'event_20170103.csv_row8215': '20170103224900,2017-01-03 22:49:00,2017-01-03 23:59:59,ev_C,C,10,'}
{'event_20170103.csv_row8216': '20170103224910,2017-01-03 22:49:10,2017-01-03 23:59:59,ev_C,C,6,'}
{'event_20170103.csv_row8217': '20170103224920,2017-01-03 22:49:20,2017-01-03 23:59:59,ev_K,K,2,'}
{'event_20170103.csv_row8218': '20170103224930,2017-01-03 22:49:30,2017-01-03 23:59:59,ev_K,K,5,'}
{'event_20170103.csv_row8219': '20170103224940,2017-01-03 22:49:40,2017-01-03 23:59:59,ev_C,C,11,'}
{'event

## Kafka Consumer

In [7]:
# KafkaConsumer and loads come from the imports cell at the top of the notebook.
# Subscribes to the 'test' topic as group 'test-group-py'. If the group has no
# committed offset yet, reading starts from the earliest available message.
consumer = KafkaConsumer('test',
                         auto_offset_reset='earliest',
                         enable_auto_commit=True,
                         group_id='test-group-py',
                         bootstrap_servers=['localhost:9092'],
                         # The producer sends JSON as UTF-8 bytes; json.loads accepts bytes directly.
                         value_deserializer=loads)

In [9]:
def csv_writer(line, dest_folder, parsed_date):
    '''
    Append one event line received from the Kafka broker to its date-partitioned CSV.

    The line is split on commas and appended to
    ``<dest_folder>/<parsed_date>/event_<parsed_date>.csv``; the partition
    folder is created on first use.

    Args:
        line: one comma-separated event record.
        dest_folder: root folder of the consumer sink.
        parsed_date: partition key used for both the sub-folder and file name.

    Returns:
        None. Header rows (first field ``id_evt``) and blank lines are skipped.
    '''
    # A blank source line arrives as an empty string; writing it would create
    # a bogus partition named after an empty date.
    if not line.strip():
        return None
    csv_line = line.split(',')
    # Each source file starts with its header row; the sink files omit it.
    if csv_line[0] == "id_evt":
        return None
    # Named partition_dir (not `path`) so the os.path module stays usable here.
    partition_dir = path.join(dest_folder, parsed_date)
    path_creator(partition_dir)
    with open(path.join(partition_dir, f"event_{parsed_date}.csv"), "a", newline='') as csv_file:
        csv.writer(csv_file).writerow(csv_line)
    return None

In [None]:
# Runs until the kernel is interrupted: each Kafka record is appended to its daily partition.
for record in consumer:
    # The producer sends one-entry dicts {"<file>_row<n>": "<csv line>"}; take that entry's value.
    payload_values = list(record.value.values())
    event_line = payload_values[0]
    # The first 8 characters of the line (the YYYYMMDD prefix of id_evt) select the partition.
    event_date = event_line[:8]
    csv_writer(event_line, dest_folder=consumer_destination_path, parsed_date=event_date)