1. Write a Python program to read a Hadoop configuration file and display the core components of Hadoop.







In [1]:
import configparser

def read_hadoop_config(config_file):
    """Print the property names defined in a Hadoop configuration file.

    Hadoop site files such as ``core-site.xml`` are XML documents shaped like
    ``<configuration><property><name>..</name><value>..</value></property>``.
    They are not INI files, so the file is parsed as XML first. If it is not
    well-formed XML, the legacy INI layout (a ``[core-site]`` section) is
    tried so existing INI-style inputs keep working.

    Args:
        config_file: Path to the configuration file.

    Returns:
        None. The property names, or a "not found" message, are printed.
    """
    import xml.etree.ElementTree as ET

    names = None  # None means "nothing usable found"
    try:
        root = ET.parse(config_file).getroot()
    except OSError:
        # Missing or unreadable file: reported by the final branch below.
        pass
    except ET.ParseError:
        # Not XML: fall back to the INI layout the function used to expect.
        config = configparser.ConfigParser()
        try:
            config.read(config_file)
        except (configparser.Error, UnicodeDecodeError):
            pass
        else:
            if 'core-site' in config:
                names = list(config['core-site'])
    else:
        found = []
        for prop in root.iter('property'):
            name = (prop.findtext('name') or '').strip()
            if name:
                found.append(name)
        if found:
            names = found

    if names is not None:
        print("Core Components of Hadoop:")
        for key in names:
            print(key)
    else:
        print("No core-site section found in the configuration file.")

# Provide the path to your Hadoop configuration file
# (the value below is a placeholder and must be replaced with a real path)
config_file_path = '/path/to/hadoop-config-file.xml'

# Call the function to read and display the core components
# (runs immediately when this cell/module is executed)
read_hadoop_config(config_file_path)


No core-site section found in the configuration file.


2. Implement a Python function that calculates the total file size in a Hadoop Distributed File System (HDFS) directory.


In [None]:
from hdfs import InsecureClient

def calculate_directory_size(hdfs_url, directory_path, client=None):
    """Sum the sizes of all files below an HDFS directory.

    The directory tree is walked with ``client.list(..., status=True)``,
    which yields ``(name, status)`` pairs. One listing call per directory
    therefore supplies both the entry type and its length.

    Args:
        hdfs_url: WebHDFS base URL, used to build an ``InsecureClient`` when
            no ``client`` is supplied.
        directory_path: HDFS directory whose contents are measured.
        client: Optional ready-made client exposing ``status(path)`` and
            ``list(path, status=True)``. Lets callers reuse a connection or
            pass a differently authenticated client.

    Returns:
        Total size in bytes, or ``None`` when ``directory_path`` is not a
        directory (an error message is printed in that case).
    """
    import posixpath

    if client is None:
        client = InsecureClient(hdfs_url)

    # Check that the starting path really is a directory
    dir_status = client.status(directory_path)
    if dir_status['type'] != 'DIRECTORY':
        print("Error: The specified path is not a directory.")
        return None

    total_size = 0

    # Explicit stack instead of recursion, so very deep trees cannot hit the
    # interpreter's recursion limit.
    pending = [directory_path]
    while pending:
        current = pending.pop()
        for name, status in client.list(current, status=True):
            if status['type'] == 'DIRECTORY':
                # posixpath.join avoids a doubled slash when current is '/'
                pending.append(posixpath.join(current, name))
            else:
                total_size += status['length']

    # Print the total file size
    print("Total file size in directory:", total_size, "bytes")
    return total_size

# Provide the HDFS URL and directory path
# (both values are placeholders; hdfs_url is handed to InsecureClient)
hdfs_url = 'http://localhost:50070'
directory_path = '/path/to/hdfs/directory'

# Call the function to calculate the directory size
# (runs immediately when this cell/module is executed)
calculate_directory_size(hdfs_url, directory_path)


3. Create a Python program that extracts and displays the top N most frequent words from a large text file using the MapReduce approach.


In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

# Matches runs of word characters and apostrophes, so a contraction such as
# "don't" is kept as a single token. Compiled once at module level and reused
# by the mapper for every input line.
WORD_REGEX = re.compile(r"[\w']+")

