In [1]:
import pymongo
from pymongo import MongoClient

## Let's generate some data

In [2]:
import random

In [3]:
def gen_data(size=10):
    records = [
        {
            "name": random.choice(["fred", "wilma", "barney", "betty"]),
            "number": random.randint(0, 100),
            "idx": i,
        }
        for i in range(size)
    ]
    return records

In [4]:
records = gen_data(size=100000)

Let's take a sneak peek at what this data looks like. Notice that it is a list of dictionaries, which resembles a list of `documents` that you might insert into a `collection` in a `Mongo` database.

In [5]:
records[:10]

[{'name': 'fred', 'number': 27, 'idx': 0},
 {'name': 'betty', 'number': 35, 'idx': 1},
 {'name': 'betty', 'number': 58, 'idx': 2},
 {'name': 'barney', 'number': 2, 'idx': 3},
 {'name': 'betty', 'number': 50, 'idx': 4},
 {'name': 'barney', 'number': 74, 'idx': 5},
 {'name': 'barney', 'number': 96, 'idx': 6},
 {'name': 'fred', 'number': 30, 'idx': 7},
 {'name': 'fred', 'number': 78, 'idx': 8},
 {'name': 'fred', 'number': 77, 'idx': 9}]
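
Because `gen_data` draws from the global random generator, the values shown above will differ from run to run. A minimal sketch of a repeatable variant follows. The name `gen_data_seeded` and its `seed` argument are hypothetical and not part of the original notebook.

```python
import random


def gen_data_seeded(size=10, seed=0):
    # Hypothetical variant of gen_data: a private Random instance makes
    # the output repeatable without touching the global random state.
    rng = random.Random(seed)
    return [
        {
            "name": rng.choice(["fred", "wilma", "barney", "betty"]),
            "number": rng.randint(0, 100),
            "idx": i,
        }
        for i in range(size)
    ]


# Two calls with the same seed produce identical records.
sample = gen_data_seeded(size=5, seed=42)
```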

## Write and read with `PyMongo`

Let's see how to write and read these records using a `PyMongo` workflow. You can start a local server, which by default listens on `"host":"localhost"` and `"port":27017`, or you can try [`MongoDB Atlas`](https://docs.atlas.mongodb.com/tutorial/deploy-free-tier-cluster/) and deploy a free cluster there. For the latter, you'll need to connect with a URI that contains your MongoDB deployment's connection string.

For example, if you create a cluster called `testcluster`, the URI looks something like this:
```"mongodb+srv://<username>:<password>@testcluster.uj8nk.mongodb.net/myFirstDatabase?retryWrites=true&w=majority"```

Notes:
- For the Atlas option you will need to install `dnspython` (`pip install dnspython`) to be able to connect to your cluster.

- If you are using a free instance of MongoDB Atlas, keep in mind that there are some limitations. For example, the `allowDiskUse` option for aggregation operations is not available, and in-memory sorts are limited to 32MB. This matters because `read_mongo` relies on the `aggregation` operation. These limits seem to prevent us from trying bigger use cases on the M0 free tier.
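
If your username or password contains characters such as `@` or `:`, they need to be percent-encoded before they go into the URI. A minimal sketch, assuming a hypothetical helper `build_atlas_uri` and placeholder credentials, none of which come from the original notebook:

```python
from urllib.parse import quote_plus


def build_atlas_uri(username, password, cluster_address, database="myFirstDatabase"):
    # Percent-encode the credentials so reserved characters
    # do not break the URI.
    user = quote_plus(username)
    pwd = quote_plus(password)
    return (
        f"mongodb+srv://{user}:{pwd}@{cluster_address}/{database}"
        "?retryWrites=true&w=majority"
    )


# Placeholder values, not real credentials.
example_uri = build_atlas_uri(
    "my_user", "p@ss:word", "testcluster.uj8nk.mongodb.net"
)
print(example_uri)
```

The resulting string can be passed to `MongoClient` in the same way as a hand-written URI.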

Let's connect to our cluster:

In [6]:
# Replace this with your connection URI
host_uri = "mongodb+srv://<username>:<password>@<cluster-address>/myFirstDatabase?retryWrites=true&w=majority"
mongo_client = MongoClient(host_uri)

We create a database and a collection, but notice that they won't exist until we put some documents in them.

In [7]:
database = mongo_client["db_test"]
collection = database["coll_test"]

In [8]:
mongo_client.list_database_names()

['admin', 'local']

**Write to mongo with `pymongo`**

We'll use the `insert_many` method to write to our database.

In [9]:
%%time
collection.insert_many(records)

CPU times: user 430 ms, sys: 23.2 ms, total: 453 ms
Wall time: 12.2 s


<pymongo.results.InsertManyResult at 0x10c5c6180>

It took about 12 seconds of wall time to write one hundred thousand documents. Now you can see our `db_test` in the list of databases.

In [10]:
mongo_client.list_database_names()

['db_test', 'admin', 'local']
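
The write above sends all records in a single `insert_many` call. If you want progress reporting on a large write, one option is to split the list into batches yourself. This is a minimal sketch: the helper `insert_in_batches` and the batch size are hypothetical choices, and the `RecordingCollection` stand-in exists only so the example runs without a server.

```python
def insert_in_batches(collection, documents, batch_size=10_000):
    """Insert documents in slices of at most batch_size items."""
    inserted = 0
    for start in range(0, len(documents), batch_size):
        batch = documents[start:start + batch_size]
        # `collection` only needs an insert_many(list) method.
        collection.insert_many(batch)
        inserted += len(batch)
        print(f"inserted {inserted} of {len(documents)} documents")
    return inserted


class RecordingCollection:
    """Stand-in that only records the batches it receives."""

    def __init__(self):
        self.batches = []

    def insert_many(self, documents):
        self.batches.append(list(documents))


fake = RecordingCollection()
total = insert_in_batches(fake, [{"idx": i} for i in range(25)], batch_size=10)
```

With a real connection you would pass the `collection` object created earlier instead of the stand-in.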

**Read from mongo with `pymongo`**

In [11]:
%%time
my_records = list(collection.find())  

CPU times: user 245 ms, sys: 28.9 ms, total: 274 ms
Wall time: 1.19 s


In [12]:
len(my_records)

100000

We can drop things from our database by doing:

In [13]:
database.drop_collection("coll_test")

{'nIndexesWas': 1,
 'ns': 'db_test.coll_test',
 'ok': 1.0,
 '$clusterTime': {'clusterTime': Timestamp(1626736995, 3),
  'signature': {'hash': b'qT\xd7!\xfe\xf0Ho:\xd9?@\xd2\x9b\xfd\x1b\xcc\xc1\x8f@',
   'keyId': 6932492981463154689}},
 'operationTime': Timestamp(1626736995, 3)}

## Write and read with `dask-mongo`

In [14]:
from dask_mongo import to_mongo, read_mongo
import dask.bag as db

In [15]:
from distributed import Client

In [16]:
client = Client()
client

Client (Cluster object, LocalCluster)
Status: running, Using processes: True
Dashboard: http://127.0.0.1:8787/status
Workers: 4, Total threads: 8, Total memory: 16.00 GiB
Each worker: 2 threads, 4.00 GiB memory


To be able to write to mongo we need to convert our list of records into a [`dask.bag`](https://docs.dask.org/en/latest/bag.html) and specify how many partitions we want.

In [17]:
npartitions = 10
records_bag = db.from_sequence(records, npartitions=npartitions)

**`to_mongo`** 

If the database and collection you pass do not exist, they'll be created.

In [18]:
%%time
to_mongo(records_bag,  
         database='new_database', 
         collection='new_collection',
         connection_kwargs={"host": host_uri})

CPU times: user 2.44 s, sys: 196 ms, total: 2.64 s
Wall time: 18.6 s


If you check the collections in your Atlas cluster, you should now see a new database called `"new_database"`. Let's try to read some data using `dask-mongo`.

**`read_mongo`**

In [19]:
read_bag = read_mongo(connection_kwargs={"host": host_uri}, 
                database='new_database', 
                collection='new_collection',
                chunksize=500)

In [20]:
read_bag

dask.bag<read_mongo, npartitions=200>
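
The partition count in this output is consistent with one partition per chunk of at most `chunksize` documents. A quick check of the arithmetic, under that assumption about how `read_mongo` splits the collection:

```python
import math

n_documents = 100_000  # documents written to new_collection
chunksize = 500        # value passed to read_mongo

# Assumption: one partition per chunk of at most `chunksize` documents.
expected_partitions = math.ceil(n_documents / chunksize)
print(expected_partitions)  # 200
```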

In [21]:
%%time
records_list = read_bag.compute()

CPU times: user 1.54 s, sys: 171 ms, total: 1.71 s
Wall time: 18.9 s


**Read with match**

You can also read only a portion of your records by providing a `match`. For example, we can bring back only the records where `idx` is greater than 100 and less than 110.

In [22]:
read_bag_filtered = read_mongo(connection_kwargs={"host": host_uri}, 
                database='new_database', 
                collection='new_collection',
                chunksize=500, 
                match={"idx": {"$gt": 100, "$lt":110}})

In [23]:
%%time
records_list_filtered = read_bag_filtered.compute()

CPU times: user 30.1 ms, sys: 5.61 ms, total: 35.7 ms
Wall time: 335 ms


In [24]:
records_list_filtered

[{'_id': ObjectId('60f6093e562ff3ba3c06f12d'),
  'name': 'barney',
  'number': 68,
  'idx': 101},
 {'_id': ObjectId('60f6093e562ff3ba3c06f12e'),
  'name': 'wilma',
  'number': 63,
  'idx': 102},
 {'_id': ObjectId('60f6093e562ff3ba3c06f12f'),
  'name': 'fred',
  'number': 87,
  'idx': 103},
 {'_id': ObjectId('60f6093e562ff3ba3c06f130'),
  'name': 'barney',
  'number': 10,
  'idx': 104},
 {'_id': ObjectId('60f6093e562ff3ba3c06f131'),
  'name': 'fred',
  'number': 48,
  'idx': 105},
 {'_id': ObjectId('60f6093e562ff3ba3c06f132'),
  'name': 'betty',
  'number': 96,
  'idx': 106},
 {'_id': ObjectId('60f6093e562ff3ba3c06f133'),
  'name': 'betty',
  'number': 9,
  'idx': 107},
 {'_id': ObjectId('60f6093e562ff3ba3c06f134'),
  'name': 'fred',
  'number': 100,
  'idx': 108},
 {'_id': ObjectId('60f6093e562ff3ba3c06f135'),
  'name': 'fred',
  'number': 21,
  'idx': 109}]
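
Both `$gt` and `$lt` are exclusive bounds, which is why the result runs from `idx` 101 to 109. The same condition can be sketched in plain Python on stand-in documents (these are illustrative, not the records stored in the cluster):

```python
# Stand-in documents with the same shape as the generated records.
local_records = [{"name": "fred", "number": 0, "idx": i} for i in range(200)]

# Same condition as match={"idx": {"$gt": 100, "$lt": 110}}:
# both bounds are exclusive.
filtered = [doc for doc in local_records if 100 < doc["idx"] < 110]

print([doc["idx"] for doc in filtered])
# [101, 102, 103, 104, 105, 106, 107, 108, 109]
```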

### Drop collection if you want your cluster clean

In [25]:
mongo_client["new_database"].drop_collection("new_collection")

{'nIndexesWas': 1,
 'ns': 'new_database.new_collection',
 'ok': 1.0,
 '$clusterTime': {'clusterTime': Timestamp(1626737053, 2),
  'signature': {'hash': b'\x9b/M\xccO\xeb\xf2\xf1\xae\x9a\x1aeU\x84O\x85\xe3R\xedv',
   'keyId': 6932492981463154689}},
 'operationTime': Timestamp(1626737053, 2)}