# Containerized Hadoop Cluster Setup

**Project**: Docker Hadoop  
**Technologies**: Docker, Hadoop, HDFS, YARN, MapReduce  
**Source**: [https://github.com/anarcoiris/docker-hadoop](https://github.com/anarcoiris/docker-hadoop)

---

## Executive Summary

This project deploys a multi-node Hadoop cluster (HDFS, YARN, MapReduce) with Docker Compose. The whole cluster is defined as Infrastructure-as-Code, so it can be started, torn down, and reproduced from a single file.

---


In [None]:
import sys
from pathlib import Path

# Try to add docker-hadoop to path (repository code available for reference only)
try:
    repo_path = Path('docker-hadoop').resolve()
    if repo_path.exists():
        sys.path.insert(0, str(repo_path))
        print("✓ Repository code loaded")
    else:
        print("ℹ Note: Repository code not found. Using standalone demo implementations.")
except Exception as e:
    print(f"ℹ Note: Repository import skipped - using demo code ({e})")

import pandas as pd
import matplotlib.pyplot as plt

print("✓ Hadoop architecture analysis ready")
print("\n📝 Execution Note:")
print("   This notebook demonstrates distributed systems architecture.")
print("   Full production code available at: https://github.com/anarcoiris/docker-hadoop")

## 2. Docker Compose Configuration

### docker-compose.yml Structure

```yaml
version: '3'
services:
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
    environment:
      - CLUSTER_NAME=hadoop-cluster
    ports:
      - "9870:9870"  # Web UI
      - "9000:9000"  # RPC
  
  datanode1:
    image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
    environment:
      - SERVICE_PRECONDITION=namenode:9870  # no quotes: in list form they would become part of the value
  
  resourcemanager:
    image: bde2020/hadoop-resourcemanager:2.0.0-hadoop3.2.1-java8
    ports:
      - "8088:8088"  # Web UI
```
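
In these images, `SERVICE_PRECONDITION` appears to make a container wait until the listed `host:port` endpoints accept connections before it starts its Hadoop daemon. The sketch below shows that kind of readiness probe in Python. `parse_precondition` and `wait_for_port` are hypothetical helper names chosen for illustration; they are not part of the images or the repository.

```python
import socket
import time


def parse_precondition(value):
    """Split a 'host:port host:port' string into (host, port) pairs."""
    pairs = []
    for item in value.split():
        host, _, port = item.rpartition(":")
        pairs.append((host, int(port)))
    return pairs


def wait_for_port(host, port, timeout_s=30.0, interval_s=1.0):
    """Return True once host:port accepts a TCP connection, False on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval_s):
                return True
        except OSError:
            # Connection refused or unreachable: retry until the deadline passes
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval_s)


print(parse_precondition("namenode:9870"))  # [('namenode', 9870)]
```

A startup script could call `wait_for_port` for each parsed pair and only launch the DataNode once every call returns `True`.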

### Key Configuration
- **Volumes**: Persistent storage for HDFS data
- **Networks**: Isolated Docker network for inter-node communication
- **Health Checks**: Report whether each service is responding; pair them with a `restart` policy so containers that exit are restarted
- **Resource Limits**: Memory and CPU constraints
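
The four settings above do not appear in the abbreviated YAML, so here is a rough sketch of how they might look for the DataNodes, generated from Python. The volume path `/hadoop/dfs/data`, the health check URL on port 9864, the network name `hadoop_net`, and the limit values are assumptions for illustration and should be checked against the actual images and repository. JSON is also valid YAML, so the printed output should be usable as a Compose file fragment.

```python
import json


def datanode_service(index, memory="4g", cpus="2.0"):
    """Build one Compose service entry for a DataNode (illustrative values)."""
    return {
        "image": "bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8",
        "restart": "always",  # restart the container if it exits
        "environment": {"SERVICE_PRECONDITION": "namenode:9870"},
        # Assumed data directory inside the container
        "volumes": [f"datanode{index}_data:/hadoop/dfs/data"],
        "networks": ["hadoop_net"],  # hypothetical network name
        # Assumed health endpoint: the DataNode web UI port
        "healthcheck": {
            "test": ["CMD", "curl", "-f", "http://localhost:9864/"],
            "interval": "30s",
            "retries": 3,
        },
        "deploy": {"resources": {"limits": {"cpus": cpus, "memory": memory}}},
    }


def build_compose(num_datanodes):
    """Assemble services, named volumes, and the shared network."""
    indexes = range(1, num_datanodes + 1)
    return {
        "services": {f"datanode{i}": datanode_service(i) for i in indexes},
        "volumes": {f"datanode{i}_data": {} for i in indexes},
        "networks": {"hadoop_net": {}},
    }


compose = build_compose(3)
print(json.dumps(compose, indent=2))
```

Generating the entries in a loop keeps the three DataNode definitions identical apart from their index, which is the main point of treating the cluster as code.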

In [None]:
# Cluster configuration parser (simplified)
cluster_config = {
    "namenode": {
        "memory": "2GB",
        "cpu": "1.0",
        "ports": [9870, 9000]
    },
    "datanodes": [
        {"id": 1, "memory": "4GB", "cpu": "2.0", "storage": "50GB"},
        {"id": 2, "memory": "4GB", "cpu": "2.0", "storage": "50GB"},
        {"id": 3, "memory": "4GB", "cpu": "2.0", "storage": "50GB"},
    ],
    "resourcemanager": {
        "memory": "2GB",
        "cpu": "1.0",
        "ports": [8088]
    },
    "replication_factor": 3
}

print("Hadoop Cluster Configuration:")
print(f"  NameNode: {cluster_config['namenode']['memory']} RAM")
print(f"  DataNodes: {len(cluster_config['datanodes'])} nodes")
# Strip the "GB" suffix and sum the raw capacity once
raw_storage_gb = sum(int(dn['storage'][:-2]) for dn in cluster_config['datanodes'])
print(f"  Total storage: {raw_storage_gb}GB raw")
print(f"  Replication factor: {cluster_config['replication_factor']}x")
effective_storage = raw_storage_gb / cluster_config['replication_factor']
print(f"  Effective storage: {effective_storage:.0f}GB (after replication)")

## 3. HDFS Operations

### Common Commands

```bash
# Create directory
hdfs dfs -mkdir -p /user/data

# Upload file
hdfs dfs -put local_file.csv /user/data/

# List files
hdfs dfs -ls /user/data

# Check file replication
hdfs fsck /user/data/file.csv -files -blocks -locations

# Download file
hdfs dfs -get /user/data/file.csv ./local_download.csv
```
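
With a containerized cluster, these commands are typically run inside the NameNode container rather than on the host. The following is a small sketch of a Python wrapper that builds the corresponding `docker exec` invocation. The container name `namenode` is an assumption (Compose may prefix it with the project name), and `hdfs_command` and `run_hdfs` are hypothetical helpers.

```python
import shlex
import subprocess


def hdfs_command(*args, container="namenode"):
    """Build a `docker exec` argv that runs an hdfs subcommand in a container."""
    return ["docker", "exec", container, "hdfs", *args]


def run_hdfs(*args, container="namenode"):
    """Run the command and return stdout; raises CalledProcessError on failure."""
    result = subprocess.run(
        hdfs_command(*args, container=container),
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout


cmd = hdfs_command("dfs", "-ls", "/user/data")
print(shlex.join(cmd))  # docker exec namenode hdfs dfs -ls /user/data
```

Passing the arguments as a list avoids shell quoting problems when paths contain spaces. `run_hdfs("dfs", "-mkdir", "-p", "/user/data")` would execute the first command from the list above against a running cluster.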

### Block Management
- Default block size: 128MB
- Replication: 3 copies across nodes
- Rack awareness: Distribute across racks for fault tolerance
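
To make the rack-awareness point concrete, here is a simplified sketch of the default HDFS placement pattern for three replicas: the first on the writer's node, the second on a node in a different rack, and the third on another node in that same remote rack. The topology and the `place_replicas` helper are hypothetical, and real HDFS has fallback rules that this sketch omits. On a single-host Compose cluster without a topology script, all DataNodes fall into one default rack, so the rack distinction only matters on multi-rack deployments.

```python
import random


def place_replicas(topology, writer_node, rng=None):
    """Pick three DataNodes for one block.

    topology maps rack name -> list of node names.
    Replica 1: writer's node. Replica 2: a node in a different rack.
    Replica 3: a different node in the same rack as replica 2.
    """
    rng = rng or random.Random()
    rack_of = {node: rack for rack, nodes in topology.items() for node in nodes}
    local_rack = rack_of[writer_node]
    # A remote rack needs at least two nodes to host replicas 2 and 3
    candidates = [
        rack for rack, nodes in topology.items()
        if rack != local_rack and len(nodes) >= 2
    ]
    if not candidates:
        raise ValueError("need a remote rack with at least two DataNodes")
    remote_rack = rng.choice(candidates)
    second, third = rng.sample(topology[remote_rack], 2)
    return [writer_node, second, third]


topology = {"rack1": ["dn1", "dn2"], "rack2": ["dn3", "dn4"]}
replicas = place_replicas(topology, writer_node="dn1", rng=random.Random(0))
print(replicas)
```

Losing all of `rack1` still leaves two replicas in `rack2`, and losing all of `rack2` still leaves one in `rack1`, which is the fault-tolerance property the bullet refers to.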

In [None]:
# HDFS block distribution simulator
def simulate_hdfs_blocks(file_size_mb, block_size_mb=128, replication=3):
    """
    Simulate how HDFS distributes file blocks.

    HDFS does not pad the final block: it occupies only the bytes actually
    written, so replicated storage is file size times the replication factor.
    """
    if file_size_mb <= 0:
        raise ValueError("file_size_mb must be positive")
    num_blocks = (file_size_mb + block_size_mb - 1) // block_size_mb  # ceiling division
    total_storage = file_size_mb * replication
    
    return {
        'file_size_mb': file_size_mb,
        'num_blocks': num_blocks,
        'block_size_mb': block_size_mb,
        'replication_factor': replication,
        'total_storage_mb': total_storage,
        'storage_overhead': (total_storage / file_size_mb) - 1
    }

# Examples
file_sizes = [100, 500, 1000, 5000]
print("HDFS Block Distribution Analysis:")
print(f"\n{'File Size':<12} {'Blocks':<8} {'Total Storage':<15} {'Overhead'}")
print("-" * 50)

for size in file_sizes:
    result = simulate_hdfs_blocks(size)
    print(f"{size:>4d} MB      {result['num_blocks']:>3d}      "
          f"{result['total_storage_mb']:>6d} MB       "
          f"{result['storage_overhead']*100:>5.1f}%")

## 4. MapReduce Job Example

### Word Count (Classic Example)

**Map Phase**:
```python
def mapper(line):
    # Emit one (word, 1) pair per word in the input line
    for word in line.split():
        yield (word, 1)
```

**Reduce Phase**:
```python
def reducer(word, counts):
    # Emit the total count for one word
    yield (word, sum(counts))
```

### Execution Flow
1. **Input Split**: Divide data into chunks
2. **Map**: Process each chunk independently
3. **Shuffle**: Group by key
4. **Reduce**: Aggregate results
5. **Output**: Write to HDFS
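
The five steps above can be sketched end to end in plain Python. This is an illustration of the data flow only, not how Hadoop schedules work: a thread pool stands in for parallel map tasks, and the function names are hypothetical.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def split_input(lines, num_splits):
    """Step 1: divide the input lines into roughly equal chunks."""
    return [lines[i::num_splits] for i in range(num_splits)]


def map_split(split):
    """Step 2: emit (word, 1) pairs for one chunk."""
    return [(word.lower(), 1) for line in split for word in line.split()]


def shuffle(mapped_splits):
    """Step 3: group all emitted values by key."""
    groups = defaultdict(list)
    for pairs in mapped_splits:
        for word, count in pairs:
            groups[word].append(count)
    return groups


def reduce_groups(groups):
    """Step 4: sum the counts for each word."""
    return {word: sum(counts) for word, counts in groups.items()}


def run_job(lines, num_splits=2):
    splits = split_input(lines, num_splits)
    # Each split is mapped independently, as separate map tasks would be
    with ThreadPoolExecutor(max_workers=num_splits) as pool:
        mapped = list(pool.map(map_split, splits))
    # Step 5 would write this result to HDFS; here it is simply returned
    return reduce_groups(shuffle(mapped))


lines = ["hadoop stores data", "yarn schedules jobs", "hadoop runs jobs"]
counts = run_job(lines)
print(counts["hadoop"], counts["jobs"])  # 2 2
```

Because each split is mapped without reference to the others, the map step parallelizes freely. The shuffle is the only point where data from different splits must meet.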

In [None]:
# MapReduce simulator (Python)
from collections import defaultdict

def map_phase(text_chunk):
    """Emit (word, 1) for each word"""
    results = []
    for word in text_chunk.split():
        token = word.lower().strip('.,!?')
        if token:  # skip tokens that were only punctuation
            results.append((token, 1))
    return results

def reduce_phase(word, counts):
    """Sum all counts for a word"""
    return (word, sum(counts))

# Demo
sample_text = """hadoop is powerful hadoop enables distributed processing
distributed systems scale horizontally hadoop and spark work together"""

# Map
map_output = map_phase(sample_text)
print("Map Output (sample):")
print(map_output[:10])

# Shuffle (group by key)
shuffle_output = defaultdict(list)
for word, count in map_output:
    shuffle_output[word].append(count)

# Reduce
final_output = [reduce_phase(word, counts) for word, counts in shuffle_output.items()]
final_output_sorted = sorted(final_output, key=lambda x: x[1], reverse=True)

print("\nWord Count Results:")
for word, count in final_output_sorted:
    print(f"  {word:<15} {count:>2d}")

## 5. Performance & Scalability

### Scaling Strategy
- **Horizontal Scaling**: Add DataNodes for storage
- **Resource Management**: YARN allocates CPU/memory
- **Data Locality**: Process data where it's stored
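
Adding nodes rarely yields perfectly linear gains, because parts of a job (job setup, shuffle coordination, final output) do not parallelize. A rough way to reason about this is Amdahl's law, speedup = 1 / (s + (1 - s) / n), where s is the serial fraction and n the node count. The 10% serial fraction below is an assumed value for illustration, not a measurement from this cluster.

```python
def amdahl_speedup(num_nodes, serial_fraction):
    """Upper bound on speedup when a fixed fraction of the work is serial."""
    if not 0.0 <= serial_fraction <= 1.0:
        raise ValueError("serial_fraction must be between 0 and 1")
    if num_nodes < 1:
        raise ValueError("num_nodes must be at least 1")
    return 1.0 / (serial_fraction + (1.0 - serial_fraction) / num_nodes)


SERIAL_FRACTION = 0.1  # assumed: 10% of the job cannot be parallelized

for nodes in (1, 3, 5, 10):
    print(f"{nodes:>2d} nodes: {amdahl_speedup(nodes, SERIAL_FRACTION):.2f}x")
```

Under this assumption, 3 nodes give 2.5x and 10 nodes give roughly 5.3x, well short of the 10x an ideal linear model would predict.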

### Monitoring
- **NameNode Web UI**: http://localhost:9870
- **ResourceManager UI**: http://localhost:8088
- **Metrics**: Block distribution, job history, node health
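
Beyond the web pages, the NameNode also serves its metrics as JSON under `/jmx`, which makes node health scriptable. The sketch below queries the `FSNamesystemState` bean and extracts a few fields. The field names are those commonly exposed by Hadoop 3.x and should be verified against the cluster's own `/jmx` output. `parse_fs_state` and `fetch_fs_state` are hypothetical helpers.

```python
import json
from urllib.request import urlopen

JMX_QUERY = "/jmx?qry=Hadoop:service=NameNode,name=FSNamesystemState"


def parse_fs_state(payload):
    """Extract a few health fields from a NameNode JMX response dict."""
    bean = payload["beans"][0]
    total = bean.get("CapacityTotal")
    used = bean.get("CapacityUsed", 0)
    return {
        "live_datanodes": bean.get("NumLiveDataNodes"),
        "dead_datanodes": bean.get("NumDeadDataNodes"),
        # Avoid dividing by zero when the cluster reports no capacity yet
        "capacity_used_pct": 100.0 * used / total if total else None,
    }


def fetch_fs_state(base_url="http://localhost:9870", timeout_s=5):
    """Fetch and parse the bean from a running NameNode."""
    with urlopen(base_url + JMX_QUERY, timeout=timeout_s) as response:
        return parse_fs_state(json.load(response))
```

Calling `fetch_fs_state()` against a running cluster returns a small dict that a script could use to alert when `dead_datanodes` is non-zero.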

In [None]:
# Performance estimation
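def estimate_processing_time(data_size_gb, num_nodes, throughput_mb_per_sec=100):
    """
    Estimate MapReduce job time (in seconds) based on cluster size.

    Idealized model: assumes perfectly linear scaling, with no shuffle,
    scheduling, or replication overhead.
    """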
    data_size_mb = data_size_gb * 1024
    parallel_throughput = throughput_mb_per_sec * num_nodes
    time_seconds = data_size_mb / parallel_throughput
    return time_seconds

# Scaling analysis
data_size = 100  # GB
node_counts = [1, 3, 5, 10]

print(f"Scaling Analysis ({data_size}GB dataset):")
print(f"\n{'Nodes':<8} {'Est. Time':<12} {'Speedup'}")
print("-" * 30)

baseline_time = None
for nodes in node_counts:
    time = estimate_processing_time(data_size, nodes)
    if baseline_time is None:
        baseline_time = time
        speedup = 1.0
    else:
        speedup = baseline_time / time
    
    minutes = int(time // 60)
    seconds = int(time % 60)
    print(f"{nodes:<8d} {minutes}m {seconds}s      {speedup:.1f}x")

---

## Summary

### Technical Achievements
✅ Docker Compose orchestration for multi-node cluster  
✅ HDFS distributed storage with replication  
✅ YARN resource management  
✅ MapReduce job execution  
✅ Horizontal scalability  
✅ Infrastructure-as-Code approach  

### Skills
**DevOps**: Docker, container orchestration, infrastructure automation  
**Distributed Systems**: HDFS, YARN, fault tolerance, data locality  
**Big Data**: MapReduce, batch processing, cluster management  
**Architecture**: System design, scaling strategies

## References
- **Repository**: https://github.com/anarcoiris/docker-hadoop
- **Technologies**: Docker, Hadoop, HDFS, YARN, MapReduce
- **Integration**: Works with PySpark for advanced analytics
