1. Write a Python program to read a Hadoop configuration file and display the core components of Hadoop.


In [None]:
import configparser

def read_hadoop_config(config_file_path):
    """Return the section names of an INI-style Hadoop configuration file.

    Each ``[section]`` header is treated as one core component. Hadoop's own
    ``*-site.xml`` files are XML; this reader only handles INI-style files
    that ``configparser`` can parse.

    Args:
        config_file_path: Path to the configuration file.

    Returns:
        List of section names in file order.

    Raises:
        FileNotFoundError: If the file does not exist.
        OSError: If the file exists but cannot be opened.
        configparser.Error: If the file is not valid INI syntax.
    """
    config = configparser.ConfigParser()
    # ConfigParser.read() silently skips files it cannot open, which made a
    # wrong path indistinguishable from a config with no sections. Opening the
    # file ourselves surfaces the real error.
    with open(config_file_path, encoding='utf-8') as handle:
        config.read_file(handle)
    return config.sections()

hadoop_config_file = 'hadoop-config-file.conf'
core_components = read_hadoop_config(hadoop_config_file)

# Heading first, then one section name per line; nothing more when the
# list is empty.
print("Core Components of Hadoop:")
if core_components:
    print(*core_components, sep="\n")


2. Implement a Python function that calculates the total file size in a Hadoop Distributed File System (HDFS) directory.

In [None]:
from hdfs import InsecureClient

def calculate_total_file_size(hdfs_url, hdfs_directory):
    """Return the total size in bytes of all files under an HDFS directory.

    The directory tree is walked iteratively, so deep trees cannot hit
    Python's recursion limit. Every non-directory entry's ``length`` is added
    to the total.

    Args:
        hdfs_url: WebHDFS endpoint of the NameNode (an ``http://`` URL,
            which is what ``InsecureClient`` expects).
        hdfs_directory: HDFS path of the directory to measure.

    Returns:
        Total number of bytes across all files in the tree.
    """
    import posixpath

    client = InsecureClient(hdfs_url)

    total_size = 0
    pending = [hdfs_directory]
    while pending:
        directory = pending.pop()
        # status=True returns (name, FileStatus) pairs in one request, instead
        # of one extra status() round trip per entry (previously two).
        for name, status in client.list(directory, status=True):
            # posixpath.join avoids a doubled "/" when directory ends in "/".
            item_path = posixpath.join(directory, name)
            if status['type'] == 'DIRECTORY':
                pending.append(item_path)
            else:
                total_size += status['length']

    return total_size

# InsecureClient talks to the NameNode's WebHDFS REST endpoint over HTTP, not
# to the hdfs:// RPC port. Use the NameNode web port: 50070 on Hadoop 2.x,
# 9870 on Hadoop 3.x.
hdfs_url = 'http://localhost:50070'
hdfs_directory = '/user/data'

total_size = calculate_total_file_size(hdfs_url, hdfs_directory)
print(f"Total file size in HDFS directory '{hdfs_directory}': {total_size} bytes")


3. Create a Python program that extracts and displays the top N most frequent words from a large text file using the MapReduce approach.

In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import heapq

class TopNWords(MRJob):
    """MapReduce job that outputs the N most frequent words of its input.

    Step 1 counts words with a mapper, combiner and reducer. Step 2 sends
    every ``(count, word)`` pair to a single reducer under the key ``None``.
    That reducer emits the ``--top-n`` most frequent words, highest count
    first.
    """

    def configure_args(self):
        super().configure_args()
        # Without a default, omitting --top-n left it as None and the size
        # comparison in the reducer raised TypeError on Python 3.
        self.add_passthru_arg('--top-n', type=int, default=10,
                              help='Number of top words to display (default: 10)')

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer_init=self.reducer_init,
                   reducer=self.reducer_find_top_n),
        ]

    def mapper_get_words(self, _, line):
        """Emit ``(word, 1)`` for each whitespace-separated token, lowercased."""
        for word in line.strip().split():
            yield word.lower(), 1

    def combiner_count_words(self, word, counts):
        """Pre-sum counts on the map side."""
        yield word, sum(counts)

    def reducer_count_words(self, word, counts):
        """Send ``(total, word)`` under one shared key so step 2 sees all words."""
        yield None, (sum(counts), word)

    def reducer_init(self):
        """Read the requested N once per reducer task."""
        self.top_n = self.options.top_n
        self.heap = []

    def reducer_find_top_n(self, _, count_word_pairs):
        """Emit the N most frequent ``(word, count)`` pairs, highest first.

        ``heapq.nlargest`` keeps at most N items in memory, like the former
        hand-rolled min-heap. It returns them in descending order; draining
        the min-heap yielded the least frequent of the top N first.
        """
        for count, word in heapq.nlargest(self.top_n, count_word_pairs):
            yield word, count

    def reducer_final(self):
        # NOTE: not registered in steps(), so mrjob never calls this method.
        # It is kept only so existing references to it keep working.
        for word, count in self.heap:
            yield word, count

