# Setup

In [23]:
# --- Cluster configuration -------------------------------------------------
# When True, the dataset is re-copied into the namenode mount even if a copy
# is already present there.
HADOOP_START_FROM_SCRATCH = True
DOCKER_INTERNAL_HOST = "host.docker.internal"
DOCKER_DNS = ["10.15.20.1"]

# NameNode endpoint.
HADOOP_NAMENODE_HOSTNAME = "namenode.mavasbel.vpn.itam.mx"
HADOOP_NAMENODE_IP = "10.15.20.2"
HADOOP_NAMENODE_PORT = 8020
HADOOP_NAMENODE_WEBUI_PORT = 9870

# ResourceManager endpoint and its service ports.
HADOOP_RESOURCEMANAGER_HOSTNAME = "resourcemanager.mavasbel.vpn.itam.mx"
HADOOP_RESOURCEMANAGER_IP = "10.15.20.2"
HADOOP_RESOURCEMANAGER_WEBUI_PORT = 8088
HADOOP_RESOURCEMANAGER_RPC_APP_MANAGER_PORT = 8032
HADOOP_RESOURCEMANAGER_TRACKER_PORT = 8031
HADOOP_RESOURCEMANAGER_SCHEDULER_PORT = 8030
HADOOP_RESOURCEMANAGER_ADMIN_PORT = 8033

HADOOP_REPLICATION = 3
# Every per-worker list below is sized by this value so the lists stay aligned.
HADOOP_NUM_WORKERS = 3

# DataNodes: all workers share one IP, so each gets its own port
# (base port + 10 * worker index).
HADOOP_DATANODE_IPS = ["10.15.20.2"] * HADOOP_NUM_WORKERS
HADOOP_DATANODE_NAMES = [f"datanode-{i+1}" for i in range(HADOOP_NUM_WORKERS)]
HADOOP_DATANODE_HOSTNAMES = [
    f"{name}.mavasbel.vpn.itam.mx" for name in HADOOP_DATANODE_NAMES
]
HADOOP_DATANODE_WEBUI_PORTS = [9864 + (i * 10) for i in range(HADOOP_NUM_WORKERS)]
HADOOP_DATANODE_TRANSFER_PORTS = [9866 + (i * 10) for i in range(HADOOP_NUM_WORKERS)]
# NOTE(review): base 6867 breaks the 986x pattern of the sibling ports (Hadoop's
# default DataNode IPC port is 9867) -- confirm this value is intentional.
HADOOP_DATANODE_IPC_PORTS = [6867 + (i * 10) for i in range(HADOOP_NUM_WORKERS)]

# NodeManagers: same one-IP / offset-port scheme as the DataNodes.
HADOOP_NODEMANAGER_IPS = ["10.15.20.2"] * HADOOP_NUM_WORKERS
HADOOP_NODEMANAGER_NAMES = [f"nodemanager-{i+1}" for i in range(HADOOP_NUM_WORKERS)]
HADOOP_NODEMANAGER_HOSTNAMES = [
    f"{name}.mavasbel.vpn.itam.mx" for name in HADOOP_NODEMANAGER_NAMES
]
HADOOP_NODEMANAGER_WEBUI_PORTS = [8050 + (i * 10) for i in range(HADOOP_NUM_WORKERS)]
HADOOP_NODEMANAGER_RPC_PORTS = [8051 + (i * 10) for i in range(HADOOP_NUM_WORKERS)]

# Paths on the containers' local filesystem.
HADOOP_WORKDIR = "/opt/hadoop/work-dir"
HADOOP_NAMENODE_NAMEDIR = "/opt/hadoop/dfs/name"
HADOOP_DATANODE_DATADIR = "/opt/hadoop/dfs/data"

# Path inside HDFS (used with `hdfs dfs` commands). It has the same value as
# HADOOP_WORKDIR but lives in a different filesystem namespace.
HADOOP_HDFS_DATADIR = "/opt/hadoop/work-dir"

In [24]:
import os
from pathlib import Path

# Absolute path (as a plain string) of the directory the notebook runs from.
LOCALHOST_WORKDIR = os.path.abspath(Path.cwd())
# Host-side root under which later cells read and write per-container files.
DOCKER_MOUNTDIR = str(Path(LOCALHOST_WORKDIR) / "mount")