class TopNWords(MRJob):
    """Two-step mrjob job that emits the most frequent words of its input.

    Step 1 counts every lower-cased word. Step 2 receives all
    ``(count, word)`` pairs under a single key and keeps the ``--top``
    largest.
    """

    def configure_args(self):
        """Register the ``--top`` command-line option (an integer)."""
        super().configure_args()
        self.add_passthru_arg('--top', type=int, help='Number of top words to display')

    def steps(self):
        """Return the two steps: count the words, then select the top ones."""
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_top_words)
        ]

    def mapper_get_words(self, _, line):
        """Yield ``(word, 1)`` for every lower-cased token found in ``line``."""
        for word in WORD_REGEX.findall(line):
            yield word.lower(), 1

    def combiner_count_words(self, word, counts):
        """Pre-aggregate the counts of ``word`` before they reach the reducer."""
        yield word, sum(counts)

    def reducer_count_words(self, word, counts):
        """Emit ``(total, word)`` under the shared key ``None``.

        Using one key sends every pair to the same reducer call in step 2.
        """
        yield None, (sum(counts), word)

    def reducer_find_top_words(self, _, word_count_pairs):
        """Yield ``(word, count)`` for the most frequent words, highest first.

        Without ``--top`` every word is emitted, sorted by descending count.
        With ``--top N`` only the N largest pairs are kept; a negative N is
        treated as 0 rather than silently dropping entries from the tail of
        the sorted list.
        """
        import heapq

        top_n = self.options.top
        if top_n is None:
            top_words = sorted(word_count_pairs, reverse=True)
        else:
            # nlargest gives the same order as sorted(..., reverse=True)[:n]
            # without holding a fully sorted copy of every pair.
            top_words = heapq.nlargest(max(top_n, 0), word_count_pairs)
        for count, word in top_words:
            yield word, count

# Run the job only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    TopNWords.run()

4. Write a Python script that checks the health status of the NameNode and DataNodes in a Hadoop cluster using Hadoop's REST API.


In [None]:
import requests

# Hadoop cluster information
# Base HTTP addresses; the check functions below append "/jmx?qry=..." to them.
# NOTE(review): 50070/50075 look like the Hadoop 2.x default web ports; Hadoop 3.x
# defaults differ (9870/9864) - confirm against your cluster.
namenode_url = 'http://namenode:50070'
datanode_urls = ['http://datanode1:50075', 'http://datanode2:50075']  # Add more datanode URLs if necessary

def check_namenode_health():
    """Print the ``State`` reported by the NameNode's ``NameNodeStatus`` bean.

    Reads the module-level ``namenode_url``. Any failure (connection error,
    timeout, non-200 response, non-JSON body, or a response without the
    expected bean/attribute) is reported with a message instead of raising.
    """
    failure_message = "Failed to retrieve Namenode health status."

    try:
        # A timeout keeps an unreachable NameNode from hanging the script.
        response = requests.get(
            f'{namenode_url}/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus',
            timeout=10,
        )
    except requests.RequestException:
        print(failure_message)
        return

    if response.status_code != 200:
        print(failure_message)
        return

    try:
        # The JMX servlet answers with {"beans": [...]}; the list is empty
        # when no bean matches the query.
        health_status = response.json()['beans'][0]['State']
    except (ValueError, KeyError, IndexError, TypeError):
        print(failure_message)
        return

    print("Namenode health status:", health_status)

def check_datanode_health():
    """Print a health line for every URL in the module-level ``datanode_urls``.

    Each DataNode is queried independently. A node that is unreachable or
    returns an unusable response is reported and skipped, so one dead node
    no longer aborts the check of the remaining ones.
    """
    for datanode_url in datanode_urls:
        failure_message = f"Failed to retrieve Datanode health status ({datanode_url})."

        try:
            # A timeout keeps an unreachable DataNode from hanging the loop.
            response = requests.get(
                f'{datanode_url}/jmx?qry=Hadoop:service=DataNode,name=DataNodeInfo',
                timeout=10,
            )
        except requests.RequestException:
            print(failure_message)
            continue

        if response.status_code != 200:
            print(failure_message)
            continue

        try:
            beans = response.json()['beans']
        except (ValueError, KeyError, TypeError):
            print(failure_message)
            continue
        if not beans:
            # No bean matched the query, so there is nothing to report.
            print(failure_message)
            continue

        # NOTE(review): the DataNodeInfo bean may not expose a
        # 'DatanodeHealth' attribute on every Hadoop version - confirm.
        # A missing attribute is shown as UNKNOWN instead of raising KeyError.
        health_status = beans[0].get('DatanodeHealth', 'UNKNOWN')
        print(f"Datanode health status ({datanode_url}):", health_status)

