1. Write a Python program to read a Hadoop configuration file and display the core components of Hadoop.


In [None]:
import configparser

# core-site.xml is an XML document (<configuration><property>...), so it is
# parsed with ElementTree; configparser only understands INI-style files.
import xml.etree.ElementTree as ET


def read_hadoop_properties(config_path):
    """Return the ``name -> value`` properties of a Hadoop ``*-site.xml`` file.

    Args:
        config_path: Path to the XML configuration file.

    Returns:
        A dict mapping each property name to its value. A property whose
        ``<value>`` element is missing or empty maps to an empty string;
        properties without a name are skipped.

    Raises:
        OSError: If the file cannot be opened.
        xml.etree.ElementTree.ParseError: If the file is not well-formed XML.
    """
    root = ET.parse(config_path).getroot()
    properties = {}
    for prop in root.iter('property'):
        name = (prop.findtext('name') or '').strip()
        if name:
            properties[name] = (prop.findtext('value') or '').strip()
    return properties


# Read the Hadoop configuration file
try:
    hadoop_properties = read_hadoop_properties('/path/to/hadoop/conf/core-site.xml')
except (OSError, ET.ParseError) as exc:
    # Report the problem instead of failing later with an unrelated KeyError.
    hadoop_properties = {}
    print(f"Could not read the Hadoop configuration file: {exc}")

# Get the core components from the configuration file
core_components = hadoop_properties.get('fs.defaultFS')

# Display the core components of Hadoop
print("Core Components of Hadoop:")
print(core_components)

2. Implement a Python function that calculates the total file size in a Hadoop Distributed File System (HDFS) directory.

In [None]:
from hdfs import InsecureClient

def calculate_directory_size(hdfs_host, hdfs_port, directory_path):
    """Return the combined size, in bytes, of the files directly inside a directory.

    Sub-directories are not descended into; only entries whose status type
    is ``'FILE'`` contribute to the total.

    Args:
        hdfs_host: Host name used to build the ``http://host:port`` client URL.
        hdfs_port: Port used to build the client URL.
        directory_path: HDFS directory whose files are summed.

    Returns:
        The sum of the ``length`` fields of the directory's file entries
        (0 when the directory contains no files).
    """
    # Create an HDFS client
    client = InsecureClient(f"http://{hdfs_host}:{hdfs_port}")

    # With status=True every entry is a (name, status dict) pair.
    entries = client.list(directory_path, status=True)

    # The status 'type' is a non-empty string ('FILE' or 'DIRECTORY'), so it is
    # always truthy; it has to be compared explicitly, otherwise every entry
    # is filtered out and the total is always 0.
    return sum(status['length'] for _, status in entries if status['type'] == 'FILE')


3. Create a Python program that extracts and displays the top N most frequent words from a large text file using the MapReduce approach.

In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import heapq


class TopNWords(MRJob):
    """Two-step job that emits the N most frequent words of the input.

    The first step counts every lower-cased, whitespace-separated token.
    The second step receives all ``(count, word)`` pairs under a single
    ``None`` key and keeps the N largest of them.
    """

    def configure_args(self):
        """Add the ``--N`` option (default 10) to the standard job arguments."""
        super().configure_args()
        self.add_passthru_arg('--N', type=int, default=10, help='Number of top words to display')

    def steps(self):
        """Return the counting step followed by the top-N selection step."""
        counting_step = MRStep(
            mapper=self.mapper_get_words,
            combiner=self.combiner_count_words,
            reducer=self.reducer_count_words,
        )
        ranking_step = MRStep(reducer=self.reducer_find_topN_words)
        return [counting_step, ranking_step]

    def mapper_get_words(self, _, line):
        """Emit ``(word, 1)`` for each whitespace-separated token of ``line``."""
        yield from ((token.lower(), 1) for token in line.split())

    def combiner_count_words(self, word, counts):
        """Pre-aggregate the partial counts of ``word``."""
        partial_total = sum(counts)
        yield word, partial_total

    def reducer_count_words(self, word, counts):
        """Emit the total for ``word`` under the shared ``None`` key."""
        total = sum(counts)
        # The count comes first so that the pairs order by frequency.
        yield None, (total, word)

    def reducer_find_topN_words(self, _, word_count_pairs):
        """Emit ``(word, count)`` for the N largest ``(count, word)`` pairs."""
        limit = self.options.N
        for count, word in heapq.nlargest(limit, word_count_pairs):
            yield word, count

