1. Write a Python program to read a Hadoop configuration file and display the core components of Hadoop.

In [None]:
def read_hadoop_config(file_path):
    """Return the Hadoop components implied by the properties in a config file.

    The file may be a Hadoop ``*-site.xml`` document, in which case the text of
    every ``<name>`` element is inspected, or a plain-text file with one
    setting per line, in which case the start of each line is inspected.

    Args:
        file_path: Path of the configuration file to read.

    Returns:
        A list with one entry per matching property, in file order. Entries
        are "NameNode" (``fs.defaultFS``), "ResourceManager"
        (``yarn.resourcemanager.hostname``) and "JobHistoryServer"
        (``mapreduce.jobhistory.address``).

    Raises:
        OSError: If the file cannot be opened.
    """
    import xml.etree.ElementTree as ET

    property_prefixes = (
        ("fs.defaultFS", "NameNode"),
        ("yarn.resourcemanager.hostname", "ResourceManager"),
        ("mapreduce.jobhistory.address", "JobHistoryServer"),
    )

    with open(file_path, 'r') as config_file:
        content = config_file.read()

    try:
        root = ET.fromstring(content)
    except ET.ParseError:
        # Not an XML document: fall back to matching the start of each raw line.
        candidates = content.split("\n")
    else:
        # In XML the property name is element text, never the start of a line.
        candidates = [(element.text or "").strip() for element in root.iter("name")]

    core_components = []
    for candidate in candidates:
        for prefix, component in property_prefixes:
            if candidate.startswith(prefix):
                core_components.append(component)
                break
    return core_components

# Example usage: report the components detected in a configuration file.
config_file_path = "/path/to/hadoop/conf/hadoop-site.xml"
components = read_hadoop_config(config_file_path)
# Single print call: the header first, then one component per line.
print("Core Components of Hadoop:", *components, sep="\n")


2. Implement a Python function that calculates the total file size in a Hadoop Distributed File System (HDFS) directory.

In [None]:
import subprocess

def calculate_directory_size(directory_path):
    """Return the total size in bytes of an HDFS directory.

    Runs ``hadoop fs -du -s <directory_path>`` and parses the first
    whitespace-separated field of its output as the byte count.

    Args:
        directory_path: HDFS path to measure.

    Returns:
        The size in bytes as an int, or None (after printing an error
        message) when the command cannot be run, exits with a non-zero
        status, or produces output that does not start with an integer.
    """
    # Pass an argument list instead of a shell string so that paths containing
    # spaces or shell metacharacters reach hadoop unchanged.
    command = ["hadoop", "fs", "-du", "-s", directory_path]
    try:
        result = subprocess.run(command, capture_output=True, text=True)
    except OSError:
        # Without a shell, a missing or non-executable hadoop binary raises
        # instead of yielding a non-zero exit status.
        result = None

    if result is not None and result.returncode == 0:
        fields = result.stdout.split()
        try:
            return int(fields[0])
        except (IndexError, ValueError):
            # Empty or unexpected output: report it like any other failure.
            pass

    print("Error occurred while calculating directory size.")
    return None

# Example usage
hdfs_directory = "/user/data"
total_size = calculate_directory_size(hdfs_directory)
# The function returns None on failure; skip the summary rather than
# printing "None bytes".
if total_size is not None:
    print(f"Total size of {hdfs_directory}: {total_size} bytes")


3. Create a Python program that extracts and displays the top N most frequent words from a large text file using the MapReduce approach.

In [None]:
from collections import Counter

def process_large_text_file(file_path, top_n):
    """Return the ``top_n`` most frequent whitespace-separated words in a file.

    Args:
        file_path: Path of the text file to read.
        top_n: Number of (word, count) pairs to return.

    Returns:
        A list of (word, count) tuples ordered from most to least frequent.
    """
    with open(file_path, 'r') as source:
        # Feed every token of every line into a single Counter; the file is
        # still consumed one line at a time.
        tally = Counter(token for row in source for token in row.split())
    return tally.most_common(top_n)

# Example usage: show the ten most frequent words of a text file.
text_file_path = "/path/to/large_text_file.txt"
top_n = 10
top_words = process_large_text_file(text_file_path, top_n)
# Single print call: the header first, then one "word: count" line per entry.
print(
    f"Top {top_n} most frequent words:",
    *(f"{word}: {count}" for word, count in top_words),
    sep="\n",
)


4. Write a Python script that checks the health status of the NameNode and DataNodes in a Hadoop cluster using Hadoop's REST API.

In [None]:
import requests

