In [1]:
import os

from pyspark.sql import SparkSession

# you need these two to transform the json strings to dataframes
from pyspark.sql.types import MapType, StringType
from pyspark.sql.functions import (col, explode, from_json, from_utc_timestamp,
                                   monotonically_increasing_id, to_timestamp)

# Spark session & context
# Mongo connection string used for both reads and writes. The default is the
# docker-compose credential set; override it via the MONGO_URI environment
# variable so real credentials need not be stored in the notebook.
MONGO_URI = os.environ.get(
    "MONGO_URI",
    "mongodb://root:example@mongo:27017/docstreaming.yelp?authSource=admin",
)

# Note: `failOnDataLoss` is a Kafka *source* option (set on the reader), and
# `spark.sql.streaming.kafka.partitioner.class` is not a Spark config key, so
# neither belongs in the session config -- both were no-ops here.
spark = (
    SparkSession.builder
    .master("local")
    .appName("kafka-mongo-streaming")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0,"
        "org.mongodb.spark:mongo-spark-connector_2.12:2.4.4",
    )
    .config("spark.mongodb.input.uri", MONGO_URI)
    .config("spark.mongodb.output.uri", MONGO_URI)
    .config("spark.executor.extraJavaOptions", "-Djava.security.egd=file:/dev/./urandom")
    .config("spark.mongodb.input.maxPoolSize", "100")
    .getOrCreate()
)
sc = spark.sparkContext


In [2]:
# Read the message from the kafka stream
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "Yelp-topic")
    # failOnDataLoss is a Kafka source option (Spark's default is already
    # "true"); it only takes effect here on the reader, not as a session config.
    .option("failOnDataLoss", "true")
    .load()
)

# Kafka delivers key/value as binary; cast both to strings so the JSON held in
# `value` can be parsed downstream.
df1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [3]:
#Create a temporary view for SparkSQL
# Expose the string-cast stream as the SQL view "message" for spark.sql queries.
df1.createOrReplaceTempView(name="message")

In [4]:
# Write out the message to the console of the environment
# Debugging aid: echo each incoming message to the console sink.
res = spark.sql("SELECT * from message")
# Keep the StreamingQuery handle so this query can be stopped later with
# console_query.stop(); a discarded handle leaves an unreachable running query.
console_query = (
    res.writeStream
    .format("console")
    .outputMode("append")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7f15129bd030>

In [5]:
# Write the unconverted dataframe (binary key/value, not cast to strings)
# back into Kafka under another topic;
# listen to it with a local consumer
# Every streaming query needs its own checkpoint directory. Pointing it at /tmp
# itself would put Spark's offsets/commits/metadata straight into a shared
# directory, where they can collide with other queries or unrelated files.
ds = (
    df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "spark-output")
    .option("checkpointLocation", "/tmp/checkpoints/kafka-spark-output")
    .start()
)

In [6]:
# Write the message into MongoDB
def foreach_batch_function(df, epoch_id):
    """Parse one Kafka micro-batch of Yelp JSON messages and upsert it into MongoDB.

    Used as the ``foreachBatch`` callback of the Kafka -> MongoDB stream.

    Args:
        df: micro-batch DataFrame with string columns ``key`` and ``value``;
            ``value`` holds one JSON object per message (parsed as
            map<string,string>).
        epoch_id: batch id supplied by Structured Streaming (only used in logs).

    Returns:
        True (the return value of a foreachBatch function is not used by Spark).

    Raises:
        ValueError: if the parsed columns match none of the known Yelp record
            types, or if the collection already holds documents and the batch
            has neither ``business_id`` nor ``user_id`` to match them on.
    """
    if df.rdd.isEmpty():
        print("Dataframe is empty")
        return True

    flat_df = _flatten_messages(df)
    column_names = flat_df.columns
    if not column_names:
        # No message parsed as JSON, so there is nothing to write.
        print(f"Batch {epoch_id}: no parseable JSON messages, skipping")
        return True

    records = _transform_record(flat_df)
    if records is None:
        raise ValueError(f"Unrecognized record type, columns: {column_names}")

    # Existing documents are matched on business_id when present, else user_id.
    if "business_id" in column_names:
        key = "business_id"
    elif "user_id" in column_names:
        key = "user_id"
    else:
        key = None
    _upsert_to_mongo(records, key)
    return True


_MONGO_FORMAT = "com.mongodb.spark.sql.DefaultSource"
_USER_COLUMNS = {"user_id", "name", "review_count", "yelping_since"}


def _flatten_messages(batch_df):
    """Return one row per Kafka message with one string column per JSON key.

    Each message is tagged with a synthetic id before its map is exploded into
    key/value pairs, and the pivot groups on that id, so a batch holding several
    messages yields several rows instead of being merged into a single row.
    Messages whose value does not parse (from_json gives null) are dropped by explode.
    """
    parsed = (batch_df
              .withColumn("value", from_json(col("value"), MapType(StringType(), StringType())))
              .withColumn("_msg_id", monotonically_increasing_id()))
    pairs = parsed.select("_msg_id", explode("value").alias("key", "value"))
    # With a single aggregate, pivot names each output column after the JSON key.
    return (pairs.groupBy("_msg_id")
                 .pivot("key")
                 .agg({"value": "first"})
                 .drop("_msg_id"))


