In [1]:
import os
from scipy.ndimage import imread
import codecs
import json
from tqdm import tqdm

input_dir = 'Cifar Pictures/'
output_file = 'cifar_data.txt'

# Serialise every image in input_dir as one JSON object per line:
# {file_name: pixel data as nested lists}.
# The file is opened with 'w' (not 'a+') so that re-running this cell rewrites
# the file instead of appending a second copy of every record to it.
with codecs.open(output_file, 'w', encoding='utf-8') as f:
    for image_name in tqdm(os.listdir(input_dir)):
        image_data = imread(os.path.join(input_dir, image_name))
        image_dict = {image_name: image_data.tolist()}

        # Compact separators keep each line as short as possible.
        json.dump(image_dict, f, separators=(',', ':'))
        f.write('\n')

100%|███████████████████████████████████████████████████████████| 50000/50000 [12:29<00:00, 66.73it/s]


In [2]:
# Sanity check: the number of lines in the output file should equal the
# number of entries in the input directory (one JSON record per image).
!wc -l $output_file
print(len(os.listdir(input_dir)))

50000 cifar_data.txt
50000


In [3]:
%%writefile mr_cifar.py
import json
import numpy as np
from mrjob.job import MRJob


class MyMR(MRJob):
    """Group images by the colour channel with the highest average value.

    Each input line is a JSON object holding exactly one
    ``{image_name: image_data}`` pair.
    """

    def mapper(self, _, line):
        """Emit ``(channel, (image_name, channel_average))`` for one input line.

        ``image_data`` is averaged over its first two axes; the index of the
        largest remaining value becomes the key.
        (presumably the data is rows x columns x channels -- TODO confirm)

        Raises:
            ValueError: if the line is not a JSON object with exactly one item.
        """
        try:
            image_dict = json.loads(line)
            ((image_name, image_data),) = image_dict.items()
        except (ValueError, TypeError, AttributeError) as err:
            # Fail loudly with an explicit exception instead of `assert False`,
            # which is stripped under `python -O`. Print statements are not an
            # option because they go into the mapper output in MRJob.
            raise ValueError(
                'something went wrong parsing input line ({}): {!r}'.format(err, line[:80]))

        color_averages = np.array(image_data).mean(axis=(0, 1))
        max_color_channel = np.argmax(color_averages)
        # key has to be int, not np.int64
        yield (int(max_color_channel), (image_name, color_averages[max_color_channel]))

    def reducer(self, max_color_channel, max_color_intensities):
        """Emit the channel with its (image_name, intensity) pairs, sorted.

        Pairs are ordered by descending intensity; the image name is the
        secondary key so that ties are always broken the same way.
        """
        yield max_color_channel, sorted(max_color_intensities, key=lambda tup: (-tup[1], tup[0]))

        
# Start the job only when this file is executed as a script.
if __name__ == '__main__': 
    MyMR.run()

Writing mr_cifar.py


In [4]:
%%time
# Run the complete job in one command, reading the images from stdin.
!python mr_cifar.py < cifar_data.txt > temp_MRJob.txt
# Manual equivalent (the script written above is mr_cifar.py):
# python mr_cifar.py --mapper < cifar_data.txt # just run mapper
# sort temp.txt > temp0.txt
# python mr_cifar.py --reducer < temp0.txt > temp.txt

No configs found; falling back on auto-configuration
Creating temp directory c:\users\eugene\appdata\local\temp\mr_cifar.Eugene.20170718.041451.859000
Running step 1 of 1...
reading from STDIN
Streaming final output from c:\users\eugene\appdata\local\temp\mr_cifar.Eugene.20170718.041451.859000\output...
Removing temp directory c:\users\eugene\appdata\local\temp\mr_cifar.Eugene.20170718.041451.859000...


Wall time: 1min 3s


