## Question - 1

In [None]:
def read_hadoop_config(file_path):
    """Parse a ``key=value`` properties-style config file into a dict.

    Blank lines and lines starting with ``#`` are ignored. Each remaining
    line is split on its FIRST ``=`` only, so values that themselves contain
    ``=`` (JDBC URLs, ``-Dfoo=bar`` JVM options, ...) are kept intact. Keys
    and values are stripped of surrounding whitespace; a repeated key keeps
    the last value seen. This is a line-oriented parser: it does not
    understand XML.

    Args:
        file_path: Path of the config file; decoded as UTF-8.

    Returns:
        dict mapping each key (str) to its value (str).

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if a non-blank, non-comment line contains no ``=``.
    """
    config = {}
    with open(file_path, 'r', encoding='utf-8') as file:
        for line_number, raw_line in enumerate(file, start=1):
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue
            key, sep, value = line.partition('=')
            if not sep:
                # Same exception type the old tuple unpacking raised, but the
                # message now points at the offending line.
                raise ValueError(
                    f'{file_path}:{line_number}: expected key=value, got {line!r}'
                )
            config[key.strip()] = value.strip()
    return config

# Example usage (placeholder path -- point it at a real config file):
# NOTE(review): read_hadoop_config parses `key=value` lines; an XML file such
# as the one named below will not parse that way. Confirm the intended input.
config_file_path = '/path/to/hadoop/config.xml'
hadoop_config = read_hadoop_config(config_file_path)
print('Core components of Hadoop:')
print(f"NameNode: {hadoop_config['dfs.namenode.http-address']}")
print(f"DataNodes: {hadoop_config['dfs.datanode.http-address']}")


## Question - 2

In [None]:
import subprocess

def get_hdfs_directory_size(directory):
    """Return the human-readable total size of an HDFS path.

    Runs ``hadoop fs -du -s -h <directory>`` and returns the first column of
    the first non-blank output line. Hadoop's ``-h`` formatting prints a
    number and its unit as separate tokens (``1.5 K``) and a bare number for
    small sizes (``512``). Any unit token after the number is therefore kept;
    taking whitespace token 0 alone silently dropped the unit.

    Args:
        directory: HDFS path to measure.

    Returns:
        The size string (e.g. ``'1.5 K'`` or ``'512'``), or None if the
        command exits with a non-zero status or prints nothing.
    """
    try:
        output = subprocess.check_output(['hadoop', 'fs', '-du', '-s', '-h', directory])
    except subprocess.CalledProcessError:
        return None

    first_line = next((line for line in output.decode().splitlines() if line.strip()), None)
    if first_line is None:
        return None

    tokens = first_line.split()
    size = tokens[0]
    # The line is "<size> [<disk space consumed>] <path>", and with -h each
    # size may be followed by a one-token unit. The unit is attached only when
    # more tokens follow, so a bare "<size> <path>" line is never misread.
    if len(tokens) > 2 and tokens[1].isalpha():
        size = f'{size} {tokens[1]}'
    return size

# Example usage: report the size of one HDFS directory, or say it failed.
hdfs_directory = '/user/hadoop/data'
total_size = get_hdfs_directory_size(hdfs_directory)
if not total_size:
    print('Failed to retrieve the size of', hdfs_directory)
else:
    print(f'Total size of {hdfs_directory} is {total_size}')


## Question - 3

In [None]:
from multiprocessing import Pool
from collections import Counter

def mapper(line):
    """Map step: count the whitespace-separated words of a single line.

    Returns a ``Counter`` mapping each word in *line* to its occurrences.
    """
    # str.split() with no separator already ignores leading/trailing whitespace.
    tokens = line.split()
    return Counter(tokens)

def reducer(counters):
    """Reduce step: merge many word-count Counters into one.

    Produces the same result as ``sum(counters, Counter())``, including the
    rule that a key whose running total drops to zero or below is discarded,
    and the same key order. The difference is that it accumulates in place.
    ``sum`` builds a brand-new Counter for every addition and copies the
    whole running total each time, which costs O(#counters x vocabulary)
    when merging one Counter per line of a large file.

    Args:
        counters: Iterable of ``Counter`` objects (key -> int count).

    Returns:
        A new ``Counter`` with the merged counts; empty input yields an
        empty ``Counter``.
    """
    total = Counter()
    for counter in counters:
        for key, count in counter.items():
            # Counter returns 0 for a missing key without inserting it.
            merged = total[key] + count
            if merged > 0:
                total[key] = merged
            else:
                # Mirrors Counter.__add__, which drops non-positive results.
                total.pop(key, None)
    return total

