In [None]:
#pip install kafka


In [None]:
#pip install --upgrade kafka-python


In [1]:
# Confirm which kafka client package version the kernel actually imports.
import kafka
print(kafka.__version__)

2.0.2


In [2]:
from kafka import KafkaProducer
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_json, struct
import json
import time
import pandas as pd

In [None]:
# Show the installed PySpark version; the Kafka connector package requested
# when the Spark session is built pins its own version (3.4.1).
import pyspark
print(pyspark.__version__)

In [None]:
# NOTE(review): `spark` is only created by the "Initialize Spark session" cell
# further down, so on a fresh kernel this cell raises NameError if run in order.
print(spark.version) 

In [10]:
# Build a small sample dataset and persist it as the CSV file that every
# approach below reads from.
import os  # local import: only this cell needs it

# Sample data
data = {
    "id": [1, 2, 3, 4, 5],
    "name": ["Alice", "Bob", "Charlie", "David", "Eve"],
    "age": [24, 27, 22, 32, 29],
    "city": ["New York", "Los Angeles", "Chicago", "Houston", "Phoenix"]
}

# Create a DataFrame
df = pd.DataFrame(data)

# Save the DataFrame as a CSV file
base_path = "../../data/"
csv_name = "sample_data.csv"
csv_path = os.path.join(base_path, csv_name)

# DataFrame.to_csv does not create missing parent directories, so make sure
# the target directory exists before writing (no-op when it already does).
os.makedirs(base_path, exist_ok=True)

df.to_csv(csv_path, index=False)

print(f"CSV file created at {csv_path}")


CSV file created at ../../data/sample_data.csv


In [4]:
# Sanity check: load the file that was just written and print its contents.
df_check = pd.read_csv(filepath_or_buffer=csv_path)
print(df_check)

   id     name  age         city
0   1    Alice   24     New York
1   2      Bob   27  Los Angeles
2   3  Charlie   22      Chicago
3   4    David   32      Houston
4   5      Eve   29      Phoenix


In [5]:
# Create (or reuse) the Spark session; the Kafka SQL connector is pulled in
# through spark.jars.packages when the session is first created.
spark = (
    SparkSession.builder
    .appName("CSV to Kafka")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1",
    )
    .getOrCreate()
)

24/11/10 22:06:26 WARN Utils: Your hostname, MacBook-Pro-de-Victor.local resolves to a loopback address: 127.0.0.1; using 192.168.1.130 instead (on interface en0)
24/11/10 22:06:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/victorgalan/miniconda3/envs/iceberg_env/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/victorgalan/.ivy2/cache
The jars for the packages stored in: /Users/victorgalan/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c4064855-c4e1-4f36-a318-07c3451c2370;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.1 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.1 in central
	found org.slf4j#slf4j-api;2.0.6 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 1191ms :: artifact

In [8]:
# Kafka connection settings shared by every approach in this notebook.

# Broker list (adjust the ports to match your cluster); joined into the
# comma-separated form the producers are given.
kafka_bootstrap_servers = ", ".join(
    f"localhost:{port}" for port in (9093, 9095, 9097)
)

# Destination topic (replace with your own).
kafka_topic = "test2-topic"

# 1st approach:

In [16]:
# Step 1: load the CSV (first line is the header) into a Spark DataFrame.
# Point csv_path at your own file if needed.
df = spark.read.csv(csv_path, header=True)

In [18]:
# Print the rows that Spark loaded from the CSV file.
df.show()

+---+-------+---+-----------+
| id|   name|age|       city|
+---+-------+---+-----------+
|  1|  Alice| 24|   New York|
|  2|    Bob| 27|Los Angeles|
|  3|Charlie| 22|    Chicago|
|  4|  David| 32|    Houston|
|  5|    Eve| 29|    Phoenix|
+---+-------+---+-----------+



## Step 2: Prepare data for Kafka by selecting columns as 'key' and 'value'
 Kafka expects the 'key' and 'value' columns as byte arrays (or strings), so we need to format the data accordingly.
 In this example, we're serializing the entire row as a JSON string and setting it as the value.
 If you have a specific column for key, you can use that.

In [19]:
# Serialize every source column of each row into one JSON string and store it
# in the 'value' column.
# The 'value' column itself is left out of the struct: this cell reassigns
# `df`, so without the filter a second run would nest the previous JSON
# string inside the new one. With it, re-running the cell is idempotent.
# NOTE(review): an input file that already has its own 'value' column would
# be excluded from the JSON as well — confirm that is acceptable.
source_columns = [df[name] for name in df.columns if name != "value"]
df = df.withColumn("value", to_json(struct(*source_columns)))

In [21]:
# Show the rows without truncation so the whole JSON 'value' string is visible.
df.show(truncate=False)

+---+-------+---+-----------+-------------------------------------------------------+
|id |name   |age|city       |value                                                  |
+---+-------+---+-----------+-------------------------------------------------------+
|1  |Alice  |24 |New York   |{"id":"1","name":"Alice","age":"24","city":"New York"} |
|2  |Bob    |27 |Los Angeles|{"id":"2","name":"Bob","age":"27","city":"Los Angeles"}|
|3  |Charlie|22 |Chicago    |{"id":"3","name":"Charlie","age":"22","city":"Chicago"}|
|4  |David  |32 |Houston    |{"id":"4","name":"David","age":"32","city":"Houston"}  |
|5  |Eve    |29 |Phoenix    |{"id":"5","name":"Eve","age":"29","city":"Phoenix"}    |
+---+-------+---+-----------+-------------------------------------------------------+



In [None]:
# Check which kind of object `df` currently is (the name is bound to a pandas
# frame first and to a Spark DataFrame later in this notebook).
type(df)