In [5]:
%%time
# Run the mapper, the sort and the reducer as separate steps, then diff the
# result against the output of the single-command run (no diff output = identical).
!python mr_cifar.py --mapper < cifar_data.txt > temp_mapper_output.txt
!sort temp_mapper_output.txt > temp_mapper_output_sorted.txt
!python mr_cifar.py --reducer < temp_mapper_output_sorted.txt > temp_reducer_output.txt
!diff temp_reducer_output.txt temp_MRJob.txt

# 1 liner
#!cat cifar_data.txt | python mr_cifar.py --mapper | \
#sort | python mr_cifar.py --reducer > temp_reducer_output.txt

Wall time: 1min 9s


In [6]:
import ast

# Map every colour channel (kept as the string read from the file) to the
# image names listed for it in temp_MRJob.txt, preserving the file's order.
color_channels_and_pictures = dict()
with open('temp_MRJob.txt') as f:
    for line in f:
        # Each line is "<channel>\t<list of (image_name, intensity) pairs>".
        color_channel, images_and_intensities = line.split('\t', 1)
        # literal_eval only accepts literals, so a corrupted or tampered file
        # cannot execute arbitrary code the way eval() would.
        images_and_intensities = ast.literal_eval(images_and_intensities.strip())
        image_names = [image_name for image_name, color_intensity in images_and_intensities]
        color_channels_and_pictures[color_channel] = image_names

In [7]:
import matplotlib.pyplot as plt
%matplotlib
from scipy.ndimage import imread
import numpy as np

def plots(ims, interp=False, titles=None):
    """Draw several images next to each other in a single row.

    ims:    sequence of images; it is converted to one NumPy array and its
            overall minimum / maximum are used as the colour limits of
            every panel.
    interp: when true, imshow gets interpolation=None; otherwise 'none'.
    titles: optional sequence of panel titles, indexed like ims.
    """
    stack = np.array(ims)
    lowest, highest = stack.min(), stack.max()
    mode = None if interp else 'none'
    figure = plt.figure(figsize=(12, 24))
    total = len(stack)
    for position, image in enumerate(stack, 1):
        panel = figure.add_subplot(1, total, position)
        if titles is not None:
            panel.set_title(titles[position - 1], fontsize=18)
        # The panel that was just added is the one plt.imshow draws into.
        plt.imshow(image, interpolation=mode, vmin=lowest, vmax=highest)

def plot(im, interp=False):
    """Show one image in a new framed 3x6 figure.

    im:     the image handed to imshow.
    interp: when true, imshow gets interpolation=None; otherwise 'none'.
    """
    plt.figure(figsize=(3, 6), frameon=True)
    mode = None if interp else 'none'
    plt.imshow(im, interpolation=mode)

# Select the gray colormap, then close the current figure.
# NOTE(review): presumably close() is here to discard a figure opened as a
# side effect of gray() -- confirm.
plt.gray()
plt.close()

Using matplotlib backend: Qt4Agg


In [8]:
input_dir = 'Cifar Pictures/'

# Read the first ten images listed under channel '1' (the key is a string
# because it was parsed from a text file) and show them side by side.
first_ten_names = color_channels_and_pictures['1'][:10]
channel_images = []
for image_name in first_ten_names:
    channel_images += [imread(input_dir + image_name)]

plots(channel_images)

# Manual EMR Steps

In [1]:
%%writefile EMR_cifar_mapper.py
#!/usr/bin/python
# OUTPUT: max_color_channel \t image_name \t max_color_intensity

import sys
import json
import numpy as np