def check_hadoop_cluster_health(
    namenode_url="http://namenode:50070/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus",
    datanode_url="http://datanode:50075/jmx?qry=Hadoop:service=DataNode,name=DataNodeInfo",
    timeout=10,
):
    """Print the state reported by a NameNode and a DataNode JMX endpoint.

    Both endpoints are queried; the ``State`` attribute of the first bean in
    each response is printed. If either request fails, returns a non-200
    status, or lacks the expected fields, a single error message is printed
    instead.

    Args:
        namenode_url: JMX query URL of the NameNode status bean.
        datanode_url: JMX query URL of the DataNode info bean.
        timeout: Seconds to wait for each HTTP request.

    Returns:
        None. Results are written to standard output.
    """
    # NOTE(review): the default ports look like the Hadoop 2.x web ports;
    # pass explicit URLs if the cluster uses different ones.
    error_message = "Error occurred while checking Hadoop cluster health."

    try:
        response_namenode = requests.get(namenode_url, timeout=timeout)
        response_datanode = requests.get(datanode_url, timeout=timeout)
    except requests.RequestException:
        # Unreachable host, DNS failure, timeout, ...
        print(error_message)
        return

    if response_namenode.status_code != 200 or response_datanode.status_code != 200:
        print(error_message)
        return

    try:
        namenode_status = response_namenode.json()['beans'][0]['State']
        # NOTE(review): confirm that the DataNodeInfo bean exposes a 'State'
        # attribute on the Hadoop version in use.
        datanode_status = response_datanode.json()['beans'][0]['State']
    except (ValueError, LookupError, TypeError):
        # Body is not JSON, the bean list is empty, or 'State' is absent.
        print(error_message)
        return

    print(f"NameNode status: {namenode_status}")
    print(f"DataNode status: {datanode_status}")

# Example usage: query the NameNode and DataNode JMX endpoints and print their state.
check_hadoop_cluster_health()


5. Develop a Python program that lists all the files and directories in a specific HDFS path.

In [None]:
import subprocess

def list_hdfs_files_and_directories(path):
    """Return the paths listed by ``hadoop fs -ls <path>``.

    Args:
        path: HDFS path to list.

    Returns:
        A list of path strings, one per listing entry, or None (after
        printing an error message) when the command cannot be run or exits
        with a non-zero status.
    """
    # Pass an argument list instead of a shell string so that paths containing
    # spaces or shell metacharacters reach hadoop unchanged.
    command = ["hadoop", "fs", "-ls", path]
    try:
        result = subprocess.run(command, capture_output=True, text=True)
    except OSError:
        # Without a shell, a missing hadoop binary raises instead of
        # yielding a non-zero exit status.
        result = None

    if result is None or result.returncode != 0:
        print("Error occurred while listing files and directories.")
        return None

    file_paths = []
    for line in result.stdout.splitlines():
        # An entry has eight columns: permissions, replication, owner, group,
        # size, date, time, path. Splitting at most seven times keeps a path
        # that contains spaces intact.
        fields = line.split(None, 7)
        if len(fields) < 8:
            # "Found N items" header or blank line. The header is absent when
            # a single file is listed, so it cannot be skipped by position.
            continue
        file_paths.append(fields[7])
    return file_paths

# Example usage
hdfs_path = "/user/data"
file_paths = list_hdfs_files_and_directories(hdfs_path)
# The function returns None on failure; iterating over None would raise
# TypeError, so only print the listing when one was produced.
if file_paths is not None:
    print(f"Files and directories in {hdfs_path}:")
    for path in file_paths:
        print(path)


6. Implement a Python program that analyzes the storage utilization of DataNodes in a Hadoop cluster and identifies the nodes with the highest and lowest storage capacities.

In [None]:
import requests

def analyze_datanode_storage_utilization(
    datanode_url="http://datanode:50075/jmx?qry=Hadoop:service=DataNode,name=FSDatasetState",
    timeout=10,
):
    """Print storage utilization per entry and the highest and lowest values.

    Utilization is ``used / capacity`` for every entry of the ``Storage``
    list in the first bean of the JMX response. Entries with zero capacity
    are skipped because their utilization is undefined.

    Args:
        datanode_url: JMX query URL of the DataNode dataset-state bean.
        timeout: Seconds to wait for the HTTP request.

    Returns:
        None. The report, or an error message, is written to standard output.
    """
    error_message = "Error occurred while analyzing DataNode storage utilization."

    try:
        response_datanode = requests.get(datanode_url, timeout=timeout)
    except requests.RequestException:
        print(error_message)
        return

    if response_datanode.status_code != 200:
        print(error_message)
        return

    try:
        datanode_stats = response_datanode.json()['beans'][0]
        # NOTE(review): assumes the bean carries a 'Storage' list whose items
        # have 'name', 'used' and 'capacity' keys; confirm against the real
        # JMX payload.
        storage_utilizations = {
            stats['name']: stats['used'] / stats['capacity']
            for stats in datanode_stats['Storage']
            if stats['capacity']
        }
    except (ValueError, LookupError, TypeError):
        # Body is not JSON or does not have the expected structure.
        print(error_message)
        return

    if not storage_utilizations:
        # Nothing to rank; indexing the sorted list below would fail.
        print(error_message)
        return

    sorted_utilizations = sorted(storage_utilizations.items(), key=lambda x: x[1])
    highest_utilization_node, highest_utilization = sorted_utilizations[-1]
    lowest_utilization_node, lowest_utilization = sorted_utilizations[0]
    print("DataNode Storage Utilization:")
    for node, utilization in sorted_utilizations:
        print(f"{node}: {utilization:.2%}")
    print(f"\nHighest Utilization: {highest_utilization_node} ({highest_utilization:.2%})")
    print(f"Lowest Utilization: {lowest_utilization_node} ({lowest_utilization:.2%})")

