# 1. Write a Python program to read a Hadoop configuration file and display the core components of Hadoop.


import configparser

def read_hadoop_config(file_path):
    """Read a Hadoop configuration file and print its core properties.

    Hadoop's ``*-site.xml`` files are XML documents of the form
    ``<configuration><property><name>…</name><value>…</value></property>…``,
    which :mod:`configparser` cannot parse (it raises
    ``MissingSectionHeaderError``). Such files are therefore read with
    :mod:`xml.etree.ElementTree`. Any other file falls back to the INI layout
    with a ``[core-site]`` section.

    Args:
        file_path: Path to ``core-site.xml`` or to an INI file with a
            ``[core-site]`` section.

    Returns:
        The ``(name, value)`` pairs that were printed. The list is empty when
        the file is missing, unreadable, unparsable, or has no core properties.
    """
    import xml.etree.ElementTree as ET

    properties = None
    try:
        root = ET.parse(file_path).getroot()
    except (ET.ParseError, OSError):
        root = None  # not XML (or not readable): try the INI layout below

    if root is not None and root.tag == 'configuration':
        properties = []
        for prop in root.findall('property'):
            name = (prop.findtext('name') or '').strip()
            if not name:
                continue  # skip malformed <property> entries without a <name>
            value = (prop.findtext('value') or '').strip()
            properties.append((name, value))
    else:
        config = configparser.ConfigParser()
        try:
            # ConfigParser.read silently skips files it cannot open.
            config.read(file_path)
        except configparser.Error as exc:
            print(f"Could not parse configuration file {file_path}: {exc}")
            return []
        if config.has_section('core-site'):
            properties = config.items('core-site')

    if not properties:
        print("No core components found in the configuration file.")
        return []

    print("Core Components:")
    print("----------------")
    for key, value in properties:
        print(f"{key} = {value}")
    print("----------------")

    # Similar code can be added for other files such as hdfs-site.xml or yarn-site.xml.
    return list(properties)

# Specify the path to your Hadoop configuration file
file_path = '/path/to/hadoop/conf/core-site.xml'  # adjust for your installation

# Print the core components defined in that file.
read_hadoop_config(file_path=file_path)


# 2. Implement a Python function that calculates the total file size in a Hadoop Distributed File System (HDFS) directory.


from pywebhdfs.webhdfs import PyWebHdfsClient

def calculate_total_file_size(hdfs_host, hdfs_port, hdfs_user, hdfs_directory):
    # Create a PyWebHdfsClient object
    hdfs = PyWebHdfsClient(host=hdfs_host, port=hdfs_port, user_name=hdfs_user)

    # Get the file status for the HDFS directory
    directory_status = hdfs.list_status(hdfs_directory)

    total_size = 0

    # Iterate over the file status entries
    for entry in directory_status['FileStatuses']['FileStatus']:
        if entry['type'] == 'FILE':
            total_size += entry['length']

    return total_size

# Specify the HDFS host, port, user, and directory path
hdfs_host, hdfs_port = 'localhost', 50070
hdfs_user = 'hadoop'
hdfs_directory = '/user/hadoop/data'

# Sum the sizes of the files in hdfs_directory and report the result in bytes.
total_size = calculate_total_file_size(hdfs_host, hdfs_port, hdfs_user, hdfs_directory)
print(f"Total file size in {hdfs_directory}: {total_size} bytes")


# 3. Create a Python program that extracts and displays the top N most frequent words from a large text file using the MapReduce approach.

from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")