def format_record(line):
    """Turn one JSON input line into a tab-separated mapper record.

    The line must be a JSON object holding exactly one
    ``{image_name: image_data}`` pair. ``image_data`` is averaged over its
    first two axes and the index of the largest remaining value is the key.
    (presumably the data is rows x columns x channels -- TODO confirm)

    Returns:
        "max_color_channel\\timage_name\\tmax_color_intensity"

    Raises:
        ValueError: if the line is not a JSON object with exactly one item.
    """
    try:
        # Parse once, inside the try, so that bad JSON is reported with context.
        image_dict = json.loads(line)
        ((image_name, image_data),) = image_dict.items()
    except (ValueError, TypeError, AttributeError) as err:
        raise ValueError(
            'something went wrong parsing input line ({}): {!r}'.format(err, line[:80]))

    color_averages = np.array(image_data).mean(axis=(0, 1))
    max_color_channel = np.argmax(color_averages)
    return "{}\t{}\t{}".format(
        int(max_color_channel),
        image_name,
        color_averages[max_color_channel])


if __name__ == '__main__':
    for line in sys.stdin:
        print(format_record(line))

Overwriting EMR_cifar_mapper.py


In [2]:
%%writefile EMR_cifar_reducer.py
#!/usr/bin/python
# OUTPUT: color_channel \t sorted_image_name_color_intensity_tuple

import sys

import itertools


def reduce_lines(lines):
    """Collapse sorted mapper records into one output line per colour channel.

    lines: iterable of "channel\\timage_name\\tintensity" strings in which
           records of the same channel are adjacent (i.e. sorted input).
           Blank lines are ignored.

    Yields:
        "channel\\t[(image_name, intensity), ...]" with the pairs ordered by
        descending intensity and then by image name, for breaking ties.

    Raises:
        ValueError: if a non-blank line does not hold exactly three fields
            or its intensity is not a number.
    """
    # everything read in is a string!
    records = (line.strip().split('\t') for line in lines if line.strip())
    for color_channel, group in itertools.groupby(records, key=lambda record: record[0]):
        image_names_color_intensities = [
            (image_name, float(max_color_intensity))
            for _, image_name, max_color_intensity in group
        ]
        image_names_color_intensities.sort(key=lambda tup: (-tup[1], tup[0]))
        yield "{}\t{}".format(color_channel, image_names_color_intensities)


if __name__ == '__main__':
    for output_line in reduce_lines(sys.stdin):
        print(output_line)
    


Overwriting EMR_cifar_reducer.py


In [3]:
%%time
# Step-by-step equivalent of the one-liner below:
#!python EMR_cifar_mapper.py < cifar_data.txt > temp1.txt
#!sort temp1.txt > temp2.txt
#!python EMR_cifar_reducer.py < temp2.txt > temp3.txt

# 1 liner
!cat cifar_data.txt | python EMR_cifar_mapper.py | sort | python EMR_cifar_reducer.py > temp_cifar_EMR_manual.txt

Wall time: 1min 23s


In [4]:
# to put on EMR if you are on Windows, convert Windows file to Linux compatible
!dos2unix EMR_cifar_mapper.py EMR_cifar_mapper.py
!dos2unix EMR_cifar_reducer.py EMR_cifar_reducer.py

# optional
!chmod +x EMR_cifar_mapper.py
!chmod +x EMR_cifar_reducer.py

dos2unix: converting file EMR_cifar_mapper.py to Unix format...
dos2unix: converting file EMR_cifar_mapper.py to Unix format...
dos2unix: converting file EMR_cifar_reducer.py to Unix format...
dos2unix: converting file EMR_cifar_reducer.py to Unix format...


# AWS Elastic MapReduce Notes

## Sorting
* When ties exist, the EMR sort order might not be identical to the MRJob or manual sort order. Hence, if consistency is your goal, sort in a deterministic way that always breaks ties (e.g., with a secondary sort key).

## Cluster Creation under "Advanced Options",
* On "Software Configuration", set "Step type" to "Streaming program"  
* On "Hardware Configuration", set "EC2 Subnet" to subregion that has the EC2 instances that you want, i.e. us-east-1e. Also, select number of EC2 instances you want for Master and Core nodes 
* On "General Options", name your cluster and select correct folder for S3 folder Logging
* On "Security Options", select your EC2 key pair if you want to ssh into it

## Cluster Debugging

