# Parallel processing
## Lecture objectives

1. Demonstrate how to make use of multiple cores on your computer by running processes in parallel
2. Introduce (conceptually) the MapReduce framework

## How to parallelize locally
Even basic laptops usually come with multiple cores. Some libraries, such as `scikit-learn`, have a parameter that lets you use more than one core (e.g., setting `n_jobs=-1` in `RandomForestClassifier` uses all available cores).
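To check how many cores your own machine has, the standard library is enough. Below is a minimal sketch, where `available_cores` is just an illustrative helper name. Note that these functions count *logical* cores, which can be twice the number of physical cores on processors with hyper-threading.

```python
import multiprocessing as mp
import os


def available_cores() -> int:
    """Return the number of logical CPUs Python can see (at least 1)."""
    # os.cpu_count() returns None if the count cannot be determined
    return os.cpu_count() or 1


if __name__ == "__main__":
    print("os.cpu_count():", available_cores())
    # multiprocessing.cpu_count() reports the same number, but raises
    # NotImplementedError instead of returning None
    print("mp.cpu_count():", mp.cpu_count())
```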

Postgres and other databases can also make use of multiple cores to do CPU-intensive operations such as spatial joins. They can also write intermediate results to disk, so they help with memory constraints too (an SSD makes this faster). We'll look at databases in the next lecture.

In other cases, you'll need to do the parallelization yourself. Python helps through the `multiprocessing` library. The key is to set up discrete *functions* that can then be run in parallel on different subsets of the data.
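The sketch below illustrates that pattern on a toy problem with no spatial data. The work is wrapped in a self-contained function, the input is split into chunks, each chunk is processed independently, and the pieces are stitched back together. The function names (`third_closest`, `split_apply_combine`) and the random points are illustrative assumptions, not part of this lecture's code; distances are plain Euclidean distances between points.

```python
import multiprocessing as mp
from functools import partial

import numpy as np


def third_closest(targets: np.ndarray, origins: np.ndarray) -> np.ndarray:
    """For each origin point, return the distance to its third-closest target.

    origins has shape (n, 2); targets has shape (m, 2) with m >= 3.
    """
    # pairwise Euclidean distances, shape (n, m)
    d = np.linalg.norm(origins[:, None, :] - targets[None, :, :], axis=2)
    # np.partition moves the 3rd-smallest value of each row into column 2
    return np.partition(d, 2, axis=1)[:, 2]


def split_apply_combine(func, data: np.ndarray, n_chunks: int, mapper=map) -> np.ndarray:
    """Split data into n_chunks blocks of rows, apply func to each, stack the results.

    mapper can be the builtin map (serial) or Pool.map (parallel).
    """
    chunks = np.array_split(data, n_chunks)
    return np.concatenate(list(mapper(func, chunks)))


if __name__ == "__main__":
    rng = np.random.default_rng(0)
    origins = rng.uniform(0, 1000, size=(2000, 2))  # stand-ins for tract centroids
    targets = rng.uniform(0, 1000, size=(500, 2))   # stand-ins for pantries
    func = partial(third_closest, targets)

    serial = split_apply_combine(func, origins, n_chunks=4)
    with mp.Pool(processes=4) as pool:
        parallel = split_apply_combine(func, origins, n_chunks=4, mapper=pool.map)
    print(np.allclose(serial, parallel))
```

Passing the mapper in as an argument makes the point that the chunked structure, not the pool, is what makes the job parallelizable: the same code runs serially or in parallel.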

Let's recreate the example we used in Module 4, to calculate the distance from every census tract to the *third* closest food pantry. As a reminder, this is a common measure of accessibility. See the lecture, *Advanced spatial joins*, for a refresher.

In [None]:
import pygris
import geopandas as gpd
import pandas as pd

# get the census tract boundaries for LA
tracts = pygris.tracts(state='06',county='037', year=2019)

# get the food pantries
pantryDf = pd.read_csv('../data/Food_Resources_in_California.csv')
pantryDf = pantryDf[pantryDf.County=='Los Angeles']
pantries = gpd.GeoDataFrame(
    pantryDf, geometry=gpd.points_from_xy(pantryDf.Longitude, pantryDf.Latitude, 
                                          crs='EPSG:4326'))
# project both to state plane
pantries.to_crs('EPSG:3497', inplace=True)
tracts.to_crs('EPSG:3497', inplace=True)
pantries.info()

Let's slow things down by making 100 duplicates of the `pantries` geodataframe, so that the calculation takes long enough for the benefits of parallel processing to show.

Here, we use a list comprehension to create a list of 100 duplicates. Then, we use `pd.concat()` to stack (concatenate) them into a single geodataframe.

In [None]:
# drop=True discards the old (now duplicated) index instead of keeping it as a column
pantries = pd.concat([pantries for i in range(100)]).reset_index(drop=True)
pantries.info()

In [None]:
# this is the function that we had before, returning the distance from each tract to its third-closest pantry
def get_3rd_closest_dist(geom):
    distances = pantries.distance(geom)
    third_closest = distances.sort_values().iloc[2]
    return third_closest

%time tracts['dist_third_closest'] = tracts.geometry.apply(get_3rd_closest_dist)
tracts.head()

That was pretty fast, but again, think of a situation where we are doing this for (say) all census tracts and all grocery stores in the country.

Let's see if parallel processing can help. Don't worry too much about the syntax: there are lots of examples online if and when you need to use this. But I want you to understand it conceptually.

One complication of multiprocessing in JupyterLab is that you have to import the function from a file on disk; you can't define it in a notebook cell. (Strictly, this applies when worker processes are started with the "spawn" method, the default on Windows and macOS.) So I saved `parallelization_example.py` in the repository, which contains the function above. It's the same as above, except that it takes an additional argument, `pantries`, as its first parameter. Basically, a multiprocessed function has to be self-contained: you have to pass in all the arguments that it will need, rather than relying on them being available as global variables.
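The file itself isn't reproduced in this notebook. Based on the description above, a plausible version of `parallelization_example.py` is sketched below. Treat it as a reconstruction, not the repository's actual file. `pantries` has to come *first*, because `partial(get_3rd_closest_dist, pantries)` in the next cell fills in the first positional argument, leaving the geometry as the only argument still to supply.

```python
# parallelization_example.py (a reconstruction; the repository's file may differ)


def get_3rd_closest_dist(pantries, geom):
    """Return the distance from geom to the third-closest point in pantries.

    pantries: a GeoDataFrame or GeoSeries of pantry locations, passed in
        explicitly so that each worker process receives its own copy
        instead of relying on a global variable.
    geom: a single shapely geometry (e.g., one census tract).
    """
    distances = pantries.distance(geom)
    return distances.sort_values().iloc[2]
```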

In [None]:
%%time
import multiprocessing as mp
from functools import partial

# import the function we saved to disk
from parallelization_example import get_3rd_closest_dist

# start a pool of 4 worker processes (one per core)
with mp.Pool(processes=4) as pool:
    # partial() "freezes" pantries as the first argument, returning a new
    # function that only needs a geometry
    # see https://stackoverflow.com/questions/15331726/how-does-functools-partial-do-what-it-does
    func = partial(get_3rd_closest_dist, pantries)
    # pool.map() sends each geometry to func, spreading the work across the workers
    results = pool.map(func, tracts.geometry)

Quite a nice speedup. Note that there is some overhead (starting the worker processes and copying data such as `pantries` to them), so using 4 cores only gives a ~67% speedup in this case.

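What's happening? Each geometry in `tracts` becomes a task, and the tasks are shared out among the 4 worker processes in the pool. (`pool.map()` actually hands them out in batches, or chunks, to reduce overhead.) As soon as a worker is free, it picks up the next batch.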
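The toy example below makes this scheduling visible. Each task records the process ID (PID) of the worker that ran it. With `chunksize=2`, tasks are handed out two at a time, so each pair of consecutive inputs (0 and 1, 2 and 3, and so on) is processed by the same worker. `pool.map()` still returns results in input order, whichever worker finishes first. The function `tag_with_worker` and its `offset` argument are made up for illustration.

```python
import multiprocessing as mp
import os
from functools import partial


def tag_with_worker(offset: int, x: int) -> tuple[int, int, int]:
    """Return (input, input + offset, PID of the process that did the work)."""
    return x, x + offset, os.getpid()


if __name__ == "__main__":
    # partial freezes the first argument (offset=100); the pool supplies x
    func = partial(tag_with_worker, 100)
    with mp.Pool(processes=4) as pool:
        results = pool.map(func, range(8), chunksize=2)
    for x, y, pid in results:
        print(f"input {x} -> {y} (worker PID {pid})")
```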

Multiprocessing works when *none of the jobs are dependent on each other*. It's hard to pass information between different jobs in the pool.

But how do we access our results? Let's look at the `results` object that we created.

In [None]:
type(results)

In [None]:
# look at the first few elements
results[:5]

In [None]:
len(results)

This is the same length as our original `tracts` dataframe: we had 749 geometries to process, and our function returned one distance for each. Because `pool.map()` returns results in the same order as its inputs, we can put the results straight back into our dataframe.

In [None]:
tracts['dist_third_closest'] = results
tracts.head()

Let's check: yes, this is the same result as we got without parallelization (see above).
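`%time` and `%%time` are IPython magics, so they only work in Jupyter or IPython. In a plain `.py` script, you can time code with `time.perf_counter()` instead. The sketch below uses `slow_square`, a made-up and deliberately CPU-bound stand-in for real work. It times the same job serially and with a 4-process pool, and reports the speedup as serial time divided by parallel time. With 4 workers the ideal is close to 4x; overhead usually keeps the real figure below that.

```python
import multiprocessing as mp
import time


def slow_square(n: int) -> int:
    """A CPU-bound stand-in for real work: compute n * n the slow way."""
    total = 0
    for _ in range(n):
        total += n
    return total


def timed_map(mapper, func, inputs):
    """Run mapper(func, inputs) and return (list of results, elapsed seconds)."""
    start = time.perf_counter()
    results = list(mapper(func, inputs))
    return results, time.perf_counter() - start


# In a script, the __main__ guard is needed on macOS and Windows so that
# worker processes can import this file without re-running the pool code.
if __name__ == "__main__":
    inputs = [1_000_000] * 16
    serial, t_serial = timed_map(map, slow_square, inputs)
    # the pool is started before timing begins, so start-up cost is not counted
    with mp.Pool(processes=4) as pool:
        parallel, t_parallel = timed_map(pool.map, slow_square, inputs)
    assert serial == parallel
    print(f"serial {t_serial:.2f}s, parallel {t_parallel:.2f}s, "
          f"speedup {t_serial / t_parallel:.1f}x")
```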

## MapReduce
MapReduce is a programming model closely related to parallel processing. [See the details of the algorithm here.](https://en.wikipedia.org/wiki/MapReduce) In short, there is a *map* stage (filtering and sorting) and a *reduce* stage (summarizing the mapped data into something smaller). Many mapper nodes can run in parallel, and so can many reducer nodes.

A canonical example is word counting.

![mapreduce](https://upload.wikimedia.org/wikipedia/commons/0/03/WordCountFlow.JPG)

MapReduce is normally run on clusters in the cloud, using specialist tools like Hadoop, but you could do it on a local machine in Python as well. Conceptually, think about the discrete, independent chunks that you can split your task into.
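As a rough local illustration, the word-count flow in the diagram can be written with `multiprocessing`. Each mapper counts the words in its own chunk of lines, a shuffle step groups the partial counts by word, and each reducer sums one group. This is a teaching sketch of the idea under our own function names, not how Hadoop implements it.

```python
import multiprocessing as mp
from collections import Counter, defaultdict


def map_chunk(lines: list[str]) -> list[tuple[str, int]]:
    """Map stage: emit (word, count) pairs for one chunk of lines."""
    counts = Counter(word.lower() for line in lines for word in line.split())
    return list(counts.items())


def shuffle(mapped: list[list[tuple[str, int]]]) -> dict[str, list[int]]:
    """Shuffle stage: group the partial counts from every mapper by word."""
    groups = defaultdict(list)
    for pairs in mapped:
        for word, n in pairs:
            groups[word].append(n)
    return groups


def reduce_word(item: tuple[str, list[int]]) -> tuple[str, int]:
    """Reduce stage: sum the partial counts for a single word."""
    word, partial_counts = item
    return word, sum(partial_counts)


def word_count(lines: list[str], n_chunks: int, mapper=map) -> dict[str, int]:
    """Run split -> map -> shuffle -> reduce; mapper may be map or Pool.map."""
    chunks = [lines[i::n_chunks] for i in range(n_chunks)]
    mapped = list(mapper(map_chunk, chunks))
    reduced = mapper(reduce_word, shuffle(mapped).items())
    return dict(reduced)


if __name__ == "__main__":
    text = ["Deer Bear River", "Car Car River", "Deer Car Bear"]
    with mp.Pool(processes=3) as pool:
        print(word_count(text, n_chunks=3, mapper=pool.map))
```

Both the map and reduce stages go through `mapper`, so with a pool each stage runs in parallel; only the shuffle, which has to see every mapper's output, runs in the main process.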

## Outgrowing your local machine
Almost all of my own data science work is run locally on my office computer. Currently, I have a 2018 Mac Mini, and my previous desktop was from 2008, so it's not like you need a lot of computing power to do most tasks. You might need a large external hard drive.

Sometimes it takes a few days to a week, running in the background. Sometimes I have to parallelize, or be judicious about data types. But for me, it's often more convenient than uploading to (and paying for) a cloud service.
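Being judicious about data types often means downcasting numeric columns and storing repeated strings as categoricals. Here is a small illustration with pandas on made-up data; the `shrink` helper and the column names are hypothetical. Downcasting to `float32` halves the memory but keeps only about 7 significant digits, so it is a trade-off rather than a free win.

```python
import numpy as np
import pandas as pd


def shrink(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy with float64 columns downcast and repetitive strings as categories."""
    out = df.copy()
    for col in out.columns:
        if out[col].dtype == "float64":
            # 4 bytes per value instead of 8
            out[col] = out[col].astype("float32")
        elif pd.api.types.is_string_dtype(out[col]) and out[col].nunique() < 0.5 * len(out):
            # each distinct string is stored once, plus small integer codes
            out[col] = out[col].astype("category")
    return out


if __name__ == "__main__":
    n = 100_000
    rng = np.random.default_rng(0)
    df = pd.DataFrame({
        "county": rng.choice(["Los Angeles", "Orange"], size=n),
        "dist_ft": rng.uniform(0, 50_000, size=n),
    })
    before = df.memory_usage(deep=True).sum()
    after = shrink(df).memory_usage(deep=True).sum()
    print(f"{before / 1e6:.1f} MB -> {after / 1e6:.1f} MB")
```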

For some jobs, however, you will outgrow your laptop. We won't say much about other options in this course, but in general there are four options:
* Beg, steal, borrow. Can you get access to a computer in the GIS lab to run over the weekend or holidays?
* UCLA's Hoffman2 cluster. You can run pretty much any Python code as a "batch job," meaning that your job will be queued until there are resources available for it to run. [See the details here.](https://www.hoffman2.idre.ucla.edu) 
* Cloud-based Python services like [Deepnote](https://deepnote.com/education) (basic plan free for educational users) or PythonAnywhere. The amount of memory and compute power varies, but generally you pay for what you use.
* More general cloud services like AWS, Google Cloud, Microsoft Azure, etc. These let you run Python and pretty much any other open-source tool, including databases.

These options are changing all the time, so keep an eye out for new firms and services that are being offered.

<div class="alert alert-block alert-info">
<h3>Key Takeaways</h3>
<ul>
  <li>Parallel processing works when you are applying an identical function to many different inputs - census tracts, PDF files, large dataframes, etc. There cannot be interdependencies.</li>
  <li>You will normally be able to run any dataset on a standard laptop by using these tools, but the UCLA Hoffman2 cluster and cloud service providers are there if you need them.</li>
</ul>
</div>