### Import Libraries

In [14]:
import pymongoarrow
from pymongo import MongoClient
import pandas as pd
import numpy as np
import pyarrow
import pprint
from datetime import datetime
import os
from dotenv import load_dotenv

### Reading environment variables

In [15]:
# Load variables from a local .env file (if present) into the process environment.
load_dotenv()
# Connection string for the local MongoDB instance. Indexing os.environ raises
# KeyError when MONGODB_URI_LOCAL is not set, so a missing variable fails here
# rather than later at connection time.
MONGODB_URI = os.environ['MONGODB_URI_LOCAL']


### Establish connection with our MongoDB database running locally

In [16]:
# Client for the MongoDB deployment addressed by MONGODB_URI.
client = MongoClient(MONGODB_URI)

In [17]:
# Handles to the demo database and collection used throughout the notebook.
# Item access is equivalent to attribute access on PyMongo clients/databases.
db = client['newDB']
col = db['newCol']

### Create a list of documents

In [18]:
# Sample documents, written as (id, measure, status, installation timestamp)
# rows and expanded into dicts. Key order matches the stored field order.
data = [
    {'_id': doc_id, 'measure': measure, 'status': status, 'installed_on': installed_on}
    for doc_id, measure, status, installed_on in (
        (1, 43, 'active', datetime(2022, 1, 8, 3, 43, 12)),
        (2, 32, 'active', datetime(2022, 1, 2, 11, 43, 27)),
        (3, 62, 'inactive', datetime(2022, 3, 12, 3, 53, 12)),
        (4, 59, 'active', datetime(2022, 4, 8, 3, 22, 45)),
    )
]

In [19]:
# The sample documents carry fixed `_id` values, so inserting them into a
# collection that already holds them raises a duplicate-key BulkWriteError.
# Empty the demo collection first so this cell can be re-run safely.
col.delete_many({})
col.insert_many(data)

<pymongo.results.InsertManyResult at 0x221feea89d0>

In [20]:
# Pretty-print every document currently stored in the collection.
for record in col.find({}):
    pprint.pprint(record)

{'_id': 1,
 'installed_on': datetime.datetime(2022, 1, 8, 3, 43, 12),
 'measure': 43,
 'status': 'active'}
{'_id': 2,
 'installed_on': datetime.datetime(2022, 1, 2, 11, 43, 27),
 'measure': 32,
 'status': 'active'}
{'_id': 3,
 'installed_on': datetime.datetime(2022, 3, 12, 3, 53, 12),
 'measure': 62,
 'status': 'inactive'}
{'_id': 4,
 'installed_on': datetime.datetime(2022, 4, 8, 3, 22, 45),
 'measure': 59,
 'status': 'active'}


### Using pandas to retrieve data from MongoDB and create a DataFrame

In [21]:
# Patch PyMongo in place so collections gain the PyMongoArrow helpers used
# below (find_pandas_all, find_numpy_all, find_arrow_all, aggregate_pandas_all).
from pymongoarrow.monkey import patch_all
patch_all()

In [22]:
# Filter: keep only documents whose `measure` is greater than 40.
query = {'measure': {'$gt': 40}}
# Load the matching documents straight into a pandas DataFrame.
df = col.find_pandas_all(query)

In [23]:
# Three of the four sample documents pass the filter (measure 32 is excluded).
print(df)

   _id  measure    status        installed_on
0    1       43    active 2022-01-08 03:43:12
1    3       62  inactive 2022-03-12 03:53:12
2    4       59    active 2022-04-08 03:22:45


### Defining a Schema

In [24]:
from pymongoarrow.api import Schema

### Transfer MongoDB data into a numpy array

In [25]:
# Schema definition: maps each field to the type it should be loaded as.
# `measure` is declared as float here, so the integer values stored in MongoDB
# come back as floats in the results that use this schema.
schema = Schema({'_id': int, 'measure': float, 'status': str, 'installed_on': datetime})
# Same filter as before: documents whose `measure` is greater than 40.
query = {'measure': {'$gt': 40}}

In [26]:
# Load the matching documents as NumPy arrays; the result is a dict with one
# array per schema field.
npa = col.find_numpy_all(query, schema=schema)

In [27]:
# Show the per-field arrays and their dtypes.
print(npa)

{'_id': array([1, 3, 4], dtype=int64), 'measure': array([43., 62., 59.]), 'status': array(['active', 'inactive', 'active'], dtype='<U8'), 'installed_on': array(['2022-01-08T03:43:12.000', '2022-03-12T03:53:12.000',
       '2022-04-08T03:22:45.000'], dtype='datetime64[ms]')}


