In [4]:
## import time
import pymongo
import pprint
from pymongo.errors import WriteError, DuplicateKeyError
from pymongo import MongoClient
from datetime import datetime
from datetime import timedelta
import os

In [5]:
client = MongoClient('mongodb://localhost:27017')
db = client.abaco

## The newest and greatest Mongo store

In [6]:
class testStore():
    def __init__(self):
        db = client.abaco
        test = db['3']
        self._db = test
        
    def __getitem__(self, fields):
        """
        Atomically does either:
        Gets and returns 'self[key]' or 'self[key][field1][field2][...]' as a dictionary
        """
        key, _, subscripts = self._process_inputs(fields)
        result = self._db.find_one(
            {'_id': key},
            projection={'_id': False})
        if result == None:
            raise KeyError(f"'_id' of '{key}' not found")
        try:
            return eval('result' + subscripts)
        except KeyError:
            raise KeyError(f"Subscript of {subscripts} does not exists in document of '_id' {key}")

    def __setitem__(self, fields, value):
        """
        Atomically does either:
        Sets 'self[key] = value' or sets 'self[key][field1][field2][...] = value'
        """
        key, dots, _ = self._process_inputs(fields)
        try:
            if isinstance(fields, str) and isinstance(value, dict):
                result = self._db.update_one(
                    filter={'_id': key},
                    update={'$set': value},
                    upsert=True)
            else:
                result = self._db.update_one(
                    filter={'_id': key},
                    update={'$set': {dots: value}},
                    upsert=True)
        except WriteError:
            raise WriteError(
                "Likely due to trying to set a subfield of a field that does not exists." +
                "\n Try setting a dict rather than a value. Ex. store['id_key', 'key', 'field'] = {'subfield': 'value'}")
        if result.raw_result['nModified'] == 0:
            if not 'upserted' in result.raw_result:
                logger.debug(f'Field not modified, old value likely the same as new. Key: {key}, Fields: {dots}, Value: {value}')

    def __delitem__(self, fields):
        """
        Atomically does either:
        Deletes 'self[key]'
        Unsets 'self[key][field1][field2][...]'
        """
        key, dots, subscripts = self._process_inputs(fields)
        if not subscripts:
            result = self._db.delete_one({'_id': key})
            if result.raw_result['n'] == 0:
                logger.debug(f"No document with '_id' found. Key:{key}, Fields:{dots}")
        else:
            result = self._db.update_one(
                filter={'_id': key},
                update={'$unset': {f'{dots}': ''}})
            if result.raw_result['nModified'] == 0:
                logger.debug(f"Doc with specified fields not found. Key:{key}, Fields:{dots}")

    def __iter__(self):
        for cursor in self._db.find():
            yield cursor['_id']
        # return self._db.scan_iter()

    def __len__(self):
        """
        Returns the estimated document count of a store to give length
        We don't use '.count_documents()' as it's O(N) versus O(1) of estimated
        Length for a document or subdocument comes from len(store['key']['field1'][...]) using dict len()
        """
        try:
            return self._db.estimated_document_count()
        except TypeError:
            return 0

    def __repr__(self):
        """
        Returns a pretty string of the entire store with '_id' visible for developer use
        """
        return pprint.pformat(list(self._db.find()))

    def _process_inputs(self, fields):
        """
        Takes in fields and returns the key corresponding with '_id', dot notation
        for getting to a specific field in a Mongo query/filter (ex. 'field1.field2.field3.field4')
        and the subscript notation for returning a specified field from a result dictionary
        (ex. `['field1']['field2']['field3']['field4']`)
        """
        if isinstance(fields, str):
            key = dots = fields
            subscripts = ''
        elif isinstance(fields, list) and len(fields) == 1:
            key = dots = fields[0]
            subscripts = ''
        else:
            key = fields[0]
            dots = '.'.join(fields[1:])
            subscripts = "['" + "']['".join(fields[1:]) + "']"
        return key, dots, subscripts

    def _prepset(self, value):
        if type(value) is bytes:
            return value.decode('utf-8')
        return value

    def pop_field(self, fields):
        """
        Atomically pops 'self[key] = value' or 'self[key][field1][field2][...] = value'
        """
        key, dots, subscripts = self._process_inputs(fields)
        if not subscripts:
            result = self._db.find_one(
                {'_id': key},
                projection={'_id': False})
            if result == None:
                raise KeyError(f"'_id' of '{key}' not found")
            del_result = self._db.delete_one({'_id': key})
            if del_result.raw_result['n'] == 0:
                raise KeyError(f"No document deleted")
            return result
        else:
            result = self._db.find_one_and_update(
                filter={'_id': key},
                update={'$unset': {dots: ''}})
            try:
                return eval('result' + subscripts)
            except KeyError:
                raise KeyError(f"Subscript of {subscripts} does not exist in document of '_id' {key}")

    def set_with_expiry(self, fields, value):
        """
        Atomically:
        Sets 'self[key] = value' or 'self[key][field1][field2][...] = value'
        Creates 'exp' subdocument in document root with current time for use with MongoDB TTL expiration index
        Note: MongoDB TTL checks every 60 secs to delete files
        """
        key, dots, _ = self._process_inputs(fields)
        if len(fields) == 1 and isinstance(value, dict):
            result = self._db.update_one(
                filter={'_id': key},
                update={'$set': {'exp': datetime.utcnow()},
                        '$set': value},
                upsert=True)
        else:
            result = self._db.update_one(
                filter={'_id': key},
                update={'$set': {'exp': datetime.utcnow(), dots: self._prepset(value)}},
                upsert=True)

    def full_update(self, key, value):
        result = self._db.update_one(key, value, upsert=True)
        return result

    def getset(self, fields, value):
        """
        Atomically does either:
        Sets 'self[key] = value' and returns previous 'self[key]'
        Sets 'self[key][field1][field2][...] = value' and returns previous 'self[key][field1][field2][...]'
        """
        key, dots, subscripts = self._process_inputs(fields)
        result = self._db.find_one_and_update(
            filter={'_id': key, dots: {'$exists': True}},
            update={'$set': {dots: value}})
        if result == None:
            raise KeyError(f"1Subscript of {subscripts} does not exist in document of '_id' {key}")   
        try:
            if len(fields) == 1:
                return eval(f"result['{key}']")
            else:
                return eval('result' + subscripts)
        except KeyError:
            raise KeyError(f"Subscript of {subscripts} does not exist in document of '_id' {key}")

    def items(self, filter_inp=None, proj_inp={'_id': False}):
        " Either returns all with no inputs, or filters when given filters"
        return list(self._db.find(
            filter=filter_inp,
            projection=proj_inp))

    def add_if_empty(self, fields, value):
        """
        Atomically:
        Sets 'self[key] = value' or 'self[key][field1][field2][...] = value'
        Only if the specified key/field(s) combo does not exist or is empty
        Returns the value if it was added; otherwise, returns None
        Note: Will not override a field set to a value in order to create a subfield
        """
        key, dots, _ = self._process_inputs(fields)
        try:
            if len(fields) == 1 and isinstance(value, dict):
                result = self._db.update_one(
                    filter={'_id': key},
                    update={'$setOnInsert': value},
                    upsert=True)
                if result.upserted_id:
                    return key
            elif len(fields) == 1:
                result = self._db.update_one(
                    filter={'_id': key},
                    update={'$setOnInsert': {dots: value}},
                    upsert=True)
                if result.upserted_id:
                    return key
            else:
                try:
                    result = self._db.update_one(
                        filter={'_id': key},
                        update={'$setOnInsert': {dots: value}},
                        upsert=True)
                    if result.upserted_id:
                        return fields
                except WriteError:
                    print("Likely due to trying to set a subfield of a field that is already set to one value")
                    pass
            return None
        except DuplicateKeyError:
            return None
        
    def _delete_based_on_query(self, filter_inp):
        self._db.delete_many(filter = filter_inp)

