# Topics
-----------------------------
## Prequel
<ul>
    <li>Introduction to Generators & Decorators</li>
</ul>
## Dask

<ul>
    <li> Parallelizing the traditional pandas pipeline</li>
    <li> Dask Arrays</li>
    <li> Dask dataframes</li>
    <li> What's there and what's not?</li>
    <li> Dask Distributed </li>
</ul>
   

## Generators

In [None]:
# Simple generator
"""
If a function contains at least one yield statement (it may also contain other yield or return statements),
it becomes a generator function. The difference is that return ends the function, while yield pauses it
so that it can resume later from the same point.

Here is how a generator function differs from a normal function:

-- A generator function contains one or more yield statements.
-- When called, it returns a generator object (an iterator) but does not start executing immediately.
-- Methods like __iter__() and __next__() are implemented automatically, so we can step through the items with next().
-- Once the function yields, it is paused and control is transferred back to the caller.
-- Local variables and their state are remembered between successive calls.
-- When the function finally terminates, further calls raise StopIteration automatically.

Note: the built-in iter() calls an object's __iter__() method and returns an iterator for it (list, set, tuple, etc., or custom objects).
That iterator hands out one element at a time through its __next__() method, which is what next() and every for loop use behind the scenes.
"""
def my_gen():
    n = 1
    print('This is printed first')
    yield n

    n += 1
    print('This is printed second')
    yield n

    n += 1
    print('This is printed at last')
    yield n

In [None]:
gen = my_gen()
print(type(gen))
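# Each next() resumes the generator and runs it up to the following yield.
# The fourth call raises StopIteration because no yield is left.
#next(gen)
#next(gen)
#next(gen)
#next(gen)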
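The batching idea behind Dask can be sketched with a plain generator. The function below, `batches` (a hypothetical helper written for this notebook, not a library function), keeps its partial batch between `next()` calls and only produces the next group of items when asked. Python 3.12 also ships `itertools.batched`, which yields tuples instead of lists.

```python
def batches(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    batch = []
    for item in iterable:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []  # start a fresh batch; this local state survives between next() calls
    if batch:  # emit the final, possibly shorter, batch
        yield batch

gen = batches(range(7), 3)
print(next(gen))  # [0, 1, 2]
print(next(gen))  # [3, 4, 5]
print(next(gen))  # [6]
try:
    next(gen)
except StopIteration:
    print("exhausted")
```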

In [None]:
"""
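A generator expression creates a generator on the fly. It looks like a list comprehension
but uses round brackets, and it produces each value only when it is asked for.
"""
my_list = [1, 2, 3, 4, 5, 6]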
gen = (x**2 for x in my_list)
print(type(gen))
for i in gen:
    print(i)

In [None]:
# The for loop above already exhausted gen, so this raises StopIteration
next(gen)

In [None]:
"""
A generator expression can be passed directly to a function such as sum() or max().
When it is the only argument, its own round brackets can be dropped.
"""
print(sum(x**2 for x in my_list))
print(max(x**2 for x in my_list))
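To see why generators matter for large data, compare the size of a list comprehension with the equivalent generator expression. This is a minimal sketch: the exact byte counts depend on the Python version and platform, so only the order of magnitude is meaningful, and `sys.getsizeof` measures only the container, not the integers it refers to.

```python
import sys

n = 1_000_000
squares_list = [x**2 for x in range(n)]  # materializes all n results at once
squares_gen = (x**2 for x in range(n))   # produces results one at a time

print(sys.getsizeof(squares_list))  # several megabytes
print(sys.getsizeof(squares_gen))   # a few hundred bytes at most
print(sum(squares_list) == sum(squares_gen))  # True (this consumes the generator)
```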

## Decorators

In [None]:
"""
Python has an interesting feature called decorators that adds functionality to existing code.
This is also called metaprogramming, because one part of the program modifies another part of the program.
The decoration happens when the decorated function is defined, not each time it is called.

Functions can be passed as arguments to other functions.

Functions that take other functions as arguments are called higher-order functions.
Here is an example of such a function.
"""
def inc(x):
    return x + 1

def dec(x):
    return x - 1

def operate(func, x):
    result = func(x)
    return result

print(operate(inc, 3))  # 4
print(operate(dec, 3))  # 2

In [None]:
"""
Furthermore, a function can return another function.

Here, is_returned() is a nested function that is defined and returned each time we call is_called().
"""
def is_called():
    def is_returned():
        print("Hello")
    return is_returned

new = is_called()
# Outputs "Hello"
new()
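A nested function can also remember values from its enclosing scope. Such a function is called a closure, and it is what lets a decorator's inner function keep hold of the function it wraps. The names `make_multiplier`, `times2` and `times3` below are illustrative only.

```python
def make_multiplier(factor):
    def multiply(x):
        # `factor` is looked up in the enclosing scope of make_multiplier
        return x * factor
    return multiply

times2 = make_multiplier(2)
times3 = make_multiplier(3)
print(times2(5), times3(5))  # 10 15
# The captured value is stored on the returned function itself
print(times2.__closure__[0].cell_contents)  # 2
```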

In [None]:
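# make_pretty is a decorator: it takes a function and returns a new function
# (inner) that adds behavior before calling the original one
def make_pretty(func):
    def inner():
        print("I got decorated")
        func()
    return inner

def ordinary():
    print("I am ordinary")

ordinary()
# Let's decorate this ordinary function
pretty = make_pretty(ordinary)
pretty()  # prints "I got decorated" and then "I am ordinary"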

In [None]:
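# The @ syntax is shorthand for: ordinary = make_pretty(ordinary)
@make_pretty
def ordinary():
    print("I am ordinary")

ordinary()  # prints "I got decorated" and then "I am ordinary"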
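`make_pretty` above only works for functions that take no arguments and return nothing. A more general sketch, under the hypothetical name `make_pretty_any`, forwards `*args`/`**kwargs`, returns the wrapped function's result, and uses `functools.wraps` so that the decorated function keeps its own name and docstring.

```python
import functools

def make_pretty_any(func):
    @functools.wraps(func)            # copy __name__, __doc__, etc. from func onto inner
    def inner(*args, **kwargs):       # accept any positional and keyword arguments...
        print("I got decorated")
        return func(*args, **kwargs)  # ...forward them and pass func's result back
    return inner

@make_pretty_any
def add_numbers(x, y):
    """Return x + y."""
    return x + y

print(add_numbers(2, 3))     # prints "I got decorated", then 5
print(add_numbers.__name__)  # add_numbers
```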

In [None]:
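import dask
from dask import delayed
from time import sleep

# Plain functions: each sleeps for one second to mimic real work
def inc(x):
    sleep(1)
    return x + 1

def addition(x, y):
    sleep(1)
    return x + y

# Functions decorated with @delayed return lazy Delayed objects instead of results
@delayed
def increment(x):
    return x + 1

@delayed
def double(x):
    return 2 * x

@delayed
def add(x, y):
    return x + y

In [None]:
# Wrap the plain functions at call time; nothing runs yet
x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(addition)(x, y)
# The two inc calls are independent, so they can run in parallel:
# this should take roughly 2 seconds instead of 3
z.compute()

In [None]:
data = list(range(1, 6))
output = []

for i in data:
    a = increment(i)
    b = double(i)
    c = add(a, b)
    output.append(c)

# sum() also works on Delayed objects and returns another Delayed object
total = sum(output)

print(total.compute())  # 50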

Here we have used the `@delayed` decorator to mark these functions as lazy: calling them records the function and its inputs in a task graph, and the work is executed only on demand, when `.compute()` is called. `dask.delayed` can also be called as an ordinary function, without the decorator syntax, which leaves the original function unchanged, e.g.,
```python
delayed_inc = delayed(inc)
```
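The lazy behaviour of `delayed` can be imitated in a few lines of plain Python. The toy `Lazy` class and `lazy` decorator below are hypothetical illustrations, not Dask's implementation: calling a decorated function only records the function and its arguments, and `compute()` walks the recorded dependencies. Unlike Dask, this sketch runs everything sequentially and would recompute a dependency shared by several nodes.

```python
class Lazy:
    """Toy stand-in for a delayed value: a function plus its (possibly lazy) arguments."""
    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def compute(self):
        # Evaluate dependencies first (depth-first), then apply this node's function
        args = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        return self.func(*args)

def lazy(func):
    """Wrap func so that calling it records a Lazy node instead of running it."""
    def wrapper(*args):
        return Lazy(func, *args)
    return wrapper

calls = []  # records which functions actually ran, and in what order

@lazy
def lazy_inc(x):
    calls.append("inc")
    return x + 1

@lazy
def lazy_add(x, y):
    calls.append("add")
    return x + y

toy_total = lazy_add(lazy_inc(1), lazy_inc(2))  # builds the graph; nothing has run yet
print(calls)                # []
print(toy_total.compute())  # 5
print(calls)                # ['inc', 'inc', 'add']
```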

## Dask

### Why Dask
Aside from the [detailed introduction](http://dask.pydata.org/en/latest/), we can summarize the basics of Dask as follows:
- process data that doesn't fit into memory by breaking it into blocks and specifying task chains
- parallelize execution of tasks across cores and even nodes of a cluster
- move computation to the data rather than the other way around, to minimize communication overheads

All of this allows you to get the most out of your computation resources, but program in a way that is very familiar: for-loops to build basic tasks, Python iterators, and the Numpy (array) and Pandas (dataframe) functions for multi-dimensional or tabular data, respectively.

The next cells apply the first of these programming paradigms, `dask.delayed` with ordinary Python loops, to a pandas pipeline. Some users will want less detail and can skip ahead to the array and dataframe sections, but some data processing tasks don't fit easily into those abstractions and need to fall back to the methods here.

We include a few examples at the end of the notebooks showing that the ideas behind how Dask is built are not actually that novel, and experienced programmers will have met parts of the design in other situations before. Those examples are left for the interested.

In [None]:
import time
from sys import getsizeof

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import dask.array as da
import dask.dataframe as dd
from dask import delayed

In [None]:
template = "./flightdelays/flightdelays-2016-{:d}.csv"
filenames = [template.format(i) for i in range(1,6)]

In [None]:
def pct_delayed(df):
    # Compute number of delayed flights: n_delayed
    n_delayed = (df['DEP_DELAY']>0).sum()
    # Return percentage of delayed flights
    return n_delayed*100/len(df)

In [None]:
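# A generator expression: each CSV file is read only when the list comprehension asks for it,
# so the months are processed one at a time instead of all being loaded up front
dataframes = (pd.read_csv(file) for file in filenames)
monthly_delayed = [pct_delayed(df) for df in dataframes]
# The generator object itself stays tiny, however large the files it reads
print(getsizeof(dataframes))
months = range(1, 6)
plt.plot(months, monthly_delayed, marker='o', linewidth=0)
plt.ylabel('% Delayed')
plt.xlabel('Month - 2016')
plt.xlim((1, 6))
plt.ylim((0, 100))
plt.show()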

In [None]:
@delayed
def pct_delayed(df):
    # Compute number of delayed flights: n_delayed
    # (the vectorized Series.sum() is much faster than Python's built-in sum)
    n_delayed = (df.DEP_DELAY > 0).sum()
    # Return percentage of delayed flights
    return n_delayed*100/len(df)

@delayed
def read_file(fname):
    return pd.read_csv(fname)

In [None]:
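# One lazy task chain per file; nothing is read yet.
# Don't name this list `delayed`: that would shadow dask.delayed, which later cells call.
delayed_results = [pct_delayed(read_file(fname)) for fname in filenames]

In [None]:
total_delayed = sum(delayed_results)
total_delayed.compute()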
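The task graph built above has a simple shape: one independent task chain per file, followed by a single reduction. A rough standard-library analogue uses `concurrent.futures`; here the three lists of departure delays are made-up stand-ins for the monthly CSV files, and `pct_delayed_list` is a hypothetical list-based version of `pct_delayed`.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in data: departure delays (minutes) for three "months"
months = [
    [5, -2, 0, 12],
    [-1, -3, 4, 0],
    [30, 2, 1, -5],
]

def pct_delayed_list(delays):
    # Percentage of flights with a positive departure delay
    return sum(d > 0 for d in delays) * 100 / len(delays)

# Each month is independent, so the per-month tasks can run concurrently;
# only the final reduction needs all of their results.
with ThreadPoolExecutor(max_workers=3) as pool:
    monthly = list(pool.map(pct_delayed_list, months))

print(monthly)       # [50.0, 25.0, 75.0]
print(sum(monthly))  # 150.0
```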

## Dask Arrays

Dask array provides a parallel, larger-than-memory, n-dimensional array using blocked algorithms. Simply put: distributed Numpy.

*  **Parallel**: Uses all of the cores on your computer
*  **Larger-than-memory**:  Lets you work on datasets that are larger than your available memory by breaking up your array into many small pieces, operating on those pieces in an order that minimizes the memory footprint of your computation, and effectively streaming data from disk.
*  **Blocked Algorithms**:  Perform large computations by performing many smaller computations

**Related Documentation**

* [Documentation](http://dask.readthedocs.io/en/latest/array.html)
* [API reference](http://dask.readthedocs.io/en/latest/array-api.html)

### Blocked Algorithms
A *blocked algorithm* executes on a large dataset by breaking it up into many small blocks.

For example, consider taking the sum of a billion numbers.  We might instead break up the array into 1,000 chunks, each of size 1,000,000, take the sum of each chunk, and then take the sum of the intermediate sums.

We achieve the intended result (one sum on one billion numbers) by performing many smaller computations (one thousand sums on one million numbers each, followed by another sum of a thousand numbers).
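The same two-step pattern, scaled down to one million numbers in 1,000 blocks of 1,000, looks like this in plain Python. It is a sketch of the blocked idea only; Dask additionally schedules the per-block sums across cores.

```python
data = range(1_000_000)  # stand-in for a large array; a range does not store its elements
chunk_size = 1_000

# Step 1: one partial sum per block; these are independent and could run in parallel
partial_sums = [sum(data[i:i + chunk_size]) for i in range(0, len(data), chunk_size)]

# Step 2: combine the 1,000 partial sums into the final answer
blocked_total = sum(partial_sums)

print(len(partial_sums))           # 1000
print(blocked_total == sum(data))  # True
```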


You can create a `dask.array` `Array` object with the `da.from_array` function.  This function accepts

1.  `data`: Any object that supports NumPy slicing.
2.  `chunks`: A chunk size to tell us how to block up our array, like `(1000,1000)`
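For a plain integer chunk size, the blocks along one axis are full-size chunks followed by one smaller remainder chunk. The hypothetical helper `chunk_sizes_1d` below sketches that rule; it should match the per-axis tuples that `.chunks` reports in the simple cases used in this notebook, but it does not reproduce Dask's general chunk-normalization logic (for example `chunks='auto'`).

```python
def chunk_sizes_1d(length, chunk):
    """Split an axis of `length` elements into blocks of at most `chunk` elements."""
    full, remainder = divmod(length, chunk)
    return (chunk,) * full + ((remainder,) if remainder else ())

print(chunk_sizes_1d(20000, 5000))  # (5000, 5000, 5000, 5000)
print(chunk_sizes_1d(10, 4))        # (4, 4, 2)

# A 2-D array is blocked along each axis independently:
# 20000 x 20000 with (1000, 1000) chunks gives 20 x 20 blocks
rows = chunk_sizes_1d(20000, 1000)
cols = chunk_sizes_1d(20000, 1000)
n_blocks = len(rows) * len(cols)
print(n_blocks)                     # 400
```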

In [None]:
import numpy as np
import dask.array as da

x = da.random.normal(10, 0.1, size=(20000, 20000),   # 400 million element array 
                              chunks=(1000, 1000))   # Cut into 1000x1000 sized chunks
y = x.mean(axis=0)[::100]                            # Perform NumPy-style operations

In [None]:
x.nbytes / 1e9  # Gigabytes of the input processed lazily

In [None]:
y.compute()     # Trigger the actual computation and return a NumPy array

In [None]:
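data = np.arange(20000)
# Split the 20,000 elements into 4 chunks of 5,000 each
dask_data = da.from_array(data, chunks=len(data)//4)
print(dask_data.chunks)            # ((5000, 5000, 5000, 5000),)
print(dask_data.mean().compute())  # the actual value: 9999.5
print(dask_data.mean())            # without compute(): a lazy dask array, not a number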

## Dask DataFrames

In [None]:
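datafile = './nyctaxitrip/train.csv'
# Parse the pickup and dropoff timestamps (columns 2 and 3) as datetimes
df = dd.read_csv(datafile, parse_dates=['pickup_datetime', 'dropoff_datetime'])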


In [None]:
df.info()

In [None]:
import dask
# Compute all four columns in a single pass over the CSV instead of four separate passes
pickup_lon, dropoff_lon, pickup_lat, dropoff_lat = dask.compute(
    df.pickup_longitude, df.dropoff_longitude, df.pickup_latitude, df.dropoff_latitude)
longitude = list(pickup_lon) + list(dropoff_lon)
latitude = list(pickup_lat) + list(dropoff_lat)
plt.figure(figsize=(10, 10))
plt.plot(longitude, latitude, '.', alpha=0.4, markersize=0.05)
plt.show()

In [None]:
# Remove far-away points: keep only trips whose pickup and dropoff both lie inside this longitude/latitude box
xlim = [-74.03, -73.77]
ylim = [40.63, 40.85]
df = df[(df.pickup_longitude> xlim[0]) & (df.pickup_longitude < xlim[1])]
df = df[(df.dropoff_longitude> xlim[0]) & (df.dropoff_longitude < xlim[1])]
df = df[(df.pickup_latitude> ylim[0]) & (df.pickup_latitude < ylim[1])]
df = df[(df.dropoff_latitude> ylim[0]) & (df.dropoff_latitude < ylim[1])]

In [None]:
import dask
# Same plot after filtering, again computing all four columns in a single pass
pickup_lon, dropoff_lon, pickup_lat, dropoff_lat = dask.compute(
    df.pickup_longitude, df.dropoff_longitude, df.pickup_latitude, df.dropoff_latitude)
longitude = list(pickup_lon) + list(dropoff_lon)
latitude = list(pickup_lat) + list(dropoff_lat)
plt.figure(figsize=(10, 10))
plt.plot(longitude, latitude, '.', alpha=0.4, markersize=0.05)
plt.show()

In [None]:
df['pickup_day'] = df['pickup_datetime'].dt.day
df['pickup_month'] = df['pickup_datetime'].dt.month
df['pickup_weekday'] = df['pickup_datetime'].dt.weekday
df['pickup_hour'] = df['pickup_datetime'].dt.hour

df['drop_day'] = df['dropoff_datetime'].dt.day
df['drop_month'] = df['dropoff_datetime'].dt.month
df['drop_weekday'] = df['dropoff_datetime'].dt.weekday
df['drop_hour'] = df['dropoff_datetime'].dt.hour

## Available Functions
* Trivially parallelizable operations (fast):
    *  Elementwise operations:  ``df.x + df.y``
    *  Row-wise selections:  ``df[df.x > 0]``
    *  Loc:  ``df.loc[4.0:10.5]``
    *  Common aggregations:  ``df.x.max()``
    *  Is in:  ``df[df.x.isin([1, 2, 3])]``
    *  Datetime/string accessors:  ``df.timestamp.dt.month``
* Cleverly parallelizable operations (also fast):
    *  groupby-aggregate (with common aggregations): ``df.groupby(df.x).y.max()``
    *  value_counts:  ``df.x.value_counts()``
    *  Drop duplicates:  ``df.x.drop_duplicates()``
    *  Join on index:  ``dd.merge(df1, df2, left_index=True, right_index=True)``
* Operations requiring a shuffle (slow-ish, unless on index)
    *  Set index:  ``df.set_index(df.x)``
    *  groupby-apply (with anything):  ``df.groupby(df.x).apply(myfunc)``
    *  Join not on the index:  ``dd.merge(df1, df2, on='name')``
* Ingest operations
    *  Files: ``dd.read_csv, dd.read_parquet, dd.read_json, dd.read_orc``, etc.
    *  Pandas: ``dd.from_pandas``
    *  Anything supporting numpy slicing: ``dd.from_array``
    *  From any set of functions creating sub dataframes via ``dd.from_delayed``.
    *  Dask.bag: ``mybag.to_dataframe(columns=[...])``
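Why is groupby-aggregate "cleverly parallelizable"? An aggregation like `max` can be computed inside each partition first, and the small per-partition results merged afterwards, so no rows need to move between partitions. A plain-Python sketch of that two-stage pattern, with made-up `(key, value)` rows split into two partitions:

```python
# Hypothetical data split into two partitions of (key, value) rows
partitions = [
    [("a", 3), ("b", 7), ("a", 5)],
    [("b", 2), ("c", 9), ("a", 1)],
]

def partial_max(rows):
    # Stage 1: aggregate inside one partition; no communication needed
    out = {}
    for key, value in rows:
        out[key] = max(value, out.get(key, value))
    return out

def combine_max(partials):
    # Stage 2: merge the small per-partition results
    out = {}
    for part in partials:
        for key, value in part.items():
            out[key] = max(value, out.get(key, value))
    return out

result = combine_max(partial_max(p) for p in partitions)
print(result)  # {'a': 5, 'b': 7, 'c': 9}
```

This only works because `max` can be combined piece by piece. An arbitrary function passed to `groupby(...).apply` generally needs all rows of a group in one place, which is what forces a shuffle.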

## What doesn't work
Dask.dataframe only covers a small but well-used portion of the Pandas API.
This limitation is for two reasons:

1.  The Pandas API is *huge*
2.  Some operations are genuinely hard to do in parallel (e.g. sort)

Additionally, some important operations like ``set_index`` work, but are slower
than in Pandas because they include substantial shuffling of data, and may write out to disk.
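A shuffle moves rows between partitions so that all rows sharing a key end up together. Below is a minimal sketch of hash partitioning in plain Python, with made-up data and a hypothetical `shuffle_by_key` helper; Dask's actual shuffle implementations are more involved and, as noted above, may write data out to disk.

```python
import zlib

def shuffle_by_key(partitions, n_out):
    """Route every row to an output partition chosen by hashing its key."""
    out = [[] for _ in range(n_out)]
    for part in partitions:
        for key, value in part:
            # crc32 is stable across processes, unlike hash() on str,
            # which Python randomizes per process by default
            idx = zlib.crc32(key.encode()) % n_out
            out[idx].append((key, value))
    return out

partitions = [
    [("a", 3), ("b", 7), ("a", 5)],
    [("b", 2), ("c", 9), ("a", 1)],
]
shuffled = shuffle_by_key(partitions, n_out=2)
for i, part in enumerate(shuffled):
    print(i, part)  # every row with a given key lands in the same output partition
```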

## Dask Distributed

So far we have been calling `thing.compute()` or `dask.compute(thing)` without worrying what this entails. Now we will discuss the options available for that execution, and in particular, the distributed scheduler, which comes with additional functionality.

Dask comes with four available schedulers:
- "threaded": a scheduler backed by a thread pool
- "processes": a scheduler backed by a process pool
- "single-threaded" (aka "sync"): a synchronous scheduler, good for debugging
- distributed: a distributed scheduler for executing graphs on multiple machines, see below.

To select one of these for computation, you can specify at the time of asking for a result, e.g.,
```python
myvalue.compute(scheduler="single-threaded")  # for debugging
```

or set the current default, either temporarily or globally
```python
with dask.config.set(scheduler='processes'):
    # set temporarily for this block only
    myvalue.compute()

dask.config.set(scheduler='processes')
# set until further notice
```
For cluster setup, see https://distributed.dask.org/en/latest/setup.html

In [None]:
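# Tiny chunks (4 elements each, so 25,000 chunks) make scheduler overhead dominate the timings
data = da.arange(100000, chunks=4)

import time
for sch in ['threading', 'processes', 'sync']:
    t0 = time.perf_counter()
    _ = data.mean().compute(scheduler=sch)
    print(sch, time.perf_counter() - t0)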

In [None]:
from dask.distributed import Client
client = Client()  # set up local cluster on your laptop
client

In [None]:
def inc(x):
    time.sleep(5)
    return x + 1

def dec(x):
    time.sleep(3)
    return x - 1

def add(x, y):
    time.sleep(7)
    return x + y

In [None]:
x = delayed(inc)(1)
y = delayed(dec)(2)
total = delayed(add)(x, y)
total.compute()
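With the sleeps used above (5, 3 and 7 seconds), `inc` and `dec` do not depend on each other, so a parallel scheduler can run them at the same time and the whole graph should take roughly 5 + 7 = 12 seconds rather than 15. The same dependency structure can be reproduced with `concurrent.futures` and shorter, made-up sleep times; `slow_inc`, `slow_dec` and `slow_add` are hypothetical stand-ins for the functions above.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_inc(x):
    time.sleep(0.3)
    return x + 1

def slow_dec(x):
    time.sleep(0.3)
    return x - 1

def slow_add(x, y):
    time.sleep(0.1)
    return x + y

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    fx = pool.submit(slow_inc, 1)  # independent of fy...
    fy = pool.submit(slow_dec, 2)  # ...so both sleeps overlap
    # slow_add runs only once both of its inputs are ready
    result = slow_add(fx.result(), fy.result())
elapsed = time.perf_counter() - start

print(result)   # 3
print(elapsed)  # roughly 0.4 s, versus about 0.7 s if run sequentially
```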