# Check the health status of the NameNode and DataNodes
# (both calls run immediately when this cell/module is executed and print their results)
check_namenode_health()
check_datanode_health()


5. Develop a Python program that lists all the files and directories in a specific HDFS path.


---



In [None]:
from hdfs import InsecureClient

def list_hdfs_path(hdfs_url, path, client=None):
    """Print and return the full paths of the entries directly under ``path``.

    ``client.list(path, status=True)`` yields ``(name, status)`` pairs, so
    each pair is unpacked and the entry name is joined onto ``path``.

    Args:
        hdfs_url: WebHDFS base URL, used to build an ``InsecureClient`` when
            no ``client`` is supplied.
        path: HDFS directory to list.
        client: Optional ready-made client exposing
            ``list(path, status=True)``.

    Returns:
        List of full HDFS paths, in the order the client returned them.
    """
    import posixpath

    if client is None:
        client = InsecureClient(hdfs_url)

    # List all files and directories in the given path
    entries = client.list(path, status=True)

    # Print the files and directories
    print("Files and Directories in", path)
    listed = []
    for name, _status in entries:
        full_path = posixpath.join(path, name)
        print(full_path)
        listed.append(full_path)
    return listed

# Provide the HDFS URL and path to list
# (both values are placeholders; hdfs_url is handed to InsecureClient)
hdfs_url = 'http://localhost:50070'
path = '/path/to/hdfs/directory'

# Call the function to list the files and directories
# (runs immediately when this cell/module is executed)
list_hdfs_path(hdfs_url, path)


6. Implement a Python program that analyzes the storage utilization of DataNodes in a Hadoop cluster and identifies the nodes with the highest and lowest storage capacities.


In [None]:
import requests

# Hadoop cluster information
# Base HTTP address of the NameNode; analyze_storage_utilization() appends "/jmx?qry=..." to it.
namenode_url = 'http://namenode:50070'

def analyze_storage_utilization():
    """Print the live DataNodes with the highest and lowest storage capacity.

    Per-DataNode figures are read from the NameNode's ``NameNodeInfo`` JMX
    bean. Its ``LiveNodes`` attribute is itself a JSON-encoded string that
    maps each DataNode name to a dict of statistics, including ``capacity``.

    Reads the module-level ``namenode_url``. Failures and an empty cluster
    are reported with a message instead of raising.
    """
    import json

    failure_message = "Failed to retrieve DataNode information."

    try:
        # A timeout keeps an unreachable NameNode from hanging the script.
        response = requests.get(
            f'{namenode_url}/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo',
            timeout=10,
        )
    except requests.RequestException:
        print(failure_message)
        return

    if response.status_code != 200:
        print(failure_message)
        return

    try:
        bean = response.json()['beans'][0]
        # LiveNodes is a JSON document embedded as a string, so it needs a
        # second decoding pass.
        live_nodes = json.loads(bean['LiveNodes'])
    except (ValueError, KeyError, IndexError, TypeError):
        print(failure_message)
        return

    if not isinstance(live_nodes, dict):
        print(failure_message)
        return

    # Map DataNode name -> capacity, skipping entries without a capacity
    storage_capacities = {
        node_name: info['capacity']
        for node_name, info in live_nodes.items()
        if isinstance(info, dict) and 'capacity' in info
    }

    if not storage_capacities:
        # max()/min() raise ValueError on an empty mapping
        print("No live DataNodes reported by the NameNode.")
        return

    # Find the DataNodes with the highest and lowest storage capacity
    highest_capacity_node = max(storage_capacities, key=storage_capacities.get)
    lowest_capacity_node = min(storage_capacities, key=storage_capacities.get)

    # Print the results
    print("DataNode with the highest storage capacity:", highest_capacity_node)
    print("DataNode with the lowest storage capacity:", lowest_capacity_node)

# Analyze the storage utilization of DataNodes
# (runs immediately when this cell/module is executed and prints its findings)
analyze_storage_utilization()


7. Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, monitor its progress, and retrieve the final output.


In [None]:
import requests
import time

# YARN ResourceManager information
# resourcemanager_url is the base address every REST call below is built from.
# application_id stays None until submit_hadoop_job() stores the submitted
# application's id in it (via `global`); monitor_job_progress() reads it.
resourcemanager_url = 'http://resourcemanager:8088'
application_id = None

