In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [None]:
# Directory of the local Spark installation that holds the extra jars.
SPARK_JARS_DIR = "/home/varna/hadoop/spark/spark-3.2.4-bin-hadoop3.2/jars"

# Jars added to the session: the Kafka source for Spark SQL with its client
# libraries, and the AWS SDK / hadoop-aws jars used for s3a:// paths.
JAR_NAMES = [
    "spark-sql-kafka-0-10_2.12-3.2.4.jar",
    "spark-token-provider-kafka-0-10_2.12-3.2.4.jar",
    "commons-pool2-2.6.2.jar",
    "kafka-clients-2.8.1.jar",
    "aws-java-sdk-1.11.951.jar",
    "hadoop-aws-3.2.2.jar",
]
jars = [os.path.join(SPARK_JARS_DIR, jar_name) for jar_name in JAR_NAMES]

# Read the AWS credentials from the environment instead of embedding them in
# the notebook. A missing variable raises KeyError here, before any Spark
# session is created.
aws_access_key = os.environ["AWS_ACCESS_KEY_ID"]
aws_secret_key = os.environ["AWS_SECRET_ACCESS_KEY"]

# create spark session
spark = (
    SparkSession.builder
    .appName('kafka_spark_consumer')
    .master('local[2]')
    .config("spark.jars", ','.join(jars))
    .config("spark.hadoop.fs.s3a.access.key", aws_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", aws_secret_key)
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .config('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
    .getOrCreate()
)
# Only log errors so the notebook output stays readable.
spark.sparkContext.setLogLevel('ERROR')

In [3]:
# Open a Structured Streaming source on the Kafka topic.
KAFKA_TOPIC = 'newsTopic3'
KAFKA_BOOTSTRAP_SERVER = 'localhost:9092'

stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER)
    .option("subscribe", KAFKA_TOPIC)
    # Ask the source to begin from the earliest available offsets.
    .option("startingOffsets", "earliest")
    .load()
)

In [4]:
# Keep only the message payload (cast to a string) and the record timestamp.
news_df = stream_df.selectExpr(
    "CAST(value as STRING)",
    "timestamp",
)
news_df

DataFrame[value: string, timestamp: timestamp]

In [5]:
# Schema applied to the JSON payload: three string fields.
news_schema = StructType(
    [StructField(field_name, StringType()) for field_name in ('author', 'title', 'description')]
)

# Parse the JSON text into a struct column, then expand that struct into
# top-level columns alongside the timestamp.
parsed_news = from_json(col("value"), news_schema).alias("news_info")
news_info_df = news_df.select(parsed_news, "timestamp").select('news_info.*', "timestamp")
news_info_df

DataFrame[author: string, title: string, description: string, timestamp: timestamp]

In [None]:
# Write the parsed records to S3 as CSV files with a header row, in append
# mode, using the given checkpoint location.
query = (
    news_info_df.writeStream
    .outputMode("append")
    .format('csv')
    .option("header", True)
    .option("path", "s3a://kafka-news-etl-varkar/news_streamed_output/")
    .option("checkpointLocation", "s3a://kafka-news-etl-varkar/checkpoint")
    .start()
)

# Block this cell until the query ends. Always stop the query on the way out
# so that interrupting the cell (or a failure) does not leave the stream
# running in the background.
try:
    query.awaitTermination()
finally:
    query.stop()