class TopNWords(MRJob):
    """Count word frequencies and emit the N most frequent words.

    Step 1 maps each line to ``(word, 1)`` pairs, pre-aggregates them in a
    combiner, and totals them in the reducer. Every total is emitted under
    the single key ``None``, so step 2's reducer sees all ``(count, word)``
    pairs and can rank them globally.
    """

    def configure_args(self):
        """Register ``--topN`` (default 10), passed through to the tasks."""
        super(TopNWords, self).configure_args()
        self.add_passthru_arg('--topN', type=int, default=10, help='Number of top words to display')

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_top_words)
        ]

    def mapper_get_words(self, _, line):
        """Yield ``(word, 1)`` for every lower-cased word in *line*."""
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def combiner_count_words(self, word, counts):
        yield word, sum(counts)

    def reducer_count_words(self, word, counts):
        # Funnel every total to one key so the next step can rank them all.
        yield None, (sum(counts), word)

    def reducer_find_top_words(self, _, word_count_pairs):
        """Yield ``('Top k', (word, count))`` for the N most frequent words.

        ``heapq.nlargest`` orders results like ``sorted(..., reverse=True)``:
        highest count first, with ties broken by word in descending order. It
        keeps only N pairs in memory. When there are fewer than N distinct
        words, it yields fewer results instead of raising ``IndexError``.
        """
        import heapq

        top_pairs = heapq.nlargest(self.options.topN, word_count_pairs)
        for rank, (count, word) in enumerate(top_pairs, start=1):
            yield f'Top {rank}', (word, count)

if __name__ == "__main__":
    # Executed as a script: mrjob parses sys.argv and runs the job.
    TopNWords.run()


# 4. Write a Python script that checks the health status of the NameNode and DataNodes in a Hadoop cluster using Hadoop's REST API.


import requests
import json

def check_hadoop_health(namenode_host, namenode_port):
    # Check NameNode health status
    
    namenode_url = f"http://{namenode_host}:{namenode_port}/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus"
    try:
        response = requests.get(namenode_url)
        response.raise_for_status()
        namenode_status = json.loads(response.text)['beans'][0]['State']
        print("NameNode Health Status:", namenode_status)
    except requests.exceptions.RequestException as e:
        print("Error occurred while checking NameNode health status:", str(e))

    # Check DataNode health status
    
    datanode_url = f"http://{namenode_host}:{namenode_port}/jmx?qry=Hadoop:service=DataNode,name=DataNodeInfo"
    try:
        response = requests.get(datanode_url)
        response.raise_for_status()
        datanodes = json.loads(response.text)['beans']
        for datanode in datanodes:
            datanode_status = datanode['State']
            datanode_host = datanode['Host']
            print(f"DataNode Health Status ({datanode_host}):", datanode_status)
    except requests.exceptions.RequestException as e:
        print("Error occurred while checking DataNode health status:", str(e))

# Specify the NameNode host and port
namenode_host, namenode_port = 'localhost', 9870  # port of the NameNode's HTTP /jmx endpoint

# Query the NameNode and print the cluster's health.
check_hadoop_health(namenode_host=namenode_host, namenode_port=namenode_port)


# 5. Develop a Python program that lists all the files and directories in a specific HDFS path.

from pywebhdfs.webhdfs import PyWebHdfsClient

def list_hdfs_path(hdfs_host, hdfs_port, hdfs_user, hdfs_path):
    """Print every entry of an HDFS path, marking sub-directories.

    Entries come from WebHDFS LISTSTATUS via ``PyWebHdfsClient.list_dir``.
    Each line is the entry's ``pathSuffix`` followed by ``(directory)`` for
    directories.
    """
    client = PyWebHdfsClient(host=hdfs_host, port=hdfs_port, user_name=hdfs_user)
    statuses = client.list_dir(hdfs_path)['FileStatuses']['FileStatus']

    for status in statuses:
        entry_name = status['pathSuffix']
        marker = '(directory)' if status['type'] == 'DIRECTORY' else ''
        print(f"{entry_name} {marker}")

# Specify the HDFS host, port, user, and path
hdfs_host, hdfs_port, hdfs_user = 'localhost', 50070, 'hadoop'
hdfs_path = '/user/hadoop/data'

# Print the directory listing of hdfs_path.
list_hdfs_path(hdfs_host=hdfs_host, hdfs_port=hdfs_port, hdfs_user=hdfs_user, hdfs_path=hdfs_path)


# 6. Implement a Python program that analyzes the storage utilization of DataNodes in a Hadoop cluster and identifies the nodes with the highest and lowest storage capacities.

