In [1]:
! pip3 install findspark textblob happybase



In [1]:
# Must run before any `pyspark` import in the next cell: findspark locates the
# local Spark installation and makes its bundled pyspark package importable.
import findspark
findspark.init()

In [2]:
from __future__ import print_function

import argparse
import json
import os
import re
import sys


from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkFiles

import happybase

In [3]:
# Runtime settings. Each one can be overridden through an environment variable,
# so the notebook can target a different cluster without code changes.
BROKER = os.getenv('KAFKA_HOST', 'kafka:9093')  # passed to Kafka as "metadata.broker.list"
TOPICS = os.getenv('KAFKA_TOPICS', 'tweets')  # Kafka topic name
SPARK_MASTER = os.getenv('SPARK_MASTER', 'spark://spark-master:7077')  # Spark master URL
HBASE_MASTER = os.getenv('HBASE_MASTER', 'hbase-thrift')  # host handed to happybase.Connection
# Streaming micro-batch interval handed to StreamingContext. It used to be the only
# hard-coded setting; it is now overridable like the others (default unchanged).
BATCH_DURATION = int(os.getenv('BATCH_DURATION', '10'))

In [4]:
# Spark application setup.
# An earlier, abandoned variant shipped /tmp/libs.zip and /tmp/nltk_data.zip via
# 'spark.submit.pyFiles' and set 'spark.yarn.appMasterEnv.NLTK_DATA'; this notebook
# ships the NLP libraries with addPyFile() instead.
conf = (
    SparkConf()
    .setMaster(SPARK_MASTER)
    .setAppName("Streamer")
)
sc = SparkContext(conf=conf)

# Dependencies shipped to the executors; process_text() later puts the shipped
# .zip archives on sys.path before importing nltk / textblob.
# NOTE(review): these URLs track the `master` branch of a third-party repo, so the
# executed code can change underneath us — consider pinning a commit hash.
for lib_url in (
    'https://raw.githubusercontent.com/duyetdev/data-pipeline-demo/master/spark/libs/nltk.zip',
    'https://raw.githubusercontent.com/duyetdev/data-pipeline-demo/master/spark/libs/textblob.zip',
    'https://raw.githubusercontent.com/duyetdev/data-pipeline-demo/master/spark/libs/six.py',
):
    sc.addPyFile(lib_url)

sc.setLogLevel("WARN")

In [5]:
# Streaming context: one micro-batch every BATCH_DURATION (pyspark takes this in seconds).
ssc = StreamingContext(sparkContext=sc, batchDuration=BATCH_DURATION)

In [6]:
# Kafka direct stream. KAFKA_TOPICS may name a single topic or a comma-separated
# list (the single-topic default 'tweets' behaves exactly as before).
kafka_topics = [topic.strip() for topic in TOPICS.split(',') if topic.strip()]
if not kafka_topics:
    raise ValueError('KAFKA_TOPICS must name at least one topic, got %r' % TOPICS)
kafka_params = {"metadata.broker.list": BROKER}
kvs = KafkaUtils.createDirectStream(ssc, kafka_topics, kafka_params)

In [7]:
# Reference for the Kafka -> HBase ingestion approach used below:
# https://datafloq.com/read/real-time-kafka-data-ingestion-into-hbase-pyspark/2658

In [8]:
def _add_shipped_libs_to_path():
    """Prepend every ``.zip`` in the SparkFiles root directory to ``sys.path``.

    Runs on the executor so that archives shipped with ``sc.addPyFile`` become
    importable. Each archive path is inserted at most once; previously the same
    paths were re-inserted for every processed record.
    """
    import os
    import sys
    from pyspark import SparkFiles

    app_dir = os.path.realpath(SparkFiles.getRootDirectory())
    for file_name in os.listdir(app_dir):
        if not file_name.endswith('.zip'):
            continue
        zip_path = os.path.join(app_dir, file_name)
        if zip_path not in sys.path:
            print('Add {}'.format(file_name))
            sys.path.insert(0, zip_path)


def _ensure_nltk_data():
    """Download the NLTK resources this pipeline relies on, unless already installed.

    ``nltk.download`` used to be called unconditionally for every record; a
    local ``nltk.data.find`` lookup first avoids repeating the download step.
    (brown/punkt are presumably needed by TextBlob's noun-phrase extraction.)
    """
    import nltk

    for resource_path, package in (
        ('corpora/brown', 'brown'),
        ('tokenizers/punkt', 'punkt'),
        ('corpora/stopwords', 'stopwords'),
    ):
        try:
            nltk.data.find(resource_path)
        except LookupError:
            nltk.download(package)


def _split_message(raw):
    """Split a Kafka payload into ``(id_str, tweet_text)``.

    Payloads are expected as ``"<id>##<tweet text>"`` — NOTE(review): format
    taken from the previously commented-out split; confirm with the producer.
    Only the first ``##`` separates, so ``##`` inside the tweet text is kept.
    Payloads without a separator (or ``None``) get a deterministic SHA-1 of the
    text as their id so the HBase row key stays stable across retries.
    """
    import hashlib

    raw = raw or ''
    if '##' in raw:
        id_str, tweet_text = raw.split('##', 1)
        return id_str, tweet_text
    return hashlib.sha1(raw.encode('utf-8')).hexdigest(), raw


