# Chapter 11: Parallelization

This notebook shows IPthon parallel capabilities. We will see the commands that allow
us to parallelize tasks, see section Introduction. If properly combined, they can lead to 
interting parallel algorithms, see section A complete example. 


## Introduction

In order to start IPython's parallel capabilities, the simplest way of proceeding 
it to click on the Clusters tab of the notebook dashboard, and press 
Start with the desired number of cores. 
This will automatically run the necessary commands to start the
IPython cluster. We will now be able to send different tasks to
the engines using the web interface.

The next commands allows you to connect to the cluster

In [None]:
from IPython import parallel
engines = parallel.Client()
engines.block = True
print engines.ids

These commands connect to the cluster and output the number of engines in it. 
If an error is shown when running the
commands, the cluster has not been correctly created.

### Direct view of engines

The following commands,
executed on the notebook (i.e., the IPython interpreter), send commands
to the first engine:

In [None]:
engines[0].execute('a = 2') 
engines[0].execute('b = 10') 
engines[0].execute('c = a + b')  
engines[0].pull('c')

Observe that we do not have direct access to the command line of the first engine. Rather, we send commands to it through the client. The result is retrieved by means of the `pull` command.

Since each engine is
an independent process, the operating system may schedule each engine
in a different core and thus execution may be performed in parallel. Take a look at the following example.

In [None]:
engines[0].execute('a = 2') 
engines[0].execute('b = 10') 
engines[1].execute('a = 9') 
engines[1].execute('b = 7') 
engines[0:2].execute('c = a + b')   
engines[0:2].pull('c')

Observe that the operation `c = a + b` is executed on each egine. The latter calculation and does not show the power of parallization. For that purpose we are next perform more computational intenstive tasks. Let us now show that we are really doing computations in parallel. Let us try with something bigger!

In order to simplify the code, let us define the following variable that references the first two engines (even if there are more active engines).

In [None]:
dview2 = engines[0:2]    

We are next going to focus on matrix multiplication, for instance. We begin by doing serialized computations on the notebook and compute the total processing time.

In [None]:
import time
import numpy as np

# Create four 1000x1000 matrix
A0 = np.random.rand(1000,1000)
B0 = np.random.rand(1000,1000)
A1 = np.random.rand(1000,1000)
B1 = np.random.rand(1000,1000)

t0 = time.time() 

C0 = np.dot(A0, B0)
C1 = np.dot(A1, B1)
    
print "Time in seconds (Computations): ", time.time() - t0 

And now we will do computations in parallel.

In [None]:
dview2.execute('import numpy as np')       # We import numpy on both engines!

t0 = time.time()
engines[0].push(dict(A=A0, B=B0))    # We send A0 and B0 to engine 0 
engines[1].push(dict(A=A1, B=B1))    # We send A1 and B1 to engine 1 

t0_computations = time.time()

dview2.execute('C = np.dot(A,B)')
    
print "Computations: ", time.time() - t0_computations

[C0, C1] = dview2.pull('C')
print "Time in seconds: ", time.time() - t0

The total computing time should decrease thanks to the divison of the computation in two tasks. Each task is then manually executed in two different engines and each engine is scheduled by the operating system on two different processors (if the computer has at least two processors).

The previous commands show us how to execute commands on engines as if we were typing 
them directly on the command-line. Indeed, we
have manually sent, executed and retrieved the results of computations.
This procedure may be useful in some cases but in many cases there
will be no need to do so. The `map` function may be used to that purpose

In [None]:
def mul(A, B):
    import numpy as np
    C = np.dot(A, B)
    return C

[C0, C1] = dview2.map(mul,[A0, A1],[B0, B1])

These commands, executed on the client, perform a remote call.
The function `mul` is defined locally. There is no need to use the `push`
and `pull` functions explicitly to send and retrieve the results; it is done
implicitly. Note the `import numpy as np`
inside the `mul` function. This is a common model, to ensure that the appropriate
toolboxes are imported to where the task is run. 

