### Dask DataFrames

Similar to the design of arrays, dask dataframes are partitioned pandas dataframes. Computations run pandas operators on the chunks and then aggregate the results of the chunked operations.

<img src="https://dask.org/_images/dask-dataframe.svg" width="256" title="https://dask.org/_images/dask-dataframe.svg" />
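
To make the chunk-then-aggregate idea concrete, here is a rough pure-Python sketch. It is not how dask is implemented; plain lists stand in for the pandas dataframes, and the names <code>chunks</code>, <code>partial</code>, and <code>chunked_mean</code> are made up for this example. Each chunk is reduced to a small partial result, and the partials are combined into the final answer.

```python
# Sketch only: plain lists stand in for the pandas dataframes that dask partitions.
chunks = [[1.0, 2.0, 3.0], [4.0, 5.0], [6.0]]

def partial(chunk):
    """Per-chunk step: reduce one partition to a (sum, count) pair."""
    return sum(chunk), len(chunk)

def chunked_mean(chunks):
    """Combine step: merge the per-chunk pairs into one global mean."""
    partials = [partial(c) for c in chunks]
    total = sum(s for s, _ in partials)
    count = sum(n for _, n in partials)
    return total / count

print(chunked_mean(chunks))
```

Note that the mean cannot be computed by averaging per-chunk means, because the chunks have different sizes. That is why the partial result carries both a sum and a count.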

For the most part, dask has tried to implement all of pandas, but there are some inefficient operations that it does not support.

Let's load up a dataframe and see what we get.  This is the NYC flight data used in the dask tutorial.

In [None]:
import dask.dataframe as dd
df = dd.read_csv('../input/dask-nyc-flights/*.csv',
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': str,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})
df

In [None]:
df.tail()

The dataframe has 10 chunks (npartitions) that correspond to the ten files that were read.  Dataframes have two key properties:
  * they are tabular (two-dimensional) data
  * each column has a datatype defined by a _schema_
  
The programmer is encouraged to think of this as a "spreadsheet or SQL table".  It is reasonable to call the data __structured__.
  * this is in contrast to semi-structured data, which has tags and hierarchy but does not enforce types. Examples are XML and JSON.
  * in database parlance, this is a flat data model.

### Data Slicing and Aggregation

The most basic operations in a database and in dask are to <code>select</code> rows and <code>project</code> columns.

#### Selecting rows

Find all flights flown by a specific plane, identified by <code>TailNum</code>

In [None]:
df[df.TailNum=='N516UA'].compute()

#### Projecting Columns

Build a dataframe that describes the plane ('TailNum') and route ('FlightNum')

In [None]:
routes = df[['FlightNum','TailNum']].compute()
routes

### Aggregating Data

A common data science inquiry is to query an aggregate (mean, min, max, sum, etc.) in a group.  This is done with a <code>groupby</code> query. The pattern is to construct a grouping and then aggregate over the grouping.

__Query:__ How many different flights were flown by each plane?

In [None]:
routes.groupby('TailNum').FlightNum.count()

I have intentionally mixed syntax. Dataframes in both R and Python use two forms of syntax interchangeably.
  * <code>dataframe.columnName</code>
  * <code>dataframe['columnName']</code>

Many functions only accept the bracketed indexing of columns.
    
__Query:__ How many different planes flew each flight?

In [None]:
routes.groupby('FlightNum')['TailNum'].count()
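
A caveat worth checking: <code>count</code> tallies rows, so a plane that flew the same flight number twice is counted twice. If the intent is the number of distinct values, <code>nunique</code> is the closer match. A small pandas sketch on made-up data (the toy frame and the tail numbers <code>N1</code> and <code>N2</code> are invented, not taken from the flight data):

```python
import pandas as pd

# Toy stand-in for the routes dataframe; values are invented for illustration.
toy = pd.DataFrame({
    'TailNum':   ['N1', 'N1', 'N1', 'N2'],
    'FlightNum': [100,  100,  200,  100],
})

# count() tallies non-null rows in each group.
rows_per_plane = toy.groupby('TailNum').FlightNum.count()
# nunique() tallies distinct values in each group.
distinct_per_plane = toy.groupby('TailNum').FlightNum.nunique()

print(rows_per_plane)
print(distinct_per_plane)
```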

__Query:__ What is the largest number of routes flown by a single plane?

In [None]:
routes.groupby('TailNum')['FlightNum'].count().max()

__Query:__ What is the maximum number of planes to fly a single route?

In [None]:
routes.groupby('FlightNum')['TailNum'].count().max()

But, these aggregates are not really the questions we want answered. More natural questions are awkward.
  * What plane flew the most routes?
  * What route was flown by the most planes?

In [None]:
routes.groupby('FlightNum').TailNum.count().idxmax()

In [None]:
routes.groupby('TailNum').FlightNum.count().idxmax()

This reveals problems with the data.  So, let's look for an actual plane.

In [None]:
routes[routes.TailNum != 'UNKNOW'].groupby('TailNum').FlightNum.count().idxmax()

and ask for how many flights it has flown

In [None]:
routes[routes.TailNum != 'UNKNOW'].groupby('TailNum').FlightNum.count().max()

and verify that this is the right answer.

In [None]:
routecount = routes.groupby('TailNum').FlightNum.count()
routecount.get('N413DA')

We have uncovered what I think is the most annoying part of dask and pandas dataframes. Aggregate functions return series, which are not dataframes. They have different methods.  I would have preferred to have written:

<code>routecount[routecount.TailNum=='N413DA'].compute()</code>

But, that's dataframe syntax not series syntax.  Aggregates assume that the output is small and return pandas series.

In [None]:
type(routecount)
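
One way to get back to dataframe-style filtering is to turn the series into a dataframe with <code>reset_index</code>. The sketch below uses a made-up pandas series in place of <code>routecount</code>; the counts, the tail number <code>N100AA</code>, and the column name <code>n_flights</code> are assumptions for illustration.

```python
import pandas as pd

# Invented stand-in for routecount: a series indexed by TailNum.
routecount = pd.Series(
    [3, 7],
    index=pd.Index(['N100AA', 'N413DA'], name='TailNum'),
    name='FlightNum',
)

# Series syntax: look up a value by its index label.
by_label = routecount.loc['N413DA']

# Dataframe syntax: move the index into a column, then filter rows.
counts_df = routecount.reset_index(name='n_flights')
by_filter = counts_df[counts_df.TailNum == 'N413DA']

print(by_label)
print(by_filter)
```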

### Indexes

All dataframes have a __default index__.  In this case, the index was generated when we loaded the data and is the row number in the pandas dataframe. Surprisingly, the index is not unique.  The same index value appears in each pandas dataframe.

In [None]:
print(df.index, "\n")
print("Number of rows in the database\n", len(df))
maxindex = df.index.nunique().compute()
print("Number of unique values in the index\n", maxindex)

# find all entries with index value 22000
df.loc[22000].compute()
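
The same effect can be reproduced in plain pandas. In this sketch (toy data; the names <code>part0</code> and <code>part1</code> are hypothetical), two frames each get their own default row-number index, and stacking them yields repeated index values, much like the partitions of the dask dataframe.

```python
import pandas as pd

# Two small "partitions", each with its own default index 0..2.
part0 = pd.DataFrame({'x': [10, 11, 12]})
part1 = pd.DataFrame({'x': [20, 21, 22]})

# Stacking keeps each partition's index, so the labels repeat.
stacked = pd.concat([part0, part1])

# Row count versus number of distinct index labels.
print(len(stacked), stacked.index.nunique())
# A label lookup returns one row from each partition.
print(stacked.loc[1])
```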

The notion of an index comes from relational databases: "A database index is a data structure that improves the speed of data retrieval operations on a database table at the cost of additional writes and storage space to maintain the index data structure."

Real indexes come in many forms, but are most commonly:
  * hash tables -- organize the data by the hash value of a key field for constant time $O(1)$ lookup by key.
  * B+-trees/sorted -- sort the data in a tree to lookup a key in $O(\log n)$ time and be able to scan sequential keys.

A blog from TimesTen gives a reasonable schematic.

<img src="https://cdn.app.compendium.com/uploads/user/e7c690e8-6ff9-102a-ac6d-e4aebca50425/bbeb190a-b93b-4d7b-bd6c-3f9928cd87d2/Image/fdf8758152659ecde76a20de8c60c23b/which_is_best.JPG" width="512" title="https://cdn.app.compendium.com/uploads/user/e7c690e8-6ff9-102a-ac6d-e4aebca50425/bbeb190a-b93b-4d7b-bd6c-3f9928cd87d2/Image/fdf8758152659ecde76a20de8c60c23b/which_is_best.JPG" />
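
To make the trade-off concrete, here is a small pure-Python sketch. It is not how a database or dask implements indexes, and the records and function names are invented. A <code>dict</code> plays the role of a hash index, and a sorted key list searched with <code>bisect</code> stands in for a B+-tree.

```python
import bisect

records = [(17, 'q'), (3, 'a'), (42, 'z'), (8, 'c'), (23, 'm')]

# Hash index: key -> value, constant-time point lookup, no ordering.
hash_index = {key: value for key, value in records}

# Sorted index: keys kept in order, binary search for lookups and range scans.
sorted_records = sorted(records)
sorted_keys = [key for key, _ in sorted_records]

def lookup_sorted(key):
    """Point lookup by binary search; returns None if the key is absent."""
    i = bisect.bisect_left(sorted_keys, key)
    if i < len(sorted_keys) and sorted_keys[i] == key:
        return sorted_records[i][1]
    return None

def range_scan(lo, hi):
    """Return all records with lo <= key <= hi by slicing adjacent entries."""
    start = bisect.bisect_left(sorted_keys, lo)
    stop = bisect.bisect_right(sorted_keys, hi)
    return sorted_records[start:stop]

print(hash_index[8], lookup_sorted(8))
print(range_scan(5, 25))
```

The range scan is the operation that the hash index cannot serve without visiting every key, which is why sorted structures are preferred for ordered data such as dates.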

Indexing can be accomplished in dask by calling <code>set_index</code>, which often results in a global shuffle of the data.  It can be very expensive.  The data that we have is already sorted by date, so setting the index to that value does not take a long time. 



In [None]:
# set_index returns a new dataframe, so assign the result
%time df = df.set_index('Date')

In [None]:
df

Dask has implemented some of what I would expect from indexing:
1. dask can be told that there is existing structure in the data
2. dask reorganizes data to optimize queries

Dask does not support some features that I think it should:
1. hash indexing
2. data readers infer the existence of an index

### Why do indexes matter?

Understanding the structure of the data can lead to massive performance differences.
We can create an example in which seemingly identical queries perform 
differently because of an implicit index structure.

We create 100 files, each with 1,000,000 entries and two fields, 'A' and 'B'.  'A' contains an integer that identifies the file number (0-99), so 'A' is constant within each file. 'B' contains the sequence of numbers 0-999999 in each file. These data:
  * are too big to fit in memory
  * have one field that will be identical in each pandas dataframe

In [None]:
df = dd.read_csv('../output/untracked/*.csv')
df.head()

Now let's compare how long it takes to sum all the elements grouped by each key for both columns.  In both cases, we touch all of the data.  But when we <code>groupby</code> 'A', each sum can be evaluated within a single chunk.  When we <code>groupby</code> 'B', partial sums for each value must be aggregated across all chunks.

In [None]:
%time df.groupby('A').sum().compute()

In [None]:
%time df.groupby('B').sum().compute()

In [None]:
# set_index returns a new dataframe, so assign the result
df = df.set_index('A')
%time df.groupby('A').sum().compute()

These are big performance differences that come from both:
  * the organization of the data
  * dask knowing about the organization of the data
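
A pure-Python sketch of why the two groupbys differ. This is a simplified model, not dask's scheduler; the tiny chunks and the helper names <code>partial_sums</code> and <code>combine</code> are invented. It counts how many per-chunk partial results the combine step has to merge for each grouping column.

```python
from collections import defaultdict

# Tiny stand-in for the files: 3 chunks, column A is the chunk id, B runs 0..3.
chunks = [[(a, b) for b in range(4)] for a in range(3)]

def partial_sums(chunk, key_col, val_col):
    """Per-chunk groupby-sum: returns {key: partial sum} for one chunk."""
    sums = defaultdict(int)
    for row in chunk:
        sums[row[key_col]] += row[val_col]
    return dict(sums)

def combine(partials):
    """Merge per-chunk partial sums and count how many partials were merged."""
    total = defaultdict(int)
    merged = 0
    for p in partials:
        for key, s in p.items():
            total[key] += s
            merged += 1
    return dict(total), merged

# Group by A (column 0), summing B: each key lives in exactly one chunk.
sum_by_a, merged_a = combine(partial_sums(c, 0, 1) for c in chunks)
# Group by B (column 1), summing A: every key appears in every chunk.
sum_by_b, merged_b = combine(partial_sums(c, 1, 0) for c in chunks)

print(merged_a, merged_b)
```

In this model the number of partials for 'A' equals the number of chunks, while for 'B' it is the number of chunks times the number of distinct values of 'B'.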

### Parting Thoughts

Limitations and comments:
  * dask does not have a general sort capability
    * but sorting can be accomplished with set_index
    * shuffle is inefficient, use a different engine
  * dask does not support row indexing by physical offset
    * this is not an important feature

### Generate data

Uncomment and run this once to make data.

In [None]:
#import csv
#for i in range(100):
#    with open(f'../output/untracked/csv{i}.csv', 'w', newline='') as csvfile:
#        csvw = csv.writer(csvfile, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
#        csvw.writerow(['A','B'])
#        for j in range(1000000):
#            csvw.writerow([i,j])