In [2]:
# Build (or reuse, via getOrCreate) a local Spark session that uses all
# available cores; every later cell reads and writes through `spark`.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Streaming Process Files")
    # NOTE(review): this key is documented for the legacy DStream
    # StreamingContext; confirm whether it affects the Structured Streaming
    # query started later in this notebook.
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .master("local[*]")
    .getOrCreate()
)

# Display the session summary as the cell output.
spark

In [2]:
# Streaming file sources need an explicit schema unless schema inference is
# switched on for the session.
spark.conf.set("spark.sql.streaming.schemaInference", True)

# Shared locations, so the streaming and batch readers cannot drift apart.
INPUT_DIR = "MidTermExam_data/input-05111740000122/"
ARCHIVE_DIR = "MidTermExam_data/archive-05111740000122/"

# Streaming source over INPUT_DIR:
# - maxFilesPerTrigger: at most 3 new files are picked up per micro-batch
# - cleanSource="archive": processed files are moved into ARCHIVE_DIR
streaming_df = (
    spark.readStream
    .format("json")
    .option("cleanSource", "archive")
    .option("sourceArchiveDir", ARCHIVE_DIR)
    .option("maxFilesPerTrigger", 3)
    .load(INPUT_DIR)
)

# Batch read of the same directory, used only to inspect schema and sample rows.
# cleanSource / sourceArchiveDir / maxFilesPerTrigger are file *stream* source
# options and are ignored by a batch read, so they are not passed here.
# Run this before the streaming query archives the input files; otherwise the
# directory may be empty and schema inference will fail.
read_df = spark.read.format("json").load(INPUT_DIR)

In [3]:
# Inspect the schema Spark inferred from the JSON files in the input
# directory, then preview the first rows with full, untruncated columns.
read_df.printSchema()
read_df.show(n=20, truncate=False)

root
 |-- authors: string (nullable = true)
 |-- category: string (nullable = true)
 |-- date: string (nullable = true)
 |-- headline: string (nullable = true)
 |-- link: string (nullable = true)
 |-- short_description: string (nullable = true)

+---------------------------------------+--------------+----------+----------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|authors                                |category      |date      |headline                                                                                |link                                                                                                             |short_descri

In [None]:
# Start the streaming query: each micro-batch is appended as JSON files to the
# output directory (a file sink, not the console sink). The checkpoint
# directory records committed progress so a restarted query can resume.
# Note: start() returns a StreamingQuery handle, not a DataFrame.
writing_df = (
    streaming_df.writeStream
    .format("json")
    .option("path", "MidTermExam_data/output-05111740000122/")
    .option("checkpointLocation", "MidTermExam_data/checkpoint-05111740000122")
    .outputMode("append")
    .start()
)

# Block until the query ends, which happens on
# 1. an exception in the running query, or
# 2. a manual interruption (e.g. "Interrupt Kernel").
# Stop the query in `finally` so an interrupt does not leave it running in the
# background, still writing output and active on the same checkpoint when this
# cell is re-run. stop() is harmless if the query has already terminated.
try:
    writing_df.awaitTermination()
finally:
    writing_df.stop()

In [3]:
# Batch-read the JSON files the streaming query has written to the output
# directory and display them with full column widths.
out_df = (
    spark.read
    .format("json")
    .load("MidTermExam_data/output-05111740000122/")
)
out_df.show(truncate=False)


+---------------------------------------+--------------+----------+----------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|authors                                |category      |date      |headline                                                                                |link                                                                                                             |short_description                                                                                                                                                                                       |
+---------------------------------------+--------------+