In [None]:
Q1-Write a Python program to read a Hadoop configuration file and display the core components of Hadoop.

# Import the required libraries
import configparser

import xml.etree.ElementTree as ET


def read_hadoop_config(config_path):
    """Parse a Hadoop ``*-site.xml`` configuration file into a dict.

    Hadoop configuration files are XML documents of the form
    ``<configuration><property><name>...</name><value>...</value></property>...``.
    ``configparser`` only understands INI files, so it cannot read them.

    Args:
        config_path: Path to the XML configuration file.

    Returns:
        A dict mapping each property name to its value, both whitespace-stripped.
        A ``<property>`` without a non-empty ``<name>`` is skipped; one without
        a ``<value>`` maps to ``''``.

    Raises:
        OSError: If the file cannot be opened.
        xml.etree.ElementTree.ParseError: If the file is not well-formed XML.
    """
    root = ET.parse(config_path).getroot()
    properties = {}
    # Properties are direct children of the <configuration> root element.
    for prop in root.findall('property'):
        name = (prop.findtext('name') or '').strip()
        if not name:
            continue
        properties[name] = (prop.findtext('value') or '').strip()
    return properties


if __name__ == '__main__':
    core_components = read_hadoop_config('hadoop_config_file.xml')
    print('Core Components:')
    for name, value in core_components.items():
        print(f'{name} = {value}')
Q2-Implement a Python function that calculates the total file size in a Hadoop Distributed File System (HDFS) directory.

# Import the required libraries
from hdfs import InsecureClient

def calculate_directory_size(hdfs_url, directory_path, client=None):
    """Return the total size, in bytes, of the regular files directly inside a directory.

    Only direct children are counted; subdirectories are not descended into.

    Args:
        hdfs_url: WebHDFS URL of the NameNode (used only when ``client`` is None).
        directory_path: HDFS directory to inspect.
        client: Optional existing HDFS client, so callers can reuse one
            connection across calls. A new ``InsecureClient`` is created if omitted.

    Returns:
        Sum of the ``length`` field of every entry whose ``type`` is ``'FILE'``;
        0 for an empty directory.
    """
    if client is None:
        client = InsecureClient(hdfs_url)

    # With status=True, client.list returns (name, FileStatus) pairs rather than
    # bare status dicts, so the pair must be unpacked before indexing.
    return sum(
        status['length']
        for _name, status in client.list(directory_path, status=True)
        if status['type'] == 'FILE'
    )


if __name__ == '__main__':
    # Usage example
    hdfs_url = 'http://localhost:9870'
    directory_path = '/user/data'
    total_size = calculate_directory_size(hdfs_url, directory_path)
    print(f'Total file size in {directory_path}: {total_size} bytes')

Q3-Create a Python program that extracts and displays the top N most frequent words from a large text file using the MapReduce approach.

# Import the required libraries
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

import heapq


class TopNWords(MRJob):
    """Emit the N most frequent (lower-cased) words of the input as (word, count)."""

    # Compiled once instead of on every input line.
    WORD_RE = re.compile(r'\w+')

    def configure_args(self):
        super(TopNWords, self).configure_args()
        # A default is required: without one, self.options.n is None and the
        # final reducer would fail.
        self.add_passthru_arg('--n', type=int, default=10,
                              help='Number of top words to retrieve (default: 10)')

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_top_n_words)
        ]

    def mapper_get_words(self, _, line):
        """Yield (word, 1) for every lower-cased word in the line."""
        for word in self.WORD_RE.findall(line):
            yield word.lower(), 1

    def combiner_count_words(self, word, counts):
        """Pre-sum counts on the map side to shrink the shuffle."""
        yield word, sum(counts)

    def reducer_count_words(self, word, counts):
        """Send every (count, word) pair to a single key for global ranking."""
        yield None, (sum(counts), word)

    def reducer_top_n_words(self, _, word_counts):
        """Yield the top N (word, count) pairs, highest count first.

        heapq.nlargest keeps only N items in memory and returns the same order
        as sorted(..., reverse=True)[:N]. It also copes with fewer than N
        distinct words, where indexing up to N raised IndexError.
        """
        for count, word in heapq.nlargest(self.options.n, word_counts):
            yield word, count


