
# Advanced Certification Program in Computational Data Science
## A program by IISc and TalentSprint
### Assignment 3: Hadoop MapReduce

## Learning Objectives

At the end of the experiment, you will be able to:

* understand what Hadoop is and what its components are
* run various Hadoop HDFS shell commands
* perform a MapReduce operation on data

## Information

### Introduction

Hadoop is an open-source framework that allows the storage and processing of big data in a distributed environment across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage.

Hadoop is developed under the Apache Software Foundation and was created by Doug Cutting and Mike Cafarella. Doug Cutting named it after his son's toy elephant. The Google File System paper was published in October 2003. In January 2006, MapReduce development started within the Apache Nutch project, with around 6000 lines of code for MapReduce and around 5000 lines of code for HDFS. In April 2006, Hadoop 0.1.0 was released.

Apache Hadoop is a collection of open-source software utilities that facilitates using a network of many computers to solve problems involving massive amounts of data and computation. It provides a software framework for distributed storage and processing of big data using the MapReduce programming model. Hadoop was originally designed for computer clusters built from commodity hardware, which is still its most common use. It has since also found use on clusters of higher-end hardware. All the modules in Hadoop are designed with the fundamental assumption that hardware failures are common occurrences and should be handled automatically by the framework.

### Components of Hadoop

The core of Apache Hadoop consists of a storage part, known as the Hadoop Distributed File System (HDFS), and a processing part, which is the MapReduce programming model. Hadoop splits files into large blocks and distributes them across the nodes of a cluster. It then ships the packaged code to the nodes so that the data is processed in parallel. This approach takes advantage of data locality: each node works on the data it already holds, which allows the dataset to be processed faster and more efficiently.

The components of Hadoop are as follows:

* Storage unit - Hadoop HDFS (Hadoop Distributed File System)
* Processing unit - Hadoop MapReduce
* Resource management unit - Hadoop YARN (Yet Another Resource Negotiator)

#### Hadoop HDFS

HDFS was developed using a distributed file system design and runs on commodity hardware. Unlike many other distributed systems, HDFS is highly fault tolerant and designed to run on low-cost hardware.

It holds very large amounts of data and provides easy access to it. To store such huge data, files are spread across multiple machines and stored redundantly, which protects the system from data loss in case of failure. HDFS also makes the data available to applications for parallel processing.

**Features of HDFS**

* Suitable for the distributed storage and processing
* Hadoop provides a command interface to interact with HDFS
* The built-in servers of namenode and datanode help users to easily check the status of the cluster
* Streaming access to file system data
* HDFS provides file permissions and authentication

**Name Node**

In its basic configuration, HDFS has a single Name Node, also called the master node. It tracks files, manages the file system, and holds the metadata of all the stored data. In particular, the name node records the number of blocks, the data nodes on which each block is stored, where the replicas are kept, and other details. Clients contact the name node directly.
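
The name node's bookkeeping can be illustrated with a minimal sketch in plain Python (this is not Hadoop code). It assumes a 128 MB block size and a replication factor of 3, which are common HDFS defaults, and it uses a hypothetical round-robin placement; real HDFS placement is rack-aware and more involved. The names `split_into_blocks` and `place_replicas` are illustrative only.

```python
BLOCK_SIZE = 128 * 1024 * 1024   # assumed block size in bytes (128 MB)
REPLICATION = 3                  # assumed replication factor


def split_into_blocks(file_size, block_size=BLOCK_SIZE):
    """Return the size in bytes of each block needed to store the file."""
    if file_size <= 0:
        return []
    n_blocks = -(-file_size // block_size)        # integer ceiling division
    sizes = [block_size] * (n_blocks - 1)
    sizes.append(file_size - block_size * (n_blocks - 1))
    return sizes


def place_replicas(n_blocks, data_nodes, replication=REPLICATION):
    """Map block index -> list of data nodes (hypothetical round-robin)."""
    replication = min(replication, len(data_nodes))
    placement = {}
    for block in range(n_blocks):
        placement[block] = [
            data_nodes[(block + r) % len(data_nodes)] for r in range(replication)
        ]
    return placement


# A 300 MB file needs three blocks: 128 MB, 128 MB and 44 MB.
blocks = split_into_blocks(300 * 1024 * 1024)
metadata = place_replicas(len(blocks), ["dn1", "dn2", "dn3", "dn4"])
print(len(blocks), metadata)
```

The `metadata` dictionary plays the role of the block-to-location table that the name node keeps in memory; the data itself would live only on the data nodes.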

**Data Node**

A Data Node stores the actual data of HDFS as blocks and serves read and write requests from clients. Data nodes are also known as slave nodes and run as slave daemons. Every data node sends a heartbeat message to the name node every 3 seconds to signal that it is alive. When the name node stops receiving heartbeats from a data node for longer than a configured timeout (about 10 minutes 30 seconds with the default settings), it marks that data node as dead and starts re-replicating its blocks on other data nodes.
<figure>
<img src='https://cdn.iisc.talentsprint.com/CDS/Images/Datanode.png' width="500" />
</figure>
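
The heartbeat mechanism can be sketched as a small Python class. This is a toy model rather than HDFS code: the class name `HeartbeatMonitor`, its methods, and the 60-second timeout are all hypothetical and chosen only to keep the example short.

```python
class HeartbeatMonitor:
    """Toy model of how a name node could track data node liveness."""

    def __init__(self, timeout_seconds):
        self.timeout_seconds = timeout_seconds
        self.last_seen = {}          # data node id -> time of last heartbeat

    def heartbeat(self, node_id, now):
        """Record a heartbeat received from node_id at time `now` (seconds)."""
        self.last_seen[node_id] = now

    def dead_nodes(self, now):
        """Return nodes whose last heartbeat is older than the timeout."""
        return sorted(
            node for node, seen in self.last_seen.items()
            if now - seen > self.timeout_seconds
        )


monitor = HeartbeatMonitor(timeout_seconds=60)   # hypothetical timeout
monitor.heartbeat("dn1", now=0)
monitor.heartbeat("dn2", now=0)
monitor.heartbeat("dn1", now=50)                  # dn2 has gone silent
print(monitor.dead_nodes(now=70))                 # only dn2 exceeds the timeout
```

Once a node appears in `dead_nodes`, a real name node would schedule new replicas of that node's blocks on the remaining data nodes.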

#### Hadoop MapReduce

MapReduce is the processing unit of Hadoop. In the MapReduce approach, the processing is done on the slave nodes, and the final result is sent to the master node.
<figure>
<img src="https://cdn.iisc.talentsprint.com/CDS/Images/MapReduce.jpg" />
</figure>
    
A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then used as input to the reduce tasks. At the reduce phase, the aggregation takes place, and the final output is obtained.
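
The three stages described above (map, sort/shuffle, reduce) can be sketched in plain Python without Hadoop. This is a minimal single-process illustration with made-up input chunks; the function names `map_phase`, `shuffle_and_sort`, and `reduce_phase` are illustrative and not part of any Hadoop API.

```python
from collections import defaultdict


def map_phase(chunk):
    """Map: emit a (word, 1) pair for every word in one input chunk."""
    return [(word.lower(), 1) for word in chunk.split()]


def shuffle_and_sort(mapped_pairs):
    """Shuffle/sort: group values by key and order the keys."""
    groups = defaultdict(list)
    for key, value in mapped_pairs:
        groups[key].append(value)
    return sorted(groups.items())


def reduce_phase(key, values):
    """Reduce: aggregate all values that share a key."""
    return key, sum(values)


# Each chunk would be handled by a separate map task on a real cluster.
chunks = ["big data big cluster", "data node data block"]
mapped = [pair for chunk in chunks for pair in map_phase(chunk)]
result = dict(reduce_phase(k, v) for k, v in shuffle_and_sort(mapped))
print(result)
```

On a cluster the map calls run in parallel on different nodes, and the framework performs the grouping and sorting step between the two phases.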

#### Hadoop YARN

Hadoop YARN stands for Yet Another Resource Negotiator and is the resource management unit of Hadoop, available as a component since Hadoop version 2. It is not a file system: it is a resource management layer that works alongside HDFS and is responsible for managing cluster resources to prevent overloading. It also performs job scheduling so that jobs run where resources are available.

When a client submits a job, for example a query, the request goes to the Resource Manager, the YARN component responsible for resource allocation and management.

<figure>
<img src='https://cdn.iisc.talentsprint.com/CDS/Images/yarn_architecture.gif' />
</figure>
    
Each node runs its own node manager, which manages the node and monitors its resource usage. A container is a collection of physical resources, such as RAM, CPU, or disk. When a job request comes in, the application master requests containers from the Resource Manager and then works with the node managers to launch them. The node managers report resource usage back to the Resource Manager.

### Setup Steps:

In [None]:
#@title Please enter your registration id to start: { run: "auto", display-mode: "form" }
Id = "" #@param {type:"string"}

In [None]:
#@title Please enter your password (your registered phone number) to continue: { run: "auto", display-mode: "form" }
password = "" #@param {type:"string"}

In [None]:
#@title Run this cell to complete the setup for this Notebook
from IPython import get_ipython

ipython = get_ipython()

notebook= "M6_AST_03_Hadoop_MapReduce_C" #name of the notebook

def setup():
#  ipython.magic("sx pip3 install torch")
    ipython.magic("sx wget https://cdn.iisc.talentsprint.com/CDSE_experiments_data/ecommerce_uci.csv")
    ipython.magic("sx wget http://qwone.com/~jason/20Newsgroups/20news-18828.tar.gz")
    ipython.magic("sx tar -xzvf 20news-18828.tar.gz")
    from IPython.display import HTML, display
    display(HTML('<script src="https://dashboard.talentsprint.com/aiml/record_ip.html?traineeId={0}&recordId={1}"></script>'.format(getId(),submission_id)))
    print("Setup completed successfully")
    return

def submit_notebook():
    ipython.magic("notebook -e "+ notebook + ".ipynb")

    import requests, json, base64, datetime

    url = "https://dashboard.talentsprint.com/xp/app/save_notebook_attempts"
    if not submission_id:
      data = {"id" : getId(), "notebook" : notebook, "mobile" : getPassword()}
      r = requests.post(url, data = data)
      r = json.loads(r.text)

      if r["status"] == "Success":
          return r["record_id"]
      elif "err" in r:
        print(r["err"])
        return None
      else:
        print ("Something is wrong, the notebook will not be submitted for grading")
        return None

    elif getAnswer() and getComplexity() and getAdditional() and getConcepts() and getComments() and getMentorSupport():
      # read the exported notebook and close the file handle afterwards
      with open(notebook + ".ipynb", "rb") as f:
        file_hash = base64.b64encode(f.read())

      data = {"complexity" : Complexity, "additional" :Additional,
              "concepts" : Concepts, "record_id" : submission_id,
              "answer" : Answer, "id" : Id, "file_hash" : file_hash,
              "notebook" : notebook,
              "feedback_experiments_input" : Comments,
              "feedback_mentor_support": Mentor_support}
      r = requests.post(url, data = data)
      r = json.loads(r.text)
      if "err" in r:
        print(r["err"])
        return None
      else:
        print("Your submission is successful.")
        print("Ref Id:", submission_id)
        print("Date of submission: ", r["date"])
        print("Time of submission: ", r["time"])
        print("View your submissions: https://cds-iisc.talentsprint.com/notebook_submissions")
        #print("For any queries/discrepancies, please connect with mentors through the chat icon in LMS dashboard.")
        return submission_id
    else:
      return submission_id


def getAdditional():
  try:
    if not Additional:
      raise NameError
    else:
      return Additional
  except NameError:
    print ("Please answer Additional Question")
    return None

def getComplexity():
  try:
    if not Complexity:
      raise NameError
    else:
      return Complexity
  except NameError:
    print ("Please answer Complexity Question")
    return None

def getConcepts():
  try:
    if not Concepts:
      raise NameError
    else:
      return Concepts
  except NameError:
    print ("Please answer Concepts Question")
    return None


# def getWalkthrough():
#   try:
#     if not Walkthrough:
#       raise NameError
#     else:
#       return Walkthrough
#   except NameError:
#     print ("Please answer Walkthrough Question")
#     return None

def getComments():
  try:
    if not Comments:
      raise NameError
    else:
      return Comments
  except NameError:
    print ("Please answer Comments Question")
    return None


def getMentorSupport():
  try:
    if not Mentor_support:
      raise NameError
    else:
      return Mentor_support
  except NameError:
    print ("Please answer Mentor support Question")
    return None

def getAnswer():
  try:
    if not Answer:
      raise NameError
    else:
      return Answer
  except NameError:
    print ("Please answer Question")
    return None


def getId():
  try:
    return Id if Id else None
  except NameError:
    return None

def getPassword():
  try:
    return password if password else None
  except NameError:
    return None

submission_id = None
### Setup
if getPassword() and getId():
  submission_id = submit_notebook()
  if submission_id:
    setup()
else:
  print ("Please complete Id and Password cells before running setup")



**Note: In this assignment Hadoop is installed on top of the Linux file system available in Google Colab. It is therefore strongly recommended to run the notebook only on Google Colab; it may not work in a local environment.**

### Installing Hadoop

In [None]:
# Download the Hadoop tar.gz archive
!wget -qq https://archive.apache.org/dist/hadoop/common/hadoop-3.3.3/hadoop-3.3.3.tar.gz

In [None]:
# Extract the Hadoop tar.gz archive
!tar -xzvf hadoop-3.3.3.tar.gz

Colaboratory is built on top of Ubuntu, so all the Ubuntu files are available in the Colab file section (left panel of the Colab notebook).

Copy the Hadoop directory to the `/usr/local` directory:

In [None]:
# copy the Hadoop directory to /usr/local
!cp -r hadoop-3.3.3/ /usr/local/

### Configuring Hadoop’s Java Home

Hadoop requires that you set the path to Java, either as an environment variable or in the Hadoop configuration file.

In [None]:
#To find the default Java path
!readlink -f /usr/bin/java | sed "s:bin/java::"

Set the `JAVA_HOME` environment variable using `os`:

In [None]:
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-11-openjdk-amd64/'
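
The hard-coded path above matches the Java version installed on Colab at the time of writing. As a hedged alternative, the following sketch derives the same directory from whatever `java` binary is on the `PATH`, mirroring the `readlink -f` command in the previous cell. The helper name `java_home_from_binary` is hypothetical, and the result has no trailing slash.

```python
import os
import shutil


def java_home_from_binary(java_binary_path):
    """Return the directory two levels above a resolved .../bin/java path."""
    return os.path.dirname(os.path.dirname(java_binary_path))


java_on_path = shutil.which("java")          # None if Java is not installed
if java_on_path:
    # realpath follows symlinks, like `readlink -f` in the cell above
    print(java_home_from_binary(os.path.realpath(java_on_path)))
else:
    print("java not found on PATH")
```

If the printed directory differs from the value assigned to `JAVA_HOME`, the printed one is the better candidate for the current runtime.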

### Running Hadoop

From the `/usr/local` directory where we copied the Hadoop directory, run the `hadoop` command.

In [None]:
#Running Hadoop
!/usr/local/hadoop-3.3.3/bin/hadoop

### Perform various Hadoop-HDFS shell commands

#### Check the version of Hadoop



In [None]:
!/usr/local/hadoop-3.3.3/bin/hadoop version

#### List all the files/directories for the given hdfs destination path

In [None]:
!/usr/local/hadoop-3.3.3/bin/hdfs dfs -ls /

#### Display free space at given hdfs destination

In [None]:
!/usr/local/hadoop-3.3.3/bin/hdfs dfs -df /

#### HDFS Command to create the directory in HDFS

In [None]:
!/usr/local/hadoop-3.3.3/bin/hdfs dfs -mkdir /hadoop

#### HDFS command to remove the entire directory and all of its content from HDFS

In [None]:
!/usr/local/hadoop-3.3.3/bin/hdfs dfs -rm -r /hadoop
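
The same shell commands can also be issued from Python code, which is convenient when the paths are computed at run time. The sketch below is one possible wrapper, not part of Hadoop: the helper names `hdfs_dfs_command` and `run_hdfs_dfs` are hypothetical, and the binary path is the install location used in this notebook.

```python
import subprocess

HDFS_BIN = "/usr/local/hadoop-3.3.3/bin/hdfs"   # install path used in this notebook


def hdfs_dfs_command(*args):
    """Build the argument list for an `hdfs dfs` call, e.g. ('-ls', '/')."""
    return [HDFS_BIN, "dfs", *args]


def run_hdfs_dfs(*args):
    """Run the command and return its standard output as text."""
    completed = subprocess.run(
        hdfs_dfs_command(*args), capture_output=True, text=True, check=True
    )
    return completed.stdout


# Only build and show the command here; nothing is executed by this line.
print(hdfs_dfs_command("-mkdir", "/hadoop"))
```

With Hadoop installed as above, `print(run_hdfs_dfs("-ls", "/"))` would correspond to the `-ls` cell shown earlier; `check=True` makes a failing command raise `subprocess.CalledProcessError` instead of passing silently.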

### Perform MapReduce operation

We will use the e-commerce sales dataset from the UCI Machine Learning Repository, which contains real-life transaction data from a UK retailer.

Here we will perform a MapReduce operation to calculate the total sales for each country.

In [None]:
import pandas as pd
df = pd.read_csv('ecommerce_uci.csv')
df.head()

### Hadoop Streaming

Here we will use Hadoop Streaming to pass data between our Map and Reduce code via STDIN (standard input) and STDOUT (standard output). We simply read input data from Python's `sys.stdin` and print output to `sys.stdout`; Hadoop Streaming takes care of everything else.
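
The STDIN/STDOUT contract can be tried out without Hadoop by simulating the pipeline in memory. In this hedged sketch, `io.StringIO` objects stand in for the standard streams, Python's `sorted` stands in for the sort that Hadoop performs between the two phases, and the functions `mapper` and `reducer` are toy word-count examples rather than the scripts written later in this notebook.

```python
import io
from itertools import groupby


def mapper(stdin, stdout):
    """Read raw lines and write one `key<TAB>value` line per word."""
    for line in stdin:
        for word in line.split():
            stdout.write("{0}\t1\n".format(word))


def reducer(stdin, stdout):
    """Sum the values of consecutive lines that share a key."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in stdin)
    for key, group in groupby(pairs, key=lambda pair: pair[0]):
        total = sum(int(value) for _, value in group)
        stdout.write("{0}\t{1}\n".format(key, total))


# Simulate the pipeline: input -> mapper -> sort -> reducer
map_out = io.StringIO()
mapper(io.StringIO("b a\na c a\n"), map_out)
sorted_lines = sorted(map_out.getvalue().splitlines(keepends=True))
reduce_out = io.StringIO()
reducer(io.StringIO("".join(sorted_lines)), reduce_out)
print(reduce_out.getvalue())
```

The reducer relies on equal keys arriving on consecutive lines, which is why the sort step between the two phases matters.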

In [None]:
!find / -name 'hadoop-streaming*.jar'

### Write the mapper and reducer files

Mapper will

* read the data
* convert it into a proper format
* print the output as a key-value pair, i.e., (country name, sales)

In [None]:
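%%writefile mapper.py
import csv
import sys

# Hadoop Streaming passes the CSV file to this script on STDIN.
# csv.reader keeps quoted fields that contain commas in one piece.
for line_number, row in enumerate(csv.reader(sys.stdin), start=1):
    # skip the header row and any row without the expected nine columns
    if line_number == 1 or len(row) != 9:
        continue
    InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Total_Sales, Country = row
    # emit a key-value pair: country (spaces replaced by underscores) and sales
    print('{0}\t{1}'.format(Country.strip().replace(' ', '_'), Total_Sales))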


Reducer will

* read input from the mapper
* check for existing country key in the dictionary
* add the total to an existing value
* print all key-value pairs

In [None]:
%%writefile reducer.py
from operator import itemgetter
import sys
word2count = {}
# Hadoop Streaming passes the mapper output to this script on STDIN
for line in sys.stdin:
    line = line.strip()
    try:
        country_name, sales = line.split('\t')
        sales = float(sales)
        # dict.get() returns the default (0) when the key is not yet present
        word2count[country_name] = word2count.get(country_name, 0) + sales
    except ValueError:
        # skip lines that are malformed or whose sales value is not a number
        pass
# itemgetter(1) will specify that we need to sort based on dictionary values
sorted_word2count = sorted(word2count.items(), key=itemgetter(1), reverse=True)

for word, count in sorted_word2count:
    print('{0}\t{1}'.format(word, count))

In [None]:
# Grant the user read, write and execute permissions on both scripts
!chmod u+rwx /content/mapper.py
!chmod u+rwx /content/reducer.py

#### Running the Python Code on Hadoop cluster

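Now that everything is prepared, we can finally run our Python MapReduce job with Hadoop. Because no cluster has been configured, Hadoop runs here in its default standalone (local) mode, so the input and output paths refer to the Colab file system. As mentioned earlier, we use Hadoop Streaming to pass data between our Map and Reduce code via STDIN (standard input) and STDOUT (standard output).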

In [None]:
!/usr/local/hadoop-3.3.3/bin/hadoop jar /usr/local/hadoop-3.3.3/share/hadoop/tools/lib/hadoop-streaming-3.3.3.jar -input /content/ecommerce_uci.csv -output /content/output1 -file /content/mapper.py  -file /content/reducer.py  -mapper 'python mapper.py'  -reducer 'python reducer.py'

In [None]:
# Locating output directory
!ls /content/output1

In [None]:
# Display output
!cat /content/output1/part-00000
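
To work with the result in Python rather than only printing it, the tab-separated output can be parsed into a dictionary. The following is a minimal sketch; the function name `parse_streaming_output` is hypothetical, and the sample lines are invented placeholders, not actual results from the dataset.

```python
def parse_streaming_output(lines):
    """Turn `key<TAB>value` lines into a dict of floats, skipping bad lines."""
    totals = {}
    for line in lines:
        parts = line.rstrip("\n").split("\t")
        if len(parts) != 2:
            continue
        try:
            totals[parts[0]] = float(parts[1])
        except ValueError:
            continue
    return totals


# Invented sample lines in the same format as the reducer output
sample = ["United_Kingdom\t100.5\n", "France\t20.0\n", "broken line\n"]
totals = parse_streaming_output(sample)
print(max(totals, key=totals.get))
```

On the real output, the same function can be applied to the open file, for example `with open('/content/output1/part-00000') as f: totals = parse_streaming_output(f)`.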

Now we will consider the 20 Newsgroups dataset, a collection of newsgroup documents. The `20news-18828` archive contains 20 directories, one per newsgroup. In this version of the dataset, duplicate messages have been removed and the messages retain only the "From" and "Subject" headers (18828 messages in total).

Each directory in the bundle represents a single newsgroup. Each file in a directory is the text of one message that was posted to that newsgroup.

Some of the newsgroups are as follows:

* comp.graphics
* comp.os.ms-windows.misc
* comp.sys.ibm.pc.hardware
* rec.motorcycles
* rec.sport.baseball
* rec.sport.hockey
* sci.crypt
* sci.electronics
* sci.med
* misc.forsale
* talk.politics.misc
* talk.politics.guns
* talk.politics.mideast
* talk.religion.misc
* alt.atheism
* soc.religion.christian

Here we will use one message from the `alt.atheism` newsgroup (file `49960`) and perform a MapReduce operation to count the words in it.

To know more about the dataset click [here](http://qwone.com/~jason/20Newsgroups/).

In [None]:
!cat /content/20news-18828/alt.atheism/49960

#### Write the Mapper file

In [None]:
%%writefile mapper_news.py
import sys
import io
import re
import nltk
nltk.download('stopwords',quiet=True)
from nltk.corpus import stopwords
import string
# list possible punctuations
punctuations = string.punctuation

# configure english stopwords
stop_words = set(stopwords.words('english'))
# read STDIN as latin1 text so that non-UTF-8 bytes do not raise decode errors
input_stream = io.TextIOWrapper(sys.stdin.buffer, encoding='latin1')
for line in input_stream:
  line = line.strip()                      # remove leading and trailing whitespace
  line = re.sub(r'[^\w\s]', '',line)       # delete every character that is not a word character or whitespace
  line = line.lower()                      # convert line to lowercase
  for x in line:
    if x in punctuations:
      line=line.replace(x, " ")           # replace remaining punctuation (underscores) with a space

  words=line.split()                      # split line into words
  for word in words:
    if word not in stop_words:
      print('%s\t%s' % (word, 1))

#### Write the Reducer file

In [None]:
%%writefile reducer_news.py
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    line=line.lower()

    # parse the input we got from mapper_news.py
    try:
      word, count = line.split('\t', 1)
      count = int(count)
    except ValueError:
      # the line had no tab or the count was not a number,
      # so silently ignore/discard this line
      continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print ('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word, if any input was processed
if current_word:
    print( '%s\t%s' % (current_word, current_count))

In [None]:
# Grant the user read, write and execute permissions on both scripts
!chmod u+rwx /content/mapper_news.py
!chmod u+rwx /content/reducer_news.py

#### Running the Python Code on Hadoop cluster

As with the sales example, we run the job through Hadoop Streaming, which passes data between the Map and Reduce code via STDIN (standard input) and STDOUT (standard output). This time we use the news mapper and reducer.

In [None]:
!/usr/local/hadoop-3.3.3/bin/hadoop jar /usr/local/hadoop-3.3.3/share/hadoop/tools/lib/hadoop-streaming-3.3.3.jar -input /content/20news-18828/alt.atheism/49960 -output /content/output_news -file /content/mapper_news.py  -file /content/reducer_news.py  -mapper 'python mapper_news.py'  -reducer 'python reducer_news.py'

In [None]:
# Locating output directory
!ls /content/output_news

In [None]:
# Display output
!cat /content/output_news/part-00000

### Please answer the questions below to complete the experiment:




In [None]:
# @title Which of the following takes a set of data and converts it into another set of data, where individual elements are broken down into tuples? { run: "auto", form-width: "500px", display-mode: "form" }
Answer = "" #@param ["","Reduce","Node","Map"]

In [None]:
#@title How was the experiment? { run: "auto", form-width: "500px", display-mode: "form" }
Complexity = "" #@param ["","Too Simple, I am wasting time", "Good, But Not Challenging for me", "Good and Challenging for me", "Was Tough, but I did it", "Too Difficult for me"]


In [None]:
#@title If it was too easy, what more would you have liked to be added? If it was very difficult, what would you have liked to have been removed? { run: "auto", display-mode: "form" }
Additional = "" #@param {type:"string"}


In [None]:
#@title Can you identify the concepts from the lecture which this experiment covered? { run: "auto", vertical-output: true, display-mode: "form" }
Concepts = "" #@param ["","Yes", "No"]


In [None]:
#@title  Text and image description/explanation and code comments within the experiment: { run: "auto", vertical-output: true, display-mode: "form" }
Comments = "" #@param ["","Very Useful", "Somewhat Useful", "Not Useful", "Didn't use"]


In [None]:
#@title Mentor Support: { run: "auto", vertical-output: true, display-mode: "form" }
Mentor_support = "" #@param ["","Very Useful", "Somewhat Useful", "Not Useful", "Didn't use"]


In [None]:
#@title Run this cell to submit your notebook for grading { vertical-output: true }
try:
  if submission_id:
      return_id = submit_notebook()
      if return_id : submission_id = return_id
  else:
      print("Please complete the setup first.")
except NameError:
  print ("Please complete the setup first.")