# How to easily parallelize your Python code with Dask

## Importing the libraries used in this tutorial

In [None]:
import os
import pickle
import gzip
import shutil
import psutil
from collections import Counter
from Bio import SeqIO
from dask.distributed import Client, LocalCluster, progress
from fuzzysearch import find_near_matches

## Creating workspace and downloading dataset

In [None]:
# Create (if needed) and enter a working directory for all downloaded and
# generated files. The guard makes the cell safe to re-run: without it a second
# run would create and enter DaskWorkingSpace/DaskWorkingSpace.
if os.path.basename(os.getcwd()) != "DaskWorkingSpace":
    os.makedirs("DaskWorkingSpace", exist_ok=True)
    os.chdir("DaskWorkingSpace")

We use curl because it is available on Windows 10 (and later), macOS and most Linux distributions, which keeps the download step OS-agnostic.

In [None]:
!curl http://ftp.ebi.ac.uk/ensemblgenomes/pub/release-51/plants/fasta/arabidopsis_thaliana/dna/Arabidopsis_thaliana.TAIR10.dna.toplevel.fa.gz --output athaliana.fa.gz

## Decompressing (gunzipping) the dataset

It's really easy with the Python standard library (`gzip` + `shutil`).

In [None]:
# Stream-decompress the gzipped FASTA into a plain file; shutil.copyfileobj
# copies in chunks, so the whole genome is never held in memory at once.
with gzip.open('athaliana.fa.gz', 'rb') as compressed, open('athaliana.fa', 'wb') as plain:
    shutil.copyfileobj(compressed, plain)

## Parsing dataset with Biopython

In [None]:
# Parse every FASTA record (Biopython SeqRecord objects) and keep them all in
# memory as a list, so later cells can index it and hand records to Dask.
sequences = list(SeqIO.parse("athaliana.fa", "fasta"))

The dataset contains 7 records, each holding one DNA sequence.

In [None]:
# Number of records parsed from the FASTA file (shown as the cell output).
len(sequences)

In [None]:
# Display the first parsed record as the cell output.
sequences[0]

Our task is to count <b> how many times each nucleotide occurs within each record </b>

## Defining function

In [None]:
def CountBases(record):
    """Tally how often each character occurs in a record's sequence.

    Returns a ``(description, Counter)`` tuple, so results from different
    records, or from serial and parallel runs, can be compared directly.
    """
    return record.description, Counter(str(record.seq))

## Single process approach

In [None]:
%%time
results_count = []
for record in sequences:
    result = CountBases(record)
    results_count.append(result)

## Multi process approach

### First we need to set up a Dask distributed `LocalCluster` and connect a `Client` to it (the client submits work to the cluster's scheduler, which distributes it to the workers)
<b> We want to use all of our system's computing power </b>

In [None]:
# Number of logical CPUs. psutil.cpu_count() returns None when the count
# cannot be determined, so fall back to a single worker in that case.
nproc = psutil.cpu_count() or 1
print(nproc)

<b> If your CPU count exceeds the number of records in the `sequences` list, set `nproc` to the list's length instead: the extra workers would have nothing to do and would only use up memory </b>

In [None]:
# Never start more workers than there are records to process.
nproc = min(nproc, len(sequences))

<b> Finally we define the cluster object </b>

In [None]:
# One single-threaded worker process per slot. The tasks in this notebook are
# CPU-bound Python and presumably hold the GIL, so extra threads inside a worker
# would not add parallelism. Without threads_per_worker=1, Dask spreads all
# cores over the workers, so after nproc was reduced above each worker would get
# several threads, and two tasks could end up sharing one process.
cluster = LocalCluster(n_workers = nproc, threads_per_worker = 1)

<b> Now we can connect our client to the cluster </b>

In [None]:
# Connect a Client to the local cluster; the client.map / client.gather calls
# in later cells submit and collect work through it.
client = Client(cluster)

In [None]:
# Show the client's summary as the cell output (in Jupyter this includes the
# dashboard link).
client

<b> The scheduler picks its port automatically. Better still, with default settings Dask serves a diagnostic dashboard at http://127.0.0.1:8787/status, where you can watch how your computation is going (the `client` output above shows the actual link) </b>

### And run parallelized computation with the same function as above

In [None]:
%%time
# Submit one CountBases task per record; client.map returns futures immediately.
futures = client.map(CountBases, sequences)
# NOTE(review): in Jupyter, progress() presumably returns a widget that is only
# rendered when it is the cell's last expression (or passed to display()), so
# as a mid-cell statement it may show nothing here. Confirm.
progress(futures) # progress bar; in a plain Python script it prints a text bar instead
# Block until every task finishes; results come back in the order of `sequences`.
results_count_parallel = client.gather(futures)

In [None]:
# Serial and parallel runs should produce identical results (expect True).
results_count == results_count_parallel

In [None]:
print(results_count_parallel)
# Drop both result lists; nothing later in the notebook needs them.
del results_count, results_count_parallel

<b> It is good practice to release the cluster memory held by finished futures with the `cancel` method of `Client`: </b>

```
    client.cancel(futures)
```

You can also restart the whole cluster with `client.restart()`. This is generally not recommended, since it kills and restarts every worker. For educational purposes we do it here anyway, so that the scheduler starts from a clean state with no knowledge of the previous data.

In [None]:
# Restart all workers so the next section starts from a clean cluster state;
# futures created before this point (e.g. `futures`) are no longer usable.
client.restart()

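## Now let's do it again on a less trivial task: pattern searching
Fuzzy pattern searching is much heavier than counting bases. The code below searches each record in full. If you run short of memory on your machine, you can restrict the search to a fraction of each record (e.g. only 1/8 of it).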

In [None]:
def FindPattern(record, pattern, subs, ins, dels):
    """Fuzzy-search ``pattern`` in one record's full sequence.

    The edit budget is given separately for substitutions (``subs``),
    insertions (``ins``) and deletions (``dels``); the three values are
    forwarded to ``fuzzysearch.find_near_matches`` as its ``max_*`` limits.

    Returns a ``(description, matches)`` tuple, where ``matches`` is whatever
    ``find_near_matches`` returns for the sequence.
    """
    text = str(record.seq)
    hits = find_near_matches(
        pattern,
        text,
        max_substitutions=subs,
        max_insertions=ins,
        max_deletions=dels,
    )
    return record.description, hits

### Let's define a preset of function arguments

In [None]:
# Search settings shared by the FindPattern calls below: the query sequence,
# and at most one substitution, one insertion and one deletion per match.
pattern = "TGATTTGGATGATTCAAGACTTCTCGGTACTGCA"
subs = ins = dels = 1

### Single process

In [None]:
%%time
results_fuzz = []
for record in sequences:
    result = FindPattern(record, pattern, subs, ins, dels)
    results_fuzz.append(result)

### Multi process

#### This time the function takes extra arguments, so we wrap it in a one-argument helper function (alternatively, `client.map` can forward extra keyword arguments to every call)

In [None]:
def FuzzHelper(x):
    """Run FindPattern on record ``x`` with the notebook's search settings.

    ``client.map`` calls this with a single record; the remaining arguments
    come from the module-level ``pattern``, ``subs``, ``ins`` and ``dels``.
    """
    return FindPattern(record=x, pattern=pattern, subs=subs, ins=ins, dels=dels)

In [None]:
%%time
# Same fuzzy search, one task per record; FuzzHelper supplies the fixed settings.
futures = client.map(FuzzHelper, sequences)
progress(futures)
# Block until done; results come back in the order of `sequences`.
results_fuzz_parallel = client.gather(futures)

In [None]:
# Serial and parallel searches should agree (expect True).
results_fuzz == results_fuzz_parallel

In [None]:
# Show the (header, matches) pair found for each record.
print(results_fuzz_parallel)

In [None]:
# Restart the workers again so the chunk-based run below starts from a clean cluster.
client.restart()

## Working with data chunks and I/O streaming

In [None]:
def SplitByteChunks(records, outdir="tmp"):
    """Pickle each record into its own file so workers can load the data themselves.

    Each file holds a ``(description, sequence_string)`` tuple, which is the
    format ``FindPatternPickle`` reads back.

    Args:
        records: any iterable of SeqRecord-like objects exposing ``description``
            and ``seq`` (a list, or e.g. a ``SeqIO.parse`` generator directly).
        outdir: directory for the chunk files, created if missing. Defaults to
            ``"tmp"`` relative to the current working directory.

    Returns:
        List of chunk file paths (``<outdir>/chunk_<i>.pickle``) in the same
        order as ``records``.
    """
    os.makedirs(outdir, exist_ok=True)
    chunkpaths = []
    for i, record in enumerate(records):
        # Store plain Python strings rather than the SeqRecord object itself.
        chunk = (record.description, str(record.seq))
        fpath = os.path.join(outdir, f"chunk_{i}.pickle")
        with open(fpath, "wb") as handle:
            pickle.dump(chunk, handle)
        chunkpaths.append(fpath)
    return chunkpaths

In [None]:
# Write one pickle file per record and show the resulting file paths.
pickles = SplitByteChunks(sequences)
print(pickles)

In [None]:
def FindPatternPickle(fpath, pattern, subs, ins, dels):
    """Like FindPattern, but read the record from a pickled chunk file.

    ``fpath`` must point to a file holding a ``(header, sequence)`` tuple, as
    written by ``SplitByteChunks``. Only the path has to reach the worker,
    which then loads the data from disk itself.

    NOTE: ``pickle.load`` can execute arbitrary code, so only use this on
    chunk files this notebook created.

    Returns a ``(header, matches)`` tuple from ``find_near_matches``.
    """
    with open(fpath, "rb") as chunk_file:
        header, sequence = pickle.load(chunk_file)
    hits = find_near_matches(
        pattern,
        sequence,
        max_substitutions=subs,
        max_insertions=ins,
        max_deletions=dels,
    )
    return header, hits

In [None]:
def PickleHelper(x):
    """Run FindPatternPickle on chunk file ``x`` with the notebook's search settings.

    ``client.map`` calls this with a single chunk path; the remaining
    arguments come from the module-level ``pattern``, ``subs``, ``ins`` and
    ``dels``.
    """
    return FindPatternPickle(fpath=x, pattern=pattern, subs=subs, ins=ins, dels=dels)

In [None]:
%%time
# Map over file paths instead of records: the client only sends short path
# strings, and each worker reads its own chunk from disk.
futures = client.map(PickleHelper, pickles)
progress(futures)
results_pickle = client.gather(futures)

In [None]:
# All three approaches (serial, in-memory parallel, chunk-file parallel) should agree (expect True).
results_fuzz == results_fuzz_parallel == results_pickle

# Dask docs
- https://dask.org/ (Main page)
- http://distributed.dask.org/en/latest/ (Dask Distributed)
- https://docs.dask.org/en/latest/dataframe.html (Dask DataFrame)
- https://docs.dask.org/en/latest/bag.html (Dask Bag)
- https://ml.dask.org/ (Dask Machine Learning)