# Preparing the Environment

We need to make sure that the packages we're going to use are available to Spark. Instead of downloading jar files and worrying about paths, we can use the --packages option and give the groupId:artifactId:version coordinates of the artifact as published on Maven. Spark will then download it along with its dependencies. When running from within Jupyter there is no spark-submit command line, so we pass the option through the PYSPARK_SUBMIT_ARGS environment variable. It must be set before the SparkContext is created, and its value must end with pyspark-shell.
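Here is a minimal sketch of building that variable. It assumes Spark 2.4.5 built against Scala 2.11; 2.4.5 is the version Spark's own error message reports further down. The helper build_submit_args is hypothetical and not part of PySpark or findspark. It only assembles the string.

```python
import os

# Assumed versions: Spark 2.4.5 on Scala 2.11 (adjust to your installation).
SPARK_VERSION = "2.4.5"
SCALA_VERSION = "2.11"


def build_submit_args(packages, master=None):
    """Hypothetical helper: assemble a PYSPARK_SUBMIT_ARGS value.

    `packages` is a list of Maven coordinates (groupId:artifactId:version);
    spark-submit resolves them, including their transitive dependencies.
    """
    args = []
    if master:
        args += ["--master", master]
    args += ["--packages", ",".join(packages)]
    # PySpark passes this string to spark-submit; it must end with "pyspark-shell".
    args.append("pyspark-shell")
    return " ".join(args)


kafka_package = "org.apache.spark:spark-streaming-kafka-0-8_{}:{}".format(
    SCALA_VERSION, SPARK_VERSION)

# Must run before findspark.init() and before the first SparkContext in this kernel.
os.environ["PYSPARK_SUBMIT_ARGS"] = build_submit_args([kafka_package])
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```

The Scala suffix (_2.11) in the artifact name has to match the Scala build of the installed Spark. The version has to match the Spark release itself.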

In [14]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.sql('''select 'spark' as hello''')
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [None]:
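# These lines follow PySpark's own Kafka unit test; the self._kafkaTestUtils and
# self._validateStreamResult helpers exist only inside that test class, so this
# version reads from an existing topic instead of creating and filling one.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, 2)

topic = 'test'
kafkaParams = {"metadata.broker.list": "192.168.43.68:9092",  # must be a quoted "host:port" string
               "auto.offset.reset": "smallest"}               # start from the earliest available offset

stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)
stream.pprint()
# ssc.start() begins consuming; ssc.awaitTermination() then blocks the cell.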

In [23]:
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#"""
# Counts words in UTF8 encoded, '\n' delimited text directly received from Kafka in every 2 seconds.
# Usage: direct_kafka_wordcount.py <broker_list> <topic>
# To run this on your local machine, you need to setup Kafka and create a producer first, see
# http://kafka.apache.org/documentation.html#quickstart
# and then run the example
#    `$ bin/spark-submit --jars \
#      external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar \
#      examples/src/main/python/streaming/direct_kafka_wordcount.py \
#      localhost:9092 test`
#"""

import findspark
findspark.init()
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

offsetRanges = []

def storeOffsetRanges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd

def printOffsetRanges(rdd):
    for o in offsetRanges:
        print ("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))

 

if __name__ == "__main__":
    # Inside Jupyter, sys.argv holds the kernel's own arguments (typically
    # "-f <connection file>"), so this check can pass with the wrong values;
    # run the script through spark-submit as shown in the usage comment above.
    if len(sys.argv) != 3:
        print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr)
        sys.exit(-1)

#    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    sc = SparkContext.getOrCreate()
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 2)  # 2-second batches
    brokers, topic = sys.argv[1:]
    # Direct (receiver-less) stream; each record is a (key, value) tuple.
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    kvs.transform(storeOffsetRanges).foreachRDD(printOffsetRanges)
    lines = kvs.map(lambda x: x[1])
    lines.pprint()
    counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()


________________________________________________________________________________________________

  Spark Streaming's Kafka libraries not found in class path. Try one of the following.

  1. Include the Kafka library and its dependencies with in the
     spark-submit command as

     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8:2.4.5 ...

  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-0-8-assembly, Version = 2.4.5.
     Then, include the jar in the spark-submit command as

     $ bin/spark-submit --jars <spark-streaming-kafka-0-8-assembly.jar> ...

________________________________________________________________________________________________




TypeError: 'JavaPackage' object is not callable
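The storeOffsetRanges/printOffsetRanges pair in the cell above records which Kafka offsets each batch consumed. The sketch below is conceptual only. Its OffsetRange namedtuple is a stand-in that mirrors the four fields the example prints (topic, partition, fromOffset, untilOffset); it is not PySpark's class, and the offsets are made up. Because untilOffset is exclusive, untilOffset - fromOffset is the number of records a batch read from that partition.

```python
from collections import namedtuple

# Stand-in for pyspark.streaming.kafka.OffsetRange, using the same field names
# the example prints, so the logic can run without a Kafka cluster.
OffsetRange = namedtuple("OffsetRange", ["topic", "partition", "fromOffset", "untilOffset"])


def format_offset_ranges(ranges):
    """Format ranges like printOffsetRanges, adding a per-partition record count."""
    lines = []
    for o in ranges:
        # fromOffset is inclusive and untilOffset exclusive, so this is the batch size.
        count = o.untilOffset - o.fromOffset
        lines.append("%s %s %s %s (%d records)" % (o.topic, o.partition, o.fromOffset, o.untilOffset, count))
    return lines


# Hypothetical offsets for a two-partition "test" topic in one 2-second batch.
batch = [OffsetRange("test", 0, 100, 112), OffsetRange("test", 1, 40, 40)]
for line in format_offset_ranges(batch):
    print(line)
```

A range whose fromOffset equals its untilOffset means that partition received nothing during the batch.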

In [13]:
import os
# os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_181'

# SPARK_HOME and PYSPARK_SUBMIT_ARGS must be set before findspark.init() and before
# any SparkContext exists in this kernel: the submit args are read only when the JVM
# starts, so getOrCreate() on an already-running context silently ignores them.
# Raw strings (r'...') stop the Windows backslashes from being read as escapes.
os.environ['SPARK_HOME'] = r'D:\Software\Apache\Spark'
# Note that the trailing "pyspark-shell" is required. The artifact must match the
# running Spark (2.4.5, per the error message) and its Scala build (2.11).
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.5 pyspark-shell'
# Alternative with local jars; this particular pair mixes versions (a 2.4.3 jar without
# Kafka's own dependencies plus a 1.6.3 assembly built for an older Spark):
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars spark-streaming-kafka-0-8_2.11-2.4.3.jar,spark-streaming-kafka-assembly_2.11-1.6.3.jar pyspark-shell'

import findspark
findspark.init()  # also puts $SPARK_HOME/python on sys.path

import cgitb
cgitb.enable()
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
#     conf = SparkConf().setAppName("PythonSparkStreamingKafka").setMaster("spark://127.0.0.1:7077")
#     conf = SparkConf().setAppName("PythonSparkStreamingKafka")
#     sc = SparkContext(appName="KafkaSpark")
#     sc = SparkContext(conf=conf)
    sc = SparkContext.getOrCreate()
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 60)

    brokers = '192.168.43.68:9092'
    topic = 'test'
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])
    lines.pprint()
    counts = lines.flatMap(lambda line: line.split(' ')).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
    counts.pprint()

    # Receiver-based alternative that connects through ZooKeeper instead of the brokers:
#     kafkaStream = KafkaUtils.createStream(ssc, '192.168.43.68:2181', 'groupId2', {'test':1})
#     kafkaStream.pprint()

    # start() fails unless at least one output operation (e.g. pprint) is registered.
    ssc.start()
    ssc.awaitTermination()


________________________________________________________________________________________________

  Spark Streaming's Kafka libraries not found in class path. Try one of the following.

  1. Include the Kafka library and its dependencies with in the
     spark-submit command as

     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8:2.4.5 ...

  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-0-8-assembly, Version = 2.4.5.
     Then, include the jar in the spark-submit command as

     $ bin/spark-submit --jars <spark-streaming-kafka-0-8-assembly.jar> ...

________________________________________________________________________________________________




TypeError: 'JavaPackage' object is not callable

In [29]:
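import os
# Must be set before the first SparkContext in this kernel (restart the kernel if one
# already exists). The version must match the running Spark, which the error message
# above reports as 2.4.5.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.5 pyspark-shell'

# del os.environ["PYSPARK_SUBMIT_ARGS"]

import findspark as fs

fs.init()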

## Import dependencies

We need to import the necessary PySpark modules for Spark, Spark Streaming, and Spark Streaming with Kafka. We also need the Python json module for parsing the inbound Twitter data.
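Each Kafka record arrives in PySpark as a (key, value) tuple whose value is the decoded message string. The following is a minimal sketch of parsing such a value with the json module. It assumes each message is one JSON-encoded tweet. The helper parse_tweet and the field names user and text are hypothetical, because the notebook never shows a message payload. Malformed messages are returned as None rather than raising inside a Spark task.

```python
import json


def parse_tweet(record):
    """Hypothetical helper: parse the value of a Kafka (key, value) record as JSON.

    Returns None for values that are not valid JSON objects, so a single bad
    message does not fail the whole batch.
    """
    _key, value = record
    try:
        obj = json.loads(value)
    except (TypeError, ValueError):  # JSONDecodeError is a subclass of ValueError
        return None
    return obj if isinstance(obj, dict) else None


records = [
    (None, '{"user": "alice", "text": "hello spark"}'),
    (None, "not json"),
]
parsed = [parse_tweet(r) for r in records]
print(parsed)
```

In the streaming job, this would be applied as kafkaStream.map(parse_tweet).filter(lambda t: t is not None).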

In [7]:
#    Spark
from pyspark import SparkContext
#    Spark Streaming
from pyspark.streaming import StreamingContext
#    Kafka
from pyspark.streaming.kafka import KafkaUtils
#    json parsing
import json

## Create Spark context

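The SparkContext is the primary object through which everything else is run. The setLogLevel call is optional, but it removes a lot of noise on stdout that could otherwise swamp the actual output of the job. Only one SparkContext can be active per Python process. If one already exists, as in the error below, the kernel has to be restarted: only a freshly created context picks up the PYSPARK_SUBMIT_ARGS set above.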

In [8]:
sc = SparkContext(appName="PythonSparkStreamingKafka")

sc.setLogLevel("WARN")


ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=PythonSparkStreamingKafka, master=local[*]) created by __init__ at <ipython-input-3-7f0b2612f461>:1 

## Create Streaming Context

We pass the Spark context (from above) along with the batch duration, which here is set to 60 seconds. The stream is therefore processed as one batch per minute.

See the Spark Streaming API reference and programming guide for more details.
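Conceptually, the batch duration slices the continuous stream into fixed intervals, and each interval becomes one RDD in the DStream. The stdlib sketch below models only that grouping. It assumes each record carries an arrival time in seconds since the context started. It illustrates the idea and is not how Spark schedules batches internally.

```python
from collections import defaultdict

BATCH_DURATION = 60  # seconds, matching StreamingContext(sc, 60)


def assign_batches(events, batch_duration=BATCH_DURATION):
    """Group (arrival_time_seconds, payload) pairs into fixed-length batches.

    Batch k covers the half-open interval [k * batch_duration, (k + 1) * batch_duration).
    """
    batches = defaultdict(list)
    for arrival, payload in events:
        batches[int(arrival // batch_duration)].append(payload)
    return dict(batches)


# Hypothetical arrival times (seconds since the context started).
events = [(3.0, "a"), (59.9, "b"), (60.0, "c"), (125.5, "d")]
print(assign_batches(events))
```

A longer batch duration means fewer, larger batches and therefore higher latency before results appear. A shorter one increases scheduling overhead.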

In [9]:
ssc = StreamingContext(sc, 60)

## Connect to Kafka

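Using the native Spark Streaming Kafka capabilities, we use the streaming context from above to connect to our Kafka cluster. KafkaUtils.createStream is the receiver-based approach. It connects through ZooKeeper (here 192.168.43.250:2181) rather than directly to the Kafka brokers. The topic connected to is test2, consumed with one thread, from consumer group groupId2. The group name is arbitrary and can be changed as required.

For more information, see the Spark Streaming + Kafka Integration Guide.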
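Two kinds of address appear in this notebook. createStream takes a ZooKeeper quorum (default port 2181). createDirectStream takes Kafka brokers through metadata.broker.list (default port 9092). A typo such as an octet above 255 is easy to miss. The hypothetical helper parse_hosts below checks a comma-separated host:port list before it is handed to Spark. It uses only the standard library and assumes IPv4 literals or plain hostnames.

```python
import ipaddress


def parse_hosts(spec, default_port):
    """Hypothetical helper: split "host:port,host:port" into (host, port) pairs.

    Raises ValueError for an empty entry or host, a non-numeric or out-of-range
    port, or an all-digit dotted host that is not a valid IPv4 address.
    """
    hosts = []
    for entry in spec.split(","):
        entry = entry.strip()
        host, sep, port_text = entry.rpartition(":")
        if not sep:
            # No explicit port: fall back to the default (2181 for ZooKeeper, 9092 for brokers).
            host, port_text = entry, str(default_port)
        if not host:
            raise ValueError("empty host in %r" % spec)
        port = int(port_text)  # raises ValueError if the port is not numeric
        if not 0 < port < 65536:
            raise ValueError("port out of range: %d" % port)
        # A host made only of digits and dots must be a real IPv4 address.
        if host.replace(".", "").isdigit():
            ipaddress.IPv4Address(host)  # raises AddressValueError (a ValueError) for octets > 255
        hosts.append((host, port))
    return hosts


print(parse_hosts("192.168.43.68:9092", 9092))  # direct stream: Kafka broker
print(parse_hosts("192.168.43.250", 2181))      # createStream: ZooKeeper quorum
```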

In [10]:
kafkaStream = KafkaUtils.createStream(ssc, '192.168.43.250:2181', 'groupId2', {'test2':1})


________________________________________________________________________________________________

  Spark Streaming's Kafka libraries not found in class path. Try one of the following.

  1. Include the Kafka library and its dependencies with in the
     spark-submit command as

     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8:2.4.5 ...

  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-0-8-assembly, Version = 2.4.5.
     Then, include the jar in the spark-submit command as

     $ bin/spark-submit --jars <spark-streaming-kafka-0-8-assembly.jar> ...

________________________________________________________________________________________________




TypeError: 'JavaPackage' object is not callable

## Message Processing

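The inbound stream is a DStream, which supports various built-in transformations such as map. Each Kafka record arrives as a (key, value) tuple, so map(lambda x: x[1]) keeps only the message value.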
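The earlier word-count cells chain map, flatMap, map and reduceByKey over such records. To show what that computes for a single batch, here is a plain-Python model of one micro-batch. It assumes records are (key, value) tuples of strings, as produced by KafkaUtils with its default decoders. It mirrors the DStream operations with ordinary iteration and a Counter. It illustrates the semantics only; it is not Spark code and ignores partitioning.

```python
from collections import Counter
from itertools import chain


def word_count_batch(records):
    """Model one batch of the word-count pipeline on (key, value) records."""
    lines = [value for _key, value in records]                     # kvs.map(lambda x: x[1])
    words = chain.from_iterable(line.split(" ") for line in lines)  # .flatMap(lambda line: line.split(" "))
    pairs = ((word, 1) for word in words)                          # .map(lambda word: (word, 1))
    counts = Counter()
    for word, one in pairs:                                        # .reduceByKey(lambda a, b: a + b)
        counts[word] += one
    return dict(counts)


batch = [(None, "hello spark"), (None, "hello kafka")]
print(word_count_batch(batch))
```

As in the notebook, split(" ") produces empty strings when a line contains repeated spaces. split() with no argument would skip them.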

In [None]:
lines = kafkaStream.map(lambda x: x[1])

lines.pprint()

## Start the streaming context

Having defined the streaming context, we’re now ready to start it. When you run this cell, the job starts and the output of all the pprint calls above appears below this cell, once per batch. awaitTermination() blocks until the context is stopped, so the cell keeps running until then.

In [None]:
ssc.start()  

ssc.awaitTermination()