# Start the job only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    TopNWords.run()


4. Write a Python script that checks the health status of the NameNode and DataNodes in a Hadoop cluster using Hadoop's REST API.

In [None]:
import requests

import json

# Hadoop cluster URL and port
hadoop_url = 'http://your_hadoop_cluster_url'
hadoop_port = '50070'  # NameNode HTTP port (Hadoop 3.x defaults to 9870)

# Seconds to wait for the NameNode before giving up instead of hanging forever
REQUEST_TIMEOUT = 10


def fetch_namenode_bean(bean_name):
    """Return one NameNode JMX bean as a dict.

    Args:
        bean_name: Value of the bean's ``name`` key, e.g. ``'NameNodeStatus'``.

    Returns:
        The first bean in the ``beans`` list of the JMX response.

    Raises:
        requests.RequestException: On connection errors, timeouts or a
            non-success HTTP status.
        LookupError: If the response contains no matching bean.
    """
    url = f'{hadoop_url}:{hadoop_port}/jmx?qry=Hadoop:service=NameNode,name={bean_name}'
    response = requests.get(url, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    beans = response.json().get('beans', [])
    if not beans:
        raise LookupError(f'JMX bean {bean_name!r} was not returned by {url}')
    return beans[0]


# Check NameNode health
namenode_status = fetch_namenode_bean('NameNodeStatus')['State']

print('NameNode Status:', namenode_status)

# Check DataNode health
# The NameNode's JMX endpoint does not serve the DataNodes' own beans; it
# publishes its view of them in the NameNodeInfo bean, whose LiveNodes and
# DeadNodes attributes are JSON-encoded strings keyed by DataNode.
namenode_info = fetch_namenode_bean('NameNodeInfo')
live_datanodes = json.loads(namenode_info.get('LiveNodes') or '{}')
dead_datanodes = json.loads(namenode_info.get('DeadNodes') or '{}')

print('DataNode Status:')
for datanode_name, datanode_details in live_datanodes.items():
    admin_state = datanode_details.get('adminState', 'unknown')
    print(f'{datanode_name}: live ({admin_state})')
for datanode_name in dead_datanodes:
    print(f'{datanode_name}: dead')


5. Develop a Python program that lists all the files and directories in a specific HDFS path.

In [None]:
import pyarrow.hdfs as hdfs

def list_hdfs_path_files_and_directories(hdfs_path):
    """Print every entry that the file system lists under ``hdfs_path``.

    Args:
        hdfs_path: HDFS path whose entries are printed, one per line.
    """
    # Open a connection using the default connection settings
    filesystem = hdfs.connect()

    # Fetch the complete listing first, then print each entry on its own line
    entries = filesystem.ls(hdfs_path)
    for entry in entries:
        print(entry)

# HDFS directory whose contents should be listed (replace the placeholder path)
hdfs_path = '/path/to/hdfs/directory'

# Print the entries found under that path
list_hdfs_path_files_and_directories(hdfs_path)


6. Implement a Python program that analyzes the storage utilization of DataNodes in a Hadoop cluster and identifies the nodes with the highest and lowest storage capacities.

In [None]:
import subprocess

def get_data_node_storage_utilization(report_text=None):
    """Return the "DFS Used%" figure of every DataNode in a dfsadmin report.

    Args:
        report_text: Text of an ``hdfs dfsadmin -report`` run to parse. When
            omitted, the command is executed to obtain it.

    Returns:
        A dict mapping the text that follows ``Name: `` for each DataNode to
        its "DFS Used%" value as a float (``"12.50%"`` becomes ``12.5``).
        DataNode sections without a "DFS Used%" line are left out.

    Raises:
        OSError: If the ``hdfs`` executable cannot be started.
        subprocess.CalledProcessError: If the command exits with an error.
        ValueError: If a "DFS Used%" value is not a number.
    """
    if report_text is None:
        # Execute Hadoop CLI command to retrieve DataNode storage information
        report_text = subprocess.check_output(["hdfs", "dfsadmin", "-report"]).decode()

    storage_utilization = {}

    # Each DataNode section starts with "Name: "; everything before the first
    # one is the cluster-wide summary and is skipped.
    for section in report_text.split("Name: ")[1:]:
        lines = section.splitlines()
        if not lines:
            continue
        name = lines[0].strip()
        for line in lines[1:]:
            label, _, value = line.partition(":")
            if label.strip() == "DFS Used%":
                # The value is a percentage such as "12.50%", not an integer.
                storage_utilization[name] = float(value.strip().rstrip("%"))
                break

    return storage_utilization

# Get storage utilization of DataNodes
storage_utilization = get_data_node_storage_utilization()

if storage_utilization:
    # Identify the nodes with the largest and smallest reported values.
    # The ranking uses whatever figure the function above stores per node.
    highest_capacity_node = max(storage_utilization, key=storage_utilization.get)
    lowest_capacity_node = min(storage_utilization, key=storage_utilization.get)

    # Print the results
    print("Node with the highest storage capacity:", highest_capacity_node)
    print("Node with the lowest storage capacity:", lowest_capacity_node)
else:
    # max()/min() raise ValueError on an empty mapping, so report the
    # absence of DataNodes explicitly instead.
    highest_capacity_node = lowest_capacity_node = None
    print("No DataNode storage information was found.")


7. Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, monitor its progress, and retrieve the final output.

In [None]:
from yarn_api_client import ApplicationMaster, HistoryServer, ResourceManager
from time import sleep

def submit_hadoop_job(jar_path, main_class, input_path, output_path):
    """Submit a Hadoop job through the ResourceManager client.

    NOTE(review): this relies on ``ResourceManager.submit_new_application``
    accepting these four positional arguments - confirm that the installed
    yarn_api_client provides such a method.

    Args:
        jar_path: Location of the job's JAR file.
        main_class: Fully qualified name of the job's main class.
        input_path: Input location handed to the job.
        output_path: Output location handed to the job.

    Returns:
        Whatever the submission call returns; it is printed as the
        application ID.
    """
    # A fresh client with default connection settings performs the submission
    application_id = ResourceManager().submit_new_application(
        jar_path, main_class, input_path, output_path
    )
    print("Job submitted. Application ID:", application_id)

    return application_id

def monitor_job_progress(application_id, poll_interval=10):
    """Poll the ResourceManager until the application reaches a terminal state.

    Args:
        application_id: ID of the YARN application to watch.
        poll_interval: Seconds to wait between two status requests.

    Returns:
        The terminal state that ended the loop: ``'FINISHED'``, ``'FAILED'``
        or ``'KILLED'``.
    """
    # Connect to the YARN ResourceManager API
    resourcemanager = ResourceManager()
    terminal_states = {'FINISHED', 'FAILED', 'KILLED'}

    # Monitor the job progress
    while True:
        # cluster_application() wraps the "Cluster Application" REST call; the
        # parsed JSON body is available as `.data`, with the report under "app".
        app = resourcemanager.cluster_application(application_id).data['app']
        state = app['state']
        print("Job status:", state)

        if state in terminal_states:
            return state

        sleep(poll_interval)

def retrieve_job_output(application_id):
    """Fetch the job's output through the HistoryServer client and print it.

    NOTE(review): this relies on ``HistoryServer.get_job_output`` - confirm
    that the installed yarn_api_client provides such a method.

    Args:
        application_id: ID of the finished application.
    """
    # A fresh client with default connection settings performs the lookup
    job_output = HistoryServer().get_job_output(application_id)
    print("Job output:", job_output)

# Specify the Hadoop job details


8. Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, set resource requirements, and track resource usage during job execution.

In [None]:
from yarn_api_client import ApplicationMaster, ResourceManager
from time import sleep

def submit_hadoop_job(jar_path, main_class, input_path, output_path, num_executors, executor_memory, executor_cores):
    """Submit a Hadoop job together with its resource requirements.

    NOTE(review): this relies on ``ResourceManager.submit_new_application``
    accepting these six positional arguments - confirm that the installed
    yarn_api_client provides such a method.

    Args:
        jar_path: Location of the job's JAR file.
        main_class: Fully qualified name of the job's main class.
        input_path: Input location handed to the job.
        output_path: Output location handed to the job.
        num_executors: Passed as the last argument of the submission call.
        executor_memory: Stored under the ``"memory"`` key of the resource dict.
        executor_cores: Stored under the ``"vCores"`` key of the resource dict.

    Returns:
        Whatever the submission call returns; it is printed as the
        application ID.
    """
    # Resource requirements travel as one mapping next to the job details
    requirements = dict(memory=executor_memory, vCores=executor_cores)

    client = ResourceManager()
    application_id = client.submit_new_application(
        jar_path, main_class, input_path, output_path, requirements, num_executors
    )
    print("Job submitted. Application ID:", application_id)

    return application_id

def monitor_resource_usage(application_id, poll_interval=10):
    """Print the application's resource allocation until it terminates.

    The figures come from the ResourceManager's application report, which is
    addressed by application ID.

    Args:
        application_id: ID of the YARN application to watch.
        poll_interval: Seconds to wait between two requests.
    """
    # Connect to the YARN ResourceManager API
    resourcemanager = ResourceManager()
    terminal_states = {'FINISHED', 'FAILED', 'KILLED'}

    # Monitor resource usage during job execution
    while True:
        # The parsed JSON body is available as `.data`, with the report under "app".
        app = resourcemanager.cluster_application(application_id).data['app']
        resource_usage = {
            'state': app['state'],
            # .get() because not every report carries the allocation fields
            'allocatedMB': app.get('allocatedMB'),
            'allocatedVCores': app.get('allocatedVCores'),
            'runningContainers': app.get('runningContainers'),
        }
        print("Resource usage:", resource_usage)

        if resource_usage['state'] in terminal_states:
            break

        sleep(poll_interval)

# Job details and resource requirements (replace the placeholder values)
jar_path = '/path/to/hadoop-job.jar'
main_class = 'com.example.hadoopjob.Main'
input_path = '/path/to/input'
output_path = '/path/to/output'
num_executors = 5
# NOTE(review): memory is given as the string "2g" - confirm the unit and
# format that the submission call expects.
executor_memory = "2g"
executor_cores = 2

# Submit the job; the returned ID identifies it in the monitoring call below
application_id = submit_hadoop_job(jar_path, main_class, input_path, output_path, num_executors, executor_memory, executor_cores)

# Print resource usage until the job reaches a terminal state
monitor_resource_usage(application_id)


9. Write a Python program that compares the performance of a MapReduce job with different input split sizes, showcasing the impact on overall job execution time.

In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import time

class WordCountJob(MRJob):
    """Word count whose maximum input split size is set with ``--split-size``.

    The split size is handed to Hadoop as the step's ``jobconf`` property
    ``mapreduce.input.fileinputformat.split.maxsize``, converted from
    megabytes to bytes.
    """

    def configure_args(self):
        """Add the ``--split-size`` option (megabytes, default 64)."""
        super(WordCountJob, self).configure_args()
        self.add_passthru_arg('--split-size', type=int, default=64,
                              help='Input split size in megabytes')

    def mapper(self, _, line):
        """Emit ``(word, 1)`` for each whitespace-separated token of ``line``."""
        for word in line.split():
            yield word.lower(), 1

    def reducer(self, word, counts):
        """Emit the total number of occurrences of ``word``."""
        yield word, sum(counts)

    def steps(self):
        """Return the single map/reduce step, configured with the split size."""
        split_size_bytes = self.options.split_size * 1024 * 1024
        return [
            MRStep(
                mapper=self.mapper,
                reducer=self.reducer,
                # Per-step jobconf is the documented way to pass -D properties;
                # overriding job_runner_kwargs() is not a documented hook, so
                # the setting could be dropped without any error.
                jobconf={
                    'mapreduce.input.fileinputformat.split.maxsize': str(split_size_bytes),
                },
            )
        ]

# Specify the input file path
input_file = '/path/to/input.txt'

# Specify the different split sizes to compare
split_sizes = [32, 64, 128]  # in megabytes

# Run the job with different split sizes and measure execution time
# NOTE(review): no `-r` runner option is passed, so mrjob's default runner is
# used - confirm that it honours the split size before comparing the timings.
for split_size in split_sizes:
    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # by system clock adjustments the way time.time() differences can.
    start_time = time.perf_counter()

    # Construct the command-line arguments for the job
    args = ['--split-size', str(split_size), input_file]

    # Run the job
    job = WordCountJob(args=args)
    with job.make_runner() as runner:
        runner.run()

    execution_time = time.perf_counter() - start_time

    # Print the execution time for each split size
    print(f"Split Size: {split_size} MB, Execution Time: {execution_time} seconds")