The `map` call splits the tasks between the engines associated
with `dview2`. In the previous example, the task `mul(A0,B0)`
is executed on one engine and `mul(A1, B1)` is executed on
the one. Which command is executed on each engine? What happens if
the list of arguments to map includes three or more matrices? We may
see this with the following example:

In [None]:
engines[0].execute('my_id = "engineA"') 
engines[1].execute('my_id = "engineB"')

def sleep_and_return_id(sec):     
    import time     
    time.sleep(sec)                      
    return my_id,sec

dview2.map(sleep_and_return_id, [3,3,3,1,1,1])

Execute the previous code and observe the returned result that indicates us which engine executed the function. You may repeat this experment as many times as you wish, but the result will always be the same. The tasks are distributed in a uniform way among the
engines before executing them no matter which is the delay we pass
as argument to the function `sleep_and_return_id`. This is in fact a characteristic of the direct view interface: the tasks are distributed among the engines before executing them. 

This a good way to proceed if you expect each task to take
the same amount of time. But if not, as is the case in the previous
example, computation time is wasted and so we recommend to use the 
load-balanced view instead.

### Load-balanced view of engines

This interface is simpler and more powerful than the direct interface. We would like to point out, however, that with this interface the user has no direct access to individual engines. It is the IPython scheduler that assignes work to each engine. To create a load-balanced view we may use the following command: 

In [None]:
engines.block = True
lview2 = engines.load_balanced_view(targets=[0,1])

We use the blocking mode since it simplifies the code. The `lview2` is a variable
that references the first two engines.

Our example here will be centered on the `sleep_and_return_id`
function we have seen in the previous subsection:

In [None]:
lview2.map(sleep_and_return_id, [3,3,3,1,1,1])

Observe that rather than using the direct
view interface (`dview2` variable) of the `map` function, we use the associated load-balanced view interface (`lview2` variable).

In this case, the tasks are assigned to the engines in a dynamic way.
The `map` function of the load-balanced view begins by assigning
one task to each engine in the order given by the parameters of the
`map` function. By default, the load-balanced view scheduler
then assigns a new task to an engine when it becomes free

## A complete example: the New York taxi trips database

