In [1]:
from kafka import KafkaProducer
import json
import time
import boto3
import pandas as pd

# Kafka configuration
KAFKA_BROKER = 'kafka:9092'
TOPIC = 'mes-data'

# Initialize Kafka producer with JSON serialization
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8')
)

# MinIO configuration
minio_endpoint = "http://minio:9000"
bucket_name = "industrial-data"
file_key = "mes_data.csv"

# Connect to MinIO and read the CSV file
s3 = boto3.client(
    's3',
    endpoint_url=minio_endpoint,
    aws_access_key_id='minioadmin',
    aws_secret_access_key='minioadmin'
)
obj = s3.get_object(Bucket=bucket_name, Key=file_key)
df = pd.read_csv(obj['Body'])

# Convert 'Timestamp' column to datetime
df['Timestamp'] = pd.to_datetime(df['Timestamp'])

# Group data by 'Timestamp' and send each machine row as a separate message
for timestamp, group in df.groupby('Timestamp'):
    print(f"\n📤 Sending 10 messages for timestamp {timestamp}")
    for _, row in group.iterrows():
        message = row.to_dict()
        producer.send(TOPIC, message)
        print(f"✅ Sent: {message}")
    producer.flush()
    time.sleep(1)  # Delay between each timestamp group for fast run 

print("\n✅✅ All messages sent.")

ModuleNotFoundError: No module named 'kafka'

In [2]:
pip install kafka-python

Collecting kafka-python
  Downloading kafka_python-2.1.5-py2.py3-none-any.whl (285 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m285.4/285.4 kB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: kafka-python
Successfully installed kafka-python-2.1.5
Note: you may need to restart the kernel to use updated packages.


In [3]:
!pip install boto3

Collecting boto3
  Downloading boto3-1.38.3-py3-none-any.whl (139 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.9/139.9 kB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting s3transfer<0.13.0,>=0.12.0
  Downloading s3transfer-0.12.0-py3-none-any.whl (84 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.8/84.8 kB[0m [31m11.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting jmespath<2.0.0,>=0.7.1
  Downloading jmespath-1.0.1-py3-none-any.whl (20 kB)
Collecting botocore<1.39.0,>=1.38.3
  Downloading botocore-1.38.3-py3-none-any.whl (13.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.5/13.5 MB[0m [31m9.3 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
Installing collected packages: jmespath, botocore, s3transfer, boto3
Successfully installed boto3-1.38.3 botocore-1.38.3 jmespath-1.0.1 s3transfer-0.12.0


In [11]:
from kafka import KafkaProducer
import json
import time
import boto3
import pandas as pd

# Kafka configuration
KAFKA_BROKER = 'kafka:9092'
TOPIC = 'mes-data'

# Initialize Kafka producer with JSON serialization
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8')
)

# MinIO configuration
minio_endpoint = "http://minio:9000"
bucket_name = "industrial-data"
file_key = "future-stream-mes-20240115-20240121.csv"

# Connect to MinIO and read the CSV file
s3 = boto3.client(
    's3',
    endpoint_url=minio_endpoint,
    aws_access_key_id='minioadmin',
    aws_secret_access_key='minioadmin'
)
obj = s3.get_object(Bucket=bucket_name, Key=file_key)
df = pd.read_csv(obj['Body'])

# Convert 'Timestamp' column to datetime
df['Timestamp'] = pd.to_datetime(df['Timestamp'])

# Group data by 'Timestamp' and send each machine row as a separate message
for timestamp, group in df.groupby('Timestamp'):
    print(f"\n📤 Sending {len(group)} messages for timestamp {timestamp}")
    for _, row in group.iterrows():
        message = row.to_dict()
        producer.send(TOPIC, message)
        print(f"✅ Sent: {message}")
    producer.flush()
    time.sleep(10)  # Delay between each timestamp group

print("\n✅✅ All messages sent.")


📤 Sending 10 messages for timestamp 2024-01-15 00:00:00
✅ Sent: {'Timestamp': Timestamp('2024-01-15 00:00:00'), 'Machine_ID': 'Machine_1', 'Operator_ID': 1006, 'Units_Produced': 355, 'Defective_Units': 3, 'Production_Time_min': 20}
✅ Sent: {'Timestamp': Timestamp('2024-01-15 00:00:00'), 'Machine_ID': 'Machine_2', 'Operator_ID': 1004, 'Units_Produced': 54, 'Defective_Units': 7, 'Production_Time_min': 51}
✅ Sent: {'Timestamp': Timestamp('2024-01-15 00:00:00'), 'Machine_ID': 'Machine_3', 'Operator_ID': 1007, 'Units_Produced': 181, 'Defective_Units': 1, 'Production_Time_min': 37}
✅ Sent: {'Timestamp': Timestamp('2024-01-15 00:00:00'), 'Machine_ID': 'Machine_4', 'Operator_ID': 1008, 'Units_Produced': 86, 'Defective_Units': 2, 'Production_Time_min': 57}
✅ Sent: {'Timestamp': Timestamp('2024-01-15 00:00:00'), 'Machine_ID': 'Machine_5', 'Operator_ID': 1001, 'Units_Produced': 182, 'Defective_Units': 5, 'Production_Time_min': 116}
✅ Sent: {'Timestamp': Timestamp('2024-01-15 00:00:00'), 'Machine

In [10]:
nc -vz localhost 9093

SyntaxError: invalid syntax (2225024820.py, line 1)