In [3]:
!ls

[1m[36mkmeans_from_CMSC_12300[m[m   mkeans-mrjob.ipynb       mkeans-mrjob.slides.html


## Kmeans - main program

In [1]:
# %load kmeans.py
# CMSC 12300 - Computer Science with Applications 3
# Borja Sotomayor, 2013
#
#  Master file: uses Runners to fire off map-reduce steps.

import sys
import random
import numpy
import pickle

In [2]:
from mrjob.job import MRJob
from kmeans_centroid_selector import MRKMeansChooseInitialCentroids
from kmeans_centroid_updater import MRKMeansUpdateCentroids

ImportError: No module named kmeans_centroid_selector

`extract_centroids` reads the output of the runner and returns the values (one centroid vector per output line) as a list.

runner.stream_output is explained [here](http://mrjob.readthedocs.org/en/latest/runners-runner.html#mrjob.runner.MRJobRunner.stream_output)

job.parse_output_line is explained [here](http://mrjob.readthedocs.org/en/latest/job.html?highlight=parse_output_line#mrjob.job.MRJob.parse_output_line)

In [None]:
def extract_centroids(job, runner):
    """Collect the values emitted by a finished map-reduce run.

    Every line produced by ``runner.stream_output()`` is decoded with
    ``job.parse_output_line`` into a ``(key, value)`` pair.  The pair is
    echoed to standard output and the value is kept; the key is otherwise
    discarded.

    Args:
        job: object providing ``parse_output_line(line)``.
        runner: object providing ``stream_output()``.

    Returns:
        A list holding the value of every output line, in stream order.
    """
    c = []
    for line in runner.stream_output():
        key, value = job.parse_output_line(line)
        # A single pre-formatted argument prints the same "key value" text
        # whether ``print`` is the Python 2 statement or the Python 3 function.
        print("%s %s" % (key, value))
        c.append(value)
    return c

In [None]:
def write_centroids_to_disk(centroids, fname):
    """Pickle ``centroids`` into the file ``fname``, replacing its contents.

    Args:
        centroids: any picklable object (the driver passes a list of
            centroid vectors).
        fname: path of the file to create or overwrite.
    """
    # Binary mode because pickle produces bytes; the ``with`` block closes
    # the handle even when ``pickle.dump`` raises.
    with open(fname, "wb") as f:
        pickle.dump(centroids, f)

In [None]:
# Largest distance moved by any centroid between 'centroids' and 'new_centroids'.
# The driver uses it as its termination criterion.
def get_biggest_diff(centroids,new_centroids):
    """Return the maximum Euclidean distance between paired centroids.

    The two sequences are paired positionally with ``zip`` (so extra entries
    in the longer one are ignored) and the norm of each difference is taken.
    """
    shifts = []
    for old, new in zip(centroids, new_centroids):
        displacement = numpy.array(old) - new
        shifts.append(numpy.linalg.norm(displacement))
    return max(shifts)

In [None]:
# Path of the pickle file the driver writes the current centroids to and
# passes to each update job through its --centroids option.
CENTROIDS_FILE="/tmp/emr.kmeans.centroids"

The following code is based on the pattern "Running your job programmatically", which is explained [here](http://mrjob.readthedocs.org/en/latest/guides/runners.html#running-your-job-programmatically).

In [None]:
if __name__ == '__main__':
    args = sys.argv[1:]

    # Iteration stops as soon as no centroid has moved farther than this
    # distance between two consecutive update passes.
    convergence_threshold = 10.0

    # Step 1: run the "choose centroids" map-reduce job once.
    choose_centroids_job = MRKMeansChooseInitialCentroids(args=args)
    with choose_centroids_job.make_runner() as choose_centroids_runner:
        choose_centroids_runner.run()

        # Extract the centroids from the runner's output stream while the
        # runner is still open.
        centroids = extract_centroids(choose_centroids_job, choose_centroids_runner)

    # The first runner is closed here instead of being held open for the
    # whole iteration loop; only the extracted centroids are needed from now on.
    write_centroids_to_disk(centroids, CENTROIDS_FILE)

    # Step 2: re-run the "update centroids" job until the centroids settle.
    i = 1
    while True:
        # Parenthesised single argument: same output on Python 2 and 3.
        print("Iteration #%i" % i)

        # Each pass gets a fresh job that reads the centroids written by
        # the previous pass.
        update_centroids_job = MRKMeansUpdateCentroids(args=args + ['--centroids='+CENTROIDS_FILE])
        with update_centroids_job.make_runner() as update_centroids_runner:
            update_centroids_runner.run()
            new_centroids = extract_centroids(update_centroids_job, update_centroids_runner)

        write_centroids_to_disk(new_centroids, CENTROIDS_FILE)

        diff = get_biggest_diff(centroids, new_centroids)

        # Same comparison as before (continue only while diff > threshold),
        # so a NaN distance still ends the loop.
        if diff > convergence_threshold:
            centroids = new_centroids
        else:
            break

        i += 1

## Kmeans - Initialize Centroids

In [None]:
# %load kmeans_centroid_selector.py
# CMSC 12300 - Computer Science with Applications 3
# Borja Sotomayor, 2013
#

import sys
import random
import numpy
import pickle

from mrjob.job import MRJob


class MRKMeansChooseInitialCentroids(MRJob):
    """Map-reduce job that picks the initial centroids for k-means."""

    # Initialize using the parent's __init__.
    def __init__(self, args=None):
        """Forward ``args`` to ``MRJob.__init__``.

        ``args`` now defaults to ``None`` so the class can also be built
        without arguments; callers that pass ``args`` positionally or by
        keyword behave as before.
        """
        # super() for consistency with configure_options in this class.
        super(MRKMeansChooseInitialCentroids, self).__init__(args=args)

**configure_options** is the mechanism for declaring the command-line arguments of an MRJob job so that they can be parsed and used inside the job.

It is explained [here](https://pythonhosted.org/mrjob/job.html#mrjob.job.MRJob.configure_options)

In [None]:
    def configure_options(self):
        """Register this job's command-line options on top of the inherited ones.

        Adds the integer pass-through option ``--k`` (number of clusters).
        """
        base = super(MRKMeansChooseInitialCentroids, self)
        base.configure_options()
        self.add_passthrough_option('--k', help='Number of clusters', type='int')

In [None]:
    # Mapper
    def get_coordinates(self, _, line):
        """Parse one input line into a point and emit it under the key ``None``.

        The line is split on whitespace and its last field is dropped
        (presumably a label; it is not used here -- TODO confirm).  The
        remaining fields become the point's coordinates.

        Yields:
            ``(None, [coord, ...])`` for lines with at least two fields;
            nothing for blank or single-field lines.
        """
        fields = line.split()
        # Blank lines and single-field lines leave no coordinates once the
        # last field is dropped; an empty point would break the min/max
        # computation in the combiner, so skip both cases.
        if len(fields) <= 1:
            return

        # float() instead of int(): matches the parsing in the centroid
        # updater and accepts non-integer coordinates.  The reducer works in
        # floats anyway, so the job's output is unchanged for integer data.
        yield None, [float(x) for x in fields[:-1]]
    
    # Combiner
    # Compute, for each coordinate, the minimum and the maximum value attained.
    def find_ranges(self, _, points):
        """Reduce a stream of points to their per-coordinate min and max.

        Args:
            points: iterable of equal-length coordinate sequences; must
                contain at least one point.

        Yields:
            ``(None, minimum)`` followed by ``(None, maximum)``, each a list.
        """
        # iter() + builtin next() work for any iterable and on both
        # Python 2 and 3 (the .next() method exists only on Python 2).
        points = iter(points)
        minp = maxp = numpy.array(next(points))
        for p in points:
            # numpy.minimum/maximum return new arrays, so the two names stop
            # sharing one array after the first update.
            minp = numpy.minimum(minp, p)
            maxp = numpy.maximum(maxp, p)

        yield None, minp.tolist()
        yield None, maxp.tolist()
    
    # Reducer
    def select_centroids(self, _, minmax):
        """Emit ``k`` initial centroids spread between the global min and max.

        Args:
            minmax: iterable of coordinate sequences (the minima and maxima
                emitted by the combiner); must contain at least one.

        Yields:
            ``(None, centroid)`` ``k`` times.  Centroid ``i`` is
            ``minp + i * (maxp - minp) / k`` for ``i = 0 .. k-1``, so the
            first one sits on the minimum and the maximum is never emitted.

        Raises:
            ValueError: if the ``--k`` option is missing or smaller than 1.
        """
        k = self.options.k
        # Fail with a clear message instead of an opaque TypeError (k is
        # None) or a silently empty result after a division by zero (k == 0).
        if k is None or k < 1:
            raise ValueError("--k must be a positive integer, got %r" % (k,))

        # iter() + builtin next(): portable replacement for .next().
        minmax = iter(minmax)
        minp = maxp = numpy.array(next(minmax), dtype=float)
        for p in minmax:
            minp = numpy.minimum(minp, p)
            maxp = numpy.maximum(maxp, p)

        # Define the centroids to be k points arranged linearly from minp
        # towards maxp.
        step = (maxp - minp) / k

        for i in range(k):
            yield None, (minp + step*i).tolist()

    def steps(self):
        """Describe the job as one step: mapper, combiner and reducer."""
        only_step = self.mr(
            mapper=self.get_coordinates,
            combiner=self.find_ranges,
            reducer=self.select_centroids,
        )
        return [only_step]

# When this file is executed as a script (not imported), hand control to the
# job class's run() entry point.
if __name__ == '__main__':
    MRKMeansChooseInitialCentroids.run()

## Kmeans - Update Centroids

In [None]:
# %load kmeans_centroid_updater.py
# CMSC 12300 - Computer Science with Applications 3
# Borja Sotomayor, 2013
#

import sys
import random
import numpy
import pickle

from mrjob.job import MRJob

# MRJob subclass for the centroid-update pass of k-means.
class MRKMeansUpdateCentroids(MRJob):

**add_file_option** is the way to refer to a file that is either local, on HDFS, or on S3.

For more details read [this](http://mrjob.readthedocs.org/en/latest/guides/writing-mrjobs.html#file-options)

In [None]:
    def configure_options(self):
        """Register this job's command-line options on top of the inherited ones.

        Adds the integer pass-through option ``--k`` (number of clusters)
        and the file option ``--centroids``.
        """
        base = super(MRKMeansUpdateCentroids, self)
        base.configure_options()
        self.add_passthrough_option('--k', help='Number of clusters', type='int')
        self.add_file_option('--centroids')

In [None]:
    # mapper_init
    def get_centroids(self):
        """Load the current centroids into ``self.centroids`` before mapping.

        Reads the pickle file named by the ``--centroids`` option.

        Nothing is returned.  The previous ``return centroids`` referenced an
        undefined name and always raised NameError; mrjob also documents init
        hooks as yielding (key, value) pairs to emit, so returning the
        centroid list would not be a valid fix either.
        """
        # NOTE(review): pickle.load can execute arbitrary code from a crafted
        # file; this is only safe while --centroids points at the file
        # written by the driver itself.
        # Binary mode because pickle data is bytes; ``with`` closes the file
        # even when loading fails.
        with open(self.options.centroids, "rb") as f:
            self.centroids = pickle.load(f)

    # mapper
    def assign_cluster(self, _, line):
        """Assign the point on one input line to its nearest centroid.

        The line is split on whitespace and its last field is dropped
        (presumably a label; it is not used here -- TODO confirm).  The
        remaining fields are parsed as floats.  Distances to every entry of
        ``self.centroids`` are compared and the index of the smallest wins.

        Yields:
            ``(cluster_index, [coord, ...])`` for lines with at least two
            fields; nothing for blank or single-field lines.
        """
        fields = line.split()
        # Blank and single-field lines carry no coordinates; an empty point
        # cannot be subtracted from a centroid, so skip both cases.
        if len(fields) <= 1:
            return

        point = numpy.array([float(x) for x in fields[:-1]])

        distances = [numpy.linalg.norm(point - c) for c in self.centroids]
        cluster = numpy.argmin(distances)

        # int(): convert the numpy integer returned by argmin to a plain int.
        yield int(cluster), point.tolist()

    # combiner
    def partial_sum(self, cluster, points):
        """Sum the points of one cluster and count them.

        Args:
            cluster: key under which the points arrived; passed through.
            points: iterable of equal-length coordinate sequences; must
                contain at least one point.

        Yields:
            ``(cluster, (coordinate_sums, count))`` exactly once.
        """
        # iter() + builtin next(): portable replacement for .next().
        points = iter(points)
        # Float accumulator so the in-place additions below never hit a
        # dtype-casting error or integer truncation.
        s = numpy.array(next(points), dtype=float)
        n = 1
        for p in points:
            s += p
            n += 1

        yield cluster, (s.tolist(), n)

    # reducer
    def compute_average(self, cluster, partial_sums):
        """Merge partial sums of one cluster into its new centroid.

        Args:
            cluster: key under which the partial sums arrived; passed through.
            partial_sums: iterable of ``(coordinate_sums, count)`` pairs;
                must contain at least one.
                NOTE(review): this assumes every incoming value went through
                the combiner; raw points would not unpack into this shape.

        Yields:
            ``(cluster, mean_coordinates)`` exactly once.
        """
        # iter() + builtin next(): portable replacement for .next().
        partial_sums = iter(partial_sums)
        total, count = next(partial_sums)
        # Float accumulator: the final division is a true division even if
        # the sums arrive as integers on Python 2.
        total = numpy.array(total, dtype=float)
        for ps, n in partial_sums:
            total += ps
            count += n

        yield cluster, (total / count).tolist()

    def steps(self):
        """Describe the job as one step: init hook, mapper, combiner, reducer."""
        only_step = self.mr(
            mapper_init=self.get_centroids,
            mapper=self.assign_cluster,
            combiner=self.partial_sum,
            reducer=self.compute_average,
        )
        return [only_step]


# When this file is executed as a script (not imported), hand control to the
# job class's run() entry point.
if __name__ == '__main__':
    MRKMeansUpdateCentroids.run()