In [3]:
%pip install docker pymongo pandas

You should consider upgrading via the '/home/david/.pyenv/versions/3.8.6/envs/python-data-prep/bin/python3.8 -m pip install --upgrade pip' command.[0m
Note: you may need to restart the kernel to use updated packages.


Setup mongo and import data

In [4]:
import os
import docker

docker_client = docker.from_env()
MONGO_PORT = 27018

mongo_container = docker_client.containers.run(
    'mongo:latest',
    detach=True,
    name='mongo-data-prep',
    remove=True,
    ports={'27017/tcp': MONGO_PORT},
    mem_limit='2G',
    volumes={
        os.path.join(os.getcwd(), 'db'): {'bind': '/data/db', 'mode': 'rw'},
    }
)

In [5]:
from pymongo import MongoClient
mongo_client = MongoClient('127.0.0.1', MONGO_PORT)
db = mongo_client['data-prep']

In [6]:
sales_small_col = db.get_collection('sales-small')
sales_small_col.drop()

In [7]:
from pathlib import Path
import json
sales_small_data = json.loads(Path('/home/david/dev/toucan-toco/python-data-prep/data/sales-small.json').read_text())
sales_small_col.insert_many(sales_small_data['data'])

<pymongo.results.InsertManyResult at 0x7f506c771780>

In [8]:
import pandas as pd

1. Simulating a simple weaverbird pipeline with just a filter, on a very small domain:

In [9]:
'''
{
    "domain": "sales-small",
    "name": "domain"
},
'''
def get_domain(domain):
    return pd.DataFrame(db.get_collection(domain).find({}))
df = get_domain('sales-small')

In [10]:
'''
{
    "name": "filter",
    "condition": {
      "column": "Payment_Type",
      "operator": "eq",
      "value": "Mastercard"
    }
}
'''
def filter(df, column, value):
    return df[df[column] == value]
df1 = filter(df, 'Payment_Type', 'Mastercard')

In [11]:
%%time

'''
Whole pipeline
'''

df = get_domain('sales-small')
filter(df, 'Payment_Type', 'Mastercard')

CPU times: user 7.71 ms, sys: 0 ns, total: 7.71 ms
Wall time: 7.8 ms


Unnamed: 0,_id,Transaction_date,Product,Price,Payment_Type,Name,City,State,Country,Account_Created,Last_Login,Latitude,Longitude
1,5f9142baee2fc549d19201bd,1/5/09 4:10,Product1,1200,Mastercard,Nicola,Roodepoort,Gauteng,South Africa,1/5/09 2:33,1/7/09 5:13,-26.166667,27.866667
2,5f9142baee2fc549d19201be,1/4/09 13:17,Product1,1200,Mastercard,Renee Elisabeth,Tel Aviv,Tel Aviv,Israel,1/4/09 13:03,1/4/09 22:10,32.066667,34.766667
3,5f9142baee2fc549d19201bf,1/2/09 6:17,Product1,1200,Mastercard,carolina,Basildon,England,United Kingdom,1/2/09 6:00,1/2/09 6:08,51.5,-1.116667
6,5f9142baee2fc549d19201c2,1/5/09 8:58,Product2,3600,Mastercard,Marcia,Telgte,Nordrhein-Westfalen,Germany,9/1/08 3:39,1/14/09 2:07,52.333333,7.9
9,5f9142baee2fc549d19201c5,1/15/09 12:54,Product1,1200,Mastercard,Annelies,Ile-Perrot,Quebec,Canada,1/15/09 12:22,1/16/09 7:53,45.4,-73.933333
15,5f9142baee2fc549d19201cb,1/12/09 15:12,Product1,1200,Mastercard,David,Deptford,NJ,United States,1/12/09 14:07,1/19/09 3:47,39.83806,-75.15306
16,5f9142baee2fc549d19201cc,1/19/09 16:10,Product1,1200,Mastercard,Frank,Old Greenwich,CT,United States,1/19/09 15:31,1/19/09 16:00,41.02278,-73.56528
17,5f9142baee2fc549d19201cd,1/20/09 6:03,Product1,1200,Mastercard,Andrea,Shreveport,LA,United States,1/20/09 5:13,1/20/09 7:15,32.525,-93.75
26,5f9142baee2fc549d19201d6,1/4/09 9:54,Product1,1200,Mastercard,AMY,The Woodlands,TX,United States,12/30/08 20:41,1/25/09 18:23,30.15778,-95.48917
27,5f9142baee2fc549d19201d7,1/22/09 15:32,Product1,1200,Mastercard,Tara,Killiney,Dublin,Ireland,2/27/07 11:35,1/26/09 4:32,53.252222,-6.1125


2. Simulating a groupby, keeping the original format, on larger domain

In [12]:
sales_col = db.get_collection('sales')
sales_col.drop()
data = pd.read_csv('/home/david/dev/toucan-toco/python-data-prep/data/sales.csv').to_dict(orient='rows')
sales_col.insert_many(data)




<pymongo.results.InsertManyResult at 0x7f501d2cc240>

In [20]:
%%time

'''
[
  {
    "domain": "sales",
    "name": "domain"
  },
  {
    "name": "aggregate",
    "on": [
      "DATE"
    ],
    "aggregations": [
      {
        "columns": [
          "WEEKLY_SALES"
        ],
        "newcolumns": [
          "Transaction_date-sum"
        ],
        "aggfunction": "sum"
      }
    ],
    "keepOriginalGranularity": true
  }
]
'''

%time df = get_domain('sales')
%time df['WEEKLY_SALES-sum'] = df.groupby(by=['DATE'])['WEEKLY_SALES'].transform('sum')
df

CPU times: user 2.76 s, sys: 47.7 ms, total: 2.81 s
Wall time: 3.03 s
CPU times: user 80.2 ms, sys: 0 ns, total: 80.2 ms
Wall time: 81.9 ms
CPU times: user 2.84 s, sys: 47.8 ms, total: 2.89 s
Wall time: 3.11 s


Unnamed: 0,_id,STORE_ID,DEPT_ID,DATE,WEEKLY_SALES,WEEKLY_SALES-sum
0,5f9142bdee2fc549d19201ee,1,1,05/02/2010,24924.50,49750740.50
1,5f9142bdee2fc549d19201ef,1,1,12/02/2010,46039.49,48336677.63
2,5f9142bdee2fc549d19201f0,1,1,19/02/2010,41595.55,48276993.78
3,5f9142bdee2fc549d19201f1,1,1,26/02/2010,19403.54,43968571.13
4,5f9142bdee2fc549d19201f2,1,1,05/03/2010,21827.90,46871470.30
...,...,...,...,...,...,...
421565,5f9142c0ee2fc549d19870ab,45,98,28/09/2012,508.37,43734899.40
421566,5f9142c0ee2fc549d19870ac,45,98,05/10/2012,628.10,47566639.31
421567,5f9142c0ee2fc549d19870ad,45,98,12/10/2012,1061.02,46128514.25
421568,5f9142c0ee2fc549d19870ae,45,98,19/10/2012,760.01,45122410.57


In [19]:
%%time
'''Compare with mongo'''

sales_agg = [
    {
        "$group": {
            "_id": {
                "DATE": "$DATE"
            },
            "_vqbDocsArray": {
                "$push": "$$ROOT"
            },
            "WEEKLY_SALES-sum": {
                "$sum": "$WEEKLY_SALES"
            }
        }
    },
    {
        "$unwind": "$_vqbDocsArray"
    },
    {
        "$replaceRoot": {
            "newRoot": {
                "$mergeObjects": [
                    "$_vqbDocsArray",
                    "$$ROOT"
                ]
            }
        }
    },
    {
        "$project": {
            "_id": 0,
            "_vqbDocsArray": 0
        }
    }
]

# Memory fail without allowDiskUse
sales_col.aggregate(sales_agg)

OperationFailure: Exceeded memory limit for $group, but didn't allow external sort. Pass allowDiskUse:true to opt in., full error: {'ok': 0.0, 'errmsg': "Exceeded memory limit for $group, but didn't allow external sort. Pass allowDiskUse:true to opt in.", 'code': 292, 'codeName': 'QueryExceededMemoryLimitNoDiskUseAllowed'}

In [18]:
%%time
sales_col.aggregate(sales_agg, allowDiskUse=True)

CPU times: user 2.37 ms, sys: 156 µs, total: 2.52 ms
Wall time: 1.42 s


<pymongo.command_cursor.CommandCursor at 0x7f5018aa5e80>

Result:
- fail without allowDiskUse
- double time in python due to the extraction from mongo of the whole dataset (3s)
- the operation itself is notably faster in Python (80ms vs 1.5s)

=> we will need to find a smart way to cache the extraction, maybe in a parquet file or something else fast

### Sort
```json
{
    "name": "sort",
    "columns": [
      {
        "column": "DATE",
        "order": "asc"
      }
    ]
}
```


In [21]:
%%time

def sort(df: pd.DataFrame, columns: list) -> pd.DataFrame:
    return df.sort_values([c['column'] for c in columns], ascending=[(c.get('order') == 'desc') for c in columns])

%time df = get_domain('sales')
%time df1 = sort(df, [{'column': 'DATE', 'order': 'asc'}])
df1

CPU times: user 2.98 s, sys: 55.5 ms, total: 3.03 s
Wall time: 3.22 s
CPU times: user 886 ms, sys: 11.6 ms, total: 897 ms
Wall time: 892 ms
CPU times: user 3.86 s, sys: 67.1 ms, total: 3.93 s
Wall time: 4.11 s


Unnamed: 0,_id,STORE_ID,DEPT_ID,DATE,WEEKLY_SALES
404092,5f9142bfee2fc549d1982c6a,43,94,31/12/2010,41000.74
91966,5f9142bdee2fc549d193692c,10,33,31/12/2010,9978.04
341181,5f9142bfee2fc549d19736ab,36,8,31/12/2010,3362.24
416393,5f9142bfee2fc549d1985c77,45,33,31/12/2010,2924.12
123735,5f9142beee2fc549d193e545,13,48,31/12/2010,1309.00
...,...,...,...,...,...
286751,5f9142bfee2fc549d196620d,30,2,01/04/2011,13535.83
65225,5f9142bdee2fc549d19300b7,7,54,01/04/2011,28.00
184906,5f9142beee2fc549d194d438,19,74,01/04/2011,16831.26
392696,5f9142bfee2fc549d197ffe6,42,13,01/04/2011,14201.53


In [22]:
%%time
sales_sort = {
    "$sort": {
      "Account_Created": 1
    }
}
sales_col.aggregate(sales_agg)


OperationFailure: Exceeded memory limit for $group, but didn't allow external sort. Pass allowDiskUse:true to opt in., full error: {'ok': 0.0, 'errmsg': "Exceeded memory limit for $group, but didn't allow external sort. Pass allowDiskUse:true to opt in.", 'code': 292, 'codeName': 'QueryExceededMemoryLimitNoDiskUseAllowed'}

In [24]:
%%time
sales_col.aggregate(sales_agg, allowDiskUse=True)

CPU times: user 3.23 ms, sys: 104 µs, total: 3.33 ms
Wall time: 1.34 s


<pymongo.command_cursor.CommandCursor at 0x7f50178165b0>

Same conclusions:
- mongo needs allowDiskUse
- python operation notably faster than mongo (800ms vs 1.34s)