if __name__ == '__main__':
    TopNWords.run()
Q4-Write a Python script that checks the health status of the NameNode and DataNodes in a Hadoop cluster using Hadoop's REST API.
# Import the required libraries
import requests
import json

def first_bean(payload, query):
    """Return the first bean in a decoded Hadoop ``/jmx`` JSON payload.

    Args:
        payload: Decoded JSON returned by a daemon's ``/jmx`` servlet.
        query: The ``qry`` string that was requested (used in the error message).

    Raises:
        LookupError: If the payload contains no beans, i.e. nothing matched
            ``query``. Indexing ``[0]`` directly would give an opaque IndexError.
    """
    beans = payload.get('beans') or []
    if not beans:
        raise LookupError(f'No JMX bean matched {query!r}')
    return beans[0]


def fetch_jmx_bean(base_url, query, timeout=10):
    """Fetch the first JMX bean matching ``query`` from the daemon at ``base_url``.

    Raises ``requests.HTTPError`` on a non-2xx response and ``LookupError`` when
    no bean matches. ``timeout`` (seconds) keeps an unresponsive daemon from
    hanging the check indefinitely.
    """
    response = requests.get(f'{base_url}/jmx', params={'qry': query}, timeout=timeout)
    response.raise_for_status()
    return first_bean(response.json(), query)


def check_cluster_health(namenode_url='http://localhost:9870', timeout=10):
    """Report the NameNode state and DataNode liveness as seen by the NameNode.

    The ``State`` attribute is exposed by the ``NameNodeStatus`` bean, not by
    ``NameNodeInfo``. DataNode liveness is tracked by the NameNode in
    ``FSNamesystemState``; a DataNode's own ``DataNodeInfo`` bean has no
    ``State`` attribute.

    Args:
        namenode_url: NameNode HTTP address (9870 on Hadoop 3; Hadoop 2 used 50070).
        timeout: Per-request timeout in seconds.

    Returns:
        dict with ``namenode_state`` (typically ``'active'`` / ``'standby'``),
        ``filesystem_state``, ``live_datanodes`` and ``dead_datanodes``.
    """
    status = fetch_jmx_bean(
        namenode_url, 'Hadoop:service=NameNode,name=NameNodeStatus', timeout)
    fs_state = fetch_jmx_bean(
        namenode_url, 'Hadoop:service=NameNode,name=FSNamesystemState', timeout)
    return {
        'namenode_state': status['State'],
        'filesystem_state': fs_state['FSState'],
        'live_datanodes': fs_state['NumLiveDataNodes'],
        'dead_datanodes': fs_state['NumDeadDataNodes'],
    }


if __name__ == '__main__':
    health = check_cluster_health()
    print(f"NameNode Health Status: {health['namenode_state']} ({health['filesystem_state']})")
    print(f"DataNode Health Status: {health['live_datanodes']} live, "
          f"{health['dead_datanodes']} dead")
Q5-Develop a Python program that lists all the files and directories in a specific HDFS path.

# Import the required libraries
from hdfs import InsecureClient

def list_hdfs_path(hdfs_url, directory_path, client=None):
    """Print, one per line, and return the names of the entries in an HDFS directory.

    Only direct children are listed (files and subdirectories alike).

    Args:
        hdfs_url: WebHDFS URL of the NameNode (used only when ``client`` is None).
        directory_path: HDFS directory to list.
        client: Optional existing HDFS client to reuse. A new ``InsecureClient``
            is created if omitted.

    Returns:
        The list of entry names, so callers can use the listing and not only
        see it on stdout.
    """
    if client is None:
        client = InsecureClient(hdfs_url)

    entries = client.list(directory_path)
    for entry in entries:
        print(entry)
    return entries