import requests
import json

def analyze_storage_utilization(namenode_host, namenode_port):
    # Get the DataNodes information
    datanode_url = f"http://{namenode_host}:{namenode_port}/jmx?qry=Hadoop:service=NameNode,name=DataNodeInfo"
    try:
        response = requests.get(datanode_url)
        response.raise_for_status()
        datanodes = json.loads(response.text)['beans']
        
        # Sort DataNodes by storage capacity in descending order
        sorted_datanodes = sorted(datanodes, key=lambda d: d['Capacity'], reverse=True)

        # Get the DataNode with the highest storage capacity
        highest_datanode = sorted_datanodes[0]
        highest_datanode_host = highest_datanode['Host']
        highest_datanode_capacity = highest_datanode['Capacity']

        # Get the DataNode with the lowest storage capacity
        lowest_datanode = sorted_datanodes[-1]
        lowest_datanode_host = lowest_datanode['Host']
        lowest_datanode_capacity = lowest_datanode['Capacity']

        print("DataNode Storage Utilization Analysis:")
        print("--------------------------------------")
        print(f"Highest Capacity: {highest_datanode_host} - {highest_datanode_capacity} bytes")
        print(f"Lowest Capacity: {lowest_datanode_host} - {lowest_datanode_capacity} bytes")
    except requests.exceptions.RequestException as e:
        print("Error occurred while analyzing storage utilization:", str(e))

# Specify the NameNode host and port
namenode_host, namenode_port = 'localhost', 9870

# Report the DataNodes with the highest and lowest storage capacity.
analyze_storage_utilization(namenode_host=namenode_host, namenode_port=namenode_port)


# 7. Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, monitor its progress, and retrieve the final output.

import requests
import json
import time