if __name__ == "__main__":
    # Entry point when executed as a script: delegate to mrjob.
    TopNWords.run()


4. Write a Python script that checks the health status of the NameNode and DataNodes in a Hadoop cluster using Hadoop's REST API.

In [None]:
import requests

def check_namenode_health(namenode_url, timeout=10):
    """Report whether the NameNode is in the ``active`` HA state.

    Args:
        namenode_url: Base URL of the NameNode web UI, e.g.
            ``http://namenode:50070``.
        timeout: Seconds to wait for the HTTP response.

    Returns:
        True if the NameNode reports state ``active``, otherwise False.
        Unreachable nodes and malformed responses count as unhealthy.
    """
    # The HA state is exposed by the NameNodeStatus bean. The NameNodeInfo
    # bean queried previously has no 'State' attribute, so the old lookup
    # raised KeyError.
    url = f"{namenode_url}/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus"
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        beans = response.json().get('beans') or []
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers a non-JSON body.
        print(f"NameNode health check failed: {exc}")
        return False

    healthy = bool(beans) and beans[0].get('State') == 'active'
    if healthy:
        print("NameNode is active and healthy.")
    else:
        print("NameNode is not active or healthy.")
    return healthy

def check_datanode_health(datanode_url, timeout=10):
    """Report whether the DataNode has no failed storage volumes.

    Args:
        datanode_url: Base URL of the DataNode web UI, e.g.
            ``http://datanode:50075``.
        timeout: Seconds to wait for the HTTP response.

    Returns:
        True if every matching bean reports ``NumFailedVolumes == 0``,
        otherwise False. Unreachable nodes and malformed responses count as
        unhealthy.
    """
    # The FSDatasetState bean name may or may not carry an ID suffix depending
    # on the Hadoop release (the original hard-coded "-0"), so a wildcard
    # matches either form. The bean has no attribute equal to 'NORMAL'; the
    # failed-volume count is used as the health signal instead.
    url = f"{datanode_url}/jmx?qry=Hadoop:service=DataNode,name=FSDatasetState*"
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        beans = response.json().get('beans') or []
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers a non-JSON body.
        print(f"DataNode health check failed: {exc}")
        return False

    # A missing NumFailedVolumes attribute counts as unhealthy rather than fine.
    healthy = bool(beans) and all(bean.get('NumFailedVolumes') == 0 for bean in beans)
    if healthy:
        print("DataNode is active and healthy.")
    else:
        print("DataNode is not active or healthy.")
    return healthy


# NameNode: JMX endpoint served by the NameNode web UI.
namenode_url = "http://namenode:50070"
check_namenode_health(namenode_url)

# DataNode: JMX endpoint served by the DataNode web UI.
datanode_url = "http://datanode:50075"
check_datanode_health(datanode_url)


5. Develop a Python program that lists all the files and directories in a specific HDFS path.

In [None]:
def list_hdfs_path(hdfs_url, hdfs_path):
    """Print every file and directory directly inside an HDFS path.

    Args:
        hdfs_url: WebHDFS endpoint of the NameNode (an ``http://`` URL).
        hdfs_path: HDFS directory whose immediate children are listed.
    """
    import posixpath

    client = InsecureClient(hdfs_url)
    # With status=True, list() yields (name, FileStatus) tuples, not dicts,
    # so the old item['path'] lookup raised TypeError.
    contents = client.list(hdfs_path, status=True)

    print(f"Listing contents of HDFS path: {hdfs_path}")
    for name, status in contents:
        path = posixpath.join(hdfs_path, name)
        label = "[DIR]" if status['type'] == 'DIRECTORY' else "[FILE]"
        print(f"{label} {path}")

