## Python program to read a Hadoop configuration file and display the core components of Hadoop

### CODE

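import os
import xml.etree.ElementTree as ET

# Hadoop configuration files are XML: <configuration><property><name/><value/></property>...
def read_hadoop_config(path):
    root = ET.parse(path).getroot()
    return {prop.findtext('name'): prop.findtext('value') for prop in root.iter('property')}

# Each core component has its own site file in the configuration directory
conf_dir = os.environ.get('HADOOP_CONF_DIR', '/etc/hadoop/conf')  # fallback path is an assumption
site_files = {'Hadoop Common': 'core-site.xml', 'HDFS': 'hdfs-site.xml',
              'YARN': 'yarn-site.xml', 'MapReduce': 'mapred-site.xml'}

# Display the properties configured for each core component of Hadoop
print("Core components of Hadoop:")
for component, filename in site_files.items():
    path = os.path.join(conf_dir, filename)
    if os.path.exists(path):
        print(f"{component} ({filename}):")
        for name, value in read_hadoop_config(path).items():
            print(f"  {name} = {value}")


### EXPLANATION

This program reads Hadoop's configuration files with the standard-library xml.etree.ElementTree module. The configparser module cannot be used here, because Hadoop stores its settings as <property> elements in XML *-site.xml files rather than in INI format. Each core component of Hadoop has its own site file: core-site.xml for Hadoop Common (for example fs.defaultFS, the URI of the default file system), hdfs-site.xml for HDFS, yarn-site.xml for YARN, and mapred-site.xml for MapReduce. The program looks for these files in the directory named by the HADOOP_CONF_DIR environment variable (the fallback path is an assumption; adjust it for your installation) and prints every property each file defines.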
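All of Hadoop's *-site.xml files share the same <configuration>/<property> layout. The sketch below parses an inline sample core-site.xml so the parsing logic can be tried without a Hadoop installation; the hostname, port, and directory values are made up for illustration, and parse_hadoop_config is a hypothetical helper name.

```python
import xml.etree.ElementTree as ET

# A small core-site.xml in Hadoop's standard layout (illustrative values only)
SAMPLE_CORE_SITE = """<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://namenode.example.com:8020</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/tmp/hadoop</value>
  </property>
</configuration>
"""

def parse_hadoop_config(xml_text):
    # Collect every <property>'s <name> and <value> into a dict
    root = ET.fromstring(xml_text)
    return {prop.findtext('name'): prop.findtext('value') for prop in root.iter('property')}

config = parse_hadoop_config(SAMPLE_CORE_SITE)
print(config['fs.defaultFS'])
```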

## Python function to calculate the total file size in a Hadoop Distributed File System (HDFS) directory

### CODE

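import subprocess

def get_directory_size(directory):
    # Run `hdfs dfs -du -s` to summarise the directory; -h is omitted so the
    # size is a plain byte count (human-readable sizes like "1.5 G" contain a space)
    command = ['hdfs', 'dfs', '-du', '-s', directory]
    output = subprocess.check_output(command, text=True).strip()

    # The first whitespace-separated field is the total size in bytes
    size_bytes = int(output.split()[0])

    return size_bytes

# Example usage
directory = '/user/example'
total_size = get_directory_size(directory)
print(f"Total file size in {directory}: {total_size} bytes")


### EXPLANATION

This Python function uses the subprocess module to run hdfs dfs -du -s, which reports the total size of a directory in the Hadoop Distributed File System (HDFS). The -s flag summarises the whole directory instead of listing each child. The -h flag is deliberately left out: human-readable sizes such as "1.5 G" contain a space, which makes the output harder to split reliably. The first field of the output is the logical size in bytes (on Hadoop 2.x and later the second field is the disk space consumed including all replicas, and the last field is the path), so the function returns that value as an integer. subprocess.check_output() raises CalledProcessError if the command fails, for example when the path does not exist.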
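Since a byte count is hard to read at a glance, it can be converted back to binary units in Python. The sketch below parses a single summary line from hdfs dfs -du -s (without -h) and formats it; the sample line and both helper names are illustrative, and the unit labels are IEC-style rather than exactly what Hadoop's -h prints.

```python
def parse_du_summary(output):
    # `hdfs dfs -du -s PATH` (without -h) prints: size  disk_space_consumed  path
    # Older releases print only: size  path. Either way the first field is the size in bytes.
    return int(output.split()[0])

def human_readable(num_bytes):
    # Convert a byte count to binary units (1 KiB = 1024 bytes)
    size = float(num_bytes)
    for unit in ['B', 'KiB', 'MiB', 'GiB', 'TiB']:
        if size < 1024 or unit == 'TiB':
            return f"{size:.1f} {unit}"
        size /= 1024

# Illustrative output line: 1.5 GiB of data, 4.5 GiB on disk with three replicas
sample = "1610612736  4831838208  /user/example"
print(human_readable(parse_du_summary(sample)))
```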

## Python program to extract and display the top N most frequent words from a large text file using the MapReduce approach


### CODE

from collections import Counter
from itertools import islice
import multiprocessing

def mapper(lines):
    # Map step: count the words in one chunk of lines
    word_counts = Counter()
    for line in lines:
        word_counts.update(line.split())
    return word_counts

def reducer(partial_counts):
    # Reduce step: merge the per-chunk counts into a single total
    total = Counter()
    for counts in partial_counts:
        total.update(counts)
    return total

def read_chunks(filename, lines_per_chunk=10000):
    # Yield the file in batches of lines, so each worker gets a sizeable task
    with open(filename, 'r', encoding='utf-8') as file:
        while True:
            chunk = list(islice(file, lines_per_chunk))
            if not chunk:
                break
            yield chunk

def get_top_n_words(filename, n):
    # Map the chunks in parallel, then reduce the partial counts as they arrive
    with multiprocessing.Pool() as pool:
        partial_counts = pool.imap_unordered(mapper, read_chunks(filename))
        word_counts = reducer(partial_counts)

    # Get the top N most frequent words
    return word_counts.most_common(n)

# Example usage (the __main__ guard is required for multiprocessing on spawn-based platforms)
if __name__ == '__main__':
    filename = 'large_text_file.txt'
    n = 10
    top_words = get_top_n_words(filename, n)
    print(f"Top {n} most frequent words:")
    for word, count in top_words:
        print(f"{word}: {count}")


### EXPLANATION

This program illustrates the MapReduce pattern on a single machine with Python's multiprocessing module; it does not run on a Hadoop cluster. The read_chunks generator splits the file into batches of lines, so each worker process receives a reasonably sized piece of work rather than a single line. The mapper function (the map step) splits each line of its chunk into words and counts them with a Counter. The reducer function (the reduce step) merges the per-chunk Counters into one total. The get_top_n_words function distributes the chunks across a pool of worker processes with Pool.imap_unordered(), reduces the partial counts as they arrive, and returns the N most frequent words using the most_common() method of the Counter object. Words are split on whitespace only, so "Hadoop" and "hadoop," are counted as different words. The if __name__ == '__main__': guard is needed because on platforms that start workers with the spawn method (Windows and macOS), each worker re-imports the script.
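On an actual Hadoop cluster, Python MapReduce jobs are usually run through Hadoop Streaming: the mapper and reducer read lines on stdin and write tab-separated key/value lines on stdout, and the framework sorts the mapper output by key before the reducer sees it. The sketch below simulates that contract in a single process, with sorted() standing in for Hadoop's shuffle-and-sort phase; the function names and sample text are illustrative, not part of any Hadoop API.

```python
from itertools import groupby

def streaming_mapper(lines):
    # Map: emit "word<TAB>1" for every word, as a Streaming mapper would print to stdout
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"

def streaming_reducer(sorted_lines):
    # Reduce: input arrives sorted by key, so equal words are adjacent and can be summed
    pairs = (line.split('\t', 1) for line in sorted_lines)
    for word, group in groupby(pairs, key=lambda pair: pair[0]):
        yield f"{word}\t{sum(int(count) for _, count in group)}"

def run_locally(lines):
    # Simulate the shuffle/sort phase that Hadoop performs between map and reduce
    shuffled = sorted(streaming_mapper(lines))
    return list(streaming_reducer(shuffled))

text = ["the quick brown fox", "the lazy dog", "the fox"]
for record in run_locally(text):
    print(record)
```

In a real Streaming job the two functions would live in separate scripts that read sys.stdin, and the sort would be done by Hadoop rather than by sorted().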

## Python script to check the health status of the NameNode and DataNodes in a Hadoop cluster using Hadoop's REST API

### CODE

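import json
import requests

def get_jmx_bean(hostname, port, query):
    # Query a Hadoop daemon's /jmx servlet for a single MBean
    url = f"http://{hostname}:{port}/jmx"
    response = requests.get(url, params={'qry': query}, timeout=10)
    response.raise_for_status()
    return response.json()['beans'][0]

def get_namenode_state(hostname, port):
    # NameNodeStatus reports the NameNode's HA state, e.g. 'active' or 'standby'
    bean = get_jmx_bean(hostname, port, 'Hadoop:service=NameNode,name=NameNodeStatus')
    return bean['State']

def get_datanode_health(hostname, port):
    # NameNodeInfo exposes LiveNodes and DeadNodes as JSON-encoded strings keyed by DataNode
    bean = get_jmx_bean(hostname, port, 'Hadoop:service=NameNode,name=NameNodeInfo')
    live = json.loads(bean['LiveNodes'])
    dead = json.loads(bean['DeadNodes'])
    return sorted(live), sorted(dead)

# Example usage (9870 is the default NameNode HTTP port in Hadoop 3.x; Hadoop 2.x used 50070)
namenode_hostname = 'namenode.example.com'
port = 9870

print(f"NameNode state: {get_namenode_state(namenode_hostname, port)}")
live_nodes, dead_nodes = get_datanode_health(namenode_hostname, port)
print(f"Live DataNodes: {live_nodes}")
print(f"Dead DataNodes: {dead_nodes}")


### EXPLANATION

This Python script uses the requests module to query the NameNode's /jmx servlet, which exposes Hadoop's JMX metrics (MBeans) as JSON. The get_jmx_bean helper passes the MBean name in the qry parameter and returns the first matching bean. The get_namenode_state function reads the State attribute of the Hadoop:service=NameNode,name=NameNodeStatus bean, which reports the NameNode's HA state such as "active" or "standby". DataNode health is taken from the NameNode's point of view: the Hadoop:service=NameNode,name=NameNodeInfo bean includes LiveNodes and DeadNodes attributes, which are JSON-encoded strings keyed by DataNode, and get_datanode_health decodes them and returns the sorted node names. Finally, the script prints the NameNode state and the lists of live and dead DataNodes.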
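Requesting /jmx without a qry parameter returns every MBean the daemon exposes, each identified by its name field, and the LiveNodes/DeadNodes attributes need a second json.loads because they are JSON strings inside the JSON response. The sketch below shows both steps on a trimmed, made-up payload shaped like a NameNode /jmx response; find_bean and summarize_datanodes are hypothetical helper names.

```python
import json

def find_bean(jmx_response, bean_name):
    # /jmx returns {"beans": [...]}; each bean carries its MBean name in the "name" field
    for bean in jmx_response.get('beans', []):
        if bean.get('name') == bean_name:
            return bean
    return None

def summarize_datanodes(namenode_info):
    # LiveNodes and DeadNodes are JSON-encoded strings, so they need a second json.loads
    live = json.loads(namenode_info.get('LiveNodes', '{}'))
    dead = json.loads(namenode_info.get('DeadNodes', '{}'))
    return {'live': len(live), 'dead': len(dead), 'dead_nodes': sorted(dead)}

# Trimmed, illustrative payload (node names and ports are made up)
sample = {
    "beans": [
        {"name": "Hadoop:service=NameNode,name=NameNodeStatus", "State": "active"},
        {"name": "Hadoop:service=NameNode,name=NameNodeInfo",
         "LiveNodes": json.dumps({"dn1:9866": {}, "dn2:9866": {}}),
         "DeadNodes": json.dumps({"dn3:9866": {}})},
    ]
}

status = find_bean(sample, "Hadoop:service=NameNode,name=NameNodeStatus")
info = find_bean(sample, "Hadoop:service=NameNode,name=NameNodeInfo")
print(status["State"], summarize_datanodes(info))
```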



## Python program to list all the files and directories in a specific HDFS path

### CODE

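import subprocess

def list_hdfs_path(path):
    # Run the hdfs dfs -ls command to list the files and directories in the HDFS path
    command = ['hdfs', 'dfs', '-ls', path]
    output = subprocess.check_output(command, text=True)

    items = []
    for line in output.splitlines():
        # Skip the "Found N items" header and any blank lines
        if not line.strip() or line.startswith('Found '):
            continue
        # Each entry has 8 fields (permissions, replication, owner, group, size,
        # date, time, path); maxsplit=7 keeps paths that contain spaces intact
        items.append(line.split(None, 7)[-1])

    return items

# Example usage
hdfs_path = '/user/example'
items = list_hdfs_path(hdfs_path)
print(f"Files and directories in {hdfs_path}:")
for item in items:
    print(item)


### EXPLANATION

This Python program uses the subprocess module to run the hdfs dfs -ls command, which lists the files and directories in an HDFS path. When the path is a directory, the output starts with a "Found N items" header, followed by one line per entry with eight whitespace-separated fields: permissions, replication factor ("-" for directories), owner, group, size, modification date, modification time, and path. The function skips the header and splits each remaining line at most seven times, so the last field is the full path even if it contains spaces. Finally, the program prints the list of files and directories to the console.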
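The same eight fields can be kept instead of discarded, which makes it easy to tell files from directories or to filter by size. The sketch below parses each hdfs dfs -ls line into a small dataclass; the sample output is illustrative, and HdfsEntry and parse_ls_line are hypothetical names.

```python
from dataclasses import dataclass

@dataclass
class HdfsEntry:
    permissions: str
    replication: str
    owner: str
    group: str
    size: int
    modified: str
    path: str

    @property
    def is_directory(self):
        # Directory entries start with 'd' in the permission string
        return self.permissions.startswith('d')

def parse_ls_line(line):
    # Eight fields; maxsplit=7 keeps paths that contain spaces intact
    perms, repl, owner, group, size, date, time_, path = line.split(None, 7)
    return HdfsEntry(perms, repl, owner, group, int(size), f"{date} {time_}", path)

# Illustrative `hdfs dfs -ls` output (header line plus two entries)
sample_output = """Found 2 items
drwxr-xr-x   - hdfs supergroup          0 2024-01-15 10:30 /user/example/logs
-rw-r--r--   3 hdfs supergroup       1024 2024-01-15 10:31 /user/example/my file.txt"""

entries = [parse_ls_line(line) for line in sample_output.splitlines()
           if not line.startswith('Found ')]
for entry in entries:
    kind = 'dir ' if entry.is_directory else 'file'
    print(kind, entry.size, entry.path)
```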



## Python program to analyze the storage utilization of DataNodes in a Hadoop cluster and identify the nodes with the highest and lowest storage capacities

### CODE

import json
import requests

def analyze_storage_utilization(hostname, port):
    # Query the NameNode's JMX servlet; the NameNode tracks storage for every DataNode
    url = f"http://{hostname}:{port}/jmx"
    query = {'qry': 'Hadoop:service=NameNode,name=NameNodeInfo'}
    response = requests.get(url, params=query, timeout=10)
    response.raise_for_status()
    data = response.json()['beans'][0]

    # Cluster-wide storage figures, in bytes
    total_capacity = data['Total']
    used_capacity = data['Used']
    remaining_capacity = data['Free']

    # LiveNodes is a JSON-encoded string mapping each live DataNode to its statistics
    datanodes = json.loads(data['LiveNodes'])
    highest_capacity_node = max(datanodes, key=lambda node: datanodes[node]['capacity'])
    lowest_capacity_node = min(datanodes, key=lambda node: datanodes[node]['capacity'])

    return total_capacity, used_capacity, remaining_capacity, highest_capacity_node, lowest_capacity_node

# Example usage (9870 is the default NameNode HTTP port in Hadoop 3.x; Hadoop 2.x used 50070)
namenode_hostname = 'namenode.example.com'
port = 9870

total_capacity, used_capacity, remaining_capacity, highest_capacity_node, lowest_capacity_node = analyze_storage_utilization(namenode_hostname, port)

print(f"Total capacity: {total_capacity} bytes")
print(f"Used capacity: {used_capacity} bytes")
print(f"Remaining capacity: {remaining_capacity} bytes")
print(f"Highest capacity node: {highest_capacity_node}")
print(f"Lowest capacity node: {lowest_capacity_node}")


### EXPLANATION

A single DataNode's metrics only describe that node, so this program queries the NameNode instead, which tracks storage for every DataNode in the cluster. The analyze_storage_utilization function reads the Hadoop:service=NameNode,name=NameNodeInfo bean from the NameNode's /jmx endpoint using the requests module. Its Total, Used, and Free attributes give the cluster-wide capacity, used space, and remaining space in bytes. Its LiveNodes attribute is a JSON-encoded string that maps each live DataNode to statistics such as capacity, usedSpace, and remaining; the function decodes it and applies max() and min() to each node's capacity to identify the DataNodes with the highest and lowest storage capacities. Dead DataNodes do not appear in LiveNodes, and max() raises ValueError if no DataNodes are live.
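Raw capacity alone can be misleading when DataNodes have different disk sizes; used space as a percentage of capacity shows which nodes are actually fullest. The sketch below assumes a dict shaped like the decoded LiveNodes attribute and uses only its capacity and usedSpace keys; the node names and figures are made up, and storage_report is a hypothetical helper name.

```python
def storage_report(live_nodes):
    # live_nodes: DataNode -> stats, shaped like the decoded LiveNodes attribute
    report = {}
    for node, stats in live_nodes.items():
        capacity = stats['capacity']
        used_pct = 100.0 * stats['usedSpace'] / capacity if capacity else 0.0
        report[node] = round(used_pct, 1)
    return report

# Illustrative figures in bytes (1 TiB = 1099511627776)
TIB = 1099511627776
sample_nodes = {
    "dn1:9866": {"capacity": 4 * TIB, "usedSpace": 1 * TIB},
    "dn2:9866": {"capacity": 2 * TIB, "usedSpace": 3 * TIB // 2},
}

utilization = storage_report(sample_nodes)
fullest = max(utilization, key=utilization.get)
emptiest = min(utilization, key=utilization.get)
print(utilization, fullest, emptiest)
```

Here the node with the larger disk (dn1) is the least utilised, which a capacity-only comparison would not reveal.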

## Python script to interact with YARN's ResourceManager API to submit a Hadoop job, monitor its progress, and retrieve the final output

### CODE

import requests
import time

TERMINAL_STATES = {'FINISHED', 'FAILED', 'KILLED'}

def submit_hadoop_job(job_data, resource_manager_url):
    # Step 1: ask the ResourceManager for a new application ID
    url = f"{resource_manager_url}/ws/v1/cluster/apps/new-application"
    response = requests.post(url, timeout=10)
    response.raise_for_status()
    application_id = response.json()['application-id']

    # Step 2: submit the application context to /ws/v1/cluster/apps using that ID
    submission = {**job_data, 'application-id': application_id}
    url = f"{resource_manager_url}/ws/v1/cluster/apps"
    response = requests.post(url, json=submission, timeout=10)
    response.raise_for_status()  # the ResourceManager answers 202 Accepted on success

    return application_id

def get_application_report(application_id, resource_manager_url):
    # Fetch the application report (state, finalStatus, progress, diagnostics, ...)
    url = f"{resource_manager_url}/ws/v1/cluster/apps/{application_id}"
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.json()['app']

def monitor_job_status(application_id, resource_manager_url, poll_seconds=5):
    # Poll until the application reaches a terminal state, then return its final report
    while True:
        app = get_application_report(application_id, resource_manager_url)
        print(f"State: {app['state']}, progress: {app['progress']:.1f}%")
        if app['state'] in TERMINAL_STATES:
            return app
        time.sleep(poll_seconds)

# Example usage
job_data = {
    "application-name": "My Hadoop Job",
    # Additional job configuration parameters...
}

resource_manager_url = 'http://localhost:8088'

application_id = submit_hadoop_job(job_data, resource_manager_url)
print(f"Job submitted. Application ID: {application_id}")

final_report = monitor_job_status(application_id, resource_manager_url)
print(f"Job completed with final status: {final_report['finalStatus']}")
print(f"Diagnostics: {final_report['diagnostics']}")


### EXPLANATION

This Python script interacts with YARN's ResourceManager REST API to submit a Hadoop job, monitor its progress, and report its outcome. Submission takes two requests: submit_hadoop_job first makes a POST request to /ws/v1/cluster/apps/new-application to obtain a new application ID, then POSTs the application submission context, with that ID filled in, to /ws/v1/cluster/apps. A real submission context must also describe how to launch the ApplicationMaster (the am-container-spec field), which the example leaves out. The get_application_report function makes a GET request to /ws/v1/cluster/apps/{application_id}, whose app object includes the state, finalStatus, progress, and diagnostics fields. The monitor_job_status function polls that endpoint every few seconds until the state is FINISHED, FAILED, or KILLED, then returns the final report. YARN does not return the job's data output: the job writes its results to storage, typically an output directory in HDFS, so the final status and diagnostics are what the ResourceManager can report.
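A polling loop against the ResourceManager has no upper bound unless one is added. The sketch below separates the timing logic from the HTTP call by taking a fetch_state callable (a hypothetical parameter introduced here, for example a wrapper around a GET of the application's state), which also makes the loop testable without a cluster. The state sequence in the example is simulated.

```python
import time

TERMINAL_STATES = {'FINISHED', 'FAILED', 'KILLED'}

def wait_for_completion(fetch_state, timeout_seconds=3600, poll_seconds=5):
    # fetch_state: zero-argument callable returning the current YARN application state
    deadline = time.monotonic() + timeout_seconds
    while True:
        state = fetch_state()
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"application still {state} after {timeout_seconds}s")
        time.sleep(poll_seconds)

# Simulated state sequence standing in for real ResourceManager responses
states = iter(['ACCEPTED', 'RUNNING', 'RUNNING', 'FINISHED'])
final_state = wait_for_completion(lambda: next(states), poll_seconds=0)
print(final_state)
```

time.monotonic() is used for the deadline so that wall-clock adjustments cannot shorten or extend the timeout.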



## Python script to interact with YARN's ResourceManager API to submit a Hadoop job, set resource requirements, and track resource usage during job execution

### CODE

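import requests
import time

TERMINAL_STATES = {'FINISHED', 'FAILED', 'KILLED'}

def submit_hadoop_job(job_data, resource_manager_url):
    # Ask the ResourceManager for a new application ID
    url = f"{resource_manager_url}/ws/v1/cluster/apps/new-application"
    response = requests.post(url, timeout=10)
    response.raise_for_status()
    application_id = response.json()['application-id']

    # Submit the application context, including its resource requirements
    url = f"{resource_manager_url}/ws/v1/cluster/apps"
    response = requests.post(url, json={**job_data, 'application-id': application_id}, timeout=10)
    response.raise_for_status()

    return application_id

def monitor_resource_usage(application_id, resource_manager_url, poll_seconds=5):
    # Poll this application's report (/ws/v1/cluster/metrics would give cluster-wide totals)
    url = f"{resource_manager_url}/ws/v1/cluster/apps/{application_id}"
    while True:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        app = response.json()['app']
        print(f"Allocated memory: {app['allocatedMB']} MB, "
              f"allocated vCores: {app['allocatedVCores']}, "
              f"running containers: {app['runningContainers']}")
        if app['state'] in TERMINAL_STATES:
            break
        time.sleep(poll_seconds)

    # Cumulative usage over the application's lifetime
    print(f"Memory used: {app['memorySeconds']} MB-seconds")
    print(f"vCores used: {app['vcoreSeconds']} vcore-seconds")

# Example usage
job_data = {
    "application-name": "My Hadoop Job",
    # Resources requested for the ApplicationMaster container
    "resource": {
        "memory": 2048,
        "vCores": 2
    },
    # Additional job configuration parameters...
}

resource_manager_url = 'http://localhost:8088'

application_id = submit_hadoop_job(job_data, resource_manager_url)
print(f"Job submitted. Application ID: {application_id}")

monitor_resource_usage(application_id, resource_manager_url)
print("Resource usage monitoring completed.")


### EXPLANATION

This Python script interacts with YARN's ResourceManager REST API to submit a Hadoop job, set resource requirements, and track resource usage during job execution. The submit_hadoop_job function obtains an application ID with a POST request to /ws/v1/cluster/apps/new-application and then submits the application context with a POST request to /ws/v1/cluster/apps. The resource field of that context sets the memory (in MB) and virtual cores requested for the ApplicationMaster container. The monitor_resource_usage function makes a GET request to /ws/v1/cluster/apps/{application_id} every few seconds and prints the allocatedMB, allocatedVCores, and runningContainers fields, which describe the resources currently allocated to this application, until the application reaches a terminal state (FINISHED, FAILED, or KILLED). It then prints memorySeconds and vcoreSeconds, the cumulative memory (in MB-seconds) and vcore-seconds the application consumed. The /ws/v1/cluster/metrics endpoint is not used because it reports cluster-wide figures and cannot isolate a single job's usage.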
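YARN's application report includes memorySeconds (MB-seconds) and vcoreSeconds, which are cumulative and awkward to read directly. Converting them to GB-hours and vcore-hours, as in the sketch below, is one way to present them; resource_hours is a hypothetical helper name and the example figures are made up.

```python
def resource_hours(memory_seconds, vcore_seconds):
    # memorySeconds is in MB-seconds: divide by 1024 for GB and by 3600 for hours
    gb_hours = memory_seconds / 1024 / 3600
    vcore_hours = vcore_seconds / 3600
    return round(gb_hours, 2), round(vcore_hours, 2)

# Example: one 2048 MB, 2-vCore container held for 30 minutes (1800 seconds)
print(resource_hours(memory_seconds=2048 * 1800, vcore_seconds=2 * 1800))
```

For the example container, 2 GB held for half an hour is 1 GB-hour, and 2 vCores for half an hour is 1 vcore-hour.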




