1. Write a Python program to read a Hadoop configuration file and display the core components of Hadoop.
2. Implement a Python function that calculates the total file size in a Hadoop Distributed File System (HDFS) directory.
3. Create a Python program that extracts and displays the top N most frequent words from a large text file using the MapReduce approach.
4. Write a Python script that checks the health status of the NameNode and DataNodes in a Hadoop cluster using Hadoop's REST API.
5. Develop a Python program that lists all the files and directories in a specific HDFS path.
6. Implement a Python program that analyzes the storage utilization of DataNodes in a Hadoop cluster and identifies the nodes with the highest and lowest storage capacities.
7. Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, monitor its progress, and retrieve the final output.
8. Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, set resource requirements, and track resource usage during job execution.
9. Write a Python program that compares the performance of a MapReduce job with different input split sizes, showcasing the impact on overall job execution time.



In [None]:
def read_hadoop_config(config_file):
    """Print the core-site and hdfs-site ``<name>`` lines of a config file.

    Every line is stripped of surrounding whitespace; a line is echoed to
    stdout when the stripped text begins with ``<name>core-site`` or
    ``<name>hdfs-site``. All other lines are ignored.

    Args:
        config_file: Path of the configuration file to scan.
    """
    # str.startswith accepts a tuple, so both prefixes are checked in one call.
    wanted_prefixes = ("<name>core-site", "<name>hdfs-site")
    with open(config_file, 'r') as handle:
        for raw_line in handle:
            entry = raw_line.strip()
            if entry.startswith(wanted_prefixes):
                print(entry)

# Usage: scan "hadoop.conf" (a path relative to the current working directory)
# and print the matching <name> lines it contains.
config_file = "hadoop.conf"
read_hadoop_config(config_file)


In [None]:
import subprocess

def get_directory_size(directory):
    """Return the summarized size, in bytes, of an HDFS directory.

    Runs ``hdfs dfs -du -s <directory>`` and parses the first
    whitespace-separated field of its output as the size.

    Args:
        directory: HDFS path to measure.

    Returns:
        The size reported by ``hdfs dfs -du -s`` as an ``int``.

    Raises:
        subprocess.CalledProcessError: If the ``hdfs`` command exits non-zero.
        FileNotFoundError: If the ``hdfs`` executable is not on ``PATH``.
        ValueError: If the command prints nothing or a non-numeric size.
    """
    # Pass an argument list instead of a shell string: the path reaches hdfs
    # as exactly one argument, so spaces or shell metacharacters in it can
    # neither split the command nor inject extra commands.
    raw_output = subprocess.check_output(["hdfs", "dfs", "-du", "-s", directory])
    fields = raw_output.decode("utf-8").split()
    if not fields:
        raise ValueError(f"'hdfs dfs -du -s' produced no output for {directory!r}")
    return int(fields[0])

# Usage: ask HDFS for the size of one directory and report it in bytes.
directory = "/user/hadoop/data"
total_size = get_directory_size(directory)
print(f"Total file size in {directory}: {total_size} bytes")


In [None]:
from collections import Counter

def find_top_words(file_path, n):
    """Print and return the ``n`` most frequent words of a text file.

    Words are the whitespace-separated tokens of each line. Counting is done
    line by line (split each line into words, then fold them into a running
    tally), so only the distinct words are held in memory rather than every
    token of the file.

    Args:
        file_path: Path of the text file to analyse.
        n: Number of top entries to report; ``None`` reports every word.

    Returns:
        A list of ``(word, count)`` tuples ordered from most to least
        frequent; ties keep the order in which the words first appeared.
    """
    word_counts = Counter()
    with open(file_path, 'r') as file:
        for line in file:
            # "Map" the line to its words and "reduce" them into the tally.
            # split() with no argument already ignores surrounding whitespace.
            word_counts.update(line.split())

    top_words = word_counts.most_common(n)

    for word, count in top_words:
        print(f"{word}: {count}")

    return top_words

# Usage: print the 10 most frequent whitespace-separated words found in
# "large_text_file.txt" (a path relative to the current working directory).
file_path = "large_text_file.txt"
n = 10
find_top_words(file_path, n)


In [None]:
import requests

