In [1]:
# Configure spark-submit to fetch the Kafka 0.8 streaming connector
# (artifact spark-streaming-kafka-0-8_2.11, version 2.0.2).
# PySpark reads PYSPARK_SUBMIT_ARGS when it launches the JVM gateway, so this
# must run before the SparkContext is created in a later cell.
import os

SUBMIT_ARGS = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
os.environ.update(PYSPARK_SUBMIT_ARGS=SUBMIT_ARGS)

In [2]:
#    json parsing
import json

#    Spark
from pyspark import SparkConf, SparkContext
#    Spark Streaming
from pyspark.streaming import StreamingContext
#    Kafka
from pyspark.streaming.kafka import KafkaUtils

In [3]:
# Create the SparkContext shared by every later cell, or reuse it if this cell is re-run.
# A bare SparkContext(...) raises "Cannot run multiple SparkContexts at once" on a
# second execution, which breaks Restart/Run-All style re-execution of the notebook.
spark_conf = SparkConf().setAppName("PythonSparkStreamingKafka_01")
sc = SparkContext.getOrCreate(conf=spark_conf)
# Keep only WARN-and-above log lines so per-batch output stays readable.
sc.setLogLevel("WARN")

In [4]:
# Streaming context layered on the shared SparkContext.
# Micro-batch interval in seconds: each DStream batch covers this much time.
BATCH_INTERVAL_SECONDS = 30
ssc = StreamingContext(sc, batchDuration=BATCH_INTERVAL_SECONDS)

In [5]:
# Parquet destination for every micro-batch. Rows are appended, so re-running the
# stream accumulates data at this location.
# NOTE(review): absolute, machine-specific path; consider moving it to a config cell or env var.
PARQUET_OUTPUT_PATH = "c:/Users/Saurabh/Desktop/My_Data/Parquet_Files/proto.parquet"


# Per-batch sink: build a DataFrame from one RDD, display it, and append it as Parquet.
# Keep exactly one positional parameter: DStream.foreachRDD inspects the callback's
# arity and would pass (time, rdd) to a two-argument function.
def readMyRddsFromKafkaStream( readRdd ):
    """Convert one micro-batch RDD to a DataFrame, show it, and append it to Parquet.

    Args:
        readRdd: RDD for a single batch. Each element is either an already-decoded
            JSON value (such as the dict produced by ``json.loads``) or a raw JSON
            string.

    Returns:
        None. Side effects, for non-empty batches only:
        - registers the batch as temp view ``temporary_table``;
        - prints the rows with ``DataFrame.show()``;
        - appends them to ``PARQUET_OUTPUT_PATH``.
    """
    from pyspark.sql import SparkSession

    # Cheap emptiness check first: running spark.read.json on an empty batch would
    # still launch a schema-inference job only to find nothing to write.
    if readRdd.isEmpty():
        return

    # Reuse a single session across batches instead of constructing a new
    # SparkSession on every call (and without depending on a global `sc`).
    spark = SparkSession.builder.getOrCreate()

    # spark.read.json expects JSON *strings*. Decoded values are re-serialised with
    # json.dumps. Passing dicts directly makes PySpark str() them into Python repr
    # (single quotes, True/None), which is not valid JSON.
    json_lines = readRdd.map(
        lambda record: record if isinstance(record, str) else json.dumps(record)
    )
    df = spark.read.json(json_lines)

    # Keep the batch queryable by name; registerTempTable is deprecated since Spark 2.0.
    df.createOrReplaceTempView("temporary_table")

    # A non-empty RDD can still yield zero rows (e.g. a JSON array "[]").
    # head(1) answers "any rows?" without a full count().
    if not df.head(1):
        return

    df.show()
    df.write.format("parquet").mode("append").save(PARQUET_OUTPUT_PATH)

In [6]:
# Direct Kafka stream over the configured topics. Each element is a (key, value)
# pair decoded with the default UTF-8 decoders.
KAFKA_TOPICS = ['weatherdata']
KAFKA_PARAMS = {"metadata.broker.list": 'localhost:9092'}
kafkaStream = KafkaUtils.createDirectStream(ssc, topics=KAFKA_TOPICS, kafkaParams=KAFKA_PARAMS)

In [7]:
def _decode_kafka_value(record):
    """Parse the JSON payload (second element) of one Kafka (key, value) record.

    The message key is ignored.
    """
    payload = record[1]
    return json.loads(payload)


# DStream of decoded JSON values, one per Kafka message.
parsed = kafkaStream.map(_decode_kafka_value)

In [8]:
# Print a one-line record count for every micro-batch to the console.
batch_sizes = parsed.count()
batch_sizes.map(lambda n: 'Entries in this batch: %s' % n).pprint()

In [9]:
def _persist_batch(batch_rdd):
    """foreachRDD callback: pass one micro-batch RDD to readMyRddsFromKafkaStream.

    Takes exactly one positional argument, so DStream.foreachRDD calls it with
    just the RDD (two-argument callbacks receive (time, rdd) instead).
    """
    return readMyRddsFromKafkaStream(batch_rdd)


# Register the per-batch sink: each micro-batch becomes a DataFrame saved as Parquet.
parsed.foreachRDD(_persist_batch)

In [10]:
# Start the stream, then block this cell for at most RUN_SECONDS seconds.
# Returning from the wait does not stop the stream; it is stopped explicitly in a later cell.
RUN_SECONDS = 180
ssc.start()
ssc.awaitTermination(timeout=RUN_SECONDS)

-------------------------------------------
Time: 2019-12-18 04:34:30
-------------------------------------------
Entries in this batch: 3

+---+------------+-------+
|aqi|        date|reading|
+---+------------+-------+
|172|3/1/10 22:00|      1|
|173|3/1/10 23:00|      2|
|174| 3/2/10 0:00|      3|
+---+------------+-------+

-------------------------------------------
Time: 2019-12-18 04:35:00
-------------------------------------------
Entries in this batch: 6

+---+-----------+-------+
|aqi|       date|reading|
+---+-----------+-------+
|174|3/2/10 1:00|      4|
|178|3/2/10 2:00|      5|
|178|3/2/10 3:00|      6|
|182|3/2/10 4:00|      7|
|180|3/2/10 5:00|      8|
|184|3/2/10 6:00|      9|
+---+-----------+-------+

-------------------------------------------
Time: 2019-12-18 04:35:30
-------------------------------------------
Entries in this batch: 6

+---+------------+-------+
|aqi|        date|reading|
+---+------------+-------+
|187| 3/2/10 7:00|     10|
|194| 3/2/10 8:00|   

In [11]:
# Shut down the streaming context first, letting in-flight batches finish
# (stopGraceFully=True), and then stop the underlying SparkContext as well.
# Calling only sc.stop() while the StreamingContext is still active leaves it
# registered as active in the JVM and can cut off a batch mid-write.
ssc.stop(stopSparkContext=True, stopGraceFully=True)