## mrjob ##

__mrjob__ is a software package developed by the restaurant recommendation company _Yelp_.
Its goal is to simplify deploying streaming-based map-reduce jobs written in Python onto different
frameworks, such as Hadoop on a private cluster or Hadoop on AWS (a service called EMR).

* You can read more about mrjob here: https://pythonhosted.org/mrjob/index.html
* You can clone it from GitHub here: https://github.com/yelp/mrjob

In this notebook we run a simple word-count example, add some logging commands to it, and look at two modes of running the job.

The **mrjob command line** is described here: https://pythonhosted.org/mrjob/guides/emr-tools.html

In [None]:
import os
import sys
from time import time
from ucsd_bigdata.credentials import Credentials

root_dir = "../../"

# Get the AWS credentials from the User's Vault
credentials = Credentials()
key_id = credentials.aws_access_key_id
secret_key = credentials.aws_secret_access_key
username = credentials.aws_user_name
s3_bucket = credentials.s3_bucket

print s3_bucket,key_id,username

# Join the path parts so the result has no doubled slash
examples_dir = os.path.join(root_dir, 'data/text/')
!ls -l $examples_dir

## Different modes of running a mrjob map-reduce job ##

Once the mapper, combiner and reducer have been written and tested, you can run the job on different types of infrastructure:

1. __inline__: run the job as a single process on the local machine.
1. __local__: run the job on the local machine, using multiple processes to simulate parallel processing.
1. __EMR__ (Elastic MapReduce): run the job on a Hadoop cluster in the Amazon cloud.
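
The three modes differ only in the runner passed on the command line with `-r`. The sketch below assembles the argument list for each mode. The helper name `build_mrjob_command`, the job flow id `j-EXAMPLE` and the bucket `s3://example-bucket/out/` are made up for illustration. The `--emr-job-flow-id` and `--output-dir` options are the ones used later in this notebook; newer mrjob releases may name the job flow option differently.

```python
def build_mrjob_command(script, input_path, runner="inline",
                        flow_id=None, output_dir=None):
    """Return the argument list for running an mrjob script with a given runner.

    Hypothetical helper for illustration; it only builds the command,
    it does not run anything.
    """
    if runner not in ("inline", "local", "emr"):
        raise ValueError("unknown runner: %r" % runner)
    cmd = ["python", script, "-r", runner, input_path]
    if runner == "emr" and flow_id is not None:
        # Reuse an already-running job flow instead of starting a new cluster.
        cmd.append("--emr-job-flow-id=" + flow_id)
    if output_dir is not None:
        cmd.append("--output-dir=" + output_dir)
    return cmd


for runner in ("inline", "local"):
    print(" ".join(build_mrjob_command("mr_word_freq_count.py",
                                       "Moby-Dick.txt", runner)))

# Placeholder job flow id and bucket, not real resources.
print(" ".join(build_mrjob_command("mr_word_freq_count.py", "Moby-Dick.txt",
                                   "emr", flow_id="j-EXAMPLE",
                                   output_dir="s3://example-bucket/out/")))
```

Returning a list rather than a single string makes it possible to hand the result directly to `subprocess.call` without worrying about shell quoting.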

Below we run the same process we ran at the top using __local__ instead of the default __inline__. Observe that in this case the reducers still have non-trivial work to do even when combiners are used, because each combiner only merges the output of a single mapper.
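
To make the role of the reducers concrete, here is a minimal pure-Python sketch of a word count with combiners. It does not use mrjob; it only imitates the data flow under the assumption that the input is divided into splits and that each split is handled by its own mapper and combiner. The function names and the two tiny splits are invented for illustration.

```python
import re
from collections import defaultdict

WORD_RE = re.compile(r"[\w']+")


def mapper(line):
    """Emit (word, 1) for every word in the line."""
    for word in WORD_RE.findall(line):
        yield word.lower(), 1


def combiner(pairs):
    """Sum the counts within ONE mapper's output only."""
    partial = defaultdict(int)
    for word, count in pairs:
        partial[word] += count
    return sorted(partial.items())


def reducer(word, counts):
    """Sum the partial counts that arrive from all combiners."""
    return word, sum(counts)


def run_job(splits):
    """Simulate map, combine, shuffle and reduce over a list of splits."""
    shuffled = defaultdict(list)
    for split in splits:
        mapped = (pair for line in split for pair in mapper(line))
        for word, partial_count in combiner(mapped):
            shuffled[word].append(partial_count)
    totals = dict(reducer(word, counts) for word, counts in shuffled.items())
    return shuffled, totals


# Two splits, as if two mapper processes each read part of the file.
splits = [["the whale the sea"], ["the ship"]]
shuffled, totals = run_job(splits)
print(shuffled["the"])  # → [2, 1]  one partial count per split
print(totals["the"])    # → 3
```

With a single split the combiner would already produce the final counts. With several splits, each word that occurs in more than one split reaches the reducer as several partial counts that still have to be added.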

## Running in local mode

## Setting up configuration

In [None]:
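from ucsd_bigdata.find_waiting_flow import find_waiting_flow

# Look up the available job flows and use the first one, if any
flows_dict = find_waiting_flow()
if len(flows_dict) > 0:
    flow_id, node = (flows_dict[0]['flow_id'], flows_dict[0]['node'])
    print flow_id, node
    # HDFS path of the input file on that job flow's node
    input_file = 'hdfs://' + node + ':9000/weather.raw_data/ALL.csv'
else:
    # flow_id stays undefined here, so the EMR cells below cannot run
    print "No flows available"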

## Running in EMR mode on existing job flow (hadoop cluster)

In [None]:
import uuid

# Create unique output directory in the student's s3_bucket
output_dir = s3_bucket + str(uuid.uuid4()) + "/"

print output_dir
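
The concatenation above only yields a valid path if `s3_bucket` already ends with a slash. A slightly more defensive variant, as a sketch: the helper name `unique_output_dir` and the bucket `s3://example-bucket` are placeholders, not part of the course library.

```python
import uuid


def unique_output_dir(bucket_uri):
    """Return '<bucket_uri>/<random uuid>/' with exactly one slash between parts."""
    return bucket_uri.rstrip("/") + "/" + str(uuid.uuid4()) + "/"


# Placeholder bucket name; both spellings give the same layout.
print(unique_output_dir("s3://example-bucket"))
print(unique_output_dir("s3://example-bucket/"))
```

A fresh UUID per run keeps each run's output in its own directory, so the target directory does not exist yet when the job starts.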

In [None]:
!python mr_word_freq_count.py -r emr  $examples_dir/Moby-Dick.txt --emr-job-flow-id=$flow_id --output-dir=$output_dir  > counts_emr.txt

In [None]:
!mrjob fetch-logs --list $flow_id

In [None]:
!ls -lrt

In [None]:
!wc counts_emr.txt
!cat counts_emr.txt

In [None]:
!cut -b 2-11 counts_emr.txt > counts_only.txt
!head -100 counts_only.txt
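
As an alternative to slicing bytes with `cut`, the output file can be parsed in Python. This sketch assumes the job uses mrjob's default output protocol, where each line holds a JSON-encoded key, a tab, and a JSON-encoded value. The helper names `parse_counts` and `top_words` and the sample lines are made up for illustration.

```python
import json


def parse_counts(lines):
    """Parse lines of the form '"word"<TAB>count' into a dict."""
    counts = {}
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue
        key, value = line.split("\t", 1)
        counts[json.loads(key)] = json.loads(value)
    return counts


def top_words(counts, n=10):
    """Return the n most frequent words, ties broken alphabetically."""
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:n]


# Invented sample lines in the assumed output format.
sample = ['"whale"\t3\n', '"the"\t7\n', '"sea"\t3\n']
print(top_words(parse_counts(sample), n=2))  # Python 3 → [('the', 7), ('sea', 3)]
```

To apply it to the real output, pass an open file object, for example `parse_counts(open("counts_emr.txt"))`.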