In [None]:
Question 1. Write a Python program to read a Hadoop configuration file and display the core components of Hadoop.
Solution:- # Python program to read a Hadoop configuration file
    
    Code
    
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
    Read and get your Hadoop configuration as Python objects.
    For example: core-site.xml, yarn-site.xml, hdfs-site.xml
"""

from .core import get_hadoop_conf

__version__ = "1.0"

# Python program to  display the core components of Hadoop

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
    Implementation

    Usage:

    >>> from hadoopconf import get_hadoop_conf
    >>> get_hadoop_conf()
"""


#  String list of what we can import from the lib
import os
import re
import xml.etree.ElementTree as Et

__all__ = ['get_hadoop_conf']


class NoHadoopConfDir(Exception):
    """Signals that no usable Hadoop configuration directory could be located."""


def get_conf_dir(dirname=None):
    """
    Return the Hadoop configuration directory path.

    Lookup order:
      1. ``dirname`` when given (it must be an existing directory),
      2. the ``HADOOP_CONF_DIR`` environment variable (returned as-is, not validated),
      3. ``$HADOOP_HOME/etc/hadoop`` (it must be an existing directory).

    :param dirname: Optional absolute path of a directory that should contain *-site.xml files
    :return: the configuration directory path
    :raises NoHadoopConfDir: if ``dirname`` is given but is not a directory, if
        ``$HADOOP_HOME/etc/hadoop`` is not a directory, or if neither environment
        variable is defined
    """
    if dirname:
        # os.path.isdir() is already False for paths that do not exist
        if os.path.isdir(dirname):
            return dirname
        # An explicitly requested directory must be valid: do not silently fall
        # back to the environment (the exception used to be built but never raised).
        raise NoHadoopConfDir()
    if 'HADOOP_CONF_DIR' in os.environ:
        return os.environ['HADOOP_CONF_DIR']
    if 'HADOOP_HOME' in os.environ:
        dirname = os.path.join(os.environ['HADOOP_HOME'], 'etc', 'hadoop')
        if os.path.isdir(dirname):
            return dirname
    # Either no environment variable is set or HADOOP_HOME has no etc/hadoop directory
    raise NoHadoopConfDir()


def get_conf_files(dirname):
    """
    Generator of all *-site.xml files in a directory, in sorted (deterministic) order.

    Only names that END in ``-site.xml`` are yielded, so leftovers such as
    ``mapred-site.xml.template`` or ``core-site.xml.bak`` are ignored.

    :param dirname: absolute path of a directory that should contain *-site.xml files
    :return: yields the path of each matching file (``dirname`` joined with the file name)
    """
    # sorted() makes the order stable; os.listdir() returns entries in arbitrary order
    for name in sorted(os.listdir(dirname)):
        # \Z anchors at the very end of the name (re.match only anchors the start)
        if re.match(r'.+-site\.xml\Z', name):
            yield os.path.join(dirname, name)


def parse_file(file):
    """
    Parse an xml Hadoop property file.

    Every ``<property>`` element directly under the root contributes one entry.
    A property without a ``<name>`` (or with an empty one) is skipped; a property
    without a ``<value>`` (or with an empty one) maps to ``None``.

    :param file: absolute path
    :return: a dictionary (propertyName -> propertyValueInString)
    """
    root = Et.parse(file).getroot()
    result = dict()
    for prop in root.findall('property'):  # Find all property names and values
        name_node = prop.find('name')
        if name_node is None or name_node.text is None:
            # Malformed entry: nothing to key the value on
            continue
        value_node = prop.find('value')
        result[name_node.text] = value_node.text if value_node is not None else None
    return result


def get_hadoop_conf(dirname=None):
    """
    Collect every property found in the *-site.xml files of the Hadoop configuration directory.

    The directory is resolved by get_conf_dir(): ``dirname`` if given, otherwise the
    HADOOP_CONF_DIR environment variable, otherwise HADOOP_HOME/etc/hadoop.
    When the same property appears in several files, the file parsed last wins.

    :param dirname: optional directory absolute path that contains *-site.xml files
    :return: a dictionary (propertyName -> propertyValueInString)
    """
    merged = {}
    for conf_path in get_conf_files(get_conf_dir(dirname)):
        # Fold each file's properties into the combined dictionary
        merged.update(parse_file(conf_path))
    return merged


if __name__ == '__main__':
    # Demo: load the merged configuration and show the YARN ResourceManager web UI address.
    conf = get_hadoop_conf()
    # .get() avoids a KeyError traceback on clusters where the property is not set explicitly
    print(conf.get('yarn.resourcemanager.webapp.address', '<not set>'))


In [None]:
Question 2. Implement a Python function that calculates the total file size in a Hadoop Distributed File System (HDFS) directory.

Solution:- import subprocess
import sys, getopt
import re

# Dictionary to store the total size by ip node
size_by_ip_dict = {}

# Format a size in bytes as a human-readable string
def human_readable_size(size):
    """
    Format a byte count with a binary (1024-based) unit prefix, e.g. 1536 -> "1.5K B".

    The value is divided by 1024 until its magnitude drops below 1024 or the
    prefixes up to 'Z' are exhausted, in which case 'Y' is used.
    """
    prefixes = ('', 'K', 'M', 'G', 'T', 'P', 'E', 'Z')
    position = 0
    while position < len(prefixes):
        if abs(size) < 1024.0:
            return "%3.1f%s%s" % (size, prefixes[position], ' B')
        size = size / 1024.0
        position += 1
    # Only reached for values of at least 1024**8
    return "%.1f%s%s" % (size, 'Y', ' B')

# Sum the given block_size to the ip entry in the size_by_ip_dict
def update_size_by_ip_dict(ip, block_size):
    """Add block_size to the running total kept for ip in the module-level size_by_ip_dict."""
    try:
        size_by_ip_dict[ip] += block_size
    except KeyError:
        # First block seen for this node: start its total
        size_by_ip_dict[ip] = block_size