# InsecureClient speaks WebHDFS over HTTP, so point it at the NameNode web
# port (50070 on Hadoop 2.x, 9870 on Hadoop 3.x), not the hdfs:// RPC port.
hdfs_url = 'http://localhost:50070'
hdfs_path = '/user/data'

list_hdfs_path(hdfs_url, hdfs_path)


6. Implement a Python program that analyzes the storage utilization of DataNodes in a Hadoop cluster and identifies the nodes with the highest and lowest storage capacities.


In [None]:
import requests

def _storage_from_jmx(jmx_data):
    """Extract per-DataNode storage figures from a NameNodeInfo JMX payload.

    Args:
        jmx_data: Decoded JSON from the NameNode ``/jmx`` endpoint queried for
            ``Hadoop:service=NameNode,name=NameNodeInfo``.

    Returns:
        List of ``(node_name, capacity, remaining)`` tuples sorted by capacity,
        largest first. The list is empty when no live DataNodes are reported.
    """
    import json

    beans = jmx_data.get('beans') or []
    if not beans:
        return []
    # LiveNodes is itself a JSON-encoded string keyed by node name, so it
    # needs a second decode.
    live_nodes = json.loads(beans[0].get('LiveNodes') or '{}')
    storage = [
        (node_name, info.get('capacity', 0), info.get('remaining', 0))
        for node_name, info in live_nodes.items()
    ]
    storage.sort(key=lambda entry: float(entry[1]), reverse=True)
    return storage


def analyze_storage_utilization(hdfs_url, timeout=10):
    """Print every DataNode's capacity and the nodes with highest/lowest capacity.

    Cluster-wide, per-node storage is reported by the NameNode's
    ``NameNodeInfo`` bean. The previous query went to one DataNode's
    ``DataNodeInfo`` bean, which covers only that node and has no
    ``Capacity``/``Remaining`` attributes.

    Args:
        hdfs_url: Base URL of the NameNode web UI, e.g. ``http://namenode:50070``.
        timeout: Seconds to wait for the HTTP response.

    Returns:
        The ``(node_name, capacity, remaining)`` list sorted by capacity,
        largest first. It is empty when no live DataNodes are reported.

    Raises:
        requests.HTTPError: If the NameNode answers with an error status.
    """
    url = f"{hdfs_url}/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo"
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    storage_utilization = _storage_from_jmx(response.json())

    print("Storage Utilization Analysis:")
    print("-" * 40)
    print("Node\t\tCapacity\tRemaining")
    print("-" * 40)

    for node_name, capacity, remaining in storage_utilization:
        print(f"{node_name}\t{capacity}\t{remaining}")

    if not storage_utilization:
        # Indexing [0] / [-1] below would raise IndexError on an empty list.
        print("\nNo live DataNodes reported.")
        return storage_utilization

    for label, (node_name, capacity, remaining) in (
        ("Highest", storage_utilization[0]),
        ("Lowest", storage_utilization[-1]),
    ):
        print(f"\nNode with {label} Capacity:")
        print(f"Node: {node_name}")
        print(f"Capacity: {capacity}")
        print(f"Remaining: {remaining}")

    return storage_utilization

# Replace 'http://namenode:50070' with the URL of your NameNode web UI
# (port 9870 on Hadoop 3.x). Per-node storage for the whole cluster comes
# from the NameNode; a single DataNode only reports on itself.
hdfs_url = 'http://namenode:50070'

analyze_storage_utilization(hdfs_url)


7. Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, monitor its progress, and retrieve the final output.


In [None]:
import requests
import time