def top_n_frequent_words(filename, n, chunksize=1024):
    """Return the *n* most frequent whitespace-separated words in a text file.

    Each line is counted by ``mapper`` in a worker process, and the per-line
    counts are merged by ``reducer``. Lines are streamed from the open file
    through ``Pool.imap`` rather than loaded up front with ``readlines()``,
    and results are reduced as they arrive instead of being collected into
    one big list first. ``imap`` yields results in input order, just like
    ``map``.

    Args:
        filename: Path of the text file; decoded as UTF-8.
        n: Number of words to return; ``None`` returns every word.
        chunksize: Lines sent to a worker per task. Larger values reduce
            inter-process overhead.

    Returns:
        List of ``(word, count)`` pairs, most frequent first, as returned by
        ``Counter.most_common``.
    """
    with open(filename, 'r', encoding='utf-8') as file, Pool() as pool:
        # The reduction must finish inside the with-block: leaving it closes
        # the file and terminates the pool that is still producing results.
        reduced_result = reducer(pool.imap(mapper, file, chunksize=chunksize))
    return reduced_result.most_common(n)

# Example usage: print the ten most frequent words as "word: count" lines.
filename = 'large_text_file.txt'
n = 10

top_words = top_n_frequent_words(filename, n)
for word, count in top_words:
    print(word, count, sep=': ')


## Question - 4

In [None]:
import requests

def check_hadoop_cluster_status(timeout=10):
    """Query NameNode and DataNode JMX endpoints and print a short status.

    Prints the NameNode ``State`` and the number of beans returned by the
    DataNode query. The ``<...-host>:<port>`` placeholders in the URLs must
    be filled in before use.

    NOTE(review): ``State`` is read from the ``NameNodeInfo`` bean. As far as
    I know, Hadoop exposes it on ``NameNodeStatus`` instead; if so, this now
    reports an unexpected response rather than raising ``KeyError``. The
    DataNode query also targets a single host, so "DataNode count" is
    presumably that host's bean count, not the number of DataNodes in the
    cluster. The NameNode's ``LiveNodes`` attribute lists every live
    DataNode. Confirm both points against the cluster's /jmx output.

    Args:
        timeout: Seconds to wait for each HTTP request. Without a timeout,
            ``requests`` can block indefinitely on an unresponsive host.

    Returns:
        True if both queries succeeded and had the expected layout,
        otherwise False (the reason is printed).
    """
    namenode_url = 'http://<namenode-host>:<port>/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo'
    datanode_url = 'http://<datanode-host>:<port>/jmx?qry=Hadoop:service=DataNode,name=DataNodeInfo'

    try:
        namenode_response = requests.get(namenode_url, timeout=timeout)
        # An HTTP error page is not JMX JSON; fail with a clear message.
        namenode_response.raise_for_status()
        namenode_status = namenode_response.json()['beans'][0]['State']
        print('NameNode status:', namenode_status)

        datanode_response = requests.get(datanode_url, timeout=timeout)
        datanode_response.raise_for_status()
        datanode_count = len(datanode_response.json()['beans'])
        print('DataNode count:', datanode_count)
    except requests.exceptions.RequestException as e:
        print('Failed to connect to Hadoop cluster:', str(e))
        return False
    except (ValueError, KeyError, IndexError) as e:
        # The body was not JSON, or the JSON lacked the expected bean layout.
        print('Unexpected response from Hadoop cluster:', repr(e))
        return False
    return True

# Example usage: prints the NameNode state and DataNode count (or the
# connection error); the True/False result is discarded here.
check_hadoop_cluster_status()


## Question - 5

In [None]:
import subprocess

