<a href="https://colab.research.google.com/github/MathRunner7/MongoDB_Learning/blob/main/PyMongoArrow.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction
This colab notebook shows an example of how to use PyMongoArrow driver to perfrom operations on MongoDB dataset using Pandas, NumPy and ApacheArrow
For this task `pymongoarrow` standard package is used

In [1]:
#@title Download Necessary libraries
!pip install pymongo[srv]
!pip install pymongoarrow



In [2]:
#@title Import Libraries
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
from pymongoarrow.monkey import patch_all
import pandas as pd
import numpy as np
import os
import pprint

# Import ObjectID from bson package (part of PyMongo distribution) to enable querying by ObjectID
from bson.objectid import ObjectId

In [3]:
#@title Establish and verify a connection to MongoDB Database
from google.colab import userdata
MONGODB_URI = userdata.get('URI')

# Create a new client and connect to the server
client = MongoClient(MONGODB_URI, server_api=ServerApi('1'))

# Send a ping to confirm a successful connection
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
    print("Connected Cluster is: ")
    print("Connected Databases are: ")
    for db in client.list_database_names():
        print(db)
except Exception as e:
    print(e)

Pinged your deployment. You successfully connected to MongoDB!
Connected Cluster is: 
Connected Databases are: 
blog
sample_airbnb
sample_analytics
sample_geospatial
sample_guides
sample_mflix
sample_restaurants
sample_supplies
sample_training
sample_weatherdata
admin
local


# Basic MongoClient Operations

In [4]:
# Address of the cluster
print('Address of the cluster:', client.address)
print('='*100)
# List of all connected node
print('List of all connected nodes: ', client.nodes)
print('='*100)
# Primary member of the cluster
print('Host, Port of Primary node:', client.primary)
print('='*100)
# All scondary members of the cluster
print('Host, Port of all secondary nodes:', client.secondaries)
print('='*100)
# Arbiters in the replica set
print('Arbiters in the replica set. Empty if no replica connected: ', client.arbiters)
print('='*100)
# If client accept write request or not
print('Does this client accept write request?: ', client.is_primary)
print('='*100)
# If client is connected to Mongos or not
print('Is this client connected to Mongos?: ', client.is_mongos)
print('='*100)

Address of the cluster: ('myatlasclusteredu-shard-00-02.bcqr7.mongodb.net', 27017)
List of all connected nodes:  frozenset({('myatlasclusteredu-shard-00-00.bcqr7.mongodb.net', 27017), ('myatlasclusteredu-shard-00-02.bcqr7.mongodb.net', 27017), ('myatlasclusteredu-shard-00-01.bcqr7.mongodb.net', 27017)})
Host, Port of Primary node: ('myatlasclusteredu-shard-00-02.bcqr7.mongodb.net', 27017)
Host, Port of all secondary nodes: {('myatlasclusteredu-shard-00-00.bcqr7.mongodb.net', 27017), ('myatlasclusteredu-shard-00-01.bcqr7.mongodb.net', 27017)}
Arbiters in the replica set. Empty if no replica connected:  set()
Does this client accept write request?:  True
Is this client connected to Mongos?:  False


In [5]:
# List of all databases in connected cluster
for db in client.list_databases():
  print(db)

{'name': 'blog', 'sizeOnDisk': 81920, 'empty': False}
{'name': 'sample_airbnb', 'sizeOnDisk': 58433536, 'empty': False}
{'name': 'sample_analytics', 'sizeOnDisk': 10063872, 'empty': False}
{'name': 'sample_geospatial', 'sizeOnDisk': 1531904, 'empty': False}
{'name': 'sample_guides', 'sizeOnDisk': 73728, 'empty': False}
{'name': 'sample_mflix', 'sizeOnDisk': 114925568, 'empty': False}
{'name': 'sample_restaurants', 'sizeOnDisk': 8441856, 'empty': False}
{'name': 'sample_supplies', 'sizeOnDisk': 1122304, 'empty': False}
{'name': 'sample_training', 'sizeOnDisk': 53059584, 'empty': False}
{'name': 'sample_weatherdata', 'sizeOnDisk': 2908160, 'empty': False}
{'name': 'admin', 'sizeOnDisk': 253952, 'empty': False}
{'name': 'local', 'sizeOnDisk': 43637559296, 'empty': False}


