# In [None]:
import json
import os
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pymongo import MongoClient

# Configuration. The defaults target a local single-node setup; override any
# value by setting an environment variable of the same name instead of editing
# this file.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
KAFKA_TOPIC_NAME = os.environ.get("KAFKA_TOPIC_NAME", "music-data")
MONGO_HOST = os.environ.get("MONGO_HOST", "localhost")
MONGO_PORT = int(os.environ.get("MONGO_PORT", "27017"))  # env values are strings
MONGO_DB_NAME = os.environ.get("MONGO_DB_NAME", "project")
MONGO_COLLECTION_NAME = os.environ.get("MONGO_COLLECTION_NAME", "music_dataset")

# Expected fields of each incoming JSON music record, mapped to a type name.
# Adjust the entries to match what the producer actually publishes.
schema = dict(
    title="string",
    artist="string",
    genre="string",
    duration_seconds="integer",
    play_count="long",
    release_date="string",
    album="string",
    year="string",
    label="string",
    rating="string",
)

# Create (or reuse) the SparkSession. Abort with a non-zero exit status if that
# fails, since nothing below can run without it. sys.exit is used because the
# bare exit() builtin is injected by the `site` module and is not guaranteed
# to exist.
try:
    spark = SparkSession.builder.appName("KafkaToMongoDB").getOrCreate()
except Exception as e:
    print(f"Error creating SparkSession: {e}", file=sys.stderr)
    sys.exit(1)

# Subscribe to the Kafka topic as a streaming DataFrame. The message payload is
# in the binary `value` column, which is parsed below. With
# startingOffsets="latest", a fresh query (no checkpoint yet) only sees messages
# published after it starts.
# NOTE: the "kafka" source is not bundled with Spark. The spark-sql-kafka-0-10
# connector must be on the classpath (e.g. via --packages), otherwise load()
# fails here.
try:
    df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
        .option("subscribe", KAFKA_TOPIC_NAME)
        .option("startingOffsets", "latest")
        .load()
    )
except Exception as e:
    print(f"Error reading from Kafka topic: {e}", file=sys.stderr)
    spark.stop()
    sys.exit(1)

# Parse each Kafka message value as JSON using the field/type mapping in
# `schema`, then flatten the parsed struct into top-level columns.
try:
    # from_json needs a StructType or a DDL-formatted string, not a Python dict
    # (and certainly not the literal text "schema"). Build the DDL form, e.g.
    # "`title` string, `play_count` long". Back-quotes let any field name through.
    schema_ddl = ", ".join(f"`{name}` {dtype}" for name, dtype in schema.items())
    df = df.select(
        from_json(col("value").cast("string"), schema_ddl).alias("data")
    ).select("data.*")
except Exception as e:
    print(f"Error parsing JSON or selecting columns: {e}", file=sys.stderr)
    spark.stop()
    sys.exit(1)

# Connect to MongoDB and verify the server is reachable before starting the
# stream. MongoClient connects lazily, so its constructor does not fail for an
# unreachable server. The "ping" forces a round trip so connection problems
# surface here instead of inside the first streaming batch. The server-selection
# timeout bounds how long that check waits. Pass username=/password= (or use a
# connection URI) here if the deployment requires authentication.
try:
    client = MongoClient(MONGO_HOST, MONGO_PORT, serverSelectionTimeoutMS=5000)
    client.admin.command("ping")
    db = client[MONGO_DB_NAME]
    collection = db[MONGO_COLLECTION_NAME]
except Exception as e:
    print(f"Error connecting to MongoDB: {e}", file=sys.stderr)
    spark.stop()
    sys.exit(1)

# Write the parsed stream to MongoDB one micro-batch at a time. The checkpoint
# directory lets the query resume from its last committed progress on restart.
MONGO_INSERT_BATCH_SIZE = 1000  # documents sent per insert_many round trip


def _write_batch_to_mongo(batch_df, batch_id):
    """Insert every row of one streaming micro-batch into ``collection``.

    foreachBatch invokes this function on the driver, so the driver-side
    pymongo ``collection`` can be used directly. Calling ``batch_df.foreach``
    instead would have to serialize a closure over the pymongo client and ship
    it to the executors, which does not work. Rows are streamed to the driver
    with toLocalIterator and inserted in chunks of MONGO_INSERT_BATCH_SIZE,
    rather than one insert per row.

    NOTE: foreachBatch gives at-least-once delivery. If a batch is retried after
    a failure, its rows are inserted again. Key on ``batch_id`` or upsert if
    duplicates matter.
    """
    pending = []
    for row in batch_df.toLocalIterator():
        pending.append(row.asDict(recursive=True))
        if len(pending) >= MONGO_INSERT_BATCH_SIZE:
            collection.insert_many(pending)
            pending = []
    if pending:  # insert_many rejects an empty list
        collection.insert_many(pending)


writeStream = None
try:
    writeStream = (
        df.writeStream
        .outputMode("append")
        .foreachBatch(_write_batch_to_mongo)
        .option("checkpointLocation", "/tmp/spark-checkpoint")
        .start()
    )
    writeStream.awaitTermination()
except KeyboardInterrupt:
    # Interrupting the process or notebook cell is the usual way to end this
    # stream. Stop the query cleanly and continue to the shutdown code below.
    if writeStream is not None:
        writeStream.stop()
except Exception as e:
    print(f"Error writing data to MongoDB: {e}", file=sys.stderr)
    client.close()
    spark.stop()
    sys.exit(1)

# Shut down gracefully: release the MongoDB client's connections, then stop the
# SparkSession.
client.close()
spark.stop()