if __name__ == '__main__':
    # Usage example
    hdfs_url = 'http://localhost:9870'
    directory_path = '/user/data'
    list_hdfs_path(hdfs_url, directory_path)
Q6-Implement a Python program that analyzes the storage utilization of DataNodes in a Hadoop cluster and identifies the nodes with the highest and lowest storage capacities.

# Import the required libraries
import requests
import json

def find_capacity_extremes(live_nodes):
    """Find the DataNodes with the highest and lowest storage capacity.

    Args:
        live_nodes: Mapping of DataNode name -> node info dict containing a
            ``capacity`` entry, as decoded from the NameNode's ``LiveNodes``.

    Returns:
        ``((largest_node, capacity), (smallest_node, capacity))``. With a
        single node, both tuples refer to it.

    Raises:
        ValueError: If ``live_nodes`` is empty.
    """
    if not live_nodes:
        raise ValueError('No live DataNodes reported')
    capacities = [(node, info['capacity']) for node, info in live_nodes.items()]
    largest = max(capacities, key=lambda item: item[1])
    smallest = min(capacities, key=lambda item: item[1])
    return largest, smallest


def fetch_live_datanodes(namenode_url='http://localhost:9870', timeout=10):
    """Return the NameNode's view of live DataNodes as a dict keyed by node name.

    Per-node capacity is only available cluster-wide from the NameNode. A
    single DataNode's ``DataNodeInfo`` bean cannot compare nodes and has no
    ``StorageInfo.storageReport`` attribute.

    Raises:
        requests.HTTPError: On a non-2xx response.
        LookupError: If the ``NameNodeInfo`` bean is missing.
    """
    response = requests.get(
        f'{namenode_url}/jmx',
        params={'qry': 'Hadoop:service=NameNode,name=NameNodeInfo'},
        timeout=timeout,
    )
    response.raise_for_status()
    beans = response.json().get('beans') or []
    if not beans:
        raise LookupError('NameNodeInfo JMX bean not found')
    # LiveNodes is itself a JSON-encoded string inside the JMX payload.
    return json.loads(beans[0]['LiveNodes'])


if __name__ == '__main__':
    live_nodes = fetch_live_datanodes()
    (max_storage_node, max_storage_capacity), (min_storage_node, min_storage_capacity) = \
        find_capacity_extremes(live_nodes)
    print(f'Highest Storage Capacity: {max_storage_capacity} bytes on {max_storage_node}')
    print(f'Lowest Storage Capacity: {min_storage_capacity} bytes on {min_storage_node}')
Q7-Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, monitor its progress, and retrieve the final output.

# Import the required libraries
import requests
import json

import time

# Application states after which the ResourceManager no longer changes the app.
TERMINAL_STATES = frozenset({'FINISHED', 'FAILED', 'KILLED'})


def with_application_id(submission_context, application_id):
    """Return a copy of ``submission_context`` with ``application-id`` set.

    The caller's dict is not mutated, so one template can be reused for
    several submissions.
    """
    context = dict(submission_context)
    context['application-id'] = application_id
    return context


def submit_application(resource_manager_url, submission_context, timeout=10):
    """Reserve an application ID and actually submit the application.

    ``POST .../apps/new-application`` only reserves an ID; the job is submitted
    by a second ``POST .../apps`` carrying the full submission context.
    NOTE(review): the RM accepts REST submissions only from an authenticated
    caller. On a simple-auth cluster a ``user.name`` query parameter may be
    needed; confirm for your cluster.

    Returns:
        The new application ID.

    Raises:
        requests.HTTPError: If either request returns a non-2xx status.
    """
    new_app = requests.post(
        f'{resource_manager_url}/ws/v1/cluster/apps/new-application', timeout=timeout)
    new_app.raise_for_status()
    application_id = new_app.json()['application-id']

    submission = requests.post(
        f'{resource_manager_url}/ws/v1/cluster/apps',
        json=with_application_id(submission_context, application_id),
        timeout=timeout,
    )
    submission.raise_for_status()
    return application_id


