In [1]:
!pip install pymongo



In [1]:
from pymongo import MongoClient
import datetime

In [2]:
client = MongoClient()

The code above connects to the default host and port (`localhost:27017`). We can also specify the host and port explicitly, as follows:

In [3]:
client = MongoClient(host="localhost", port=27017)
# OR
client = MongoClient("mongodb://localhost:27017")

client

MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True)

**Getting a Database**
Once you have a connected instance of `MongoClient`, you can access any database managed by that MongoDB server. To select a database, use attribute (dot) notation or dictionary-style access:

In [4]:
db = client.test_database
# OR
db = client["test_database"]
# Dictionary-style access is handy when the database name isn't a valid Python identifier.

db

Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), 'test_database')
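The comment above mentions names that are not valid Python identifiers. As a rough sketch (the helper `needs_item_access` is hypothetical, not part of PyMongo), the rule can be written with the standard library: a name needs `client["..."]` access if it is not a valid identifier, is a Python keyword, or starts with an underscore, because PyMongo raises `AttributeError` for attribute names that begin with `_`.

```python
import keyword


def needs_item_access(name: str) -> bool:
    """Return True if `name` should be accessed as client[name] or db[name].

    Hypothetical helper: combines Python's identifier rules with PyMongo's
    refusal to treat attribute names starting with "_" as databases/collections.
    """
    if not name.isidentifier() or keyword.iskeyword(name):
        return True  # e.g. "test-database" or "class" cannot follow a dot
    return name.startswith("_")  # attribute access raises AttributeError here


for candidate in ["test_database", "test-database", "class", "_private", "2022_sales"]:
    print(candidate, needs_item_access(candidate))
```

Attribute lookup also finds real attributes first, so `db.name` returns the database's name string rather than a collection called `name`; dictionary-style access avoids that ambiguity.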

**Getting a Collection**
Collections are accessed from a database in the same way. Here, `collection` is an instance of `Collection` and represents a physical collection of documents in your database. You can insert documents into it by calling `.insert_one()` with a document as an argument:

In [5]:
collection = db.test_collection
# OR
collection = db['test_collection']
# Dictionary-style access is handy when the collection name isn't a valid Python identifier.

collection

Collection(Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), 'test_database'), 'test_collection')

**Sample Document**
The following example shows a document for a blog site. In PyMongo, a document is an ordinary Python dictionary of key-value pairs, and its values can be strings, lists, dates, or nested documents:

In [6]:
post = {"author": "Mike",
        "text": "My first blog post!",
        "tags": ["mongodb", "python", "pymongo"],
        "date": datetime.datetime.now(datetime.timezone.utc)}

In [7]:
posts = db.posts
post_id = posts.insert_one(post).inserted_id
post_id

ObjectId('628de76d9c628e0a3f69ff33')
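An `ObjectId` is 12 bytes: a 4-byte timestamp (seconds since the Unix epoch), a 5-byte random value, and a 3-byte counter. The minimal stdlib-only sketch below (the helper `objectid_timestamp` is hypothetical) reads the creation time back from the hex string shown above; with PyMongo installed, `ObjectId.generation_time` returns the same information.

```python
from datetime import datetime, timezone


def objectid_timestamp(oid_hex: str) -> datetime:
    """Decode the creation time stored in the first 4 bytes of an ObjectId."""
    if len(oid_hex) != 24:
        raise ValueError("an ObjectId is 12 bytes, i.e. 24 hex characters")
    seconds = int(oid_hex[:8], 16)  # big-endian seconds since the Unix epoch
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


print(objectid_timestamp("628de76d9c628e0a3f69ff33"))  # 2022-05-25 08:23:09+00:00
```

Because the `_id` is generated by the client at insert time, its timestamp can be later than the `date` field, which was set when the dictionary was built.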

In [9]:
import pprint
pprint.pprint(posts.find_one())

{'_id': ObjectId('628de76d9c628e0a3f69ff33'),
 'author': 'Mike',
 'date': datetime.datetime(2022, 5, 25, 8, 16, 48, 627000),
 'tags': ['mongodb', 'python', 'pymongo'],
 'text': 'My first blog post!'}


In [90]:
item2 = {
    "title": "Python's Requests Library (Guide)",
    "author": "Alex",
    "contributors": [
        "Aldren",
        "Brad",
        "Joanna"
    ],
    "url": "https://realpython.com/python-requests/"
}

item3 = {
    "title": "Object-Oriented Programming (OOP) in Python 3",
    "author": "David",
    "contributors": [
        "Aldren",
        "Joanna",
        "Jacob"
    ],
    "url": "https://realpython.com/python3-object-oriented-programming/"
}

In [91]:
# Bind the collection used by the following cells: "tutorial" in the "rptutorials" database
newCollection = client["rptutorials"]["tutorial"]
new_result = newCollection.insert_many([item2, item3])

print(f"Multiple items: {new_result.inserted_ids}")

The call to `.insert_many()` takes an iterable of documents and inserts them with a single method call, which is faster and more straightforward than calling `.insert_one()` once per document. The returned result lists the new `_id` values in `.inserted_ids`.

***Querying for More Than One Document*** 

To retrieve documents from a collection, you can use `.find()`. Without arguments, `.find()` returns a Cursor object that yields the documents in the collection on demand:

In [39]:
for doc in newCollection.find():
    print(doc)

{'_id': ObjectId('6220899c61cdcb1411c5421d'), 'title': 'Working With JSON Data in Python', 'author': 'Lucas', 'contributors': ['Aldren', 'Dan', 'Joanna'], 'url': 'https://realpython.com/python-json/'}
{'_id': ObjectId('622089ca61cdcb1411c5421e'), 'title': 'Working With JSON Data in Python', 'author': 'Lucas', 'contributors': ['Aldrenn', 'Dan', 'Joanna'], 'url': 'https://realpython.com/python-json/'}
{'_id': ObjectId('62208aa561cdcb1411c5421f'), 'title': "Python's Requests Library (Guide)", 'author': 'Alex', 'contributors': ['Aldren', 'Brad', 'Joanna'], 'url': 'https://realpython.com/python-requests/'}
{'_id': ObjectId('62208aa561cdcb1411c54220'), 'title': 'Object-Oriented Programming (OOP) in Python 3', 'author': 'David', 'contributors': ['Aldren', 'Joanna', 'Jacob'], 'url': 'https://realpython.com/python3-object-oriented-programming/'}
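The cursor returned by `.find()` does not load the whole collection at once; it fetches documents from the server in batches as you iterate (PyMongo lets you tune this with `find().batch_size(n)`). As a rough pure-Python analogy, the generator below only calls the hypothetical `fetch_batch(offset, limit)` function, a stand-in for a server round trip, when the previous batch is used up.

```python
from typing import Callable, Dict, Iterator, List


def lazy_cursor(fetch_batch: Callable[[int, int], List[Dict]],
                batch_size: int) -> Iterator[Dict]:
    """Yield documents one at a time, fetching a new batch only when needed."""
    offset = 0
    while True:
        batch = fetch_batch(offset, batch_size)  # simulated round trip
        if not batch:
            return  # no more documents: the cursor is exhausted
        yield from batch
        offset += len(batch)


documents = [{"n": i} for i in range(10)]
fetch_calls: List[int] = []


def fetch(offset: int, limit: int) -> List[Dict]:
    fetch_calls.append(offset)
    return documents[offset:offset + limit]


cursor = lazy_cursor(fetch, batch_size=2)
first_three = [next(cursor) for _ in range(3)]
print(first_three, fetch_calls)  # only two batches were fetched
```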


***Querying for One Document*** 

You can also use `.find_one()` to retrieve a single document.

In [36]:
alex_item = newCollection.find_one({"author": "Alex"})
print(alex_item)

{'_id': ObjectId('622209d6d58b2d12b2950138'), 'title': "Python's Requests Library (Guide)", 'author': 'Alex', 'contributors': ['Aldren', 'Brad', 'Joanna'], 'url': 'https://realpython.com/python-requests/'}
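A filter such as `{"author": "Alex"}` is an equality match, and for array fields MongoDB matches a scalar value if the array contains it, so `{"contributors": "Dan"}` matches any document listing Dan. The sketch below imitates only these top-level equality rules in pure Python (the helpers `matches` and `find_one_local` are hypothetical, and operators such as `$gt` are not handled), using the documents printed earlier without their `_id` fields.

```python
from typing import Any, Dict, Iterable, Optional


def matches(doc: Dict[str, Any], query: Dict[str, Any]) -> bool:
    """Top-level equality matching in the spirit of MongoDB filters."""
    for field, expected in query.items():
        actual = doc.get(field)  # a missing field reads as None, like null
        if isinstance(actual, list) and not isinstance(expected, list):
            if expected not in actual:  # scalar vs. array: membership test
                return False
        elif actual != expected:
            return False
    return True


def find_one_local(docs: Iterable[Dict[str, Any]],
                   query: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the first matching document, or None when nothing matches."""
    return next((doc for doc in docs if matches(doc, query)), None)


docs = [
    {"title": "Working With JSON Data in Python", "author": "Lucas",
     "contributors": ["Aldren", "Dan", "Joanna"]},
    {"title": "Working With JSON Data in Python", "author": "Lucas",
     "contributors": ["Aldrenn", "Dan", "Joanna"]},
    {"title": "Python's Requests Library (Guide)", "author": "Alex",
     "contributors": ["Aldren", "Brad", "Joanna"]},
    {"title": "Object-Oriented Programming (OOP) in Python 3", "author": "David",
     "contributors": ["Aldren", "Joanna", "Jacob"]},
]

print(find_one_local(docs, {"author": "Alex"})["title"])
print(sum(matches(d, {"contributors": "Dan"}) for d in docs))
```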


A `MongoClient` manages a pool of connections, so in a long-running application you should create it once, keep it alive, and close it only before the application exits to release the resources it holds.

***Counting***

If we only want to know how many documents match a query, we can call `count_documents()` instead of running a full query. Passing an empty filter `{}` counts all of the documents in the collection:

In [37]:
newCollection.count_documents({})

4
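According to the PyMongo documentation, `count_documents()` runs an aggregation that applies the filter in a `$match` stage and then collapses the matches into one count with `$group`. The sketch below builds such a pipeline (the helper `count_documents_pipeline` is hypothetical); passing it to `newCollection.aggregate()` should yield one document whose `n` field is the count, or no document when nothing matches. To count every document cheaply from collection metadata, PyMongo also offers `estimated_document_count()`.

```python
from typing import Any, Dict, List, Optional


def count_documents_pipeline(filter_: Dict[str, Any],
                             skip: Optional[int] = None,
                             limit: Optional[int] = None) -> List[Dict[str, Any]]:
    """Build an aggregation pipeline equivalent to count_documents(filter_)."""
    pipeline: List[Dict[str, Any]] = [{"$match": filter_}]
    if skip is not None:
        pipeline.append({"$skip": skip})
    if limit is not None:
        pipeline.append({"$limit": limit})
    # Collapse every remaining document into a single {"_id": 1, "n": <count>}
    pipeline.append({"$group": {"_id": 1, "n": {"$sum": 1}}})
    return pipeline


print(count_documents_pipeline({}))
print(count_documents_pipeline({"author": "Lucas"}, limit=1))
```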

***Aggregation Examples***

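MongoDB has offered several ways to perform aggregations: the aggregation pipeline, map-reduce, and the legacy `group` command. The `group` command was removed in MongoDB 4.2 and map-reduce is deprecated as of MongoDB 5.0, so the examples below use the aggregation pipeline.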

Create a sample collection named `inventory` with the following document:

In [99]:
from pymongo import MongoClient
db = MongoClient().database_1
db.inventory.insert_one({"_id" : 2, "item" : "ABC1", "sizes": [ "S", "M", "L"]})

If the document already exists (for example, when this cell is run a second time), the server rejects the write with `E11000 duplicate key error collection: database_1.inventory index: _id_ dup key: { _id: 2 }`. `insert_one()` raises this as `pymongo.errors.DuplicateKeyError`, while `insert_many()` reports it inside a `pymongo.errors.BulkWriteError`.

The following aggregation uses the $unwind stage to output a document for each element in the sizes array:

In [24]:
result = db.inventory.aggregate( [ { "$unwind": "$sizes" } ] )
print(list(result))

[{'_id': 2, 'item': 'ABC1', 'sizes': 'S'}, {'_id': 2, 'item': 'ABC1', 'sizes': 'M'}, {'_id': 2, 'item': 'ABC1', 'sizes': 'L'}]
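To make the behaviour of `$unwind` concrete, here is a pure-Python sketch of it (the `unwind` helper is hypothetical and handles only a top-level field, i.e. `"$sizes"` becomes `"sizes"`). It follows the default server rules: documents whose field is missing, null, or an empty array produce no output, and a non-array value is treated as a one-element array.

```python
from typing import Any, Dict, Iterable, List


def unwind(docs: Iterable[Dict[str, Any]], field: str) -> List[Dict[str, Any]]:
    """Emit one copy of each document per element of doc[field]."""
    out: List[Dict[str, Any]] = []
    for doc in docs:
        value = doc.get(field)
        if value is None or value == []:
            continue  # dropped unless preserveNullAndEmptyArrays is set
        elements = value if isinstance(value, list) else [value]  # scalar acts like [scalar]
        for element in elements:
            out.append({**doc, field: element})  # copy, replacing the array
    return out


inventory = [{"_id": 2, "item": "ABC1", "sizes": ["S", "M", "L"]}]
print(unwind(inventory, "sizes"))
```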


In [43]:
from pymongo import MongoClient
db = MongoClient().aggregation_example
result = db.things.insert_many([{"x": 1, "tags": ["dog", "cat"]},
                                {"x": 2, "tags": ["cat"]},
                                {"x": 2, "tags": ["mouse", "cat", "dog"]},
                                {"x": 3, "tags": []}])
result.inserted_ids

[ObjectId('6222128bd58b2d12b295013b'),
 ObjectId('6222128bd58b2d12b295013c'),
 ObjectId('6222128bd58b2d12b295013d'),
 ObjectId('6222128bd58b2d12b295013e')]

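Key order matters in stages such as `$sort`. Plain dictionaries preserve insertion order in Python 3.7 and later, but you can use `bson.son.SON` or `collections.OrderedDict` to make the required ordering explicit (and to support older Python versions):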

In [53]:
from bson.son import SON
pipeline = [
    {"$unwind": "$tags"},
    {"$group": {"_id": "$tags", "count": {"$sum": 1}}},
    {"$sort": SON([("count", -1), ("_id", -1)])}
]
result = db.things.aggregate( pipeline )

In [54]:
print(list(result))

[{'_id': 'cat', 'count': 3}, {'_id': 'dog', 'count': 2}, {'_id': 'mouse', 'count': 1}]
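The following pure-Python sketch reproduces the pipeline above on the same four `things` documents and shows why sort-key order matters. `Counter` stands in for `$unwind` plus `$group`, and the hypothetical `apply_sort` helper treats the first key of the specification as the most significant, as `$sort` does, by running stable sorts from the last key to the first.

```python
from collections import Counter
from typing import Any, Dict, List

things = [
    {"x": 1, "tags": ["dog", "cat"]},
    {"x": 2, "tags": ["cat"]},
    {"x": 2, "tags": ["mouse", "cat", "dog"]},
    {"x": 3, "tags": []},
]


def apply_sort(docs: List[Dict[str, Any]], spec: Dict[str, int]) -> List[Dict[str, Any]]:
    """Sort like a $sort stage: 1 is ascending, -1 is descending."""
    result = list(docs)
    # Python's sort is stable, so sorting by the least significant key first
    # and the most significant key last gives a compound ordering.
    for key, direction in reversed(list(spec.items())):
        result.sort(key=lambda d: d[key], reverse=(direction == -1))
    return result


# $unwind + $group: count how often each tag appears
counts = Counter(tag for doc in things for tag in doc["tags"])
grouped = [{"_id": tag, "count": n} for tag, n in counts.items()]

print(apply_sort(grouped, {"count": -1, "_id": -1}))
print(apply_sort(grouped, {"_id": -1, "count": -1}))  # same keys, other order
```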


To get the explain plan for this aggregation, use the `command()` method:

In [55]:
db.command('aggregate', 'things', pipeline=pipeline, explain=True)

{'explainVersion': '1',
 'stages': [{'$cursor': {'queryPlanner': {'namespace': 'aggregation_example.things',
     'indexFilterSet': False,
     'parsedQuery': {},
     'queryHash': 'EE638FFC',
     'planCacheKey': '92B1C2A0',
     'maxIndexedOrSolutionsReached': False,
     'maxIndexedAndSolutionsReached': False,
     'maxScansToExplodeReached': False,
     'winningPlan': {'stage': 'PROJECTION_SIMPLE',
      'transformBy': {'tags': 1, '_id': 0},
      'inputStage': {'stage': 'COLLSCAN', 'direction': 'forward'}},
     'rejectedPlans': []}}},
  {'$unwind': {'path': '$tags'}},
  {'$group': {'_id': '$tags', 'count': {'$sum': {'$const': 1}}}},
  {'$sort': {'sortKey': {'count': -1, '_id': -1}}}],
 'serverInfo': {'host': 'LAPTOP-86L5CLLU',
  'port': 27017,
  'version': '5.0.6',
  'gitVersion': '212a8dbb47f07427dae194a9c75baec1d81d9259'},
 'serverParameters': {'internalQueryFacetBufferSizeBytes': 104857600,
  'internalQueryFacetMaxOutputDocSizeBytes': 104857600,
  'internalLookupStageIntermedi
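Explain output is a nested dictionary, so it can be inspected programmatically, for example to spot a `COLLSCAN` (full collection scan) in the winning plan. The helpers below are hypothetical and assume the MongoDB 5.0-style shape shown above, with a top-level `stages` list whose first entry is a `$cursor` stage; the output format differs across server versions and pipelines, so treat this as a sketch.

```python
from typing import Any, Dict, List


def stage_names(explain: Dict[str, Any]) -> List[str]:
    """Names of the pipeline stages, in execution order."""
    return [next(iter(stage)) for stage in explain.get("stages", [])]


def winning_plan(explain: Dict[str, Any]) -> Dict[str, Any]:
    """Winning query plan from the leading $cursor stage (assumed shape)."""
    return explain["stages"][0]["$cursor"]["queryPlanner"]["winningPlan"]


def plan_stages(plan: Dict[str, Any]) -> List[str]:
    """Flatten a plan tree into its stage names, parent before children."""
    names = [plan.get("stage", "?")]
    children = [plan["inputStage"]] if "inputStage" in plan else []
    children.extend(plan.get("inputStages", []))  # e.g. OR plans have several inputs
    for child in children:
        names.extend(plan_stages(child))
    return names


# Trimmed copy of the explain output above
sample = {
    "explainVersion": "1",
    "stages": [
        {"$cursor": {"queryPlanner": {"winningPlan": {
            "stage": "PROJECTION_SIMPLE",
            "transformBy": {"tags": 1, "_id": 0},
            "inputStage": {"stage": "COLLSCAN", "direction": "forward"}}}}},
        {"$unwind": {"path": "$tags"}},
        {"$group": {"_id": "$tags", "count": {"$sum": {"$const": 1}}}},
        {"$sort": {"sortKey": {"count": -1, "_id": -1}}},
    ],
}

print(stage_names(sample))
print(plan_stages(winning_plan(sample)))
```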

In [42]:
client.close()

In [74]:
import pymongo
client = pymongo.MongoClient()
db = client['single_db']
collection = db['stock']

In [88]:
items = [
    {
     "_id": 1,
     "title": "x1",
     "author": "y1",
     "likes": 10
    },
    {
     "_id": 2,
     "title": "x2",
     "author": "y1",
     "likes": 20
    },
    {
     "_id": 3,
     "title": "x3",
     "author": "y3",
     "likes": 30
    }
]


In [89]:
collection.insert_many(items)

Running this cell a second time raises a `BulkWriteError`: the server reports `E11000 duplicate key error collection: single_db.stock index: _id_ dup key: { _id: 1 }`, and because `insert_many()` is ordered by default, it stops at the first failure and inserts nothing (`'nInserted': 0`).
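`insert_many()` accepts `ordered=False`, in which case the server keeps going after a failed document and reports all errors together in the `BulkWriteError` (PyMongo exposes them as `err.details["writeErrors"]`). The pure-Python simulation below (the helper `simulate_insert_many` is hypothetical and models only duplicate `_id` errors) contrasts the two modes.

```python
from typing import Any, Dict, Iterable, List, Tuple


def simulate_insert_many(existing_ids: Iterable[Any],
                         docs: List[Dict[str, Any]],
                         ordered: bool = True) -> Tuple[List[Any], List[int]]:
    """Return (inserted _ids, indexes of failed documents)."""
    seen = set(existing_ids)
    inserted: List[Any] = []
    failed: List[int] = []
    for index, doc in enumerate(docs):
        if doc["_id"] in seen:  # duplicate key, like error code 11000
            failed.append(index)
            if ordered:
                break  # ordered inserts stop at the first error
            continue  # unordered inserts skip it and carry on
        seen.add(doc["_id"])
        inserted.append(doc["_id"])
    return inserted, failed


batch = [{"_id": 1}, {"_id": 4}, {"_id": 2}, {"_id": 5}]
print(simulate_insert_many({1, 2, 3}, batch, ordered=True))
print(simulate_insert_many({1, 2, 3}, batch, ordered=False))
```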

In [107]:
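# Sum the likes of each author
pipeline = [
    {"$group": {"_id": "$author", "count": {"$sum": "$likes"}}},
]
agg = collection.aggregate(pipeline)
print(list(agg))
# Explain the pipeline against this collection ("stock"), not a collection literally named "collection"
db.command('aggregate', collection.name, pipeline=pipeline, explain=True)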

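[{'_id': 'y3', 'count': 270}, {'_id': 'y1', 'count': 260}, {'_id': 'y2', 'count': 20}]

The output above and the explain plan below were captured in an earlier run against different data (the plan's namespace is `database_1.collection`), so they do not match the three sample documents, which total 30 likes for `y1` (10 + 20) and 30 for `y3`.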


{'explainVersion': '1',
 'stages': [{'$cursor': {'queryPlanner': {'namespace': 'database_1.collection',
     'indexFilterSet': False,
     'parsedQuery': {},
     'maxIndexedOrSolutionsReached': False,
     'maxIndexedAndSolutionsReached': False,
     'maxScansToExplodeReached': False,
     'winningPlan': {'stage': 'EOF'},
     'rejectedPlans': []}}},
  {'$group': {'_id': '$author', 'count': {'$sum': '$likes'}}}],
 'serverInfo': {'host': 'LAPTOP-86L5CLLU',
  'port': 27017,
  'version': '5.0.6',
  'gitVersion': '212a8dbb47f07427dae194a9c75baec1d81d9259'},
 'serverParameters': {'internalQueryFacetBufferSizeBytes': 104857600,
  'internalQueryFacetMaxOutputDocSizeBytes': 104857600,
  'internalLookupStageIntermediateDocumentMaxSizeBytes': 104857600,
  'internalDocumentSourceGroupMaxMemoryBytes': 104857600,
  'internalQueryMaxBlockingSortMemoryUsageBytes': 104857600,
  'internalQueryProhibitBlockingMergeOnMongoS': 0,
  'internalQueryMaxAddToSetBytes': 104857600,
  'internalDocumentSourceSetW
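The `$group` stage with `{"$sum": "$likes"}` adds up the `likes` field per distinct `author`. The sketch below mirrors that in pure Python for the three sample documents (the helper `group_sum` is hypothetical); like `$sum`, it ignores non-numeric values, and a missing group field lands in the `None` group, which corresponds to `_id: null`. Note that the server does not guarantee the order of `$group` output.

```python
from collections import defaultdict
from numbers import Number
from typing import Any, DefaultDict, Dict, Iterable, List


def group_sum(docs: Iterable[Dict[str, Any]], group_field: str,
              sum_field: str, out_field: str = "count") -> List[Dict[str, Any]]:
    """Emulate {"$group": {"_id": "$<group_field>", out_field: {"$sum": "$<sum_field>"}}}."""
    totals: DefaultDict[Any, Any] = defaultdict(int)
    for doc in docs:
        key = doc.get(group_field)  # missing field -> None, like _id: null
        value = doc.get(sum_field)
        numeric = isinstance(value, Number) and not isinstance(value, bool)
        totals[key] += value if numeric else 0  # $sum skips non-numeric values
    return [{"_id": key, out_field: total} for key, total in totals.items()]


items = [
    {"_id": 1, "title": "x1", "author": "y1", "likes": 10},
    {"_id": 2, "title": "x2", "author": "y1", "likes": 20},
    {"_id": 3, "title": "x3", "author": "y3", "likes": 30},
]
print(group_sum(items, "author", "likes"))
```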

In [101]:
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName("pyspark-notebook2")\
# .master("local[*]")\
# .config("spark.executor.memory", "1g")\
# .config("spark.mongodb.input.uri","mongodb://mongo1:27017,mongo2:27018,mongo3:27019/Stocks.Source?replicaSet=rs0")\
# .config("spark.mongodb.output.uri","mongodb://mongo1:27017,mongo2:27018,mongo3:27019/Stocks.Source?replicaSet=rs0")\
# .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0")\
# .getOrCreate()