In [None]:
import os
os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 pyspark-shell'

from pyspark import SparkConf
from pyspark import  SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
import pyspark

# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 pyspark-shell'



In [None]:
spark_conf = SparkConf() \
            .setMaster("spark://spark-master:7077") \
            .setAppName("MyApp3") \
            .setAll([('spark.executor.memory', '4g'),
                     ('spark.executor.cores', '4'), 
                     ('spark.cores.max', '4'), 
                     ('spark.driver.memory','4g'),
                     ('spark.driver.host','192.168.96.3')
            ])

spark = SparkSession.builder \
                    .config(conf=spark_conf) \
                    .enableHiveSupport() \
                    .getOrCreate()

In [None]:
%%capture
sc = SparkContext(conf=spark_conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 5)

In [None]:
topic = "mytopic"
broker = "kafka:9092"

directKafkaStream = KafkaUtils.createDirectStream(ssc, topics=[topic], kafkaParams={"metadata.broker.list": broker})

In [None]:
from pprint import pprint
offsetRanges = []

def storeOffsetRanges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd

def printOffsetRanges(rdd):
    for o in offsetRanges:
        print("o: ", o)
        pprint(vars(o))
        print(o.topic, o.value, o.partition, o.fromOffset, o.untilOffset)


directKafkaStream.transform(storeOffsetRanges).foreachRDD(printOffsetRanges)
ssc.start() 
ssc.awaitTermination() 

In [None]:
query = directKafkaStream \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

In [None]:
ssc = StreamingContext(sc,2)
topic = "mytopic"
# kvs = KafkaUtils.createStream(ssc,"kafka:9092","raw-event-streaming-consumer",{topic:1})
dks = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

# lines = kvs.map(lambda x: x[1])
lines = dks.map(lambda x: x[1])

lines.pprint()
ssc.start()
ssc.awaitTermination()

In [None]:
words = directKafkaStream.map(lambda x: x[1]).flatMap(lambda x: x.split(" "))
wordcount = words.map(lambda x: (x,1)).reduceByKey(lambda a,b: a+b)

wordcount.pprint()

ssc.start()
ssc.awaitTermination()

In [22]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

#original input schema
jsonSchema = (
  StructType()
  .add("timestamp", TimestampType()) #event time at the source
  .add("deviceId", LongType())
  .add("deviceType", StringType())
  .add("signalStrength", DoubleType())
)
# modified schema with added columns since we are 
# doing some ETL (transforming and adding extra columns)
# this transformed data will be stored into parquet files
# from which an SQL table can be created for consumption or
# report generation
parquetSchema = (
  StructType()
  .add("timestamp", TimestampType()) #event time at the source
  .add("deviceId", LongType())
  .add("deviceType", StringType())
  .add("signalStrength", DoubleType())
  .add("INPUT_FILE_NAME", StringType()) #file name from which this data item was read
  .add("PROCESSED_TIME", TimestampType())) #time at the executor while processing

In [None]:
inputDF = ( spark 
          .readStream 
          .format("kafka")
          .schema(jsonSchema) 
          .option("maxFilesPerTrigger", 1)  #slow it down for tutorial
          .option("badRecordsPath", bad_records_path) #any bad records will go here
          .json(sensor_path) #the source
          .withColumn("INPUT_FILE_NAME", input_file_name()) #maintain file path
          .withColumn("PROCESSED_TIME", current_timestamp()) #add a processing timestamp at the time of processing
          .withWatermark("PROCESSED_TIME", "1 minute") #optional: window for out of order data
         )

In [None]:
!pyspark --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 

In [41]:
parsed = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("subscribe", "topic1")
  .load()
#   .select(from_json(col("value").cast("string"), schema, jsonOptions).alias("parsed_value"))
)

parsed.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

DataFrame[key: string, value: string]

In [37]:
type(parsed)

pyspark.sql.dataframe.DataFrame

In [38]:
parsed.toPandas()

AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'

In [None]:
parsed.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()


In [None]:
inputDF = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka:9092") \
  .option("subscribe", "topic1") \
  .load()
    
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")


In [None]:
output_path = "./output_path/"
checkpoint_path = "./checkpoint_path/"

query = (inputDF
         .writeStream
         .format("parquet") #our sink to save it for posterity or batch queries if needed
         .option("path", output_path)
         .option("checkpointLocation", checkpoint_path) # add checkpointing for resiliency
         .outputMode("append")
         .start() 
        )

In [None]:
ssc.start()
ssc.awaitTermination()

In [None]:
df

In [None]:
df.printSchema()

In [42]:
spark = SparkSession.builder \
                    .config(conf=spark_conf) \
                    .getOrCreate()

In [44]:
trans_det_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("subscribe", "topic1") \
        .option("startingOffsets", "latest") \
        .load() \
        .selectExpr("CAST(value as STRING)", "CAST(timestamp as STRING)","CAST(topic as STRING)")

In [None]:
query = trans_det_df.writeStream \
            .format("console") \
            .option("truncate","false") \
            .start() \
            .awaitTermination()

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+---------+-----+
|value|timestamp|topic|
+-----+---------+-----+
+-----+---------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------+-----------------------+------+
|value       |timestamp              |topic |
+------------+-----------------------+------+
|next message|2021-08-05 16:06:29.073|topic1|
+------------+-----------------------+------+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------+-----------------------+------+
|value       |timestamp              |topic |
+------------+-----------------------+------+
|next message|2021-08-05 16:06:46.776|topic1|
+------------+-----------------------+------+

-------------------------------------------
Batch: 3
-------------------------------------------
+------------+-----------------------+------+
|value 