def wait_for_application(resource_manager_url, application_id,
                         poll_interval=5.0, timeout=10):
    """Poll the application report, printing progress, until it reaches a terminal state.

    Returns:
        The final ``app`` report dict from the ResourceManager.
    """
    application_url = f'{resource_manager_url}/ws/v1/cluster/apps/{application_id}'
    while True:
        response = requests.get(application_url, timeout=timeout)
        response.raise_for_status()
        app = response.json()['app']
        print(f"Job Progress: {app['progress']}")
        if app['state'] in TERMINAL_STATES:
            return app
        time.sleep(poll_interval)


if __name__ == '__main__':
    resource_manager_url = 'http://localhost:8088'
    submission_context = {
        'application-name': 'example-job',
        'application-type': 'YARN',
        'queue': 'default',
        'resource': {'memory': 1024, 'vCores': 1},
        'am-container-spec': {
            'commands': {
                # Placeholder: replace with the command that launches your
                # job's ApplicationMaster.
                'command': 'echo replace-with-application-master-command',
            },
        },
    }
    application_id = submit_application(resource_manager_url, submission_context)
    app = wait_for_application(resource_manager_url, application_id)
    # YARN does not store a job's output (that lives wherever the job wrote it,
    # e.g. HDFS); it only reports how the application finished.
    print(f"Job Final Status: {app['state']} / {app['finalStatus']}")
    print(f"Job Diagnostics: {app.get('diagnostics', '')}")
    print(f"Job Tracking URL: {app.get('trackingUrl', '')}")
Q8-Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, set resource requirements, and track resource usage during job execution.

# Import the required libraries
import requests
import json

import time

# Application states after which the ResourceManager no longer changes the app.
TERMINAL_STATES = frozenset({'FINISHED', 'FAILED', 'KILLED'})


def build_resource_submission(submission_context, application_id, resource_requirements):
    """Return a submission context carrying the application ID and resource requirements.

    Resource requirements must be part of the submission context's ``resource``
    field. A PUT to ``/apps/{id}`` is not a ResourceManager endpoint for
    changing resources. Inputs are copied, not mutated.
    """
    context = dict(submission_context)
    context['application-id'] = application_id
    context['resource'] = dict(resource_requirements)
    return context


def extract_resource_usage(app):
    """Pick the resource-usage fields out of a YARN application report.

    ``allocatedMB`` / ``allocatedVCores`` are the currently allocated resources.
    ``memorySeconds`` / ``vcoreSeconds`` are cumulative totals. A field the RM
    omits is reported as None.
    """
    return {
        'allocated_mb': app.get('allocatedMB'),
        'allocated_vcores': app.get('allocatedVCores'),
        'memory_seconds': app.get('memorySeconds'),
        'vcore_seconds': app.get('vcoreSeconds'),
    }


def submit_with_resources(resource_manager_url, submission_context,
                          resource_requirements, timeout=10):
    """Reserve an application ID, then submit the app with the given resources.

    NOTE(review): REST submission requires an authenticated caller. On a
    simple-auth cluster a ``user.name`` query parameter may be needed; confirm.

    Returns:
        The new application ID.

    Raises:
        requests.HTTPError: If either request returns a non-2xx status.
    """
    new_app = requests.post(
        f'{resource_manager_url}/ws/v1/cluster/apps/new-application', timeout=timeout)
    new_app.raise_for_status()
    application_id = new_app.json()['application-id']

    submission = requests.post(
        f'{resource_manager_url}/ws/v1/cluster/apps',
        json=build_resource_submission(submission_context, application_id,
                                       resource_requirements),
        timeout=timeout,
    )
    submission.raise_for_status()
    return application_id