In [None]:
# Quick look at the start of `df`.
df.head()

In [None]:
# Display the topic the messages will be sent to.
kafka_topic

In [None]:
# Display the configured broker list.
kafka_bootstrap_servers

In [None]:
# Display the path of the CSV file used as input.
csv_path

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# The streaming reader is given an explicit schema: one nullable field per
# CSV column.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("id", IntegerType()),
        ("name", StringType()),
        ("age", IntegerType()),
        ("city", StringType()),
    )
])

# Stream CSV files (with header rows) from the data directory using that schema.
streaming_df = (
    spark.readStream
    .schema(schema)
    .option("header", "true")
    .csv(base_path)
)

# Print the resulting schema as a sanity check.
streaming_df.printSchema()

# From here the stream can be transformed further or written out to Kafka.

# Proceed with processing the streaming DataFrame, for example:
# You can write the data to Kafka or perform other transformations

In [None]:
# Define the function to send batch data to Kafka
def send_batch_to_kafka(batch_df, batch_id):
    """Publish every row of one micro-batch to ``kafka_topic``.

    Intended as a ``foreachBatch`` callback. Each row is converted to a dict
    and sent as the JSON document ``{"value": {...row...}}``.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Identifier of the micro-batch; required by the
            ``foreachBatch`` callback signature but not used here.
    """
    # collect() pulls the whole batch onto the driver, which is only
    # reasonable for small batches such as the sample data.
    rows = batch_df.collect()
    if not rows:
        # Nothing to publish: avoid opening broker connections for nothing.
        return

    producer = KafkaProducer(
        bootstrap_servers=kafka_bootstrap_servers,
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    try:
        for row in rows:
            data = {'value': row.asDict()}
            producer.send(kafka_topic, value=data)
        # Block until all buffered records have been handed to the brokers.
        producer.flush()
    finally:
        # A new producer is created for every batch, so it must be closed
        # here; otherwise each batch leaks its connections and sender thread.
        producer.close()

# Start the streaming query; every micro-batch is handed to
# send_batch_to_kafka.
query = (
    streaming_df.writeStream
    .foreachBatch(send_batch_to_kafka)
    .start()
)

# Block until the query terminates. This cell never returns on its own while
# the stream is active; interrupt the kernel to end it. The finally clause
# stops the query in that case, so it does not keep running (and publishing
# to Kafka) in the background after the cell has been interrupted.
try:
    query.awaitTermination()
finally:
    query.stop()

# 2nd approach

In [22]:
# Producer for the plain-text approach: each message is a str that is turned
# into UTF-8 bytes before being sent.
producer = KafkaProducer(
    value_serializer=lambda text: text.encode('utf-8'),
    bootstrap_servers=kafka_bootstrap_servers,
)

In [24]:
# Confirm that the producer object was created.
type(producer)

kafka.producer.kafka.KafkaProducer

In [31]:
# Send every data line of the CSV file to Kafka as a raw text message.
# The encoding is set explicitly so the result does not depend on the
# platform's default locale (pandas wrote the file as UTF-8).
with open(csv_path, mode='r', encoding='utf-8') as file:
    # Skip the header row; the default keeps an empty file from raising
    # StopIteration.
    header = next(file, None)

    # Send each line as a message
    for line in file:
        # Strip surrounding whitespace, including the trailing newline
        message = line.strip()

        # Blank lines (e.g. a trailing empty line) carry no record
        if not message:
            continue

        # Send the message to Kafka
        producer.send(kafka_topic, value=message)
        print(f"Sent message: {message}")

Sent message: 1,Alice,24,New York
Sent message: 2,Bob,27,Los Angeles
Sent message: 3,Charlie,22,Chicago
Sent message: 4,David,32,Houston
Sent message: 5,Eve,29,Phoenix


In [None]:
# Close the producer connection:
# flush() first, so buffered messages are sent before close() releases the
# producer.
producer.flush()
producer.close()

# 3rd approach:

In [32]:
import csv
import json


# Initialize Kafka producer with JSON serialization: every message value is
# dumped to a JSON string and encoded as UTF-8 bytes.
producer = KafkaProducer(
    bootstrap_servers=kafka_bootstrap_servers,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)


# Open the CSV file and read each row.
# newline='' is what the csv module requires so that newlines embedded in
# quoted fields are parsed correctly; the explicit encoding keeps the result
# independent of the platform default (pandas wrote the file as UTF-8).
with open(csv_path, mode='r', newline='', encoding='utf-8') as file:
    csv_reader = csv.DictReader(file)  # Uses the header row for the dict keys

    for row in csv_reader:
        # Wrap the row (column name -> string value) in the required
        # {"value": {...}} JSON structure
        message = {"value": row}

        # Send the message to Kafka
        producer.send(kafka_topic, value=message)
        print(f"Sent message: {message}")

Sent message: {'value': {'id': '1', 'name': 'Alice', 'age': '24', 'city': 'New York'}}
Sent message: {'value': {'id': '2', 'name': 'Bob', 'age': '27', 'city': 'Los Angeles'}}
Sent message: {'value': {'id': '3', 'name': 'Charlie', 'age': '22', 'city': 'Chicago'}}
Sent message: {'value': {'id': '4', 'name': 'David', 'age': '32', 'city': 'Houston'}}
Sent message: {'value': {'id': '5', 'name': 'Eve', 'age': '29', 'city': 'Phoenix'}}


In [33]:
# Close the producer connection:
# flush() first, so buffered messages are sent before close() releases the
# producer.
producer.flush()
producer.close()

In [34]:
# Stop the Spark session now that the notebook is finished with it.
spark.stop()