<div style="font-size:18pt; padding-top:20px; text-align:center"><b>Spark Streaming and </b> <span style="font-weight:bold; color:green">Twitter</span></div><hr>
<div style="text-align:right;">Sergei Yu. Papulin <span style="font-style: italic;font-weight: bold;">(papulin_bmstu@mail.ru, papulin_hse@mail.ru)</span></div>

In [None]:
# Install the "kafka" package with yum (run as root through sudo).
sudo yum install kafka

In [None]:
# Install the "kafka-server" package with yum; the service of the same name is started in a later step.
sudo yum install kafka-server

In [None]:
# Open the Kafka server.properties file in the nano editor to review or change its settings.
sudo nano /etc/kafka/conf/server.properties 

In [None]:
# Start the kafka-server system service.
sudo service kafka-server start

In [None]:
# Create the "tweets-kafka" topic with one partition and a replication factor of 1.
# The documented option name is --partitions (plural).
kafka-topics --create --zookeeper localhost:2181 --topic tweets-kafka --partitions 1 --replication-factor 1

In [None]:
# Print the messages of the "tweets-kafka" topic to the console, starting from the beginning of the topic.
kafka-console-consumer --zookeeper localhost:2181 --topic tweets-kafka --from-beginning

In [None]:
# Start a console producer that sends every typed line to the "tweets-kafka" topic
# through the broker at localhost:9092 (handy for testing the consumer by hand).
kafka-console-producer --broker-list localhost:9092 --topic tweets-kafka

In [None]:
# Install the kafka-python package (imported as "kafka" by the producer script).
sudo pip install kafka-python

In [None]:
# Install tweepy. The producer script relies on tweepy.StreamListener, which
# exists only in the 3.x series, so the version is capped below 4.
sudo pip install "tweepy<4"

In [None]:
# -*- coding: utf-8 -*-

import tweepy
from tweepy.streaming import json
from kafka import KafkaProducer


# Kafka producer connected to the broker at localhost:9092; used by the stream listener to publish tweets.
producer = KafkaProducer(bootstrap_servers="localhost:9092")
# Name of the Kafka topic that receives the tweet texts.
topic_name = "tweets-kafka"


class MyStreamListener(tweepy.StreamListener):
    """Stream listener that forwards the text of each received tweet to Kafka.

    Every message that carries a tweet text is printed and published, encoded
    as UTF-8, to the module-level ``topic_name`` topic through the module-level
    ``producer``.
    """

    def on_data(self, raw_data):
        """Handle one raw message delivered by the Twitter stream.

        Args:
            raw_data: JSON document describing the stream message.

        Returns:
            True in every case. The stream is meant to keep running, and
            tweepy 3.x stops it only when a listener callback returns False.
        """
        data = json.loads(raw_data)

        # Use the "extended_tweet" full text when present, otherwise the
        # plain "text" field.
        if "extended_tweet" in data:
            text = data["extended_tweet"]["full_text"]
        elif "text" in data:
            text = data["text"]
        else:
            # Message without any tweet text: nothing to publish.
            return True

        print(text)
        producer.send(topic_name, text.encode("utf-8"))
        return True


# Twitter API credentials. They are read from environment variables so that
# real secrets never have to be saved inside the notebook; when a variable is
# not set, the value falls back to the empty placeholder.
import os

consumer_token = os.environ.get("TWITTER_CONSUMER_KEY", "")
consumer_secret = os.environ.get("TWITTER_CONSUMER_SECRET", "")
access_token = os.environ.get("TWITTER_ACCESS_TOKEN", "")
access_secret = os.environ.get("TWITTER_ACCESS_SECRET", "")

# Build the OAuth handler used by both the REST client and the stream.
auth = tweepy.OAuthHandler(consumer_token, consumer_secret)
auth.set_access_token(access_token, access_secret)

api = tweepy.API(auth)

myStreamListener = MyStreamListener()

myStream = tweepy.Stream(auth=api.auth, listener=myStreamListener)

# Bounding box passed to the "locations" filter of the streaming API.
# NOTE(review): presumably [west_lon, south_lat, east_lon, north_lat] - confirm
# against the Twitter streaming API documentation.
region = [34.80, 49.87, 149.41, 74.13]

try:
    # Blocks while tweets from the region are delivered to the listener.
    myStream.filter(locations=region)
finally:
    # Runs on interruption or error as well: close the stream connection and
    # let the asynchronous producer deliver the messages it still buffers.
    myStream.disconnect()
    producer.close()

In [None]:
# -*- coding: utf-8 -*-

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

zk_server = "localhost:2181" # ZooKeeper server address
topic = "tweets-kafka" # Kafka topic to read from

def updateTotalCount(currentCount, countState):
    """Fold the counts of the current mini-batch into the running total.

    Args:
        currentCount: iterable with the new counts of one word.
        countState: total accumulated so far, or None when there is none yet.

    Returns:
        The accumulated total plus every value in ``currentCount``.
    """
    running_total = 0 if countState is None else countState
    return sum(currentCount, running_total)

# Create the Spark context for this application
sc = SparkContext(appName="KafkaTwitterWordCount")

# Switch Spark logging off
sc.setLogLevel("OFF")

# Create the streaming context with a batch interval of 10 seconds
ssc = StreamingContext(sc, 10)

# Set the checkpoint directory where the running totals are kept.
# NOTE(review): the path is relative; the original note places it in HDFS -
# confirm which filesystem it resolves to in your setup.
ssc.checkpoint("tmp_spark_streaming1")

# Subscribe to the Kafka stream of the topic held in `topic` ("tweets-kafka")
kafka_stream = KafkaUtils.createStream(
    ssc,
    zk_server,
    "spark-streaming-consumer",
    {topic: 1},
)

# Keep only the second element of every received record
# NOTE(review): presumably the message value of a (key, value) pair - confirm
lines = kafka_stream.map(lambda record: record[1])

# Count the words within the current mini-batch
counts = (
    lines
    .flatMap(lambda text: text.split())
    .map(lambda token: (token, 1))
    .reduceByKey(lambda left, right: left + right)
)

# Merge the counts of the new mini-batch into the running totals
totalCounts = counts.updateStateByKey(updateTotalCount)

# Order the totals from the most frequent word to the least frequent one
totalCountsSorted = totalCounts.transform(
    lambda batch_rdd: batch_rdd.sortBy(lambda pair: -pair[1])
)

# Print the current result
totalCountsSorted.pprint()

# Start Spark Streaming
ssc.start()

# Wait until the streaming computation is stopped
ssc.awaitTermination()

In [None]:
http://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html
https://kafka.apache.org/0100/documentation/#configuration
https://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html