We next present a real application of the parallel capabilities
of IPython and the discussion of several approaches to it. The dataset
is a database of taxi trips in New York and it has been obtained through
a Freedom of Information Law (FOIL) request from the New York City
Taxi & Limousine Commission (NYCT&L) by University of Illinois at
Urbana-Champaign (http://publish.illinois.edu/dbwork/open-data/).
The dataset consists in $12\times2$GBytes Comma Separated Files (CSV)
files. Each file has approximately $14$ million entries (lines) and
is already cleaned. Thus no special preprocessing is needed to be
able to process it. For our purposes we are interested only in the
following information from each entry: 

+ `pickup_datetime`: start time of the trip, mm-dd-yyyy hh24:mm:ss
EDT. 
+ `pickup_longitude` and `pickup_latitude`: GPS coordinates
at the start of the trip. 

Our objective is to perform an analysis of this data in order to answer
the following questions: for each district, how many pickups are performed
during week days and how many during weekends? And how many pickups
are performed in the morning? For that issue the New York city is
arbitrarily divided into nine districts: ChinaTown, WTC, Soho, Harlem,
UpperTown, MidTown, DownTown, UpperEastSide, UpperWestSide and Financial. 

Implementing the previous classification is rather simple since it
only requires checking, for each entry, the GPS coordinates of the
start of the trip and the pickup datetime. Performing this task in
a sequential may take a rather large amount of time since the number
of entries, for a single CSV file, is rather large. In addition, special
care has to be taken when reading the file since a 2GByte file may
not fully fit into the computer's memory. 

We may take advantage of the parallelization capabilities in order
to reduce the processing time. The idea is to divide the input data
into chunks so that each engine takes care of classifying the entries
of their corresponding chunks. We propose here an approach which 
is based on implementing a producer-consumer paradigm
in order to distribute the tasks. The producer, associated to the
client, reads the chunks from disc and distributes them among the
engines using a round robin technique. No explicit `map` function
is used in this case. Rather, we simulate the behavior of the `map`
function in order to have fine control of the parallel problem. Recall
that each engine is an independent process. Since we assign different
tasks to each engine, the operating system will try to execute each
engine on a different process.

For further deails, please see corresponding chapter in the book.

## The source code

We begin by initializing the engines:

In [None]:
%reset -f

from IPython import parallel
from itertools import islice
from itertools import cycle
from collections import Counter
import sys
import time

#Connect to the Ipython cluster    
engines = parallel.Client()

#Create a DirectView to all engines
dview = engines.direct_view()

print "The number of engines in the cluster is: " + str(len(engines.ids))

We next declare the functions that will be executed on the engines. We do this thanks to the `%%px` parallel magic command.

In [None]:
%%px

# The %%px magic executes the code of this cell on each engine.

from datetime import datetime
from collections import Counter

import pandas as pd
import numpy as np

# A Counter object to store engine's local result
local_total = Counter();

def dist(p0, p1):
    "Returns the distance**2 between two points"
    # We compute the squared distance. Since we only want to compare
    # distances there is no need to compute the square root (sqrt) 
    return (p0[0] - p1[0])**2 + (p0[1] - p1[1])**2

# Coordinates (latitude, longitude) of diferent points of the island
district_dict = { 
    'Financial': [40.724863, -73.994718], 
    'Midtown': [40.755905, -73.984997],
    'Chinatown': [40.716224, -73.995925],
    'WTC': [40.711724, -74.012888],
    'Harlem': [40.810469, -73.943318],
    'Uppertown': [40.826381, -73.943964],
    'Soho': [40.723783, -74.001237],
    'UpperEastSide': [40.773861, -73.956329],
    'UpperWestSide': [40.787347, -73.975267]
    }

# Computes the distance to each district center and obtains the one that
# gives minimum distance
def get_district(coors):
    "Given a coordinate inn latitude and longitude, returns the district in Manhatan"   
    #If dist^2 is bigger than 0.0005, the district is 'None'.
    dist_min = 0.0005
    district = None
    for key in district_dict.iterkeys():
        d = dist(coors, district_dict[key])
        if dist_min > d:
            dist_min = d
            district = key
    return district

def is_morning(d):
    "Given a datetime, returns if it was on morning or not"
    h = datetime.strptime(d, "%Y-%m-%d %H:%M:%S").hour
    return 0 <= h and h < 12

def is_weekend(d):
    "Given a datetime, returns if it was on weekend or not"
    wday = datetime.strptime(d, "%Y-%m-%d %H:%M:%S").weekday() #strptime transforms str to date
    return 4 < wday <= 6

#Function that classifies a single data
def classify(x):
    "Given a tuple with a datetime, latitude and longitude, returns the group where it fits"
    date, lat, lon = x
    latitude = float(lat)
    longitude = float(lon)
    return is_weekend(date), is_morning(date), get_district([latitude, longitude])

# Function that given a dictionary (data), applies classify function on each element
# and returns an histogram in a Counter object
def process(b):
    #Recives a block (list of strings) and updates result in global var local_total()
    global local_total
    
    #Create an empty df. Preallocate the space we need by providing the index (number of rows)
    df = pd.DataFrame(index=np.arange(0,len(b)), columns=('datetime','latitude','longitude'))
    
    # Data is a list of lines, containing datetime at col 5 and latitude at row 11.
    # Allocate in the dataFrame the datetime and latitude and longitude dor each line in data
    count = 0
    for line in b:
        elements = line.split(",")
        df.loc[count] = elements[5], elements[11], elements[10]
        count += 1
        
    #Delete NaN values from de DF
    df.dropna(thresh=(len(df.columns) - 1), axis=0)
    
    #Apply classify function to the dataFrame
    cdf = df.apply(classify, axis=1)
    
    #Increment the global variable local_total
    local_total += Counter(cdf.value_counts().to_dict())

# Initialization function
def init():
    #Reset total var
    global local_total
    local_total = Counter()


Next we show the code executed by the client. The next code performs the next task

+ It reads a chunk of `lines_per_block` lines form the file. The chunk is assigned to an engine which performs the classification. The result of the classification is updated on a local variable on each engine. This process is repeated until all chunks have been processed by the engines.
+ Once finished, the client retrieves the local variable of each engine and computes the overall result.

This is the principle of the **MapReduce** programming model: a MapReduce program is composed of a Map() procedure that performs filtering and sorting (such as counting the number of times each word appears in a file) and a Reduce() procedure that performs a summary operation (that is, taking each of the results and computing the overall result).

In [None]:
# This is the main code executed on the client
t0 = time.time() 

#File to be processed
filename = 'trip_data.csv'

def get_chunk(f,N):
    """ Returns blocks of nl lines from the file descriptor fd"""
    #Deletes first line on first chunk (header line)
    first = 1
    while True:
        new_chunk = list(islice(f, first, N))
        if not new_chunk:
            break
        first = 0
        yield new_chunk

# A simple counter to verify execution
chunk_n = 0

# Number of lines to be sent to each engine at a time. Use carefully!
lines_per_block = 20

# Create an emty list of async tasks. One element for each engine
async_tasks = [None] * len(engines.ids)

# Cycle Object to get an infinite iterator over the list of engines
c_engines = cycle(engines.ids)

# Initialize each engine. Observe that the execute is performed
# in a non-blocking fashion.
for i in engines.ids:
    async_tasks[i] = engines[i].execute('init()', block=False)

# The variable to store results
global_result = Counter()

# Open the file in ReadOnly mode
try:
    f = open(filename, 'r') #iterable
except IOError:
    sys.exit("Could not open input file!")

# Used to show the progress
print "Beginning to send chunks"
sys.stdout.flush()

# While the generator returns new chunk, sent them to the engines
for new_chunk in get_chunk(f,lines_per_block):
    
    #After the first loop, first_chunk is False. 
    first_chunk = False
    
    #Decide the engine to be used to classify the new chunk
    run_engine = c_engines.next()
    
    # Wait until the engine is ready
    while ( not async_tasks[run_engine].ready() ):
        time.sleep(1)
    
    #Send data to the assigned engine.
    mydict = dict(data = new_chunk)
    
    # The data is sent to the engine in blocking mode. The push function does not return
    # until the engine has received the data. 
    engines[run_engine].push(mydict,block=True)

    # We execute the classification task on the engine. Observe that the task is executed
    # in non-blocking mode. Thus the execute function reurns immediately. 
    async_tasks[run_engine] = engines[run_engine].execute('process(data)', block=False)
    
    # Increase the counter    
    chunk_n += 1

    # Update the progress
    if chunk_n % 1000 == 0:
        print "Chunks sent until this moment: " + str(chunk_n)
        sys.stdout.flush()

print "All chunks have been sent"
sys.stdout.flush()
# Get the results from each engine and accumulate in global_result
for engine in engines.ids:
    # Be sure that all async tasks are finished
    while ( not async_tasks[engine].ready() ):
        time.sleep(1)
    global_result += engines[engine].pull('local_total', block=True)

#Close the file
f.close()

print "Total number of chunks processed: " + str(chunk_n)
print "---------------------------------------------"
print "Agregated dictionary"
print "---------------------------------------------"
print dict(global_result)

print "Time in seconds: ", time.time() - t0
sys.stdout.flush()

The results of the experiments performed with this code can be seen in the corresponding chapter of the book.