def main(argv):
    """
    Report how much block data each DataNode holds for an HDFS path.

    Runs ``hdfs fsck <path> -files -blocks -locations``, adds the length of every
    block to the total of each DataNode address listed on that block's line, then
    prints the overall total followed by the per-node totals in ascending order.
    Because every replica is counted, the total includes replication.

    :param argv: command-line arguments without the program name; argv[0] is the HDFS path
    """
    if not argv:
        sys.exit("usage: <script> <hdfs_path>")
    # Use the arguments that were passed in rather than re-reading sys.argv
    path = argv[0]

    # Call the hdfs fsck command (stderr is merged into stdout)
    out = subprocess.Popen(['hdfs', 'fsck', path, '-files', '-blocks', '-locations'],
                           stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    stdout, _ = out.communicate()
    lines = stdout.decode("utf-8", errors="replace").split("\n")
    # Filter the lines corresponding to a HDFS block information
    hdfs_block_lines = [line for line in lines if "DatanodeInfoWithStorage" in line]

    # Block length; matched on its own so both "repl=" and "Live_repl=" outputs work
    block_len_regex = re.compile(r' len=(\d+)')
    # Address part of every replica location actually listed on the line
    datanode_regex = re.compile(r'DatanodeInfoWithStorage\[([\w.]*)')

    for hdfs_block_line in hdfs_block_lines:
        len_match = block_len_regex.search(hdfs_block_line)
        if len_match is None:
            # Not a block line we understand; skip instead of crashing
            continue
        block_size = int(len_match.group(1))
        # findall() returns the locations really present, so under- or
        # over-replicated blocks no longer break the parsing
        for ip in datanode_regex.findall(hdfs_block_line):
            update_size_by_ip_dict(ip, block_size)

    # Calculate the total size
    total_size = sum(size_by_ip_dict.values())

    print("Total size = ", human_readable_size(total_size))
    print("Data by node")
    sorted_size_by_ip_list = sorted(size_by_ip_dict.items(), key=lambda x: x[1])
    # Print the size by node sorted by size
    for ip, size in sorted_size_by_ip_list:
        # Guard against a zero total (e.g. only empty blocks)
        share = size / total_size * 100 if total_size else 0.0
        print(ip, ":", human_readable_size(size), "(", "%.2f" % share, "%)")

if __name__ == "__main__":
    main(sys.argv[1:])

In [None]:
Question 3. Create a Python program that extracts and displays the top N most frequent words from a large text file using the MapReduce approach.

Solution:- mapper.py
    
    #!/usr/bin/python
  
# import sys because we need to read and write data to STDIN and STDOUT
import sys
  
# reading entire line from STDIN (standard input)
# Map step: stream STDIN line by line and emit one "<word>\t1" record per token.
for raw_line in sys.stdin:
    # strip() drops surrounding whitespace; split() tokenises on any whitespace run
    for token in raw_line.strip().split():
        # Everything written to STDOUT here becomes the input of the
        # Reduce step, i.e. the input for reducer.py
        print('%s\t%s' % (token, 1))
        
        
reducer.py

#!/usr/bin/python
  
from operator import itemgetter
import sys
  
def reduce_counts(lines):
    """
    Sum the counts of consecutive "<word>\\t<count>" records.

    The input must already be grouped by word (Hadoop sorts the map output by
    key before handing it to the reducer).  Blank lines, lines without a tab and
    lines whose count is not an integer are ignored.

    :param lines: iterable of text lines, e.g. sys.stdin
    :return: yields one (word, total_count) tuple per run of identical words
    """
    current_word = None
    current_count = 0

    for line in lines:
        # remove leading and trailing whitespace
        line = line.strip()
        # split on the tab written by mapper.py; sep is empty when there is no tab
        word, sep, count = line.partition('\t')
        if not sep:
            continue
        # convert count (currently a string) to int
        try:
            count = int(count)
        except ValueError:
            # count was not a number, so silently ignore/discard this line
            continue

        if current_word == word:
            current_count += count
        else:
            if current_word is not None:
                yield current_word, current_count
            current_count = count
            current_word = word

    # Emit the last group; nothing is emitted for empty input, and the group is
    # not lost when the final input line was malformed
    if current_word is not None:
        yield current_word, current_count


if __name__ == "__main__":
    # write results to STDOUT, one "<word>\t<total>" line per word
    for reduced_word, reduced_total in reduce_counts(sys.stdin):
        print('%s\t%s' % (reduced_word, reduced_total))
    

demo.txt

hello hi bye ok
ok bye nice hello
tata bye bye

hadoop-streaming-2.4.0.jar->needed

hdfs_commands.txt

# In hdfs file system / is the root

# Command to check the files inside root hdfs directory
hadoop fs -ls /

# Command to create directory in hdfs
hadoop fs -mkdir /input_data


# Copy data from local file system to Hdfs
hadoop fs -put test_demo/trees.csv /input_data

# Copy from HDFS path to local file system
hadoop fs -copyToLocal /input_data/trees.csv ./

# Command to execute map reduce code
hadoop jar hadoop-streaming-2.4.0.jar -files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py -input /test/demo.txt -output /output


    

In [None]:
Question 4. Write a Python script that checks the health status of the NameNode and DataNodes in a Hadoop cluster using Hadoop's REST API.

Solution:- # Python script that checks the health status of the NameNode and DataNodes
    #! /usr/bin/python -v

import os
import subprocess

# Ask the HA admin tool for the state of the NameNode registered as "nn2"
f = os.popen("hdfs haadmin -getServiceState nn2")
try:
    # The command output ends with a newline; strip it so the comparison can succeed
    now = f.read().strip()
finally:
    f.close()
status = "active"
# print() with a single argument works on both Python 2 and Python 3
if now == status:
    print("success")
else:
    print('error')
        




In [None]:
Question 5. Develop a Python program that lists all the files and directories in a specific HDFS path.

Solution:-  hdfs.py
    
    """
API for interacting with the file system on Hops (HopsFS).

It is a wrapper around pydoop together with utility functions that are Hops-specific.
"""
import socket
from six import string_types
import shutil
import fnmatch
import os
import errno

from hops import constants
from hops.service_discovery import ServiceDiscovery
import sys
import subprocess

import ntpath
import importlib

import logging
log = logging.getLogger(__name__)

# Compatibility with SageMaker
pydoop_available = True
try:
    import pydoop.hdfs as hdfs
    import pydoop.hdfs.path as path
    import pydoop.hdfs.fs as hdfs_fs
except:
    pydoop_available = False

import re
from xml.dom import minidom

tls_enabled = None
webhdfs_address = None

if pydoop_available:
    class _HopsFSPathSplitter(hdfs.path._HdfsPathSplitter):
        """Replacement for Pydoop's path splitter that also accepts hopsfs:// URLs."""

        @classmethod
        def split(cls, hdfs_path, user):
            """
            Break hdfs_path into a (hostname, port, path) tuple.

            A missing scheme defaults to "file" or "hdfs" depending on
            hdfs_fs.default_is_local(); "hopsfs" is handled exactly like "hdfs".
            Relative hdfs/hopsfs paths are placed under /user/<user>/.
            """
            if not hdfs_path:
                cls.raise_bad_path(hdfs_path, "empty")
            scheme, netloc, path_part = cls.parse(hdfs_path)
            if not scheme:
                if hdfs_fs.default_is_local():
                    scheme = "file"
                else:
                    scheme = "hdfs"
            if scheme in ("hdfs", "hopsfs"):
                if not path_part:
                    cls.raise_bad_path(hdfs_path, "path part is empty")
                if ":" in path_part:
                    cls.raise_bad_path(
                        hdfs_path, "':' not allowed outside netloc part"
                    )
                hostname, port = cls.split_netloc(netloc)
                if not path_part.startswith("/"):
                    path_part = "/user/%s/%s" % (user, path_part)
            elif scheme == "file":
                hostname = ""
                port = 0
                path_part = netloc + path_part
            else:
                cls.raise_bad_path(hdfs_path, "unsupported scheme %r" % scheme)
            return hostname, port, path_part

    # Install the splitter so that Pydoop's path handling understands hopsfs://
    hdfs.path._HdfsPathSplitter = _HopsFSPathSplitter

def get_plain_path(abs_path):
    """
    Strip the scheme and namenode address from an absolute HDFS/HopsFS path.

    Example use-case:

    >>> hdfs.get_plain_path("hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Models/")
    >>> # returns: "/Projects/demo_deep_learning_admin000/Models/"

    Args:
        :abs_path: the absolute HDFS/hopsfs path containing prefix and/or ip

    Returns:
        the plain path without prefix and ip
    """
    absolute = path.abspath(abs_path)
    # The third element of the split result is the path component
    components = path.split(absolute)
    return components[2]

def project_id():
    """
    Read the Hopsworks project id from the environment.

    Returns:
        the Hopsworks project id

    Raises:
        KeyError if the environment variable is not set
    """
    env_key = constants.ENV_VARIABLES.HOPSWORKS_PROJECT_ID_ENV_VAR
    return os.environ[env_key]

def project_user():
    """
    Gets the project username ("project__user") from environment variables.

    The Hadoop user-name variable is preferred; the HDFS user variable is the fallback.

    Returns:
        the project username

    Raises:
        KeyError if neither environment variable is set
    """
    try:
        hops_user = os.environ[constants.ENV_VARIABLES.HADOOP_USER_NAME_ENV_VAR]
    except KeyError:
        # Only a missing variable triggers the fallback; a bare except would also
        # swallow unrelated failures such as KeyboardInterrupt
        hops_user = os.environ[constants.ENV_VARIABLES.HDFS_USER_ENV_VAR]
    return hops_user

def project_name():
    """
    Extracts the project name from the environment if available, otherwise from
    the project username ("project__user").

    Returns:
        project name
    """
    try:
        return os.environ[constants.ENV_VARIABLES.HOPSWORKS_PROJECT_NAME_ENV_VAR]
    except KeyError:
        # Variable not set: derive the name from the project user instead
        pass

    hops_user = project_user()
    # project users have username project__user, so the part before "__" is the project
    return hops_user.split("__")[0]

def project_path(project=None, exclude_nn_addr=False):
    """ Get the path in HopsFS where the HopsWorks project is located. To point to a particular dataset, this path should be
    appended with the name of your dataset.

    >>> from hops import hdfs
    >>> project_path = hdfs.project_path()
    >>> print("Project path: {}".format(project_path))

    Args:
        :project: If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
        :exclude_nn_addr: if True, an "<ipv4>:<port>" namenode address is removed from the returned path

    Returns:
        returns the project absolute path
    """

    if project is None:
        project = project_name()

    # abspath means "hdfs://namenode:port/" is prepended
    abspath = hdfs.path.abspath("/Projects/" + project + "/")
    if exclude_nn_addr:
        # Dots are escaped so that only a literal dotted IPv4 address followed by a port matches
        abspath = re.sub(r"\d+\.\d+\.\d+\.\d+:\d+", "", abspath)
    return abspath


def get():
    """ Open a pydoop hdfs handle on the default namenode (specified in hadoop config), acting as the project user

    Returns:
        Pydoop hdfs handle
    """
    current_user = project_user()
    return hdfs.hdfs('default', 0, user=current_user)


def get_fs():
    """ Open a pydoop fs handle on the default namenode (specified in hadoop config), acting as the project user

    Returns:
        Pydoop fs handle
    """
    current_user = project_user()
    return hdfs.fs.hdfs('default', 0, user=current_user)


def _expand_path(hdfs_path, project="", exists=True):
    """
    Expands a given path. If the path is /Projects.. hdfs:// is prepended.
    If the path is relative, the full project path is prepended.
    Paths that already start with hdfs:// or hopsfs:// are left unchanged.

    Args:
        :hdfs_path: the path to be expanded (non-string input is decoded first)
        :project: project used to resolve a relative path; when empty or None the current project is used
        :exists: boolean flag, if this is true an exception is thrown if the expanded path does not exist.

    Raises:
        IOError if the path uses a scheme other than hdfs:// or hopsfs://, or
        if the exists flag is true and the path does not exist

    Returns:
        path expanded with HDFS and project
    """
    if not isinstance(hdfs_path, string_types):
        hdfs_path = hdfs_path.decode()
    # Check if a full path is supplied. If not, assume it is a relative path for this project - then build its full path.
    if hdfs_path.startswith("/Projects"):
        hdfs_path = "hdfs://" + hdfs_path
    elif not hdfs_path.startswith(("hdfs://", "hopsfs://")):
        # if the file URL type is not HDFS, throw an error
        if "://" in hdfs_path:
            raise IOError("path %s must be a full hopsfs path or a relative path" % hdfs_path)
        # The project is resolved only here, where it is needed, so absolute
        # paths no longer depend on the project environment variables being set
        if not project:
            project = project_name()
        hdfs_path = project_path(project) + hdfs_path
    if exists and not hdfs.path.exists(hdfs_path):
        raise IOError("path %s not found" % hdfs_path)
    return hdfs_path

def copy_to_hdfs(local_path, relative_hdfs_path, overwrite=False, project=None):
    """
    Upload a local file or directory (recursively) into a project in HDFS.

    For example, if you execute:

    >>> copy_to_hdfs("data.tfrecords", "/Resources", project="demo")

    This will copy the file data.tfrecords to hdfs://Projects/demo/Resources/data.tfrecords

    Args:
        :local_path: Absolute path, or path relative to $CWD, on the local filesystem to copy
        :relative_hdfs_path: a path in HDFS relative to the project root to where the local path should be written
        :overwrite: a boolean flag whether to overwrite if the path already exists in HDFS
        :project: name of the project, defaults to the current HDFS user's project
    """
    if project is None:
        project = project_name()

    # Relative local paths are resolved against the current working directory
    full_local = local_path if os.path.isabs(local_path) else os.getcwd() + '/' + local_path

    target = _expand_path(relative_hdfs_path, project, exists=False)

    if overwrite:
        # Address the final destination explicitly so a previous copy can be removed first
        target = target + "/" + os.path.basename(full_local)
        if exists(target):
            delete(target, recursive=True)

    log.debug("Started copying local path {} to hdfs path {}\n".format(local_path, target))

    # copy directories from local path to HDFS project path
    hdfs.put(full_local, target)

    log.debug("Finished copying\n")


def delete(hdfs_path, recursive=False):
    """
    Remove a path from HDFS; the path may be absolute or project-relative.
    With recursive=True a directory is removed together with its contents.

    For example

    >>> delete("/Resources/", recursive=True)

    will delete all files recursively in the folder "Resources" inside the current project.

    Args:
        :hdfs_path: the path to delete (project-relative or absolute)
        :recursive: whether to delete directory contents recursively

    Returns:
        None

    Raises:
        IOError when the path does not exist, or when recursive is False and directory is non-empty
    """
    target = _expand_path(hdfs_path)
    handle = get()
    if not handle.exists(target):
        return
    handle.delete(target, recursive=recursive)


def copy_to_local(hdfs_path, local_path="", overwrite=False, project=None):
    """
    Copies a directory or file from a HDFS project to a local directory. If there is not enough free space in the local directory, an exception is thrown.
    If the local file exists, overwrite is False, and the hdfs file and the local file are the same size in bytes, the local path is returned immediately without downloading.
    If the local directory tree exists, overwrite is False, and the hdfs subdirectory and the local subdirectory have the same file and directory names, the local path is returned immediately without downloading.
    In every other case any existing local copy is removed and the path is downloaded again.

    For example, if you execute:

    >>> copy_to_local("Resources/my_data")

    This will copy the directory my_data from the Resources dataset in your project to the current working directory on the path ./my_data

    Raises:
      IOError if local_path is not an existing local directory, if the HDFS path does not exist,
      or if there is not enough free space to localize the file/directory

    Args:
        :hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to your Project's path in HDFS).
        :local_path: the relative (to $CWD) or full path to a directory on the local filesystem to copy to, defaults to $CWD
        :overwrite: a boolean flag whether to overwrite if the path already exists in the local directory.
        :project: name of the project, defaults to the current HDFS user's project

    Returns:
        the full local pathname of the file/dir
    """

    if project == None:
        project = project_name()

    # Resolve the destination directory; relative paths are taken from the current working directory
    if os.path.isabs(local_path):
        local_dir = local_path
    else:
        local_dir = os.getcwd() + '/' + local_path

    if not os.path.isdir(local_dir):
        raise IOError("You need to supply the path to a local directory. This is not a local dir: %s" % local_dir)

    # The local copy keeps the base name of the HDFS path
    filename = path.basename(hdfs_path)
    full_local = local_dir + "/" + filename

    # Raises IOError when the HDFS path does not exist (exists defaults to True)
    project_hdfs_path = _expand_path(hdfs_path, project=project)

    # Get the amount of free space on the local drive
    # (the local name `stat` shadows the module-level stat() function inside this function)
    stat = os.statvfs(local_dir)
    free_space_bytes = stat.f_bsize * stat.f_bavail

    hdfs_size = path.getsize(project_hdfs_path)

    # Existing local file: reuse it when the size matches, otherwise drop the stale copy
    if os.path.isfile(full_local) and not overwrite:
        sz = os.path.getsize(full_local)
        if hdfs_size == sz:
            log.info("File " + project_hdfs_path + " is already localized, skipping download...")
            return full_local
        else:
            os.remove(full_local)

    # Existing local directory: reuse it when its entry names match HDFS, otherwise remove it
    if os.path.isdir(full_local) and not overwrite:
        try:
            localized = _is_same_directory(full_local, project_hdfs_path)
            if localized:
                log.info("Full directory subtree already on local disk and unchanged. Set overwrite=True to force download")
                return full_local
            else:
                shutil.rmtree(full_local)
        except Exception as e:
            # The comparison is best-effort: on any failure fall back to a fresh download
            log.error("Failed while checking directory structure to avoid re-downloading dataset, falling back to downloading")
            log.error(e)
            shutil.rmtree(full_local)

    # Free space was measured before any stale local copy was removed above
    if hdfs_size > free_space_bytes:
        raise IOError("Not enough local free space available on scratch directory: %s" % local_path)

    # With overwrite=True the existing local copy is always discarded before downloading
    if overwrite:
        if os.path.isdir(full_local):
            shutil.rmtree(full_local)
        elif os.path.isfile(full_local):
            os.remove(full_local)

    log.debug("Started copying " + project_hdfs_path + " to local disk on path " + local_dir + "\n")

    hdfs.get(project_hdfs_path, local_dir)

    log.debug("Finished copying\n")

    return full_local


def _is_same_directory(local_path, hdfs_path):
    """
    Check whether the local tree and the HDFS tree contain the same file and
    directory names (base names only, compared as sorted lists).
    """
    local_names = []
    for _root, subdirs, files in os.walk(local_path):
        local_names.extend(fnmatch.filter(files, '*'))
        local_names.extend(fnmatch.filter(subdirs, '*'))
    local_names.sort()

    remote_entries = glob(hdfs_path + '/*', recursive=True)
    remote_names = sorted(path.basename(str(entry)) for entry in remote_entries)

    return local_names == remote_names

def cp(src_hdfs_path, dest_hdfs_path, overwrite=False):
    """
    Copy src_hdfs_path to dest_hdfs_path inside HDFS.

    If src_hdfs_path is a directory, its contents will be copied recursively.
    Source file(s) are opened for reading and copies are opened for writing.

    Args:
        :src_hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to your Project's path in HDFS).
        :dest_hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to your Project's path in HDFS).
        :overwrite: boolean flag whether to overwrite destination path or not.

    Raises:
        IOError if the source path does not exist
    """
    source = _expand_path(src_hdfs_path)
    destination = _expand_path(dest_hdfs_path, exists=False)

    if overwrite:
        if exists(destination):
            # clear the destination first since overwrite was requested
            delete(destination, recursive=True)

    hdfs.cp(source, destination)

def glob(hdfs_path, recursive=False, project=None):
    """
    Finds all the pathnames matching a specified pattern according to the rules used by the Unix shell, although results are returned in arbitrary order.

    Globbing gives you the list of files in a dir that matches a supplied pattern

    >>> glob('Resources/*.json')
    >>> ['Resources/1.json', 'Resources/2.json']

    glob is implemented as  os.listdir() and fnmatch.fnmatch()
    We implement glob as hdfs.ls() and fnmatch.filter()

    Only the part after the last "/" is treated as the pattern; everything
    before it is the directory to list. A pattern without any "/" is matched
    against the project root.

    Args:
     :hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to project_name in HDFS).
     :recursive: passed to hdfs.ls(); if True the whole tree below the directory is listed before filtering.
     :project: If the supplied hdfs_path is a relative path, it will look for that file in this project's subdir in HDFS.

    Raises:
        IOError if the supplied hdfs path does not exist

    Returns:
      A possibly-empty list of path names that match the pattern.
    """

    # Get the full path to the dir for the input glob pattern
    # "hdfs://Projects/jim/blah/*.jpg" => "hdfs://Projects/jim/blah"
    # Then, ls on 'hdfs://Projects/jim/blah', then filter out results
    if project is None:
        project = project_name()
    # rpartition gives an empty directory part when there is no "/"; slicing
    # with rfind()'s -1 used to chop the last character off the pattern instead
    dir_part, _, pattern = hdfs_path.rpartition("/")
    inputDir = _expand_path(dir_part, project)
    if not hdfs.path.exists(inputDir):
        raise IOError("Glob path %s not found" % inputDir)
    dirContents = hdfs.ls(inputDir, recursive=recursive)
    # NOTE(review): hdfs.ls presumably returns full paths, so the pattern is
    # matched against whole paths rather than base names - confirm
    return fnmatch.filter(dirContents, pattern)


def ls(hdfs_path, recursive=False, project=None):
    """
    List the pathnames found in the supplied directory.

    Args:
        :hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to project_name in HDFS).
        :recursive: if it is a directory and recursive is True, the list contains one item for every file or directory in the tree rooted at hdfs_path.
        :project: If the supplied hdfs_path is a relative path, it will look for that file in this project's subdir in HDFS.

    Returns:
      A possibly-empty list of path names stored in the supplied path.
    """
    owner = project_name() if project is None else project
    return hdfs.ls(_expand_path(hdfs_path, owner), recursive=recursive)


def lsl(hdfs_path, recursive=False, project=None):
    """
    List the supplied directory through pydoop's ``hdfs.lsl``.

    Args:
        :hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to project_name in HDFS).
        :recursive: if it is a directory and recursive is True, the list contains one item for every file or directory in the tree rooted at hdfs_path.
        :project: If the supplied hdfs_path is a relative path, it will look for that file in this project's subdir in HDFS.

    Returns:
        A possibly-empty list with one item per entry stored in the supplied path.
    """
    owner = project_name() if project is None else project
    return hdfs.lsl(_expand_path(hdfs_path, owner), recursive=recursive)


def rmr(hdfs_path, project=None):
    """
    Remove files and directories recursively.

    Args:
        :hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to project_name in HDFS).
        :project: If the supplied hdfs_path is a relative path, it will look for that file in this project's subdir in HDFS.

    Raises:
        IOError if the path does not exist
    """
    owner = project_name() if project is None else project
    return hdfs.rmr(_expand_path(hdfs_path, owner))


def mkdir(hdfs_path, project=None):
    """
    Create a directory, including any missing parents.

    Args:
        :hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to project_name in HDFS).
        :project: If the supplied hdfs_path is a relative path, it will look for that file in this project's subdir in HDFS.

    """
    owner = project_name() if project is None else project
    # exists=False: the directory is not expected to be there yet
    return hdfs.mkdir(_expand_path(hdfs_path, owner, exists=False))


def move(src, dest):
    """
    Move or rename src to dest within the current project.

    Args:
        :src: You can specify either a full hdfs pathname or a relative one (relative to your Project's path in HDFS).
        :dest: You can specify either a full hdfs pathname or a relative one (relative to your Project's path in HDFS).

    Raises:
        IOError if src does not exist
    """
    return hdfs.move(
        _expand_path(src, project_name()),
        _expand_path(dest, project_name(), exists=False),
    )


def rename(src, dest):
    """
    Rename src to dest within the current project.

    Args:
        :src: You can specify either a full hdfs pathname or a relative one (relative to your Project's path in HDFS).
        :dest: You can specify either a full hdfs pathname or a relative one (relative to your Project's path in HDFS).

    Raises:
        IOError if src does not exist
    """
    return hdfs.rename(
        _expand_path(src, project_name()),
        _expand_path(dest, project_name(), exists=False),
    )


def chown(hdfs_path, user, group, project=None):
    """
    Set a new owner and group on a path.

    Args:
        :hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to the given project path in HDFS).
        :user: New hdfs username
        :group: New hdfs group
        :project: If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
    """
    owner_project = project_name() if project is None else project
    return hdfs.chown(_expand_path(hdfs_path, owner_project), user, group)


def chmod(hdfs_path, mode, project=None):
    """
    Set the file mode bits of a path.

    Args:
        :hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to your Project's path in HDFS).
        :mode: File mode (user/group/world privilege) bits
        :project: If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
    """
    owner = project_name() if project is None else project
    return hdfs.chmod(_expand_path(hdfs_path, owner), mode)


def stat(hdfs_path, project=None):
    """
    HDFS counterpart of os.stat(): returns a StatResult object for the path.

    Args:
        :hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to your Project's path in HDFS).
        :project: If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.

    Returns:
        StatResult object
    """
    owner = project_name() if project is None else project
    return hdfs.stat(_expand_path(hdfs_path, owner))


def access(hdfs_path, mode, project=None):
    """
    HDFS counterpart of os.access() for the given path.

    Args:
        :hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to your Project's path in HDFS).
        :mode: File mode (user/group/world privilege) bits
        :project: If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.

    Returns:
        True if access is allowed, False if not.
    """
    owner = project_name() if project is None else project
    return hdfs.access(_expand_path(hdfs_path, owner), mode)


def _mkdir_p(path):
    """
    Creates path on local filesystem

    Args:
        path to create

    Raises:
        OSError
    """
    try:
        os.makedirs(path)
    except OSError as exc:
        if exc.errno == errno.EEXIST and os.path.isdir(path):
            pass
        else:
            raise


def open_file(hdfs_path, project=None, flags='rw', buff_size=0):
    """
    Open an HDFS file and return a file descriptor object (fd) that should be closed when no longer needed.

    Args:
        hdfs_path: you can specify either a full hdfs pathname or a relative one (relative to your project's path in HDFS)
        project: project used to resolve a relative hdfs_path, defaults to the current project
        flags: supported opening modes are 'r', 'w', 'a'. In addition, a trailing 't' can be added to specify text mode (e.g, 'rt' = open for reading text)
        buff_size: Pass 0 as buff_size if you want to use the "configured" values, i.e the ones set in the Hadoop configuration files.

    Returns:
        A file descriptor (fd) that needs to be closed (fd.close()) when it is no longer needed.

    Note:
        The path is expanded without an existence check, so a missing file is
        only detected by the underlying open call.
    """
    owner = project_name() if project is None else project
    expanded = _expand_path(hdfs_path, owner, exists=False)
    return get_fs().open_file(expanded, flags, buff_size=buff_size)


def close():
    """
    Closes the HDFS connection (disconnects from the namenode) by delegating to ``hdfs.close()``.
    """
    hdfs.close()


def exists(hdfs_path, project=None):
    """
    Tell whether hdfs_path exists in the default HDFS.

    Args:
        :hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to your Project's path in HDFS).
        :project: If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.

    Returns:
        True if hdfs_path exists. False if it does not, or if expanding the path raised an IOError.
    """
    if project is None:
        project = project_name()

    try:
        resolved_path = _expand_path(hdfs_path, project)
    except IOError:
        # A path that cannot be expanded is reported as "does not exist".
        return False
    else:
        return hdfs.path.exists(resolved_path)


def isdir(hdfs_path, project=None):
    """
    Return True if path refers to a directory.

    NOTE: this module defines ``isdir`` a second time further down; that later
    definition replaces this one at import time. This body is kept consistent
    with it (and with ``isfile``) by delegating to ``path.isdir``.

    Args:
        :hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to your Project's path in HDFS).
        :project: If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.

    Returns:
        True if path refers to a directory.

    Raises: IOError
    """
    if project is None:
        project = project_name()
    hdfs_path = _expand_path(hdfs_path, project)
    # Use the same path helper module as isfile() and the later isdir() definition.
    return path.isdir(hdfs_path)


def isfile(hdfs_path, project=None):
    """
    Tell whether a path refers to a regular file.

    Args:
        :hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to your Project's path in HDFS).
        :project: If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.

    Returns:
        True if path refers to a file.

    Raises: IOError
    """
    owning_project = project_name() if project is None else project
    resolved_path = _expand_path(hdfs_path, owning_project)
    return path.isfile(resolved_path)

def isdir(hdfs_path, project=None):
    """
    Tell whether a path refers to a directory.

    Args:
        :hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to your Project's path in HDFS).
        :project: If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.

    Returns:
        True if path refers to a directory.

    Raises: IOError
    """
    owning_project = project_name() if project is None else project
    resolved_path = _expand_path(hdfs_path, owning_project)
    return path.isdir(resolved_path)


def capacity():
    """
    Returns the raw capacity of the filesystem, as reported by ``hdfs.capacity()``.

    Returns:
        filesystem capacity (int)
    """
    return hdfs.capacity()


def dump(data, hdfs_path):
    """
    Write data to a file in HDFS.

    Args:
        :data: data to write to hdfs_path
        :hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to your Project's path in HDFS).

    Returns:
        whatever ``hdfs.dump`` returns for the expanded path
    """
    # exists=False: the destination is expanded without requiring it to exist yet.
    destination = _expand_path(hdfs_path, exists=False)
    return hdfs.dump(data, destination)


def load(hdfs_path):
    """
    Read a file from HDFS and return its content.

    Args:
        :hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to your Project's path in HDFS).

    Returns:
        the read contents of hdfs_path
    """
    source = _expand_path(hdfs_path)
    return hdfs.load(source)

def ls(hdfs_path, recursive=False, exclude_nn_addr=False):
    """
    Lists a directory in HDFS.

    Args:
        :hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to your Project's path in HDFS).
        :recursive: forwarded to ``hdfs.ls``; if True the listing descends into sub-directories.
        :exclude_nn_addr: if True, an IPv4 ``address:port`` (e.g. ``10.0.0.1:8020``) found in hdfs_path is removed before the path is expanded.

    Returns:
        returns a list of hdfs paths
    """
    if exclude_nn_addr:
        # The dots are escaped so that only a literal dotted-quad followed by
        # ":port" is stripped; an unescaped "." would match any character and
        # could remove unrelated digits from the path.
        hdfs_path = re.sub(r"\d+\.\d+\.\d+\.\d+:\d+", "", hdfs_path)
    hdfs_path = _expand_path(hdfs_path)
    return hdfs.ls(hdfs_path, recursive=recursive)

def stat(hdfs_path):
    """
    Perform the equivalent of os.stat() on hdfs_path.

    Args:
        :hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to your Project's path in HDFS).

    Returns:
        a StatResult object describing hdfs_path
    """
    resolved_path = _expand_path(hdfs_path)
    return hdfs.stat(resolved_path)

def abs_path(hdfs_path):
    """
    Return an absolute path for hdfs_path.

    Delegates to ``_expand_path`` with its default arguments.

    Args:
        :hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to your Project's path in HDFS).

    Returns:
        Return an absolute path for hdfs_path.
    """
    return _expand_path(hdfs_path)

def _ensure_localized_deps_dir():
    """Create ./localized_deps (with an __init__.py) if it is missing, make sure it is on sys.path, and return its path."""
    localized_deps = os.getcwd() + "/localized_deps"
    if not os.path.exists(localized_deps):
        os.mkdir(localized_deps)
        open(localized_deps + '/__init__.py', mode='w').close()

    if localized_deps not in sys.path:
        sys.path.append(localized_deps)
    return localized_deps


def _convert_notebook_to_py(ipynb_path):
    """Run ``jupyter nbconvert --to python`` on a local .ipynb file and return the path of the generated .py file."""
    # The jupyter executable is expected next to the running Python interpreter.
    jupyter_binary = os.path.dirname(sys.executable) + '/jupyter'
    if not os.path.exists(jupyter_binary):
        raise Exception('Could not find jupyter binary on path {}'.format(jupyter_binary))

    converted_py_path = os.path.splitext(ipynb_path)[0] + '.py'
    # Drop any stale conversion so the existence check below proves this run produced the file.
    if os.path.exists(converted_py_path):
        os.remove(converted_py_path)

    conversion = subprocess.Popen([jupyter_binary, 'nbconvert', '--to', 'python', ipynb_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = conversion.communicate()
    if conversion.returncode != 0:
        raise Exception("Notebook conversion to .py failed: stdout: {} \n stderr: {}".format(out, err))

    if not os.path.exists(converted_py_path):
        raise Exception('Could not find converted .py file on path {}'.format(converted_py_path))
    return converted_py_path


def add_module(hdfs_path, project=None):
    """
     Add a .py or .ipynb file from HDFS to sys.path

     The file is copied into a ``localized_deps`` directory under the current
     working directory; a notebook is additionally converted to a .py file
     with ``jupyter nbconvert``. The resulting module is then (re)loaded.

     For example, if you execute:

     >>> add_module("Resources/my_module.py")
     >>> add_module("Resources/my_notebook.ipynb")

     You can import it simply as:

     >>> import my_module
     >>> import my_notebook

     Args:
         :hdfs_path: You can specify either a full hdfs pathname or a relative one (relative to your Project's path in HDFS) to a .py or .ipynb file
         :project: name of the project the relative path belongs to. When omitted, project_name() is used.

     Returns:
        Return full local path to localized python file or converted python file in case of .ipynb file

     Raises:
        Exception: if hdfs_path is not a .py/.ipynb file, if the jupyter binary cannot be found, or if the notebook conversion fails
    """
    localized_deps = _ensure_localized_deps_dir()

    if project is None:
        project = project_name()
    hdfs_path = _expand_path(hdfs_path, project)

    # Query HDFS once instead of once per branch.
    is_file = path.isfile(hdfs_path)
    is_python_source = hdfs_path.endswith('.py')
    is_notebook = hdfs_path.endswith('.ipynb')
    if not is_file or not (is_python_source or is_notebook):
        raise Exception("Given path " + hdfs_path + " does not point to a .py or .ipynb file")

    local_copy = copy_to_local(hdfs_path, localized_deps, overwrite=True)
    module_path = local_copy if is_python_source else _convert_notebook_to_py(local_copy)

    if module_path not in sys.path:
        sys.path.append(module_path)
    _reload(module_path)
    return module_path

def _reload(path):
    try:
        module_name = ntpath.basename(path).split(".")[0]
        imported_module = importlib.import_module(module_name)
        importlib.reload(imported_module)
    except Exception as err:
        log.error('Failed to automatically reload module on path {} with exception: {}'.format(path, err))

def is_tls_enabled():
    """
    Reads the ipc.server.ssl.enabled property from core-site.xml.

    The file is looked up in the directory named by the HADOOP_CONF_DIR
    environment variable. The result is cached in the module-level
    ``tls_enabled`` variable, so the file is parsed at most once, including
    when the property is absent.

    Returns:
        returns True if ipc.server.ssl.enabled is true. False otherwise.

    Raises:
        KeyError: if HADOOP_CONF_DIR is not set
    """
    global tls_enabled
    if tls_enabled is None:
        hadoop_conf_path = os.environ['HADOOP_CONF_DIR']
        xmldoc = minidom.parse(os.path.join(hadoop_conf_path, 'core-site.xml'))
        # Start from False so a missing property yields a real boolean and is cached too.
        enabled = False
        for item in xmldoc.getElementsByTagName('property'):
            names = item.getElementsByTagName("name")
            # Skip properties with no <name> element or an empty one.
            if not names or names[0].firstChild is None:
                continue
            if names[0].firstChild.data != "ipc.server.ssl.enabled":
                continue
            values = item.getElementsByTagName("value")
            if values and values[0].firstChild is not None:
                # Trimmed, case-insensitive comparison.
                enabled = values[0].firstChild.data.strip().lower() == 'true'
            else:
                enabled = False
        tls_enabled = enabled
    return tls_enabled

def _get_webhdfs_address():
    """
    Build the "<fqdn>:<port>" endpoint of the 'http.namenode' service.

    The port comes from ServiceDiscovery.get_any_service and the host part from
    ServiceDiscovery.construct_service_fqdn. The result is cached in the
    module-level ``webhdfs_address`` variable, so the lookup runs only once.

    Returns:
        returns webhdfs endpoint
    """
    global webhdfs_address
    if webhdfs_address is not None:
        return webhdfs_address

    service = 'http.namenode'
    _, port = ServiceDiscovery.get_any_service(service)
    fqdn = ServiceDiscovery.construct_service_fqdn(service)
    webhdfs_address = fqdn + ":" + str(port)
    return webhdfs_address
        
def get_webhdfs_host():
    """
    Gets the hostname of the NameNode's web interface.

    The value is whatever ServiceDiscovery.construct_service_fqdn returns for
    the 'http.namenode' service; it is not cached by this function.

    Returns:
        returns NameNode's hostname
    """
    return ServiceDiscovery.construct_service_fqdn('http.namenode')

def get_webhdfs_port():
    """
    Get the NameNode's port for WebHDFS.

    The port is the second ":"-separated field of the address returned by
    _get_webhdfs_address().

    Returns:
        returns NameNode's port for WebHDFS (as a string)
    """
    address_fields = _get_webhdfs_address().split(":")
    return address_fields[1]




In [None]:
Question 6. Implement a Python program that analyzes the storage utilization of DataNodes in a Hadoop cluster and identifies the nodes with the highest and lowest storage capacities.

Solution:-  mapper.py
    
    #!/usr/bin/python
  
# import sys because we need to read and write data to STDIN and STDOUT
import sys
  
# Stream every line from STDIN (standard input) and emit one record per token
for raw_line in sys.stdin:
    # strip() drops the trailing newline and surrounding blanks; split() then
    # breaks the line on any run of whitespace
    tokens = raw_line.strip().split()
    # each token is written to STDOUT (standard output) as "<word>\t1";
    # these records are the input for the Reduce step, i.e. reducer.py
    for token in tokens:
        print('%s\t%s' % (token, 1))
        
        
reducer.py

#!/usr/bin/python
  
from operator import itemgetter
import sys
  
current_word = None
current_count = 0
word = None
  
# read the entire line from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # slpiting the data on the basis of tab we have provided in mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
  
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word
  
# do not forget to output the last word if needed!
if current_word == word:
    print('%s\t%s' % (current_word, current_count))
    

demo.txt

hello hi bye ok
ok bye nice hello
tata bye bye

hadoop-streaming-2.4.0.jar is required to run the job (see the `hadoop jar` command below)

hdfs_commands.txt

# In hdfs file system / is the root

# Command to check the files inside root hdfs directory
hadoop fs -ls /

# Command to create directory in hdfs
hadoop fs -mkdir /input_data


# Copy data from local file system to Hdfs
hadoop fs -put test_demo/trees.csv /input_data

# Copy from HDFS path to local file system
hadoop fs -copyToLocal /input_data/trees.csv ./

# Command to execute map reduce code
hadoop jar hadoop-streaming-2.4.0.jar -files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py -input /test/demo.txt -output /output


    


In [None]:
Question 7. Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, monitor its progress, and retrieve the final output.

Solution:-  mapper.py
    
    #!/usr/bin/python
  
# import sys because we need to read and write data to STDIN and STDOUT
import sys
  
# Stream every line from STDIN (standard input) and emit one record per token
for raw_line in sys.stdin:
    # strip() drops the trailing newline and surrounding blanks; split() then
    # breaks the line on any run of whitespace
    tokens = raw_line.strip().split()
    # each token is written to STDOUT (standard output) as "<word>\t1";
    # these records are the input for the Reduce step, i.e. reducer.py
    for token in tokens:
        print('%s\t%s' % (token, 1))
        
        
reducer.py

#!/usr/bin/python
  
from operator import itemgetter
import sys
  
current_word = None
current_count = 0
word = None
  
# read the entire line from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # slpiting the data on the basis of tab we have provided in mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
  
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word
  
# do not forget to output the last word if needed!
if current_word == word:
    print('%s\t%s' % (current_word, current_count))
    

demo.txt

hello hi bye ok
ok bye nice hello
tata bye bye

hadoop-streaming-2.4.0.jar is required to run the job (see the `hadoop jar` command below)

hdfs_commands.txt

# In hdfs file system / is the root

# Command to check the files inside root hdfs directory
hadoop fs -ls /

# Command to create directory in hdfs
hadoop fs -mkdir /input_data


# Copy data from local file system to Hdfs
hadoop fs -put test_demo/trees.csv /input_data

# Copy from HDFS path to local file system
hadoop fs -copyToLocal /input_data/trees.csv ./

# Command to execute map reduce code
hadoop jar hadoop-streaming-2.4.0.jar -files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py -input /test/demo.txt -output /output


    

In [None]:
Question 8. Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, set resource requirements, and track resource usage during job execution.

Solution:-  mapper.py
    
    #!/usr/bin/python
  
# import sys because we need to read and write data to STDIN and STDOUT
import sys
  
# Stream every line from STDIN (standard input) and emit one record per token
for raw_line in sys.stdin:
    # strip() drops the trailing newline and surrounding blanks; split() then
    # breaks the line on any run of whitespace
    tokens = raw_line.strip().split()
    # each token is written to STDOUT (standard output) as "<word>\t1";
    # these records are the input for the Reduce step, i.e. reducer.py
    for token in tokens:
        print('%s\t%s' % (token, 1))
        
        
reducer.py

#!/usr/bin/python
  
from operator import itemgetter
import sys
  
current_word = None
current_count = 0
word = None
  
# read the entire line from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # slpiting the data on the basis of tab we have provided in mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
  
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word
  
# do not forget to output the last word if needed!
if current_word == word:
    print('%s\t%s' % (current_word, current_count))
    

demo.txt

hello hi bye ok
ok bye nice hello
tata bye bye

hadoop-streaming-2.4.0.jar is required to run the job (see the `hadoop jar` command below)

hdfs_commands.txt

# In hdfs file system / is the root

# Command to check the files inside root hdfs directory
hadoop fs -ls /

# Command to create directory in hdfs
hadoop fs -mkdir /input_data


# Copy data from local file system to Hdfs
hadoop fs -put test_demo/trees.csv /input_data

# Copy from HDFS path to local file system
hadoop fs -copyToLocal /input_data/trees.csv ./

# Command to execute map reduce code
hadoop jar hadoop-streaming-2.4.0.jar -files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py -input /test/demo.txt -output /output


    

In [None]:
Question 9. Write a Python program that compares the performance of a MapReduce job with different input split sizes, showcasing the impact on overall job execution time.

Solution:- mapper.py
    
    #!/usr/bin/python
  
# import sys because we need to read and write data to STDIN and STDOUT
import sys
  
# Stream every line from STDIN (standard input) and emit one record per token
for raw_line in sys.stdin:
    # strip() drops the trailing newline and surrounding blanks; split() then
    # breaks the line on any run of whitespace
    tokens = raw_line.strip().split()
    # each token is written to STDOUT (standard output) as "<word>\t1";
    # these records are the input for the Reduce step, i.e. reducer.py
    for token in tokens:
        print('%s\t%s' % (token, 1))
        
        
reducer.py

#!/usr/bin/python
  
from operator import itemgetter
import sys
  
current_word = None
current_count = 0
word = None
  
# read the entire line from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # slpiting the data on the basis of tab we have provided in mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
  
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word
  
# do not forget to output the last word if needed!
if current_word == word:
    print('%s\t%s' % (current_word, current_count))
    

demo.txt

hello hi bye ok
ok bye nice hello
tata bye bye

hadoop-streaming-2.4.0.jar is required to run the job (see the `hadoop jar` command below)

hdfs_commands.txt

# In hdfs file system / is the root

# Command to check the files inside root hdfs directory
hadoop fs -ls /

# Command to create directory in hdfs
hadoop fs -mkdir /input_data


# Copy data from local file system to Hdfs
hadoop fs -put test_demo/trees.csv /input_data

# Copy from HDFS path to local file system
hadoop fs -copyToLocal /input_data/trees.csv ./

# Command to execute map reduce code
hadoop jar hadoop-streaming-2.4.0.jar -files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py -input /test/demo.txt -output /output


    