def list_hdfs_path(path):
    """List the entries of an HDFS path via ``hadoop fs -ls``.

    ``hadoop fs -ls`` prints a ``Found N items`` header only for a non-empty
    directory; for a single file it prints just that file's line. The header
    is therefore detected explicitly instead of always dropping the first
    line, which used to turn a file listing into ``[]``. Each entry line has
    eight whitespace-separated columns (permissions, replication, owner,
    group, size, date, time, path). Only the first seven are split off, so
    paths containing spaces stay intact.

    Args:
        path: HDFS file or directory to list.

    Returns:
        List of entry paths (empty for an empty directory), or None if the
        command exits with a non-zero status.
    """
    try:
        output = subprocess.check_output(['hadoop', 'fs', '-ls', path])
    except subprocess.CalledProcessError:
        return None

    files = []
    for line in output.decode().splitlines():
        if not line.strip() or line.startswith('Found '):
            continue
        # maxsplit=7: everything after the time column is the path.
        files.append(line.split(None, 7)[-1])
    return files

# Example usage:
hdfs_path = '/user/hadoop/data'
result = list_hdfs_path(hdfs_path)
# Compare against None: an empty directory yields [], which is a successful
# (empty) listing, not a failure.
if result is not None:
    print('Files and directories in', hdfs_path, ':')
    for file in result:
        print(file)
else:
    print('Failed to list the path', hdfs_path)


## Question - 6

In [None]:
import requests

def analyze_datanode_storage_utilization(timeout=10):
    """Print the storages with the highest and lowest DFS utilization.

    Fetches the DataNode ``FSDatasetState`` JMX bean and computes
    ``dfsUsed / capacity`` (a fraction, not a percentage) for each entry of
    its ``StorageInfo``. It then prints the extremes. The
    ``<datanode-host>:<port>`` placeholder must be filled in before use.

    NOTE(review): this assumes ``StorageInfo`` decodes to a list of objects
    with ``storageID``, ``capacity`` and ``dfsUsed`` keys. Check this against
    the DataNode's /jmx output; with any other layout the function now prints
    "Unexpected response" and returns False instead of crashing. Cluster-wide
    per-DataNode capacity figures are also available from the NameNode's
    ``LiveNodes`` attribute.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        True if a report was printed. False if the request failed, the
        response lacked the expected layout, or no entry had a non-zero
        capacity.
    """
    datanode_url = 'http://<datanode-host>:<port>/jmx?qry=Hadoop:service=DataNode,name=FSDatasetState'

    try:
        datanode_response = requests.get(datanode_url, timeout=timeout)
        datanode_response.raise_for_status()
        datanodes = datanode_response.json()['beans'][0]['StorageInfo']
        datanode_usages = [
            (datanode['storageID'], datanode['dfsUsed'] / datanode['capacity'])
            for datanode in datanodes
            # A zero capacity would raise ZeroDivisionError and has no
            # meaningful utilization, so such entries are skipped.
            if datanode['capacity']
        ]
    except requests.exceptions.RequestException as e:
        print('Failed to connect to Hadoop cluster:', str(e))
        return False
    except (ValueError, KeyError, IndexError, TypeError) as e:
        # Non-JSON body, or JSON whose layout differs from the assumed one.
        print('Unexpected response from Hadoop cluster:', repr(e))
        return False

    if not datanode_usages:
        # Indexing [0] / [-1] below would otherwise raise IndexError.
        print('No storage with a non-zero capacity was reported.')
        return False

    # Stable sort, as before: among ties, the first entry is reported as the
    # lowest and the last as the highest.
    datanode_usages.sort(key=lambda usage: usage[1])
    lowest_id, lowest_ratio = datanode_usages[0]
    highest_id, highest_ratio = datanode_usages[-1]

    print('Datanode with highest storage capacity utilization:')
    print('Storage ID:', highest_id)
    print('Utilization:', highest_ratio)

    print('Datanode with lowest storage capacity utilization:')
    print('Storage ID:', lowest_id)
    print('Utilization:', lowest_ratio)

    return True

# Example usage: prints the most and least utilized storage entries (or the
# connection error); the True/False result is discarded here.
analyze_datanode_storage_utilization()


## Question - 7

In [None]:
import requests
import time