def check_hadoop_health(nn_url, dn_url, timeout=10):
    """Report whether the NameNode and DataNode endpoints answer successfully.

    An endpoint counts as healthy when an HTTP GET returns a non-error status
    (``response.ok``). A connection failure or timeout is reported as
    "not healthy" instead of aborting the check, because an unreachable node
    is exactly the situation this function exists to detect.

    Args:
        nn_url: URL of the NameNode status endpoint.
        dn_url: URL of the DataNode status endpoint.
        timeout: Seconds to wait for each request before treating the
            endpoint as unhealthy.

    Returns:
        A ``(namenode_healthy, datanode_healthy)`` tuple of booleans.
    """
    def _is_healthy(url):
        # Without a timeout, requests can block indefinitely on a hung node.
        try:
            return requests.get(url, timeout=timeout).ok
        except requests.RequestException:
            return False

    # Probe both endpoints before printing, as the original did.
    nn_healthy = _is_healthy(nn_url)
    dn_healthy = _is_healthy(dn_url)

    if nn_healthy:
        print("NameNode is healthy.")
    else:
        print("NameNode is not healthy.")

    if dn_healthy:
        print("DataNodes are healthy.")
    else:
        print("DataNodes are not healthy.")

    return nn_healthy, dn_healthy

# Usage: probe the JMX status beans of one NameNode and one DataNode.
# NOTE(review): 50070 / 50075 are the Hadoop 2.x default web ports; Hadoop 3.x
# defaults to 9870 / 9864 — confirm the ports for the target cluster.
# NOTE(review): only a single DataNode host is queried here, although the
# printed message speaks of "DataNodes" — confirm that is intended.
nn_url = "http://namenode:50070/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus"
dn_url = "http://datanode:50075/jmx?qry=Hadoop:service=DataNode,name=DataNodeStatus"
check_hadoop_health(nn_url, dn_url)


In [None]:
import subprocess

def list_hdfs_path(path):
    """Print and return the ``hdfs dfs -ls`` listing of an HDFS path.

    Args:
        path: HDFS path whose files and directories should be listed.

    Returns:
        The listing as a list of lines, in the order printed by ``hdfs``.
        The list is empty (and nothing is printed) when the command produces
        no output.

    Raises:
        subprocess.CalledProcessError: If the ``hdfs`` command exits non-zero.
        FileNotFoundError: If the ``hdfs`` executable is not on ``PATH``.
    """
    # Pass an argument list instead of a shell string so the path is handed
    # to hdfs as one argument, even when it contains spaces or shell
    # metacharacters.
    raw_output = subprocess.check_output(["hdfs", "dfs", "-ls", path])
    entries = raw_output.decode("utf-8").strip().splitlines()

    for file_info in entries:
        print(file_info)

    return entries

# Usage: print the listing of one HDFS directory.
path = "/user/hadoop/data"
list_hdfs_path(path)


In [None]:
import requests

def analyze_data_node_storage(dn_url, timeout=10):
    """Print the storage reports with the highest and lowest capacity.

    Fetches JSON from ``dn_url`` and reads ``beans[0]["StorageReport"]``,
    which is expected to be a list of dicts carrying ``name`` and
    ``capacity`` keys (assumed from how the fields are used here — confirm
    against the actual JMX payload of the target cluster).

    Args:
        dn_url: URL returning the JMX JSON document.
        timeout: Seconds to wait for the HTTP response.

    Returns:
        A ``(highest, lowest)`` tuple of report dicts, or ``None`` when the
        report list is empty.
    """
    # Without a timeout, requests can block indefinitely on a hung node.
    response = requests.get(dn_url, timeout=timeout)
    data = response.json()

    storage_reports = data["beans"][0]["StorageReport"]
    if not storage_reports:
        # Indexing [0] / [-1] on an empty list would raise IndexError.
        print("No storage reports found.")
        return None

    # sorted() leaves the decoded payload untouched; descending order puts
    # the largest capacity first and the smallest last.
    ranked = sorted(storage_reports, key=lambda report: report["capacity"], reverse=True)

    highest_capacity_node = ranked[0]
    lowest_capacity_node = ranked[-1]

    print(f"Highest Capacity Node: {highest_capacity_node['name']}")
    print(f"Capacity: {highest_capacity_node['capacity']}")

    print(f"\nLowest Capacity Node: {lowest_capacity_node['name']}")
    print(f"Capacity: {lowest_capacity_node['capacity']}")

    return highest_capacity_node, lowest_capacity_node