* To ssh, run `ssh -i key.pem hadoop@ec2-54-236-17-44.compute-1.amazonaws.com`, where the host name is the one shown under "Master public DNS". If you have problems ssh-ing into the Hadoop instance, you may have to open port 22.
* EMR instances have Python 2 and 3 installed along with many libraries. You can ssh in to see which ones are available.
* When setting up "Steps" with "Add step", when things break, look under 'syslog'.
* For seeing how the cluster is doing, inside 'steps', look at 'View jobs' to see where things are progressing. Can even look at "View tasks".
* When adding a step, selecting an input folder is better than selecting a single file, unless you want only that one file.
* The output folder has to be a new one every single time. Hence, navigate to where you want the folder to be, then type an inner folder name that doesn't exist yet.

# Download EMR results from S3

In [1]:
import boto

# Read the access key pair from the "Name=Value" lines of rootkey.csv, so the
# credentials never appear in the notebook itself.
AWS_credentials = {}
with open('rootkey.csv') as f:
    for line in f:
        # Split on the first '=' only, so a value that itself contains '='
        # is kept whole instead of being truncated.
        if 'AWSAccessKeyId' in line:
            AWS_credentials['aws_access_key_id'] = line.strip().split('=', 1)[1]
        elif 'AWSSecretKey' in line:
            AWS_credentials['aws_secret_access_key'] = line.strip().split('=', 1)[1]

conn = boto.connect_s3(**AWS_credentials)
bucket = conn.get_bucket('map-reduce-practice')

In [2]:
import os

output_dir = 'output_data/cifar/'

if not os.path.isdir(output_dir):
    os.makedirs(output_dir)

# Download every object whose key contains output_dir into the local folder of
# the same name, keeping only the last path component as the file name.
for key in bucket.list():
    if output_dir in key.key:
        print(key.key)  # function form works under both Python 2 and Python 3
        file_name = key.key.split('/')[-1]
        if not file_name:
            # A key ending in '/' has no file name; there is nothing to write.
            continue
        key.get_contents_to_filename(output_dir + file_name)

output_data/cifar/_SUCCESS
output_data/cifar/part-00000
output_data/cifar/part-00001
output_data/cifar/part-00002
output_data/cifar/part-00003
output_data/cifar/part-00004
output_data/cifar/part-00005
output_data/cifar/part-00006


# Check if MRJob, manual map-reduce, EMR results match

In [5]:
import ast

# Map every colour channel (kept as the string read from the file) to the
# image names listed for it in temp_MRJob.txt, preserving the file's order.
color_channels_and_pictures = dict()
with open('temp_MRJob.txt') as f:
    for line in f:
        # Each line is "<channel>\t<list of (image_name, intensity) pairs>".
        color_channel, images_and_intensities = line.split('\t', 1)
        # literal_eval only accepts literals, so a corrupted or tampered file
        # cannot execute arbitrary code the way eval() would.
        images_and_intensities = ast.literal_eval(images_and_intensities.strip())
        image_names = [image_name for image_name, color_intensity in images_and_intensities]
        color_channels_and_pictures[color_channel] = image_names

In [6]:
import ast

# Map every colour channel (kept as the string read from the file) to the
# image names listed for it in temp_cifar_EMR_manual.txt, preserving the
# file's order.
color_channels_and_pictures_manual = dict()
with open('temp_cifar_EMR_manual.txt') as f:
    for line in f:
        # Each line is "<channel>\t<list of (image_name, intensity) tuples>".
        color_channel, images_and_intensities = line.split('\t', 1)
        # literal_eval only accepts literals, so a corrupted or tampered file
        # cannot execute arbitrary code the way eval() would.
        images_and_intensities = ast.literal_eval(images_and_intensities.strip())
        image_names = [image_name for image_name, color_intensity in images_and_intensities]
        color_channels_and_pictures_manual[color_channel] = image_names

In [7]:
# True only if both runs list the same image names, in the same order, for every channel.
print(color_channels_and_pictures == color_channels_and_pictures_manual)