def submit_hadoop_job(jar_path, input_path, output_path, poll_interval=5, request_timeout=30):
    """Create a YARN application, submit a jar job to it and poll until it ends.

    Prints the application ID, the progress while the job runs, and the
    outcome. Returns None. The ``<resourcemanager-host>:<port>`` placeholders
    must be filled in before use.

    NOTE(review): ``/new-application`` is part of the ResourceManager REST
    API. The ``/appmaster/submit`` and ``/appmaster/info`` endpoints, and the
    flat ``state``/``progress`` response, do not match the documented API:
    there, submission is ``POST /ws/v1/cluster/apps`` and status is
    ``GET /ws/v1/cluster/apps/{id}`` with fields nested under ``app``.
    Also, in YARN a ``FINISHED`` application can still have
    ``finalStatus: FAILED``. Confirm both against the target cluster.

    Args:
        jar_path: Job jar path, forwarded in the submit payload.
        input_path: Job input path, forwarded in the submit payload.
        output_path: Job output path, forwarded in the submit payload.
        poll_interval: Seconds to sleep before each status check.
        request_timeout: Seconds to wait for each HTTP request. Without it,
            ``requests`` can block forever on an unresponsive host.
    """
    rm_apps_url = 'http://<resourcemanager-host>:<port>/ws/v1/cluster/apps'

    submit_response = requests.post(f'{rm_apps_url}/new-application', timeout=request_timeout)
    if submit_response.status_code != 200:
        print('Failed to submit the Hadoop job.')
        return
    application_id = submit_response.json()['application-id']
    print('Submitted Hadoop job. Application ID:', application_id)

    # Submit job
    job_submit_data = {
        'applicationId': application_id,
        'jarPath': jar_path,
        'inputPath': input_path,
        'outputPath': output_path
    }
    job_submit_response = requests.post(
        f'{rm_apps_url}/{application_id}/appmaster/submit',
        json=job_submit_data,
        timeout=request_timeout,
    )
    if not job_submit_response.ok:
        # Previously ignored, which led to polling a job that never started.
        print('Failed to submit the Hadoop job.')
        return

    # Monitor progress until a terminal state is reported.
    job_info_url = f'{rm_apps_url}/{application_id}/appmaster/info'
    while True:
        time.sleep(poll_interval)
        job_info = requests.get(job_info_url, timeout=request_timeout).json()
        state = job_info['state']
        if state == 'FINISHED':
            print('Job finished successfully.')
            return
        if state == 'FAILED':
            print('Job failed.')
            return
        if state == 'KILLED':
            # YARN applications can also end as KILLED; without this branch
            # the loop never terminated.
            print('Job was killed.')
            return
        print('Job is still running. Progress:', job_info['progress'])

# Example usage (placeholder paths; the ResourceManager host/port inside the
# function must be filled in as well):
jar_path = '/path/to/hadoop-job.jar'
input_path = '/input/path'
output_path = '/output/path'
submit_hadoop_job(jar_path=jar_path, input_path=input_path, output_path=output_path)


## Question - 8

In [None]:
import requests
import time

def submit_hadoop_job_with_resource_constraints(jar_path, input_path, output_path, memory, vcores,
                                                poll_interval=5, request_timeout=30):
    """Submit a jar job with memory/vcore requirements and report its usage.

    Creates a YARN application, submits the job with the requested
    resources, then polls until the job ends. While it runs, the reported
    resource usage is printed. Returns None. The
    ``<resourcemanager-host>:<port>`` placeholders must be filled in before
    use.

    NOTE(review): the ``/appmaster/submit`` and ``/appmaster/info`` endpoints
    and the ``resourcesUsed`` field do not match the documented
    ResourceManager REST API. There, submission is ``POST /ws/v1/cluster/apps``
    and app status (``GET /ws/v1/cluster/apps/{id}``) reports
    ``allocatedMB``/``allocatedVCores`` under ``app``. Confirm against the
    target cluster.

    Args:
        jar_path: Job jar path, forwarded in the submit payload.
        input_path: Job input path, forwarded in the submit payload.
        output_path: Job output path, forwarded in the submit payload.
        memory: Requested memory; the example passes 4096 labelled "4GB",
            so presumably MB.
        vcores: Requested number of virtual cores.
        poll_interval: Seconds to sleep before each status check.
        request_timeout: Seconds to wait for each HTTP request. Without it,
            ``requests`` can block forever on an unresponsive host.
    """
    rm_apps_url = 'http://<resourcemanager-host>:<port>/ws/v1/cluster/apps'

    submit_response = requests.post(f'{rm_apps_url}/new-application', timeout=request_timeout)
    if submit_response.status_code != 200:
        print('Failed to submit the Hadoop job.')
        return
    application_id = submit_response.json()['application-id']
    print('Submitted Hadoop job. Application ID:', application_id)

    # Submit job with resource requirements
    job_submit_data = {
        'applicationId': application_id,
        'jarPath': jar_path,
        'inputPath': input_path,
        'outputPath': output_path,
        'memory': memory,
        'vcores': vcores
    }
    job_submit_response = requests.post(
        f'{rm_apps_url}/{application_id}/appmaster/submit',
        json=job_submit_data,
        timeout=request_timeout,
    )
    if not job_submit_response.ok:
        # Previously ignored, which led to polling a job that never started.
        print('Failed to submit the Hadoop job.')
        return

    # Monitor resource usage until a terminal state is reported.
    job_info_url = f'{rm_apps_url}/{application_id}/appmaster/info'
    while True:
        time.sleep(poll_interval)
        job_info = requests.get(job_info_url, timeout=request_timeout).json()
        state = job_info['state']
        if state == 'FINISHED':
            print('Job finished successfully.')
            return
        if state == 'FAILED':
            print('Job failed.')
            return
        if state == 'KILLED':
            # YARN applications can also end as KILLED; without this branch
            # the loop never terminated.
            print('Job was killed.')
            return
        resources_used = job_info['resourcesUsed']
        print('Resources used:', resources_used['memory'], 'MB', resources_used['vcores'], 'vcores')