def submit_hadoop_job(resource_manager_url, job_file_path, *, command=None,
                      application_name=None, memory_mb=1024, vcores=1,
                      jar_size=None, jar_timestamp=None, timeout=10):
    """Submit a job JAR to YARN through the ResourceManager REST API.

    Submission takes two calls:
    1. ``POST /ws/v1/cluster/apps/new-application`` reserves an application ID.
    2. ``POST /ws/v1/cluster/apps`` sends a JSON application-submission context.

    The REST API does not accept file uploads, so *job_file_path* must
    already be in HDFS. It is registered as a local resource of the
    ApplicationMaster container.

    Args:
        resource_manager_url: Base URL of the ResourceManager, e.g.
            ``http://resource_manager:8088``.
        job_file_path: Fully-qualified HDFS URI of the job JAR.
        command: Command that launches the ApplicationMaster. The default runs
            ``hadoop jar <jar>``. NOTE(review): a plain ``hadoop jar`` does not
            register as an ApplicationMaster; supply a real AM command for
            production use.
        application_name: Name shown in the YARN UI; defaults to the JAR name.
        memory_mb: Memory requested for the ApplicationMaster container.
        vcores: Virtual cores requested for the ApplicationMaster container.
        jar_size: Length in bytes of the JAR as stored in HDFS.
        jar_timestamp: Modification time (ms) of the JAR as stored in HDFS.
            NodeManagers check the resource against these values when
            localizing it, so supply both for a real submission.
        timeout: Seconds to wait for each HTTP response.

    Returns:
        The application ID string on success, otherwise None.
    """
    import posixpath

    new_app_url = f"{resource_manager_url}/ws/v1/cluster/apps/new-application"
    response = requests.post(new_app_url, timeout=timeout)
    # 'application-id' is a top-level field of this response; it is not
    # nested under an 'application' key.
    application_id = response.json().get('application-id') if response.ok else None
    if not application_id:
        print("Failed to submit Hadoop job.")
        return None
    print(f"Reserved YARN application ID: {application_id}")

    jar_name = posixpath.basename(job_file_path)
    jar_resource = {
        'resource': job_file_path,
        'type': 'FILE',
        'visibility': 'APPLICATION',
    }
    if jar_size is not None:
        jar_resource['size'] = jar_size
    if jar_timestamp is not None:
        jar_resource['timestamp'] = jar_timestamp

    am_command = command or f"hadoop jar {jar_name} 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr"
    submission = {
        'application-id': application_id,
        'application-name': application_name or jar_name,
        'am-container-spec': {
            'local-resources': {'entry': [{'key': jar_name, 'value': jar_resource}]},
            'commands': {'command': am_command},
        },
        'resource': {'memory': memory_mb, 'vCores': vcores},
    }

    submit_url = f"{resource_manager_url}/ws/v1/cluster/apps"
    response = requests.post(submit_url, json=submission, timeout=timeout)
    # An accepted submission returns 202 with an empty body, so there is
    # no JSON to parse here.
    if response.status_code == 202:
        print("Hadoop job submitted successfully.")
        return application_id

    print("Failed to submit Hadoop job.")
    return None

def monitor_job_progress(resource_manager_url, application_id, poll_interval=5, timeout=10):
    """Poll a YARN application until it reaches a terminal state.

    Args:
        resource_manager_url: Base URL of the ResourceManager.
        application_id: ID of the application to watch.
        poll_interval: Seconds to sleep between status requests.
        timeout: Seconds to wait for each HTTP response.

    Returns:
        The terminal state (``'FINISHED'``, ``'FAILED'`` or ``'KILLED'``), or
        None if the status could not be retrieved.
    """
    status_url = f"{resource_manager_url}/ws/v1/cluster/apps/{application_id}"
    terminal_messages = {
        'FINISHED': "Hadoop job finished.",
        'FAILED': "Hadoop job failed.",
        'KILLED': "Hadoop job killed.",
    }

    while True:
        try:
            response = requests.get(status_url, timeout=timeout)
            response.raise_for_status()
            app = response.json().get('app') or {}
        except (requests.RequestException, ValueError) as exc:
            print(f"Failed to retrieve job status: {exc}")
            return None

        state = app.get('state')
        if state is None:
            print("Failed to retrieve job status.")
            return None

        if state in terminal_messages:
            print(terminal_messages[state])
            # FINISHED only means the application exited; finalStatus tells
            # whether it actually succeeded.
            final_status = app.get('finalStatus')
            if final_status:
                print(f"Final status: {final_status}")
            return state

        print(f"Job progress: {app.get('progress', 0)}%")
        time.sleep(poll_interval)