# Example usage: print the DataNode storage utilization report.
analyze_datanode_storage_utilization()


7. Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, monitor its progress, and retrieve the final output.

In [None]:
import requests
import time

def submit_hadoop_job(jar_path, input_path, output_path):
    """Submit a job through the YARN ResourceManager REST API.

    First requests a new application ID, then posts a submission context
    whose application-master command is ``hadoop jar <jar> <input> <output>``.

    Args:
        jar_path: Path of the job JAR passed to ``hadoop jar``.
        input_path: Input path passed to the job.
        output_path: Output path passed to the job.

    Returns:
        The application ID string when the ResourceManager accepts the
        submission (HTTP 202), otherwise None after printing an error message.
    """
    apps_url = "http://resourcemanager:8088/ws/v1/cluster/apps"

    try:
        response = requests.post(f"{apps_url}/new-application", timeout=10)
        if response.status_code != 200:
            print("Error occurred while acquiring new application ID.")
            return None
        application_id = response.json()['application-id']
    except (requests.RequestException, ValueError, KeyError):
        print("Error occurred while acquiring new application ID.")
        return None

    data = {
        "application-id": application_id,
        "application-name": "Hadoop Job",
        "am-container-spec": {
            "commands": {
                "command": f"hadoop jar {jar_path} {input_path} {output_path}"
            },
            # NOTE(review): the ResourceManager may require more fields per
            # local resource (e.g. visibility, size, timestamp); confirm.
            "local-resources": {
                "entry": [
                    {
                        "key": "AppMaster.jar",
                        "value": {
                            "resource": "file:/path/to/AppMaster.jar",
                            "type": "FILE"
                        }
                    }
                ]
            }
        }
    }

    try:
        # Submissions are POSTed to the apps collection itself; the ID
        # travels in the request body.
        response = requests.post(apps_url, json=data, timeout=10)
    except requests.RequestException:
        print("Error occurred while submitting Hadoop job.")
        return None

    if response.status_code == 202:
        print(f"Hadoop job submitted. Application ID: {application_id}")
        return application_id

    print("Error occurred while submitting Hadoop job.")
    return None


8. Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, set resource requirements, and track resource usage during job execution.

In [None]:
import requests
import time

def submit_hadoop_job(jar_path, input_path, output_path, memory, vcores):
    """Submit a job with resource requirements via the YARN ResourceManager REST API.

    First requests a new application ID, then posts a submission context
    whose application-master command is ``hadoop jar <jar> <input> <output>``
    and whose resource request is ``memory`` / ``vcores``.

    Args:
        jar_path: Path of the job JAR passed to ``hadoop jar``.
        input_path: Input path passed to the job.
        output_path: Output path passed to the job.
        memory: Memory to request, sent as the ``memory`` field.
        vcores: Virtual cores to request, sent as the ``vCores`` field.

    Returns:
        The application ID string when the ResourceManager accepts the
        submission (HTTP 202), otherwise None after printing an error message.
    """
    apps_url = "http://resourcemanager:8088/ws/v1/cluster/apps"

    try:
        response = requests.post(f"{apps_url}/new-application", timeout=10)
        if response.status_code != 200:
            print("Error occurred while acquiring new application ID.")
            return None
        application_id = response.json()['application-id']
    except (requests.RequestException, ValueError, KeyError):
        print("Error occurred while acquiring new application ID.")
        return None

    data = {
        "application-id": application_id,
        "application-name": "Hadoop Job",
        "am-container-spec": {
            "commands": {
                "command": f"hadoop jar {jar_path} {input_path} {output_path}"
            },
            # NOTE(review): the ResourceManager may require more fields per
            # local resource (e.g. visibility, size, timestamp); confirm.
            "local-resources": {
                "entry": [
                    {
                        "key": "AppMaster.jar",
                        "value": {
                            "resource": "file:/path/to/AppMaster.jar",
                            "type": "FILE"
                        }
                    }
                ]
            }
        },
        # The resource request is a top-level field of the submission
        # context, not part of the container spec.
        "resource": {
            "memory": memory,
            "vCores": vcores
        }
    }

    try:
        # Submissions are POSTed to the apps collection itself; the ID
        # travels in the request body.
        response = requests.post(apps_url, json=data, timeout=10)
    except requests.RequestException:
        print("Error occurred while submitting Hadoop job.")
        return None

    if response.status_code == 202:
        print(f"Hadoop job submitted. Application ID: {application_id}")
        return application_id

    print("Error occurred while submitting Hadoop job.")
    return None

