# python操作mongodb

> [https://pymongo.readthedocs.io/en/stable/tutorial.html](https://pymongo.readthedocs.io/en/stable/tutorial.html)

In [2]:
# 初始化
import pymongo
from pymongo import MongoClient

# 建立连接
client = MongoClient() # 创建正在运行的mongod实例
# client2 = MongoClient("localhost" ,27017) # 显示指定主机和端口
# client3 = MongoClient("mongodb://localhost:27017/) # 显示指定主机和端口


In [3]:
# 获取数据库
db = client.test_database
# db2 = client["test-database"]

# 获取集合(文档，对应表)
collection = db.test_collection
# collection2 =db["test-collection"]

In [4]:
# 创建文档（字典表示，数据单元）
import datetime
post = {
    "author": "Mike",
    "text": "My first blog post!",
    "tags": ["mongodb", "python", "pymongo"],
    "date": datetime.datetime.now(tz=datetime.timezone.utc),
}

In [5]:
# 将文档插入集合（数据添加到表）
posts = db.posts
post_id = posts.insert_one(post).inserted_id
post_id

ObjectId('65e68dfd03a55a7bf54133c6')

In [6]:
# 查看集合、文档
import pprint

pprint.pprint(posts.find_one())
pprint.pprint(posts.find_one({"author": "Mike"}))
pprint.pprint(posts.find_one({"_id": post_id}))
post_id_as_str = str(post_id)
posts.find_one({"_id": post_id_as_str})  # No result


{'_id': ObjectId('65e68dfd03a55a7bf54133c6'),
 'author': 'Mike',
 'date': datetime.datetime(2024, 3, 5, 3, 13, 58, 390000),
 'tags': ['mongodb', 'python', 'pymongo'],
 'text': 'My first blog post!'}
{'_id': ObjectId('65e68dfd03a55a7bf54133c6'),
 'author': 'Mike',
 'date': datetime.datetime(2024, 3, 5, 3, 13, 58, 390000),
 'tags': ['mongodb', 'python', 'pymongo'],
 'text': 'My first blog post!'}
{'_id': ObjectId('65e68dfd03a55a7bf54133c6'),
 'author': 'Mike',
 'date': datetime.datetime(2024, 3, 5, 3, 13, 58, 390000),
 'tags': ['mongodb', 'python', 'pymongo'],
 'text': 'My first blog post!'}


In [7]:
posts.find_one({"_id": post_id_as_str})  # No result

In [8]:
# str转换为ObjectId
from bson.objectid import ObjectId

post_id = ObjectId(post_id_as_str)
posts.find_one({"_id": post_id})

{'_id': ObjectId('65e68dfd03a55a7bf54133c6'),
 'author': 'Mike',
 'text': 'My first blog post!',
 'tags': ['mongodb', 'python', 'pymongo'],
 'date': datetime.datetime(2024, 3, 5, 3, 13, 58, 390000)}

In [9]:
# 批量插入（返回ObjectId列表，模式自由，可随意增删文档）
new_posts = [
    {
        "author": "Mike",
        "text": "Another post!",
        "tags": ["bulk", "insert"],
        "date": datetime.datetime(2009, 11, 12, 11, 14),
    },
    {
        "author": "Eliot",
        "title": "MongoDB is fun",
        "text": "and pretty easy too!",
        "date": datetime.datetime(2009, 11, 10, 10, 45),
    },
]
result = posts.insert_many(new_posts)
result.inserted_ids

[ObjectId('65e690ee03a55a7bf54133c7'), ObjectId('65e690ee03a55a7bf54133c8')]

In [11]:
# 查询多个文档
for post in posts.find():
    pprint.pprint(post)
print()
for post in posts.find({"author":"Mike"}):
    pprint.pprint(post)

{'_id': ObjectId('65e68dfd03a55a7bf54133c6'),
 'author': 'Mike',
 'date': datetime.datetime(2024, 3, 5, 3, 13, 58, 390000),
 'tags': ['mongodb', 'python', 'pymongo'],
 'text': 'My first blog post!'}
{'_id': ObjectId('65e690ee03a55a7bf54133c7'),
 'author': 'Mike',
 'date': datetime.datetime(2009, 11, 12, 11, 14),
 'tags': ['bulk', 'insert'],
 'text': 'Another post!'}
{'_id': ObjectId('65e690ee03a55a7bf54133c8'),
 'author': 'Eliot',
 'date': datetime.datetime(2009, 11, 10, 10, 45),
 'text': 'and pretty easy too!',
 'title': 'MongoDB is fun'}

{'_id': ObjectId('65e68dfd03a55a7bf54133c6'),
 'author': 'Mike',
 'date': datetime.datetime(2024, 3, 5, 3, 13, 58, 390000),
 'tags': ['mongodb', 'python', 'pymongo'],
 'text': 'My first blog post!'}
{'_id': ObjectId('65e690ee03a55a7bf54133c7'),
 'author': 'Mike',
 'date': datetime.datetime(2009, 11, 12, 11, 14),
 'tags': ['bulk', 'insert'],
 'text': 'Another post!'}


In [13]:
# 文档计数
print(posts.count_documents({}))
print(posts.count_documents({"author":"Mike"}))

3
2


In [14]:
# 范围查询
d = datetime.datetime(2009, 11, 12, 12)
for post in posts.find({"date": {"$lt": d}}).sort("author"):
    pprint.pprint(post)

{'_id': ObjectId('65e690ee03a55a7bf54133c8'),
 'author': 'Eliot',
 'date': datetime.datetime(2009, 11, 10, 10, 45),
 'text': 'and pretty easy too!',
 'title': 'MongoDB is fun'}
{'_id': ObjectId('65e690ee03a55a7bf54133c7'),
 'author': 'Mike',
 'date': datetime.datetime(2009, 11, 12, 11, 14),
 'tags': ['bulk', 'insert'],
 'text': 'Another post!'}


In [15]:
# 创建唯一索引
result = db.profiles.create_index([("user_id",pymongo.ASCENDING)], unique=True)
sorted(list(db.profiles.index_information())) # 索引信息

['_id_', 'user_id_1']

In [16]:
# 插入文档数据
user_profiles = [{"user_id": 211, "name": "Luke"}, {"user_id": 212, "name": "Ziltoid"}]
result = db.profiles.insert_many(user_profiles)

In [17]:
# 插入重复文档测试
new_profile = {"user_id": 213, "name": "Drew"}
duplicate_profile = {"user_id": 212, "name": "Tommy"}
result = db.profiles.insert_one(new_profile)  # This is fine.
result = db.profiles.insert_one(duplicate_profile)

DuplicateKeyError: E11000 duplicate key error collection: test_database.profiles index: user_id_1 dup key: { user_id: 212 }, full error: {'index': 0, 'code': 11000, 'errmsg': 'E11000 duplicate key error collection: test_database.profiles index: user_id_1 dup key: { user_id: 212 }', 'keyPattern': {'user_id': 1}, 'keyValue': {'user_id': 212}}

In [24]:
from pymongo.errors import DuplicateKeyError
try:
    result = db.profiles.insert_one(duplicate_profile)
except DuplicateKeyError as e:
    print(e.details['errmsg'])

E11000 duplicate key error collection: test_database.profiles index: user_id_1 dup key: { user_id: 212 }
