# Распределенные вычисления ДЗ-2 | Шамаев Онар Евгеньевич 

## Постановка задачи

Необходимо в потоковом формате считывать последнюю активность сообщества r/AskReddit платформы Reddit.
Целью будет найти самые встречающиеся слова.
Необходимо использовать Spark Streaming сохраняя данные на HDFS.

## План
1. Создание TCP сервера (RSS читателя заголовков)
2. Запуск HDFS
3. Написать MapReduce для Spark Streaming
4. Кеширование
5. Результаты

## 1. Создание TCP сервера (RSS читателя заголовков)

Идея технологии RSS заключается в том, чтобы предоставлять последние N текстовых информативных блоков чего-либо. Например на новостных сайтах - это краткая сводка о последних события. В случае Reddit - это список последних назвний последних тем, поднимаемых пользователями в сообществе r/AskReddit.

_Импорты библиотек._

In [1]:
import feedparser
import socket
import time

Последние новости (в RSS формате) доступны по ссылке:
`https://www.reddit.com/r/AskReddit/new/.rss`

In [2]:
rss_link = 'https://www.reddit.com/r/AskReddit/new/.rss'

Определим программу читателя новостной ленты RSS и извлекающей оттуда заголовки, создающий TCP сокет и отправляющий их туда.

In [3]:
encoding = 'utf-8'


def rss2tcp_reader(ip, port, interval_sec: float = 30.0, ttl=None):
    last_stamp: time.struct_time | None = None
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((ip, port))
    s.listen(1)

    print(f'Awaiting connection at {ip}:{port}...')
    conn, addr = s.accept()
    print('Connected by ', addr)

    try:
        while True:
            if (ttl is not None) and last_stamp > ttl:
                print('Time to live passed. Exiting.')
                break

            feed = feedparser.parse(rss_link)
            if feed.status != 200:
                print(f'Bad response {feed.status} received! Exiting.')
                break

            _mx_stmp = None
            for i, entry in enumerate(feed.entries):
                title = entry.title
                time_stamp = entry.published_parsed

                if (last_stamp is None) or (last_stamp < time_stamp):
                    if _mx_stmp is None: _mx_stmp = time_stamp
                else:
                    break

                conn.send(title.encode(encoding) + b'\n')
            if _mx_stmp is not None:
                last_stamp = _mx_stmp

            time.sleep(interval_sec)
    except Exception as ex:
        print(f'Exception happened {ex}')
    finally:
        s.close()
        print(f'Closed connection')

Данная программа представлена отдельно в файле `tcp_server.py`. Запустим ее независмо, чтобы можно было запускать ячейки ниже.

## 2. Запуск HDFS

Воспользуемся наработками 1 дз. Поднимем HDFS кластер из 1 namenode и 2 datanode с помощью Docker.

Запустим кластер из папки `compose` командой `docker-compose -f "docker.compose.yml" up -d`.

UI кластера доступен по адресу `http://localhost:9870/`.

Сама HDFS доступна по адресу `hdfs://localhost:8020`.

In [4]:
hdfs = 'hdfs://localhost:8020'

## 3. Написать MapReduce для Spark Streaming

_Импорты библиотек._

In [5]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import os

os.environ['PYSPARK_PYTHON'] = 'python'

Создадим Спарк-контекст и Стриминговый контекст.

In [6]:
from pyspark.sql import SparkSession

ctx = SparkContext(master="local[2]", appName="AskRedditWordCounter")
ctx._jsc.hadoopConfiguration().set("dfs.client.use.datanode.hostname", "true")
spark = SparkSession(ctx)

In [7]:
batchDuration = 30
ctx_stream = StreamingContext(ctx, batchDuration)
ctx_stream.checkpoint(hdfs+'/rdd_cp')



PySpark UI доступен по адресу `http://localhost:4040`.

Будем использовать DStream, считывающий данные по TCP соединению, созданном нашим RSS читателем.

In [8]:
ip = 'localhost'
port = 9999

d_stream = ctx_stream.socketTextStream(ip, port)

Определим фильтр, для подготовки к разбиению заголовков на слова, и непосредственно разделитель на слова.

In [9]:
import string


def prepare(line: str) -> str:
    return line.translate({key: ' ' for key in string.punctuation + '?!()$@/'}).lower()


def word_splitter(line: str) -> list[str]:
    return line.split(' ')

Определим обработчик RDD по топ, получаемых DStream.

In [10]:
from pyspark import RDD

def rdd_handler(time_, rdd: RDD):
    if rdd.isEmpty():
        return

    print(f'RDD handled at {time_}')
    df = rdd.toDF(schema=["word", "count"])
    df.write.format('json').mode('overwrite').save(hdfs + '/askReddit.json')

def rdds_summator(new_values, old_state):
    print(f'RDD state updated')
    return sum(new_values) + (old_state or 0)

Определим порядок действий джобы Spark Stream:
- разделяем входные заголовки на слова;
- map в пару (слово, количество), где количество всегда равно 1;
- обновление состояния по ключу (reduce) путем сложения количеств;
- запись в кластер hdfs.

In [11]:
word_stream = d_stream.flatMap(lambda line: word_splitter(prepare(line))) \
                    .map(lambda word: (word, 1)) \
                    .reduceByKey(lambda x, y: x + y) \
                    .updateStateByKey(rdds_summator) \
                    .foreachRDD(rdd_handler)

Запустим джобу. Теперь она будет работать на фоне и собирать статистику встречаемых слов.

In [None]:
ctx_stream.start()
duration = 5 * 60
ctx_stream.awaitTermination(duration)

In [None]:
df = spark.read.json(hdfs + "/askReddit.json")

# Преобразование Spark DataFrame в Pandas DataFrame
pandas_df = df.toDF()

# Сортировка данных по количеству вхождений
pandas_df = pandas_df.sort_values(by="count", ascending=False)

# Построение графика
plt.figure(figsize=(10, 6))
plt.bar(pandas_df["word"][:10], pandas_df["count"][:10])  # Топ-10 слов
plt.xlabel("Word")
plt.ylabel("Count")
plt.title("Top 10 Words")
plt.xticks(rotation=45)
plt.show()