test2 = testStore()

In [7]:
test2

[{'DEV-DEVELOP_AJjrM07NgeqrK': {'A6xyGZDWXxJN5': {'actor_id': 'DEV-DEVELOP_AJjrM07NgeqrK',
                                                  'api_server': 'https://dev.tenants.aloedev.tacc.cloud',
                                                  'cpu': 435491032,
                                                  'executor': 'testuser',
                                                  'exit_code': 0,
                                                  'final_state': {'Dead': False,
                                                                  'Error': '',
                                                                  'ExitCode': 0,
                                                                  'FinishedAt': '2020-04-30T14:00:24.8592758Z',
                                                                  'OOMKilled': False,
                                                                  'Paused': False,
                                                                  'Pid': 

In [14]:
cat = {'dog':'2'}

In [16]:
if cat.get('dogd'):
    print('hey')

In [13]:
if 'final_state' in None:
    print('he')

TypeError: argument of type 'NoneType' is not iterable

In [11]:
test2['DEV-DEVELOP_BeJe6eZJ1OpLK_jzwbRzkDMrK0w']['final_state']['StartedAt']

datetime.datetime(2020, 4, 29, 16, 0, 9, 81000)

In [42]:
import pytz

In [43]:
AUS_time = pytz.timezone("America/Chicago")

In [52]:
datetime.utcnow()

datetime.datetime(2020, 4, 28, 14, 3, 31, 971116)

In [32]:
time.time()

datetime.time(13, 35, 1, 524000)

In [38]:
assert type(time) == datetime

In [58]:
time3 < datetime.utcnow()

True

In [62]:
time3 = test2['DEV-DEVELOP_ak8VX8xAkJMPl_XPBQz6QAybvD', 'message_received_time']
print(time3)
time3 = time3 + timedelta(seconds=5)
print(time3)

KeyError: "'_id' of 'DEV-DEVELOP_ak8VX8xAkJMPl_XPBQz6QAybvD' not found"

In [63]:
time3.isoformat()

'2020-04-28T13:34:43.524000'

In [57]:
list(db['3'].find({}))

[{'_id': 'DEV-DEVELOP_ak8VX8xAkJMPl_XPBQz6QAybvD',
  'actor_id': 'DEV-DEVELOP_ak8VX8xAkJMPl',
  'api_server': 'https://dev.tenants.aloedev.tacc.cloud',
  'cpu': 0,
  'executor': 'testuser',
  'exit_code': None,
  'final_state': None,
  'id': 'XPBQz6QAybvD',
  'io': 0,
  'message_received_time': datetime.datetime(2020, 4, 28, 13, 34, 38, 524000),
  'runtime': 0,
  'start_time': None,
  'status': 'RUNNING',
  'tenant': 'DEV-DEVELOP',
  'worker_id': 'KKkJZgyYo08Nw'}]

## NEW STUFF

In [24]:
test2.full_update({'_id': 'stats'},
                  {'$inc': {'actor_total': 1}})

<pymongo.results.UpdateResult at 0x7fa532b83f08>

In [15]:
car = {'dog':1, 'dsad':'2'}

In [13]:
if '1' in car:
    print('hey')

In [8]:
car = {'hey':{'1':'1'}, 'hey2':{'2':'2'}}

In [11]:
car.popitem()

('hey2', {'2': '2'})

In [12]:
car = [{'1':'1'}, {'2':'2'}]

In [17]:
if not ['dsa']:
    print('hey')

In [31]:
for dd in car.values():
    print(dd)

{'1': '1'}
{'2': '2'}


In [22]:
test2['DEV-DEVELOP_1YYx66mz4NNrm_jmm6PXAqB1Mpw']

KeyError: "'_id' of 'DEV-DEVELOP_1YYx66mz4NNrm_jmm6PXAqB1Mpw' not found"

In [23]:
car.add({'hey':'3'})

AttributeError: 'dict' object has no attribute 'add'

In [20]:
if 'dog' in car:
    try:
        car['dsdddd']
    except KeyError:
        contimu
    print('1')
    
print('2')

SyntaxError: 'break' outside loop (<ipython-input-20-7f1cecc07a05>, line 8)

In [7]:
test2.items()

[{'actor_id': 'DEV-DEVELOP_4lx804R0GJyb',
  'id': '7KPGM6E0PjGY',
  'status': 'READY',
  'tenant': 'DEV-DEVELOP',
  'ch_name': 'worker_7KPGM6E0PjGY',
  'cid': '911dd33b4cb8f6fdf361a72439692d71accaf18953fb7fdabbb0b74eb51dbfc9',
  'create_time': '1586185087.076951',
  'host_id': '0',
  'host_ip': '172.17.0.1',
  'image': 'jstubbs/abaco_test',
  'last_execution_time': 0,
  'last_health_check_time': '1586185087.073098',
  'location': 'unix://var/run/docker.sock'},
 {'actor_id': 'DEV-DEVELOP_amA61zqaa58z',
  'id': '80x7NMy5oBQm',
  'status': 'READY',
  'tenant': 'DEV-DEVELOP',
  'ch_name': 'worker_80x7NMy5oBQm',
  'cid': 'edc7c8a2fda01d2f99b7b4c2b6dbd713e9c7d6aa93fc7c11968a9300fe86f499',
  'create_time': '1586185089.174073',
  'host_id': '0',
  'host_ip': '172.17.0.1',
  'image': 'jstubbs/abaco_test',
  'last_execution_time': 0,
  'last_health_check_time': '1586185089.169984',
  'location': 'unix://var/run/docker.sock'},
 {'actor_id': 'DEV-DEVELOP_jbEoD5DjywRK',
  'id': 'Ab8NWaoYeGRr',
  's

In [21]:
test2['stats', 'actor_dbids']

['heyitme', 'dsadsa', 'fasdfskdf']

In [None]:
test2.add_if_empty([''])

In [12]:
    for worker in test2.items(proj_inp=None):
        pprint.pprint(worker)
        actor_id = worker['actor_id']
        w = Worker(**worker)
        actor_display_id = Actor.get_display_id(worker.get('tenant'), actor_id)
        w.update({'actor_id': actor_display_id})
        w.update({'actor_dbid': actor_id})
        # convert additional fields to case, as needed
        logger.debug(f"worker before case conversion: {w}")
        last_execution_time_str = w.pop('last_execution_time')
        last_health_check_time_str = w.pop('last_health_check_time')
        create_time_str = w.pop('create_time')
        w['last_execution_time'] = display_time(last_execution_time_str)
        w['last_health_check_time'] = display_time(last_health_check_time_str)
        w['create_time'] = display_time(create_time_str)
        if case == 'camel':
            w = dict_to_camel(w)
        workers_result.append(w)
        summary['total_workers'] += 1
        if worker.get('status') == codes.REQUESTED:
            summary['requested_workers'] += 1
        elif worker.get('status') == codes.READY:
            summary['ready_workers'] += 1
        elif worker.get('status') == codes.ERROR:
            summary['error_workers'] += 1
        elif worker.get('status') == codes.BUSY:
            summary['busy_workers'] += 1
    logger.info("workers retrieved.")
    if case == 'camel':
        summary = dict_to_camel(summary)
    result = {'summary': summary,
              'workers': workers_result}

    
    
    break

{'_id': 'DEV-DEVELOP_BxKzr8Q08Wgqj_b8w33p8RXWea6',
 'actor_id': 'DEV-DEVELOP_BxKzr8Q08Wgqj',
 'ch_name': 'worker_b8w33p8RXWea6',
 'cid': '7039f9ca82b3edf4b0581a6263e748986481455a6285bad102e14581d2a97622',
 'create_time': '1585863175.328464',
 'host_id': '0',
 'host_ip': '172.17.0.1',
 'id': 'b8w33p8RXWea6',
 'image': 'notchristiangarcia/flops_test',
 'last_execution_time': '1585863177.824644',
 'last_health_check_time': '1585863175.325031',
 'location': 'unix://var/run/docker.sock',
 'status': 'READY',
 'tenant': 'DEV-DEVELOP'}
DEV-DEVELOP_BxKzr8Q08Wgqj
DEV-DEVELOP_BxKzr8Q08Wgqj
<class 'str'>


AttributeError: 'str' object has no attribute 'decode'

In [36]:
len(test2)

0

In [29]:
listofaid = set()
listofaid.add('dsadsa')

In [21]:
listofaid = listofaid + ['dsadsa'] +['dsajdsadjsa']

In [27]:
listofaid

{'dsadsa'}

In [22]:
test2.delete_based_on_query({'tenant': 'DEV-DEVELOP'})

In [7]:
for execution in test2.items({'actor_id': 'dsajdija'}):
    print(execution['id'])

In [9]:
test2['DEV-DEVELOP_K1eBp0xe5Lwox_AAzkDGWrWoWd7G']

KeyError: "'_id' of 'DEV-DEVELOP_K1eBp0xe5Lwox_AAzkDGWrWoWd7G' not found"

In [14]:
test2.pop_field(['DEV-DEVELOP_8RKPbK5ZzQm76'])

{'jMyZE8rl3xPG5': {'status': 'SHUTDOWN_REQUESTED',
  'id': 'jMyZE8rl3xPG5',
  'tenant': 'DEV-DEVELOP'},
 'bbO5APjJz5rKA': {'status': 'SHUTDOWN_REQUESTED',
  'tenant': 'DEV-DEVELOP',
  'id': 'bbO5APjJz5rKA'}}

# Archive

## Destroy Mongo

In [4]:
import multiprocessing

In [84]:
def threaded_thing(nonce_store):
    print('hey')
    res = nonce_store.update_one(
        {'_id': nonce_key, nonce_id + '.remaining_uses': {'$eq': -1}},
        {'$inc': {nonce_id + '.current_uses': 1},
        '$set': {nonce_id + '.last_use_time': 'hey'}})
    if res.raw_result['updatedExisting'] == True:
        logger.debug("nonce has infinite uses. updating nonce.")
        return 'hey!'

    # Check for remaining uses greater than 0
    res = nonce_store.update_one(
        {'_id': nonce_key, nonce_id + '.remaining_uses': {'$gt': 0}},
        {'$inc': {nonce_id + '.current_uses': 1,
                nonce_id + '.remaining_uses': -1},
        '$set': {nonce_id + '.last_use_time': 'hey'}})

In [6]:
nonce_key = 'DEV-DEVELOP_jane'
nonce_id = 'DEV-DEVELOP_BPbPk0xvz0Llj'
print('hey')
res = nonce_store.update_one(
    {'_id': nonce_key, nonce_id + '.remaining_uses': {'$eq': -1}},
    {'$inc': {nonce_id + '.current_uses': 1},
    '$set': {nonce_id + '.last_use_time': 'hey'}})

hey


In [14]:
nonce_store.update_one(
    {'_id':'did'},
    {'$set':{'stopper':'dopper'}},
    upsert=True)

<pymongo.results.UpdateResult at 0x10a6adaa0>

In [24]:
list(actors_store.find({}))

[{'_id': 'bop', 'hey': 'there'}, {'_id': 'dop', 'hey': 'there'}]

In [23]:
actors_store.update_one(
    {'_id': 'dop'},
    {'$set':{'hey':'there'}},
    upsert=True)

<pymongo.results.UpdateResult at 0x10a775050>

In [11]:
def newBoy(num):
    print(num)
    return(num)

In [None]:
pool = multiprocessing.Pool(processes=4)
hey = pool.map(newBoy, range(128))
pool.join()
pool.close()

In [176]:
nonce_store.update_one(
    {'_id': 'DEV-DEVELOP_jane'},
    {'$set': {'DEV-DEVELOP_KvVArG21Y1GJ': {'cat':2}}})

<pymongo.results.UpdateResult at 0x107329cd0>

## Mongo Store

In [571]:
#### Things to implement:
## getset needs to be fixed for non-maze
## add_if_empty. These two just need to be checked. They work, but errors when you're creating new documents with not yet made fields.

### Store class only:
## set_with_expiry
## insert_one?
## set_with_expiry


### maze
## __setitem__
## __getitem__
## __delitem__
## __len__
## _prepset
## pop_field
## getset
## add_if_empty
## set_with_expiry


###### Need to finish
## insert_one?
## __iter__


class testStore():
    def __init__(self):
        db = client.abaco
        test = db['5']
        self._db = test
        
    def __getitem__(self, key):
        return mongoMaze(self._db, key, [])
        
    def __setitem__(self, key, value):
        self._db.update_one(
            filter={'_id': key},
            update={'$set': {key: value}})
        
    def __delitem__(self, key):
        self._db.delete_one({'_id': key})
    
    def __len__(self):
        return self._db.estimated_document_count()
    
    def __repr__(self):
        return pprint.pformat(list(self._db.find(projection={'_id': False})))
    
    def set_with_expiry(self, key, field, value):
        self._db.update_one(
            filter={'_id': key},
            update={'$set': {'exp': datetime.utcnow(), field: self._prepset(value)}},
            upsert=True)
    
    def _prepset(self, value):
        if type(value) is bytes:
            return value.decode('utf-8')
        return value


# MongoCrawl?
class mongoMaze():
    def __init__(self, db, key, field):
        self._db = db
        self.key = key
        self.field = field
        
        _id = key
        fields -> doc1.subdoc2.subsubdoc3
        
    def __getitem__(self, field):
        self.field.append(field)
        return mongoMaze(self._db, self.key, self.field)
    
    def __setitem__(self, field, value):
        search = self._get_search(field)
        self._db.update_one(
            filter={'_id': self.key},
            update={'$set': {search: value}},
            upsert=True)
        
    def __delitem__(self, field):
        search = self._get_search(field)
        self._db.update_one(
            filter={'_id': self.key},
            update={'$unset': {f'{search}': ''}})
        
    def __len__(self):
        result = self._db.find_one(
            {'_id': self.key},
            projection={'_id': False})
        return len(eval('result' + self._get_index()))
        
    def __repr__(self):
        result = self._db.find_one(
            {'_id': self.key},
            projection={'_id': False})
        return pprint.pformat(eval('result' + self._get_index()))
    
    def pop_field(self, field):
        search = self._get_search(field)
        result = self._db.find_one_and_update(
            filter={'_id': self.key},
            update={'$unset': {f'{search}': ''}})
        return pprint.pformat(eval('result' + self._get_index()))
    
    def getset(self, value):
        search = self._get_search()
        result = self._db.find_one_and_update(
            filter={'_id': self.key},
            update={'$set': {search: value}})
        print(self._get_index())
        return eval('result' + self._get_index())
    
    def add_if_empty(self, value):
        search = self._get_search()
        res = self._db.update_one(
            {'_id': self.key},
            {'$setOnInsert': {search: value}},
            upsert=True)
        print(self.key, search, value, res.raw_result)

        if res.upserted_id:
            return field
        else:
            return None
    
    def _get_search(self, field=None):
        if field:
            self.field.append(field)

        if self.field:
            search = '.'.join(self.field)
        else:
            search = self.key
        return search
            
    def _get_index(self):
        if self.field:
            index = "['" + "']['".join(self.field) + "']"
        else:
            index = ''
        return index
    
    def _prepset(self, value):
        if type(value) is bytes:
            return value.decode('utf-8')
        return value
    
testme = testStore()
#This feels wrong, thus it's discontinued. Also I don't know how to fix it.

In [115]:
testme

{'1': {'2': {'newSubDoc': 'a newer hello'}}}

In [110]:
testme = {'1': {'2': {'3': {'4': 'hello.'}}}}

In [111]:
testme

{'1': {'2': {'3': {'4': 'hello.'}}}}

In [112]:
testme['1']['2']['3']['4'] = hello

'hello.'

In [113]:
testme['1']['2'] = {'newSubDoc': 'a newer hello'}

In [114]:
testme

{'1': {'2': {'newSubDoc': 'a newer hello'}}}