In [1]:
# Create the Spark Session
from pyspark.sql import SparkSession

# MongoDB connection details.
# Template for an authenticated deployment (keep real credentials out of the
# notebook, e.g. read them from environment variables):
#username = "your_username"
#password = "your_password"
#ip_address = "172.18.0.2"
port = 27017  # only referenced by the commented-out URI template below
#uri = f"mongodb://{username}:{password}@{ip_address}:{port}/{db_name}.{collection_name}?authSource=admin"
uri = "mongodb://mongo1"

# Build (or reuse) the local Spark session used by every later cell.
spark = (
    SparkSession
    .builder
    .appName("Streaming from Kafka into Mongodb")
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    # The Kafka source and the MongoDB connector are resolved as packages at start-up.
    .config('spark.jars.packages', "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,org.mongodb.spark:mongo-spark-connector_2.12:10.3.0")
    # Connector 10.x takes its connection string from
    # `spark.mongodb.connection.uri` (used for both reads and writes); the
    # 3.x-era key `spark.mongodb.input.uri` is not read by this connector version.
    .config("spark.mongodb.connection.uri", uri)
    # NOTE(review): confirm that these two class names exist in connector 10.3.0.
    .config("spark.sql.catalog.myCatalog", "com.mongodb.spark.sql.v2.MongoCatalog")
    .config("spark.sql.extensions", "com.mongodb.spark.sql.v2.MongoSparkExtensions")
    .config("spark.sql.shuffle.partitions", 4)
    .master("local[*]")
    .getOrCreate()
)

# Last expression of the cell: renders the session summary.
spark

In [2]:
# Streaming source: subscribe to the Kafka topic and start from the earliest
# offset. All reader settings are passed in one call.

kafka_df = (
    spark.readStream.format("kafka")
    .options(
        **{
            "kafka.bootstrap.servers": "kafka1:29092",
            "subscribe": "items_shopify_posted",
            "startingOffsets": "earliest",
        }
    )
    .load()
)

In [3]:
# Inspect the schema of the raw Kafka source. The printed columns describe the
# Kafka record itself; the message payload is the binary `value` column.
kafka_df.printSchema()
# NOTE(review): kafka_df is built with readStream, so a direct .show() is
# presumably not usable on it — confirm before re-enabling the line below.
#kafka_df.show()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [4]:
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType
# Expected layout of the JSON document carried in each Kafka message value.
# Every field is declared as a nullable string. `description` used to be the
# only field declared non-nullable, but the parsed schema printed further down
# reports it as nullable anyway, so that flag was ineffective and misleading.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        "name",
        "price",
        "category",
        "instock",
        "tags",
        "description",
        "filename",
    )
])


In [5]:
from pyspark.sql.functions import from_json,col
# Decode the Kafka payload to text, parse it as JSON against `schema`, and
# promote the fields of the parsed struct to top-level columns.
streaming_df = (
    kafka_df
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

In [6]:
! pip install pymongo

Collecting pymongo
  Downloading pymongo-4.7.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (669 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m669.1/669.1 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting dnspython<3.0.0,>=1.16.0
  Downloading dnspython-2.6.1-py3-none-any.whl (307 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m307.7/307.7 kB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: dnspython, pymongo
Successfully installed dnspython-2.6.1 pymongo-4.7.3


In [7]:
# Function to create MongoDB database
def create_database(client, db_name):
    """Look up ``db_name`` on ``client`` and return the resulting handle.

    The function only indexes the client and prints a confirmation line;
    it issues no explicit write of its own.
    """
    database_handle = client[db_name]
    print(f"Database '{db_name}' created successfully!")
    return database_handle

In [8]:
# Function to create MongoDB collection
def create_collection(db, collection_name):
    """Return the ``collection_name`` entry looked up on ``db``."""
    return db[collection_name]

In [9]:
import logging
from pymongo import MongoClient, errors
# Function to insert data into MongoDB collection
def insert_data(collection, **kwargs):
    """Insert one item document into ``collection``.

    The document is built from the keyword arguments ``name``, ``price``,
    ``category``, ``instock``, ``tags``, ``description`` and ``filename``.
    A field that is not supplied is stored as ``None``; any other keyword
    argument is ignored.

    Failures are logged rather than raised, so the function always returns
    ``None``.
    """
    print("Inserting data...")

    field_names = (
        'name',
        'price',
        'category',
        'instock',
        'tags',
        'description',
        'filename',
    )
    document = {field: kwargs.get(field) for field in field_names}

    try:
        collection.insert_one(document)
        logging.info(f"Data inserted for {document['name']}")
    except errors.DuplicateKeyError:
        # The value logged is the item's name; the message previously called
        # it a "phone", a leftover from a different schema.
        logging.error(f"Document with name {document['name']} already exists")
    except Exception as e:
        logging.error(f"Could not insert data due to {e}")

In [10]:
# Function to create MongoDB connection
def create_mongo_connection(uri):
    """Open a MongoDB client for ``uri`` and check that the server answers.

    Constructing ``MongoClient`` does not contact the server, so a cheap
    ``ping`` command is issued here; otherwise an unreachable server would
    only be noticed at the first real operation and the error handling
    below would never trigger.

    Returns the client on success. Returns ``None`` (after logging the
    error) when the client cannot be created or the server does not respond.
    """
    client = None
    try:
        client = MongoClient(uri)
        client.admin.command("ping")
        return client
    except Exception as e:
        logging.error(f"Could not create MongoDB connection due to {e}")
        if client is not None:
            # Do not leave a half-open client behind when the ping failed.
            client.close()
        return None

In [11]:
# Print the schema of the parsed stream.
# NOTE(review): the original note here was garbled; it presumably meant that, to
# look at actual rows, one can place a sample JSON file and change readStream to
# read — confirm before relying on it.
streaming_df.printSchema()
#streaming_df.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- price: string (nullable = true)
 |-- category: string (nullable = true)
 |-- instock: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- description: string (nullable = true)
 |-- filename: string (nullable = true)



In [None]:
# Open the MongoDB connection and, when it is available, stream the parsed
# Kafka records into the target collection.
client = create_mongo_connection(uri)
if client is not None:
    db_name = "shopify"
    collection_name = "items"
    db = create_database(client, db_name)
    collection = create_collection(db, collection_name)

    logging.info("Streaming is being started...")

    # Example of a manual insert through the pymongo helper:
    # insert_data(collection,
    #             name="t-shirt",
    #             price="20",
    #             category="vetement été",
    #             instock=True,
    #             tags="version1",
    #             description="good",
    #             filename="product.png")

    # Write the rows of `streaming_df` to MongoDB through the Spark connector.
    streaming_query = (
        streaming_df.writeStream
        .format("mongodb")
        .outputMode("append")
        .option("checkpointLocation", "checkpoint_dir_kafka")
        # The key must be the bare option name: with quote characters inside
        # the string the connector does not recognise it and the URI is lost.
        .option("spark.mongodb.connection.uri", uri)
        .option("database", db_name)
        .option("collection", collection_name)
        .start()
    )
    # Keep the query handle in `streaming_query` (awaitTermination() returns
    # None), then block the cell until the stream stops or fails.
    streaming_query.awaitTermination()

Database 'shopify' created successfully!