def _analyse(tweet_text):
    """Tokenise, drop stop words and score sentiment for one tweet.

    Returns ``(processed_text, noun_phrases, polarity, subjectivity)``.
    """
    from nltk.corpus import stopwords
    from nltk.tokenize import TweetTokenizer
    from textblob import TextBlob

    # A set gives O(1) membership tests instead of scanning a list per token.
    stop_words = set(stopwords.words('english'))
    stop_words.add('rt')  # presumably the retweet prefix; treated as noise
    tokens = TweetTokenizer().tokenize(tweet_text.lower())
    processed_text = ' '.join(word for word in tokens if word not in stop_words)

    sentence = TextBlob(processed_text)
    sentiment = sentence.sentiment
    return processed_text, sentence.noun_phrases, sentiment.polarity, sentiment.subjectivity


def _save_to_hbase(id_str, tweet_text, processed_text, noun_phrases,
                   polarity, subjectivity, table_name='twitter-repo'):
    """Write one analysed tweet to HBase under row key ``'tweet' + id_str``.

    happybase's ``Table.put`` takes the row key and the column dict as two
    separate arguments and expects bytes values; the previous code passed a
    single tuple and raw float/list values. The connection is always closed,
    even when the put fails.
    NOTE(review): the column names carry no ``family:`` prefix — confirm they
    match the column families defined on the table.
    """
    import json
    import happybase

    def to_bytes(value):
        return value if isinstance(value, bytes) else str(value).encode('utf-8')

    connection = happybase.Connection(HBASE_MASTER)
    try:
        connection.table(table_name).put(
            to_bytes('tweet' + id_str),
            {
                b'id': to_bytes(id_str),
                b'tweet': to_bytes(tweet_text),
                b'processed_text': to_bytes(processed_text),
                # JSON keeps the list of phrases recoverable from the stored cell.
                b'noun_phrases': to_bytes(json.dumps(list(noun_phrases))),
                b'polarity': to_bytes(polarity),
                b'subjectivity': to_bytes(subjectivity),
            },
        )
    finally:
        connection.close()


def process_text(text):
    """Enrich one raw Kafka message with NLP features and store it in HBase.

    Applied per record on the executors via ``lines.map(process_text)``.

    Args:
        text: the Kafka message value, expected as ``"<id>##<tweet text>"``
            (see ``_split_message`` for the fallback when no id is present).

    Returns:
        ``(id_str, text, processed_text, noun_phrases, polarity, subjectivity)``
        where ``text`` is the tweet text without the id prefix.

    Side effects:
        Writes one row to the ``twitter-repo`` HBase table.
        NOTE(review): a new Thrift connection is opened per record; moving the
        write into ``foreachPartition`` would amortise that cost.
    """
    # Must run before anything imports nltk / textblob from the shipped zips.
    _add_shipped_libs_to_path()
    _ensure_nltk_data()

    id_str, tweet_text = _split_message(text)
    processed_text, noun_phrases, polarity, subjectivity = _analyse(tweet_text)
    _save_to_hbase(id_str, tweet_text, processed_text, noun_phrases,
                   polarity, subjectivity)
    return (id_str, tweet_text, processed_text, noun_phrases, polarity, subjectivity)
    

In [9]:
# kvs yields (key, value) records; keep only the message value.
lines = kvs.map(lambda x: x[1])
ssc.checkpoint("./checkpoint-tweet")

lines.count().map(lambda x: 'Tweets in this batch: %s' % x).pprint()

# process_text() enriches each record AND writes it to HBase, so every record must
# actually be evaluated. pprint() only take()s the first 11 elements of a batch,
# which on its own would leave the remaining records unprocessed. Cache the mapped
# stream and run a full count() first; the pprint() below then reads the cached
# results instead of recomputing (and re-writing) them.
sentient_tweet = lines.map(process_text).cache()
sentient_tweet.count().map(lambda n: 'Tweets processed in this batch: %s' % n).pprint()
sentient_tweet.pprint()

ssc.start()

ssc.awaitTermination()

-------------------------------------------
Time: 2019-11-15 14:01:00
-------------------------------------------
Tweets in this batch: 3



Py4JJavaError: An error occurred while calling o26.awaitTermination.
: org.apache.spark.SparkException: ArrayBuffer(java.nio.channels.ClosedChannelException)
	at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:140)
	at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
	at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:244)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
	at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:244)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
	at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:244)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
	at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)


In [11]:
# Stop streaming immediately and tear down the underlying SparkContext as well.
ssc.stop(stopSparkContext=True, stopGraceFully=False)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:33617)
Traceback (most recent call last):
  File "/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:33617)