Distributed + HDFS + Pandas + fastavro
======================

We read in some CSV data with Pandas, filter it, and write it out as Avro.

CSV Bytes on HDFS $\rightarrow$ Blocks of Bytes in distributed RAM $\rightarrow$ Pandas DataFrames $\rightarrow$ Dask DataFrame $\rightarrow$ Avro-encoded bytes in RAM $\rightarrow$ Avro Bytes on HDFS

1.  `distributed.hdfs.read_binary('/nyc_trip_data_1.csv', hdfs)` $\rightarrow$ Futures of bytes
2.  `dfs = e.map(pd.read_csv, futures_of_bytes)` $\rightarrow$ Futures of pandas dataframes
3.  `df = futures_to_collection(dfs)` $\rightarrow$ dask dataframe
4.  `df2 = df[df.passenger_count > 3][...]` $\rightarrow$ dask dataframe
5.  `dfs2 = collection_to_futures(df2)` $\rightarrow$ Futures of pandas dataframes
6.  `bytes = e.map(encode_dataframe_to_avro, dfs2)` $\rightarrow$ Avro encoded bytes in memory
7.  `distributed.hdfs.write_binary('/nyc_trip_data_1.avro', futures, hdfs)` $\rightarrow$ Avro encoded bytes on disk


APIs
----

### HDFS3

Read and write bytes to the Hadoop Distributed File System (HDFS).  Provides a user-accessible, local, Pythonic interface:

```python
with hdfs.open('/nyc/trip_data_1.csv') as f:
    df = pd.read_csv(f, nrows=5)
```

It also includes a fancier API for internal use.


### Distributed

Direct remote execution.  Executes eagerly.

1.  `distributed.hdfs.read_binary('/nyc_trip_data_1.csv', hdfs)` $\rightarrow$ Futures of bytes
2.  `e.map(function, futures)` $\rightarrow$ more futures
3.  `distributed.hdfs.write_binary('/nyc_trip_data_1.csv', futures, hdfs)` $\rightarrow$ bytes on disk
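
The futures pattern in the steps above can be tried locally with the standard library. This is a minimal sketch that uses `concurrent.futures.ThreadPoolExecutor` as a stand-in for the distributed `Executor`. The block contents and the `count_lines` helper are hypothetical, and nothing here touches HDFS or a cluster.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for blocks of bytes read from a file
blocks = [b"a,b\n1,2\n", b"3,4\n5,6\n", b"7,8\n"]

def count_lines(block):
    """Count newline-terminated lines in one block of bytes."""
    return block.count(b"\n")

with ThreadPoolExecutor(max_workers=2) as pool:
    # submit() starts work immediately and returns a future, not a result
    first = pool.submit(count_lines, blocks[0])
    # One future per remaining block
    rest = [pool.submit(count_lines, b) for b in blocks[1:]]
    # .result() blocks until the value is ready and brings it back
    line_counts = [f.result() for f in [first] + rest]

total_lines = sum(line_counts)
```

One difference worth noting: the standard library's `pool.map` returns results directly, whereas `e.map` in this notebook returns a list of futures, which is why the sketch builds its futures with `submit`.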


### Dask

Fancy algorithms.  Pandas-like interface.  Executes lazily. 

1.  Pandas Futures $\rightarrow$ dask dataframe
2.  Fancy algorithms (filtering, groupby, etc.)
3.  Dask dataframe $\rightarrow$ pandas futures

In [1]:
from io import BytesIO

import pandas as pd

from distributed import Executor, progress, wait
from distributed.hdfs import read_binary, write_binary
from hdfs3 import HDFileSystem

In [2]:
e = Executor('172.31.9.67:8786')
e.restart()
hdfs = HDFileSystem()

### Local use of HDFS

In [3]:
%%time
with hdfs.open('/nyc/trip_data_1.csv') as f:
    df = pd.read_csv(f, nrows=5)

CPU times: user 2.86 ms, sys: 3.7 ms, total: 6.56 ms
Wall time: 9.69 ms


In [4]:
df

Unnamed: 0,medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
0,89D227B655E5C82AECF13C3F540D4CF4,BA96DE419E711691B9445D6A6307C170,CMT,1,N,2013-01-01 15:11:48,2013-01-01 15:18:10,4,382,1.0,-73.978165,40.757977,-73.989838,40.751171
1,0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-06 00:18:35,2013-01-06 00:22:54,1,259,1.5,-74.006683,40.731781,-73.994499,40.75066
2,0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-05 18:49:41,2013-01-05 18:54:23,1,282,1.1,-74.004707,40.73777,-74.009834,40.726002
3,DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,2013-01-07 23:54:15,2013-01-07 23:58:20,2,244,0.7,-73.974602,40.759945,-73.984734,40.759388
4,DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,2013-01-07 23:25:03,2013-01-07 23:34:24,1,560,2.1,-73.97625,40.748528,-74.002586,40.747868


### Distributed use of HDFS

In [5]:
bytes = read_binary("/nyc/trip_data_1.csv", hdfs=hdfs, delimiter=b'\r\n')

In [6]:
bytes

[<Future: status: pending, key: read_block-124dce2e35986afe4d3e12779fa217ac>,
 <Future: status: pending, key: read_block-50ffcbef3b364dc7e023831c3741b20f>,
 <Future: status: pending, key: read_block-03cec003b46d3749bf92d91254c2fa24>,
 <Future: status: pending, key: read_block-7c380f28053a466ee35c1be07621aeb5>,
 <Future: status: pending, key: read_block-ab7e46dc169cf9735372d42287d1d274>,
 <Future: status: pending, key: read_block-d1ac5bc8646c5664cfcbe08fd7bbeceb>,
 <Future: status: pending, key: read_block-24de6760635ec9c449f49f3108dbe5b6>,
 <Future: status: pending, key: read_block-0dff59b6974ae5c30f3ef00ee52c7de2>,
 <Future: status: pending, key: read_block-b03a108743893919891da58772900066>,
 <Future: status: pending, key: read_block-0e83a5cfc9b24acec6ae8d2d75333d5c>,
 <Future: status: pending, key: read_block-0fe84b98b573024540c6b9e99cd2bca9>,
 <Future: status: pending, key: read_block-1fef52eb4eb08989861f837edd8581cd>,
 <Future: status: pending, key: read_block-8a411af0346eaa454f92a
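
The `delimiter` argument matters because a block boundary that falls in the middle of a row would hand `pd.read_csv` a broken line. The sketch below illustrates the idea on an in-memory byte string: cut roughly every `blocksize` bytes, but extend each cut to the end of the next delimiter. The `split_on_delimiter` helper and its `blocksize` parameter are hypothetical; this is not how `read_binary` is implemented.

```python
def split_on_delimiter(data, blocksize, delimiter):
    """Split bytes into blocks of roughly blocksize that each end on delimiter."""
    blocks = []
    start = 0
    while start < len(data):
        # Look for the first delimiter at or after the nominal block end
        cut = data.find(delimiter, start + blocksize)
        if cut == -1:
            end = len(data)              # no delimiter left: take the remainder
        else:
            end = cut + len(delimiter)   # include the delimiter in this block
        blocks.append(data[start:end])
        start = end
    return blocks

data = b"a,b\r\n1,2\r\n3,4\r\n5,6\r\n"
blocks = split_on_delimiter(data, blocksize=6, delimiter=b"\r\n")
```

Every block ends with a complete row, and joining the blocks reproduces the original bytes.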

### Distribute pandas functions on bytes

In [7]:
def load(b, **kwargs):
    bio = BytesIO(b)
    return pd.read_csv(bio, **kwargs)

In [8]:
head = e.submit(lambda bytes: load(bytes, nrows=5), bytes[0])  # get a snippet as a dataframe
head

<Future: status: pending, key: <lambda>-e427adda9ce565bc08437d0e94fb82cb>

In [9]:
head = head.result()  # bring result to local process
head

Unnamed: 0,medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
0,89D227B655E5C82AECF13C3F540D4CF4,BA96DE419E711691B9445D6A6307C170,CMT,1,N,2013-01-01 15:11:48,2013-01-01 15:18:10,4,382,1.0,-73.978165,40.757977,-73.989838,40.751171
1,0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-06 00:18:35,2013-01-06 00:22:54,1,259,1.5,-74.006683,40.731781,-73.994499,40.75066
2,0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-05 18:49:41,2013-01-05 18:54:23,1,282,1.1,-74.004707,40.73777,-74.009834,40.726002
3,DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,2013-01-07 23:54:15,2013-01-07 23:58:20,2,244,0.7,-73.974602,40.759945,-73.984734,40.759388
4,DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,2013-01-07 23:25:03,2013-01-07 23:34:24,1,560,2.1,-73.97625,40.748528,-74.002586,40.747868


In [10]:
head.columns  # only the first block has a header row; later blocks need these names

Index([u'medallion', u'hack_license', u'vendor_id', u'rate_code',
       u'store_and_fwd_flag', u'pickup_datetime', u'dropoff_datetime',
       u'passenger_count', u'trip_time_in_secs', u'trip_distance',
       u'pickup_longitude', u'pickup_latitude', u'dropoff_longitude',
       u'dropoff_latitude'],
      dtype='object')

In [12]:
dfs = [e.submit(load, bytes[0])] + e.map(load, bytes[1:], names=head.columns)
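
Only the first block contains the CSV header row, so it is parsed on its own and its column names are passed to the remaining blocks through `names=`. Here is a small local sketch of the same idea, with made-up bytes in place of HDFS blocks (the two-column data is hypothetical):

```python
from io import BytesIO

import pandas as pd

def load(b, **kwargs):
    """Parse one block of CSV bytes into a pandas DataFrame."""
    return pd.read_csv(BytesIO(b), **kwargs)

# Hypothetical blocks: only the first one carries the header row
blocks = [b"x,y\n1,2\n3,4\n", b"5,6\n7,8\n", b"9,10\n"]

# The header is read from the data in the first block
first = load(blocks[0])

# The remaining blocks have no header, so the names are supplied explicitly
rest = [load(b, names=list(first.columns)) for b in blocks[1:]]

combined = pd.concat([first] + rest, ignore_index=True)
```

Without `names=`, pandas would treat the first data row of each later block as a header and silently drop it from the data.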

## Dask DataFrame

We unite all of the scattered pandas dataframes into one logical dask dataframe.  No computation or data movement occurs.  This is purely administrative.

In [13]:
dfs

[<Future: status: pending, key: load-b8649f9b28fc324cae1306bc4e793c3c>,
 <Future: status: pending, key: load-5ac8033eecc178ab201ba7699a595318>,
 <Future: status: pending, key: load-0ca9a1f39834fe75faee682307d91c2f>,
 <Future: status: pending, key: load-3fc8c8edf429f81a366ae221eb168cfa>,
 <Future: status: pending, key: load-7ea2d47492316df297bb5cef8a978a41>,
 <Future: status: pending, key: load-0ecb80fdd9e2e21e898adeb4c209473a>,
 <Future: status: pending, key: load-b5f0124dc4cf699be5fa67ca29b5f536>,
 <Future: status: pending, key: load-ee654c4dd01d0aa1589d522b322d7f32>,
 <Future: status: pending, key: load-e813e8b71c1bf16cf48c8043207d443f>,
 <Future: status: pending, key: load-0d71842dfd07b883b9f1f630a575439f>,
 <Future: status: pending, key: load-6975eb8a64821e2dfce8a942dda318e5>,
 <Future: status: pending, key: load-db1d8ff2bdf57fa973fc169c238fcc83>,
 <Future: status: pending, key: load-b5d2dc2ef86fb81790f609aea2c3d0b8>,
 <Future: status: pending, key: load-8963dd9c76975f5d3e9ea22e3c6

In [14]:
from distributed.collections import futures_to_dask_dataframe
ddf = futures_to_dask_dataframe(dfs)

Setting global dask scheduler to use distributed


In [15]:
type(ddf)

dask.dataframe.core.DataFrame

In [16]:
ddf.head()

Unnamed: 0,medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
0,89D227B655E5C82AECF13C3F540D4CF4,BA96DE419E711691B9445D6A6307C170,CMT,1,N,2013-01-01 15:11:48,2013-01-01 15:18:10,4,382,1.0,-73.978165,40.757977,-73.989838,40.751171
1,0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-06 00:18:35,2013-01-06 00:22:54,1,259,1.5,-74.006683,40.731781,-73.994499,40.75066
2,0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-05 18:49:41,2013-01-05 18:54:23,1,282,1.1,-74.004707,40.73777,-74.009834,40.726002
3,DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,2013-01-07 23:54:15,2013-01-07 23:58:20,2,244,0.7,-73.974602,40.759945,-73.984734,40.759388
4,DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,2013-01-07 23:25:03,2013-01-07 23:34:24,1,560,2.1,-73.97625,40.748528,-74.002586,40.747868


## Filter

Let's say that we only care about trips with lots of passengers.  We use `dask.dataframe` to apply pandas syntax to all of our data at once.

In [17]:
ddf2 = ddf[ddf.passenger_count > 3][['pickup_datetime', 'passenger_count', 'trip_time_in_secs']]
ddf2.head()

Unnamed: 0,pickup_datetime,passenger_count,trip_time_in_secs
0,2013-01-01 15:11:48,4,382
21,2013-01-07 18:05:36,4,1094
38,2013-01-05 03:20:28,4,1387
129,2013-01-05 19:52:03,4,959
157,2013-01-13 04:36:00,5,600
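
Conceptually, a dask dataframe is a sequence of pandas dataframes, and a row filter like the one above can be applied to each piece independently. The sketch below checks this on two small hypothetical partitions using plain pandas; it does not use dask itself, and the `busy_trips` helper is made up for illustration.

```python
import pandas as pd

# Two hypothetical partitions of one logical table
part1 = pd.DataFrame({"passenger_count": [4, 1, 1],
                      "trip_time_in_secs": [382, 259, 282]})
part2 = pd.DataFrame({"passenger_count": [2, 5],
                      "trip_time_in_secs": [244, 600]})

def busy_trips(df):
    """Keep rows with more than 3 passengers and select two columns."""
    return df[df.passenger_count > 3][["passenger_count", "trip_time_in_secs"]]

# Filter each partition on its own, then stitch the results together
per_partition = pd.concat([busy_trips(part1), busy_trips(part2)],
                          ignore_index=True)

# Filter the whole table at once for comparison
whole = busy_trips(pd.concat([part1, part2], ignore_index=True))
whole = whole.reset_index(drop=True)
```

Both routes give the same rows, which is why this kind of filter needs no communication between partitions.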


We convert the `dask.dataframe` back to futures so that we can use lower-level tools again.

In [18]:
dfs2 = e.compute(*ddf2.to_imperative())  
dfs2

[<Future: status: pending, key: finalize-646cbe1ccfb17f08a6c6515c446a6c7d>,
 <Future: status: pending, key: finalize-b666aea0901cb3634db36a38cd80d9f3>,
 <Future: status: pending, key: finalize-614646bfb363bafda351735c633e1d75>,
 <Future: status: pending, key: finalize-53562c7e193fa270a008840dd2dbca9b>,
 <Future: status: pending, key: finalize-960ded82f71aa4cca0c9fc22f4fd10a0>,
 <Future: status: pending, key: finalize-a7ee9b8a77ff418f11c4968d7490e896>,
 <Future: status: pending, key: finalize-5531b5d3d70154085054d505d5ccd72a>,
 <Future: status: pending, key: finalize-911fdcf2e8e203ac1871ad15373329cc>,
 <Future: status: pending, key: finalize-20da7bf426dcd983b5a89de2c8bd3b31>,
 <Future: status: pending, key: finalize-98cc5e737154c306bffac25e4a587584>,
 <Future: status: pending, key: finalize-ec4b234a83ff5fb35759d5392f7d1f7f>,
 <Future: status: pending, key: finalize-71b16610d8e01a7cab05223055c4148b>,
 <Future: status: pending, key: finalize-30d52a7035bd97a0cf65f26c1059735f>,
 <Future: st

## Write Avro

### Generate schema

Cyavro has a nice utility to generate an Avro schema from a Pandas DataFrame.

In [19]:
import json
import cyavro
schema = json.loads(cyavro.infer_avro_schema_for_dataframe(ddf2.head()))
schema

{u'fields': [{u'name': u'pickup_datetime', u'type': u'string'},
  {u'name': u'passenger_count', u'type': u'long'},
  {u'name': u'trip_time_in_secs', u'type': u'long'}],
 u'name': u'AutoGen',
 u'namespace': u'com.maxpoint.cyavro.autogenerated',
 u'type': u'record'}

### Encode to bytes

In [20]:
def encode_dataframe_to_avro(df, schema=None):
    """ Encode a Pandas DataFrame to an Avro file 
    
    Returns bytes
    """
    from fastavro import writer
    if schema is None:
        import cyavro, json
        schema = json.loads(cyavro.infer_avro_schema_for_dataframe(df))
    bio = BytesIO()
    records = df.to_dict(orient='records')
    writer(bio, schema, records)
    return bio.getvalue()
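
`fastavro.writer` takes an iterable of records, and `df.to_dict(orient='records')` produces one: a list with one dict per row, keyed by column name. Here is a quick look at that intermediate form on a hypothetical two-row frame, with a check that the record keys match the field names of a schema shaped like the one generated above. The sketch stops short of calling fastavro.

```python
import pandas as pd

df = pd.DataFrame({
    "pickup_datetime": ["2013-01-01 15:11:48", "2013-01-07 18:05:36"],
    "passenger_count": [4, 4],
    "trip_time_in_secs": [382, 1094],
})

# One dict per row, keyed by column name
records = df.to_dict(orient="records")

# Schema with the same shape as the generated one (values are illustrative)
schema = {
    "type": "record",
    "name": "AutoGen",
    "fields": [
        {"name": "pickup_datetime", "type": "string"},
        {"name": "passenger_count", "type": "long"},
        {"name": "trip_time_in_secs", "type": "long"},
    ],
}

# Every record should carry exactly the fields the schema declares
field_names = [field["name"] for field in schema["fields"]]
keys_match = all(sorted(r) == sorted(field_names) for r in records)
```

Building the full list of dicts holds a second copy of the partition in memory, which is acceptable here because each partition is encoded separately on a worker.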

In [21]:
output = e.map(encode_dataframe_to_avro, dfs2, schema=schema)

In [22]:
output

[<Future: status: pending, key: encode_dataframe_to_avro-a39d00a7909711e3c868c0d45541e68e>,
 <Future: status: pending, key: encode_dataframe_to_avro-8e323fd4685aef616c005bc5f7666818>,
 <Future: status: pending, key: encode_dataframe_to_avro-2f63aab85962af566331abd94644447e>,
 <Future: status: pending, key: encode_dataframe_to_avro-2c2c6d8b5cc12301a051ff22fe4ff0c8>,
 <Future: status: finished, key: encode_dataframe_to_avro-5e6e228231ab05ba31c38655d675e462>,
 <Future: status: pending, key: encode_dataframe_to_avro-cde3d10301d698b200f3b2902001ede5>,
 <Future: status: pending, key: encode_dataframe_to_avro-bcd91d9d41ebfc2afb39e09712f00f6f>,
 <Future: status: pending, key: encode_dataframe_to_avro-7cb5f81d3558eb0dfc091d3bc787fbf6>,
 <Future: status: pending, key: encode_dataframe_to_avro-492dfaa9ec0e5577a2743ced45f5f432>,
 <Future: status: pending, key: encode_dataframe_to_avro-c760f38b8d53b2ba38ea0553274e20c8>,
 <Future: status: pending, key: encode_dataframe_to_avro-1fc39dd7b9b057379e8d80

### Write bytes to HDFS

In [23]:
if hdfs.exists('/nyc/avro/'):
    hdfs.rm('/nyc/avro/')
hdfs.mkdir('/nyc/avro/')
writes = write_binary('/nyc/avro/trip.*.avro', output)
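
The `*` in the target path is a placeholder for one file per future. Since the verification step later opens `/nyc/avro/trip.0.avro`, the names appear to be numbered from zero. The sketch below shows that expansion; the `expand_pattern` helper is hypothetical and only illustrates the naming, not how `write_binary` builds paths internally.

```python
def expand_pattern(pattern, n):
    """Return n paths with '*' replaced by 0, 1, ..., n - 1."""
    return [pattern.replace("*", str(i)) for i in range(n)]

paths = expand_pattern("/nyc/avro/trip.*.avro", 3)
```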

In [24]:
writes

[<Future: status: pending, key: write-febc6806ef20c45e4f96b626f5bbe4d1>,
 <Future: status: pending, key: write-d87354ec5f686aa3de20805378e3d4d7>,
 <Future: status: pending, key: write-42966cd2e138cd472d2f4f1115d3d66d>,
 <Future: status: finished, key: write-8ac448dc47041428c2eb6ad387faf18a>,
 <Future: status: finished, key: write-961be39e4effb2ee5a68a2bd86a0d463>,
 <Future: status: finished, key: write-b107382a3c5d90c2c70fd9b89a342cdd>,
 <Future: status: pending, key: write-c16da581d1fdcc21c727a4909c644c0c>,
 <Future: status: pending, key: write-57a75dc8320ba6d2f1ed3b2e653c7a1a>,
 <Future: status: pending, key: write-5c49480f4644a6334c713699deac9bb0>,
 <Future: status: pending, key: write-fc35ff9ed626d4c4543b70c70c542cc2>,
 <Future: status: pending, key: write-d1143fc7247027403ac6f0d5926dc779>,
 <Future: status: pending, key: write-1def86ccb4d5cbf47a7c210a77336bad>,
 <Future: status: pending, key: write-02d31212e560bb940b4e6f331606e87b>,
 <Future: status: pending, key: write-1ec663ace1

In [25]:
progress(writes)

### Locally verify avro file works

We do this using the `hdfs3` library from the head node.  This isn't distributed, data-local, or maximally efficient, but it's great for simple checks.

In [28]:
wait(writes);

In [29]:
import fastavro
f = hdfs.open('/nyc/avro/trip.0.avro')
reader = fastavro.reader(f)

In [30]:
next(reader)

{u'passenger_count': 4,
 u'pickup_datetime': u'2013-01-01 15:11:48',
 u'trip_time_in_secs': 382}

In [31]:
next(reader)

{u'passenger_count': 4,
 u'pickup_datetime': u'2013-01-07 18:05:36',
 u'trip_time_in_secs': 1094}

In [32]:
hdfs.rm('/nyc/avro')

True