def submit_hadoop_job(resourcemanager_host, resourcemanager_port, job_properties, timeout=30):
    """Submit an application to YARN through the ResourceManager REST API.

    Two calls are made:

    1. ``POST /ws/v1/cluster/apps/new-application`` reserves an application ID.
    2. ``POST /ws/v1/cluster/apps`` submits ``job_properties`` plus that ID.

    Args:
        resourcemanager_host: ResourceManager host name.
        resourcemanager_port: ResourceManager REST port.
        job_properties: Application-submission context without
            ``application-id``. It is not modified; the ID is added to a copy.
        timeout: Seconds to wait for each HTTP response.

    Returns:
        The application ID, or ``None`` if either request failed.
    """
    apps_url = f"http://{resourcemanager_host}:{resourcemanager_port}/ws/v1/cluster/apps"

    # Step 1: reserve an application ID.
    try:
        response = requests.post(f"{apps_url}/new-application", timeout=timeout)
        response.raise_for_status()
        application_id = json.loads(response.text)['application-id']
    except requests.exceptions.RequestException as e:
        print("Error occurred while submitting Hadoop job:", str(e))
        return None
    except (ValueError, KeyError) as e:
        print("Unexpected response while requesting a new application ID:", repr(e))
        return None
    print("Obtained application ID:", application_id)

    # Step 2: submit a copy of the context so the caller's dict is left untouched.
    submission = dict(job_properties)
    submission['application-id'] = application_id
    try:
        response = requests.post(apps_url, json=submission, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print("Error occurred while submitting Hadoop job properties:", str(e))
        return None
    print("Hadoop job submitted successfully.")

    return application_id

def monitor_job_progress(resourcemanager_host, resourcemanager_port, application_id, poll_interval=5, timeout=30):
    """Poll a YARN application until it reaches a terminal state.

    Queries ``GET /ws/v1/cluster/apps/{application_id}`` every
    *poll_interval* seconds and prints the ``state`` field each time.

    Args:
        resourcemanager_host: ResourceManager host name.
        resourcemanager_port: ResourceManager REST port.
        application_id: ID returned by the submission.
        poll_interval: Seconds to sleep between polls.
        timeout: Seconds to wait for each HTTP response, so a stalled
            connection cannot hang the loop forever.

    Returns:
        The terminal state (``FINISHED``, ``FAILED`` or ``KILLED``), or
        ``None`` if a request failed or the response was malformed.
    """
    status_url = f"http://{resourcemanager_host}:{resourcemanager_port}/ws/v1/cluster/apps/{application_id}"
    terminal_states = {"FINISHED", "FAILED", "KILLED"}

    while True:
        try:
            response = requests.get(status_url, timeout=timeout)
            response.raise_for_status()
            status = json.loads(response.text)['app']['state']
        except requests.exceptions.RequestException as e:
            print("Error occurred while monitoring job progress:", str(e))
            return None
        except (ValueError, KeyError) as e:
            print("Unexpected response while monitoring job progress:", repr(e))
            return None

        print("Job status:", status)
        if status in terminal_states:
            return status

        time.sleep(poll_interval)

def retrieve_job_output(resourcemanager_host, resourcemanager_port, application_id, timeout=30):
    """Print and return a YARN application's final status.

    The ResourceManager reports an application's outcome, not the data it
    wrote. The job's actual output is wherever its command wrote it (e.g.
    the output path passed to ``hadoop jar``).

    The previous URL (``.../apps/{id}/state``) returns only
    ``{"state": ...}``, so reading ``['app']['finalStatus']`` from it raised
    an uncaught ``KeyError``. ``finalStatus`` is a field of
    ``GET /ws/v1/cluster/apps/{id}``.

    Args:
        resourcemanager_host: ResourceManager host name.
        resourcemanager_port: ResourceManager REST port.
        application_id: ID returned by the submission.
        timeout: Seconds to wait for the HTTP response.

    Returns:
        The ``finalStatus`` string, or ``None`` on error.
    """
    app_url = f"http://{resourcemanager_host}:{resourcemanager_port}/ws/v1/cluster/apps/{application_id}"
    try:
        response = requests.get(app_url, timeout=timeout)
        response.raise_for_status()
        app = json.loads(response.text)['app']
        final_status = app['finalStatus']
    except requests.exceptions.RequestException as e:
        print("Error occurred while retrieving job output:", str(e))
        return None
    except (ValueError, KeyError) as e:
        print("Unexpected response while retrieving job output:", repr(e))
        return None

    print("Job final status:", final_status)
    if app.get('diagnostics'):
        print("Diagnostics:", app['diagnostics'])
    return final_status

# Specify the ResourceManager host and port
resourcemanager_host = 'localhost'
resourcemanager_port = 8088

# Application-submission context for the YARN REST API. submit_hadoop_job
# adds the application ID before submitting it.
job_properties = {
    "application-name": "MyHadoopJob",
    "application-type": "MAPREDUCE",
    "am-container-spec": {
        "commands": {"command": "hadoop jar /path/to/hadoop-job.jar input output"},
    },
}

# Submit the job; if that succeeded, follow it to completion and report the outcome.
application_id = submit_hadoop_job(resourcemanager_host, resourcemanager_port, job_properties)
if application_id:
    monitor_job_progress(resourcemanager_host, resourcemanager_port, application_id)
    retrieve_job_output(resourcemanager_host, resourcemanager_port, application_id)


# 8. Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, set resource requirements, and track resource usage during job execution.


import requests
import json
import time

def submit_hadoop_job(resourcemanager_host, resourcemanager_port, job_properties, timeout=30):
    """Submit an application to YARN through the ResourceManager REST API.

    Two calls are made:

    1. ``POST /ws/v1/cluster/apps/new-application`` reserves an application ID.
    2. ``POST /ws/v1/cluster/apps`` submits ``job_properties`` plus that ID.

    Args:
        resourcemanager_host: ResourceManager host name.
        resourcemanager_port: ResourceManager REST port.
        job_properties: Application-submission context without
            ``application-id``. It is not modified; the ID is added to a copy.
        timeout: Seconds to wait for each HTTP response.

    Returns:
        The application ID, or ``None`` if either request failed.
    """
    apps_url = f"http://{resourcemanager_host}:{resourcemanager_port}/ws/v1/cluster/apps"

    # Step 1: reserve an application ID.
    try:
        response = requests.post(f"{apps_url}/new-application", timeout=timeout)
        response.raise_for_status()
        application_id = json.loads(response.text)['application-id']
    except requests.exceptions.RequestException as e:
        print("Error occurred while submitting Hadoop job:", str(e))
        return None
    except (ValueError, KeyError) as e:
        print("Unexpected response while requesting a new application ID:", repr(e))
        return None
    print("Obtained application ID:", application_id)

    # Step 2: submit a copy of the context so the caller's dict is left untouched.
    submission = dict(job_properties)
    submission['application-id'] = application_id
    try:
        response = requests.post(apps_url, json=submission, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print("Error occurred while submitting Hadoop job properties:", str(e))
        return None
    print("Hadoop job submitted successfully.")

    return application_id

def set_resource_requirements(resourcemanager_host, resourcemanager_port, application_id, vcores, memory, timeout=30):
    """Report an application's current YARN allocation next to the desired resources.

    The ResourceManager REST API has no endpoint for changing the resources
    of an already-submitted application. Resources are requested in the
    top-level ``resource`` object (``memory`` in MB, ``vCores``) of the
    submission body.

    The previous implementation did two things that could not work:

    * It edited ``app['am-container-spec']``, a field the application report
      does not contain, so it always raised an uncaught ``KeyError``.
    * It tried to ``PUT`` the report back, which the REST API does not support.

    This version reads the application report and prints its allocation so
    any mismatch is visible.

    Args:
        resourcemanager_host: ResourceManager host name.
        resourcemanager_port: ResourceManager REST port.
        application_id: ID returned by the submission.
        vcores: Desired number of virtual cores.
        memory: Desired memory in MB.
        timeout: Seconds to wait for the HTTP response.

    Returns:
        ``(allocatedVCores, allocatedMB)`` from the application report, or
        ``None`` if the request failed or the response was malformed.
    """
    app_url = f"http://{resourcemanager_host}:{resourcemanager_port}/ws/v1/cluster/apps/{application_id}"
    try:
        response = requests.get(app_url, timeout=timeout)
        response.raise_for_status()
        app = json.loads(response.text)['app']
    except requests.exceptions.RequestException as e:
        print("Error occurred while setting resource requirements:", str(e))
        return None
    except (ValueError, KeyError) as e:
        print("Unexpected response while reading application resources:", repr(e))
        return None

    allocated_vcores = app.get('allocatedVCores', 0)
    allocated_mb = app.get('allocatedMB', 0)
    print(f"Requested: {vcores} vCores, {memory} MB; "
          f"currently allocated to the application: {allocated_vcores} vCores, {allocated_mb} MB.")
    print("Note: YARN resources are fixed at submission time (top-level 'resource' "
          "in the submission body) and cannot be changed through the REST API.")
    return allocated_vcores, allocated_mb

def track_resource_usage(resourcemanager_host, resourcemanager_port, application_id):

    # Track the resource usage during job execution
    
    track_url = f"http://{resourcemanager_host}:{resourcemanager_port}/ws/v1/cluster/apps/{application_id}/appattempts"
    try:
        while True:
            response = requests.get(track_url)
            response.raise_for_status()
            app_attempts = json.loads(response.text)['appAttempts']

            if len(app_attempts) == 0:
                print("No app attempts found.")
                break

            latest_attempt_id = app_attempts[-1]['appAttemptId']

            attempt_url = f"http://{resourcemanager_host}:{resourcemanager_port}/ws/v1/cluster/apps/{application_id}/appattempts/{latest_attempt_id}"
            response = requests.get(attempt_url)
            response.raise_for_status()
            attempt = json.loads(response.text)['appAttempt']

            if 'trackingUrl' in attempt:
                tracking_url = attempt['trackingUrl']
                print("Tracking URL:", tracking_url)
            
            if attempt['appAttemptState'] in ['FINISHED', 'FAILED', 'KILLED']:
                break

            time.sleep(5)  # Wait for 5 seconds before checking the status again
    except requests.exceptions.RequestException as e:
        print("Error occurred while tracking resource usage:", str(e))

# Specify the ResourceManager host and port
    
resourcemanager_host = 'localhost'
resourcemanager_port = 8088

# Application-submission context for the YARN REST API. The resource request
# belongs in the top-level "resource" object ("memory" in MB and "vCores").
# "am-container-spec" has no "resource" entry, so values placed there (as
# "memoryMB") were not part of the documented submission format.
job_properties = {
    "application-name": "MyHadoopJob",
    "application-type": "MAPREDUCE",
    "am-container-spec": {
        "commands": {
            "command": "hadoop jar /path/to/hadoop-job.jar input output"
        }
    },
    "resource": {
        "vCores": 2,  # Desired number of vCores
        "memory": 4096  # Desired amount of memory in MB
    }
}

# Submit the Hadoop job and get the application ID
application_id = submit_hadoop_job(resourcemanager_host, resourcemanager_port, job_properties)

if application_id:
    # Pass the requested resources on to set_resource_requirements
    vcores = job_properties['resource']['vCores']
    memory = job_properties['resource']['memory']
    set_resource_requirements(resourcemanager_host, resourcemanager_port, application_id, vcores, memory)

    # Track the resource usage
    track_resource_usage(resourcemanager_host, resourcemanager_port, application_id)


# 9. Write a Python program that compares the performance of a MapReduce job with different input split sizes, showcasing the impact on overall job execution time.


from mrjob.job import MRJob
from mrjob.step import MRStep
import time

class MapReduceJob(MRJob):
    """Single-step placeholder job: the mapper and reducer pass data through.

    The mapper keys every input line with ``None``, and the reducer re-emits
    each value under its key. ``--split-size`` is registered as a passthrough
    option, but neither the mapper nor the reducer reads it.
    """

    def configure_args(self):
        super().configure_args()
        self.add_passthru_arg('--split-size', type=int, default=100, help='Input split size in MB')

    def steps(self):
        only_step = MRStep(mapper=self.mapper, reducer=self.reducer)
        return [only_step]

    def mapper(self, _, line):
        # Placeholder mapper: forward each line under the key None.
        yield None, line

    def reducer(self, key, values):
        # Placeholder reducer: re-emit every value unchanged.
        yield from ((key, value) for value in values)

if __name__ == '__main__':
    # Compare the execution time of the same job run with different maximum
    # input split sizes. `os` was previously used here without being
    # imported, which raised NameError.
    import os

    input_file = '/path/to/input_file.txt'
    split_sizes_mb = [32, 64, 100, 128, 256]  # candidate split sizes (MB)

    file_size = os.path.getsize(input_file)
    timings = []

    for split_size in split_sizes_mb:
        bytes_per_split = split_size * 1024 * 1024
        # Ceiling division; an empty file still gets one split.
        expected_splits = max(1, (file_size + bytes_per_split - 1) // bytes_per_split)

        # One full run per split size. The split size is requested through the
        # standard Hadoop property, not by hand-slicing the file. Hadoop
        # honours it; whether mrjob's local/inline runner does depends on the
        # mrjob version. TODO: confirm for your runner.
        mr_job = MapReduceJob(args=[
            input_file,
            '--split-size', str(split_size),
            '--jobconf', f'mapreduce.input.fileinputformat.split.maxsize={bytes_per_split}',
        ])

        start_time = time.perf_counter()  # monotonic clock for elapsed time
        with mr_job.make_runner() as runner:
            runner.run()
        execution_time = time.perf_counter() - start_time

        timings.append((split_size, execution_time))
        print(f"Split size {split_size} MB (~{expected_splits} splits): "
              f"Execution Time: {execution_time:.2f} seconds")

    fastest_size, fastest_time = min(timings, key=lambda item: item[1])
    print(f"Fastest split size: {fastest_size} MB ({fastest_time:.2f} seconds)")
