In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_json, struct
from confluent_kafka import Producer
import time

# ---- Spark -----------------------------------------------------------------
# getOrCreate() reuses an already-active session if there is one.
spark = (
    SparkSession.builder
    .appName("SparkToKafkaBatch")
    .getOrCreate()
)

# ---- Job parameters --------------------------------------------------------
batch_size = 100              # rows published per Kafka batch
topic = 'Football'            # destination Kafka topic
csv_file_path = "train.csv"   # input CSV (relative path)

# Load the input: the first line holds column names and column types are
# inferred from the data.
train_df = spark.read.csv(
    csv_file_path,
    header=True,
    inferSchema=True,
)

# ---- Kafka -----------------------------------------------------------------
kafka_conf = {'bootstrap.servers': 'localhost:9092'}  # broker(s) used to bootstrap the client
producer = Producer(kafka_conf)

                                                                                

In [8]:
# Function to send DataFrame batch to Kafka
def send_batch_to_kafka(df_batch, topic, kafka_producer=None):
    """Serialize every row of ``df_batch`` to JSON and publish it to Kafka.

    Parameters
    ----------
    df_batch : pyspark.sql.DataFrame
        Rows to publish. All of its columns are packed into one JSON object
        per row, and each object becomes one Kafka message value.
    topic : str
        Destination Kafka topic.
    kafka_producer : confluent_kafka.Producer, optional
        Producer to publish with. When omitted, the notebook-level
        ``producer`` is used, so existing two-argument calls keep working.

    Notes
    -----
    The whole batch is collected to the driver, so keep batches small. The
    call blocks in ``flush()`` until every queued message has been handled.
    """
    # Fall back to the global producer created in the setup cell.
    target = producer if kafka_producer is None else kafka_producer

    # Build one JSON string per row in Spark, then pull them to the driver.
    # DataFrame.collect() avoids the extra Python RDD map round-trip.
    json_rows = [
        row["json"]
        for row in df_batch.select(
            to_json(struct(*df_batch.columns)).alias("json")
        ).collect()
    ]

    for json_row in json_rows:
        try:
            target.produce(topic, value=json_row)
        except BufferError:
            # The producer's local queue is full. Serve delivery reports to
            # free space, then retry once (a second failure propagates).
            target.poll(1)
            target.produce(topic, value=json_row)
        target.poll(0)  # serve delivery callbacks without blocking
    target.flush()

