In [1]:
import contextlib
import os

import pandas as pd

from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError
import functools
from bson.objectid import ObjectId
from datetime import datetime
import socket
import time


class task_locker:
    def __init__(self, url, version):
        self.url = url
        self.client = MongoClient(url)
        self.task = self.client['task'].task
        self.task.create_index([('_version', 1), ('_task_id', 1)], unique=True);
        self.version = version

    def register_lock(self, _task_id, **kwargs):
        try:

            print('begin insert')
            lock_id = self.task.insert({
                "_version": self.version,
                "_task_id": _task_id,
                'server_ip': socket.gethostname(),
                'ct': datetime.now(),
                **kwargs})
            return str(lock_id)
        except Exception as e:
            if isinstance(e, DuplicateKeyError):
                print(f'Already has same taskid:{self.version},{_task_id}')
            else:
                raise e
            return False

    def update_lock(self, lock_id, result):
        #         res = self.task.find_one({'_id':ObjectId(lock_id)})
        #         begin = res.get('ct')
        #         time.sleep(3)
        #         end = datetime.now()
        #         duration = end-begin
        #         print('duration=',duration)

        self.task.update_one({'_id': ObjectId(lock_id)},
                             {'$set': {'result': result, },
                              "$currentDate": {"mt": True}
                              },
                             )

    def lock(self, max_time=-1):
        locker = self

        def decorator(f):
            @functools.wraps(f)
            def wrapper(*args, **kwargs):

                job_paras = {'fn_name': f.__name__,
                             'args': args,
                             'kwargs': kwargs}
                print('job_paras', job_paras)
                print(f"{f.__name__}({args},{kwargs})")
                lock_id = locker.register_lock(_task_id=str(job_paras), **job_paras, )
                if lock_id:
                    print(f'create lock success for {f.__name__} with:{lock_id}')
                    res = f(*args, **kwargs)
                    locker.update_lock(lock_id, res)
                    return res
                else:
                    exist_lock = locker.task.find_one({"_version": self.version,
                                                       '_task_id': str(job_paras)})
                    raise Warning(f'Already had lock#{exist_lock}')

            return wrapper

        return decorator

    @contextlib.contextmanager
    def lock_block(self, task_id='Default_block', **job_paras):
        import sys
        lock_id = self.register_lock(_task_id=str(task_id), **job_paras, )
        if lock_id:
            print(f'create lock success for block#{task_id} with:{lock_id}')
            yield
            self.update_lock(lock_id, result=None)
        else:
            exist_lock = self.task.find_one({"_version": self.version,
                                               '_task_id': str(job_paras)})
            raise Warning(f'Already had lock#{exist_lock}')


    def remove_version(self):
        self.task.remove({'_version' : self.version})

locker = task_locker('mongodb://sample:password@mongo:27017/db?authSource=admin', version='v1')


 

locker.remove_version()


@locker.lock()
def testabc(a, b, c=4, d=5):
    # paras = locals()
    return 'done'

testabc(1, d=3, b=3, c=24)


with locker.lock_block('task_2', abc='def'):
    print('test')


job_paras {'fn_name': 'testabc', 'args': (1,), 'kwargs': {'d': 3, 'b': 3, 'c': 24}}
testabc((1,),{'d': 3, 'b': 3, 'c': 24})
begin insert
create lock success for testabc with:5df49b2c1b263fc4f68b081c
begin insert
create lock success for block#task_2 with:5df49b2c1b263fc4f68b081d
test




In [2]:
def get_db():
    from pymongo import MongoClient
    client = MongoClient('mongodb://sample:password@mongo:27017/db?authSource=admin')
    db = client.myFirstMB
    return db

def add_country(db):
    res = db.countries.insert({"version" : "03", "task_id": "123"})
    print('raw',res)
    print(dir(res))
    return res#res.inserted_id
    
def get_country(db):
    return db.countries.find_one()



db = get_db() 
res = add_country(db)
#print (get_country(db))
# print(type(res), res)
print(res, type(res))
res = db.countries.update_one({'_id':res}, 
                                 {'$set':{'version':'bb', 'xx':1234000}},
                                 upsert=False, )

print('update res', res.raw_result)


for row in db.countries.find():
    print(row)

raw 5df49b2c1b263fc4f68b081f
['_ObjectId__generate', '_ObjectId__id', '_ObjectId__random', '_ObjectId__validate', '__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setstate__', '__sizeof__', '__slots__', '__str__', '__subclasshook__', '_inc', '_inc_lock', '_pid', '_random', '_type_marker', 'binary', 'from_datetime', 'generation_time', 'is_valid']
5df49b2c1b263fc4f68b081f <class 'bson.objectid.ObjectId'>
update res {'n': 1, 'nModified': 1, 'ok': 1.0, 'updatedExisting': True}
{'_id': ObjectId('5df49b056acac052ab6116d5'), 'version': 'bb', 'task_id': '123', 'xx': 1234000}
{'_id': ObjectId('5df49b18bfe0d03687fa19a3'), 'version': 'bb', 'task_id': '123', 'xx': 1234000}
{'_id': ObjectId('5df49b2c1b263fc4f68b081f'), 'version': 'bb', 'task_id': '123', 'xx': 1

  


In [3]:
db.countries.find_one({'version': '01', 'task_id': '123'})

In [4]:
db.countries.find_one({'id':'5df2f93e43cc4cf8b1fcd6c2'})

In [5]:
from pymongo.errors import DuplicateKeyError
try:
    t_id = db.task.insert_one({"version" : "01", "task_id": "124"})
    print(t_id)
except Exception as e :
    if isinstance(e, DuplicateKeyError):
        print('Already has same taskid')
    else:
        raise e
 

str(t_id.inserted_id)

<pymongo.results.InsertOneResult object at 0x11c75bb88>


'5df49b2c1b263fc4f68b0820'

In [6]:
def test(a, b ,c, d=5):
    paras = locals()
    print(type(paras))
    print(str(paras))
test(b=1,a=2,d=3, c=5)

<class 'dict'>
{'a': 2, 'b': 1, 'c': 5, 'd': 3}


In [7]:
from pymongo import MongoClient
client = MongoClient('mongodb://sample:password@mongo:27017/db?authSource=admin')
#db = client.myFirstMB
#db.collection.createIndex( { <field1>: <type>, <field2>: <type2>, ... } )
db = client['task']

db.task.remove({})

  import sys


{'n': 4, 'ok': 1.0}

In [8]:
with locker.lock_block('test3'):
    pass

begin insert
create lock success for block#test3 with:5df49b2c1b263fc4f68b0822




In [9]:
from bson.objectid import ObjectId
from pymongo import MongoClient
client = MongoClient('mongodb://sample:password@mongo:27017/db?authSource=admin')
task1 = client['task'].task
res = task1.find_one({'_id': ObjectId('5df494bd6acac052ab6116c9')})
res

In [10]:
task1.remove({'_version':'v1'})

  """Entry point for launching an IPython kernel.


{'n': 1, 'ok': 1.0}