### 1. Here's a Python program to read a Hadoop configuration file and display the core components of Hadoop:

def display_hadoop_core_components(config_path='hadoop-config.xml'):
    """Print and return the Hadoop core components configured in an XML file.

    The file is parsed as XML and the text of every ``<name>`` element is
    looked up in a small table of well-known property names. Parsing the
    document (instead of searching raw lines for a literal tag) means that
    properties are still recognised when the tag content is padded with
    whitespace or split across lines, and that properties inside XML comments
    are ignored.

    Args:
        config_path: Path of the Hadoop configuration file to inspect.
            Defaults to ``'hadoop-config.xml'`` in the working directory.

    Returns:
        A list with one description per matching property, in document order.

    Raises:
        OSError: If the file cannot be opened.
        xml.etree.ElementTree.ParseError: If the file is not well-formed XML.
    """
    # Imported locally so the function does not depend on a file-level import.
    import xml.etree.ElementTree as ET

    component_by_property = {
        'fs.defaultFS': 'HDFS (NameNode and DataNode)',
        'yarn.resourcemanager.hostname': 'YARN (ResourceManager)',
        'mapreduce.framework.name': 'MapReduce (JobTracker and TaskTracker)',
    }

    core_components = []
    for name_element in ET.parse(config_path).iter('name'):
        # An empty <name/> element has text None; treat it as an empty name.
        property_name = (name_element.text or '').strip()
        component = component_by_property.get(property_name)
        if component is not None:
            core_components.append(component)

    print("Core components of Hadoop:")
    for component in core_components:
        print(component)

    return core_components


### 2. Here's an example of a Python function that calculates the total file size in a Hadoop Distributed File System (HDFS) directory using the hdfs library:

from hdfs import InsecureClient

def calculate_hdfs_directory_size(directory, url='http://localhost:9870', user='hadoop'):
    """Return the combined size in bytes of every file below an HDFS directory.

    The directory tree is walked recursively through WebHDFS. The walk is
    asked to include each entry's status, so file lengths come from the
    directory listings themselves instead of one extra ``status()`` request
    per file.

    Args:
        directory: HDFS path of the directory to measure.
        url: Base URL of the NameNode web endpoint. The default uses port
            9870, the same endpoint the other HDFS helpers in this file talk
            to (the previous hard-coded value used port 50070).
        user: User name the WebHDFS requests are made as.

    Returns:
        The sum of the ``length`` fields of all files found.
    """
    client = InsecureClient(url, user=user)

    total_size = 0

    # With status=True every file entry is a (name, status) pair.
    for _root, _dirs, files in client.walk(directory, status=True):
        for _name, file_status in files:
            total_size += file_status['length']

    return total_size


### 3. Create a Python program that extracts and displays the top N most frequent words from a large text file using the MapReduce approach.

from collections import Counter
import re

def extract_top_frequent_words(file_path, top_n):
    """Print and return the ``top_n`` most frequent words of a text file.

    Each line is lower-cased and stripped of every character that is neither
    an ASCII letter, a digit nor whitespace; the remainder is split on
    whitespace and counted. Whitespace is deliberately kept during clean-up:
    removing newlines or tabs would glue the last word of one line to the
    first word of the next.

    The file is consumed line by line, so it never has to fit in memory as a
    single string.

    Args:
        file_path: Path of the text file to analyse.
        top_n: Number of words to report.

    Returns:
        A list of ``(word, count)`` tuples, most frequent first.
    """
    # Compiled once, outside the per-line loop.
    non_word_chars = re.compile(r'[^a-z0-9\s]')

    word_counts = Counter()
    with open(file_path, 'r') as file:
        for line in file:
            cleaned_line = non_word_chars.sub('', line.lower())
            word_counts.update(cleaned_line.split())

    # Get the top N most frequent words
    top_frequent_words = word_counts.most_common(top_n)

    # Display the top N most frequent words
    print(f"Top {top_n} most frequent words:")
    for word, count in top_frequent_words:
        print(f"{word}: {count}")

    return top_frequent_words




### 4. Write a Python script that checks the health status of the NameNode and DataNodes in a Hadoop cluster using Hadoop's REST API.

import requests