# Function to process DataFrame in batches and send to Kafka
def process_and_send_batches(df, batch_size, producer, topic, sleep_seconds=360):
    """Publish ``df`` to Kafka as JSON messages, ``batch_size`` rows at a time.

    Each row is serialized to one JSON string covering all columns. Every row
    is sent exactly once, in the DataFrame's partition order, and duplicate
    rows are preserved. After each batch the producer is flushed and a
    progress line is printed. The function then sleeps ``sleep_seconds``
    before the next batch; there is no sleep after the final batch.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Source rows.
    batch_size : int
        Rows per batch; must be positive.
    producer : confluent_kafka.Producer
        Producer used to publish the messages.
    topic : str
        Destination Kafka topic.
    sleep_seconds : float, optional
        Pause between batches. Defaults to 360 s (6 minutes), which is the
        delay the notebook has always used; an older comment wrongly said 60 s.

    Raises
    ------
    ValueError
        If ``batch_size`` is not positive.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    # Serialize once and tag each JSON string with a stable 0-based position.
    # The previous df.limit(end).subtract(df.limit(start)) slicing was wrong
    # for three reasons:
    #   - limit() without an ordering may pick different rows on each
    #     evaluation, so batches could overlap or skip rows;
    #   - subtract() is a set difference, so duplicate rows were dropped;
    #   - it re-read the input prefix for every batch.
    indexed = (
        df.select(to_json(struct(*df.columns)).alias("json"))
        .rdd.map(lambda row: row["json"])
        .zipWithIndex()
        .cache()
    )
    try:
        total_rows = indexed.count()
        num_batches = (total_rows + batch_size - 1) // batch_size  # Ceiling division

        for i in range(num_batches):
            start_index = i * batch_size
            end_index = start_index + batch_size

            # Default arguments pin this batch's bounds into the closure that
            # Spark ships to the executors.
            json_rows = (
                indexed.filter(lambda pair, lo=start_index, hi=end_index: lo <= pair[1] < hi)
                .keys()
                .collect()
            )

            # Publish with the producer the caller passed in.
            for json_row in json_rows:
                try:
                    producer.produce(topic, value=json_row)
                except BufferError:
                    # The local producer queue is full. Serve delivery reports
                    # to free space, then retry once.
                    producer.poll(1)
                    producer.produce(topic, value=json_row)
                producer.poll(0)  # serve delivery callbacks without blocking
            producer.flush()
            print(f"Batch {i+1}/{num_batches} sent to Kafka.")

            # Throttle between batches only; do not sleep after the last one.
            if i < num_batches - 1:
                time.sleep(sleep_seconds)
    finally:
        # Release the cached RDD even if producing fails part-way.
        indexed.unpersist()



In [None]:
# Process and send data to Kafka in batches
try:
    process_and_send_batches(train_df, batch_size, producer, topic)
except Exception as e:
    # NOTE(review): the error is reported but swallowed, so "Run All" keeps
    # going past a failed send. The `finally` clause runs either way.
    print(f"An error occurred: {e}")
finally:
    # Bound the final flush. With an unreachable broker, a bare flush() blocks
    # indefinitely and spark.stop() below would never be reached.
    undelivered = producer.flush(30)
    if undelivered:
        print(f"Warning: {undelivered} message(s) still undelivered after 30 s.")
    spark.stop()  # Stop the Spark session


                                                                                

Batch 1/1110 sent to Kafka.
Batch 2/1110 sent to Kafka.
Batch 3/1110 sent to Kafka.
Batch 4/1110 sent to Kafka.
Batch 5/1110 sent to Kafka.
Batch 6/1110 sent to Kafka.
Batch 7/1110 sent to Kafka.
Batch 8/1110 sent to Kafka.
Batch 9/1110 sent to Kafka.


[Stage 61:>                                                         (0 + 1) / 1]                                                                                

Batch 10/1110 sent to Kafka.
Batch 11/1110 sent to Kafka.
Batch 12/1110 sent to Kafka.




Batch 13/1110 sent to Kafka.
Batch 14/1110 sent to Kafka.
Batch 15/1110 sent to Kafka.


                                                                                

Batch 16/1110 sent to Kafka.




Batch 17/1110 sent to Kafka.


                                                                                

Batch 18/1110 sent to Kafka.
Batch 19/1110 sent to Kafka.


                                                                                

Batch 20/1110 sent to Kafka.


                                                                                

Batch 21/1110 sent to Kafka.


                                                                                

Batch 22/1110 sent to Kafka.


                                                                                

Batch 23/1110 sent to Kafka.


                                                                                

Batch 24/1110 sent to Kafka.


                                                                                

Batch 25/1110 sent to Kafka.


                                                                                

Batch 26/1110 sent to Kafka.


                                                                                

Batch 27/1110 sent to Kafka.


                                                                                

Batch 28/1110 sent to Kafka.




Batch 29/1110 sent to Kafka.


                                                                                

Batch 30/1110 sent to Kafka.




Batch 31/1110 sent to Kafka.


                                                                                

Batch 32/1110 sent to Kafka.


                                                                                

Batch 33/1110 sent to Kafka.


                                                                                

Batch 34/1110 sent to Kafka.


                                                                                

Batch 35/1110 sent to Kafka.
Batch 36/1110 sent to Kafka.


In [None]:
# Stop the Spark session.
# NOTE(review): the previous cell's `finally` clause already calls spark.stop(),
# so this is a second call. It is presumably harmless on an already-stopped
# session; confirm for the PySpark version in use. It only matters when the
# previous cell was skipped.
spark.stop()