# Example usage: request 4096 (labelled 4GB) of memory and 2 vcores.
jar_path = '/path/to/hadoop-job.jar'
input_path = '/input/path'
output_path = '/output/path'
memory = 4096  # 4GB
vcores = 2
submit_hadoop_job_with_resource_constraints(
    jar_path=jar_path,
    input_path=input_path,
    output_path=output_path,
    memory=memory,
    vcores=vcores,
)


## Question - 9

In [None]:
import subprocess
import time

def run_mapreduce_job(input_path, output_path, split_size,
                      streaming_jar='path/to/hadoop-streaming.jar'):
    """Run a Hadoop Streaming job (``cat`` mapper, ``wc`` reducer) and time it.

    ``mapreduce.input.fileinputformat.split.maxsize`` caps the input split
    size, which is the parameter this benchmark varies. The job uses a
    single reduce task.

    Args:
        input_path: HDFS input path.
        output_path: HDFS output directory. Hadoop's FileOutputFormat
            rejects an existing output directory, so use a fresh path for
            every call.
        split_size: Maximum input split size in bytes.
        streaming_jar: Location of the hadoop-streaming jar. The default is
            the previously hard-coded placeholder.

    Returns:
        Elapsed seconds (float, measured with ``time.perf_counter``) for the
        ``hadoop jar`` command, or None if it exited with a non-zero status.
    """
    command = ['hadoop', 'jar', streaming_jar,
               '-D', f'mapreduce.input.fileinputformat.split.maxsize={split_size}',
               '-input', input_path,
               '-output', output_path,
               '-mapper', 'cat',
               '-reducer', 'wc',
               '-numReduceTasks', '1']
    # perf_counter is monotonic and high-resolution; time.time() can jump
    # with wall-clock adjustments and skew a duration measurement.
    start_time = time.perf_counter()
    try:
        # Job results land in HDFS. The client's stdout is not used, so it is
        # discarded rather than buffered in memory as check_output did.
        subprocess.run(command, check=True, stdout=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        return None
    return time.perf_counter() - start_time

# Example usage: time the same job at several maximum split sizes.
input_path = '/path/to/input_file.txt'
output_path = '/output/path'
split_sizes = [100000000, 500000000, 1000000000]  # Split sizes in bytes
for split_size in split_sizes:
    # Hadoop refuses to write into an existing output directory, so each run
    # gets its own path. Reusing output_path made every run after the first fail.
    execution_time = run_mapreduce_job(input_path, f'{output_path}_{split_size}', split_size)
    # Compare against None so that a 0.0-second timing still counts as success.
    if execution_time is not None:
        print('Split size:', split_size, 'bytes')
        print('Execution time:', execution_time, 'seconds')
    else:
        print('Failed to execute the MapReduce job with split size', split_size)
