# Твиты

1) В качестве брокера сообщений использовать Kafka.

2) Для работы с потоком твитов - пакет tweepy под Python.

## Подготовка

## Задача 4. Подсчет количества твитов пользователей

Напишите программу, которая подсчитывает количество твитов каждого пользователя в течение 1 мин. и в течение 10 мин. каждые 30 сек.

Выведите результат отсортированным по убыванию. В выводе замените id пользователей на их screen_name.

Исходные данные:
- id пользователей: "285532415", "147964447", "34200559", "338960856", "200036850", "72525490", "20510157", "99918629"

**tweets_producer.py**

```python
# -*- coding: utf-8 -*-
import logging
import os
import sys

import rapidjson
import tweepy

from tweepy.streaming import json
from kafka import KafkaProducer

# Module-level logger: records of level INFO and above are written to stdout.
logger = logging.getLogger('tweets_producer')
logger.setLevel(logging.INFO)

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter('[%(levelname)s] %(asctime)s  %(name)s: %(message)s'))
logger.addHandler(handler)


# Kafka bootstrap servers and the topic the tweets are published to.
KAFKA_SERVERS = ('localhost:9092',)
TWEETS_KAFKA_TOPIC = 'Tweets'

# Twitter API credentials are taken from the environment so that secrets are
# never committed together with the source. The empty-string default keeps the
# module importable; values must be exported before the script is started.
CONSUMER_TOKEN = os.environ.get('TWITTER_CONSUMER_TOKEN', '')
CONSUMER_SECRET = os.environ.get('TWITTER_CONSUMER_SECRET', '')
ACCESS_TOKEN = os.environ.get('TWITTER_ACCESS_TOKEN', '')
ACCESS_SECRET = os.environ.get('TWITTER_ACCESS_SECRET', '')

# Ids of the Twitter users whose tweets are followed.
USERS_IDS = ('285532415', '147964447', '34200559', '338960856', '200036850', '72525490', '20510157', '99918629')


class BaseListener(tweepy.StreamListener):
    """Base stream listener that owns a Kafka producer and logs stream events.

    Subclasses use ``self.kafka_producer`` to publish messages.
    """

    def __init__(self, api=None):
        """Initialise the tweepy listener and connect a Kafka producer.

        Args:
            api: optional ``tweepy.API`` instance forwarded to
                ``tweepy.StreamListener``; ``None`` lets tweepy pick its default.
        """
        # The parent initialiser must run, otherwise the attributes tweepy's
        # own listener methods rely on (e.g. ``self.api``) are never created.
        super().__init__(api)
        self.kafka_producer = KafkaProducer(bootstrap_servers=KAFKA_SERVERS)

    def on_status(self, status):
        """Log the text of a received status."""
        logger.warning('Status: %s', status.text)

    def on_error(self, status_code):
        """Log the error status code reported by the stream."""
        logger.warning('Error: %s', status_code)


class TweetsStreamListener(BaseListener):
    """Listener that republishes every received tweet to the Kafka topic."""

    def on_data(self, raw_data):
        """Parse one raw stream message and forward it to Kafka.

        Messages without a tweet id or without a user are logged and skipped.
        The Kafka message is a UTF-8 encoded JSON object with the keys
        ``tid`` and ``screen_name``.

        Args:
            raw_data: JSON text of a single stream message.

        Returns:
            Always ``True``.
        """
        payload = rapidjson.loads(raw_data)

        tweet_id = payload.get('id')
        if not tweet_id:
            logger.warning('Tweet ID was not found in data: %s', raw_data)
            return True

        author = payload.get('user')
        if not author:
            logger.warning('User was not found in data: %s', raw_data)
            return True

        screen_name = author['screen_name']
        uid = author['id']

        logger.info(
            'Received 1 tweet from @%s[%s]. Send to Kafka topic `%s`',
            screen_name, uid, TWEETS_KAFKA_TOPIC
        )

        # Only the tweet id and the author's screen name are published.
        message = rapidjson.dumps({
            'tid': tweet_id,
            'screen_name': screen_name,
        })
        self.kafka_producer.send(TWEETS_KAFKA_TOPIC, message.encode('utf-8'))
        return True


def main():
    """Authenticate against Twitter and stream tweets related to ``USERS_IDS``.

    Every stream message is handled by a ``TweetsStreamListener`` instance.
    """
    oauth = tweepy.OAuthHandler(CONSUMER_TOKEN, CONSUMER_SECRET)
    oauth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)

    client = tweepy.API(oauth)

    stream = tweepy.Stream(auth=client.auth, listener=TweetsStreamListener())
    logger.info('Start tweets receiving...')
    stream.filter(follow=USERS_IDS)


# Script entry point: run the producer until it is interrupted.
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the producer, so it is only logged.
        logger.warning('Was stopped by user request')

```

