<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Dask-Bag-Creation" data-toc-modified-id="Dask-Bag-Creation-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Dask Bag Creation</a></span><ul class="toc-item"><li><span><a href="#Getting-the-data-files" data-toc-modified-id="Getting-the-data-files-1.1"><span class="toc-item-num">1.1&nbsp;&nbsp;</span>Getting the data files</a></span></li><li><span><a href="#Creating-a-Dask.Bag-from-a-file" data-toc-modified-id="Creating-a-Dask.Bag-from-a-file-1.2"><span class="toc-item-num">1.2&nbsp;&nbsp;</span>Creating a Dask.Bag from a file</a></span></li><li><span><a href="#Creating-a-dask-bag-using-from_sequence()" data-toc-modified-id="Creating-a-dask-bag-using-from_sequence()-1.3"><span class="toc-item-num">1.3&nbsp;&nbsp;</span>Creating a dask bag using <code>from_sequence()</code></a></span></li><li><span><a href="#Further-Reading" data-toc-modified-id="Further-Reading-1.4"><span class="toc-item-num">1.4&nbsp;&nbsp;</span>Further Reading</a></span></li></ul></li></ul></div>

# Dask Bag Creation

[Introduction to Dask, by Anderson Banihirwe](https://github.com/andersy005/dask-notebooks)

In this notebook we introduce two ways of getting data into the basic Dask data structure, the [**```Dask.Bag```**](https://dask.pydata.org/en/latest/bag.html). Dask Bag excels at processing data that can be represented as a sequence of arbitrary inputs, i.e., **messy data**.

Messy data is often encountered at the beginning stage of data processing workflows, when large volumes of raw data are first consumed. This category includes data in file formats such as JSON, CSV, XML, or any other format that does not enforce strict structure and datatypes. For this reason, the initial data massaging and processing is often done with Python ```lists```, ```dicts```, and ```sets```.

These core data structures are optimized for general-purpose storage and processing. Adding streaming computation with iterators, generator expressions, or libraries like itertools or [toolz](https://toolz.readthedocs.io/en/latest/) lets us process large volumes in a small space.
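To make the streaming idea concrete, here is a minimal sketch that uses only the standard library. The sample lines and the `parse_lines` helper are hypothetical and exist only for illustration; the point is that each record is produced on demand, so only a handful of records are held in memory at any time.

```python
import itertools


def parse_lines(lines):
    """Lazily split comma-separated lines into lists of fields."""
    for line in lines:
        yield line.rstrip("\n").split(",")


# Hypothetical sample records, loosely shaped like rows of a network log.
sample_lines = [
    "0,tcp,http,SF,normal.\n",
    "0,udp,domain,SF,normal.\n",
    "0,tcp,smtp,SF,attack.\n",
    "0,tcp,http,SF,normal.\n",
]

# Nothing is parsed yet: both objects below are lazy generators.
records = parse_lines(sample_lines)
tcp_records = (r for r in records if r[1] == "tcp")

# islice pulls only as many records as needed through the pipeline.
first_two = list(itertools.islice(tcp_records, 2))
print(first_two)
```

Because `islice` stops after two matches, the fourth line is never parsed. The same pattern works when `sample_lines` is replaced by an open file handle.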


Dask.bag is a high-level Dask collection that automates common workloads of this form. In a nutshell:

 **```dask.bag = map, filter, toolz + parallel execution```**
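As a rough illustration of that summary, the sketch below chains `filter` and `map` on a small bag. It assumes a reasonably recent Dask release in which `compute()` accepts a `scheduler=` keyword (older releases may spell this differently); the synchronous scheduler is chosen here only to keep the example in a single process.

```python
import dask.bag as db

# A small bag split across two partitions.
numbers = db.from_sequence(range(10), npartitions=2)

# Build the pipeline lazily: keep even numbers, then square them.
even_squares = numbers.filter(lambda x: x % 2 == 0).map(lambda x: x ** 2)

# Run the pipeline. scheduler="synchronous" is an assumption about the
# installed Dask version and avoids spawning worker processes.
result = even_squares.compute(scheduler="synchronous")
print(result)
```

Dropping the `scheduler` argument would let Dask pick its default scheduler for bags, which is where the parallel execution comes from.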



```Dask.Bag``` is roughly the Dask counterpart of Spark's ```RDD```.


## Getting the data files

In this notebook we will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half a million network interactions. The file is provided as a gzip archive that we will download locally.


In [1]:
import pathlib
import urllib.request

DATASET_DIR = pathlib.Path('../data')
DATASET_FILEPATH = DATASET_DIR / 'kddcup.data_10_percent.gz'

# Create the data directory if needed, then download the file only once.
DATASET_DIR.mkdir(parents=True, exist_ok=True)
if not DATASET_FILEPATH.exists():
    urllib.request.urlretrieve(
        "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz",
        DATASET_FILEPATH)

## Creating a Dask.Bag from a file

The most common way of creating a dask bag is to load it from a file. Notice that Dask's ```bag.read_text()``` can handle compressed files directly.
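Before loading the real dataset, the compressed-file handling can be checked on a tiny gzip file. This is a minimal sketch: the file name and its contents are made up, and the `scheduler="synchronous"` argument assumes a reasonably recent Dask release.

```python
import gzip
import os
import tempfile

import dask.bag as db

with tempfile.TemporaryDirectory() as tmp_dir:
    # Write a small gzip-compressed text file (hypothetical contents).
    path = os.path.join(tmp_dir, "sample.txt.gz")
    with gzip.open(path, "wt") as f:
        f.write("first line\nsecond line\nthird line\n")

    # The compression is inferred from the .gz extension.
    lines = db.read_text(path)

    # Each element is one line of text, including its trailing newline.
    n_lines = lines.count().compute(scheduler="synchronous")
    stripped = lines.map(str.strip).compute(scheduler="synchronous")

print(n_lines, stripped)
```

The computations run inside the `with` block because the bag is lazy: the file must still exist when `compute()` is called.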

In [2]:
import dask
import dask.bag as db
# Progress Bar
from dask.diagnostics import ProgressBar
pbar = ProgressBar()
pbar.register()

In [3]:
DATASET_FILEPATH

PosixPath('../data/kddcup.data_10_percent.gz')

In [4]:
raw_data = db.read_text(DATASET_FILEPATH)

In [5]:
raw_data

dask.bag<bag-fro..., npartitions=1>

Now we have our data file loaded into the ```raw_data``` bag.

Without getting into [Dask Bag methods and top level user functions](https://dask.pydata.org/en/latest/bag-api.html), the most basic thing we can do to check that we got our bag contents right is to ```count()``` the number of lines loaded from the file into the bag.

In [6]:
raw_data.count().compute()

[########################################] | 100% Completed |  0.9s


494021

We can also check the first few entries in our data. 

In [7]:
raw_data.take(5)

[########################################] | 100% Completed |  0.1s


('0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.\n',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.\n',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.\n',
 '0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.\n',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.\n')

In the following notebooks, we will use this raw data to learn about the different functionalities of Dask. 

## Creating a dask bag using ```from_sequence()```

Another way of creating a dask bag is to use an existing Python iterable:

In [8]:
a = range(100)
data = db.from_sequence(a)

In [9]:
data

dask.bag<from_se..., npartitions=100>

We can control the number of partitions the data is split into:

In [10]:
data = db.from_sequence(a, npartitions=10)

In [11]:
data

dask.bag<from_se..., npartitions=10>
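The two sizing options of `from_sequence()` can be compared side by side. This sketch only inspects the `npartitions` attribute, so nothing is computed; the variable names are illustrative.

```python
import dask.bag as db

values = range(100)

# Ask for a fixed number of partitions...
by_count = db.from_sequence(values, npartitions=10)

# ...or for a fixed number of elements per partition.
by_size = db.from_sequence(values, partition_size=25)

print(by_count.npartitions, by_size.npartitions)
```

Fewer, larger partitions reduce scheduling overhead, while more partitions give the scheduler more pieces to run in parallel.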

As we did before, we can ```count()``` the number of elements in the bag.

In [12]:
data.count()

<dask.bag.core.Item at 0x7fc1daafc400>

Notice that calling ```count()``` returns a lazy ```Item``` object rather than a number. In other words, nothing is executed until we tell Dask to perform the actual computation with ```compute()```. This is known as **lazy evaluation**.

In [13]:
data.count().compute()

[########################################] | 100% Completed |  0.1s


100
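One way to observe lazy evaluation directly is to record when a mapped function actually runs. This is a sketch under assumptions: the `record_and_double` helper is hypothetical, and the synchronous scheduler (via the `scheduler=` keyword of a reasonably recent Dask release) is used so that the function runs in this process and its side effects are visible.

```python
import dask.bag as db

calls = []


def record_and_double(x):
    """Remember that we were called, then return twice the input."""
    calls.append(x)
    return 2 * x


bag = db.from_sequence([1, 2, 3], npartitions=1)

# Building the pipeline does not call the function.
doubled = bag.map(record_and_double)
calls_before = len(calls)

# The work happens only when compute() is called.
total = doubled.sum().compute(scheduler="synchronous")
calls_after = len(calls)

print(calls_before, total, calls_after)
```

With a multi-process scheduler the list would be filled in the worker processes instead, so this trick is only useful for small, single-process experiments.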

As before, we can access the first few elements in our bag. 

In [14]:
data.take(10)

[########################################] | 100% Completed |  0.1s


(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)

## Further Reading

This section provides more resources on the topic if you are looking to go deeper.

**Docs:**
-  [**Create Dask Bags**](https://dask.pydata.org/en/latest/bag-creation.html)

In [15]:
%load_ext version_information
%version_information dask

| Software | Version |
| --- | --- |
| Python | 3.6.4 64bit [GCC 4.8.2 20140120 (Red Hat 4.8.2-15)] |
| IPython | 6.2.1 |
| OS | Linux 4.13.0 32 generic x86_64 with debian stretch sid |
| dask | 0.16.1 |

Fri Feb 16 23:30:13 2018 CST