def track_resource_usage(resource_manager_url, application_id,
                         poll_interval=5.0, timeout=10):
    """Print resource usage on every poll until the app finishes; return the last sample.

    Usage is read from the application report itself; there is no
    ``/apps/{id}/resource-usage`` endpoint.
    """
    application_url = f'{resource_manager_url}/ws/v1/cluster/apps/{application_id}'
    while True:
        response = requests.get(application_url, timeout=timeout)
        response.raise_for_status()
        app = response.json()['app']
        usage = extract_resource_usage(app)
        print(f"Current Memory Usage: {usage['allocated_mb']} MB "
              f"(cumulative {usage['memory_seconds']} MB-seconds)")
        print(f"Current vCores Usage: {usage['allocated_vcores']} "
              f"(cumulative {usage['vcore_seconds']} vcore-seconds)")
        if app['state'] in TERMINAL_STATES:
            return usage
        time.sleep(poll_interval)


if __name__ == '__main__':
    resource_manager_url = 'http://localhost:8088'
    resource_requirements = {
        'memory': 2048,
        'vCores': 2
    }
    submission_context = {
        'application-name': 'example-job',
        'application-type': 'YARN',
        'queue': 'default',
        'am-container-spec': {
            'commands': {
                # Placeholder: replace with the command that launches your
                # job's ApplicationMaster.
                'command': 'echo replace-with-application-master-command',
            },
        },
    }
    application_id = submit_with_resources(
        resource_manager_url, submission_context, resource_requirements)
    track_resource_usage(resource_manager_url, application_id)

Q9-Write a Python program that compares the performance of a MapReduce job with different input split sizes, showcasing the impact on overall job execution time.

# Import the required libraries
from mrjob.job import MRJob
from mrjob.conf import combine_dicts
from mrjob.step import MRStep
import time

class PerformanceComparison(MRJob):
    """Word-count job whose input split size is controlled by ``--split-size`` (MB).

    Run it several times with different ``--split-size`` values, or call
    ``compare_split_sizes``, and compare the wall-clock times.
    """

    def configure_args(self):
        super(PerformanceComparison, self).configure_args()
        self.add_passthru_arg('--split-size', type=int, help='Input split size in MB')

    def jobconf(self):
        """Add the requested split size to the Hadoop job configuration.

        Previously ``--split-size`` was parsed but never applied, so every run
        used the cluster default and there was nothing to compare.
        """
        conf = super(PerformanceComparison, self).jobconf()
        if not self.options.split_size:
            return conf
        split_bytes = str(self.options.split_size * 1024 * 1024)
        # NOTE(review): min == max pins the split size for new-API input
        # formats. Hadoop Streaming's default old-API TextInputFormat honours
        # the minimum, so splits smaller than a block may need
        # mapreduce.job.maps instead; verify on your cluster.
        return combine_dicts(conf, {
            'mapreduce.input.fileinputformat.split.minsize': split_bytes,
            'mapreduce.input.fileinputformat.split.maxsize': split_bytes,
        })

    def steps(self):
        return [
            MRStep(mapper=self.mapper,
                   combiner=self.combiner,
                   reducer=self.reducer)
        ]

    def mapper(self, _, line):
        """Emit (word, 1) per word: real per-record work that scales with input size.

        The previous body only slept and yielded nothing, so the reducer never
        ran and the timing measured nothing useful.
        """
        for word in line.split():
            yield word.lower(), 1

    def combiner(self, word, counts):
        yield word, sum(counts)

    def reducer(self, word, counts):
        yield word, sum(counts)


def compare_split_sizes(input_paths, split_sizes_mb, runner='hadoop'):
    """Run the job once per split size and return ``{split_size_mb: seconds}``.

    Call this from a separate driver script that imports
    ``PerformanceComparison``. This script's ``__main__`` block must stay
    ``PerformanceComparison.run()`` because mrjob runs the script to execute tasks.
    """
    timings = {}
    for split_size in split_sizes_mb:
        job = PerformanceComparison(
            args=['-r', runner, '--split-size', str(split_size), *input_paths])
        with job.make_runner() as job_runner:
            started = time.perf_counter()
            job_runner.run()
            timings[split_size] = time.perf_counter() - started
    return timings


if __name__ == '__main__':
    PerformanceComparison.run()