**tweets_counter_per_timeunit.py**

```python
# -*- coding: utf-8 -*-
import sys

import rapidjson

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Kafka source: ZooKeeper address and the topic to read tweets from.
ZOOKEEPER_SERVER = 'localhost:2181'
KAFKA_TOPIC = 'Tweets'

# Spark application name and the streaming batch interval, in seconds.
APP_NAME = 'UsersTweetsCounter'
BATCH_DURATION_SEC = 30

# Window lengths, in seconds.
MINUTE = 60
TEN_MINUTES = 10 * MINUTE

# Create the Spark context and keep its log output down to errors.
spark_context = SparkContext(appName=APP_NAME)
spark_context.setLogLevel('ERROR')

# Streaming context with checkpointing enabled.
streaming_context = StreamingContext(spark_context, batchDuration=BATCH_DURATION_SEC)
streaming_context.checkpoint(f'{APP_NAME}__checkpoint')

# Input stream of Kafka records for KAFKA_TOPIC.
kafka_stream = KafkaUtils.createStream(
	ssc=streaming_context,
	zkQuorum=ZOOKEEPER_SERVER,
	groupId=f'{APP_NAME}__consumers_group',
	topics={KAFKA_TOPIC: 1},
)

def ssum(a, b):
	"""Return ``a + b``; used as the reduce function when merging counts."""
	total = a + b
	return total

# Step 1: keep only the author's screen name of every Kafka record.
# A record is a pair whose second item is the JSON text of the message:
#	(None, '{"tid":1074310692235292672,"screen_name":"Odev110"}')       => 'Odev110'
#	(None, '{"tid":1074310709134139392,"screen_name":"kirillica1957"}') => 'kirillica1957'
#	(None, '{"tid":1074310692235292673,"screen_name":"Odev110"}')       => 'Odev110'
screen_names = kafka_stream.map(
	lambda record: rapidjson.loads(record[1])['screen_name']
)

# Step 2: count the occurrences of every screen name within a batch:
#	['Odev110', 'kirillica1957', 'Odev110'] => [('Odev110', 2), ('kirillica1957', 1)]
screen_name__ones = screen_names.map(lambda screen_name: (screen_name, 1))
screen_name__count = screen_name__ones.reduceByKey(ssum)

def print_windowed(data_stream, func, time, slide=None):
	"""Print the per-key reduction of ``data_stream`` over a time window.

	Args:
		data_stream: DStream of ``(key, value)`` pairs.
		func: function that merges two values of the same key.
		time: window length, in seconds.
		slide: interval, in seconds, between two consecutive evaluations of
			the window. Defaults to ``time``, i.e. non-overlapping windows
			(the behaviour this function always had). Pass the batch
			interval to re-evaluate an overlapping window on every batch.
	"""
	if slide is None:
		slide = time

	windowed_data = data_stream.reduceByKeyAndWindow(func, None, windowDuration=time, slideDuration=slide)
	# Bring the result into a single partition and sort it by key in
	# descending order before printing up to 100 records.
	windowed_data.transform(lambda rdd: rdd.coalesce(1).sortByKey(ascending=False)).pprint(100)

# Report the per-user tweet counts for the one-minute and ten-minute windows.
for window_sec in (MINUTE, TEN_MINUTES):
	print_windowed(screen_name__count, ssum, window_sec)

# Launch the streaming job and wait until it is terminated.
streaming_context.start()
streaming_context.awaitTermination()

```

**run_spark.sh**

```bash
#!/usr/bin/env bash
# Submit a PySpark application to a local Spark master with two worker threads.
# Usage: run_spark.sh <application.py> [application arguments...]

if [ "$#" -lt 1 ]; then
    echo "Usage: $0 <application.py> [application arguments...]" >&2
    exit 1
fi

# Environment script for the rh-python36 interpreter.
source /opt/rh/rh-python36/enable

export PYSPARK_PYTHON=python
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_DRIVER_PYTHON_OPTS=""
# The master URL is quoted so the shell never expands [2] as a glob pattern;
# "$@" keeps paths with spaces intact and forwards any extra arguments.
spark2-submit --master 'local[2]' "$@"
```