def track_resource_usage(application_id, poll_interval=5, max_consecutive_errors=None):
    """Poll a YARN application and print its allocated resources until it ends.

    While the application is RUNNING, its allocated memory and vcores are
    printed on every poll. Polling stops once the state is FINISHED, FAILED
    or KILLED.

    Args:
        application_id: ID of the application to monitor.
        poll_interval: Seconds to sleep between polls.
        max_consecutive_errors: Stop after this many failed polls in a row.
            None (the default) keeps retrying indefinitely.

    Returns:
        None. Progress is written to standard output.
    """
    app_url = f"http://resourcemanager:8088/ws/v1/cluster/apps/{application_id}"
    consecutive_errors = 0

    while True:
        try:
            response = requests.get(app_url, timeout=10)
        except requests.RequestException:
            # Treat a transport failure like a bad status: report and retry.
            response = None

        if response is not None and response.status_code == 200:
            consecutive_errors = 0
            app_info = response.json()['app']
            if app_info['state'] == 'RUNNING':
                # The app object reports allocations as flat fields.
                memory = app_info.get('allocatedMB')
                vcores = app_info.get('allocatedVCores')
                print(f"Resource usage - Memory: {memory}, vCores: {vcores}")
            elif app_info['state'] in ['FINISHED', 'FAILED', 'KILLED']:
                print("Hadoop job execution completed.")
                break
        else:
            print("Error occurred while tracking resource usage.")
            consecutive_errors += 1
            if (max_consecutive_errors is not None
                    and consecutive_errors >= max_consecutive_errors):
                break
        time.sleep(poll_interval)

# Example usage: submit a job with explicit resource requirements, then
# monitor it if the submission was accepted.
jar_file_path, input_file_path, output_file_path = (
    "/path/to/MapReduceJob.jar",
    "/path/to/input",
    "/path/to/output",
)
memory_requirement, vcores_requirement = 4096, 4

application_id = submit_hadoop_job(
    jar_file_path,
    input_file_path,
    output_file_path,
    memory_requirement,
    vcores_requirement,
)
# A falsy ID means the submission failed; there is nothing to track then.
if application_id:
    track_resource_usage(application_id)


9. Write a Python program that compares the performance of a MapReduce job with different input split sizes, showcasing the impact on overall job execution time.

In [None]:
import subprocess
import time

def run_mapreduce_job(input_path, output_path, split_size):
    """Run ``MapReduceJob.jar`` once and return how long the command took.

    Args:
        input_path: Input path passed to the job.
        output_path: Output path passed to the job.
        split_size: Input split size passed to the job as its third argument.

    Returns:
        Elapsed time in seconds as a float. The value is returned even when
        the command fails; a warning is printed in that case because the
        timing then does not describe a completed job.
    """
    # Pass an argument list instead of a shell string so that paths containing
    # spaces or shell metacharacters reach hadoop unchanged.
    command = [
        "hadoop", "jar", "MapReduceJob.jar",
        str(input_path), str(output_path), str(split_size),
    ]
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments.
    start_time = time.perf_counter()
    try:
        succeeded = subprocess.run(command).returncode == 0
    except OSError:
        # Without a shell, a missing hadoop binary raises instead of
        # yielding a non-zero exit status.
        succeeded = False
    execution_time = time.perf_counter() - start_time

    if not succeeded:
        print("Warning: MapReduce job did not complete successfully; "
              "the measured time is not meaningful.")
    return execution_time

# Example usage
input_file_path = "/path/to/input"
output_file_path = "/path/to/output"
split_sizes = [64, 128, 256]  # In MB

for split_size in split_sizes:
    # Hadoop refuses to write into an existing output directory, so each run
    # gets its own; otherwise every run after the first fails immediately.
    run_output_path = f"{output_file_path}_split_{split_size}"
    execution_time = run_mapreduce_job(input_file_path, run_output_path, split_size)
    print(f"Input split size: {split_size} MB | Execution time: {execution_time} seconds")
