# Chunking (*with an example using images*)

Many manipulations of distributed Bolt arrays `chunk` the component arrays under the hood, which makes operations like swapping and transposing more efficient. These chunked arrays are also useful on their own for finer-grained control over parallelization, so we expose them directly.

Let's start with a simple array:

In [9]:
from bolt import ones

In [10]:
a = ones((2, 20, 10), sc)

In [3]:
a.shape

(2, 20, 10)

When we chunk, we specify the chunk size along each axis. We provide only two sizes, not three, because we can chunk only along the axes stored in the values, and here the values hold two axes (the first axis is stored in the keys).

In [4]:
a.values.shape

(20, 10)

In [6]:
c = a.chunk((10, 5))

To see what happened, let's look at the values of the underlying RDD. Initially each value was a `(20, 10)` array; after chunking, each one is `(10, 5)`.

In [6]:
a.tordd().values().first().shape

(20, 10)

In [7]:
c.tordd().values().first().shape

(10, 5)
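
To make the splitting concrete, here is a minimal NumPy-only sketch of what chunking does to a single record's value. It is not Bolt's implementation: the dictionary layout and the names `value`, `chunk_sizes`, and `chunks` are assumptions chosen for illustration.

```python
import numpy as np

value = np.ones((20, 10))   # one record's value, as in the example above
chunk_sizes = (10, 5)       # the sizes passed to chunk()

# Slice the value into non-overlapping blocks, keyed by chunk index.
chunks = {
    (i // chunk_sizes[0], j // chunk_sizes[1]):
        value[i:i + chunk_sizes[0], j:j + chunk_sizes[1]]
    for i in range(0, value.shape[0], chunk_sizes[0])
    for j in range(0, value.shape[1], chunk_sizes[1])
}

print(len(chunks))           # 4
print(chunks[(0, 0)].shape)  # (10, 5)
```

Each `(20, 10)` value yields four `(10, 5)` blocks, which matches the shape reported for the first chunked record.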

Let's also look at the keys. They are still tuples, but each key now pairs the original index along the first axis with the index of the chunk. Note that we've gone from 2 records to 8, because each of the 2 subarrays was broken into 2 x 2 = 4 chunks.

In [8]:
a.tordd().keys().collect()

[(0,), (1,)]

In [9]:
c.tordd().keys().collect()

[((0,), (0, 0)),
 ((0,), (0, 1)),
 ((0,), (1, 0)),
 ((0,), (1, 1)),
 ((1,), (0, 0)),
 ((1,), (0, 1)),
 ((1,), (1, 0)),
 ((1,), (1, 1))]
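
The record count and the key layout follow from simple arithmetic on the shapes. The sketch below reproduces them in plain Python. It is an illustration rather than Bolt's code: `split = 1` (one axis in the keys) is read off the example, and rounding up for chunk sizes that do not divide an axis evenly is an assumption, since the example divides evenly.

```python
from itertools import product
from math import ceil

shape = (2, 20, 10)    # full array shape from the example
split = 1              # assumption: the first axis is stored in the keys
chunk_sizes = (10, 5)  # the sizes passed to chunk()

key_shape, value_shape = shape[:split], shape[split:]

# Number of chunks along each value axis (rounding up is an assumption).
chunks_per_axis = tuple(ceil(n / s) for n, s in zip(value_shape, chunk_sizes))

# Each record key pairs an original key with a chunk index.
keys = [
    (k, c)
    for k in product(*(range(n) for n in key_shape))
    for c in product(*(range(n) for n in chunks_per_axis))
]

print(chunks_per_axis)  # (2, 2)
print(len(keys))        # 8
```

The list `keys` comes out in the same order as the output above, although the order returned by `collect()` on a real RDD may depend on partitioning.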

We can perform parallelized `map` operations on the `ChunkedArray` and then `unchunk`, providing a useful interface for distributing over arrays when simply working along a subset of axes is not sufficient.

In [10]:
c.map(lambda x: x * 2).unchunk().toarray().shape

(2, 20, 10)
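
The chunk, map, unchunk round trip can be sketched for a single value with NumPy alone. This is a local illustration, not Bolt's implementation: the nested-list `grid` layout and the use of `np.block` for reassembly are choices made here, and the mapped function is assumed to preserve each chunk's shape.

```python
import numpy as np

value = np.arange(200, dtype=float).reshape(20, 10)
rows, cols = 10, 5  # chunk sizes along the two value axes

# chunk: a nested list of blocks, laid out as a 2 x 2 grid
grid = [
    [value[i:i + rows, j:j + cols] for j in range(0, value.shape[1], cols)]
    for i in range(0, value.shape[0], rows)
]

# map: apply a shape-preserving function to every block independently
mapped = [[block * 2 for block in row] for row in grid]

# unchunk: stitch the blocks back together in grid order
result = np.block(mapped)

print(result.shape)                       # (20, 10)
print(np.array_equal(result, value * 2))  # True
```

Because every block is processed independently, the map step is the part that a distributed backend can run in parallel.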


In [11]:
c.tordd().values().first()[0:2,0:2].shape

(2, 2)

In [21]:
from numpy import asarray

In [13]:
asarray([2]) * c.getnumber(c.plan, c.vshape)[0:1]

NameError: name 'asarray' is not defined

In [23]:
tuple(c.kshape) + tuple(asarray([4]))

(2, 4)

In [17]:
c.shape

(2, 20, 10)

A `map` whose function changes the shape of each chunk could not be unchunked in this run: the call below fails with an `IndexError` raised in `_unchunk`.

In [8]:
c.map(lambda x: x[0:2,0]).unchunk().toarray().shape

[2 5]
[ 20.   1.]
(2, 20.0, 1.0)


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 7.0 failed 1 times, most recent failure: Lost task 2.0 in stage 7.0 (TID 22, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/freemanj11/code/spark-1.4.0-bin-hadoop1/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/Users/freemanj11/code/spark-1.4.0-bin-hadoop1/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/freemanj11/code/spark-1.4.0-bin-hadoop1/python/pyspark/rdd.py", line 2318, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/Users/freemanj11/code/spark-1.4.0-bin-hadoop1/python/pyspark/rdd.py", line 2318, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/Users/freemanj11/code/spark-1.4.0-bin-hadoop1/python/pyspark/rdd.py", line 2318, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/Users/freemanj11/code/spark-1.4.0-bin-hadoop1/python/pyspark/rdd.py", line 304, in func
    return f(iterator)
  File "/Users/freemanj11/code/spark-1.4.0-bin-hadoop1/python/pyspark/rdd.py", line 972, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/Users/freemanj11/code/spark-1.4.0-bin-hadoop1/python/pyspark/rdd.py", line 972, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/Users/freemanj11/code/spark-1.4.0-bin-hadoop1/python/pyspark/rdd.py", line 1873, in <lambda>
    map_values_fn = lambda kv: (kv[0], f(kv[1]))
  File "/Users/freemanj11/github/bolt-project/bolt/bolt/spark/chunk.py", line 138, in _unchunk
    arr[i] = d
IndexError: index 1 is out of bounds for axis 1 with size 1

	at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
	at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
	at org.apache.spark.scheduler.Task.run(Task.scala:70)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
	at java.lang.Thread.run(Thread.java:695)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
