<a target="_blank" href="../cluster" style="font-size:20px">All Applications (YARN)</a>

# MapReduce

We will use logs of artist listening activity from the Yandex.Music service.

The `events.csv` file contains entries like `User,Artist,Number of plays,Number of skips`:
```csv
userId,artistId,plays,skips
0,335,1,0
0,708,1,0
0,710,2,1
0,815,1,1
```

We need to do the following:
1. **Keep only the users whose total number of plays is strictly greater than 1000. How many such users are there?**
2. **In the data filtered in step 1, find the 5 most popular artists by number of distinct users (report the artist identifiers).**

Details:
1. Let's assume that a single user's playlist always fits in memory.

Save the solution to the `result.json` file.
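
Before writing the streaming jobs, it can help to have a small in-memory reference to check the logic against. The sketch below is not the MapReduce solution itself. It assumes the rows are already parsed into `(user_id, artist_id, plays)` tuples, and the function name `solve` and the toy data are made up for illustration.

```python
from collections import defaultdict


def solve(rows, threshold=1000, top_n=5):
    """rows: iterable of (user_id, artist_id, plays) tuples (assumed format)."""
    plays_per_user = defaultdict(int)
    artists_per_user = defaultdict(set)
    for user_id, artist_id, plays in rows:
        plays_per_user[user_id] += plays
        artists_per_user[user_id].add(artist_id)

    # Step 1: users whose total plays strictly exceed the threshold.
    heavy_users = {u for u, total in plays_per_user.items() if total > threshold}

    # Step 2: for every artist, count the distinct heavy users who played them.
    users_per_artist = defaultdict(int)
    for user_id in heavy_users:
        for artist_id in artists_per_user[user_id]:
            users_per_artist[artist_id] += 1

    # Sort by user count, descending; ties are broken by artist id (an arbitrary choice).
    ranked = sorted(users_per_artist.items(), key=lambda kv: (-kv[1], kv[0]))
    return len(heavy_users), [artist_id for artist_id, _ in ranked[:top_n]]


# Toy data, with a threshold of 3 instead of 1000.
rows = [
    (0, 335, 1), (0, 708, 1), (0, 710, 2),  # user 0: 4 plays in total
    (1, 335, 5),                            # user 1: 5 plays in total
    (2, 708, 1),                            # user 2: 1 play, filtered out
]
print(solve(rows, threshold=3, top_n=2))  # (2, [335, 708])
```

On the full dataset this keeps everything in one process, so it is only useful as a cross-check on samples.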

In [1]:
# file content example
! head -n 5 yandex_music/events.csv

userId,artistId,plays,skips
0,335,1,0
0,708,1,0
0,710,2,1
0,815,1,1


In [2]:
# copy files to HDFS
! hadoop fs -copyFromLocal yandex_music /
! hadoop fs -ls -h /yandex_music


Found 3 items
-rw-r--r--   1 jovyan supergroup        254 2023-12-13 13:45 /yandex_music/README.txt
-rw-r--r--   1 jovyan supergroup      3.7 M 2023-12-13 13:45 /yandex_music/artists.jsonl
-rw-r--r--   1 jovyan supergroup     47.6 M 2023-12-13 13:45 /yandex_music/events.csv


In [57]:
# sample
! head -n 10000 yandex_music/events.csv > yandex_music/evants_sample.csv

In [90]:
! wc -l yandex_music/events.csv

3412505 yandex_music/events.csv


In [52]:
%%file mapper.py
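import sys

for line in sys.stdin:
    element = line.strip().split(',')
    # Skip the header by content, not by position: on Hadoop every input split
    # is read by its own mapper, and skipping the first line of each split
    # would silently drop one data row per extra split.
    if element[0] == 'userId':
        continue
    # Emit userId <TAB> plays
    print(element[0] + "\t" + element[2])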
    
    

Overwriting mapper.py


In [64]:
%%file reducer.py
import sys

current_user = None
total_plays = 0

# Input is sorted by userId, so all records of one user arrive consecutively.
for line in sys.stdin:
    user_id, plays = line.strip().split('\t')

    if current_user != user_id:
        # A new user starts: flush the previous one if it passes the threshold.
        if current_user and total_plays > 1000:
            print(f"{current_user}\t{total_plays}")
        current_user = user_id
        total_plays = 0

    total_plays += int(plays)

# Flush the last user in the stream.
if current_user and total_plays > 1000:
    print(f"{current_user}\t{total_plays}")


Overwriting reducer.py
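
The reducer above only works because its input arrives sorted by key, which is what `sort` provides locally and what the shuffle phase provides on the cluster. A minimal sketch of the same map, sort, reduce flow in plain Python follows. The helper names `map_plays` and `reduce_plays` are hypothetical, and `sorted()` stands in for the shuffle.

```python
from itertools import groupby


def map_plays(lines):
    """Yield (userId, plays) pairs, skipping the CSV header by content."""
    for line in lines:
        fields = line.strip().split(',')
        if fields[0] == 'userId':
            continue
        yield fields[0], int(fields[2])


def reduce_plays(pairs, threshold=1000):
    """Sum plays per user and keep users strictly above the threshold."""
    # sorted() plays the role of the shuffle: it brings equal keys together,
    # which is the precondition for groupby (and for the streaming reducer).
    for user_id, group in groupby(sorted(pairs), key=lambda kv: kv[0]):
        total = sum(plays for _, plays in group)
        if total > threshold:
            yield user_id, total


sample = ["userId,artistId,plays,skips", "0,335,1,0", "1,708,3,0", "0,710,2,1"]
print(list(reduce_plays(map_plays(sample), threshold=2)))  # [('0', 3), ('1', 3)]
```

Dropping the `sorted()` call would split user `0` into two groups, which is the same failure the streaming reducer would show on unsorted input.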


In [65]:
# Testing MapReduce Locally

In [88]:
%%bash
cat yandex_music/evants_sample.csv | python ./mapper.py | sort -k 1,1 -t $'\t' | python ./reducer.py > result.txt
cat result.txt | sort -k 2,2 -t $'\t' -n -r | head -n 10

12


In [91]:
import pandas as pd


df1 = pd.read_csv('result.txt', names = ['userId', 'values'], delimiter='\t')
df2 = pd.read_csv('yandex_music/evants_sample.csv')
merged_df = pd.merge(df1, df2, on='userId', how='inner')

# Save the merged DataFrame to a new CSV file
# merged_df.to_csv('merged_file.csv', index=False)

In [100]:
merged = merged_df[['userId', 'artistId']].drop_duplicates()

In [102]:
merged.to_csv('merged.csv', index=False)
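
The pandas join above runs on a single machine. Since the task allows us to assume that one user's playlist fits in memory, the filtering could in principle be done in a reducer instead. The sketch below is one possible shape for that, not what this notebook runs. It assumes a mapper that emits `userId<TAB>artistId<TAB>plays`, and the name `emit_pairs` is hypothetical.

```python
from itertools import groupby


def emit_pairs(sorted_lines, threshold=1000):
    """sorted_lines: 'userId<TAB>artistId<TAB>plays' lines, sorted by userId."""
    records = (line.rstrip('\n').split('\t') for line in sorted_lines)
    for user_id, group in groupby(records, key=lambda r: r[0]):
        # Buffer one user's playlist; the task statement says this fits in memory.
        artists = set()
        total = 0
        for _, artist_id, plays in group:
            artists.add(artist_id)
            total += int(plays)
        # Emit distinct (user, artist) pairs only for users above the threshold.
        if total > threshold:
            for artist_id in sorted(artists):
                yield f"{user_id}\t{artist_id}"


lines = ["0\t335\t1\n", "0\t708\t1\n", "0\t335\t2\n", "1\t708\t1\n"]
print(list(emit_pairs(lines, threshold=3)))  # ['0\t335', '0\t708']
```

The output has the same content as `merged.csv` (distinct user and artist pairs for the retained users), but tab-separated and without a header.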

In [None]:
# mapper2 and reducer2

In [121]:
%%file mapper2.py
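import sys

for line in sys.stdin:
    element = line.strip().split(',')
    # Skip the header by content, not by position: on Hadoop every input split
    # is read by its own mapper, and skipping the first line of each split
    # would silently drop one data row per extra split.
    if element[0] == 'userId':
        continue
    # Emit artistId <TAB> 1: one record per distinct (user, artist) pair.
    print(element[1] + "\t" + "1")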

Overwriting mapper2.py


In [123]:
%%file reducer2.py
import sys

prev_key = None
count = 0
for line in sys.stdin:  # stream is sorted by key
    key, value  = line.split("\t")
    
    if prev_key is not None and key != prev_key:
        # new key in stream, dump previous
        print(prev_key + "\t" + str(count))
        count = 0
    
    count += int(value)
    prev_key = key

# dump last key (skip if the input stream was empty)
if prev_key is not None:
    print(prev_key + "\t" + str(count))

Overwriting reducer2.py


In [105]:
#testing locally

In [124]:
%%bash
cat merged.csv | python ./mapper2.py | sort -k 1,1 -t $'\t' | python ./reducer2.py > result_singers.txt
cat result_singers.txt | sort -k 2,2 -t $'\t' -n -r | head -n 5

11368	12
63958	10
59783	10
3629	10
3495	10
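
The `sort ... | head -n 5` step sorts the whole reducer output just to keep five rows. As a sketch of an alternative, `heapq.nlargest` from the standard library selects the top rows without a full sort. The function name `top_n` is hypothetical, and the input is assumed to be the `artistId<TAB>count` lines written by `reducer2.py`.

```python
import heapq


def top_n(lines, n=5):
    """Return the n (artist_id, count) pairs with the largest counts."""
    pairs = (line.split('\t') for line in lines if line.strip())
    # Put the count first so that tuples compare by count; ties fall back to
    # comparing the artist id strings, which is an arbitrary choice.
    counts = ((int(count), artist_id) for artist_id, count in pairs)
    return [(artist_id, count) for count, artist_id in heapq.nlargest(n, counts)]


lines = ["3629\t10\n", "11368\t12\n", "42\t3\n"]
print(top_n(lines, n=2))  # [('11368', 12), ('3629', 10)]
```

Several artists in the sample output above tie at 10 users, so the order among them depends on the tie-breaking rule and is not meaningful.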


In [None]:
# df1 = pd.read_csv('result_singers.txt', names = ['singerId', 'values'], delimiter='\t')
# df2 = pd.read_csv('artists.jsonl')
# merged_df = pd.merge(df1, df2, on='userId', how='inner')

In [159]:
%%file singers.py
import sys
import json

# Top-5 artist ids taken from result-2.txt
l_ids = {11368, 3629, 259, 44148, 23524}
for line in sys.stdin:
    text = json.loads(line)
    if text['artistId'] in l_ids:
        print(text['artistName'])



Overwriting singers.py


In [140]:
! head -n 10000 yandex_music/artists.jsonl > yandex_music/artist_sample.jsonl

In [141]:
%%bash
cat yandex_music/artist_sample.jsonl | python ./singers.py

Robin Schulz
Sia


In [142]:
# Run on a Hadoop cluster

In [143]:
! hadoop fs -rm -r /output

! mapred streaming \
  -input /yandex_music/events.csv \
  -output /output \
  -mapper "/opt/conda/bin/python3.6 mapper.py" \
  -reducer "/opt/conda/bin/python3.6 reducer.py" \
  -file mapper.py \
  -file reducer.py

rm: `/output': No such file or directory
2023-12-13 22:57:09,021 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [mapper.py, reducer.py] [/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.4.jar] /tmp/streamjob3282643317676855417.jar tmpDir=null
2023-12-13 22:57:10,202 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at /0.0.0.0:8032
2023-12-13 22:57:10,515 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at /0.0.0.0:8032
2023-12-13 22:57:11,364 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/jovyan/.staging/job_1702473973502_0001
2023-12-13 22:57:12,986 INFO mapred.FileInputFormat: Total input files to process : 1
2023-12-13 22:57:13,130 INFO mapreduce.JobSubmitter: number of splits:3
2023-12-13 22:57:13,497 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1702473973502_0001
2023-12-13 22:57:13,49
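
The log above warns that `-file` is deprecated in favour of the generic `-files` option. As a sketch only, the helper below assembles the equivalent argument list without running it. The function name `streaming_command` is hypothetical. Generic options such as `-files` have to come before the streaming-specific ones, so they are placed first.

```python
import shlex


def streaming_command(input_path, output_path, mapper, reducer,
                      python="/opt/conda/bin/python3.6"):
    """Build the argv for a streaming job that ships scripts with -files."""
    return [
        "mapred", "streaming",
        # Generic option: a comma-separated list of files to ship with the job.
        "-files", f"{mapper},{reducer}",
        "-input", input_path,
        "-output", output_path,
        "-mapper", f"{python} {mapper}",
        "-reducer", f"{python} {reducer}",
    ]


cmd = streaming_command("/yandex_music/events.csv", "/output",
                        "mapper.py", "reducer.py")
# Print a shell-quoted version that could be pasted into a `!` cell.
print(" ".join(shlex.quote(arg) for arg in cmd))
```

The list form can also be passed to `subprocess.run(cmd, check=True)` where the `mapred` binary is on the `PATH`.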

In [144]:
! hadoop fs -ls /output

Found 2 items
-rw-r--r--   1 jovyan supergroup          0 2023-12-13 22:58 /output/_SUCCESS
-rw-r--r--   1 jovyan supergroup      30537 2023-12-13 22:58 /output/part-00000


In [145]:
%%bash
hadoop fs -cat "/output/*" | sort -k 2,2 -t $'\t' -n -r | head -n 10

4745	6137
989	5443
4688	5022
4689	5003
2051	4847
2102	4784
4627	4682
1016	4657
4575	4646
2266	4641


In [146]:
%%bash
hadoop fs -cat "/output/*" | sort -k 2,2 -t $'\t' -n -r > result.txt

In [147]:
! head -n 10 result.txt

4745	6137
989	5443
4688	5022
4689	5003
2051	4847
2102	4784
4627	4682
1016	4657
4575	4646
2266	4641


In [148]:
! wc -l result.txt

3117 result.txt


In [149]:
import pandas as pd


df1 = pd.read_csv('result.txt', names = ['userId', 'values'], delimiter='\t')
df2 = pd.read_csv('yandex_music/events.csv')
merged_df = pd.merge(df1, df2, on='userId', how='inner')

# Save the merged DataFrame to a new CSV file
# merged_df.to_csv('merged_file.csv', index=False)

In [150]:
merged = merged_df[['userId', 'artistId']].drop_duplicates()
merged.to_csv('merged-2.csv', index=False)

In [151]:
! wc -l merged-2.csv

2560217 merged-2.csv


In [None]:
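# Line count of merged-2.csv, header included; this is not the answer to
# question 1 (the number of retained users is the line count of result.txt).
answer = 2560217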

In [152]:
! hadoop fs -copyFromLocal merged-2.csv /yandex_music/merged-2.csv

In [153]:
! hadoop fs -ls -h /yandex_music

Found 4 items
-rw-r--r--   1 jovyan supergroup        254 2023-12-13 13:45 /yandex_music/README.txt
-rw-r--r--   1 jovyan supergroup      3.7 M 2023-12-13 13:45 /yandex_music/artists.jsonl
-rw-r--r--   1 jovyan supergroup     47.6 M 2023-12-13 13:45 /yandex_music/events.csv
-rw-r--r--   1 jovyan supergroup     25.8 M 2023-12-13 23:04 /yandex_music/merged-2.csv


In [154]:
! hadoop fs -rm -r /output-2

! mapred streaming \
  -input /yandex_music/merged-2.csv \
  -output /output-2 \
  -mapper "/opt/conda/bin/python3.6 mapper2.py" \
  -reducer "/opt/conda/bin/python3.6 reducer2.py" \
  -file mapper2.py \
  -file reducer2.py

rm: `/output-2': No such file or directory
2023-12-13 23:05:33,979 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [mapper2.py, reducer2.py] [/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.4.jar] /tmp/streamjob5703292509531749194.jar tmpDir=null
2023-12-13 23:05:34,771 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at /0.0.0.0:8032
2023-12-13 23:05:35,059 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at /0.0.0.0:8032
2023-12-13 23:05:35,304 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/jovyan/.staging/job_1702473973502_0002
2023-12-13 23:05:35,636 INFO mapred.FileInputFormat: Total input files to process : 1
2023-12-13 23:05:35,656 INFO net.NetworkTopology: Adding a new node: /default-rack/127.0.0.1:9866
2023-12-13 23:05:36,542 INFO mapreduce.JobSubmitter: number of splits:2
2023-12-13 23:05:37,142

In [155]:
%%bash
hadoop fs -cat "/output-2/*" | sort -k 2,2 -t $'\t' -n -r > result-2.txt

In [156]:
! head -n 10 result-2.txt

11368	2574
3629	2286
259	2208
44148	2161
23524	2110
59783	2049
21042	1925
23595	1909
21643	1902
645	1876


In [162]:
%%bash
cat yandex_music/artists.jsonl | python ./singers.py

David Guetta
Sia
Imagine Dragons
Би-2
Queen
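
`singers.py` prints names in the order the artists appear in `artists.jsonl`, which need not match the ranking in `result-2.txt`. A small sketch that returns the names in ranking order is shown below. The function name `artist_names` and the sample records are hypothetical, while the `artistId` and `artistName` fields are the ones used in `singers.py`.

```python
import json


def artist_names(jsonl_lines, ranked_ids):
    """Return artist names in the same order as ranked_ids (None if missing)."""
    wanted = set(ranked_ids)
    names = {}
    for line in jsonl_lines:
        if not line.strip():
            continue  # tolerate blank lines
        record = json.loads(line)
        if record['artistId'] in wanted:
            names[record['artistId']] = record['artistName']
    return [names.get(artist_id) for artist_id in ranked_ids]


# Made-up records, only to show the ordering behaviour.
sample = [
    '{"artistId": 2, "artistName": "B"}',
    '{"artistId": 1, "artistName": "A"}',
]
print(artist_names(sample, [1, 2, 3]))  # ['A', 'B', None]
```

With the real file, the call would look like `artist_names(open('yandex_music/artists.jsonl'), [11368, 3629, 259, 44148, 23524])`.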


In [169]:
import json

res = {"q1": 3117, "q2": [11368, 3629, 259, 44148, 23524]}
result = json.dumps(res)
print(result)

{"q1": 3117, "q2": [11368, 3629, 259, 44148, 23524]}


In [170]:
# The context manager closes the file even if the write fails.
with open("result.json", "w") as f:
    f.write(result)