Cloud Firestore, or "Firestore", is a flexible, scalable NoSQL cloud database. It grew out of the Firebase platform (acquired by Google in 2014) and has since become Google's flagship document database, competing with AWS DynamoDB and Azure Cosmos DB.
Due to this history, I find Cloud Firestore's official documentation scattered across multiple Firebase/Google websites, and not always available for my favorite language, Python:
- Firebase - Cloud Firestore Documentation
- Google Cloud - How-to guides
- GoogleAPIs.dev - Python Client for Google Cloud Firestore
To help myself and fellow Pythoneers, I have tested these common database operations and summarized them below.
- Register a free Google account (if you don't have one yet)
- Log into Firebase console
- Go to "Project Overview" > (gear button) > "Project settings" > "Service accounts"
- Switch "Admin SDK configuration snippet" language to "Python", then hit "Generate new private key"
- Save the generated JSON file to a secure location
Source code (also provided in the firestore_conn.py file):
```python
import firebase_admin
from firebase_admin import credentials
from firebase_admin import firestore
from datetime import datetime

# Connect to Firestore with service account
print(f'>>> {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} Started connecting to Firestore database...')
firestore_key = r'your_service_account_file.json'  # Prod
cred = credentials.Certificate(firestore_key)
app = firebase_admin.initialize_app(cred)
db = firestore.client(app=app)
print(f'<<< {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} Firestore database connected\n---')

try:
    # Firestore operations, such as
    doc_ref = db.collection('coll_id').document('doc_id')
except Exception as e_read_write:
    print(f'??? {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} Exception in doing...:', e_read_write)
else:
    print(f'<<< {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} Finished doing...\n---')
```
The table below lists the common operations of the Firestore database. Since GitHub Markdown doesn't render the HTML table satisfactorily, I have also attached a picture of the table.
Note: These operations work on a small number of documents/collections (in the range of hundreds, where the operation can finish in under 60 seconds). For operations on a larger set, use the paginated approach demonstrated in my Firestore2CSV repo.
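As a rough idea of what pagination looks like, here is a minimal sketch using `order_by`, `limit`, and `start_after` cursors. The helper name `stream_in_pages` and the page size are my own for illustration; they are not from the Firestore2CSV repo or the Firestore API:

```python
# Hypothetical sketch: read a large collection page by page with a cursor,
# so no single request has to return the whole collection at once.
def stream_in_pages(coll_ref, page_size=100):
    cursor = None
    while True:
        qry = coll_ref.order_by('__name__').limit(page_size)  # order by document id
        if cursor is not None:
            qry = qry.start_after(cursor)  # resume after the last snapshot seen
        snapshots = list(qry.stream())
        if not snapshots:
            break
        yield from snapshots
        cursor = snapshots[-1]
```

Usage would look like `for snap in stream_in_pages(db.collection('coll_id')): print(snap.to_dict())`; passing the last `DocumentSnapshot` of a page to `start_after` lets Firestore skip directly to the next page.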
A few handy database-, collection- and document-level properties and methods:

```python
# Database-level
str_proj_name = db.project                        # Project name
all_colls_in_db = db.collections()                # Iterate top-level collections
docs_from_different_colls = db.get_all(doc_list)  # Fetch documents from a list of references
coll_group = db.collection_group('coll_id')       # Query across all collections with this id

# Collection-level
str_coll_id = coll_ref.id                         # Collection id
doc_ref_parent = sub_coll_ref.parent              # Parent document reference
coll_ref.on_snapshot(callback)                    # Listen for changes

# Document-level
str_doc_id = doc_ref.id                           # Document id
str_doc_path = doc_ref.path                       # Full path string
coll_ref_parent = doc_ref.parent                  # Parent collection reference
```
<table>
<tr><th>Operation</th><th>Subtype</th><th>On Collections</th><th>On Documents</th></tr>
<tr>
<td rowspan="2">Referencing</td>
<td>One-level</td>
<td><pre>coll_ref = db.collection('coll_id')</pre></td>
<td><pre>doc_ref = db.collection('coll_id').document('doc_id')
doc_ref = db.document('coll_id/doc_id')
doc_ref = db.document('coll_id', 'doc_id')</pre></td>
</tr>
<tr>
<td>Multi-level</td>
<td><pre>sub_coll_ref = db.collection('coll_id').document('doc_id').collection('sub_coll_id')
sub_coll_ref = db.collection('coll_id/doc_id/sub_coll_id')
sub_coll_ref = db.collection('coll_id', 'doc_id', 'sub_coll_id')</pre></td>
<td><pre>doc_ref = sub_coll_ref.document('sub_doc_id')
doc_ref = db.document('coll_id/doc_id/sub_coll_id/sub_doc_id')
doc_ref = db.document('coll_id', 'doc_id', 'sub_coll_id', 'sub_doc_id')
doc_ref = doc_snapshot.reference</pre></td>
</tr>
<tr>
<td>Read</td>
<td>–</td>
<td><pre>doc_list = coll_ref.get()
if len(doc_list) > 0:
    docs = coll_ref.stream()  # docs is iterable
    for doc_snapshot in docs:
        print(doc_snapshot.to_dict())
#
doc_list = coll_ref.get()
if len(doc_list) > 0:
    doc_refs = coll_ref.list_documents()  # doc_refs is iterable
    for doc_ref in doc_refs:
        print(doc_ref.get().to_dict())</pre></td>
<td><pre>doc_snapshot = doc_ref.get()
if doc_snapshot.exists:
    print(doc_snapshot.to_dict())</pre></td>
</tr>
<tr>
<td rowspan="3">Query</td>
<td>Simple where (row-wise)</td>
<td><pre>qry = coll_ref.select(fieldpath).where('col', '==', 'val')  # fieldpath is like ['col1', 'col2', ...]
doc_list = qry.get()
if len(doc_list) > 0:
    docs = qry.stream()
    ...</pre></td>
<td>–</td>
</tr>
<tr>
<td>Compound where</td>
<td><pre>qry = coll_ref.select(fieldpath).where('col', '==', 'val').where()...  # Some compound queries require a composite index created beforehand</pre></td>
<td>–</td>
</tr>
<tr>
<td>Order &amp; limit</td>
<td><pre>qry = (coll_ref.where('col', '==', 'val')
    .order_by('col', direction=firestore.Query.ASCENDING|DESCENDING)
    .start_at|end_at|start_after|end_before(cursor)
    .offset(num_to_skip)
    .limit|limit_to_last(count))</pre></td>
<td>–</td>
</tr>
<tr>
<td>Create</td>
<td>–</td>
<td><pre># Collection automatically created once a document is created in it
coll_ref.add(data_dict, document_id=None)  # Add with auto/given doc id</pre></td>
<td><pre>doc_ref = coll_ref.document('doc_id')
doc_ref.create(data_dict)  # Create with given doc id. Will fail if it already exists</pre></td>
</tr>
<tr>
<td>Update</td>
<td>–</td>
<td>–</td>
<td><pre>doc_ref.set(data_dict, merge=False|True)  # merge defaults to False. Will create doc if nonexistent
doc_ref.update(data_dict)  # Will create fields if nonexistent, but error out if no document
#
doc_ref.update({'object.attribute': 'new_value'})  # Field path (dot notation) for updating a nested object
doc_ref.update({'array': firestore.ArrayUnion|ArrayRemove([])})  # Add/remove array items
doc_ref.update({'number': firestore.Increment(n)})  # Increment a number</pre></td>
</tr>
<tr>
<td>Delete</td>
<td>–</td>
<td><pre># Collection automatically deleted once all documents in it are deleted</pre></td>
<td><pre>doc_ref.update({'field_to_delete': firestore.DELETE_FIELD})  # Delete a field
doc_ref.delete()  # Delete the whole document</pre></td>
</tr>
<tr>
<td rowspan="2">Atomic operations</td>
<td>Batched writes</td>
<td>–</td>
<td><pre>my_batch = db.batch()
my_batch.set(doc_ref, data_dict)
my_batch.update(doc_ref, data_dict)
my_batch.delete(doc_ref)
my_batch.commit()  # A batch can contain up to 500 writes</pre></td>
</tr>
<tr>
<td>Transactions</td>
<td>–</td>
<td><pre>@firestore.transactional
def run_transaction(transaction, doc_ref):
    doc_dict = doc_ref.get(transaction=transaction).to_dict()
    # Do reads that must happen before any write
    transaction.update(doc_ref, data_dict)
#
my_transaction = db.transaction()
run_transaction(my_transaction, doc_ref)</pre></td>
</tr>
</table>
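Since a collection disappears automatically once all of its documents are deleted, clearing a small collection comes down to deleting its documents. Here is a minimal sketch combining the Read and batched-write operations above; the helper name `delete_collection` and the batch size are my own for illustration, not part of the Firestore API:

```python
# Hypothetical helper: delete all documents in a (small) collection in batches,
# which also removes the collection itself once it is empty.
def delete_collection(db, coll_ref, batch_size=100):
    deleted_total = 0
    while True:
        snapshots = list(coll_ref.limit(batch_size).stream())
        if not snapshots:
            return deleted_total
        my_batch = db.batch()
        for snap in snapshots:
            my_batch.delete(snap.reference)  # delete by document reference
        my_batch.commit()
        deleted_total += len(snapshots)
```

Deleting in batches rather than one `doc_ref.delete()` per round trip keeps the number of requests small, and looping until `stream()` returns nothing handles documents created while the deletion is in progress.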