In [1]:
# Hadoop Setup for Windows

This notebook demonstrates how to set up and work with Hadoop on Windows systems.

## Prerequisites for Windows
- Java 8 or 11 (OpenJDK or Oracle JDK)
- Hadoop binaries for Windows, including `winutils.exe` and `hadoop.dll` (needed only for a native Windows installation)
- Python 3.x for PySpark integration

## Alternative Setup Methods
1. **Docker** (Recommended for Windows)
2. **WSL2 with Ubuntu**
3. **Native Windows installation**
4. **Cloud-based solutions** (AWS EMR, Azure HDInsight, Google Dataproc)

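Output of a setup attempt that used Linux-only commands (`apt-get`, `wget`, `tar` on a download that never happened); these are unavailable in a native Windows shell, which is why the Docker or WSL2 routes below are recommended:

'apt-get' is not recognized as an internal or external command,
operable program or batch file.
'apt-get' is not recognized as an internal or external command,
operable program or batch file.
'wget' is not recognized as an internal or external command,
operable program or batch file.
tar: Error opening archive: Failed to open 'hadoop-3.3.6.tar.gz'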


In [None]:
# Check if Java is installed
import subprocess
import sys
import os


def check_java():
    """Report whether a `java` executable can be run from this environment.

    Prints the version banner on success.

    Returns:
        True if `java -version` exits with status 0; False if it fails or if
        no `java` executable is found on PATH.
    """
    try:
        proc = subprocess.run(['java', '-version'], capture_output=True, text=True)
    except FileNotFoundError:
        proc = None

    if proc is None or proc.returncode != 0:
        print("Java is not installed or not in PATH")
        return False

    print("Java is installed:")
    # `java -version` writes its banner to stderr rather than stdout.
    print(proc.stderr)
    return True


def check_python():
    """Print the version string of the running Python interpreter."""
    print("Python version:", sys.version)


# Check system requirements
print("=== System Requirements Check ===")
check_java()
check_python()
print("Operating System:", os.name)
print("Current working directory:", os.getcwd())

## Docker-based Hadoop Setup (Recommended)

The easiest way to run Hadoop on Windows is to use Docker, which provides a complete Hadoop environment without complex Windows-specific configuration.

In [None]:
# Docker commands for Hadoop setup (run these in PowerShell/Command Prompt)
docker_commands = """
# Pull Hadoop Docker image
docker pull apache/hadoop:3

# Run Hadoop container with port mapping
docker run -it -p 9870:9870 -p 8088:8088 -p 19888:19888 --name hadoop-container apache/hadoop:3

# Alternative: Use docker-compose for complete big data stack
# Create docker-compose.yml file with Hadoop, Spark, Hive, etc.
"""

print("Docker setup commands:", docker_commands, sep="\n")


# Check if Docker is available
def check_docker():
    """Report whether the Docker CLI can be run from this environment.

    Prints the `docker --version` output on success.

    Returns:
        True if `docker --version` exits with status 0. False if the command
        fails, or if no `docker` executable is found on PATH; the two cases
        print different messages.
    """
    try:
        proc = subprocess.run(['docker', '--version'], capture_output=True, text=True)
    except FileNotFoundError:
        print("Docker is not installed")
        return False

    if proc.returncode != 0:
        print("Docker is not available")
        return False

    print("Docker is available:")
    print(proc.stdout)
    return True


check_docker()

## Matrix Multiplication with Hadoop MapReduce

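Let's implement matrix multiplication $C = AB$, where $C_{ik} = \sum_j A_{ij} B_{jk}$, using Hadoop MapReduce. The mapper sends every entry $A_{ij}$ to each output cell $(i, k)$ in row $i$, and every entry $B_{jk}$ to each output cell $(i, k)$ in column $k$. The reducer for cell $(i, k)$ then multiplies the $A$ and $B$ values that share the same index $j$ and sums the products. This example works with any Hadoop setup (Docker, WSL2, or cloud).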

In [None]:
# Matrix Multiplication MapReduce implementation in Python.
# The records built here are the input for a Hadoop Streaming job.

# Create sample matrices
import numpy as np
import json

# A fixed seed keeps the sample matrices, and every output derived from them, reproducible.
np.random.seed(42)
matrix_A = np.random.randint(1, 10, (3, 4))
matrix_B = np.random.randint(1, 10, (4, 3))

print("Matrix A (3x4):", matrix_A, sep="\n")
print("\nMatrix B (4x3):", matrix_B, sep="\n")
print("\nExpected result (NumPy):", matrix_A @ matrix_B, sep="\n")


def matrix_to_mapreduce_format(matrix, matrix_name):
    """Flatten a 2-D matrix into MapReduce input records.

    Args:
        matrix: a 2-D array exposing ``.shape`` and ``[row, col]`` indexing.
        matrix_name: tag written as the first field of every record.

    Returns:
        A list of "matrix_name,row,col,value" strings in row-major order.
    """
    n_rows, n_cols = matrix.shape
    return [
        f"{matrix_name},{r},{c},{matrix[r, c]}"
        for r in range(n_rows)
        for c in range(n_cols)
    ]


# Prepare input data
matrix_a_data = matrix_to_mapreduce_format(matrix_A, "A")
matrix_b_data = matrix_to_mapreduce_format(matrix_B, "B")

print("\nMatrix A in MapReduce format:")
for line in matrix_a_data[:5]:  # preview only the first 5 records
    print(line)

print("\nMatrix B in MapReduce format:")
for line in matrix_b_data[:5]:  # preview only the first 5 records
    print(line)

In [None]:
# Write the mapper and reducer scripts to files for Hadoop Streaming.
# The templates are ordinary (non-raw) strings, so each doubled backslash-t
# below becomes a single tab escape inside the generated scripts.

mapper_code = '''#!/usr/bin/env python3
"""Hadoop Streaming mapper for C = A x B.

Input lines:  matrix_name,row,col,value   (matrix_name is "A" or "B")
Output lines: "i,k<TAB>matrix_name,j,value", keyed by the output cell (i, k).
"""
import os
import sys


def mapper():
    # A single input record does not reveal the matrix sizes, so they are
    # configured: every A[i][j] is needed by all B_COLS cells in row i of C,
    # and every B[j][k] by all A_ROWS cells in column k of C.
    # Override with: -cmdenv MATRIX_A_ROWS=<n> -cmdenv MATRIX_B_COLS=<m>
    a_rows = int(os.environ.get("MATRIX_A_ROWS", "3"))
    b_cols = int(os.environ.get("MATRIX_B_COLS", "3"))

    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue

        parts = line.split(',')
        if len(parts) != 4:
            continue

        matrix_name, row, col, value = parts
        try:
            row, col, value = int(row), int(col), float(value)
        except ValueError:
            continue  # skip records with non-numeric fields

        if matrix_name == 'A':
            for k in range(b_cols):
                print(f"{row},{k}\\tA,{col},{value}")
        elif matrix_name == 'B':
            for i in range(a_rows):
                print(f"{i},{col}\\tB,{row},{value}")


if __name__ == "__main__":
    mapper()
'''

reducer_code = '''#!/usr/bin/env python3
"""Hadoop Streaming reducer for C = A x B.

Input is the mapper output sorted by key ("i,k"). For each key it computes
C[i][k] = sum over j of A[i][j] * B[j][k] and prints "i,k<TAB>value".
Cells whose sum is 0 are not printed (sparse output).
"""
import sys


def emit(key, a_values, b_values):
    """Print the dot product for one output cell if it is non-zero."""
    result = sum(a_values[j] * b_values[j] for j in a_values if j in b_values)
    if result != 0:  # Only output non-zero results
        print(f"{key}\\t{result}")


def reducer():
    current_key = None
    a_values = {}  # A[i][j] indexed by j, for the current key
    b_values = {}  # B[j][k] indexed by j, for the current key

    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue

        try:
            key, value = line.split('\\t')
            matrix_info, pos, val = value.split(',')
            pos, val = int(pos), float(val)
        except ValueError:
            continue  # skip malformed records

        if key != current_key:
            # Keys arrive sorted, so a new key means the previous cell is complete.
            if current_key is not None:
                emit(current_key, a_values, b_values)
            current_key = key
            a_values = {}
            b_values = {}

        if matrix_info == 'A':
            a_values[pos] = val
        elif matrix_info == 'B':
            b_values[pos] = val

    if current_key is not None:
        emit(current_key, a_values, b_values)


if __name__ == "__main__":
    reducer()
'''

# newline='\\n' forces Unix line endings even on Windows. A CRLF shebang
# ("python3\\r") fails when the scripts run inside a Linux Hadoop container.
for script_path, script_code in (("matrix_mapper.py", mapper_code),
                                 ("matrix_reducer.py", reducer_code)):
    with open(script_path, 'w', encoding='utf-8', newline='\n') as f:
        f.write(script_code)

print("Created mapper and reducer files:")
print("- matrix_mapper.py")
print("- matrix_reducer.py")

In [None]:
# Create input data file for Hadoop: one "matrix_name,row,col,value" record per line.
input_data = matrix_a_data + matrix_b_data

# Use a real newline separator, not a literal backslash-n, so Hadoop sees one
# record per line. newline='\n' keeps Unix endings even when the notebook runs on Windows.
with open('matrix_input.txt', 'w', encoding='utf-8', newline='\n') as f:
    for line in input_data:
        f.write(line + '\n')

print(f"Created input file 'matrix_input.txt' with {len(input_data)} lines")


def simulate_mapreduce(records=None):
    """Run the map, shuffle and reduce phases of the matrix job in-process.

    Mirrors the mapper/reducer logic so the algorithm can be checked without
    a Hadoop cluster.

    Args:
        records: iterable of "matrix_name,row,col,value" strings. Defaults to
            the notebook's ``input_data``. Lines without exactly 4 fields are skipped.

    Returns:
        The product as a list of rows. Cells the reducer skips because their
        sum is 0 are filled with 0.
    """
    from collections import defaultdict

    if records is None:
        records = input_data

    print("\n=== Simulating MapReduce locally ===")

    parsed = []
    for line in records:
        parts = line.split(',')
        if len(parts) != 4:
            continue
        matrix_name, row, col, value = parts
        parsed.append((matrix_name, int(row), int(col), float(value)))

    # The rows of A and the columns of B fix the output shape. They are inferred
    # from the largest indices present rather than hard-coded.
    a_rows = 1 + max((r for name, r, _, _ in parsed if name == 'A'), default=-1)
    b_cols = 1 + max((c for name, _, c, _ in parsed if name == 'B'), default=-1)

    # Map phase: replicate each entry to every output cell that needs it.
    mapper_output = []
    for matrix_name, row, col, value in parsed:
        if matrix_name == 'A':
            mapper_output.extend(f"{row},{k}\tA,{col},{value}" for k in range(b_cols))
        elif matrix_name == 'B':
            mapper_output.extend(f"{i},{col}\tB,{row},{value}" for i in range(a_rows))

    # Shuffle phase: sort by key, as Hadoop does between map and reduce.
    mapper_output.sort()

    print("Sample mapper output:")
    for line in mapper_output[:10]:
        print(line)

    grouped = defaultdict(list)
    for line in mapper_output:
        key, value = line.split('\t')
        grouped[key].append(value)

    # Reduce phase: dot product of A's row i with B's column k for each key "i,k".
    print("\nReducer results:")
    result_matrix = {}
    for key, values in grouped.items():
        a_values = {}
        b_values = {}
        for value in values:
            matrix_info, pos, val = value.split(',')
            if matrix_info == 'A':
                a_values[int(pos)] = float(val)
            elif matrix_info == 'B':
                b_values[int(pos)] = float(val)

        result = sum(a_values[j] * b_values[j] for j in a_values if j in b_values)
        if result != 0:
            print(f"{key}\t{result}")
            i, k = map(int, key.split(','))
            result_matrix[(i, k)] = result

    print("\nResult matrix:")
    product = [[result_matrix.get((i, k), 0) for k in range(b_cols)] for i in range(a_rows)]
    for row in product:
        print(row)
    return product


simulated_product = simulate_mapreduce()

# Cross-check: the simulated job must reproduce NumPy's matrix product.
assert np.allclose(simulated_product, matrix_A @ matrix_B), "MapReduce simulation disagrees with NumPy"

## Running on Hadoop

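To run this on an actual Hadoop installation, use the following commands in your Hadoop environment (for example, a shell inside the Docker container or WSL2), from the directory that contains the generated files: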

In [None]:
# Hadoop commands to run matrix multiplication.
# Run them inside the Hadoop environment (Docker container, WSL2 shell or a
# cluster node), from the directory that holds the generated files.
hadoop_commands = """
# Step 1: Copy input data to HDFS (-f replaces a copy from a previous run)
hdfs dfs -mkdir -p /user/input
hdfs dfs -put -f matrix_input.txt /user/input/

# Step 2: Remove output from a previous run (Hadoop refuses to overwrite it)
hdfs dfs -rm -r -f /user/output/matrix_result

# Step 3: Run Hadoop Streaming job.
# Invoking the scripts through python3 avoids relying on the executable bit,
# which files created on Windows do not carry.
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \\
    -files matrix_mapper.py,matrix_reducer.py \\
    -mapper "python3 matrix_mapper.py" \\
    -reducer "python3 matrix_reducer.py" \\
    -input /user/input/matrix_input.txt \\
    -output /user/output/matrix_result

# Step 4: View results (one part-* file per reducer; quoted so HDFS expands the glob)
hdfs dfs -cat '/user/output/matrix_result/part-*'

# Step 5: Copy results back to local filesystem
hdfs dfs -cat '/user/output/matrix_result/part-*' > result.txt
"""

print("Hadoop execution commands:")
print(hadoop_commands)

# Also create a shell script for easy execution. Use real newlines and force LF
# endings: a literal backslash-n or a CRLF shebang breaks the script under bash.
with open('run_matrix_mapreduce.sh', 'w', encoding='utf-8', newline='\n') as f:
    f.write('#!/bin/bash\n')
    f.write('set -e  # stop at the first failing step\n')
    f.write(hadoop_commands)

print("\nCreated 'run_matrix_mapreduce.sh' script")