In [1]:
import sys
sys.path.append("..")
import spark_utils
spark_utils.print_ui_links()

NameNode: http://ec2-3-235-150-104.compute-1.amazonaws.com:50070
YARN: http://ec2-3-235-150-104.compute-1.amazonaws.com:8088
Spark UI: http://ec2-3-235-150-104.compute-1.amazonaws.com:20888/proxy/application_1620307932889_0002


# YARN config

https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-common/yarn-default.xml

In [7]:
! cat /etc/hadoop/conf/yarn-site.xml | egrep -A 1 "mb|cpu-vcores"

    <name>yarn.scheduler.increment-allocation-mb</name>
    <value>32</value>
--
    <name>yarn.nodemanager.resource.cpu-vcores</name>
    <value>8</value>
--
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>57344</value>
--
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>57344</value>
--
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>32</value>


# MapReduce config

https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml

In [9]:
! cat /etc/hadoop/conf/mapred-site.xml | egrep -A 1 "mb"

    <name>mapreduce.task.io.sort.mb</name>
    <value>200</value>
--
    <name>mapreduce.map.memory.mb</name>
    <value>7168</value>
--
    <name>mapreduce.reduce.memory.mb</name>
    <value>14336</value>
--
    <name>yarn.app.mapreduce.am.resource.mb</name>
    <value>14336</value>


# Wikipedia dataset

In [10]:
# download from s3 to local disk (you can also read them directly with Hadoop though)
! aws s3 cp s3://ydatazian/week1 . --recursive

download: s3://ydatazian/week1/categories.jsonl to ./categories.jsonl
download: s3://ydatazian/week1/wiki.jsonl to ./wiki.jsonl           


In [11]:
! ls -lh .

total 205M
-rw-rw-r-- 1 hadoop hadoop  61M May  6 21:11 categories.jsonl
-rw-rw-r-- 1 hadoop hadoop  149 May  6 21:54 generate.py
-rw-rw-r-- 1 hadoop hadoop  27K May  6 21:55 hdfs-basics.ipynb
-rw-rw-r-- 1 hadoop hadoop  24K May  6 22:01 mapreduce-wordcount.ipynb
-rw-r--r-- 1 hadoop hadoop   10 May  6 21:18 test2.txt
-rw-rw-r-- 1 hadoop hadoop   10 May  6 21:53 test.txt
-rw-rw-r-- 1 hadoop hadoop 144M May  6 21:11 wiki.jsonl


In [12]:
! head -n 1 wiki.jsonl