def check_service_health(service_url, timeout=10):
    """Fetch a JSON document from a service and return its ``healthStatus``.

    The key is looked up at the top level of the document first. If it is not
    there, each entry of a top-level ``"beans"`` list is inspected as well,
    because JMX-style endpoints wrap their attributes in such a list.

    Callers treat ``None`` as "status unavailable", so connection problems,
    timeouts and non-JSON replies are reported as ``None`` instead of raising.

    Args:
        service_url: URL that returns a JSON document.
        timeout: Seconds to wait for the service before giving up; without a
            timeout an unresponsive service would block forever.

    Returns:
        The ``healthStatus`` value, or ``None`` if it could not be obtained.
    """
    try:
        response = requests.get(service_url, timeout=timeout)
        data = response.json()
    except (requests.RequestException, ValueError):
        # ValueError covers a body that is not valid JSON.
        return None

    if not isinstance(data, dict):
        return None
    if "healthStatus" in data:
        return data["healthStatus"]

    for bean in data.get("beans", []):
        if isinstance(bean, dict) and "healthStatus" in bean:
            return bean["healthStatus"]
    return None

# Check the health status of the NameNode.
namenode_url = "http://localhost:9870/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo"
namenode_status = check_service_health(namenode_url)
if namenode_status:
    print("NameNode status:", namenode_status)
else:
    print("Failed to retrieve NameNode status.")

# Check the health status of DataNodes.
# NOTE(review): this queries the same host/port as the NameNode; DataNode
# beans are presumably only served by each DataNode's own web endpoint --
# confirm the URL against the cluster layout.
datanodes_url = "http://localhost:9870/jmx?qry=Hadoop:service=DataNode,name=DataNodeInfo"
try:
    # The timeout keeps an unresponsive service from hanging the script.
    response = requests.get(datanodes_url, timeout=10)
    data = response.json()
except (requests.RequestException, ValueError):
    # Unreachable service or non-JSON reply: fall through to the failure message.
    data = {}
if "beans" in data:
    # A bean without a "state" attribute is reported instead of raising KeyError.
    datanode_statuses = [node.get("state", "unknown") for node in data["beans"]]
    print("DataNode statuses:")
    for i, status in enumerate(datanode_statuses):
        print(f"DataNode {i+1}: {status}")
else:
    print("Failed to retrieve DataNode statuses.")



### 5. Develop a Python program that lists all the files and directories in a specific HDFS path.

from hdfs import InsecureClient

def list_hdfs_path_contents(directory):
    """Print the name, type and size of every entry directly inside an HDFS path.

    The listing is requested together with each entry's status. Entries whose
    status type is ``'FILE'`` are shown as files with their length; everything
    else is shown as a directory with ``-`` in place of a size.

    Args:
        directory: HDFS path whose immediate children are listed.
    """
    hdfs_client = InsecureClient('http://localhost:9870', user='hadoop')
    entries = hdfs_client.list(directory, status=True)

    print("Files and directories in", directory + ":")
    for entry_name, entry_status in entries:
        print("Path:", entry_name)
        is_file = entry_status['type'] == 'FILE'
        print("Type:", 'File' if is_file else 'Directory')
        if is_file:
            print("Size:", entry_status['length'])
        else:
            print("Size:", '-')
        print()



### 6. Implement a Python program that analyzes the storage utilization of DataNodes in a Hadoop cluster and identifies the nodes with the highest and lowest storage capacities.

from hdfs import InsecureClient

def _fetch_live_datanodes(
    jmx_url="http://localhost:9870/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo",
    timeout=10,
):
    """Return ``(name, info)`` pairs for the live DataNodes reported by the NameNode.

    NOTE(review): relies on the NameNodeInfo bean exposing a ``LiveNodes``
    attribute that maps each DataNode to a record containing ``capacity`` --
    confirm against the Hadoop version in use.
    """
    # Imported locally so this block does not depend on file-level imports.
    import json

    import requests

    response = requests.get(jmx_url, timeout=timeout)
    response.raise_for_status()
    beans = response.json().get("beans", [])
    if not beans:
        return []

    live_nodes = beans[0].get("LiveNodes", {})
    # The attribute is normally a JSON document encoded as a string.
    if isinstance(live_nodes, str):
        live_nodes = json.loads(live_nodes)
    return list(live_nodes.items())


