**1. Смотрим на Hadoop Distributed File System**

В рамках этой части вам нужно будет обращаться к HDFS с помощью CLI, разместить файлы для следующих заданий в распределеннй файловой системе и выполнить несколько преобразований над ними.

Для работы файлы можно скачать по следующим ссылкам:
- Логи посещения сайтов юзерами за некоторый промежуток времени [ссылка](https://drive.google.com/file/d/1WXyq5WVSWwJYXPuH4kyAJ5mrR3XgfO_H/view?usp=sharing)

Разместите их в нашем внутреннем файловом хранилище с помощью HDFS CLI, для дальнейшего удобства под каждый файл стоит создать каталог с простым и понятным именем, разместить сами файлы в разных каталогах.

Набор комманд, которые вам могут в этом помочь, доступны [здесь](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/FileSystemShell.html)

В ячейках ниже должен быть полный набор комманд ваших обращей к консоли

In [None]:
## вы можете обращаться к консоли из ноутбука таким способом
!hdfs dfs -ls /

Found 3 items
drwxr-xr-x   - root supergroup          0 2024-06-01 07:40 /input
drwxr-xr-x   - root supergroup          0 2024-06-01 07:42 /output
drwxr-xr-x   - root supergroup          0 2024-06-01 07:34 /tmp


In [None]:
%%bash
## или же использовать для этого меджик строчку в ячейке %%bash, как вам будет удобнее

hdfs dfs -ls /

Found 3 items
drwxr-xr-x   - root supergroup          0 2024-06-01 07:40 /input
drwxr-xr-x   - root supergroup          0 2024-06-01 07:42 /output
drwxr-xr-x   - root supergroup          0 2024-06-01 07:34 /tmp


In [None]:
!hdfs dfs -ls -R -h /input

-rw-r--r--   1 root supergroup         24 2024-06-01 07:40 /input/input.txt


In [None]:
## ваше решение здесь

In [1]:
%%bash
hdfs dfs -mkdir /user/ubuntu

In [2]:
%%bash

hdfs dfs -test -d /user/ubuntu/nomer1 || hdfs dfs -mkdir /user/ubuntu/nomer1
hdfs dfs -test -f /user/ubuntu/nomer1/bible.txt || hdfs dfs -put ./bible.txt /user/ubuntu/nomer1/bible.txt

In [4]:
%%bash

hdfs dfs -test -d /user/ubuntu/nomer2 || hdfs dfs -mkdir /user/ubuntu/nomer2
hdfs dfs -test -f /user/ubuntu/nomer2/Посещения_сайтов.csv || hdfs dfs -put ./Посещения_сайтов.csv /user/ubuntu/nomer2/Посещения_сайтов.csv

**2. Решаем задачи MapReduce**

**2.1 Подсчет слов в тексте**

В рамках данного задания вам нужно подсчитать кол-во слов в тексте Библии (файл приложен к ДЗ в чате тг), то есть необходимо реализовать базовый функционал утилиты word count.

**Важно** - подсчитывайте число только тех слов, длина которых больше 4 символов. Проводить процесс удаления знаков препинания и прочих символов **не нужно**

Ниже вам представлены ячейки, в которых вы должны описать структуру маппера/редьсюера и ниже вызвать их в bash-скрипте запуска MR-таски

In [5]:
%%writefile mapper.py
import sys

for stroka in sys.stdin:
    stroka = stroka.strip()
    if stroka:
        slova = stroka.split()
        for word in slova:
            if len(word)>4:
                print(word.lower() + str('\t1'))
    


Writing mapper.py


In [6]:
%%writefile reducer.py
import sys

ans_key = None
ans_count = 0

for stroka in sys.stdin:
    stroka = stroka.strip()
    if stroka:
        key, count = stroka.split('\t', 1)
        count = int(count) 
        
    if ans_key == key:
        ans_count += 1
    else:
        if ans_key:
            print(ans_key + str('\t') + str(ans_count))
        ans_count = 1
        ans_key = key
        
print(ans_key + str('\t') + str(ans_count))


Writing reducer.py


В качестве проверки ваших python-скриптов до запуска MR таски можно произвести их запуск через консольные команды

Тогда наша задача не будет выполняться через датаноды, а только на локальной машине, но в случае ошибок в скриптах вы увидите их и сможете исправить

In [7]:
%%bash
## пример запуска скриптов на неймноде для проверки их работы

cat bible.txt | python3 mapper.py | sort -k1,1 | python3 reducer.py | sort -k2,2nr | head -n 34


shall	9733
which	4244
their	3859
there	2008
before	1722
lord,	1613
against	1596
shalt	1589
children	1560
said,	1556
them,	1549
saying,	1272
house	1222
every	1208
people	1194
because	1178
thee,	1170
these	1147
saith	1135
after	1128
behold,	1073
therefore	1054
israel	1025
among	907
thine	883
neither	861
great	847
brought	815
things	780
jesus	777
should	772
according	755
israel,	724
forth	720


Как только в данной проверке вы получите успешный и корректный результат, можете запустить Map Reduce в ячейке ниже

In [8]:
%%bash
## шаблон для запуска MR таски

# обязательная чистка директории, куда будем складывать результат отрабоки mr
hdfs dfs -rm -r /user/ubuntu/nomer1/word_count_task || true

# запус mr таски с указанием пути до нужного jar
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapreduce.job.name="word-count" \
    -files mapper.py,reducer.py \
    -mapper "python3 mapper.py" \
    -reducer "python3 reducer.py" \
    -input /user/ubuntu/nomer1/bible.txt \
    -output /user/ubuntu/nomer1/word_count_task

packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-3.2.2.jar] /tmp/streamjob3744988546770856471.jar tmpDir=null