# Submit a Hadoop job
def submit_hadoop_job():
    """Submit a job through the ResourceManager REST API.

    Submission is a two-step protocol:

    1. ``POST /ws/v1/cluster/apps/new-application`` returns a fresh
       ``application-id``.
    2. ``POST /ws/v1/cluster/apps`` with a submission context naming that id.
       The ResourceManager answers ``202 Accepted`` with an empty body, so
       the id comes from step 1 and not from this response.

    On success the id is stored in the module-level ``application_id``.
    Every failure prints a message and leaves ``application_id`` untouched.
    """
    global application_id

    headers = {'Content-Type': 'application/json'}
    apps_url = f'{resourcemanager_url}/ws/v1/cluster/apps'
    failure_message = "Failed to submit the Hadoop job."

    try:
        # Step 1: ask the ResourceManager for a new application id
        new_app_response = requests.post(f'{apps_url}/new-application', headers=headers, timeout=30)
        if new_app_response.status_code != 200:
            print(failure_message)
            return
        new_application_id = new_app_response.json()['application-id']

        # Step 2: submit the application. Field names follow the REST API's
        # hyphenated schema. Optional sections that were empty or left at
        # their defaults (tags, timeouts, log aggregation, failure validity
        # interval) are omitted so the ResourceManager applies its defaults.
        job_payload = {
            "application-id": new_application_id,
            "application-name": "MyHadoopJob",
            "am-container-spec": {
                "commands": {
                    "command": "hadoop jar myjob.jar input.txt output"
                }
            },
            "unmanaged-AM": False,
            "max-app-attempts": 2,
            # Resources requested for the application master container
            "resource": {
                "memory": 1024,
                "vCores": 1
            },
            "application-type": "MAPREDUCE",
            "keep-containers-across-application-attempts": False,
            "priority": 0,
            "queue": "default"
        }
        response = requests.post(apps_url, json=job_payload, headers=headers, timeout=30)
    except (requests.RequestException, ValueError, KeyError, TypeError):
        # Connection problems, a non-JSON reply, or a reply without an id
        print(failure_message)
        return

    # Check the response status code
    if response.status_code == 202:
        application_id = new_application_id
        print("Hadoop job submitted successfully. Application ID:", application_id)
    else:
        print(failure_message)

# Monitor job progress and retrieve final output
def monitor_job_progress():
    """Poll the application until it finishes, then fetch its job attempts.

    Reads the module-level ``resourcemanager_url`` and ``application_id``.
    The application's ``finalStatus`` is checked every 5 seconds. Polling
    stops on ``SUCCEEDED``, ``FAILED`` or ``KILLED``, or as soon as the
    status cannot be retrieved. After a successful run the MapReduce
    job-attempts resource is requested through the ResourceManager proxy
    and printed.
    """
    headers = {'Content-Type': 'application/json'}

    # Initialised up front so the check after the loop is safe even when the
    # very first request fails.
    status = None

    while True:
        # Send a GET request to retrieve job status
        try:
            response = requests.get(
                f'{resourcemanager_url}/ws/v1/cluster/apps/{application_id}',
                headers=headers,
                timeout=30,
            )
        except requests.RequestException:
            print("Failed to retrieve job status.")
            break

        if response.status_code != 200:
            print("Failed to retrieve job status.")
            break

        try:
            status = response.json()['app']['finalStatus']
        except (ValueError, KeyError, TypeError):
            print("Failed to retrieve job status.")
            break

        if status == 'SUCCEEDED':
            print("Hadoop job completed successfully.")
            break
        elif status == 'FAILED':
            print("Hadoop job failed.")
            break
        elif status == 'KILLED':
            # KILLED is terminal too; without this branch the loop would
            # poll forever.
            print("Hadoop job was killed.")
            break
        else:
            print("Job is still running...")

        # Wait for 5 seconds before checking the job status again
        time.sleep(5)

    # Retrieve the final output
    if status == 'SUCCEEDED':
        # MapReduce job ids share the application's numeric suffix but use a
        # "job_" prefix instead of "application_".
        job_id = application_id.replace('application_', 'job_', 1)

        # Modify the output URL as per your Hadoop configuration
        output_url = f'{resourcemanager_url}/proxy/{application_id}/ws/v1/mapreduce/jobs/{job_id}/jobattempts'
        try:
            response = requests.get(output_url, timeout=30)
        except requests.RequestException:
            print("Failed to retrieve the final output.")
            return

        # Check the response status code
        if response.status_code == 200:
            try:
                data = response.json()
            except ValueError:
                print("Failed to retrieve the final output.")
                return
            print("Final output:")
            print(data)
        else:
            print("Failed to retrieve the final output.")