True


In [36]:
import os

import ast

output_dir = 'output_data/cifar/'
color_channels_and_pictures_EMR = dict()

# NOTE(review): this cell needs cheat_sort and color_channels_and_pictures to
# exist already; cheat_sort is defined in a cell further down the notebook, so
# this fails on a fresh top-to-bottom run.
for file_name in os.listdir(output_dir):
    with open(output_dir + file_name) as f:
        for line in f:
            # Each line is "<channel>\t<list of (image_name, intensity) pairs>".
            color_channel, images_and_intensities = line.split('\t', 1)
            # literal_eval only accepts literals; downloaded files are external
            # data and must not be run through eval().
            images_and_intensities = ast.literal_eval(images_and_intensities.strip())
            print(cheat_sort(images_and_intensities) == color_channels_and_pictures[color_channel])
            image_names = [image_name for image_name, color_intensity in images_and_intensities]
            color_channels_and_pictures_EMR[color_channel] = image_names

False
False
False


In [4]:
# True only if both results list the same image names, in the same order, for every channel.
print(color_channels_and_pictures == color_channels_and_pictures_EMR)

False


In [None]:
import os

import ast

output_dir = 'output_data/cifar/'
color_channels_and_pictures_EMR = dict()

# Map every colour channel to the image names listed for it across all files
# in output_dir.
for file_name in os.listdir(output_dir):
    with open(output_dir + file_name) as f:
        for line in f:
            # Each line is "<channel>\t<list of (image_name, intensity) pairs>".
            color_channel, images_and_intensities = line.split('\t', 1)
            # literal_eval only accepts literals; downloaded files are external
            # data and must not be run through eval().
            images_and_intensities = ast.literal_eval(images_and_intensities.strip())
            image_names = [image_name for image_name, color_intensity in images_and_intensities]
            # Store the result; without this line the dict reset above stays empty.
            color_channels_and_pictures_EMR[color_channel] = image_names

In [42]:
import numpy as np

# Positions, within the first 1000 image names of channel '2', at which the
# manual result and the EMR result list a different image.
disagreement = np.where(np.array(color_channels_and_pictures_manual['2'][:1000]) !=
                        np.array(color_channels_and_pictures_EMR['2'][:1000]))[0]
print(disagreement)  # function form works under both Python 2 and Python 3

[229 230 487 488 683 684 719 720 876 877 899 900]


In [43]:
# Show the image names at the disagreeing positions, manual result first.
# The function form of print works under both Python 2 and Python 3.
print(np.array(color_channels_and_pictures_manual['2'])[disagreement])
print(np.array(color_channels_and_pictures_EMR['2'])[disagreement])

['22642.png' '42242.png' '20380.png' '4553.png' '29079.png' '47747.png'
 '1146.png' '47382.png' '21119.png' '45345.png' '12312.png' '45011.png']
['42242.png' '22642.png' '4553.png' '20380.png' '47747.png' '29079.png'
 '47382.png' '1146.png' '45345.png' '21119.png' '45011.png' '12312.png']


In [44]:
from collections import defaultdict

def cheat_sort(lst):
    """Return the first items of `lst`, ordered by descending second item.

    lst: iterable of indexable records; record[0] is the value to return and
         record[1] is the sort key (here: image name and intensity).

    Records sharing the same key keep the relative order they had in `lst`.
    """
    names_by_intensity = {}
    for record in lst:
        names_by_intensity.setdefault(record[1], []).append(record[0])

    return [
        name
        for intensity in sorted(names_by_intensity, reverse=True)
        for name in names_by_intensity[intensity]
    ]

In [46]:
# NOTE(review): images_and_intensities is not defined in this cell; presumably it is
# the value left over from the last line parsed by an earlier EMR-loading cell -- confirm.
cheat_sort(images_and_intensities)[:100] == color_channels_and_pictures_manual['2'][:100]

True