def analyze_datanode_storage_utilization(datanodes=None):
    """Print the DataNodes with the highest and the lowest storage capacity.

    DataNodes are cluster members, not entries of the HDFS namespace, so they
    cannot be obtained by listing an HDFS directory, and HDFS file statuses
    carry no ``capacity`` field. When no data is supplied, the node records
    are therefore read from the NameNode's JMX endpoint instead.

    Args:
        datanodes: Optional iterable of ``(name, info)`` pairs where ``info``
            is a mapping with a numeric ``'capacity'`` entry. When omitted,
            the pairs are fetched from the NameNode at ``localhost:9870``.

    Returns:
        None. The result is written to standard output.
    """
    if datanodes is None:
        datanodes = _fetch_live_datanodes()

    datanodes = list(datanodes)
    if not datanodes:
        print("No DataNodes found.")
        return

    # Largest capacity first; the last element is therefore the smallest.
    sorted_datanodes = sorted(datanodes, key=lambda node: node[1]['capacity'], reverse=True)
    highest_name, highest_info = sorted_datanodes[0]
    lowest_name, lowest_info = sorted_datanodes[-1]

    print("DataNode storage utilization analysis:")
    print("Highest Storage Capacity:")
    print("DataNode:", highest_name)
    print("Capacity:", highest_info['capacity'])
    print()

    print("Lowest Storage Capacity:")
    print("DataNode:", lowest_name)
    print("Capacity:", lowest_info['capacity'])
    print()




### 7. Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, monitor its progress, and retrieve the final output.

import requests
import time

def submit_hadoop_job(jar_path, input_path, output_path):
    """Submit a Hadoop job through the YARN ResourceManager REST API and follow it.

    Steps:
      1. Ask the ResourceManager for a new application id. Ids are assigned
         by YARN; a made-up id is not accepted on submission.
      2. Submit the application with that id. A successful submission is
         answered with HTTP 202 and an empty body, so the id from step 1 is
         the one used for all later requests.
      3. Poll the application every five seconds until it reaches the
         FINISHED, FAILED or KILLED state, printing progress meanwhile.
      4. Print the logs link of the first application attempt.

    If either step 1 or step 2 is rejected, a failure message is printed and
    the function returns without polling.

    NOTE(review): a later definition with the same name in this file replaces
    this function when the module is imported.

    Args:
        jar_path: Path of the job jar passed to ``hadoop jar``.
        input_path: Input path passed to the job.
        output_path: Output path passed to the job.

    Returns:
        None. Status information is written to standard output.
    """
    apps_url = "http://localhost:8088/ws/v1/cluster/apps"
    request_timeout = 30  # seconds; keeps a dead ResourceManager from hanging us

    # Step 1: obtain an application id from the ResourceManager.
    response = requests.post(f"{apps_url}/new-application", timeout=request_timeout)
    if response.status_code != 200:
        print("Failed to submit Hadoop job.")
        return
    job_id = response.json()["application-id"]

    # Step 2: submit the Hadoop job
    headers = {'Content-Type': 'application/json'}
    data = {
        "application-id": job_id,
        "application-name": "My Hadoop Job",
        "am-container-spec": {
            "commands": {
                "command": f"hadoop jar {jar_path} {input_path} {output_path}"
            }
        },
        # The submission schema names this object "resource" and spells the
        # core count "vCores".
        "resource": {
            "vCores": 1,
            "memory": 1024
        },
        "application-type": "MAPREDUCE"
    }
    response = requests.post(apps_url, json=data, headers=headers, timeout=request_timeout)
    if response.status_code != 202:
        print("Failed to submit Hadoop job.")
        return
    print("Hadoop job submitted successfully.")

    # Step 3: monitor the progress of the Hadoop job
    job_status_url = f"{apps_url}/{job_id}"
    while True:
        response = requests.get(job_status_url, timeout=request_timeout)
        data = response.json()
        state = data["app"]["state"]
        if state == "FINISHED":
            print("Hadoop job completed.")
            break
        elif state == "FAILED":
            print("Hadoop job failed.")
            break
        elif state == "KILLED":
            print("Hadoop job killed.")
            break
        else:
            print("Hadoop job is still running. Progress:", data["app"]["progress"])
            time.sleep(5)

    # Step 4: retrieve the final output of the Hadoop job
    final_output_url = f"{apps_url}/{job_id}/appattempts"
    response = requests.get(final_output_url, timeout=request_timeout)
    data = response.json()
    final_output = data["appAttempts"]["appAttempt"][0]["logsLink"]

    print("Final output:", final_output)



### 8. Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, set resource requirements, and track resource usage during job execution.

import requests
import time