# Create the mount root up front; a no-op when it already exists.
os.makedirs(DOCKER_MOUNTDIR, exist_ok=True)

In [25]:
# NOTE: The synthetic dataset generator below is currently commented out.
# When enabled it writes data.csv into LOCALHOST_WORKDIR, which is the file the
# next cell copies into the namenode mount -- so while this stays disabled,
# data.csv must already exist in LOCALHOST_WORKDIR. Enabling it requires the
# third-party `faker` and `tqdm` packages.
#
# import os
# import csv
# import random
# from faker import Faker
# from tqdm import tqdm

# fake = Faker()

# if HADOOP_START_FROM_SCRATCH:

#     def generate_data(records=100000):
#         print(f"Generating {records} records...")
#         with open(
#             os.path.join(LOCALHOST_WORKDIR, "data.csv"), "w", newline=""
#         ) as data_file:
#             writer = csv.writer(data_file)
#             writer.writerow(["ts", "id", "user", "amount", "category", "country"])
#             for _ in tqdm(range(records), desc="Progress", unit="rows"):
#                 writer.writerow(
#                     [
#                         fake.date_time_this_year().strftime("%Y-%m-%d %H:%M:%S"),
#                         fake.uuid4(),
#                         fake.name(),
#                         round(random.uniform(10.50, 10000.00), 2),
#                         fake.bs(),
#                         fake.country(),
#                     ]
#                 )

#     generate_data(records=2000000)
#     print("\nFile 'data.csv' created successfully.")

In [26]:
import shutil

# Stage the dataset in the namenode's host-side work directory. The upload
# cell later reads it from HADOOP_WORKDIR inside the namenode container, so
# this directory is presumably mounted there -- verify against the compose setup.
dataset_source_path = os.path.join(LOCALHOST_WORKDIR, "data.csv")
dataset_dest_path = os.path.join(
    DOCKER_MOUNTDIR, "namenode", "work-dir", "data.csv"
)
# Copy on a from-scratch run, or whenever the staged copy is missing.
if HADOOP_START_FROM_SCRATCH or not os.path.exists(dataset_dest_path):
    if not os.path.isfile(dataset_source_path):
        # Fail with an actionable message instead of shutil's bare path error.
        raise FileNotFoundError(
            f"Dataset not found at {dataset_source_path}; "
            "generate or place data.csv there before running this cell."
        )
    # shutil.copy does not create missing parent directories.
    os.makedirs(os.path.dirname(dataset_dest_path), exist_ok=True)
    shutil.copy(dataset_source_path, dataset_dest_path)

### Create HDFS input directory and clear previous output

In [27]:
# Create the HDFS input directory; -p makes this a no-op when it already exists.
!docker exec namenode hdfs dfs -mkdir -p {HADOOP_HDFS_DATADIR}/input
# Remove output left over from a previous run; -f suppresses the error when it is absent.
!docker exec namenode hdfs dfs -rm -r -f {HADOOP_HDFS_DATADIR}/output
# Printed unconditionally: the exit status of the docker commands above is not checked.
print("HDFS environment initialized.")

HDFS environment initialized.


### Upload from the container's mount point to HDFS

In [28]:
# Copy data.csv from the namenode container's local filesystem (HADOOP_WORKDIR)
# into HDFS (HADOOP_HDFS_DATADIR/input); -f overwrites an existing HDFS copy.
!docker exec namenode hdfs dfs -put -f {HADOOP_WORKDIR}/data.csv {HADOOP_HDFS_DATADIR}/input/
# List the HDFS input directory to confirm the upload.
!docker exec namenode hdfs dfs -ls {HADOOP_HDFS_DATADIR}/input

Found 1 items
-rw-r--r--   3 hadoop supergroup  244806160 2026-01-10 06:39 /opt/hadoop/work-dir/input/data.csv


### Check block locations and replication across datanodes

In [29]:
# Report the uploaded file's health, its blocks, and which datanodes hold each replica.
!docker exec namenode hdfs fsck {HADOOP_HDFS_DATADIR}/input/data.csv -files -blocks -locations

FSCK started by hadoop (auth:SIMPLE) from /172.19.0.1 for path /opt/hadoop/work-dir/input/data.csv at Sat Jan 10 06:39:32 UTC 2026