### Transfer MongoDB data into an arrow table

In [28]:
# Empty filter selects every document. No schema is passed here, so the column
# types are left to PyMongoArrow rather than taken from `schema`.
arrow_table = col.find_arrow_all({})

In [29]:
# Printing a pyarrow.Table shows its schema followed by the column values.
print(arrow_table)

pyarrow.Table
_id: int32
measure: int32
status: string
installed_on: timestamp[ms]
----
_id: [[1,2,3,4]]
measure: [[43,32,62,59]]
status: [["active","active","inactive","active"]]
installed_on: [[2022-01-08 03:43:12.000,2022-01-02 11:43:27.000,2022-03-12 03:53:12.000,2022-04-08 03:22:45.000]]


### PyMongoArrow's aggregate operations

 - aggregate_pandas_all()
 - aggregate_numpy_all()
 - aggregate_arrow_all()

In [31]:
# Aggregation pipeline with a single $match stage: documents whose `measure`
# is greater than 40.
pipeline = [
    {'$match': {'measure': {'$gt': 40}}}
]
# Run the pipeline and load the result into a DataFrame, typed by `schema`
# (so `measure` is returned as float).
df_agg = col.aggregate_pandas_all(pipeline, schema=schema)


In [32]:
# Bare expression as the cell's last line so Jupyter renders the DataFrame.
df_agg

Unnamed: 0,_id,measure,status,installed_on
0,1,43.0,active,2022-01-08 03:43:12
1,3,62.0,inactive,2022-03-12 03:53:12
2,4,59.0,active,2022-04-08 03:22:45


### Writing data back to MongoDB

In [33]:
from pymongoarrow.api import write

In [34]:
# Write the DataFrame back to MongoDB.
# The rows keep their original `_id` values, so writing them into a collection
# that already contains them fails on duplicate keys. Empty the target
# collection first so this cell can be re-run safely.
db.pandas_data.delete_many({})
write(db.pandas_data, df)

{'insertedCount': 3}

In [35]:
# Pretty-print the documents written from the DataFrame.
for record in db.pandas_data.find({}):
    pprint.pprint(record)

{'_id': 1,
 'installed_on': datetime.datetime(2022, 1, 8, 3, 43, 12),
 'measure': 43,
 'status': 'active'}
{'_id': 3,
 'installed_on': datetime.datetime(2022, 3, 12, 3, 53, 12),
 'measure': 62,
 'status': 'inactive'}
{'_id': 4,
 'installed_on': datetime.datetime(2022, 4, 8, 3, 22, 45),
 'measure': 59,
 'status': 'active'}


In [36]:
# Write the NumPy arrays back to MongoDB.
# The `_id` array reuses the original ids, so writing into a collection that
# already contains them fails on duplicate keys. Empty the target collection
# first so this cell can be re-run safely.
db.numpy_data.delete_many({})
write(db.numpy_data, npa)

{'insertedCount': 3}

In [37]:
# Pretty-print the documents written from the NumPy arrays.
for record in db.numpy_data.find({}):
    pprint.pprint(record)

{'_id': 1,
 'installed_on': datetime.datetime(2022, 1, 8, 3, 43, 12),
 'measure': 43.0,
 'status': 'active'}
{'_id': 3,
 'installed_on': datetime.datetime(2022, 3, 12, 3, 53, 12),
 'measure': 62.0,
 'status': 'inactive'}
{'_id': 4,
 'installed_on': datetime.datetime(2022, 4, 8, 3, 22, 45),
 'measure': 59.0,
 'status': 'active'}


In [38]:
# Write the Arrow table back to MongoDB.
# The table keeps the original `_id` column, so writing into a collection that
# already contains those ids fails on duplicate keys. Empty the target
# collection first so this cell can be re-run safely.
db.arrow_data.delete_many({})
write(db.arrow_data, arrow_table)

{'insertedCount': 4}

In [39]:
# Pretty-print the documents written from the Arrow table.
for record in db.arrow_data.find({}):
    pprint.pprint(record)

{'_id': 1,
 'installed_on': datetime.datetime(2022, 1, 8, 3, 43, 12),
 'measure': 43,
 'status': 'active'}
{'_id': 2,
 'installed_on': datetime.datetime(2022, 1, 2, 11, 43, 27),
 'measure': 32,
 'status': 'active'}
{'_id': 3,
 'installed_on': datetime.datetime(2022, 3, 12, 3, 53, 12),
 'measure': 62,
 'status': 'inactive'}
{'_id': 4,
 'installed_on': datetime.datetime(2022, 4, 8, 3, 22, 45),
 'measure': 59,
 'status': 'active'}