# Submit the Hadoop job
# (sets the module-level application_id when the submission is accepted)
submit_hadoop_job()

# Monitor the job progress and retrieve the final output
# (skipped when submission failed and application_id is still None)
if application_id:
    monitor_job_progress()


8. Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, set resource requirements, and track resource usage during job execution.


In [None]:
import requests
import time

# YARN ResourceManager information
# resourcemanager_url is the base address every REST call below is built from.
# application_id stays None until submit_hadoop_job() stores the submitted
# application's id in it (via `global`); track_resource_usage() reads it.
resourcemanager_url = 'http://resourcemanager:8088'
application_id = None

# Submit a Hadoop job with resource requirements
def submit_hadoop_job():
    """Submit a job with explicit resource requirements via the RM REST API.

    Submission is a two-step protocol:

    1. ``POST /ws/v1/cluster/apps/new-application`` returns a fresh
       ``application-id``.
    2. ``POST /ws/v1/cluster/apps`` with a submission context naming that id.
       The ResourceManager answers ``202 Accepted`` with an empty body, so
       the id comes from step 1 and not from this response.

    On success the id is stored in the module-level ``application_id``.
    Every failure prints a message and leaves ``application_id`` untouched.
    """
    global application_id

    headers = {'Content-Type': 'application/json'}
    apps_url = f'{resourcemanager_url}/ws/v1/cluster/apps'
    failure_message = "Failed to submit the Hadoop job."

    try:
        # Step 1: ask the ResourceManager for a new application id
        new_app_response = requests.post(f'{apps_url}/new-application', headers=headers, timeout=30)
        if new_app_response.status_code != 200:
            print(failure_message)
            return
        new_application_id = new_app_response.json()['application-id']

        # Step 2: submit the application. Field names follow the REST API's
        # hyphenated schema. Optional sections that were empty or left at
        # their defaults (tags, timeouts, log aggregation, failure validity
        # interval) are omitted so the ResourceManager applies its defaults.
        job_payload = {
            "application-id": new_application_id,
            "application-name": "MyHadoopJob",
            "am-container-spec": {
                "commands": {
                    "command": "hadoop jar myjob.jar input.txt output"
                }
            },
            "unmanaged-AM": False,
            "max-app-attempts": 2,
            # Resource requirements. The submission schema has one
            # "resource" object (for the application master container), so
            # the former separate "amResource" entry is not sent.
            "resource": {
                "memory": 2048,
                "vCores": 2
            },
            "application-type": "MAPREDUCE",
            "keep-containers-across-application-attempts": False,
            "priority": 0,
            "queue": "default"
        }
        response = requests.post(apps_url, json=job_payload, headers=headers, timeout=30)
    except (requests.RequestException, ValueError, KeyError, TypeError):
        # Connection problems, a non-JSON reply, or a reply without an id
        print(failure_message)
        return

    # Check the response status code
    if response.status_code == 202:
        application_id = new_application_id
        print("Hadoop job submitted successfully. Application ID:", application_id)
    else:
        print(failure_message)

# Track resource usage during job execution
def track_resource_usage():
    """Print the application's resource usage every 5 seconds until it ends.

    Reads the module-level ``resourcemanager_url`` and ``application_id``.
    Usage figures come from the application resource
    (``/ws/v1/cluster/apps/{id}``), which reports the memory, vcores and
    containers currently allocated plus the aggregate memory/vcore seconds.
    Polling stops once the application reaches a terminal state, or as soon
    as the usage cannot be retrieved.
    """
    headers = {'Content-Type': 'application/json'}
    failure_message = "Failed to retrieve resource usage."

    # Application states after which no further usage will be reported
    terminal_states = ('FINISHED', 'FAILED', 'KILLED')

    while True:
        # Send a GET request to retrieve resource usage
        try:
            response = requests.get(
                f'{resourcemanager_url}/ws/v1/cluster/apps/{application_id}',
                headers=headers,
                timeout=30,
            )
        except requests.RequestException:
            print(failure_message)
            break

        if response.status_code != 200:
            print(failure_message)
            break

        try:
            app = response.json()['app']
        except (ValueError, KeyError, TypeError):
            print(failure_message)
            break

        # .get() tolerates ResourceManager versions lacking some fields
        resource_usage = {
            'allocatedMB': app.get('allocatedMB'),
            'allocatedVCores': app.get('allocatedVCores'),
            'runningContainers': app.get('runningContainers'),
            'memorySeconds': app.get('memorySeconds'),
            'vcoreSeconds': app.get('vcoreSeconds'),
        }
        print("Resource usage:", resource_usage)

        # Stop once the job is over; otherwise the loop would never end.
        if app.get('state') in terminal_states:
            break

        # Wait for 5 seconds before checking resource usage again
        time.sleep(5)