In [6]:
# Name of all databases in connected cluster
for name in client.list_database_names():
  print(name)

blog
sample_airbnb
sample_analytics
sample_geospatial
sample_guides
sample_mflix
sample_restaurants
sample_supplies
sample_training
sample_weatherdata
admin
local


In [7]:
# Drop a database from a cluster
# client.drop_database(name_of_database)

In [8]:
try:
  client.get_default_database()
  client.get_database()
except Exception as e:
  print('Error occured:', e)

Error occured: No default database name defined or provided.


In [9]:
# Server Info
print('Information about the MongoDB server we’re connected to: ', client.server_info())

Information about the MongoDB server we’re connected to:  {'version': '8.0.4', 'gitVersion': 'bc35ab4305d9920d9d0491c1c9ef9b72383d31f9', 'modules': ['enterprise'], 'allocator': 'tcmalloc-google', 'javascriptEngine': 'mozjs', 'sysInfo': 'deprecated', 'versionArray': [8, 0, 4, 0], 'bits': 64, 'debug': False, 'maxBsonObjectSize': 16777216, 'storageEngines': ['devnull', 'inMemory', 'queryable_wt', 'wiredTiger'], 'ok': 1.0, '$clusterTime': {'clusterTime': Timestamp(1736940131, 371), 'signature': {'hash': b'\xf2\xe6\xe8`\x17\x1fC\xc8\xaf\x86b\xca\x8ab\xe58nv\x89 ', 'keyId': 7456853886385520646}}, 'operationTime': Timestamp(1736940131, 371)}


In [10]:
client.watch()

<pymongo.synchronous.change_stream.ClusterChangeStream at 0x783b0b397850>

# Working with Pandas/NumPy/Arrow

## MongoDB to pandas

In [15]:
#@title Fetch MongoDB collection to pandas dataframe
# Define database of connected cluster
db = client.sample_training
# Define collection name in connected database
collection = db.grades

patch_all()
# Standard method to fetch MongoDB collection into Pandas Dataframe is <collection>.find_pandas_all(<filter_query>, schema=defined_schema)
df = collection.find_pandas_all({}) # {} is an empty filter which will return all the documents from collection
df

Unnamed: 0,_id,student_id,scores,class_id
0,56d5f7eb604eb380b0d8d8d4,0.0,"[{'type': 'exam', 'score': 20.2317531451231}, ...",57.0
1,56d5f7eb604eb380b0d8d8d6,0.0,"[{'type': 'exam', 'score': 25.926204502143857}...",108.0
2,56d5f7eb604eb380b0d8d8d9,1.0,"[{'type': 'exam', 'score': 4.573007773049065},...",329.0
3,56d5f7eb604eb380b0d8d8dd,1.0,"[{'type': 'exam', 'score': 79.51115675153915},...",176.0
4,56d5f7eb604eb380b0d8d8e5,2.0,"[{'type': 'exam', 'score': 89.1838715782135}, ...",452.0
...,...,...,...,...
99995,56d5f7f1604eb380b0da5f59,9997.0,"[{'type': 'exam', 'score': 14.53874125559851},...",13.0
99996,56d5f7f1604eb380b0da5f64,9999.0,"[{'type': 'exam', 'score': 42.457223636253985}...",450.0
99997,56d5f7f1604eb380b0da5f66,9999.0,"[{'type': 'exam', 'score': 5.622441758929597},...",80.0
99998,56d5f7f1604eb380b0da5f60,9998.0,"[{'type': 'exam', 'score': 69.17238315773703},...",412.0


