# Dask tutorial

## Dask delayed

### Basics 

Sometimes we face problems that are parallelizable, but don’t fit into high-level abstractions like Dask Array or Dask DataFrame. Consider the following example:

In [41]:
from dask import delayed

In [29]:
def inc(x):
    return x + 1

def double(x):
    return x * 2

def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = sum(output)
total

50

There is clearly **parallelism** in this problem - many of the inc, double, and add functions can evaluate independently, but it’s not clear how to convert this to a big array or big DataFrame computation.

As written, this code runs sequentially in a single thread. However, a lot of it could be executed in parallel.

The Dask `delayed` function decorates functions so that they operate lazily. Rather than executing your function immediately, it defers execution, placing the function and its arguments into a task graph.

In [30]:
import dask

output = []
for x in data:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)
    c = dask.delayed(add)(a, b)
    output.append(c)

total = dask.delayed(sum)(output)
total

Delayed('sum-0dca6a9e-429a-4dbd-88f2-3c239e5e6831')

We used the `dask.delayed` function to wrap the function calls that we want to turn into tasks. None of the inc, double, add, or sum calls have happened yet. Instead, the object total is a Delayed result that contains a task graph of the entire computation. Looking at the graph we see clear opportunities for parallel execution. 

To visualize the graph with `total.visualize()`, you need Graphviz installed. I had some complications with the installation, so the call below is commented out: uncomment it if Graphviz works for you, or take a look at the graph in the presentation. Sorry for the inconvenience.

In [25]:
#total.visualize()
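If Graphviz isn't available, the task graph can still be inspected as a plain mapping from task keys to tasks. The sketch below rebuilds the same computation and counts tasks per function. It uses `__dask_graph__()` from Dask's collection protocol and assumes, as in current Dask releases, that `dask.delayed` names each task key `<function name>-<token>`.

```python
import dask


def inc(x):
    return x + 1


def double(x):
    return x * 2


def add(x, y):
    return x + y


output = []
for x in [1, 2, 3, 4, 5]:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)
    output.append(dask.delayed(add)(a, b))
total = dask.delayed(sum)(output)

# The graph maps task keys to tasks; no Graphviz needed to look at it.
graph = dict(total.__dask_graph__())

# Delayed task keys look like "inc-<token>", so the prefix names the function.
counts = {}
for key in graph:
    name = key.split("-")[0]
    counts[name] = counts.get(name, 0) + 1

print(counts)  # 5 tasks each for inc, double and add, plus 1 for sum
```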

In [31]:
total.compute()

50

### Decorator

The `delayed` function can also be used as a decorator. Here is a reproduction of our original problem as parallel code:

In [32]:
import dask

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def double(x):
    return x * 2

@dask.delayed
def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = dask.delayed(sum)(output)

In [35]:
from time import sleep

### Parallelize a for loop

`for` loops are one of the most common things that we want to parallelize. Use `dask.delayed` on `inc` and `sum` to parallelize the computation below:

In [37]:
data = [1, 2, 3, 4, 5, 6, 7, 8]

In [38]:
%%time
# Sequential code

results = []
for x in data:
    y = inc(x)
    results.append(y)
    
total = sum(results)

Wall time: 3 ms


In [39]:
total

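Delayed('add-858b337652043b59a3d54832a961cf97')

Note: because `inc` was redefined with `@dask.delayed` in the Decorator section, the "sequential" loop above actually builds a lazy task graph. That is why `total` is a `Delayed` object rather than a number, and why the 3 ms wall time only measures graph construction. Calling `total.compute()` returns 44.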

In [42]:
%%time
results = []

for x in data:
    y = delayed(inc)(x)
    results.append(y)
    
total = delayed(sum)(results)

total.compute()

Wall time: 52.5 ms


44

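The parallel version took 52.5 ms, most of which is scheduling overhead: `inc` does almost no work, so there is nothing to gain by running it in parallel. Parallelism pays off when each task does substantial work.

You can find more examples at the following links: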
* https://github.com/dask/dask-tutorial/blob/main/01_dask.delayed.ipynb
* https://docs.dask.org/en/latest/delayed.html
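The timings above show overhead rather than speedup because `inc` is trivial. A minimal sketch of the opposite case, assuming a hypothetical `slow_inc` that sleeps for 0.2 s to stand in for real work, compares a plain loop with Dask's threaded scheduler:

```python
import time

import dask


def slow_inc(x):
    time.sleep(0.2)  # stand-in for real work; sleeping releases the GIL
    return x + 1


data = [1, 2, 3, 4, 5, 6, 7, 8]

# Plain sequential loop: 8 calls x 0.2 s.
start = time.perf_counter()
sequential_total = sum(slow_inc(x) for x in data)
sequential_time = time.perf_counter() - start

# Same work as a delayed graph, run on a thread pool with one thread per task.
start = time.perf_counter()
lazy_total = dask.delayed(sum)([dask.delayed(slow_inc)(x) for x in data])
parallel_total = lazy_total.compute(scheduler="threads", num_workers=len(data))
parallel_time = time.perf_counter() - start

print(f"sequential: {sequential_total} in {sequential_time:.2f} s")
print(f"parallel:   {parallel_total} in {parallel_time:.2f} s")
```

Because `time.sleep` releases the GIL, threads can overlap the waits. CPU-bound pure-Python work would need processes or a distributed cluster instead.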

## Dask future

Dask futures provide fine-grained real-time execution for custom situations. This is the foundation for other APIs like Dask arrays and dataframes.
Unlike for arrays and dataframes, you need the Dask client to use the Futures interface.
This interface is good for task scheduling like `dask.delayed`, but is immediate rather than lazy, which provides some more flexibility in situations where the computations may evolve over time.


### Start Dask Client

First of all, we need to start a Client to use the futures interface. The Client tracks state among the various worker processes or threads.

In [43]:
from dask.distributed import Client, progress
client = Client(threads_per_worker=4, n_workers=1)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 59059 instead

| Property | Value |
|---|---|
| Connection method | Cluster object |
| Cluster type | distributed.LocalCluster |
| Dashboard | http://127.0.0.1:59059/status |
| Workers | 1 |
| Total threads | 4 |
| Total memory | 3.90 GiB |
| Status | running |
| Using processes | True |
| Scheduler | tcp://127.0.0.1:59060 |
| Worker | tcp://127.0.0.1:59071 (dashboard http://127.0.0.1:59072/status, nanny tcp://127.0.0.1:59063) |
| Worker local directory | `C:\Users\Дом\Desktop\jupyter\dask-worker-space\worker-wc__osv7` |


`Client.submit(func, *args[, key, workers, ...])` - Submit a function application to the scheduler

`Client.map(func, *iterables[, key, workers, ...])` - Map a function on a sequence of arguments

`Future.result([timeout])` - Wait until computation completes, gather result to local process

*...*

You can find more information about the Client at https://docs.dask.org/en/latest/futures.html
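A small, self-contained sketch of `Client.map`, `Client.submit` and result gathering. It starts its own in-process client with `processes=False`, an assumption made to keep the demo lightweight; the cluster started above works the same way.

```python
from dask.distributed import Client


def inc(x):
    return x + 1


# In-process cluster (threads only), enough for a small demo.
client = Client(processes=False, n_workers=1, threads_per_worker=2)

futures = client.map(inc, range(5))  # one future per input element
total = client.submit(sum, futures)  # futures can be inputs to other tasks

values = client.gather(futures)  # block and bring all results back locally
total_value = total.result()     # block on a single future

print(values, total_value)  # [1, 2, 3, 4, 5] 15
client.close()
```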

### Create simple functions

These functions do simple operations like adding two numbers together, but they sleep for a random amount of time to simulate real work.

In [44]:

import time
import random

def inc(x):
    time.sleep(random.random())
    return x + 1

def double(x):
    time.sleep(random.random())
    return 2 * x

def add(x, y):
    time.sleep(random.random())
    return x + y

We can run them locally:


In [45]:
inc(1)

2

Or we can submit them to run remotely with Dask. This immediately returns a future that points to the ongoing computation, and eventually to the stored result.

In [46]:
future = client.submit(inc, 1)  # returns immediately with pending future
future

If you wait a second, and then check on the future again, you’ll see that it has finished.

In [47]:
future  # scheduler and client talk constantly

You can block on the computation and gather the result with the .result() method.

In [48]:
future.result()

2

### Chain dependencies

You can submit tasks on other futures. This will create a dependency between the inputs and outputs. Dask will track the execution of all tasks, ensuring that downstream tasks are run at the proper time and place and with the proper data.


In [49]:
x = client.submit(inc, 1)
y = client.submit(double, 2)
z = client.submit(add, x, y)
z

In [50]:
z.result()

6

Note that we never blocked on x or y nor did we ever have to move their data back to our notebook.

### Submit many tasks

So we’ve learned how to run Python functions remotely. This becomes useful when we add two things:
* We can submit thousands of tasks per second
* Tasks can depend on each other by consuming futures as inputs

We submit many tasks that depend on each other in a normal Python for loop.


In [52]:
zs = []

In [53]:
%%time

for i in range(256):
    x = client.submit(inc, i)     # x = inc(i)
    y = client.submit(double, x)  # y = double(x)
    z = client.submit(add, x, y)  # z = add(x, y)
    zs.append(z)

Wall time: 6.62 s


In [54]:
total = client.submit(sum, zs)

To make this go faster, add more workers with more cores. This is more practical when using an actual cluster.

In [55]:
client.cluster.scale(10)  

### Custom computation: Tree summation

As an example of a non-trivial algorithm, consider the classic tree reduction. We accomplish this with a nested for loop and a bit of normal Python logic.

In [57]:
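L = zs
while len(L) > 1:
    new_L = []
    for i in range(0, len(L), 2):
        if i + 1 < len(L):
            future = client.submit(add, L[i], L[i + 1])  # add neighbors
        else:
            future = L[i]  # odd-length level: carry the last future up unchanged
        new_L.append(future)
    L = new_L                                   # swap old list for new
# L[0] is now a future for the total of all zs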

If we watch the dashboard’s status page (http://localhost:8787/status by default; in this session it runs at http://127.0.0.1:59059/status), we can note two things:

* The red bars are for inter-worker communication. They happen as different workers need to combine their intermediate values

* There is lots of parallelism at the beginning but less towards the end as we reach the top of the tree where there is less work to do.

More information about this example can be found at https://examples.dask.org/futures.html

## Dask Bag

Dask Bag implements operations like `map`, `filter`, `fold`, and `groupby` on collections of generic Python objects. It does this in parallel with a small memory footprint using Python iterators. It is similar to a parallel version of PyToolz or a Pythonic version of the PySpark RDD.

**Common Uses**

Dask bags are often used to parallelize simple computations on unstructured or semi-structured data like text data, log files, JSON records, or user defined Python objects.

**Execution**
Execution on bags provides two benefits:

* Parallel: data is split up, allowing multiple cores or machines to execute in parallel

* Iterating: data is processed lazily, allowing smooth execution of larger-than-memory data, even on a single machine within a single partition

**Known Limitations**
Bags provide very general computation (any Python function). This generality comes at a cost. Bags have the following known limitations:

* By default, they rely on the multiprocessing scheduler, which has its own set of known limitations (see Shared Memory)

* Bags are immutable and so you cannot change individual elements

* Bag operations tend to be slower than array/DataFrame computations in the same way that standard Python containers tend to be slower than NumPy arrays and Pandas DataFrames

* Bag’s `groupby` is slow. You should try to use Bag’s `foldby` if possible. Using foldby requires more thought though

### Basics

In [68]:
import dask.bag as db

import sys

Using the `dask.bag` API usually involves the following steps:

1. Create a lazy Dask bag object using `from_sequence()`, `from_delayed()` or `from_url()`.
   Example: `bag = dask.bag.from_sequence(range(1000000))`
2. Apply a chain of operations such as `map()`, `filter()`, `groupby()`, etc. to the lazy bag object created in step 1.
   Example: `bag_final = bag.map(lambda x: x*x).filter(lambda x: x%2)`
3. Call the `compute()` method on the final bag object from step 2 to actually run all the operations.
   Example: `bag_final.compute()`

Now let's look at these steps.

### Step 1: Create Lazy bag Objects 

The `from_sequence()` method is a common starting point for turning a list of operations into Dask-compatible operations that run in parallel. It accepts a list (or other iterable) of values and converts it into a lazy Dask bag object whose elements serve as input to the methods subsequently called on it.

In [69]:
bag1 = db.from_sequence(range(1000))
bag1

dask.bag<from_sequence, npartitions=100>

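Here `from_sequence()` divided the data into 100 partitions by default (the default partitioning depends on the input length and the Dask version). We can also control partitioning explicitly, either with `partition_size` (the number of elements per partition) or with `npartitions` (the number of partitions). If both are given, as in the next cell, `partition_size` takes precedence.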

In [70]:
bag2 = db.from_sequence(range(1000000), partition_size=1000, npartitions=1000)
bag2

dask.bag<from_sequence, npartitions=1000>
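To see how `npartitions` and `partition_size` split data, here is a small sketch on ten elements. The helper `partition_length` is made up for this example and is applied with `map_partitions`; the single-threaded `"synchronous"` scheduler is used only to keep the run simple.

```python
import dask.bag as db

data = range(10)

by_count = db.from_sequence(data, npartitions=2)    # ask for 2 partitions
by_size = db.from_sequence(data, partition_size=3)  # ask for 3 elements per partition


def partition_length(partition):
    # Called once per partition; returns a one-element list so the result stays a bag.
    return [sum(1 for _ in partition)]


sizes_by_count = by_count.map_partitions(partition_length).compute(scheduler="synchronous")
sizes_by_size = by_size.map_partitions(partition_length).compute(scheduler="synchronous")

print(by_count.npartitions, sizes_by_count)  # 2 [5, 5]
print(by_size.npartitions, sizes_by_size)    # 4 [3, 3, 3, 1]
```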

Let's check the size of each bag object with `sys.getsizeof()`, which returns the size of an object in bytes.

In [71]:
sys.getsizeof(bag1), sys.getsizeof(bag2)

(48, 48)

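Both objects are 48 bytes even though their inputs have very different lengths. Keep in mind that `sys.getsizeof()` is shallow: it measures only the bag object itself, not the task graph it references, so this mostly shows that a bag is a lightweight handle to a lazy computation.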

### Step 2: Perform List of Operations on Lazy Bag Object from Step 1

We'll now apply some commonly used methods to perform various computations on the values. Like the bag itself, these methods return lazy objects.

Below is a list of commonly used operations:

* `bag_object.map(function)`: applies `function` to every element of `bag_object`.
* `bag_object.filter(condition)`: evaluates `condition` on every element of `bag_object` and keeps only the elements for which it is true.
* `bag_object.product(another_bag)`: computes the cross (Cartesian) product of both bags and creates a new bag of the resulting pairs.
* `bag_object.max()`: returns the maximum value.
* `bag_object.min()`: returns the minimum value.
* `bag_object.accumulate(binop)`: takes a binary function that combines two input values into one; each result is passed as the first argument in the next step, producing a bag of running results.
* `bag_object.count()`: returns the number of values in the bag.
* `bag_object.sum()`: returns the sum of all values.
* `bag_object.std()`: returns the standard deviation.
* `bag_object.frequencies()`: returns the frequency of each distinct value in the bag.
* `bag_object.groupby(key)`: groups all values by the specified key. We can then perform operations on the grouped values.
* `bag_object.join(other, on_self)`: joins the bag with another collection based on the specified key, pairing up values whose keys match.
* `bag_object.topk(k)`: returns the `k` largest values (optionally ranked by a `key` function).

Note that the operations above only build lazy objects: `map()`, `filter()` and similar methods return new bags, and reductions such as `sum()` or `count()` return lazy scalar results. Actual computation happens only when we call `compute()` on the final object.
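Here is a minimal sketch of several of these operations on a toy bag. The results are evaluated together in one `dask.compute()` call; the `"synchronous"` scheduler is used only to keep the example simple.

```python
import operator

import dask
import dask.bag as db

bag = db.from_sequence([1, 2, 2, 3, 3, 3], npartitions=2)

# Each method call only builds a lazy object; dask.compute evaluates them all.
count, total, largest, freqs, top2, running = dask.compute(
    bag.count(),
    bag.sum(),
    bag.max(),
    bag.frequencies(),
    bag.topk(2),
    bag.accumulate(operator.add),
    scheduler="synchronous",
)

print(count, total, largest)  # 6 14 3
print(sorted(freqs))          # [(1, 1), (2, 2), (3, 3)]
print(top2)                   # [3, 3]
print(running)                # [1, 3, 5, 8, 11, 14]
```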


In [73]:
final_bag1 = bag1.map(lambda x: x*2)

final_bag1


dask.bag<lambda, npartitions=100>

In [74]:
final_bag2 = bag2.filter(lambda x: x%100 == 0)

final_bag2

dask.bag<filter-lambda, npartitions=1000>

### Step 3: Call `compute()` on Final Bag Object to Perform Computation in Parallel 

The final step, which actually performs the computation in parallel and returns the result, is to call the `compute()` method on the final bag object. We'll call `compute()` on both of the final objects created in the previous step.

In [75]:
len(final_bag1.compute())

1000

In [76]:
final_bag2.compute()[:10]

[0, 100, 200, 300, 400, 500, 600, 700, 800, 900]

We can also compute the bag objects from step 1 directly, which returns the actual list of values. Alternatively, passing a bag to the built-in `list()` also returns all of its values.

In [77]:
final_list = bag1.compute()

print("Size : %d bytes"%sys.getsizeof(final_list))
print("Length of Values : ", len(final_list))

Size : 8856 bytes
Length of Values :  1000


In [78]:
list(bag1)[:10]

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

### Performing All Steps Together

`map()` & `filter()` Usage

Below is a simple example that loops through 1 million numbers, doubles each one, and keeps only the results that are divisible by 100. We first implement it with a loop in pure Python and then convert it to the Dask version.

In [79]:
final_list = []

for i in range(1000000):
    x = i*2
    if x%100 == 0:
        final_list.append(x)

final_list[:10]

[0, 100, 200, 300, 400, 500, 600, 700, 800, 900]

In [80]:
bag1 = db.from_sequence(range(1000000))

result = bag1.map(lambda x: x*2).filter(lambda x : x%100 == 0)

result.compute()[:10]

[0, 100, 200, 300, 400, 500, 600, 700, 800, 900]

`join()` usage

Below is an example of the `join()` function of the dask.bag API: we have two lists of tuples and want to sum the values from both lists whose first elements (keys) match. We implement both a pure Python and a dask.bag version for comparison.

In [81]:
x = [("a",100), ("b",200), ("c",300), ("d",400), ("e",500)]
y = [("a",150), ("b",250), ("c",350), ("d",450), ("e",550)]

result = {}
for key, val in x+y:
    if key in result:
        result[key] += val
    else:
        result[key] = val

list(result.items())

[('a', 250), ('b', 450), ('c', 650), ('d', 850), ('e', 1050)]

In [82]:
bag1 = db.from_sequence(x)

bag1.join(y, lambda x: x[0]).map(lambda x: (x[0][0], sum([val[1] for val in x]))).compute()

[('a', 250), ('b', 450), ('c', 650), ('d', 850), ('e', 1050)]

`groupby()` usage 

Below is an example of `groupby()`: we loop through a list of two-value tuples and want the sum of the second values for each distinct first value. We show both a plain Python and a dask.bag version.

In [83]:
x = [("a",100), ("b",200), ("c",300), ("d",400), ("e",500), ("a",200), ("e",300)]

result = {}

for key, val in x:
    if key in result:
        result[key] += val
    else:
        result[key] = val

list(result.items())

[('a', 300), ('b', 200), ('c', 300), ('d', 400), ('e', 800)]

In [84]:
bag1 = db.from_sequence([("a",100), ("b",200), ("c",300), ("d",400), ("e",500), ("a",200), ("e",300)])

bag1.groupby(lambda x: x[0]).map(lambda x: (x[0], sum([i[1] for i in x[1]]))).compute()

[('d', 400), ('e', 800), ('c', 300), ('a', 300), ('b', 200)]
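As noted in the limitations above, `foldby` is usually preferable to `groupby` for reductions like this one, because it reduces within each partition before combining results instead of shuffling every element. A sketch of the same per-key sum with `foldby` (the helper names `add_value` and `combine_totals` are made up for this example):

```python
import dask.bag as db

pairs = [("a", 100), ("b", 200), ("c", 300), ("d", 400), ("e", 500), ("a", 200), ("e", 300)]
bag = db.from_sequence(pairs, npartitions=2)


def add_value(running_total, pair):
    # Fold one (key, value) pair into the running total for its key.
    return running_total + pair[1]


def combine_totals(total_a, total_b):
    # Merge per-key totals coming from different partitions.
    return total_a + total_b


result = bag.foldby(
    key=lambda pair: pair[0],
    binop=add_value,
    initial=0,
    combine=combine_totals,
    combine_initial=0,
).compute(scheduler="synchronous")

print(sorted(result))
# [('a', 300), ('b', 200), ('c', 300), ('d', 400), ('e', 800)]
```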

More examples are available at the following links:

* https://coderzcolumn.com/tutorials/python/dask-bag-parallel-computing-in-python
* https://docs.dask.org/en/latest/bag.html
* https://github.com/dask/dask-tutorial/blob/main/02_bag.ipynb

### Saving Output to a File 

It's advisable to save a bag's output to files once the computation is complete. The output of `compute()` may be too large to hold in memory, in which case it's better to save it to disk and then verify the results from there.

Dask bag provides a list of methods for converting the output to another format and saving it to various file formats. Below are three methods available for converting a bag of values to another format and saving it.

* `to_avro()` - It can be used to save a bag of values as Avro files
* `to_dataframe()` - It can be used to convert bag of values to dask data frames which is available through the dask.dataframe module and lets us work on pandas data frames in parallel
* `to_textfiles()` - It can be used to save a bag of values as text files
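A minimal sketch of `to_textfiles()`, writing to a temporary directory (an assumption for the demo). The `*` in the path is replaced by the partition number, so each partition becomes its own file; elements are converted to strings first because the files hold text.

```python
import glob
import os
import tempfile

import dask
import dask.bag as db

bag = db.from_sequence(range(10), npartitions=2)
out_dir = tempfile.mkdtemp()

# One text file per partition: 0.txt, 1.txt, ...
with dask.config.set(scheduler="synchronous"):
    bag.map(str).to_textfiles(os.path.join(out_dir, "*.txt"))

# Read the files back to verify the results.
files = sorted(glob.glob(os.path.join(out_dir, "*.txt")))
lines = []
for path in files:
    with open(path) as f:
        lines.extend(f.read().splitlines())

print(len(files), lines)
```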


## Dask Array 

Dask Array implements a subset of the NumPy ndarray interface using blocked algorithms, cutting up the large array into many small arrays. This lets us compute on arrays larger than memory using all of our cores. We coordinate these blocked algorithms using Dask graphs.

### Scope

Dask arrays support most of the NumPy interface like the following:

* Arithmetic and scalar mathematics: `+`, `*`, `exp`, `log`, ...
* Reductions along axes: `sum()`, `mean()`, `std()`, `sum(axis=0)`, ...
* Tensor contractions / dot products / matrix multiply: `tensordot`
* Axis reordering / transpose: `transpose`
* Slicing: `x[:100, 500:100:-2]`
* Fancy indexing along single axes with lists or NumPy arrays: `x[:, [10, 1, 5]]`
* Array protocols like `__array__` and `__array_ufunc__`
* Some linear algebra: `svd`, `qr`, `solve`, `solve_triangular`, `lstsq`
* …

**However**, Dask Array does not implement the entire NumPy interface. Users expecting this will be disappointed. Notably, Dask Array lacks the following features:

* Much of `np.linalg` has not been implemented. This has been done by a number of excellent BLAS/LAPACK implementations, and is the focus of numerous ongoing academic research projects
* Arrays with unknown shapes do not support all operations
* Operations like `sort` which are notoriously difficult to do in parallel, and are of somewhat diminished value on very large data (you rarely actually need a full sort). Often we include parallel-friendly alternatives like `topk`
* Dask Array doesn’t implement operations like `tolist` that would be very inefficient for larger datasets. Likewise, it is very inefficient to iterate over a Dask array with for loops
* Dask development is driven by immediate need, hence many lesser used functions have not been implemented. Community contributions are encouraged

The cell below creates a `10000x10000` array of random numbers, represented as many NumPy arrays of size 1000x1000 (or smaller if the array cannot be divided evenly). In this case there are 100 (10x10) NumPy arrays of size 1000x1000.

In [85]:
import dask.array as da
x = da.random.random((10000, 10000), chunks=(1000, 1000))
x

|       | Array | Chunk |
|-------|-------|-------|
| Bytes | 762.94 MiB | 7.63 MiB |
| Shape | (10000, 10000) | (1000, 1000) |
| Count | 100 Tasks | 100 Chunks |
| Type | float64 | numpy.ndarray |


Use NumPy syntax as usual

In [86]:
y = x + x.T
z = y[::2, 5000:].mean(axis=1)
z

|       | Array | Chunk |
|-------|-------|-------|
| Bytes | 39.06 kiB | 3.91 kiB |
| Shape | (5000,) | (500,) |
| Count | 430 Tasks | 10 Chunks |
| Type | float64 | numpy.ndarray |


Call `.compute()` when you want your result as a NumPy array.
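A small sketch on a 4x4 array, checking that `.compute()` returns a plain NumPy array that matches the same expression evaluated with NumPy alone:

```python
import numpy as np
import dask.array as da

np_x = np.arange(16.0).reshape(4, 4)
x = da.from_array(np_x, chunks=(2, 2))  # 4 chunks of shape (2, 2)

y = x + x.T
z = y[::2, 2:].mean(axis=1)  # still lazy: a Dask array

result = z.compute()  # now a NumPy array
expected = (np_x + np_x.T)[::2, 2:].mean(axis=1)

print(type(result).__name__, result)  # ndarray [12.5 22.5]
```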


### Persist data in memory

If you have the available RAM for your dataset then you can persist data in memory.
This allows future computations to be much faster.

In [87]:
y = y.persist()

In [88]:
%time y[0, 0].compute()

Wall time: 1.03 s


1.9515086969580655

In [89]:
%time y.sum().compute()

Wall time: 2.6 s


99991960.07231274
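To see what `persist()` changes, here is a small sketch (array sizes chosen arbitrarily for the demo). Before persisting, the graph holds every task needed to build the array; afterwards it holds only the computed chunks, one per partition, so later computations start from those in-memory chunks. This assumes the local threaded scheduler; with a distributed `Client`, `persist()` returns immediately and the chunks live on the workers.

```python
import dask.array as da

x = da.ones((1000, 1000), chunks=(100, 100))  # 10 x 10 = 100 chunks
y = x + 1

tasks_before = len(y.dask)  # tasks to create the ones and to add 1

# Compute now and keep the resulting chunks in memory.
y = y.persist(scheduler="threads")
tasks_after = len(y.dask)  # only the stored chunks remain, one per partition

print(tasks_before, tasks_after, y.npartitions)

# Later computations start from the persisted chunks instead of recomputing them.
print(y.sum().compute(), y.mean().compute())  # 2000000.0 2.0
```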

Detailed examples of Dask Array can be found at https://github.com/dask/dask-tutorial/blob/main/03_array.ipynb

**That notebook also covers blocked algorithms.**
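A blocked algorithm splits a large array into blocks, runs an ordinary NumPy operation on each block, and then combines the partial results. A minimal sketch for a sum, with a block size picked for the example, compared against the Dask Array result:

```python
import numpy as np
import dask.array as da

data = np.random.default_rng(0).random((1000, 1000))
block = 250  # block size chosen for this sketch: 4 x 4 = 16 blocks

# Sum each block independently (these sums could run in parallel)...
partial_sums = [
    data[i:i + block, j:j + block].sum()
    for i in range(0, data.shape[0], block)
    for j in range(0, data.shape[1], block)
]
# ...then combine the partial results.
blocked_total = sum(partial_sums)

# Dask Array applies the same idea, with one task per chunk.
dask_total = da.from_array(data, chunks=(block, block)).sum().compute()

print(len(partial_sums), np.isclose(blocked_total, data.sum()), np.isclose(dask_total, data.sum()))
```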

## Dask Dataframe

Dask `DataFrame` is a large parallel `DataFrame` composed of many smaller Pandas `DataFrames`, split along the index. These Pandas `DataFrames` may live on disk for larger-than-memory computing on a single machine, or on many different machines in a cluster. One Dask `DataFrame` operation triggers many operations on the constituent Pandas `DataFrames`.

**Main Take-aways**

* Dask DataFrame should be familiar to Pandas users
* The partitioning of dataframes is important for efficient execution

**Limitations**

Dask.dataframe only covers a small but well-used portion of the Pandas API. This limitation is for two reasons:

* The Pandas API is huge
* Some operations are genuinely hard to do in parallel (e.g. `sort`)

Additionally, some important operations like `set_index` work, but are slower than in Pandas because they involve substantial shuffling of data, and may write out to disk.

### Download the NYC Flights dataset 

We load an extract of flights in the USA across several years. This data is specific to flights out of the three airports in the New York City area.

In [1]:
import urllib.request

print("- Downloading NYC Flights dataset... ", end='', flush=True)
url = "https://storage.googleapis.com/dask-tutorial-data/nycflights.tar.gz"
filename, headers = urllib.request.urlretrieve(url, 'nycflights.tar.gz')
print("Done!", flush=True)

- Downloading NYC Flights dataset... Done!


In [2]:
import tarfile

# extract the .csv files from the tar file
with tarfile.open(filename, mode='r:gz') as flights:
    flights.extractall('data/')

In [5]:
import os
import dask.dataframe as dd

df = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),
                 parse_dates={'Date': [0, 1, 2]})

df

npartitions=10

| Column | dtype |
|---|---|
| Date | datetime64[ns] |
| DayOfWeek | int64 |
| DepTime | float64 |
| CRSDepTime | int64 |
| ArrTime | float64 |
| CRSArrTime | int64 |
| UniqueCarrier | object |
| FlightNum | int64 |
| TailNum | float64 |
| ActualElapsedTime | float64 |
| CRSElapsedTime | int64 |
| AirTime | float64 |
| ArrDelay | float64 |
| DepDelay | float64 |
| Origin | object |
| Dest | object |
| Distance | float64 |
| TaxiIn | float64 |
| TaxiOut | float64 |
| Cancelled | int64 |
| Diverted | int64 |


Notice that the representation of the dataframe object contains no data: Dask has only read enough of the start of the first file to infer the column names and dtypes.

In [6]:
df.head()

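| | Date | DayOfWeek | DepTime | CRSDepTime | ArrTime | CRSArrTime | UniqueCarrier | FlightNum | TailNum | ActualElapsedTime | ... | AirTime | ArrDelay | DepDelay | Origin | Dest | Distance | TaxiIn | TaxiOut | Cancelled | Diverted |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1990-01-01 | 1 | 1621.0 | 1540 | 1747.0 | 1701 | US | 33 | NaN | 86.0 | ... | NaN | 46.0 | 41.0 | EWR | PIT | 319.0 | NaN | NaN | 0 | 0 |
| 1 | 1990-01-02 | 2 | 1547.0 | 1540 | 1700.0 | 1701 | US | 33 | NaN | 73.0 | ... | NaN | -1.0 | 7.0 | EWR | PIT | 319.0 | NaN | NaN | 0 | 0 |
| 2 | 1990-01-03 | 3 | 1546.0 | 1540 | 1710.0 | 1701 | US | 33 | NaN | 84.0 | ... | NaN | 9.0 | 6.0 | EWR | PIT | 319.0 | NaN | NaN | 0 | 0 |
| 3 | 1990-01-04 | 4 | 1542.0 | 1540 | 1710.0 | 1701 | US | 33 | NaN | 88.0 | ... | NaN | 9.0 | 2.0 | EWR | PIT | 319.0 | NaN | NaN | 0 | 0 |
| 4 | 1990-01-05 | 5 | 1549.0 | 1540 | 1706.0 | 1701 | US | 33 | NaN | 77.0 | ... | NaN | 5.0 | 9.0 | EWR | PIT | 319.0 | NaN | NaN | 0 | 0 |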


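Calling `df.tail()` at this point raises an error. Dask inferred the dtypes above from a sample at the start of the first file, but some columns contain values further on that don't match those dtypes: for example, `TailNum` is empty at the start of the data (so it was inferred as `float64`) but holds strings like `N516UA` later, and `CRSElapsedTime` was inferred as `int64` but is read as `float64` in later files.
**In order to prevent this error**, we specify the correct dtypes for the columns with mixed dtypes explicitly.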

In [7]:
df = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': str,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})
df.tail()

| | Date | DayOfWeek | DepTime | CRSDepTime | ArrTime | CRSArrTime | UniqueCarrier | FlightNum | TailNum | ActualElapsedTime | ... | AirTime | ArrDelay | DepDelay | Origin | Dest | Distance | TaxiIn | TaxiOut | Cancelled | Diverted |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 269176 | 1999-12-27 | 1 | 1645.0 | 1645 | 1830.0 | 1901 | UA | 1753 | N516UA | 225.0 | ... | 205.0 | -31.0 | 0.0 | LGA | DEN | 1619.0 | 7.0 | 13.0 | False | 0 |
| 269177 | 1999-12-28 | 2 | 1726.0 | 1645 | 1928.0 | 1901 | UA | 1753 | N504UA | 242.0 | ... | 214.0 | 27.0 | 41.0 | LGA | DEN | 1619.0 | 5.0 | 23.0 | False | 0 |
| 269178 | 1999-12-29 | 3 | 1646.0 | 1645 | 1846.0 | 1901 | UA | 1753 | N592UA | 240.0 | ... | 220.0 | -15.0 | 1.0 | LGA | DEN | 1619.0 | 5.0 | 15.0 | False | 0 |
| 269179 | 1999-12-30 | 4 | 1651.0 | 1645 | 1908.0 | 1901 | UA | 1753 | N575UA | 257.0 | ... | 233.0 | 7.0 | 6.0 | LGA | DEN | 1619.0 | 5.0 | 19.0 | False | 0 |
| 269180 | 1999-12-31 | 5 | 1642.0 | 1645 | 1851.0 | 1901 | UA | 1753 | N539UA | 249.0 | ... | 232.0 | -10.0 | -3.0 | LGA | DEN | 1619.0 | 6.0 | 11.0 | False | 0 |


### Basic examples

Let's answer the following questions with Dask DataFrames:

1. In total, how many non-cancelled flights were taken from each airport?

In [8]:
df[~df.Cancelled].groupby('Origin').Origin.count().compute()

Origin
EWR    1139451
JFK     427243
LGA     974267
Name: Origin, dtype: int64

2.  What was the average departure delay from each airport?

In [9]:
df.groupby("Origin").DepDelay.mean().compute()

Origin
EWR    10.295469
JFK    10.351299
LGA     7.431142
Name: DepDelay, dtype: float64

3. What day of the week has the worst average departure delay?

In [10]:
df.groupby("DayOfWeek").DepDelay.mean().compute()

DayOfWeek
1     8.096565
2     8.149109
3     9.141912
4    10.538275
5    11.476687
6     7.824071
7     8.994296
Name: DepDelay, dtype: float64

### Sharing Intermediate Results


When computing all of the above, we sometimes did the same operation more than once. For most operations, dask.dataframe hashes the arguments, allowing duplicate computations to be shared, and only computed once.

For example, let's compute the mean and standard deviation of the departure delay of all non-cancelled flights. Since Dask operations are lazy, those values aren't the final results yet. They're just the recipe required to get the result.

If we compute them with two calls to compute, there is no sharing of intermediate computations.

In [12]:
non_cancelled = df[~df.Cancelled]
mean_delay = non_cancelled.DepDelay.mean()
std_delay = non_cancelled.DepDelay.std()

In [13]:
%%time

mean_delay_res = mean_delay.compute()
std_delay_res = std_delay.compute()

Wall time: 13.7 s


In [15]:
import dask

In [16]:
%%time

mean_delay_res, std_delay_res = dask.compute(mean_delay, std_delay)

Wall time: 10.5 s


Using `dask.compute` was noticeably faster here (10.5 s instead of 13.7 s). This is because the task graphs for both results are merged when calling `dask.compute`, allowing shared operations to be done only once instead of twice. In particular, using `dask.compute` only does the following once:

* the calls to `read_csv`
* the filter (`df[~df.Cancelled]`)
* some of the necessary reductions (`sum`, `count`)
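The same effect can be sketched with `dask.delayed` and a call counter. Here a hypothetical `load` function stands in for the shared `read_csv` and filter steps, and the hypothetical `mean_of`/`std_of` stand in for the two reductions:

```python
import dask

calls = {"load": 0}


def load():
    # Stand-in for the shared expensive steps (read_csv + filtering).
    calls["load"] += 1
    return list(range(1, 11))


def mean_of(values):
    return sum(values) / len(values)


def std_of(values):
    m = mean_of(values)
    return (sum((v - m) ** 2 for v in values) / (len(values) - 1)) ** 0.5


data = dask.delayed(load)()
mean_lazy = dask.delayed(mean_of)(data)
std_lazy = dask.delayed(std_of)(data)

# Two separate compute calls each run the shared "load" task...
mean_lazy.compute(scheduler="synchronous")
std_lazy.compute(scheduler="synchronous")
separate_calls = calls["load"]

# ...while one dask.compute call merges both graphs and runs it once.
calls["load"] = 0
mean_res, std_res = dask.compute(mean_lazy, std_lazy, scheduler="synchronous")
merged_calls = calls["load"]

print(separate_calls, merged_calls)  # 2 1
```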

More examples of Dask DataFrame can be found at the following links:

* https://github.com/dask/dask-tutorial/blob/main/04_dataframe.ipynb
* https://nbviewer.org/github/danbochman/Open-Source-Spotlight/blob/master/Dask/Dask.ipynb
* https://examples.dask.org/dataframe.html

## Dask ML 

Dask-ML provides scalable machine learning in Python using Dask alongside popular machine learning libraries like Scikit-Learn, XGBoost, and others.

There are a couple of distinct scaling problems you might face. The scaling strategy depends on which problem you're facing:

* For in-memory problems, just use scikit-learn (or your favorite ML library)
* For large models, use Joblib's Dask backend (older Dask-ML releases exposed this as `dask_ml.joblib`) with your favorite scikit-learn estimator
* For large datasets, use `dask_ml` estimators


### Preprocessing

`dask_ml.preprocessing` contains some scikit-learn style transformers that can be used in Pipelines to perform various data transformations as part of the model fitting process. These transformers will work well on dask collections (`dask.array`, `dask.dataframe`), NumPy arrays, or pandas dataframes. They’ll fit and transform in parallel.

### Scikit-Learn Clones

Some of the transformers are (mostly) drop-in replacements for their scikit-learn counterparts.

* `MinMaxScaler([feature_range, copy, clip])` - Transform features by scaling each feature to a given range.

* `QuantileTransformer(*[, n_quantiles, ...])` - Transforms features using quantile information.

* `RobustScaler(*[, with_centering, ...])` - Scale features using statistics that are robust to outliers.

* `StandardScaler(*[, copy, with_mean, with_std])` - Standardize features by removing the mean and scaling to unit variance.

* `LabelEncoder([use_categorical])` - Encode labels with value between 0 and n_classes-1.

* `OneHotEncoder(n_values, ...)` - Encode categorical integer features as a one-hot numeric array.

* `PolynomialFeatures([degree, ...])` - Generate polynomial and interaction features.

**These can be used just like the scikit-learn versions, except that:**

* They operate on dask collections in parallel

* `.transform` will return a `dask.array` or `dask.dataframe` when the input is a dask collection

### Create Scikit-Learn Estimator

In [21]:
!pip install scikit-learn



**Because of an issue with scikit-learn, I could not execute the following cells; please follow the presentation.**

### Training on Large Datasets

Most estimators in scikit-learn are designed to work on in-memory arrays. Training with larger datasets may require different algorithms.

All of the algorithms implemented in Dask-ML work well on larger than memory datasets, which you might store in a dask array or dataframe.

In this example (see the presentation), `dask_ml.datasets.make_blobs` generates some random Dask arrays.


Here, we use the k-means implemented in Dask-ML to cluster the points. It uses the k-means|| (read: “k-means parallel”) initialization algorithm, which scales better than k-means++. All of the computation, both during and after initialization, can be done in parallel.

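### Dask ML - Scikit-Learn & Joblib
As an example, you might distribute a randomized cross-validated parameter search (scikit-learn's `RandomizedSearchCV`) across the cluster using Joblib's Dask backend; see the presentation for the code.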

### Dask ML - XGBoost & LightGBM

XGBoost is a powerful and popular library for gradient boosted trees. For larger datasets or faster training, XGBoost also provides a distributed computing solution. LightGBM is another library similar to XGBoost; it also supplies native distributed training for decision trees.


Dask sets up XGBoost’s master process on the Dask scheduler and XGBoost’s worker processes on Dask’s worker processes. Then it moves all of the Dask dataframes’ constituent Pandas dataframes to XGBoost and lets XGBoost train. Fortunately, because XGBoost has an excellent Python interface, all of this can happen in the same process without any data transfer. The two distributed services can operate together on the same data.

When XGBoost is finished training Dask cleans up the XGBoost infrastructure and continues on as normal.

More information about Dask ML can be found at the following links:

* https://ml.dask.org/
* https://github.com/Svetave/dask-tutorial/blob/main/08_machine_learning.ipynb
* https://examples.dask.org/machine-learning.html