Проверим подсчет твитов за первую и четвертую минуты.

**Первая минута**
```text
2018-12-16 06:58:23,198 [INFO] tweets_producer: Start tweets receiving...
2018-12-16 06:58:26,898 [INFO] tweets_producer: Received 1 tweet from @SultanIskander[999348547802853378]
2018-12-16 06:58:29,764 [INFO] tweets_producer: Received 1 tweet from @mlTerNw9oVRhAl5[3134768049]
2018-12-16 06:58:33,962 [INFO] tweets_producer: Received 1 tweet from @a_weuve[4385767653]
2018-12-16 06:58:34,987 [INFO] tweets_producer: Received 1 tweet from @rentvchannel[200036850]
2018-12-16 06:58:35,909 [INFO] tweets_producer: Received 1 tweet from @_kmv_[161254913]
2018-12-16 06:58:52,534 [INFO] tweets_producer: Received 1 tweet from @proffmed[849698890173612032]
2018-12-16 06:58:52,907 [INFO] tweets_producer: Received 1 tweet from @KZ8JA4mRYTO78Uj[1071064400340140033]
```
```text
-------------------------------------------
Time: 2018-12-16 06:59:00
-------------------------------------------
('rentvchannel', 1)
('proffmed', 1)
('mlTerNw9oVRhAl5', 1)
('a_weuve', 1)
('_kmv_', 1)
('SultanIskander', 1)
('KZ8JA4mRYTO78Uj', 1)
```

**Четвертая минута**
```text
2018-12-16 07:01:07,566 [INFO] tweets_producer: Received 1 tweet from @nehueviy[2608064537]         <-- 1
2018-12-16 07:01:13,932 [INFO] tweets_producer: Received 1 tweet from @doctortima[473824335]
2018-12-16 07:01:19,341 [INFO] tweets_producer: Received 1 tweet from @VasiliyRakovich[1340003623]
2018-12-16 07:01:20,811 [INFO] tweets_producer: Received 1 tweet from @nehueviy[2608064537]         <-- 2
2018-12-16 07:01:27,225 [INFO] tweets_producer: Received 1 tweet from @vesti_news[72525490]
2018-12-16 07:01:33,154 [INFO] tweets_producer: Received 1 tweet from @nehueviy[2608064537]         <-- 3
2018-12-16 07:01:45,351 [INFO] tweets_producer: Received 1 tweet from @pegass1[209599139]
2018-12-16 07:01:47,194 [INFO] tweets_producer: Received 1 tweet from @odesit741[2513730044]
2018-12-16 07:01:48,731 [INFO] tweets_producer: Received 1 tweet from @VStruncov[335556970]
2018-12-16 07:01:52,109 [INFO] tweets_producer: Received 1 tweet from @alexiKulik[881746967042437121]
2018-12-16 07:01:53,338 [INFO] tweets_producer: Received 1 tweet from @AMilena8891[453810289]
```
```text
-------------------------------------------
Time: 2018-12-16 07:02:00
-------------------------------------------
('vesti_news', 1)
('pegass1', 1)
('odesit741', 1)
('nehueviy', 3)       <-- 3
('doctortima', 1)
('alexiKulik', 1)
('VasiliyRakovich', 1)
('VStruncov', 1)
('AMilena8891', 1)
```

Для проверки подсчета твитов за 10 минут напишем скрипт, работающий с выводом **tweets_producer.py**.

**producer_counter.py**
```python
#!/usr/local/bin/python3.6
import re
import sys

# Captures the screen name between '@' and the '[' that opens the user id, e.g.
# '... Received 1 tweet from @AMilena8891[453810289]' -> 'AMilena8891'.
# ``\w+`` stops at the first '[', so brackets later on the line cannot leak
# into the captured name (a greedy ``.*`` would run up to the last '[').
screen_name_pattern = re.compile(r'@(\w+)\[')


def main():
	"""Count tweets per screen name in a tweets_producer.py log read from stdin.

	Only lines containing 'tweet from' are considered. A considered line
	without a screen name is reported on stdout and skipped. At the end one
	``(screen_name, count)`` tuple per line is printed, ordered by screen
	name in descending order.
	"""
	screen_names = {}

	# 2018-12-16 07:01:53,338 [INFO] tweets_producer: Received 1 tweet from @AMilena8891[453810289]
	for line in sys.stdin:
		if 'tweet from' not in line:
			continue
		line = line.rstrip()

		match = screen_name_pattern.search(line)
		if match is None:
			print(f'Screen name was not found in "{line}"')
			continue

		name = match.group(1)
		screen_names[name] = screen_names.get(name, 0) + 1

	ordered = sorted(screen_names.items(), key=lambda item: item[0], reverse=True)
	print('\n'.join(map(str, ordered)))


# Script entry point: the log is read from stdin, e.g. `cat producer.out | ./producer_counter.py`.
if __name__ == '__main__':
	main()
```

