In [None]:
# Install pymongo for the interpreter that runs this kernel (sys.executable),
# into the user site-packages (--user), so the pymongo imports in the next
# cell resolve. The leading '!' is IPython shell-escape syntax, not Python.
import sys
!{sys.executable} -m pip install pymongo --user

In [None]:
import logging
import os
import random
import string
import time
import uuid
from urllib.parse import quote_plus

import pymongo
from pymongo import MongoClient
from pymongo import ReplaceOne
from pymongo.errors import BulkWriteError
from pymongo.errors import OperationFailure

def retry(func, *func_args, **kwargs):
    """Call ``func``, retrying while the server reports request throttling.

    The keyword arguments ``retry_count`` and ``delay`` are consumed here and
    are NOT forwarded to ``func``. Every other positional and keyword argument
    is passed through unchanged.

    Args:
        func: callable to invoke.
        *func_args: positional arguments for ``func``.
        retry_count: maximum number of retries after the first attempt
            (default 20), so ``func`` runs at most ``retry_count + 1`` times.
            Negative values are treated as 0; ``func`` is always called once.
        delay: seconds to sleep between attempts (default 0.1).
        **kwargs: remaining keyword arguments for ``func``.

    Returns:
        Whatever ``func`` returns.

    Raises:
        OperationFailure: re-raised immediately when the failure is not a
            throttling error, or once ``retry_count`` retries are used up.
    """
    retry_count = max(kwargs.pop("retry_count", 20), 0)
    delay = kwargs.pop("delay", 0.1)

    attempt = 0
    while True:
        logging.debug('Attempt: %s', str(attempt))
        try:
            return func(*func_args, **kwargs)
        except OperationFailure as failure:
            # Non-throttling errors, and throttling after the last allowed
            # retry, go straight back to the caller.
            if attempt >= retry_count or not _is_throttled(failure):
                raise
        logging.debug("waiting for %s seconds before retrying again", str(delay))
        time.sleep(delay)
        attempt += 1


def _is_throttled(failure):
    """Return True when ``failure`` reports only throttling (error code 16500).

    NOTE(review): 16500 is Azure Cosmos DB for MongoDB's "request rate is
    large" (TooManyRequests) code. Confirm it if this targets another server.

    ``bulk_write`` reports per-operation errors as a ``BulkWriteError`` whose
    own code is 65; the individual codes live in ``details['writeErrors']``.
    Such a batch counts as throttled only when every write error is 16500 and
    there are no write-concern errors. Resubmitting the whole batch is safe
    only for idempotent operations, such as the ReplaceOne upserts in this file.
    """
    if failure.code == 16500:
        return True
    if isinstance(failure, BulkWriteError):
        details = failure.details or {}
        write_errors = details.get('writeErrors') or []
        return (
            bool(write_errors)
            and not details.get('writeConcernErrors')
            and all(err.get('code') == 16500 for err in write_errors)
        )
    return False

def flush(requests, target):
    """Send one batch to ``target`` as an unordered ``bulk_write`` and log the outcome.

    The write goes through ``retry`` (``retry_count=20``, ``delay=0.05``) with
    document validation bypassed. The elapsed wall-clock time is logged at
    INFO. Start/end timestamps, the acknowledgement flag and, for
    acknowledged writes, the per-kind counts are logged at DEBUG.

    Args:
        requests: write operations to submit (this file passes ReplaceOne upserts).
        target: collection that receives the writes.

    Returns:
        The result object returned by ``target.bulk_write``.
    """
    began = time.perf_counter()
    outcome = retry(
        target.bulk_write,
        requests=requests,
        ordered=False,
        bypass_document_validation=True,
        retry_count=20,
        delay=0.05,
    )
    finished = time.perf_counter()

    logging.info('Duration: %s', str(finished - began))
    logging.debug('StartTime: %s', str(began))
    logging.debug('EndTime: %s', str(finished))
    logging.debug('Acknowledged: %s', str(outcome.acknowledged))

    # The count attributes are only read for acknowledged writes.
    if not outcome.acknowledged:
        return outcome

    for template, attribute in (
        ('Deleted: %s', 'deleted_count'),
        ('Inserted: %s', 'inserted_count'),
        ('Matched: %s', 'matched_count'),
        ('Modified: %s', 'modified_count'),
        ('Upserted: %s', 'upserted_count'),
    ):
        logging.debug(template, str(getattr(outcome, attribute)))

    return outcome

def process(source, target, lastId):
    """Copy the next batch of ``source`` documents, in ``id`` order, into ``target``.

    Reads documents whose ``id`` is greater than ``lastId`` in ascending
    ``id`` order and upserts each one into ``target`` by ``id``. A batch is
    flushed as soon as it holds ``maxBatchSize`` (module-level) documents.
    The module-level ``copiedCount`` is advanced after every successful flush.

    Args:
        source: collection to read from.
        target: collection to upsert into.
        lastId: bookmark; only documents with ``id`` > ``lastId`` are read.
            NOTE(review): the caller starts from '', which under MongoDB
            comparison rules presumably only matches string ids — confirm.

    Returns:
        The ``id`` of the last document of a full batch (pass it back as the
        next ``lastId``), or '' once no documents remain. The function always
        returns a value, so callers can rely on '' as the end marker.
    """
    global copiedCount

    # The batch is local, so a retried call starts from an empty list instead
    # of re-appending documents left over from a failed attempt.
    requests = []

    # Closing the cursor on the early return frees it on the server right away.
    with source.find({'id': {'$gt': lastId}}).sort('id', pymongo.ASCENDING) as cursor:
        for doc in cursor:
            logging.debug('Id: %s', str(doc['id']))

            requests.append(ReplaceOne(
                filter={'id': doc['id']},
                replacement=doc,
                upsert=True))

            if len(requests) >= maxBatchSize:
                flush(requests, target)
                copiedCount += len(requests)
                logging.info('Incremented Bookmark: %s', str(doc['id']))
                logging.info('Progress - copied records so far: %s', str(copiedCount))
                return doc['id']

    # Cursor exhausted: write whatever partial batch remains.
    if requests:
        flush(requests, target)
        copiedCount += len(requests)
        logging.info('Progress - copied %s records', str(copiedCount))

    # Previously this return sat inside the `if`, so an empty final batch
    # returned None and the caller's `== ''` check never ended its loop.
    return ''

# Root logger at DEBUG also emits per-document ids and bulk-write counts.
# Switch to logging.INFO to suppress that DEBUG detail.
logging.getLogger().setLevel(logging.DEBUG)

account_name = os.environ['DB_ACCOUNT_NAME']
# Credentials inside a MongoDB URI must be percent-escaped. Cosmos DB keys are
# typically base64 and can contain '/', '+' and '=', which would otherwise
# corrupt the URI. The host part uses the raw account name.
uri = (
    'mongodb://'
    + quote_plus(account_name)
    + ':'
    + quote_plus(os.environ['COSMOS_KEY'])
    + '@'
    + account_name
    + '.documents.azure.com:10255/?ssl=true&replicaSet=globaldb'
)

client = MongoClient(uri)
database = client.get_database(name='testdb')
source = database.get_collection(name='source')
target = database.get_collection(name='target')

# Batching state.
maxBatchSize = 10
lastId = ''
requests = []
copiedCount = 0

# Copy batch by batch, resuming from the bookmark returned by process().
# '' marks the end. None is treated as the end too, so a call that returns
# without a bookmark cannot make this loop spin forever.
while True:
    lastId = retry(process, source, target, lastId, retry_count=3, delay=1.1)

    if lastId is None or lastId == '':
        break

print("Finished")