# Taking the java out of pythonic big data

## Introduction

<img src="http://www.logospike.com/wp-content/uploads/2014/11/Java_logo-2.jpg" width=300> **Java** is ubiquitous in the world of Big Data, largely because of the prominence of Hadoop and tools built on it. There are distinct parts to java's place in this infrastructure (with a few example names people will recognize):

- services: long-running java processes that manage the system and that user processes talk to
    - HDFS for resilient distributed data storage
    - YARN for allocating resources
- runtimes: user code using java execution engines to process data
    - map-reduce
- libraries: to enable user code to do what it needs
    - to interact with the services and runtimes, above
    - other functionality, like access to data formats (avro, parquet...)

It is no surprise that top-level data engines like pig, hive, etc., are written in java.

    
### What about spark?

Spark has risen extremely rapidly to become perhaps the most important big data tool around, or at least the most talked-about platform.

Along with R, python is one of the API languages for running computations in the spark framework. The execution engine itself is written in scala and runs on Java Virtual Machines (JVMs). Although scala is growing quickly, a massive amount of spark programming actually takes place in python (and R), because
- many data-oriented coders are more comfortable in python
- there is an excellent ecosystem of data-science libraries (e.g., sklearn)
- python offers the usual benefits of rapid development, fewer lines of code and interactivity

Whilst opening up spark to python is great for us, python will always lag behind in performance: running a python operation on data via spark involves multiple serialization steps between python and java.
<img src="./spark_python.png">

## Sidestepping java

We can, of course, call java tools from within python or the shell, but every call is slow because it has to start up a JVM:

In [1]:
%%time
! hadoop fs -ls /

Found 2 items
drwxrwxrwt   - hdfs hadoop          0 2016-04-18 19:11 /tmp
drwxr-xr-t   - hdfs hadoop          0 2016-04-18 19:11 /user
CPU times: user 29.8 ms, sys: 15.6 ms, total: 45.4 ms
Wall time: 2.33 s


There have therefore been several attempts to write native python libraries that talk to big-data services; see, e.g., [snakebite](https://github.com/spotify/snakebite), which uses RPC directly to talk to the HDFS namenode.

Here we demonstrate two small libraries to connect python to big data. They are useful in their own right, or in combination with dask-distributed, a pure python cluster execution engine.

- [hdfs3](http://hdfs3.readthedocs.org), based on [libhdfs3](http://pivotalrd.github.io/libhdfs3/) (c++, from pivotal) for HDFS
- [s3fs](http://s3fs.readthedocs.org), based on [boto3](https://boto3.readthedocs.org/) for Amazon S3

```
> conda install hdfs3
> conda install s3fs
```

Both provide a pythonic API for filesystem manipulation, and access to files compatible with the standard python file object; the APIs of the two projects are deliberately almost identical.
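Because the two APIs match, helper code can be written once and handed either filesystem object. A minimal sketch: `largest_files` is a hypothetical helper, and it assumes only that `du(path)` returns a `{path: size}` dictionary, as the `du` outputs in this notebook suggest for both libraries.

```python
def largest_files(fs, path, n=3):
    """Return the n largest (path, size) pairs under path.

    Hypothetical helper: works with any object whose du(path) returns a
    {path: size_in_bytes} dict (assumed here for HDFileSystem and S3FileSystem).
    """
    sizes = fs.du(path)
    # Sort by size, largest first, and keep the top n entries.
    return sorted(sizes.items(), key=lambda item: item[1], reverse=True)[:n]
```

The same call would then work as `largest_files(s3, 'distributed-test/test')` or `largest_files(hdfs, '/')`.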

Here are a few examples of familiar file system operations.

In [2]:
import hdfs3, s3fs
hdfs = hdfs3.HDFileSystem()
s3 = s3fs.S3FileSystem()
s3.ls('distributed-test')

['distributed-test/csv/',
 'distributed-test/gzip-json/',
 'distributed-test/nested/',
 'distributed-test/test/']

The same operation as the hadoop CLI `ls` above, roughly 500x faster (4.73 ms vs 2.33 s wall time):

In [3]:
%%time
hdfs.ls('/')

CPU times: user 1.2 ms, sys: 96 µs, total: 1.29 ms
Wall time: 4.73 ms


[{'block_size': 0,
  'group': 'hadoop',
  'kind': 'directory',
  'last_access': 0,
  'last_mod': 1461006682,
  'name': '//tmp',
  'owner': 'hdfs',
  'permissions': 1023,
  'replication': 0,
  'size': 0},
 {'block_size': 0,
  'group': 'hadoop',
  'kind': 'directory',
  'last_access': 0,
  'last_mod': 1461006705,
  'name': '//user',
  'owner': 'hdfs',
  'permissions': 1005,
  'replication': 0,
  'size': 0}]

In [4]:
s3.du('distributed-test/test')

{'distributed-test/test/accounts.1.json': 133,
 'distributed-test/test/accounts.2.json': 133}

In [5]:
s3.walk('distributed-test/csv')

['distributed-test/csv/2014/',
 'distributed-test/csv/2014/2014-01-01.csv',
 'distributed-test/csv/2014/2014-01-02.csv',
 'distributed-test/csv/2014/2014-01-03.csv',
 'distributed-test/csv/2015/',
 'distributed-test/csv/2015/2015-01-01.csv',
 'distributed-test/csv/2015/2015-01-02.csv',
 'distributed-test/csv/2015/2015-01-03.csv',
 'distributed-test/csv/gzip/',
 'distributed-test/csv/gzip/2015-01-01.csv.gz',
 'distributed-test/csv/gzip/2015-01-02.csv.gz',
 'distributed-test/csv/gzip/2015-01-03.csv.gz']

In [6]:
s3.glob('distributed-test/csv/*/*.csv')

['distributed-test/csv/2014/2014-01-01.csv',
 'distributed-test/csv/2014/2014-01-02.csv',
 'distributed-test/csv/2014/2014-01-03.csv',
 'distributed-test/csv/2015/2015-01-01.csv',
 'distributed-test/csv/2015/2015-01-02.csv',
 'distributed-test/csv/2015/2015-01-03.csv']

In [7]:
s3.cat('distributed-test/csv/2014/2014-01-01.csv')

b'name,amount,id\nAlice,100,1\nBob,200,2\nCharlie,300,3\n'

In [8]:
s3.get('distributed-test/csv/2014/2014-01-01.csv', 'myfile.csv')
%cat myfile.csv

name,amount,id
Alice,100,1
Bob,200,2
Charlie,300,3


...and repeating the simple `ls` in a fresh interpreter, including its start-up, is still about 12x faster than the hadoop CLI:

In [9]:
%%time
!/opt/anaconda/bin/python -c "import hdfs3; print(hdfs3.HDFileSystem().ls('/'))"

[{'size': 0, 'owner': 'hdfs', 'permissions': 1023, 'replication': 0, 'kind': 'directory', 'last_mod': 1461006682, 'name': '//tmp', 'group': 'hadoop', 'last_access': 0, 'block_size': 0}, {'size': 0, 'owner': 'hdfs', 'permissions': 1005, 'replication': 0, 'kind': 'directory', 'last_mod': 1461006705, 'name': '//user', 'group': 'hadoop', 'last_access': 0, 'block_size': 0}]
CPU times: user 0 ns, sys: 7.64 ms, total: 7.64 ms
Wall time: 198 ms


Reading only a specific block of a file, with the block boundaries moved to the next delimiter (which could be a multi-byte sentinel value). In this case, we get exactly one whole line:

In [10]:
# block-wise access with delimiters:
s3.read_block('distributed-test/csv/2014/2014-01-01.csv', 5, 10, delimiter=b'\n')

b'Alice,100,1\n'
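The delimiter handling can be sketched in pure Python on an in-memory bytes object. This is an illustrative reimplementation, not the s3fs code: it assumes a block starts just after the first delimiter at or past `offset` (or at byte 0) and ends just after the first delimiter at or past `offset + length`, which reproduces the output above. `read_block_bytes` is a hypothetical name.

```python
def read_block_bytes(data: bytes, offset: int, length: int, delimiter: bytes) -> bytes:
    """Hypothetical in-memory analogue of read_block with a delimiter."""

    def next_boundary(pos: int) -> int:
        # Index just past the first delimiter at or after pos; end of data if none.
        i = data.find(delimiter, pos)
        return len(data) if i == -1 else i + len(delimiter)

    # A block at offset 0 starts at the beginning; otherwise skip the partial
    # record, which belongs to the previous block.
    start = 0 if offset == 0 else next_boundary(offset)
    # Extend the end to finish the record that straddles offset + length.
    end = next_boundary(offset + length)
    return data[start:end]


data = b'name,amount,id\nAlice,100,1\nBob,200,2\nCharlie,300,3\n'
print(read_block_bytes(data, 5, 10, b'\n'))  # b'Alice,100,1\n'

# Consecutive fixed-size blocks tile the data exactly, with no split lines.
blocks = [read_block_bytes(data, off, 10, b'\n') for off in range(0, len(data), 10)]
assert b''.join(blocks) == data
```

Because each block ends exactly where the next one starts, many workers can each read one block independently and together still see every record exactly once.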

For HDFS, we also have access to information about which data node holds which blocks of any given file.

In [11]:
hdfs.put('myfile.csv', '/myfile.csv')
hdfs.get_block_locations('/myfile.csv')

[{'hosts': [b'ip-172-31-2-252.ec2.internal',
   b'ip-172-31-2-253.ec2.internal',
   b'ip-172-31-2-254.ec2.internal'],
  'length': 51,
  'offset': 0}]
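This is the information a scheduler needs in order to send work to the data. A minimal sketch, assuming only the list-of-dicts shape shown above: a hypothetical helper `blocks_by_host` inverts it so that each host lists the byte ranges it can read locally.

```python
from collections import defaultdict


def blocks_by_host(locations):
    """Invert block-location output into {host: [(offset, length), ...]}."""
    by_host = defaultdict(list)
    for block in locations:
        for host in block['hosts']:
            # Host names appear as bytes in the output above; normalise to str.
            name = host.decode() if isinstance(host, bytes) else host
            by_host[name].append((block['offset'], block['length']))
    return dict(by_host)


locations = [{'hosts': [b'ip-172-31-2-252.ec2.internal',
                        b'ip-172-31-2-253.ec2.internal',
                        b'ip-172-31-2-254.ec2.internal'],
              'length': 51,
              'offset': 0}]
print(blocks_by_host(locations))
```

With a three-way replicated file, every block appears under three hosts, so a task reading that block could be placed on any of them.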

Both libraries provide an interface compliant with python file objects, with (binary) read, write and append modes; files are line-iterable.

In [12]:
with hdfs.open('/myfile.csv', 'rb') as f:
    f.seek(10)
    print(f.read(10))

print()
    
with hdfs.open('/myfile.csv', 'rb') as f:
    for line in f:
        print(line)

with hdfs.open('/temporary', 'wb') as f:
    f.write(b'Hello World')    
    
hdfs.cat('/temporary')

b't,id\nAlice'

b'name,amount,id'
b'Alice,100,1'
b'Bob,200,2'
b'Charlie,300,3'
b''


b'Hello World'

Other libraries can use hdfs3 and s3fs files as if they were ordinary python files.

In [13]:
# Compatibility
import pandas as pd
import gzip
with s3.open('distributed-test/csv/gzip/2015-01-01.csv.gz', 'rb') as f:
    df = pd.read_csv(gzip.open(f))
df

Unnamed: 0,name,amount,id
0,Alice,100,1
1,Bob,200,2
2,Charlie,300,3
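Nothing in the cell above is specific to s3fs: any binary file-like object can be handed to such libraries. A stdlib-only sketch, in which an in-memory `io.BytesIO` stands in (as an assumption) for the file returned by `s3.open`, and the `csv` module replaces pandas:

```python
import csv
import gzip
import io

# Stand-in for a remote file: gzip only needs an object with read().
raw = gzip.compress(b'name,amount,id\nAlice,100,1\nBob,200,2\n')
remote_file = io.BytesIO(raw)

# gzip.open accepts an already-open binary file object, just like pandas above;
# TextIOWrapper turns the decompressed bytes into text for the csv reader.
with gzip.open(remote_file) as decompressed:
    rows = list(csv.DictReader(io.TextIOWrapper(decompressed, encoding='utf-8', newline='')))

print(rows[1]['name'])  # Bob
```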


Additional goodies:
- HDFS reads/writes are local (short-circuit) if possible. Since we can find which node has which block, we can ship the task to the data, rather than the other way around.
- S3 files are cached with read-ahead in read mode
- S3 files are uploaded in chunks and combined server-side
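A minimal sketch of the client side of a chunked upload: writes are buffered and handed off in fixed-size parts. The `ChunkedWriter` class and its `upload_part` callback are hypothetical, not s3fs's implementation. With S3's multipart upload API, the parts are sent individually and then combined server-side by a final "complete" request, and every part except the last must be at least 5 MiB, hence the default part size.

```python
class ChunkedWriter:
    """Hypothetical buffered writer that emits fixed-size parts for upload."""

    def __init__(self, upload_part, part_size=5 * 2**20):
        # upload_part(part_number, data) is an assumed callback that would send
        # one part; 5 MiB is S3's minimum size for all but the final part.
        self.upload_part = upload_part
        self.part_size = part_size
        self.buffer = bytearray()
        self.part_number = 0

    def write(self, data):
        self.buffer.extend(data)
        # Send every complete part as soon as enough data has accumulated.
        while len(self.buffer) >= self.part_size:
            self._send(self.part_size)

    def _send(self, n):
        self.part_number += 1
        self.upload_part(self.part_number, bytes(self.buffer[:n]))
        del self.buffer[:n]

    def close(self):
        # The final part may be smaller than part_size.
        if self.buffer:
            self._send(len(self.buffer))


parts = []
writer = ChunkedWriter(lambda number, chunk: parts.append((number, chunk)), part_size=4)
writer.write(b'abcdef')
writer.write(b'ghij')
writer.close()
print(parts)  # [(1, b'abcd'), (2, b'efgh'), (3, b'ij')]
```

Only one part's worth of data is held in memory at a time, so arbitrarily large files can be written without buffering them whole.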

## Dask-distributed

So we can access Big Data - now we want to process it on a cluster.

Let's look at some example data: about 26 GiB of CSV from the NYC taxi data.

In [14]:
s3.du('dask-data/nyc-taxi/2014/'), s3.du('dask-data/nyc-taxi/2014/', total=True) / 2**30

({'dask-data/nyc-taxi/2014/yellow_tripdata_2014-01.csv': 2324817062,
  'dask-data/nyc-taxi/2014/yellow_tripdata_2014-02.csv': 2205460986,
  'dask-data/nyc-taxi/2014/yellow_tripdata_2014-03.csv': 2604044841,
  'dask-data/nyc-taxi/2014/yellow_tripdata_2014-04.csv': 2463299452,
  'dask-data/nyc-taxi/2014/yellow_tripdata_2014-05.csv': 2488952100,
  'dask-data/nyc-taxi/2014/yellow_tripdata_2014-06.csv': 2287263479,
  'dask-data/nyc-taxi/2014/yellow_tripdata_2014-07.csv': 2204602640,
  'dask-data/nyc-taxi/2014/yellow_tripdata_2014-08.csv': 2133936569,
  'dask-data/nyc-taxi/2014/yellow_tripdata_2014-09.csv': 2260069774,
  'dask-data/nyc-taxi/2014/yellow_tripdata_2014-10.csv': 2409216587,
  'dask-data/nyc-taxi/2014/yellow_tripdata_2014-11.csv': 2235168890,
  'dask-data/nyc-taxi/2014/yellow_tripdata_2014-12.csv': 2198671160},
 25.905206371098757)

In [15]:
# file structure
print(s3.head('dask-data/nyc-taxi/2014/yellow_tripdata_2014-01.csv').decode())

vendor_id, pickup_datetime, dropoff_datetime, passenger_count, trip_distance, pickup_longitude, pickup_latitude, rate_code, store_and_fwd_flag, dropoff_longitude, dropoff_latitude, payment_type, fare_amount, surcharge, mta_tax, tip_amount, tolls_amount, total_amount

CMT,2014-01-09 20:45:25,2014-01-09 20:52:31,1,0.69999999999999996,-73.994770000000003,40.736828000000003,1,N,-73.982226999999995,40.731789999999997,CRD,6.5,0.5,0.5,1.3999999999999999,0,8.9000000000000004
CMT,2014-01-09 20:46:12,2014-01-09 20:55:12,1,1.3999999999999999,-73.982392000000004,40.773381999999998,1,N,-73.960448999999997,40.763995000000001,CRD,8.5,0.5,0.5,1.8999999999999999,0,11.4
CMT,2014-01-09 20:44:47,2014-01-09 20:59:46,2,2.2999999999999998,-73.988569999999996,40.739406000000002,1,N,-73.986626000000001,40.765217,CRD,11.5,0.5,0.5,1.5,0,14
CMT,2014-01-09 20:44:57,2014-01-09 20:51:40,1,1.7,-73.960212999999996,40.770463999999997,1,N,-73.979862999999995,40.777050000000003,CRD,7.5,0.5,0.5,1.7,0,10.19999999999999

A dask-distributed scheduler and set of workers had already been set up on the cluster. See [here](http://distributed.readthedocs.org/en/latest/) for a detailed description of the design and use of distributed.

In [16]:
import distributed
e = distributed.Executor('localhost:8786')
e.ncores()
# JSON interface on :9786
# web interface on :8787

{'172.31.2.252:36319': 2,
 '172.31.2.252:45257': 2,
 '172.31.2.253:33361': 2,
 '172.31.2.253:39447': 2,
 '172.31.2.254:45268': 2,
 '172.31.2.254:50737': 2}

First, let's transfer the data to the local HDFS. No parallelism is required for this; the first function could be run over the files serially. However, this demonstrates how `distcp` (which may be familiar from the map-reduce world) can be parallelised over the nodes with a single call to `map`. Since all the cores access the same HDFS, they can write locally, while the blocks are still replicated around the cluster for fault tolerance.

In [17]:
def download(origin, destination, host=hdfs.host, port=hdfs.port):
    """ S3->HDFS streaming downloader """
    # connections are created inside the function, since it runs on a worker
    hdfs = hdfs3.HDFileSystem(host=host, port=port)
    s3 = s3fs.S3FileSystem()
    blocksize = 10**7  # stream in ~10 MB chunks rather than loading whole files
    with s3.open(origin, 'rb') as f1, hdfs.open(destination, 'wb') as f2:
        while True:
            out = f1.read(blocksize)
            if not out:  # end of the S3 file
                break
            f2.write(out)

def distcp(inglob, outpath, executor):
    """ Parallel distributed S3->HDFS downloader"""
    hdfs = hdfs3.HDFileSystem()
    hdfs.mkdir(outpath)
    s3 = s3fs.S3FileSystem()
    infiles = s3.glob(inglob)
    # outpath is expected to end with '/', e.g. '/NYC/'
    outfiles = [outpath + f.rsplit('/', 1)[1] for f in infiles]
    # one task per file; map returns futures immediately, without blocking
    return executor.map(download, infiles, outfiles)

Now we perform the copy. Notice that `map`, above, returns *futures*, local handles to operations happening on the cluster, and that the client does not block. We can monitor progress here in the notebook, through the REST interface, or in the web UI; there is also a simplified text progress bar for console use.

In [18]:
futures = distcp('dask-data/nyc-taxi/2014/*', '/NYC/', e)
print(futures[0])
distributed.progress(futures)

<Future: status: pending, key: download-67385a0f16fe62e8857908ec5795e303>
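The futures pattern is not unique to distributed: the standard library's `concurrent.futures` exposes the same idea on a single machine. A local analogue for illustration only, with a hypothetical `slow_square` task; note that the stdlib's `Executor.map` returns an iterator of results rather than futures, so `submit` is used here instead.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def slow_square(x):
    """Hypothetical stand-in for a long-running task such as one file copy."""
    time.sleep(0.1)
    return x * x


with ThreadPoolExecutor(max_workers=4) as pool:
    # submit() returns a Future immediately; the caller is free to continue.
    futures = [pool.submit(slow_square, x) for x in range(8)]
    print(futures[0])  # the repr shows the future's current state
    # Block only at the point where the results are actually needed.
    wait(futures)
    results = [f.result() for f in futures]

print(results)  # [0, 1, 4, 9, 16, 25, 36, 49]
```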


In [19]:
hdfs.du('/NYC', total=True)['/NYC'] / 2**30

25.905206371098757

Dask gives you multiple levels of entry, allowing direct low-level control or convenient abstractions:
- direct access to the dask DAG interface (e.g., for developers making new algorithms)
- an intermediate executor interface with map/submit/scatter etc., returning
    - futures pointing to processes working on the cluster (eagerly evaluated remotely, but not blocking)
    - imperative values for sets of tasks ready to be submitted (lazy evaluation)
- collections
    - `bag`: schema-less python objects, like dictionaries (e.g., from JSON) - manipulate with toolz
    - `array`: numerical NDarrays with numpy functionality and linear algebra
    - `dataframe`: tabular data with pandas functionality
    
As well as the distributed engine, dask is easily usable for thread- or process-based out-of-core processing on a single machine.

#### Higher-level dask functionality

In [20]:
import distributed.hdfs
# split the set of files into many chunks, each of which becomes an in-memory
# pandas dataframe, one piece of the whole dataset
df = distributed.hdfs.read_csv('/NYC/*', header='infer', skip_blank_lines=True,
                                lazy=False, skipinitialspace=True)
df = e.persist(df)

Setting global dask scheduler to use distributed


Now the cluster is working to load all of the data into memory but, again, the client is not blocked and we are free to do other work or submit more tasks to the scheduler. Blocking only happens when we require a concrete result:

In [21]:
out = df.vendor_id.count()
distributed.progress(out)

In [23]:
out.compute()

164883392

Now that the data is in memory (one small pandas dataframe for each HDFS block of the original data), queries on the 165M-row dataset return in one to two seconds, even when they involve calculations along the way.

In [24]:
%%time
print(df.passenger_count.sum().compute())

279682748
CPU times: user 110 ms, sys: 0 ns, total: 110 ms
Wall time: 955 ms


In [25]:
%%time
print((df.passenger_count + 1).sum().compute())

444566140
CPU times: user 160 ms, sys: 0 ns, total: 160 ms
Wall time: 1.32 s


In [26]:
%%time
df.groupby('passenger_count').tip_amount.mean().compute()

CPU times: user 209 ms, sys: 5.2 ms, total: 214 ms
Wall time: 1.66 s


passenger_count
0      2.005099
1      1.529201
2      1.499112
3      1.369163
4      1.241804
5      1.524903
6      1.490462
7      3.637299
8      3.904592
9      4.690079
208    0.000000
Name: tip_amount, dtype: float64
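Since each partition is a separate pandas dataframe, an aggregation like the grouped mean above has to be split into a per-partition step and a combine step. One way to do that is to carry (sum, count) pairs rather than means. A plain-Python sketch of that strategy on made-up toy data; dask's actual implementation may differ in detail.

```python
from collections import defaultdict


def partial_stats(partition):
    """Per-partition step: {key: [sum, count]} for one chunk of (key, value) rows."""
    stats = defaultdict(lambda: [0.0, 0])
    for key, value in partition:
        stats[key][0] += value
        stats[key][1] += 1
    return stats


def groupby_mean(partitions):
    """Combine step: add up per-partition sums and counts, then divide once."""
    totals = defaultdict(lambda: [0.0, 0])
    for stats in map(partial_stats, partitions):
        for key, (s, c) in stats.items():
            totals[key][0] += s
            totals[key][1] += c
    return {key: s / c for key, (s, c) in totals.items()}


# Hypothetical toy data: (passenger_count, tip_amount) rows in two partitions.
partitions = [[(1, 2.0), (2, 1.0), (1, 4.0)],
              [(2, 3.0), (1, 0.0)]]
print(groupby_mean(partitions))  # {1: 2.0, 2: 2.0}
```

Averaging the per-partition means instead would give 1.5 for key `1` here (3.0 and 0.0), which is wrong because the partitions hold different numbers of rows for that key.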

### The wider python big-data ecosystem

Many tools are being built to plug the gaps and introduce novel processing functionality; we are fleshing out the ecosystem for massive-data python on clusters.

- Xarray and upcoming projects based on dask
- knit: YARN integration
- dec2: quick and easy dask-distributed clusters
- Anaconda for Cluster Management (free or as part of paid Anaconda)
- avro format, parquet coming (others?)
- Mosaic (data portal)/blaze server 
- ...

<img src="http://www.h-online.com/security/imgs/46/4/5/3/9/3/9/NoMoreJava-be60b4a92a66c80f-be60b4a92a66c80f.png">

In [None]:
e.restart()

In [None]:
e.shutdown()