Сверим подсчет твитов на основе лога **tweets_producer.py** и на основе лога **tweets_counter_per_timeunit.py** (Spark Streaming).

<table>
    <tr>
        <td>
<br><br>
$ cat producer.out | ./producer_counter.py<br>
('zharaleksan', 1)<br>
('ygodinka', 1)<br>
('wou4ik', 1)<br>
('vesti_news', 2)<br>
('uporotyisterh', 1)<br>
('tab2334', 1)<br>
('t_Wankan', 1)<br>
('slav_sh', 1)<br>
('sergey0905', 1)<br>
('rentvchannel', 1)<br>
('ramil_b_80', 1)<br>
('proffmed', 1)<br>
('pet19784', 1)<br>
('pegass1', 1)<br>
('pawelwah', 1)<br>
('odesit741', 1)<br>
('oZJnhO5zRr5Fg7c', 1)<br>
('oCaealqhhAJEfqk', 1)<br>
('ntvru', 2)<br>
('nehueviy', 5)<br>
('mlTerNw9oVRhAl5', 1)<br>
('kryzhanskyi', 3)<br>
('kozenna', 1)<br>
('kirasir61', 2)<br>
('isv_yamal', 1)<br>
('interfax_news', 1)<br>
('gziks1', 1)<br>
('gotovceva', 1)<br>
('goreshek', 3)<br>
('galla4512', 1)<br>
('friendlybus', 1)<br>
('fedyusha', 1)<br>
('eqvival', 2)<br>
('eduard_kenig', 1)<br>
('doctortima', 1)<br>
('dmitrij7', 1)<br>
('denisvxbk', 1)<br>
('climanseur', 1)<br>
('c1eHA9qsKhbG3Pn', 1)<br>
('astahov777', 1)<br>
('asLcqdxsZKLAjJD', 1)<br>
('anatoliy156794', 1)<br>
('alexiKulik', 1)<br>
('adgast', 1)<br>
('a_weuve', 1)<br>
('_kmv_', 1)<br>
('YMGT373', 3)<br>
('VasiliyRakovich', 1)<br>
('VStruncov', 3)<br>
('VOROBEEY', 1)<br>
('VLagunov', 3)<br>
('UkrSlavemarket', 1)<br>
('Traveller_ru', 1)<br>
('TikhonovVal', 1)<br>
('Tatyana_A2', 1)<br>
('Szaj4enko', 1)<br>
('SultanIskander', 6)<br>
('Sirius_st', 1)<br>
('Sasha_Zatonsky', 1)<br>
('SantaNikol', 1)<br>
('SO7fHpk6HBBgQUK', 1)<br>
('SMI_TASS', 1)<br>
('SEWEROKs', 2)<br>
('RuslanLutcenko', 1)<br>
('ProstoNatasha77', 1)<br>
('Povar444', 1)<br>
('PkJY2g8QyTGcH56', 2)<br>
('NekuHatsumi', 1)<br>
('Mr_AMV', 1)<br>
('Lebirdushka', 1)<br>
('Kag687WN3Wi6SHr', 1)<br>
('KZ8JA4mRYTO78Uj', 2)<br>
('Honey_in_Blue', 1)<br>
('Gjhecmz', 1)<br>
('GirafeMarius', 1)<br>
('GalSockolowa', 1)<br>
('Evgenia6849', 1)<br>
('Eleech38', 1)<br>
('Egorwolfen', 1)<br>
('DyakonovaLyubov', 1)<br>
('Dmitriy0709', 1)<br>
('DNMotovilov', 1)<br>
('BulentKasapolu2', 1)<br>
('AnnaAlchimie', 1)<br>
('Anatoliy_A_Z', 1)<br>
('AMilena8891', 2)<br>
('916_71', 1)<br>
('8127682', 1)<br>
('47Angelina', 1)<br>
('46Ieliena', 1)<br>
('1BMlo42WfteoBF7', 1)<br>
('13tucha1', 2)<br>
        </td>
        <td>