{"title": "April", "text": "April\n\nApril is the fourth month of the year, and comes between March and May. It is one of four months to have 30 days.\n\nApril always begins on the same day of week as July, and additionally, January in leap years. April always ends on the same day of the week as December.\n\nApril's flowers are the Sweet Pea and Daisy. Its birthstone is the diamond. The meaning of the diamond is innocence.\n\nApril comes between March and May, making it the fourth month of the year. It also comes first in the year out of the four months that have 30 days, as June, September and November are later in the year.\n\nApril begins on the same day of the week as July every year and on the same day of the week as January in leap years. April ends on the same day of the week as December every year, as each other's last days are exactly 35 weeks (245 days) apart.\n\nIn common years, April starts on the same day of the week as October of the previous year, and in leap years, May 

In [13]:
# make a sample
! head -n 1000 wiki.jsonl > sample.jsonl

# WordCount mapper

https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html

In [14]:
%%file mapper.py
import sys
import json
import re
import string

for line in sys.stdin:
    text = json.loads(line)['text']
    text = re.sub(f'[^{re.escape(string.printable)}]', ' ', text)  # not printable to space
    words = text.lower().split()
    for word in words:
        print(word + "\t" + "1")

Writing mapper.py


# Test mapper

In [17]:
! echo "{\"text\": \"This  is  text\"}" | python3 ./mapper.py

this	1
is	1
text	1


# WordCount reducer

https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html

In [22]:
# reducer input
! python3 -c "print('\n'.join(['aaa\t1'] * 10 + ['bbb\t1'] * 5))"

aaa	1
aaa	1
aaa	1
aaa	1
aaa	1
aaa	1
aaa	1
aaa	1
aaa	1
aaa	1
bbb	1
bbb	1
bbb	1
bbb	1
bbb	1


In [19]:
%%file reducer.py
import sys

prev_key = None
count = 0
for line in sys.stdin:  # stream is sorted by key
    key, value = line.split("\t")
    
    if prev_key is not None and key != prev_key:
        # new key in stream, dump previous
        print(prev_key + "\t" + str(count))
        count = 0
    
    count += int(value)
    prev_key = key

# dump last key
print(prev_key + "\t" + str(count))

Writing reducer.py


# Test reducer

In [23]:
! python3 -c "print('\n'.join(['aaa\t1'] * 10 + ['bbb\t1'] * 5))" | python3 ./reducer.py

aaa	10
bbb	5


# Test MapReduce locally

In [24]:
%%bash
cat sample.jsonl | python3 ./mapper.py | sort -k 1,1 -t $'\t' | python3 ./reducer.py > result.txt
cat result.txt | sort -k 2,2 -t $'\t' -n -r | head -n 10

the	34538
of	15944
and	12966
in	12448
a	11322
is	10870
to	9469
are	5598
it	4330
that	4057


# Copy files to HDFS

In [27]:
! hadoop fs -copyFromLocal *.jsonl /

In [28]:
! hadoop fs -ls -h /

Found 10 items
-rw-r--r--   1 hadoop hadoop    128.0 M 2021-05-06 21:54 /128.txt
-rw-r--r--   1 hadoop hadoop    256.0 M 2021-05-06 21:54 /256.txt
-rw-r--r--   1 hadoop hadoop     60.9 M 2021-05-06 22:07 /categories.jsonl
-rw-r--r--   1 hadoop hadoop      2.9 M 2021-05-06 22:07 /sample.jsonl
-rw-r--r--   2 hadoop hadoop         10 2021-05-06 21:18 /test.txt
-rw-r--r--   1 hadoop hadoop          8 2021-05-06 21:19 /test3.txt
drwxrwxrwt   - hdfs   hadoop          0 2021-05-06 13:32 /tmp
drwxr-xr-x   - hdfs   hadoop          0 2021-05-06 13:32 /user
drwxr-xr-x   - hdfs   hadoop          0 2021-05-06 13:32 /var
-rw-r--r--   1 hadoop hadoop    143.4 M 2021-05-06 22:07 /wiki.jsonl


# Run on Hadoop

https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html

https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html#Packaging_Files_With_Job_Submissions

In [34]:
! which python3

/usr/bin/python3


In [44]:
! hadoop fs -rm -r /word-count

! hadoop jar /usr/lib/hadoop/hadoop-streaming.jar \
  -input /sample.jsonl \
  -output /word-count \
  -mapper "/usr/bin/python3 mapper.py" \
  -reducer "/usr/bin/python3 reducer.py" \
  -file mapper.py \
  -file reducer.py

rm: `/word-count': No such file or directory
21/05/06 22:12:10 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [mapper.py, reducer.py] [/usr/lib/hadoop/hadoop-streaming-2.8.5-amzn-5.jar] /tmp/streamjob1600431364475733951.jar tmpDir=null
21/05/06 22:12:11 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-13-255.ec2.internal/172.31.13.255:8032
21/05/06 22:12:11 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-13-255.ec2.internal/172.31.13.255:8032
21/05/06 22:12:12 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
21/05/06 22:12:12 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 5f788d5e8f90539ee331702c753fa250727128f4]
21/05/06 22:12:12 INFO mapred.FileInputFormat: Total input files to process : 1
21/05/06 22:12:12 INFO mapreduce.JobSubmitter: number of splits:16
21/05/06 22:12:12 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_16203079328

In [45]:
! hadoop fs -ls /word-count

Found 8 items
-rw-r--r--   1 hadoop hadoop          0 2021-05-06 22:12 /word-count/_SUCCESS
-rw-r--r--   1 hadoop hadoop      73746 2021-05-06 22:12 /word-count/part-00000
-rw-r--r--   1 hadoop hadoop      74194 2021-05-06 22:12 /word-count/part-00001
-rw-r--r--   1 hadoop hadoop      73981 2021-05-06 22:12 /word-count/part-00002
-rw-r--r--   1 hadoop hadoop      75232 2021-05-06 22:12 /word-count/part-00003
-rw-r--r--   1 hadoop hadoop      74032 2021-05-06 22:12 /word-count/part-00004
-rw-r--r--   1 hadoop hadoop      76532 2021-05-06 22:12 /word-count/part-00005
-rw-r--r--   1 hadoop hadoop      74322 2021-05-06 22:12 /word-count/part-00006


In [47]:
%%bash
hadoop fs -cat "/word-count/*" | sort -k 2,2 -t $'\t' -n -r | head -n 10

the	34538
of	15944
and	12966
in	12448
a	11322
is	10870
to	9469
are	5598
it	4330
that	4057


# Debugging mapper

In [48]:
%%file mapper.py
import sys
import json
import re
import string

for line in sys.stdin:
    text = json.loads(line)['text123']  # error: missing key!
    text = re.sub(f'[^{re.escape(string.printable)}]', ' ', text)  # not printable to space
    words = text.lower().split()
    for word in words:
        print(word + "\t" + "1")

Overwriting mapper.py


In [49]:
! hadoop fs -rm -r /word-count

! hadoop jar /usr/lib/hadoop/hadoop-streaming.jar \
  -input /sample.jsonl \
  -output /word-count \
  -mapper "/usr/bin/python3 mapper.py" \
  -reducer "/usr/bin/python3 reducer.py" \
  -file mapper.py \
  -file reducer.py

Deleted /word-count
21/05/06 22:14:14 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [mapper.py, reducer.py] [/usr/lib/hadoop/hadoop-streaming-2.8.5-amzn-5.jar] /tmp/streamjob3331255832138617194.jar tmpDir=null
21/05/06 22:14:14 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-13-255.ec2.internal/172.31.13.255:8032
21/05/06 22:14:15 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-13-255.ec2.internal/172.31.13.255:8032
21/05/06 22:14:15 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
21/05/06 22:14:15 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 5f788d5e8f90539ee331702c753fa250727128f4]
21/05/06 22:14:15 INFO mapred.FileInputFormat: Total input files to process : 1
21/05/06 22:14:15 INFO mapreduce.JobSubmitter: number of splits:16
21/05/06 22:14:15 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1620307932889_0005
21/05/06 22:14:15

# Debugging reducer

In [50]:
%%file mapper.py
import sys
import json
import re
import string

for line in sys.stdin:
    text = json.loads(line)['text']
    text = re.sub(f'[^{re.escape(string.printable)}]', ' ', text)  # not printable to space
    words = text.lower().split()
    for word in words:
        print(word + "\n" + "1")  # error: reducer will fail!

Overwriting mapper.py


In [51]:
! hadoop fs -rm -r /word-count

! hadoop jar /usr/lib/hadoop/hadoop-streaming.jar \
  -input /sample.jsonl \
  -output /word-count \
  -mapper "/usr/bin/python3 mapper.py" \
  -reducer "/usr/bin/python3 reducer.py" \
  -file mapper.py \
  -file reducer.py

Deleted /word-count
21/05/06 22:15:27 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [mapper.py, reducer.py] [/usr/lib/hadoop/hadoop-streaming-2.8.5-amzn-5.jar] /tmp/streamjob8786550264471346840.jar tmpDir=null
21/05/06 22:15:27 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-13-255.ec2.internal/172.31.13.255:8032
21/05/06 22:15:28 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-13-255.ec2.internal/172.31.13.255:8032
21/05/06 22:15:28 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
21/05/06 22:15:28 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 5f788d5e8f90539ee331702c753fa250727128f4]
21/05/06 22:15:28 INFO mapred.FileInputFormat: Total input files to process : 1
21/05/06 22:15:28 INFO mapreduce.JobSubmitter: number of splits:16
21/05/06 22:15:28 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1620307932889_0006
21/05/06 22:15:29

# Debugging memory usage

In [52]:
%%file mapper.py
import sys
import numpy as np

matrix = np.ones((int(3 * 1024 * 1024 * 1024),), dtype=np.uint8) # 3 gb
s = matrix.sum()

for line in sys.stdin:
    print(str(s))

Overwriting mapper.py


In [56]:
! hadoop fs -rm -r /test

! hadoop jar /usr/lib/hadoop/hadoop-streaming.jar \
  -Dmapreduce.map.memory.mb=4096 \
  -input /sample.jsonl \
  -output /test \
  -mapper "/usr/bin/python3 mapper.py" \
  -file mapper.py

Deleted /test
21/05/06 22:21:14 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [mapper.py] [/usr/lib/hadoop/hadoop-streaming-2.8.5-amzn-5.jar] /tmp/streamjob2629747957270035408.jar tmpDir=null
21/05/06 22:21:14 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-13-255.ec2.internal/172.31.13.255:8032
21/05/06 22:21:15 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-13-255.ec2.internal/172.31.13.255:8032
21/05/06 22:21:15 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
21/05/06 22:21:15 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 5f788d5e8f90539ee331702c753fa250727128f4]
21/05/06 22:21:15 INFO mapred.FileInputFormat: Total input files to process : 1
21/05/06 22:21:15 INFO mapreduce.JobSubmitter: number of splits:16
21/05/06 22:21:15 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1620307932889_0009
21/05/06 22:21:15 INFO impl.YarnCli

In [55]:
! hadoop fs -rm -r /test

! hadoop jar /usr/lib/hadoop/hadoop-streaming.jar \
  -Dmapreduce.map.memory.mb=2048 \
  -input /sample.jsonl \
  -output /test \
  -mapper "/usr/bin/python3 mapper.py" \
  -file mapper.py

Deleted /test
21/05/06 22:19:43 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [mapper.py] [/usr/lib/hadoop/hadoop-streaming-2.8.5-amzn-5.jar] /tmp/streamjob2142224779299513657.jar tmpDir=null
21/05/06 22:19:44 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-13-255.ec2.internal/172.31.13.255:8032
21/05/06 22:19:44 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-13-255.ec2.internal/172.31.13.255:8032
21/05/06 22:19:45 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
21/05/06 22:19:45 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 5f788d5e8f90539ee331702c753fa250727128f4]
21/05/06 22:19:45 INFO mapred.FileInputFormat: Total input files to process : 1
21/05/06 22:19:45 INFO mapreduce.JobSubmitter: number of splits:16
21/05/06 22:19:45 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1620307932889_0008
21/05/06 22:19:45 INFO impl.YarnCli

In [None]:
# kill application with:
# ! yarn application -kill application_*