# Porting Existing Code to Distributed Computing

Data scientists often find themselves having to port existing Pandas code to Spark or Dask. Either they start with a small data project and then move it to a larger dataset to run in production, or they have existing code and programs that are struggling to scale. The limitations of Pandas are well documented.

The primary limitation is that Pandas runs on a single core and does not take advantage of all available compute resources. Many operations also generate [intermediate copies](https://pandas.pydata.org/pandas-docs/stable/user_guide/scale.html#scaling-to-large-datasets) of data, using more memory than necessary. To handle data effectively with Pandas, users preferably need [5x to 10x](https://wesmckinney.com/blog/apache-arrow-pandas-internals/) as much RAM as the size of the dataset.

Spark and Dask allow us to split compute jobs across multiple machines. They can also handle datasets that don’t fit into memory by [spilling data](http://distributed.dask.org/en/latest/worker.html#spill-data-to-disk) over to disk in some cases.

## Current Approaches

### Vertical Scaling

The most common approach is to scale the compute vertically so that no rewrite of the code is needed. Instead of running everything on a 16 GB VM, we can run it on a 32 GB VM. When that isn't good enough anymore, we can run the program on a 64 GB VM. The problem is that this is often not a good use of resources, for the following reasons:

1. You likely only need more compute resources for one step out of many. For example, if the dataset has already been reduced by the time it reaches machine learning modelling, then you don't need a big VM during the modelling step.
2. Scaling vertically does not automatically mean complete utilization of the CPUs. People frequently scale up the underlying virtual machine but don't introduce parallelism, so the other cores are not utilized.

### DIY Parallelism

It's also common for people to introduce their own form of parallelism. A common example of this is sharding a CSV or Parquet file into several files, and then spinning up a Python process for each one using a process pool such as `concurrent.futures.ProcessPoolExecutor`. As an example code snippet:

```python
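import concurrent.futures

import pandas as pd

def logic(file_name):
    # Each worker process loads one shard, transforms it, and writes the result
    shard = pd.read_csv(file_name)
    result = do_something(shard)  # do_something is the user-defined transformation
    result.to_csv(f"processed-{file_name}")

with concurrent.futures.ProcessPoolExecutor() as executor:
    # executor.map returns an iterator of results (not futures);
    # consuming it waits for every shard and surfaces worker exceptions
    list(executor.map(logic, files))  # files is the list of shard paths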
```

The DIY parallelism approach can have issues, though. The most common one is resource contention, because the memory and CPU consumption of these processes is not fixed. [This is an example StackOverflow post](https://stackoverflow.com/questions/71151809/python-processpoolexecutor-memory-problems). You have to chunk the work yourself, and memory management falls on the user. But how does each process know the overall consumption?

Second, this assumes that everything runs on the same machine, but there is room to do better if we are open to scaling out.
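
For the memory issue above, one way to take chunking into your own hands is to process each shard in fixed-size pieces, so that peak memory depends on the chunk size rather than on the file size. The following is a minimal sketch, assuming the per-chunk results can be combined by summation; `process_in_chunks`, the `value` column, and the in-memory CSV are hypothetical stand-ins used only for illustration.

```python
import io

import pandas as pd


def process_in_chunks(source, chunk_rows: int) -> float:
    """Sum the hypothetical `value` column without loading the whole file."""
    total = 0.0
    # chunksize makes read_csv return an iterator of DataFrames with at most
    # chunk_rows rows each, so only one chunk is held in memory at a time.
    for chunk in pd.read_csv(source, chunksize=chunk_rows):
        total += chunk["value"].sum()
    return total


# A small in-memory CSV stands in for a real shard on disk.
csv_text = "value\n" + "\n".join(str(i) for i in range(10))
chunked_total = process_in_chunks(io.StringIO(csv_text), chunk_rows=3)
print(chunked_total)
```

This bounds the memory of a single process only. With several worker processes running at once, the chunk size still has to be chosen so that their combined footprint fits on the machine.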

## Distributed Computing Frameworks


This brings us to distributed computing frameworks. Distributed just means that we are scaling out over a cluster, so the data lives on multiple machines. The [Dask machine learning documentation](https://ml.dask.org/) shows the dimensions of scale: there are compute-bound problems and memory-bound problems.

<img src="https://ml.dask.org/_images/dimensions_of_scale.svg" align="middle" width="700"/>


Distributed computing frameworks such as Spark and Dask scale out to a cluster of machines.

There is an image in the Dask repo [issues](https://github.com/dask/dask/issues/4471) that clearly illustrates the distributed computing paradigm. In general, there is a client or master that takes care of the orchestration and final data collection. The client is responsible for scheduling tasks among workers.

Both Spark and Dask also have local modes, in which they use the cores available on the local machine. This means we can still take advantage of the additional processing power without having a cluster available.

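In the diagram below, note that:
- package versions and serialization matter, because code and data are sent between the client and the workers
- reading in files can be optimized, because each worker can read its own portion of the data
- data actually lives on a physical machine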

<img src="https://user-images.githubusercontent.com/11656932/62263986-bbba2f00-b3e3-11e9-9b5c-8446ba4efcf9.png" align="left" width="700"/>

## Introduction to Partitions

In order to understand partitions, we can look at this image showing the way Dask scales Pandas. Each partition is a Pandas DataFrame. A Dask DataFrame is the collection of all of the Pandas DataFrames. Operations are done on each partition, and then aggregated back.

<img src="https://docs.dask.org/en/latest/_images/dask-dataframe.svg" align="center" width="400"/>
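
The same idea can be imitated with plain Pandas. The sketch below only illustrates the partition-then-aggregate pattern and is not how Dask is implemented; `split_into_partitions` and `partitioned_mean` are hypothetical helpers.

```python
import pandas as pd


def split_into_partitions(df: pd.DataFrame, npartitions: int) -> list:
    """Cut a DataFrame into contiguous row blocks of near-equal size."""
    size = -(-len(df) // npartitions)  # ceiling division
    return [df.iloc[start:start + size] for start in range(0, len(df), size)]


def partitioned_mean(partitions: list, column: str) -> float:
    """Compute a mean from per-partition partial results."""
    # Step 1: each partition independently produces a (sum, count) pair.
    partials = [(part[column].sum(), len(part)) for part in partitions]
    # Step 2: the partial results are aggregated back into one answer.
    total = sum(s for s, _ in partials)
    count = sum(n for _, n in partials)
    return total / count


df = pd.DataFrame({"x": range(10)})
partitions = split_into_partitions(df, npartitions=4)
print(len(partitions), partitioned_mean(partitions, "x"))
```

Note that the aggregation combines sums and counts rather than averaging the per-partition means, which would be wrong whenever the partitions have different sizes.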

## Inconsistencies of Pandas and Spark

One of the first issues encountered when porting is that Pandas and Spark behave inconsistently. Below is a summary of the differences.

![img](https://miro.medium.com/max/1400/0*fv0FKyt3jB0ehVrU)

### Setup

In [8]:
import pandas as pd
from pyspark.sql import SparkSession
import dask.dataframe as dd

spark = SparkSession.builder.getOrCreate()

df = pd.DataFrame({"col1": [None, None, "A", "A", "B", "B"], 
                   "col2": [1,2,3,4,5,6]})
ddf = dd.from_pandas(df, npartitions=2)
sdf = spark.createDataFrame(df)



### Groupby

**Pandas**

In [11]:
df.groupby("col1")["col2"].mean()

col1
A    3.5
B    5.5
Name: col2, dtype: float64

**Dask**

This is consistent with Pandas in dropping the NULL values.

In [9]:
ddf.groupby("col1")["col2"].mean().compute()

col1
A    3.5
B    5.5
Name: col2, dtype: float64

**Spark**

In [6]:
sdf.groupBy("col1").mean("col2").show()

                                                                                

+----+---------+
|col1|avg(col2)|
+----+---------+
|null|      1.5|
|   B|      5.5|
|   A|      3.5|
+----+---------+



**Additional Note**

For those wondering, you can make Pandas consistent with Spark by passing `dropna=False`.

In [10]:
df.groupby("col1", dropna=False)["col2"].mean()

col1
A      3.5
B      5.5
NaN    1.5
Name: col2, dtype: float64

### Sorting

**Pandas**

In [12]:
df.sort_values(["col1", "col2"])

   col1  col2
2     A     3
3     A     4
4     B     5
5     B     6
0  None     1
1  None     2


**Dask**

In [14]:
try:
    ddf.sort_values(["col1", "col2"])
except Exception as e:
    print(e)

Dataframes only support sorting by named columns which must be passed as a string or a list of strings; multi-partition dataframes only support sorting by a single column.
You passed ['col1', 'col2']


**Spark**

In [17]:
sdf.orderBy(["col1", "col2"]).show()

+----+----+
|col1|col2|
+----+----+
|null|   1|
|null|   2|
|   A|   3|
|   A|   4|
|   B|   5|
|   B|   6|
+----+----+



In [20]:
sdf.orderBy(["col1", "col2"], ascending=[False,True]).show()

+----+----+
|col1|col2|
+----+----+
|   B|   5|
|   B|   6|
|   A|   3|
|   A|   4|
|null|   1|
|null|   2|
+----+----+



**Additional Note**

Pandas has an argument called `na_position` that lets you decide where to place NA values when sorting: first or last. Spark instead treats `None` as the smallest value by default, so nulls come first in ascending order and last in descending order.

In [22]:
df.sort_values(["col1", "col2"], na_position="first")

   col1  col2
0  None     1
1  None     2
2     A     3
3     A     4
4     B     5
5     B     6


## Pitfalls of Distributed Computing

### Inefficient Partitioning

In [19]:
from time import sleep
import numpy as np
import pandas as pd
import dask.dataframe as dd

def delay(df:pd.DataFrame) -> pd.DataFrame:
    sleep(df.shape[0]*3)
    return df.assign(b=df.shape[0])

pdf = pd.DataFrame(range(8), columns=["a"])
pdf

   a
0  0
1  1
2  2
3  3
4  4
5  5
6  6
7  7


In [10]:
%%time
ddf = dd.from_pandas(pdf, npartitions=4)
ddf.map_partitions(delay, meta={"a":"int32","b":"int32"}).compute()

CPU times: user 16.2 ms, sys: 5.59 ms, total: 21.8 ms
Wall time: 6.02 s


   a  b
0  0  2
1  1  2
2  2  2
3  3  2
4  4  2
5  5  2
6  6  2
7  7  2


In [20]:
%%time
pdf = pd.DataFrame(range(4), columns=["a"])
ddf = dd.from_pandas(pdf, npartitions=4)
ddf.map_partitions(delay, meta={"a":"int32","b":"int32"}).compute()

CPU times: user 13.2 ms, sys: 2.78 ms, total: 16 ms
Wall time: 6.01 s


   a  b
0  0  1
1  1  1
2  2  2
3  3  2
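
Both runs take about 6 seconds even though the second DataFrame has half as many rows. In the second run the `b` column shows partitions of 1, 1, and 2 rows, and the wall time is set by the largest partition. A back-of-the-envelope model of this, assuming `delay` costs 3 seconds per row as in the cells above and that by default every partition gets its own worker (`estimated_wall_time` is a hypothetical helper, not a Dask function):

```python
import heapq


def estimated_wall_time(partition_rows, seconds_per_row=3.0, n_workers=None):
    """Estimate wall time when each partition costs time proportional to its rows."""
    n_workers = n_workers or len(partition_rows)
    # Each heap entry is the time at which a worker becomes free.
    workers = [0.0] * n_workers
    heapq.heapify(workers)
    for rows in partition_rows:
        # Hand the next partition to whichever worker frees up first.
        free_at = heapq.heappop(workers)
        heapq.heappush(workers, free_at + rows * seconds_per_row)
    return max(workers)


even_eight = estimated_wall_time([2, 2, 2, 2])  # 8 rows in 4 even partitions
uneven_four = estimated_wall_time([1, 1, 2])    # 4 rows in 3 uneven partitions
even_four = estimated_wall_time([1, 1, 1, 1])   # 4 rows in 4 even partitions
print(even_eight, uneven_four, even_four)
```

Under this model the first two cases both come out at 6 seconds, matching the measurements above, while an even split of the four rows would take 3 seconds.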


### Lineage and Persisting

In [13]:
%%time
def gen_data(df: pd.DataFrame) -> pd.DataFrame:
    sleep(df.shape[0]*3)
    return df.assign(b=np.random.random((df.shape[0], 1)))

pdf = pd.DataFrame([[0],[1],[2],[3],[4],[5],[6],[7]], columns=["a"])
result = gen_data(pdf)
print(result)
print(result.head(3))

   a         b
0  0  0.326501
1  1  0.651844
2  2  0.450825
3  3  0.746542
4  4  0.065295
5  5  0.863471
6  6  0.389724
7  7  0.346177
   a         b
0  0  0.326501
1  1  0.651844
2  2  0.450825
CPU times: user 12.8 ms, sys: 4.94 ms, total: 17.7 ms
Wall time: 24 s


In [15]:
%%time
ddf = dd.from_pandas(pdf, npartitions=4)
result = ddf.map_partitions(gen_data, meta={"a": "int32", "b":"f8"})  # b holds random floats
print(result.compute())
print(result.head(2))

   a         b
0  0  0.055334
1  1  0.910891
2  2  0.489611
3  3  0.019611
4  4  0.073312
5  5  0.810812
6  6  0.812645
7  7  0.399692
   a         b
0  0  0.276690
1  1  0.846562
CPU times: user 23.4 ms, sys: 5.56 ms, total: 28.9 ms
Wall time: 12 s


In [16]:
%%time
ddf = dd.from_pandas(pdf, npartitions=4)
result = ddf.map_partitions(gen_data, meta={"a": "int32", "b":"f8"})  # b holds random floats
print(result.compute())
print(result.head(2))

   a         b
0  0  0.618964
1  1  0.302633
2  2  0.396614
3  3  0.268905
4  4  0.919634
5  5  0.057299
6  6  0.708261
7  7  0.199170
   a         b
0  0  0.278573
1  1  0.445568
CPU times: user 20.1 ms, sys: 4.35 ms, total: 24.5 ms
Wall time: 12 s
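
In the two Dask cells above, `result.head(2)` prints different random numbers from `result.compute()`, and the total wall time is about 12 seconds. `result` is only a recipe (its lineage), so each call re-runs `gen_data`. Dask's `persist()` and Spark's `cache()`/`persist()` exist to keep a computed result around for reuse. The plain-Python sketch below imitates that behaviour; the `LazyFrame` class is hypothetical and is not how either framework is implemented.

```python
import random


class LazyFrame:
    """A toy lazy result: it stores how to build the data, not the data."""

    def __init__(self, recipe):
        self._recipe = recipe   # the "lineage": a function that builds the rows
        self._cached = None     # filled in only after persist()

    def compute(self):
        # Without a cached copy, every call replays the recipe from scratch.
        return self._cached if self._cached is not None else self._recipe()

    def head(self, n):
        return self.compute()[:n]

    def persist(self):
        # Run the recipe once and keep the rows for all later calls.
        self._cached = self._recipe()
        return self


def gen_rows():
    return [random.random() for _ in range(8)]


lazy = LazyFrame(gen_rows)
recomputed_differs = lazy.compute()[:2] != lazy.head(2)

kept = LazyFrame(gen_rows).persist()
persisted_matches = kept.compute()[:2] == kept.head(2)
print(recomputed_differs, persisted_matches)
```

Persisting trades memory for time: the rows are held after the first run, so later calls neither repeat the work nor see different random values.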


### Schema Inference

In [21]:
def add_col(df):
    if df["a"].iloc[0] == 7:
        return df.assign(b=None)
    else:
        return df.assign(b=1)
    
pdf = pd.DataFrame(range(8), columns=["a"])
pdf

   a
0  0
1  1
2  2
3  3
4  4
5  5
6  6
7  7


In [22]:
pdf.groupby("a").apply(add_col)

   a     b
0  0   1.0
1  1   1.0
2  2   1.0
3  3   1.0
4  4   1.0
5  5   1.0
6  6   1.0
7  7  None


In [23]:
pdf.groupby("a").apply(add_col).dtypes

a     int64
b    object
dtype: object

In [24]:
ddf = dd.from_pandas(pdf, npartitions=2)
ddf.groupby("a").apply(add_col).dtypes

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  ddf.groupby("a").apply(add_col).dtypes


a    int64
b    int64
dtype: object
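
Pandas reports `b` as `object` because it sees the `None` produced for the group where `a == 7`, while Dask reports `int64`. Without a `meta` argument, Dask has to infer the output schema up front, which is why the warning above recommends passing `meta`. The Pandas-only sketch below imitates such a guess by running the function on a single sample group. It's a simplified illustration rather than Dask's actual inference code, and `infer_from_sample` is a hypothetical helper.

```python
import pandas as pd


def add_col(df: pd.DataFrame) -> pd.DataFrame:
    # Same logic as above: one group yields None, the rest yield an integer.
    if df["a"].iloc[0] == 7:
        return df.assign(b=None)
    return df.assign(b=1)


def infer_from_sample(func, df: pd.DataFrame, key: str) -> pd.Series:
    """Guess the output dtypes by applying func to the first group only."""
    _, first_group = next(iter(df.groupby(key)))
    return func(first_group).dtypes


pdf = pd.DataFrame({"a": range(8)})
guessed = infer_from_sample(add_col, pdf, "a")
# The schema that results from applying func to every group.
actual = pd.concat([add_col(group) for _, group in pdf.groupby("a")]).dtypes

print("guessed:", guessed["b"], "| actual:", actual["b"])
```

The guess for `b` is an integer dtype because the sample never reaches the `None` branch, while the full result is not. Passing `meta` explicitly, as the warning suggests, removes the guesswork.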

**Double execution time**

In [25]:
%%time
def add_col_2(df):
    if df["a"].iloc[0] == 1:
        sleep(5)
    return df.assign(b=1)

ddf.groupby("a").apply(add_col_2).dtypes

CPU times: user 20.1 ms, sys: 2.36 ms, total: 22.5 ms
Wall time: 5.02 s


  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result


a    int64
b    int64
dtype: object

In [26]:
%%time
ddf.groupby("a").apply(add_col_2).compute()

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result


CPU times: user 45.5 ms, sys: 11.2 ms, total: 56.7 ms
Wall time: 10.1 s


   a  b
1  1  1
4  4  1
5  5  1
6  6  1
7  7  1
0  0  1
2  2  1
3  3  1


## Pandas-like Frameworks

Pandas-like frameworks offer us the promise of changing the import statement to parallelize our code.

* [Koalas](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html) is a way to use the Spark engine with the Pandas interface. It became part of PySpark in version 3.2 as the pandas API on Spark (`pyspark.pandas`).
* [Modin](https://modin.readthedocs.io/en/stable/) is a way to use Dask or Ray with the Pandas interface.

<img src="img/modin_spark.png" align="center" width="800"/>



## Behavior Differences of Pandas-like Frameworks

In [1]:
import os
os.environ["MODIN_ENGINE"] = "ray"

from distributed import Client
Client(processes=True)

import ray
ray.init()

import pandas as pd
import pyspark.pandas as ks
import modin.pandas as mpd
import dask.dataframe as dd
import numpy as np

n = 1000000
colors = ["red", "green", "blue"]
size = ["M", "L"]

df = pd.DataFrame({"a": np.random.choice(colors, n),
                   "b": np.random.choice(size, n),
                   "c": np.random.random(n),
                   "d": np.random.randint(1,1000, n)})

kdf = ks.DataFrame(df)

mdf = mpd.DataFrame(df)

ddf = dd.from_pandas(df, npartitions=2)




### Index Location

In [2]:
from typing import Any
import time

def loc(df: Any) -> None:
    times = []
    times.append(time.time())
    # case 1
    df.head(10)[["c","d"]]
    times.append(time.time())
    # case 2
    df.tail(10)[["c","d"]]
    times.append(time.time())
    # case 3
    df.iloc[:10, [2,3]]
    times.append(time.time())
    # case 4
    df.iloc[-10:, [2,3]]
    times.append(time.time())
    # case 5
    df.iloc[499995:500005, [2,3]]
    times.append(time.time())

    # Report each case's elapsed time relative to case 1 (head)
    elapsed = np.diff(times)
    print([t / elapsed[0] for t in elapsed])

**Pandas**

In [3]:
loc(df)

[1.0, 0.6335877862595419, 0.22781020805268673, 0.1556653195629397, 0.162550516389762]


**Modin**

In [4]:
loc(mdf)

[1.0, 0.3263088574250905, 0.13221779866495045, 0.10183494477205376, 0.21217110186487861]


**Koalas**

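The relative timings vary widely: in the output below, `tail` (case 2) and the negative `iloc` slice (case 4) each take more than ten times as long as `head`.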

In [5]:
loc(kdf)


[1.0, 18.62831076738706, 0.6191270844135079, 12.321858904105335, 0.7907172773081067]


**Dask**

Dask does not support row-based `iloc` indexing; `iloc` can only be used to select columns.

In [6]:
try:
    loc(ddf)
except Exception as e:
    print(e)

'DataFrame.iloc' only supports selecting columns. It must be used like 'df.iloc[:, column_indexer]'.



## Introducing Fugue