rm: `/user/ubuntu/nomer1/word_count_task': No such file or directory
2025-04-16 08:00:24,964 INFO client.RMProxy: Connecting to ResourceManager at rc1d-dataproc-m-8hvn1jfvk1pddbqe.mdb.yandexcloud.net/10.130.0.11:8032
2025-04-16 08:00:25,211 INFO client.AHSProxy: Connecting to Application History server at rc1d-dataproc-m-8hvn1jfvk1pddbqe.mdb.yandexcloud.net/10.130.0.11:10200
2025-04-16 08:00:25,258 INFO client.RMProxy: Connecting to ResourceManager at rc1d-dataproc-m-8hvn1jfvk1pddbqe.mdb.yandexcloud.net/10.130.0.11:8032
2025-04-16 08:00:25,259 INFO client.AHSProxy: Connecting to Application History server at rc1d-dataproc-m-8hvn1jfvk1pddbqe.mdb.yandexcloud.net/10.130.0.11:10200
2025-04-16 08:00:25,488 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/ubuntu/.staging/job_1744790037149_0006
2025-04-16 08:00:26,455 INFO mapred.FileInputFormat: Total input files to process : 1
2025-04-16 08:00:26,643 INFO mapreduce.JobSubmitter: number of split

Мониторить процесс работы таски можно на nodemanager по порту 8088 (уже прокинут в конфиге), там будет UI, в котором будет видно вашу запущенную задачу и её статус.

Результат работы скрипта должен выглядеть следующим образом (вывод тестовый):

```bash
word count
abtr 6852
btoad 4237
stress 1932
zen 1885
```

In [10]:
%%bash
## запустите эту команду, чтобы вывести счетчик определенных слов, которые мы указали на grep
## Это нам будет необходимо для визуального анализа результата работы вашего скрипта
## в sort можете указать тот разделитель колонок, с которым у вас результат выплевывает редьюсер

hdfs dfs -cat /user/ubuntu/nomer1/word_count_task/* | grep  -E 'lord\.|god\.|pray\.' | sort -t$'\t' -k2.2nr  | head -n 3

lord.	674
pray.	5


**2.2 Решаем задачу поиска самых посещаемых сайтов**

В данном задании нужно поработать с логом данных о посещении юзерами различных сайтов.
Формат данных: `url;временная метка`. Вам нужно вывести топ 5 сайтов по посещаемости в каждую из дат, которая представлена в наших данных.

Результат работы скрипта должен выглядеть следующим образом:

```bash
date        site                            count
2024-05-25  https://gonzales-bautista.com/  987
2024-05-25  https://smith.com/              654
2024-05-25  https://www.smith.com/          321
```

**Рекомендации**

1. Вам могу пригодиться дополнительные параметры mr таски, отвечающие за настройку шаффла, и правил сортировки ключей внутри него. Почитать о примерах их использования можно [здесь](https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html#More_Usage_Examples).

2. Не рекомендуем использовать `\t` в качестве символа разделителя для сложного ключа (потому что по дефолту таб используется для разделения колонок данных, и ключом в таком случае будет только первая колонка до таба). Если вы будете собирать сложный ключ для нужной вам сортировки данных, лучше всего будет использовать другие симловы, к примеру `+, =`.

3. Возможно, у вас не получится решить данную задачу за одну mr таску, тогда вы просто описываете в решении скрипты ваших мапперов, редьюсеров под каждую из mr тасок, которые вам нужно запустить для получения нужного результата.

**Важно** помнить, что любой маппер и редьюсер должен работать за O(1) памяти, и если вы будете создавать какой-то список, куда будете складывать какие-то данные, то он не должен быть размера O(n). Если такой момент в вашем решении будет, пожалуйста, поясните его текстово, что с вашими переменными все ок, и у них нет размера O(n).

In [None]:
## первая вариант решения

In [11]:
%%writefile mapper2.py
import sys

for stroka in sys.stdin:
    stroka = stroka.strip()
    if not stroka:
        continue
    try:
        url, time = stroka.split(';')
        date = time.split(' ')[0]
        print(f"{date}\t{url}\t1")  
    except:
        pass  # Игнорируем некорректные строки

Writing mapper2.py


In [12]:
%%writefile reducer2.py
import sys

ans_date = None
ans_url = None
ans_count = 0

for stroka in sys.stdin:
    stroka = stroka.strip()
    if not stroka:
        continue
    stolb = stroka.split('\t')
    if len(stolb) != 3:
        continue
    date, url, count = stolb[0], stolb[1], int(stolb[2])
    
    if date == ans_date and url == ans_url:
        ans_count += count
    else:
        if ans_date:
            print(f"{ans_date}\t{ans_url}\t{ans_count}")
        ans_date = date
        ans_url = url
        ans_count = count

# Выводим последнюю запись
if ans_date:
    print(f"{ans_date}\t{ans_url}\t{ans_count}")


Writing reducer2.py


In [13]:
%%bash
# Выводим заголовок
echo -e "date\tsite\tcount"
# Обрабатываем данные
cat Посещения_сайтов.csv | python3 mapper2.py | sort -k1,1 | python3 reducer2.py | sort -k1,1 -k3,3nr | awk '$1 != prev {prev=$1; cnt=1} cnt <=5 {print; cnt++}'

date	site	count
2024-05-26	https://gonzales-bautista.com/	335
2024-05-26	http://smith.com/	235
2024-05-26	https://www.smith.com/	221
2024-05-26	https://smith.com/	212
2024-05-26	http://www.smith.com/	212
2024-05-27	https://gonzales-bautista.com/	376
2024-05-27	https://www.smith.com/	270
2024-05-27	https://smith.com/	236
2024-05-27	http://smith.com/	215
2024-05-27	http://www.smith.com/	208
2024-05-28	https://gonzales-bautista.com/	368
2024-05-28	https://smith.com/	256
2024-05-28	https://www.smith.com/	251
2024-05-28	http://smith.com/	224
2024-05-28	http://www.smith.com/	204
2024-05-29	https://gonzales-bautista.com/	402
2024-05-29	https://www.smith.com/	242
2024-05-29	http://www.smith.com/	223
2024-05-29	https://smith.com/	220
2024-05-29	http://smith.com/	206
2024-05-30	https://gonzales-bautista.com/	353
2024-05-30	https://smith.com/	246
2024-05-30	https://www.smith.com/	239
2024-05-30	http://smith.com/	229
2024-05-30	http://www.smith.com/	225
2024-05-31	https://gonzales-bautista.com/	37

In [None]:
#второе решение

In [25]:
%%writefile mapper3.py
import sys

for stroka in sys.stdin:
    try:
        url, time = stroka.strip().split(';')
        date = time.split(' ')[0]
        print(f"{date}+{url}\t1")
    except:
        # Пропускаем некорректные строки
        continue

Overwriting mapper3.py


In [44]:
%%writefile reducer3.py
import sys
from collections import defaultdict

ans_key = None
ans_count = 0
date_dict = defaultdict(list) #т.к. я буду хранить тут всегда только 5 элементов и это фиксированное число, то память останется o(1)
for stroka in sys.stdin:
    key, value = stroka.strip().split('\t')
    
    if key != ans_key:
        if ans_key:
            date, url = ans_key.split('+')
            # Добавляем в список и поддерживаем топ-5
            date_dict[date].append((url, ans_count))
            date_dict[date].sort(key=lambda x: -x[1])
            date_dict[date] = date_dict[date][:5]
        ans_key = key
        ans_count = 0
    ans_count += int(value)

# Обрабатываем последний ключ
if ans_key:
    date, url = ans_key.split('+')
    date_dict[date].append((url, ans_count))
    date_dict[date].sort(key=lambda x: -x[1])
    date_dict[date] = date_dict[date][:5]

for date in sorted(date_dict.keys()):
    for url, count in date_dict[date]:
        print(f"{date}\t{url}\t{count}")

Overwriting reducer3.py


In [29]:
%%bash
# Выводим заголовок
echo -e "date\tsite\tcount"
# Обрабатываем данные
cat Посещения_сайтов.csv | python3 mapper3.py | sort -k1,1 | python3 reducer3.py 

date	site	count
2024-05-26	https://gonzales-bautista.com/	335
2024-05-26	http://smith.com/	235
2024-05-26	https://www.smith.com/	221
2024-05-26	https://smith.com/	212
2024-05-26	http://www.smith.com/	212
2024-05-27	https://gonzales-bautista.com/	376
2024-05-27	https://www.smith.com/	270
2024-05-27	https://smith.com/	236
2024-05-27	http://smith.com/	215
2024-05-27	http://www.smith.com/	208
2024-05-28	https://gonzales-bautista.com/	368
2024-05-28	https://smith.com/	256
2024-05-28	https://www.smith.com/	251
2024-05-28	http://smith.com/	224
2024-05-28	http://www.smith.com/	204
2024-05-29	https://gonzales-bautista.com/	402
2024-05-29	https://www.smith.com/	242
2024-05-29	http://www.smith.com/	223
2024-05-29	https://smith.com/	220
2024-05-29	http://smith.com/	206
2024-05-30	https://gonzales-bautista.com/	353
2024-05-30	https://smith.com/	246
2024-05-30	https://www.smith.com/	239
2024-05-30	http://smith.com/	229
2024-05-30	http://www.smith.com/	225
2024-05-31	https://gonzales-bautista.com/	37

In [42]:
%%bash
# Очистка предыдущих результатов в HDFS (если есть)
hdfs dfs -rm -r /user/ubuntu/nomer2/url_task_2 || true

# Запуск MapReduce задачи через Hadoop Streaming
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapreduce.job.name="url_task_2" \
    -D mapreduce.job.reduces=1 \
    -files mapper3.py,reducer3.py \
    -mapper "python3 mapper3.py" \
    -reducer "python3 reducer3.py" \
    -input /user/ubuntu/nomer2/Посещения_сайтов.csv \
    -output /user/ubuntu/nomer2/url_task_2
    
    # добавила строчку "-D mapreduce.job.reduces=1 \" чтобы сделать вывод корректным, но по факту 
    # снизила производительность, на нашем объеме данных это не сильно влияет, но на больших данных надо думать 
    # и реализовывать что-то более сложное. Он был некорректым потому что в трех нодах были даты которые нам нужны и 
    # и он считал в каждой ноде

packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-3.2.2.jar] /tmp/streamjob3425796163214863680.jar tmpDir=null


rm: `/user/ubuntu/nomer2/url_task_2': No such file or directory
2025-04-16 09:21:12,859 INFO client.RMProxy: Connecting to ResourceManager at rc1d-dataproc-m-8hvn1jfvk1pddbqe.mdb.yandexcloud.net/10.130.0.11:8032
2025-04-16 09:21:13,093 INFO client.AHSProxy: Connecting to Application History server at rc1d-dataproc-m-8hvn1jfvk1pddbqe.mdb.yandexcloud.net/10.130.0.11:10200
2025-04-16 09:21:13,137 INFO client.RMProxy: Connecting to ResourceManager at rc1d-dataproc-m-8hvn1jfvk1pddbqe.mdb.yandexcloud.net/10.130.0.11:8032
2025-04-16 09:21:13,138 INFO client.AHSProxy: Connecting to Application History server at rc1d-dataproc-m-8hvn1jfvk1pddbqe.mdb.yandexcloud.net/10.130.0.11:10200
2025-04-16 09:21:13,343 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/ubuntu/.staging/job_1744790037149_0012
2025-04-16 09:21:13,659 INFO mapred.FileInputFormat: Total input files to process : 1
2025-04-16 09:21:13,756 INFO mapreduce.JobSubmitter: number of splits:30


In [43]:
%%bash

hdfs dfs -cat /user/ubuntu/nomer2/url_task_2/* | grep -E '2024-05-28|2024-06-02|2024-05-30' | column -t -s$'\t' 

2024-05-28  https://gonzales-bautista.com/  368
2024-05-28  https://smith.com/              256
2024-05-28  https://www.smith.com/          251
2024-05-28  http://smith.com/               224
2024-05-28  http://www.smith.com/           204
2024-05-30  https://gonzales-bautista.com/  353
2024-05-30  https://smith.com/              246
2024-05-30  https://www.smith.com/          239
2024-05-30  http://smith.com/               229
2024-05-30  http://www.smith.com/           225
2024-06-02  http://smith.com/               7
2024-06-02  https://gonzales-bautista.com/  7
2024-06-02  https://www.williams.com/       6
2024-06-02  http://lee.com/                 5
2024-06-02  http://miller.com/              5