# Usage: fetch the dataset-state JMX bean of one DataNode and print its
# highest- and lowest-capacity storage entries.
# NOTE(review): this URL targets a single DataNode host, so the comparison is
# presumably between that node's storage entries rather than between cluster
# nodes — confirm against the intended analysis.
dn_url = "http://datanode:50075/jmx?qry=Hadoop:service=DataNode,name=FSDatasetState-UndefinedStorageId"
analyze_data_node_storage(dn_url)


In [None]:
import requests

def submit_hadoop_job(job_url, job_params, poll_interval=5, timeout=30):
    """Submit a job, poll it until it finishes, and print its final output.

    The job is created with a POST to ``job_url``; its ``jobId`` is then used
    to poll ``<job_url>/<jobId>/progress`` and, on success, to read
    ``<job_url>/<jobId>/output``.

    Args:
        job_url: Base URL of the job-submission endpoint.
        job_params: JSON-serializable job description sent in the POST body.
        poll_interval: Seconds to wait between two progress polls.
        timeout: Seconds to wait for each individual HTTP response.

    Returns:
        The job's ``output`` value when it reaches ``SUCCEEDED``, otherwise
        ``None``.
    """
    import time  # local import keeps this notebook cell self-contained

    response = requests.post(job_url, json=job_params, timeout=timeout)
    job_id = response.json()["jobId"]
    print(f"Submitted Hadoop job with ID: {job_id}")

    progress_url = f"{job_url}/{job_id}/progress"
    while True:
        progress = requests.get(progress_url, timeout=timeout).json()
        state = progress["state"]
        if state == "SUCCEEDED":
            break
        # KILLED is a terminal YARN application state as well; without this
        # check a killed job would be polled forever.
        # NOTE(review): confirm the state names reported by this endpoint.
        if state in ("FAILED", "KILLED"):
            print("Job execution failed.")
            return None
        print(f"Job progress: {progress['progress']}")
        # Pause between polls so the loop does not flood the server.
        time.sleep(poll_interval)

    output_url = f"{job_url}/{job_id}/output"
    response = requests.get(output_url, timeout=timeout)
    output = response.json()["output"]
    print(f"\nFinal output: {output}")
    return output

# Usage: submit a word-count job to the ResourceManager and follow it to
# completion.
# NOTE(review): the payload keys below and the jobId / progress / output
# endpoints used by submit_hadoop_job do not look like the documented YARN
# "Cluster Applications" submission API (which expects fields such as
# application-id and am-container-spec) — confirm against the target service.
job_url = "http://resourcemanager:8088/ws/v1/cluster/apps"
job_params = {
    "name": "WordCountJob",
    "jar": "wordcount.jar",
    "input": "/user/hadoop/input",
    "output": "/user/hadoop/output"
}
submit_hadoop_job(job_url, job_params)


In [None]:
import requests