def retrieve_job_output(resource_manager_url, application_id, timeout=10):
    """Print the ApplicationMaster container log page for a YARN application.

    The link comes from the ``amContainerLogs`` field of the ResourceManager
    application report, rather than a guessed path under the web proxy.
    This retrieves logs (typically an HTML page); the job's data output is
    written to its output directory in HDFS.

    Args:
        resource_manager_url: Base URL of the ResourceManager.
        application_id: ID of the application whose logs are fetched.
        timeout: Seconds to wait for each HTTP response.

    Returns:
        The fetched log page text, or None if no log link is available.

    Raises:
        requests.HTTPError: If either request returns an error status.
    """
    app_url = f"{resource_manager_url}/ws/v1/cluster/apps/{application_id}"
    response = requests.get(app_url, timeout=timeout)
    response.raise_for_status()
    log_link = (response.json().get('app') or {}).get('amContainerLogs')
    if not log_link:
        print("No container log link is available for this application.")
        return None

    response = requests.get(log_link, timeout=timeout)
    response.raise_for_status()
    print("Job output:")
    print(response.text)
    return response.text

# Point this at your YARN ResourceManager (default web port 8088).
resource_manager_url = "http://resource_manager:8088"
# Point this at the Hadoop job JAR to submit.
job_file_path = "path/to/your/job.jar"

# Submit first; monitoring and log retrieval only make sense when an
# application ID came back.
application_id = submit_hadoop_job(resource_manager_url, job_file_path)
if application_id:
    monitor_job_progress(resource_manager_url, application_id)
    retrieve_job_output(resource_manager_url, application_id)


9. Write a Python program that compares the performance of a MapReduce job with different input split sizes, showcasing the impact on overall job execution time.

In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import time

class WordCountJob(MRJob):
    """Word-count job whose Hadoop input split size is set by --input-split-size."""

    def configure_args(self):
        super().configure_args()
        self.add_passthru_arg('--input-split-size', type=int, default=64,
                              help='Input split size in MB')

    def jobconf(self):
        """Pass ``--input-split-size`` (MB) to Hadoop as the input split size.

        The option used to be parsed but never applied, so every run used the
        same splits and the comparison measured nothing. Min and max are both
        set to the same byte count.

        NOTE(review): Hadoop jobconf only takes effect on a Hadoop runner.
        Old-API input formats (as used by Hadoop Streaming) honour only the
        minimum; confirm the resulting split count on your cluster.
        """
        conf = super().jobconf()
        split_bytes = str(self.options.input_split_size * 1024 * 1024)
        conf['mapreduce.input.fileinputformat.split.minsize'] = split_bytes
        conf['mapreduce.input.fileinputformat.split.maxsize'] = split_bytes
        return conf

    def mapper(self, _, line):
        """Emit ``(word, 1)`` for each whitespace-separated token, lowercased."""
        for word in line.strip().split():
            yield word.lower(), 1

    def combiner(self, word, counts):
        """Pre-sum counts on the map side."""
        yield word, sum(counts)

    def reducer(self, word, counts):
        """Emit the total count for each word."""
        yield word, sum(counts)

    def steps(self):
        return [
            MRStep(mapper=self.mapper,
                   combiner=self.combiner,
                   reducer=self.reducer)
        ]

if __name__ == '__main__':
    split_sizes = [32, 64, 128, 256]  # Different input split sizes in MB

    # Replace '<input_file>' with the actual path to your input file
    input_file = '<input_file>'
    # Split size only matters when the job runs on Hadoop, e.g. set this to
    # ['-r', 'hadoop']. Left empty so mrjob's default runner is used, as before.
    runner_args = []

    for split_size in split_sizes:
        output_dir = f'output_{split_size}'
        job = WordCountJob(args=[*runner_args, input_file,
                                 f'--output-dir={output_dir}',
                                 f'--input-split-size={split_size}'])

        # perf_counter is monotonic, so wall-clock adjustments cannot skew
        # the measured duration the way time.time() can.
        start_time = time.perf_counter()
        with job.make_runner() as runner:
            runner.run()
        execution_time = time.perf_counter() - start_time

        print(f"Input Split Size: {split_size} MB | Execution Time: {execution_time} seconds")
