**1. Смотрим на Hadoop Distributed File System**

В рамках этой части вам нужно будет обращаться к HDFS с помощью CLI, разместить файлы для следующих заданий в распределеннй файловой системе и выполнить несколько преобразований над ними.

Для работы файлы можно скачать по следующим ссылкам:
- Логи посещения сайтов юзерами за некоторый промежуток времени [ссылка](https://drive.google.com/file/d/1WXyq5WVSWwJYXPuH4kyAJ5mrR3XgfO_H/view?usp=sharing)

Разместите их в нашем внутреннем файловом хранилище с помощью HDFS CLI, для дальнейшего удобства под каждый файл стоит создать каталог с простым и понятным именем, разместить сами файлы в разных каталогах.

Набор комманд, которые вам могут в этом помочь, доступны [здесь](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/FileSystemShell.html)

В ячейках ниже должен быть полный набор комманд ваших обращей к консоли

In [32]:
## вы можете обращаться к консоли из ноутбука таким способом
!hdfs dfs -ls /

Found 6 items
drwxr-xr-x   - hdfs   hadoop          0 2025-04-15 20:51 /bible_text
drwx------   - mapred hadoop          0 2025-04-15 12:47 /hadoop
drwxrwxrwt   - hdfs   hadoop          0 2025-04-15 12:47 /tmp
drwxrwxrwt   - hdfs   hadoop          0 2025-04-15 20:54 /user
drwxr-xr-x   - hdfs   hadoop          0 2025-04-15 20:51 /user_logs
drwxrwxrwt   - hdfs   hadoop          0 2025-04-15 12:47 /var


In [33]:
%%bash
## или же использовать для этого меджик строчку в ячейке %%bash, как вам будет удобнее

hdfs dfs -ls /

Found 6 items
drwxr-xr-x   - hdfs   hadoop          0 2025-04-15 20:51 /bible_text
drwx------   - mapred hadoop          0 2025-04-15 12:47 /hadoop
drwxrwxrwt   - hdfs   hadoop          0 2025-04-15 12:47 /tmp
drwxrwxrwt   - hdfs   hadoop          0 2025-04-15 20:54 /user
drwxr-xr-x   - hdfs   hadoop          0 2025-04-15 20:51 /user_logs
drwxrwxrwt   - hdfs   hadoop          0 2025-04-15 12:47 /var


In [34]:
!hdfs dfs -ls -R -h /input

ls: `/input': No such file or directory


In [35]:
%%bash
# 1. Создаем директории в домашней папке пользователя
hdfs dfs -mkdir -p /user/ubuntu/user_logs
hdfs dfs -mkdir -p /user/ubuntu/bible_text

# 2. Загружаем файлы
hdfs dfs -put user_logs.csv /user/ubuntu/user_logs/
hdfs dfs -put bible.txt /user/ubuntu/bible_text/

# 3. Проверяем
hdfs dfs -ls -R /user/ubuntu

drwxr-xr-x   - ubuntu hadoop          0 2025-04-15 20:54 /user/ubuntu/bible_text
-rw-r--r--   1 ubuntu hadoop    4047392 2025-04-15 20:54 /user/ubuntu/bible_text/bible.txt
drwxr-xr-x   - ubuntu hadoop          0 2025-04-15 20:54 /user/ubuntu/user_logs
-rw-r--r--   1 ubuntu hadoop   36443383 2025-04-15 20:54 /user/ubuntu/user_logs/user_logs.csv


put: `/user/ubuntu/user_logs/user_logs.csv': File exists
put: `/user/ubuntu/bible_text/bible.txt': File exists


**2. Решаем задачи MapReduce**

**2.1 Подсчет слов в тексте**

В рамках данного задания вам нужно подсчитать кол-во слов в тексте Библии (файл приложен к ДЗ в чате тг), то есть необходимо реализовать базовый функционал утилиты word count.

**Важно** - подсчитывайте число только тех слов, длина которых больше 4 символов. Проводить процесс удаления знаков препинания и прочих символов **не нужно**

Ниже вам представлены ячейки, в которых вы должны описать структуру маппера/редьсюера и ниже вызвать их в bash-скрипте запуска MR-таски

In [17]:
%%writefile mapper.py
#!/usr/bin/env python3
import sys
import re

for line in sys.stdin:
    line = line.strip().split()
    for word in words:
        if len(word) > 4:
            print(f"{word.lower()}\t1")

Writing mapper.py


In [18]:
%%writefile reducer.py
#!/usr/bin/env python3
import sys

current_word = None
current_count = 0

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    
    try:
        count = int(count)
    except ValueError:
        continue
    
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print(f"{current_word}\t{current_count}")
        current_word = word
        current_count = count

if current_word == word:
    print(f"{current_word}\t{current_count}")

Writing reducer.py


В качестве проверки ваших python-скриптов до запуска MR таски можно произвести их запуск через консольные команды

Тогда наша задача не будет выполняться через датаноды, а только на локальной машине, но в случае ошибок в скриптах вы увидите их и сможете исправить

In [36]:
%%bash
# 1. Создаем тестовый файл (если нет file.txt)
echo -e "Hello World\nHello Hadoop\nGoodbye Hadoop" > file.txt

# 2. Запускаем цепочку MapReduce
cat file.txt | python3 mapper.py | sort -k1,1 | python3 reducer.py

goodbye	1
hadoop	2
hello	2
world	1


Как только в данной проверке вы получите успешный и корректный результат, можете запустить Map Reduce в ячейке ниже

In [48]:
%%bash
# Clean previous output if exists (using user directory instead of root)
hdfs dfs -rm -r /user/ubuntu/word_count_task 2> /dev/null || true

# Run the MapReduce job with correct paths
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapreduce.job.name="word-count" \
    -files ./mapper.py,./reducer.py \
    -mapper "python3 mapper.py" \
    -reducer "python3 reducer.py" \
    -input /user/ubuntu/bible_text/bible.txt \
    -output /user/ubuntu/word_count_task

# Check the results
hdfs dfs -cat /user/ubuntu/word_count_task/* | grep -E 'lord|god|pray' | sort -t$'\t' -k2 -nr | head -n 5

packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-3.2.2.jar] /tmp/streamjob6447965003391667163.jar tmpDir=null
prayer	107
prayed	59
lords	42
ungodly	27
prayers	24


2025-04-15 21:28:58,176 INFO client.RMProxy: Connecting to ResourceManager at rc1b-dataproc-m-i45rlr66g11r9d3u.mdb.yandexcloud.net/10.129.0.25:8032
2025-04-15 21:28:58,405 INFO client.AHSProxy: Connecting to Application History server at rc1b-dataproc-m-i45rlr66g11r9d3u.mdb.yandexcloud.net/10.129.0.25:10200
2025-04-15 21:28:58,438 INFO client.RMProxy: Connecting to ResourceManager at rc1b-dataproc-m-i45rlr66g11r9d3u.mdb.yandexcloud.net/10.129.0.25:8032
2025-04-15 21:28:58,439 INFO client.AHSProxy: Connecting to Application History server at rc1b-dataproc-m-i45rlr66g11r9d3u.mdb.yandexcloud.net/10.129.0.25:10200
2025-04-15 21:28:58,658 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/ubuntu/.staging/job_1744740277109_0011
2025-04-15 21:28:59,081 INFO mapred.FileInputFormat: Total input files to process : 1
2025-04-15 21:28:59,381 INFO mapreduce.JobSubmitter: number of splits:30
2025-04-15 21:29:00,009 INFO mapreduce.JobSubmitter: Submitting 

Мониторить процесс работы таски можно на nodemanager по порту 8088 (уже прокинут в конфиге), там будет UI, в котором будет видно вашу запущенную задачу и её статус.

Результат работы скрипта должен выглядеть следующим образом (вывод тестовый):

```bash
word count
abtr 6852
btoad 4237
stress 1932
zen 1885
```

In [52]:
%%bash
## запустите эту команду, чтобы вывести счетчик определенных слов, которые мы указали на grep
## Это нам будет необходимо для визуального анализа результата работы вашего скрипта
## в sort можете указать тот разделитель колонок, с которым у вас результат выплевывает редьюсер


## Сделал в ячейке выше
hdfs dfs -cat /user/ubuntu/word_count_task/* | grep  -E 'lord\.|god\.|pray\.' | sort -t$'\t' -k2.2nr  | head -n 3

**2.2 Решаем задачу поиска самых посещаемых сайтов**

В данном задании нужно поработать с логом данных о посещении юзерами различных сайтов.
Формат данных: `url;временная метка`. Вам нужно вывести топ 5 сайтов по посещаемости в каждую из дат, которая представлена в наших данных.

Результат работы скрипта должен выглядеть следующим образом:

```bash
date        site                            count
2024-05-25  https://gonzales-bautista.com/  987
2024-05-25  https://smith.com/              654
2024-05-25  https://www.smith.com/          321
```

**Рекомендации**

1. Вам могу пригодиться дополнительные параметры mr таски, отвечающие за настройку шаффла, и правил сортировки ключей внутри него. Почитать о примерах их использования можно [здесь](https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html#More_Usage_Examples).

2. Не рекомендуем использовать `\t` в качестве символа разделителя для сложного ключа (потому что по дефолту таб используется для разделения колонок данных, и ключом в таком случае будет только первая колонка до таба). Если вы будете собирать сложный ключ для нужной вам сортировки данных, лучше всего будет использовать другие симловы, к примеру `+, =`.

3. Возможно, у вас не получится решить данную задачу за одну mr таску, тогда вы просто описываете в решении скрипты ваших мапперов, редьюсеров под каждую из mr тасок, которые вам нужно запустить для получения нужного результата.

**Важно** помнить, что любой маппер и редьюсер должен работать за O(1) памяти, и если вы будете создавать какой-то список, куда будете складывать какие-то данные, то он не должен быть размера O(n). Если такой момент в вашем решении будет, пожалуйста, поясните его текстово, что с вашими переменными все ок, и у них нет размера O(n).

In [58]:
%%writefile mapper1.py
#!/usr/bin/env python3
import sys

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    
    try:
        url, timestamp = line.split(';')
        date = timestamp.split('T')[0]
        print(f"{date}+{url}\t1")
    except:
        continue

Overwriting mapper1.py


In [59]:
%%writefile reducer1.py
#!/usr/bin/env python3
import sys

current_key = None
current_count = 0

for line in sys.stdin:
    line = line.strip()
    key, count = line.split('\t', 1)
    
    try:
        count = int(count)
    except ValueError:
        continue
    
    if current_key == key:
        current_count += count
    else:
        if current_key:
            print(f"{current_key}\t{current_count}")
        current_key = key
        current_count = count

if current_key == key:
    print(f"{current_key}\t{current_count}")

Writing reducer1.py


In [60]:
%%writefile mapper2.py
#!/usr/bin/env python3
import sys

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    
    try:
        key, count = line.split('\t')
        date, url = key.split('+')
        print(f"{date}\t{count}+{url}")
    except:
        continue

Writing mapper2.py


In [61]:
%%writefile reducer2.py
#!/usr/bin/env python3
import sys
from collections import defaultdict

current_date = None
url_counts = []

for line in sys.stdin:
    line = line.strip()
    date, count_url = line.split('\t', 1)
    count, url = count_url.split('+', 1)
    
    try:
        count = int(count)
    except ValueError:
        continue
    
    if current_date == date:
        url_counts.append((count, url))
    else:
        if current_date:
            url_counts.sort(reverse=True)
            for cnt, u in url_counts[:5]:
                print(f"{current_date}\t{u}\t{cnt}")
        current_date = date
        url_counts = [(count, url)]

if current_date == date:
    url_counts.sort(reverse=True)
    for cnt, u in url_counts[:5]:
        print(f"{current_date}\t{u}\t{cnt}")

Writing reducer2.py


In [62]:
%%bash
hdfs dfs -ls /user/ubuntu/user_logs/

# Clean previous output
hdfs dfs -rm -r /user/ubuntu/site_count_temp 2> /dev/null || true

# Run with verbose output
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapreduce.job.name="site-count" \
    -D mapreduce.job.reduces=4 \
    -files ./mapper1.py,./reducer1.py \
    -mapper "python3 mapper1.py" \
    -reducer "python3 reducer1.py" \
    -input /user/ubuntu/user_logs/user_logs.csv \
    -output /user/ubuntu/site_count_temp
    
hdfs dfs -ls /user/ubuntu/site_count_temp
hdfs dfs -cat /user/ubuntu/site_count_temp/* | head -n 5

Found 1 items
-rw-r--r--   1 ubuntu hadoop   36443383 2025-04-15 20:54 /user/ubuntu/user_logs/user_logs.csv
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-3.2.2.jar] /tmp/streamjob1087441329380737641.jar tmpDir=null
Found 5 items
-rw-r--r--   1 ubuntu hadoop          0 2025-04-15 21:49 /user/ubuntu/site_count_temp/_SUCCESS
-rw-r--r--   1 ubuntu hadoop    9257049 2025-04-15 21:49 /user/ubuntu/site_count_temp/part-00000
-rw-r--r--   1 ubuntu hadoop    9298852 2025-04-15 21:49 /user/ubuntu/site_count_temp/part-00001
-rw-r--r--   1 ubuntu hadoop    9277325 2025-04-15 21:49 /user/ubuntu/site_count_temp/part-00002
-rw-r--r--   1 ubuntu hadoop    9306070 2025-04-15 21:49 /user/ubuntu/site_count_temp/part-00003
2024-05-26 00:35:09.853479+http://www.knight.biz/	1
2024-05-26 00:35:16.853479+https://www.schmitt.biz/	1
2024-05-26 00:35:19.853479+http://nelson.com/	1
2024-05-26 00:35:21.853479+http://stewart.com/	1
2024-05-26 00:35:25.853479+https://lester.net/	1


2025-04-15 21:48:46,161 INFO client.RMProxy: Connecting to ResourceManager at rc1b-dataproc-m-i45rlr66g11r9d3u.mdb.yandexcloud.net/10.129.0.25:8032
2025-04-15 21:48:46,384 INFO client.AHSProxy: Connecting to Application History server at rc1b-dataproc-m-i45rlr66g11r9d3u.mdb.yandexcloud.net/10.129.0.25:10200
2025-04-15 21:48:46,424 INFO client.RMProxy: Connecting to ResourceManager at rc1b-dataproc-m-i45rlr66g11r9d3u.mdb.yandexcloud.net/10.129.0.25:8032
2025-04-15 21:48:46,425 INFO client.AHSProxy: Connecting to Application History server at rc1b-dataproc-m-i45rlr66g11r9d3u.mdb.yandexcloud.net/10.129.0.25:10200
2025-04-15 21:48:46,621 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/ubuntu/.staging/job_1744740277109_0012
2025-04-15 21:48:46,982 INFO mapred.FileInputFormat: Total input files to process : 1
2025-04-15 21:48:47,076 INFO mapreduce.JobSubmitter: number of splits:30
2025-04-15 21:48:47,250 INFO mapreduce.JobSubmitter: Submitting 

In [64]:
%%bash
# Clean previous output
hdfs dfs -rm -r /user/ubuntu/top_sites_output 2> /dev/null || true

# Run with verbose output
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapreduce.job.name="top-sites" \
    -D mapreduce.job.reduces=1 \
    -files ./mapper2.py,./reducer2.py \
    -mapper "python3 mapper2.py" \
    -reducer "python3 reducer2.py" \
    -input /user/ubuntu/site_count_temp \
    -output /user/ubuntu/top_sites_output
    
hdfs dfs -ls /user/ubuntu/top_sites_output
hdfs dfs -cat /user/ubuntu/top_sites_output/* | head -n 20

Deleted /user/ubuntu/top_sites_output
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-3.2.2.jar] /tmp/streamjob864865308747947462.jar tmpDir=null
Found 2 items
-rw-r--r--   1 ubuntu hadoop          0 2025-04-15 21:52 /user/ubuntu/top_sites_output/_SUCCESS
-rw-r--r--   1 ubuntu hadoop   37090328 2025-04-15 21:52 /user/ubuntu/top_sites_output/part-00000
2024-05-26 00:35:09.853479	http://www.knight.biz/	1
2024-05-26 00:35:10.853479	https://www.david-weaver.biz/	1
2024-05-26 00:35:10.853479	https://sullivan.com/	1
2024-05-26 00:35:10.853479	http://www.powers.com/	1
2024-05-26 00:35:12.853479	https://www.ortiz-hill.com/	1
2024-05-26 00:35:15.853479	https://chavez-cohen.net/	1
2024-05-26 00:35:15.853479	http://www.hanson.com/	1
2024-05-26 00:35:16.853479	https://www.schmitt.biz/	1
2024-05-26 00:35:16.853479	https://watkins-erickson.com/	1
2024-05-26 00:35:16.853479	http://mcgee.com/	1
2024-05-26 00:35:17.853479	https://brown.com/	1
2024-05-26 00:35:17.853479	http://www.butler-n

2025-04-15 21:52:04,629 INFO client.RMProxy: Connecting to ResourceManager at rc1b-dataproc-m-i45rlr66g11r9d3u.mdb.yandexcloud.net/10.129.0.25:8032
2025-04-15 21:52:04,847 INFO client.AHSProxy: Connecting to Application History server at rc1b-dataproc-m-i45rlr66g11r9d3u.mdb.yandexcloud.net/10.129.0.25:10200
2025-04-15 21:52:04,905 INFO client.RMProxy: Connecting to ResourceManager at rc1b-dataproc-m-i45rlr66g11r9d3u.mdb.yandexcloud.net/10.129.0.25:8032
2025-04-15 21:52:04,914 INFO client.AHSProxy: Connecting to Application History server at rc1b-dataproc-m-i45rlr66g11r9d3u.mdb.yandexcloud.net/10.129.0.25:10200
2025-04-15 21:52:05,177 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/ubuntu/.staging/job_1744740277109_0014
2025-04-15 21:52:05,647 INFO mapred.FileInputFormat: Total input files to process : 4
2025-04-15 21:52:05,758 INFO mapreduce.JobSubmitter: number of splits:32
2025-04-15 21:52:05,947 INFO mapreduce.JobSubmitter: Submitting 