def submit_hadoop_job_with_resources(job_url, job_params, resources, poll_interval=5, timeout=30):
    """Submit a job, register resource requirements, and track its execution.

    After the POST to ``job_url`` returns a ``jobId``, every entry of
    ``resources`` is posted to ``<job_url>/<jobId>/resource``. The job is then
    polled via ``<job_url>/<jobId>/progress``; while it is still running, the
    content of ``<job_url>/<jobId>/resourceusage`` is printed after each
    progress report. On success the job's output is fetched and printed.

    Args:
        job_url: Base URL of the job-submission endpoint.
        job_params: JSON-serializable job description sent in the POST body.
        resources: Iterable of JSON-serializable resource requirements.
        poll_interval: Seconds to wait between two progress polls.
        timeout: Seconds to wait for each individual HTTP response.

    Returns:
        The job's ``output`` value when it reaches ``SUCCEEDED``, otherwise
        ``None``.
    """
    import time  # local import keeps this notebook cell self-contained

    response = requests.post(job_url, json=job_params, timeout=timeout)
    job_id = response.json()["jobId"]
    print(f"Submitted Hadoop job with ID: {job_id}")

    resource_url = f"{job_url}/{job_id}/resource"
    for resource in resources:
        requests.post(resource_url, json=resource, timeout=timeout)

    progress_url = f"{job_url}/{job_id}/progress"
    resource_usage_url = f"{job_url}/{job_id}/resourceusage"  # loop-invariant
    while True:
        progress = requests.get(progress_url, timeout=timeout).json()
        state = progress["state"]
        if state == "SUCCEEDED":
            break
        # KILLED is a terminal YARN application state as well; without this
        # check a killed job would be polled forever.
        # NOTE(review): confirm the state names reported by this endpoint.
        if state in ("FAILED", "KILLED"):
            print("Job execution failed.")
            return None
        print(f"Job progress: {progress['progress']}")

        resource_usage = requests.get(resource_usage_url, timeout=timeout).json()
        print(f"Resource usage: {resource_usage}")

        # Pause between polls so the loop does not flood the server.
        time.sleep(poll_interval)

    output_url = f"{job_url}/{job_id}/output"
    response = requests.get(output_url, timeout=timeout)
    output = response.json()["output"]
    print(f"\nFinal output: {output}")
    return output

# Usage: submit a word-count job together with its resource requirements and
# print resource usage while it runs.
# NOTE(review): job_url and job_params rebind the names used by the earlier
# submission example in this notebook.
job_url = "http://resourcemanager:8088/ws/v1/cluster/apps"
job_params = {
    "name": "WordCountJob",
    "jar": "wordcount.jar",
    "input": "/user/hadoop/input",
    "output": "/user/hadoop/output"
}
# Each requirement is posted to the job as its own JSON document.
# NOTE(review): amounts are sent as strings; memory is presumably in MB —
# confirm the unit and type the endpoint expects.
resources = [
    {"resource": "memory", "amount": "4096"},
    {"resource": "vcores", "amount": "2"}
]
submit_hadoop_job_with_resources(job_url, job_params, resources)


In [None]:
import subprocess

def run_mapreduce_job(input_file, split_size, output_dir="output_dir"):
    """Run the MapReduce job with a given maximum input split size.

    Invokes ``hadoop jar mapreduce.jar JobName`` with
    ``mapreduce.input.fileinputformat.split.maxsize`` set to ``split_size``
    and waits for the command to finish. The command's exit status is not
    checked.

    Args:
        input_file: Input path handed to the job.
        split_size: Maximum input split size; the property is expressed in
            bytes.
        output_dir: Output path handed to the job.

    Returns:
        Wall-clock duration of the ``hadoop`` command in seconds, so runs
        with different split sizes can be compared.

    Raises:
        FileNotFoundError: If the ``hadoop`` executable is not on ``PATH``.
    """
    import time  # local import keeps this notebook cell self-contained

    command = [
        "hadoop", "jar", "mapreduce.jar", "JobName",
        # Generic options such as -D must precede the job's own arguments;
        # placed after them they are passed through as plain positional
        # arguments and the property is never applied.
        "-D", f"mapreduce.input.fileinputformat.split.maxsize={split_size}",
        input_file, output_dir,
    ]
    started = time.perf_counter()
    # An argument list (no shell) keeps paths with spaces or shell
    # metacharacters intact.
    subprocess.run(command)
    return time.perf_counter() - started

# Usage: run the same job once per split size and report how long each run
# takes, so the effect of the split size on execution time can be compared.
import time

input_file = "large_input_file.txt"
# Split sizes to compare, in megabytes.
split_sizes = [64, 128, 256, 512]

# NOTE(review): every run targets the same output directory; Hadoop normally
# refuses to start a job whose output directory already exists, so the
# directory presumably has to be removed between runs — confirm.
for split_size in split_sizes:
    print(f"Running MapReduce job with split size: {split_size}")
    # The split.maxsize property is expressed in bytes, so convert from MB;
    # passing 64..512 directly would request splits of only a few bytes.
    started = time.perf_counter()
    run_mapreduce_job(input_file, split_size * 1024 * 1024)
    elapsed = time.perf_counter() - started
    print(f"Split size {split_size} MB finished in {elapsed:.2f} seconds")