In [63]:
#@title Directly fetch aggregated data to Pandas DF
aggregation_pipeline =[
    # Stage-1
      {'$unwind': '$scores'},                               # Convert separate document for each element in socres array
    # Stage-2
      {'$match': {'scores.type': {'$in':['exam','quiz']}}}, # Filter out only those entries where exam type if exam and quiz
    # Stage-3
      {'$group': {                                          # Group the data
            '_id':{'student_id':'$student_id',              # First on student id
                   'exam_type':'$scores.type'},             # and then on Exam type
            'avg_score': {'$avg': '$scores.score'},         # Find average score for each student_id and exam_type pair
            'total_score': {'$sum': '$scores.score'}}},     # Find total score for each student_id and exam_type pair
    # Stage-3
      {'$sort': {'_id.student_id': 1,                       # Sort the data in ascending order for student_id
                 '_id.exam_type':-1}},                      # Sort the data in descending order for exam type
    # Stage-4
      {'$project': {                                        # Define which fields to include in final output documents
                    '_id':0,                                # 0 means exclude this field (_id)
                    'student':'$_id.student_id',            # student field will have values as student_id from _id field of grouped data
                    'exam':'$_id.exam_type',                # exam field will have values as exam_type from _id field of grouped data
                    'avg_score':1,                          # 1 means show this field (avg_score)
                    'total_score':1}}                       # 1 means show this field (total_score)
    ]
df_agg = collection.aggregate_pandas_all(aggregation_pipeline)
df_agg

Unnamed: 0,avg_score,total_score,student,exam
0,59.131906,591.319059,0.0,quiz
1,50.248797,502.487965,0.0,exam
2,56.731649,567.316490,1.0,quiz
3,48.071900,480.719000,1.0,exam
4,53.471985,534.719854,2.0,quiz
...,...,...,...,...
19995,43.765651,437.656512,9997.0,exam
19996,60.879808,608.798082,9998.0,quiz
19997,51.088235,510.882346,9998.0,exam
19998,52.006571,520.065706,9999.0,quiz


Similar to pandas we can fetch data to NumPy and Arrow using
1. `<collection>.find_numpy_all()` or `<collection>.aggregate_numpy_all()`
2. `<collection>.find_arrow_all()` or `<collection>.aggregate_arrow_all()`

## Python to MongoDB

In [65]:
from pymongoarrow.api import write
# write(collection_name, pandas df / numpy array / arrow table)
write(db.pandas_student_score, df_agg)

{'insertedCount': 20000}

In [67]:
i = 10
for doc in db.pandas_student_score.find():
  print(doc)
  if i == 0:
    break
  i -= 1

{'_id': ObjectId('6787b098d8e9d4deec094157'), 'avg_score': 59.13190591017333, 'total_score': 591.3190591017333, 'student': 0.0, 'exam': 'quiz'}
{'_id': ObjectId('6787b098d8e9d4deec094158'), 'avg_score': 50.24879651902832, 'total_score': 502.4879651902832, 'student': 0.0, 'exam': 'exam'}
{'_id': ObjectId('6787b098d8e9d4deec094159'), 'avg_score': 56.73164904756372, 'total_score': 567.3164904756372, 'student': 1.0, 'exam': 'quiz'}
{'_id': ObjectId('6787b098d8e9d4deec09415a'), 'avg_score': 48.07190004642783, 'total_score': 480.7190004642783, 'student': 1.0, 'exam': 'exam'}
{'_id': ObjectId('6787b098d8e9d4deec09415b'), 'avg_score': 53.471985379242845, 'total_score': 534.7198537924285, 'student': 2.0, 'exam': 'quiz'}
{'_id': ObjectId('6787b098d8e9d4deec09415c'), 'avg_score': 43.93769193631586, 'total_score': 439.37691936315855, 'student': 2.0, 'exam': 'exam'}
{'_id': ObjectId('6787b098d8e9d4deec09415d'), 'avg_score': 44.64886533444073, 'total_score': 446.4886533444073, 'student': 3.0, 'exam'

In [68]:
client.close()