In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
# Maven coordinates Spark resolves when the session starts: the Kafka source
# (built for the same Spark 3.2.4 / Scala 2.12 install used by this notebook)
# and the MongoDB connector plus its Java driver for the sink.
packages = [
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4",
    "org.mongodb.spark:mongo-spark-connector_2.12:10.1.1",
    "org.mongodb:mongodb-driver-sync:4.9.1",
]

# Local session with two worker threads; the connector jars above are fetched
# from Maven Central and put on the classpath. getOrCreate() reuses an existing
# session if one is already running in this kernel.
spark = (
    SparkSession.builder
    .appName('kafka_spark_consumer')
    .master('local[2]')
    .config("spark.jars.packages", ",".join(packages))
    .config("spark.jars.repositories", "https://repo.maven.apache.org/maven2")
    .getOrCreate()
)

# Only surface errors from Spark in the notebook output.
spark.sparkContext.setLogLevel('ERROR')

24/07/19 10:52:37 WARN Utils: Your hostname, varna-IdeaPad-Flex-5-14ALC05 resolves to a loopback address: 127.0.1.1; using 192.168.18.227 instead (on interface wlp2s0)
24/07/19 10:52:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
https://repo.maven.apache.org/maven2 added as a remote repository with the name: repo-1


:: loading settings :: url = jar:file:/home/varna/hadoop/spark/spark-3.2.4-bin-hadoop3.2/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/varna/.ivy2/cache
The jars for the packages stored in: /home/varna/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
org.mongodb#mongodb-driver-sync added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0e94ce4b-96c6-4c44-9cb2-2767835db990;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.4 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.4 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in 

In [3]:
# Subscribe to the news topic using Structured Streaming (not the legacy
# DStream API). The result is an unbounded DataFrame of Kafka records.
# "earliest" only applies when the query starts without a checkpoint; a query
# restarted from its checkpoint resumes from the offsets recorded there.
KAFKA_TOPIC = 'newsTopic3'
KAFKA_BOOTSTRAP_SERVER = 'localhost:9092'

stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", "earliest")
    .load()
)

In [4]:
# The Kafka `value` column is not text yet: decode it to a string and keep the
# record's Kafka `timestamp` next to it.
projection = ("CAST(value as STRING)", "timestamp")
news_df = stream_df.selectExpr(*projection)
news_df

DataFrame[value: string, timestamp: timestamp]

In [5]:
# Expected JSON layout of each news message: three string fields.
news_schema = StructType(
    [StructField(field_name, StringType()) for field_name in ('author', 'title', 'description')]
)

# Parse the JSON payload into a struct, then flatten the struct's fields into
# top-level columns while carrying the Kafka timestamp through.
news_info_df = (
    news_df
    .select(from_json(col("value"), news_schema).alias("news_info"), "timestamp")
    .select('news_info.*', "timestamp")
)
news_info_df

DataFrame[author: string, title: string, description: string, timestamp: timestamp]

In [None]:
# MongoDB sink settings, named like the KAFKA_* constants used for the source.
MONGO_CONNECTION_URI = "mongodb://localhost:27017/?directConnection=true"
MONGO_DATABASE = "new"  # NOTE(review): possibly meant to be "news" — confirm the target database
MONGO_COLLECTION = "news"
# Relative path: resolved against the driver's working directory / default FS.
CHECKPOINT_LOCATION = "checkpoint/chk1"

# Continuously append each parsed micro-batch to MongoDB. The checkpoint
# records the Kafka offsets already processed, so a restarted query resumes
# from there instead of replaying the topic.
query = (
    news_info_df.writeStream
    .format("mongodb")
    .outputMode("append")
    .option("spark.mongodb.connection.uri", MONGO_CONNECTION_URI)
    .option("spark.mongodb.database", MONGO_DATABASE)
    .option("spark.mongodb.collection", MONGO_COLLECTION)
    .option("checkpointLocation", CHECKPOINT_LOCATION)
    .start()
)

# awaitTermination() blocks this cell until the query ends. Interrupting the
# kernel only aborts the wait on the Python side, and the JVM-side query may
# keep running on the same checkpoint, which can make re-running this cell
# fail. Always stop the query when the wait ends; stop() is harmless if the
# query has already terminated.
try:
    query.awaitTermination()
finally:
    query.stop()