# Submit the Hadoop job
# (sets the module-level application_id when the submission is accepted)
submit_hadoop_job()

# Track resource usage during job execution
# (skipped when submission failed and application_id is still None)
if application_id:
    track_resource_usage()


9. Write a Python program that compares the performance of a MapReduce job with different input split sizes, showcasing the impact on overall job execution time.

In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import time

# Define the MapReduce job
class MapReduceJob(MRJob):
    """Count how often each word appears as the first token of a line.

    The ``--split-size`` option is forwarded to Hadoop as the minimum and
    maximum input split size of the step, so that different values actually
    change how the input is divided among mappers.
    """

    def configure_args(self):
        """Register the ``--split-size`` command-line option (an integer)."""
        super().configure_args()
        self.add_passthru_arg('--split-size', type=int, help='Input split size in bytes')

    def mapper(self, _, line):
        """Yield ``(first_token, 1)``; blank lines produce nothing."""
        fields = line.split()
        # A blank or whitespace-only line has no first token to index.
        if fields:
            yield fields[0], 1

    def reducer(self, key, values):
        """Sum the occurrences of ``key``."""
        yield key, sum(values)

    def steps(self):
        """Return the single map/reduce step, configured with the split size."""
        step_kwargs = {'mapper': self.mapper, 'reducer': self.reducer}

        split_size = self.options.split_size
        if split_size:
            # Pinning min and max to the same value forces the split size.
            # NOTE(review): these properties are only honoured when the job
            # runs on Hadoop; local/inline runners may ignore them - confirm.
            step_kwargs['jobconf'] = {
                'mapreduce.input.fileinputformat.split.minsize': str(split_size),
                'mapreduce.input.fileinputformat.split.maxsize': str(split_size),
            }

        return [
            MRStep(**step_kwargs)
        ]

# Test the MapReduce job with different input split sizes
def compare_job_performance():
    """Run ``MapReduceJob`` once per split size and print output and timing.

    The sample lines are fed to each job through a sandboxed stdin. Without
    this the job would block reading the real stdin and the sample data
    would never be used. Only ``runner.run()`` is timed, so printing the
    results does not distort the measurement.
    """
    from io import BytesIO

    input_data = [
        "apple orange",
        "banana apple",
        "orange apple",
        "banana orange",
        "apple banana",
        "orange banana"
    ]
    # mrjob reads raw bytes, one record per line
    stdin_bytes = ('\n'.join(input_data) + '\n').encode('utf-8')

    # Varying input split sizes to test
    split_sizes = [100, 200, 300]

    for split_size in split_sizes:
        # Create an instance of the MapReduce job; '-' means "read stdin"
        mr_job = MapReduceJob(args=['--split-size', str(split_size), '-'])
        # A fresh stream per job, since each run consumes its stdin
        mr_job.sandbox(stdin=BytesIO(stdin_bytes))

        # Run the MapReduce job
        with mr_job.make_runner() as runner:
            # perf_counter is monotonic, unlike wall-clock time.time()
            start_time = time.perf_counter()
            runner.run()
            execution_time = time.perf_counter() - start_time

            # Collect and print the output; parse_output decodes the raw
            # byte stream into (key, value) pairs
            print("Output for split size:", split_size)
            for key, value in mr_job.parse_output(runner.cat_output()):
                print(key, value)

        # Print the execution time
        print("Execution time for split size", split_size, ":", execution_time, "seconds")
        print()

# Compare the performance of the MapReduce job
# (runs immediately when this cell/module is executed; one job run per split size)
compare_job_performance()