def submit_hadoop_job(jar_path, input_path, output_path, vcores, memory):
    """Submit a Hadoop job with explicit resources and report its resource usage.

    Steps:
      1. Ask the ResourceManager for a new application id. Ids are assigned
         by YARN; a made-up id is not accepted on submission.
      2. Submit the application with that id and the requested resources. A
         successful submission is answered with HTTP 202 and an empty body,
         so the id from step 1 is the one used for all later requests.
      3. Poll the application every five seconds until it reaches the
         FINISHED, FAILED or KILLED state, printing the allocated memory and
         vcores and the accumulated memory-seconds and vcore-seconds on each
         round.

    If either step 1 or step 2 is rejected, a failure message is printed and
    the function returns without polling.

    Args:
        jar_path: Path of the job jar passed to ``hadoop jar``.
        input_path: Input path passed to the job.
        output_path: Output path passed to the job.
        vcores: Number of virtual cores requested for the application.
        memory: Amount of memory requested for the application.

    Returns:
        None. Status information is written to standard output.
    """
    apps_url = "http://localhost:8088/ws/v1/cluster/apps"
    request_timeout = 30  # seconds; keeps a dead ResourceManager from hanging us

    # Step 1: get the application ID from the ResourceManager.
    response = requests.post(f"{apps_url}/new-application", timeout=request_timeout)
    if response.status_code != 200:
        print("Failed to submit Hadoop job.")
        return
    job_id = response.json()["application-id"]

    # Step 2: submit the Hadoop job
    headers = {'Content-Type': 'application/json'}
    data = {
        "application-id": job_id,
        "application-name": "My Hadoop Job",
        "am-container-spec": {
            "commands": {
                "command": f"hadoop jar {jar_path} {input_path} {output_path}"
            }
        },
        # The submission schema names this object "resource" and spells the
        # core count "vCores".
        "resource": {
            "vCores": vcores,
            "memory": memory
        },
        "application-type": "MAPREDUCE"
    }
    response = requests.post(apps_url, json=data, headers=headers, timeout=request_timeout)
    if response.status_code != 202:
        print("Failed to submit Hadoop job.")
        return
    print("Hadoop job submitted successfully.")

    # Step 3: track resource usage during job execution
    job_status_url = f"{apps_url}/{job_id}"
    while True:
        response = requests.get(job_status_url, timeout=request_timeout)
        data = response.json()
        state = data["app"]["state"]
        if state == "FINISHED":
            print("Hadoop job completed.")
            break
        elif state == "FAILED":
            print("Hadoop job failed.")
            break
        elif state == "KILLED":
            print("Hadoop job killed.")
            break
        else:
            resources_allocated = data["app"]["allocatedMB"]
            vcores_allocated = data["app"]["allocatedVCores"]
            resources_used = data["app"]["memorySeconds"]
            vcores_used = data["app"]["vcoreSeconds"]
            print("Resource usage:")
            print("Allocated Resources (MB):", resources_allocated)
            print("Allocated vCores:", vcores_allocated)
            print("Used Resources (Memory Seconds):", resources_used)
            print("Used vCores Seconds:", vcores_used)
            print()
            time.sleep(5)




### 9. Write a Python program that compares the performance of a MapReduce job with different input split sizes, showcasing the impact on overall job execution time.

import time
import subprocess

def run_mapreduce_job(input_path, split_size, output_path="output"):
    """Run the streaming MapReduce job with a given split size and time it.

    The command is passed to the operating system as an argument list rather
    than as a shell string, so spaces or shell metacharacters in the paths
    cannot change the command that is executed.

    Args:
        input_path: Input path handed to the job's ``-input`` option.
        split_size: Value for the ``mapred.max.split.size`` property.
        output_path: Path handed to the job's ``-output`` option. Pass a
            different value for every run when comparing split sizes, since
            each run writes to this location.

    Returns:
        Elapsed time of the job in seconds, as a float.

    Raises:
        subprocess.CalledProcessError: If the job exits with a non-zero
            status; the duration of a failed run is not a meaningful
            measurement.
        FileNotFoundError: If the ``hadoop`` executable cannot be found.
    """
    # Generic options (-D, -files) have to precede the streaming options.
    command = [
        "hadoop", "jar", "hadoop-streaming.jar",
        "-D", f"mapred.max.split.size={split_size}",
        "-files", "mapper.py,reducer.py",
        "-mapper", "mapper.py",
        "-reducer", "reducer.py",
        "-input", str(input_path),
        "-output", str(output_path),
    ]

    # perf_counter is monotonic, so the interval is unaffected by clock changes.
    start_time = time.perf_counter()
    subprocess.run(command, check=True)
    execution_time = time.perf_counter() - start_time

    return execution_time