def _to_gmt_plus_one(frame, column, fmt="yyyy-MM-dd HH:mm:ss", offset="+01:00"):
    """Parse string `column` with `fmt` and shift it from UTC to `offset`, in place.

    NOTE(review): to_timestamp parses in spark.sql.session.timeZone, so treating
    the parsed value as UTC is only exact when the session time zone is UTC -- confirm.
    """
    return frame.withColumn(column, from_utc_timestamp(to_timestamp(column, fmt), offset))


def _transform_record(flat_df):
    """Rename/cast columns for the Yelp record type revealed by the JSON keys.

    Checks run in the order user, review, tip, checkin, business; earlier checks
    win (e.g. a review also has every key the tip check looks for).
    Returns None when no record type matches.

    NOTE(review): the whole batch is classified at once, which assumes every
    message in a micro-batch has the same record type -- confirm with the producer.
    """
    present = set(flat_df.columns)

    if _USER_COLUMNS <= present:  # user
        renamed = (flat_df.withColumnRenamed("name", "user_name")
                          .withColumnRenamed("friends", "Numbers_of_friends"))
        return _to_gmt_plus_one(renamed, "yelping_since")

    if {"review_id", "user_id", "business_id", "stars"} <= present:  # review
        renamed = (flat_df.withColumnRenamed("text", "review_text")
                          .withColumnRenamed("date", "review_date"))
        return _to_gmt_plus_one(renamed, "review_date")

    if {"user_id", "business_id", "text", "date"} <= present:  # tip
        renamed = (flat_df.withColumnRenamed("text", "tip_text")
                          .withColumnRenamed("date", "tip_date"))
        return _to_gmt_plus_one(renamed, "tip_date")

    if {"business_id", "date"} <= present:  # checkin
        # NOTE(review): cast("int") only succeeds if `date` already holds a
        # number -- confirm the producer sends a check-in count here.
        return (flat_df.withColumnRenamed("date", "total_of_checkin")
                       .withColumn("total_of_checkin", col("total_of_checkin").cast("int")))

    if {"latitude", "address", "city", "postal_code"} <= present:  # business
        # Cast on top of the renamed frame so business_name survives.
        return (flat_df.withColumnRenamed("name", "business_name")
                       .withColumn("latitude", col("latitude").cast("double"))
                       .withColumn("longitude", col("longitude").cast("double"))
                       .withColumn("stars", col("stars").cast("double")))

    return None


def _upsert_to_mongo(records, key):
    """Append `records` to MongoDB, replacing stored documents that share `key`.

    Rows whose `key` value already exists are written with that document's
    `_id` and replaceDocument=true so the new row replaces it; the rest are appended.

    NOTE(review): the collection mixes record types, so e.g. a review keyed on
    business_id also matches (and replaces) the business document and other
    reviews of that business; a key shared by several stored docs yields one
    replacement per doc. A per-type identity key is probably intended -- confirm.
    """
    existing_df = spark.read.format(_MONGO_FORMAT).load()
    if existing_df.rdd.isEmpty():
        records.write.format(_MONGO_FORMAT).mode("append").save()
        return
    if key is None:
        raise ValueError("column_names should contain either business_id or user_id")
    if key not in existing_df.columns:
        # The inferred collection schema has no such field: nothing to match against.
        records.write.format(_MONGO_FORMAT).mode("append").save()
        return

    # Take only key and _id from stored docs so the written rows carry the new
    # values; joining whole documents duplicated every shared column name.
    existing_ids = existing_df.select(key, "_id")
    matched = records.join(existing_ids, key, "inner")
    unmatched = records.join(existing_ids, key, "left_anti")

    # Write replacements first: they keep the same key values, so the lazily
    # re-evaluated anti-join below still sees the same set of existing keys.
    if not matched.rdd.isEmpty():
        matched.write.format(_MONGO_FORMAT).option("replaceDocument", "true").mode("append").save()
    if not unmatched.rdd.isEmpty():
        unmatched.write.format(_MONGO_FORMAT).mode("append").save()

In [7]:
# Start the MongoDB stream and wait for termination
# Keep the handle: interrupting awaitTermination() (KeyboardInterrupt) only stops
# the wait, the query keeps running until mongo_query.stop() is called.
# A dedicated checkpoint lets a restarted query resume from the last committed
# Kafka offsets instead of starting at the latest ones and skipping messages.
mongo_query = (
    df1.writeStream
    .foreachBatch(foreach_batch_function)
    .option("checkpointLocation", "/tmp/checkpoints/kafka-mongo")
    .start()
)
mongo_query.awaitTermination()

Dataframe is empty


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 