### Transaction 

#### 1) Basic configuration

In [1]:
from pymongo import MongoClient
import datetime


#get the password from my local driver.
import json
with open("C:\data\key.json") as f:
    password = json.loads(f.read())

username = "migo"
DBname = "test"



cluster_connection = f"mongodb+srv://{username}:{password['mongo']}@cluster0.mqzcx.mongodb.net/{DBname}?retryWrites=true&w=majority"
#+srv is the sign of cloud server. 

client = MongoClient(cluster_connection,tz_aware=True)

In [18]:
#specific configuration for transaction
from pymongo import WriteConcern, read_concern, ReadPreference

from pymongo.errors import ConnectionFailure,OperationFailure

In [5]:

my_wc_majority = WriteConcern('majority',wtimeout=1000)
db = client.test

#you must have an existing collection. 
#plus, CRUD operations in transaction must be on existing collections.

db.create_collection("orders")
db.create_collection("inventory")


Collection(Database(MongoClient(host=['cluster0-shard-00-00.mqzcx.mongodb.net:27017', 'cluster0-shard-00-01.mqzcx.mongodb.net:27017', 'cluster0-shard-00-02.mqzcx.mongodb.net:27017'], document_class=dict, tz_aware=True, connect=True, retrywrites=True, w='majority', authsource='admin', replicaset='atlas-1048e3-shard-0', ssl=True), 'test'), 'inventory')

In [8]:
db.list_collection_names()

['orders', 'inventory']

In [9]:
client.get_database(DBname,write_concern=my_wc_majority).orders.insert_one({"sku":"abc123","qty":0})
client.get_database(DBname,write_concern=my_wc_majority).inventory.insert_one({"sku":"abc123","qty":1000})


<pymongo.results.InsertOneResult at 0x22afed16200>

---



In [26]:

### Step1

        #Define the operations and their sequence within the transaction

def update_orders_and_inventory(session):                    #session?    < - with client.start_session() as my_session
    orders = session.client.test.orders
    inventory = session.client.test.inventory
    
    with session.start_transaction(                          #the start of session. 
        read_concern = read_concern.ReadConcern("snapshot"), #1
        write_concern = WriteConcern(w="majority"),          #2
        read_preference = ReadPreference.PRIMARY):           #3
        
        orders.insert_one({"sku":"abc123","qty":100},session=my_session)
        inventory.update_one({"sku":"abc123","qty":{"$gte":100}},{"$inc":{"qty":-100}},session=my_session)
        
        commit_with_retry(my_session)
        
    
#1 read_concern.ReadConcern
"""
The read concern level specifies the level of
isolation for read operations.  For example, a read operation using a
read concern level of ``majority`` will only return data that has been
written to a majority of nodes. 
"""

#2 WriteConcern

"""
Starting in MongoDB 5.0, the implicit default write concern is w: majority. 
The query returns the data that has been acknowledged by a majority of the replica set members. 
The documents returned by the read operation are durable, even in the event of failure.
"""

#3 ReadPreference.PRIMARY
"""
PRIMARY: Read from the primary. This is the default, and provides the
strongest consistency. If no primary is available, raise
-> `~pymongo.errors.AutoReconnect`
"""

In [20]:
### Step2

        #Attempt to run and commit transaction with retry logic
    
def commit_with_retry(session):
    while True:
        try:
            #commit uses "write concern set" at transaction start.
            session.commit_transaction()
            print("Transaction committed.")
            #if succeful, break out.
            break
        except (ConnectionFailure, OperationFailure) as exc:
            #Can retry commit
            if exc.has_error_label("UnknownTransactionCommitResult"):
                print("UnknownTransactionCommitResult, retrying commit operation...")
                continue
            else:
                print("Error during commit")
                raise
                

In [22]:
### Step3

        #Attempt with retry logic to run the transaction function - "txn_func"

def run_transaction_with_retry(txn_func,session):  #transaction function + session created by "with client.start_session() as session"
    while True:
        try:
            txn_func(session) #performs transaction
            break
        except (ConnectionFailure, OperationFailure) as exc:
            # if transient error, retry the whole transaction
            if exc.has_error_label("TransientTransactionError"):
                print("TransientTransactionError, retrying transaction...")
                continue
            else:
                raise
    

In [27]:
### Step4

        #Start a session

with client.start_session() as my_session:
    
#Step5
        #Call the function "run_transaction_with_retry" passing it the function to call "update_orders_and_inventory" and the session "my_session" to associate with this transaction
    try:
        run_transaction_with_retry(update_orders_and_inventory,my_session)
    except Exception as exc:
        raise
    

Transaction committed.