-------------------------------------------<br>
Time: 2018-12-16 07:08:00<br>
-------------------------------------------<br>
('zharaleksan', 1)<br>
('ygodinka', 1)<br>
('wou4ik', 1)<br>
('vesti_news', 2)<br>
('uporotyisterh', 1)<br>
('tab2334', 1)<br>
('t_Wankan', 1)<br>
('slav_sh', 1)<br>
('sergey0905', 1)<br>
('rentvchannel', 1)<br>
('ramil_b_80', 1)<br>
('proffmed', 1)<br>
('pet19784', 1)<br>
('pegass1', 1)<br>
('pawelwah', 1)<br>
('odesit741', 1)<br>
('oZJnhO5zRr5Fg7c', 1)<br>
('oCaealqhhAJEfqk', 1)<br>
('ntvru', 2)<br>
('nehueviy', 5)<br>
('mlTerNw9oVRhAl5', 1)<br>
('kryzhanskyi', 3)<br>
('kozenna', 1)<br>
('kirasir61', 2)<br>
('isv_yamal', 1)<br>
('interfax_news', 1)<br>
('gziks1', 1)<br>
('gotovceva', 1)<br>
('goreshek', 3)<br>
('galla4512', 1)<br>
('friendlybus', 1)<br>
('fedyusha', 1)<br>
('eqvival', 2)<br>
('eduard_kenig', 1)<br>
('doctortima', 1)<br>
('dmitrij7', 1)<br>
('denisvxbk', 1)<br>
('climanseur', 1)<br>
('c1eHA9qsKhbG3Pn', 1)<br>
('astahov777', 1)<br>
('asLcqdxsZKLAjJD', 1)<br>
('anatoliy156794', 1)<br>
('alexiKulik', 1)<br>
('adgast', 1)<br>
('a_weuve', 1)<br>
('_kmv_', 1)<br>
('YMGT373', 3)<br>
('VasiliyRakovich', 1)<br>
('VStruncov', 3)<br>
('VOROBEEY', 1)<br>
('VLagunov', 3)<br>
('UkrSlavemarket', 1)<br>
('Traveller_ru', 1)<br>
('TikhonovVal', 1)<br>
('Tatyana_A2', 1)<br>
('Szaj4enko', 1)<br>
('SultanIskander', 6)<br>
('Sirius_st', 1)<br>
('Sasha_Zatonsky', 1)<br>
('SantaNikol', 1)<br>
('SO7fHpk6HBBgQUK', 1)<br>
('SMI_TASS', 1)<br>
('SEWEROKs', 2)<br>
('RuslanLutcenko', 1)<br>
('ProstoNatasha77', 1)<br>
('Povar444', 1)<br>
('PkJY2g8QyTGcH56', 2)<br>
('NekuHatsumi', 1)<br>
('Mr_AMV', 1)<br>
('Lebirdushka', 1)<br>
('Kag687WN3Wi6SHr', 1)<br>
('KZ8JA4mRYTO78Uj', 2)<br>
('Honey_in_Blue', 1)<br>
('Gjhecmz', 1)<br>
('GirafeMarius', 1)<br>
('GalSockolowa', 1)<br>
('Evgenia6849', 1)<br>
('Eleech38', 1)<br>
('Egorwolfen', 1)<br>
('DyakonovaLyubov', 1)<br>
('Dmitriy0709', 1)<br>
('DNMotovilov', 1)<br>
('BulentKasapolu2', 1)<br>
('AnnaAlchimie', 1)<br>
('Anatoliy_A_Z', 1)<br>
('AMilena8891', 2)<br>
('916_71', 1)<br>
('8127682', 1)<br>
('47Angelina', 1)<br>
('46Ieliena', 1)<br>
('1BMlo42WfteoBF7', 1)<br>
('13tucha1', 2)<br>
        </td>
    </tr>
</table>

Таким образом, программа верно выводит количество твитов каждого пользователя **за очередную минуту** и **за очередные 10 минут**, при этом подсчет выполняется **каждые 30 секунд** (длительность пакета Spark Streaming, `BATCH_DURATION_SEC`).