/opt/hadoop/work-dir/input/data.csv 244806160 bytes, replicated: replication=3, 2 block(s):  OK
0. BP-1575351362-172.19.0.2-1768027110476:blk_1073741825_1001 len=134217728 Live_repl=3  [DatanodeInfoWithStorage[172.19.0.4:9866,DS-adaba191-1208-48b9-ae2c-794bf5304835,DISK], DatanodeInfoWithStorage[172.19.0.5:9886,DS-d6b7cda5-3d04-4146-b8a3-5227b99ef665,DISK], DatanodeInfoWithStorage[172.19.0.3:9876,DS-9fa38249-ef6b-496e-8401-e365932ad189,DISK]]
1. BP-1575351362-172.19.0.2-1768027110476:blk_1073741826_1002 len=110588432 Live_repl=3  [DatanodeInfoWithStorage[172.19.0.5:9886,DS-d6b7cda5-3d04-4146-b8a3-5227b99ef665,DISK], DatanodeInfoWithStorage[172.19.0.3:9876,DS-9fa38249-ef6b-496e-8401-e365932ad189,DISK], DatanodeInfoWithStorage[172.19.0.4:9866,DS-adaba191-1208-48b9-ae2c-794bf5304835,DISK]]


Status: HEALTHY
 Number of data-nodes:	3
 Number of racks:		1
 Total

Connecting to namenode via http://10.15.20.2:9870/fsck?ugi=hadoop&files=1&blocks=1&locations=1&path=%2Fopt%2Fhadoop%2Fwork-dir%2Finput%2Fdata.csv


### Generate mapper and reducer scripts

In [30]:
import os

mapper_file_contents = """#!/usr/bin/env python
import sys

# Standard for Hadoop Streaming: read from STDIN
for line in sys.stdin:
    line = line.strip()
    # Split the CSV line
    parts = line.split(',')
    
    # Check if we have enough columns and skip the header
    if len(parts) >= 4 and parts[0] != "ts":
        category = parts[4]
        amount = parts[3]
        
        # Output: category [TAB] amount
        # Hadoop will sort these by the key (category) before the Reducer sees them
        print "%s\\t%s" % (category, amount)
"""

with open(os.path.join(DOCKER_MOUNTDIR,"resourcemanager", "work-dir",'mapper.py'), 'w') as mapper_file:
    mapper_file.write(mapper_file_contents)
print("Mapper script created")


reducer_file_contents = """#!/usr/bin/env python
import sys

current_category = None
current_sum = 0.0

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
        
    try:
        category, amount = line.split('\\t')
        amount = float(amount)
    except ValueError:
        continue

    # Logic: If the category changes, print the total for the previous one
    if current_category == category:
        current_sum += amount
    else:
        if current_category:
            print "%s\\t%.2f" % (current_category, current_sum)
        current_category = category
        current_sum = amount

# Don't forget the last category!
if current_category:
    print "%s\\t%.2f" % (current_category, current_sum)
"""

with open(os.path.join(DOCKER_MOUNTDIR,"resourcemanager", "work-dir",'reducer.py'), 'w') as reducer_file:
    reducer_file.write(reducer_file_contents)
print("Reducer script created")

!docker exec resourcemanager ls -l {HADOOP_WORKDIR}

Mapper script created
Reducer script created
total 5
-rwxrwxrwx 1 root root 527 Jan 10 06:39 mapper.py
-rwxrwxrwx 1 root root 733 Jan 10 06:39 reducer.py


### Run the mapper/reducer pipeline directly on the namenode for validation

In [31]:
shutil.copy(
    os.path.join(DOCKER_MOUNTDIR, "resourcemanager", "work-dir", "mapper.py"),
    os.path.join(DOCKER_MOUNTDIR, "namenode", "work-dir", "mapper.py"),
)
shutil.copy(
    os.path.join(
        DOCKER_MOUNTDIR, "resourcemanager", "work-dir", "reducer.py"
    ),
    os.path.join(DOCKER_MOUNTDIR, "namenode", "work-dir", "reducer.py"),
)
!docker exec namenode bash -c "cat {HADOOP_WORKDIR}/data.csv | python {HADOOP_WORKDIR}/mapper.py | sort | python {HADOOP_WORKDIR}/reducer.py"

