# Spark Streaming

In [1]:
# Создайте ключи для доступа к потоку на сайте https://apps.twitter.com
# Запустите прокси ./data/twitter.py для чтения потока твитов

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# создаем локальный контекст с двумя потоками
sc = SparkContext(master='local[2]', appName='Twitter Processing')

# создаем потоковый контескт
streaming = StreamingContext(sc, batchDuration=5)

## Читаем данные и считаем статистики

In [2]:
import json

# подключаемся к потоку твитов и загружаем данные в json формате
tweet_ds = streaming.socketTextStream(
             hostname='localhost', port=8889) \
                    .map(json.loads)

# создаем окно 30сек, обновляем даные каждые 5сек
tweet_window_ds = tweet_ds.window(30, 5)
    
# разбиваем текст на слова
words_ds = tweet_window_ds.map(lambda entry: entry['text']) \
              .flatMap(lambda line: line.lower().split())\
              .filter(lambda word: word.startswith('#'))

# считаем частоты слов
counts_ds = words_ds.map(lambda word: (word, 1)) \
              .reduceByKey(lambda x, y: x + y)

# сортируем слова по убыванию частоты 
sorted_ds = counts_ds.transform(
  lambda rdd: rdd.sortBy(lambda item: -item[1]))

# выводим сортированый список
# sorted_ds.pprint(num=10)

In [4]:
# запускаем чтение данных из потока
# streaming.start()

# продолжаем чтение до тех пор, пока не произойдет прерывание выполнения
# streaming.awaitTermination()

# Строим модель оценки тональности хештега

## Загружаем данные

In [3]:
# загружаем данные для обучения модели
sentiment_rdd = sc.textFile('../data/sentiment_sample.csv')

In [4]:
import string
from pyspark.sql import Row
from pyspark.sql import SparkSession

# контекст для создания DataFrame
spark = SparkSession.builder.getOrCreate()

def process_line(line): 
    # оставляем пробелы и латинские символы
    line = ''.join(char for char in line
                   if char in string.ascii_letters 
                   or char == ' ')
    return line.lower().split()

# создаем DataFrame
sentiment_df = sentiment_rdd\
        .map(lambda line: line.split(','))\
        .map(lambda row: Row(label=int(row[1]), 
             text=process_line(row[3]))).toDF()

In [8]:
sentiment_df.show(3, truncate=True)

+-----+--------------------+
|label|                text|
+-----+--------------------+
|    0|[is, so, sad, for...|
|    0|[i, missed, the, ...|
|    1|[omg, its, alread...|
+-----+--------------------+
only showing top 3 rows



## Препроцессинг и построение модели

In [13]:
from pyspark.ml.feature import NGram
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import HashingTF
from pyspark.ml import Pipeline 
from pyspark.ml.classification import LogisticRegression

with open('../../004/data/stopwords.txt') as src:
    stopwords_list = [word.strip() for word in src]
    
stopwords = StopWordsRemover(inputCol='text', 
                             outputCol='stopwords',
                             stopWords=stopwords_list)

ngram = NGram(n=2, 
              inputCol=stopwords.getOutputCol(),
              outputCol='ngram')

hashing = HashingTF(numFeatures=4096,
                    binary=True,
                    inputCol=ngram.getOutputCol(),
                    outputCol='hashing')

logreg = LogisticRegression(
    featuresCol=hashing.getOutputCol(),
    labelCol='label',
    regParam=1e-1)

# пайплайн обработки текста
pipeline = Pipeline(stages=[stopwords, 
                            ngram,
                            hashing,
                            logreg])

## Подбираем параметры модели

In [34]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.tuning import ParamGridBuilder

params = ParamGridBuilder()\
   .addGrid(hashing.numFeatures, [4096, 8 * 4096])\
   .addGrid(logreg.regParam, [1e-3, 1e-2, 1e-1])\
   .build()

evaluator = BinaryClassificationEvaluator(
                  metricName='areaUnderROC')

split = TrainValidationSplit(estimator=pipeline,
                             evaluator=evaluator,
                             seed=12345,
                             estimatorParamMaps=params)

eval_result = split.fit(sentiment_df)

In [35]:
import pandas as pd 
pd.DataFrame([dict([(k.name, v) for k, v in param.items()]
                   + [(evaluator.getMetricName(), metric)])
              for param, metric in zip(params, eval_result.validationMetrics)])

Unnamed: 0,areaUnderROC,numFeatures,regParam
0,0.612853,4096,0.001
1,0.612848,4096,0.01
2,0.612726,4096,0.1
3,0.674075,32768,0.001
4,0.674281,32768,0.01
5,0.675329,32768,0.1


In [36]:
# сохраняем модель с лучшим результатом
eval_result.bestModel.save('logreg.spark')

## Применяем модель на потоковых данных

In [5]:
from pyspark.ml import PipelineModel
# загружаем обученную модель (пайплайн)
model = PipelineModel.load('logreg.spark')

In [6]:
from pyspark.sql import Row
from pyspark.sql import SparkSession

# контекст для создания DataFrame
spark = SparkSession.builder.getOrCreate()

def transform_rdd(rdd):  
    # подсчитываем среднее значение тональности хештега
    
    def _get_hashtags(line):
        return [word for word in line.lower().split()
                if word.startswith('#')]
    
    # проверяем не пустой ли rdd
    if len(rdd.take(1)) == 0:
        return rdd
    
    # создаем DataFrame для получения предсказаний модели
    df = rdd.map(lambda entry: Row(
                 text=process_line(entry['text']), 
                 hashtags=_get_hashtags(entry['text']))).toDF()
    
    # получаем предсказания
    prediction_df = model.transform(df)
    
    hashtags_rdd = prediction_df.rdd\
            .flatMap(lambda row: 
                     [(hashtag, (1, row.probability[1])) 
                      for hashtag in row.hashtags])
    
    # получаем усредненную оценку тональности хештега
    result = hashtags_rdd.reduceByKey(
         lambda x, y: (x[0] + y[0], x[1] + y[1]))\
          .filter(lambda k_v: k_v[1][0] > 3)\
          .map(lambda k_v: (k_v[0], k_v[1][1] / k_v[1][0]))

    return result

# оцениваем тональность хештега в окне 30сек
tweet_window_ds.transform(transform_rdd).pprint(10)

In [None]:
# запускаем чтение данных из потока
streaming.start()

# продолжаем чтение до тех пор, пока не произойдет прерывание выполнения
streaming.awaitTermination()