aggregate 24/365 ROI	55203.26
aggregate 24/365 action-items	47835.03
aggregate 24/365 applications	66141.85
aggregate 24/365 architectures	25810.76
aggregate 24/365 bandwidth	34889.38
aggregate 24/365 channels	76876.38
aggregate 24/365 communities	69184.74
aggregate 24/365 content	65201.65
aggregate 24/365 convergence	52115.42
aggregate 24/365 deliverables	61094.78
aggregate 24/365 e-business	37171.19
aggregate 24/365 e-commerce	59049.71
aggregate 24/365 e-markets	60727.07
aggregate 24/365 e-services	76176.81
aggregate 24/365 e-tailers	29795.80
aggregate 24/365 experiences	33032.57
aggregate 24/365 eyeballs	102892.45
aggregate 24/365 functionalities	40739.93
aggregate 24/365 info-mediaries	99122.62
aggregate 24/365 infrastructures	53643.53
aggregate 24/365 initiatives	49705.78
aggregate 24/365 interfaces	78742.74
aggregate 24/365 markets	54573.95
aggregate 24/365 methodologies	15548.80
aggregate 24/365 metrics	57916.54
aggregate 24/365 mindshare	32958.49
aggregate 24/365 models	67352.6

# Hadoop MapReduce

In [32]:
# 1. Ensure the output directory is clean
!docker exec namenode hdfs dfs -rm -r -f {HADOOP_HDFS_DATADIR}/output

# 2. Submit the job from the ResourceManager to Nodemanagers
!docker exec resourcemanager yarn jar /opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.4.2.jar \
    -D mapred.reduce.tasks=2 \
    -D mapreduce.map.memory.mb=1024 \
    -D mapreduce.reduce.memory.mb=1024 \
    -files {HADOOP_WORKDIR}/mapper.py,{HADOOP_WORKDIR}/reducer.py \
    -mapper "python mapper.py" \
    -reducer "python reducer.py" \
    -input {HADOOP_HDFS_DATADIR}/input/data.csv \
    -output {HADOOP_HDFS_DATADIR}/output

# 3. Show output file
!docker exec namenode hdfs dfs -ls {HADOOP_HDFS_DATADIR}/output

packageJobJar: [/tmp/hadoop-unjar7699853238621970470/] [] /tmp/streamjob7500863092519910609.jar tmpDir=null
2026-01-10 06:39:48 INFO  DefaultNoHARMFailoverProxyProvider:64 - Connecting to ResourceManager at /10.15.20.2:8032
2026-01-10 06:39:48 INFO  DefaultNoHARMFailoverProxyProvider:64 - Connecting to ResourceManager at /10.15.20.2:8032
2026-01-10 06:39:49 INFO  JobResourceUploader:907 - Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/hadoop/.staging/job_1768027115623_0001
2026-01-10 06:39:50 INFO  FileInputFormat:267 - Total input files to process : 1
2026-01-10 06:39:50 INFO  NetworkTopology:156 - Adding a new node: /default-rack/172.19.0.5:9886
2026-01-10 06:39:50 INFO  NetworkTopology:156 - Adding a new node: /default-rack/172.19.0.3:9876
2026-01-10 06:39:50 INFO  NetworkTopology:156 - Adding a new node: /default-rack/172.19.0.4:9866
2026-01-10 06:39:50 INFO  JobSubmitter:203 - number of splits:2
2026-01-10 06:39:50 INFO  deprecation:1462 - mapred.reduce.tasks is depre

In [33]:
# 4. Merge and sort output
# Concatenate the job's HDFS output files into a single file on the namenode's
# local filesystem. Despite the .csv name, the reducer writes tab-separated
# "category<TAB>total" lines.
!docker exec namenode hdfs dfs -getmerge {HADOOP_HDFS_DATADIR}/output {HADOOP_WORKDIR}/output.csv
# Sort the merged file inside the container; the redirect is part of the
# quoted bash command, so output_sorted.csv is written in the container too.
!docker exec namenode bash -c "cat {HADOOP_WORKDIR}/output.csv | sort > {HADOOP_WORKDIR}/output_sorted.csv"