In [109]:
import sqlite3
import cerealizer

WHY NOT JUST AST INSTEAD OF CEREALIZER?

class DiskDict:
    '''
    Holds a dictionary in disk using the serializing package "cerealized"
        Keys must be text.
        Values must support cerealization.
    '''
    
    def __init__(self, file):
        self.connection = sqlite3.connect(file)
        self.connection.execute('''
            CREATE TABLE IF NOT EXISTS objects(
                key TEXT NOT NULL PRIMARY KEY,
                cereal TEXT NOT NULL
            )
        ''')
        self.connection.execute('''
            CREATE TABLE IF NOT EXISTS indexed(
                key TEXT NOT NULL
            )
        ''')
    
    def __iter__(self):
        q = (f'SELECT cereal FROM objects',)
        yield from map(self.loads, self._column(*q))
    
    def __contains__(self, key):
        q = (f'SELECT key FROM objects WHERE key=?', (key,))
        yield any(k==key for k in self._column(*q))
    
    def __getitem__(self, key):
        q = (f'SELECT cereal FROM objects WHERE key=?', (key,))
        try:
            return next(map(self.loads, self._column(*q)))
        except StopIteration:
            pass
        raise KeyError(key)
    
    def __setitem__(self, key, value):
        q = (f'REPLACE INTO objects VALUES (?,?)', (key, self.dumps(value)))
        self.connection.execute(*q)
        self.connection.commit()
    
    def get(self, key, default=None):
        q = (f'SELECT cereal FROM objects WHERE key=?', (key,))
        return next(map(self.loads, self._column(*q)), default)
    
    def keys(self):
        return list(self)
    
    def _column(self, *q):
        it = (t for t,*_ in self.connection.execute(*q))
        yield from it
    
    def __iter__(self):
        q = (f'SELECT key FROM objects',)
        yield from self._column(*q)
    
    def values(self):
        q = (f'SELECT cereal FROM objects',)
        return list(map(self.loads, self._column(*q)))
    
    def where(self, params):
        '''
        Usage:
            where((column, '==', value))
            where((column, '<=', value))
            where(((('n', '==', 3), 'and', ('k', '>', 4)), 'or' ('name', '!=', 'carlos')))
        '''
        indices = self.indices()
        values = []
        def parse(column, eq, value):
            if eq.lower() in ('and', 'or'):
                return f'({parse(*column)} {eq} {parse(*value)})'
            else:
                assert eq in ('==', '<','<=', '>', '>='), f'Invalid operator: {eq}'
                assert column in indices, f'Can not run query on non-indexed column "{column}"'
                values.append(value)
                if eq=='!=':
                    eq='<>'
                if eq=='==':
                    eq='='
                return f'({column} {eq} ?)'
        query = (f'SELECT key FROM indexed WHERE {parse(*params)}', values)
        for key in self._column(*q):
            yield from map(self.loads, self.connection.execute(*q))
    
    def add_index(self, column, type):
        ...
    
    def del_index(self, column, type):
        ...
    
    def indices(self):
        'columns of indexed - key'
        return []
    
    def loads(self, s):
        return cerealizer.loads(s)
    
    def dumps(self, obj):
        return cerealizer.dumps(obj)
    
    def __len__(self):
        q = (f'SELECT COUNT(key) FROM objects',)
        return next(self.connection.execute(*q))[0]


d = DiskDict(':in-memory:')

print(len(d))
d['a'] = 3
print(len(d))
print(list(d))
print(d['a'])
print(d.get('x'))
print(list(d.where(('a', '==', 3))))


1
1
['a']
3
None


AssertionError: Can not run query on non-indexed column "a"

In [70]:
from collections import deque, Counter,OrderedDict
import json


class CustomJSON:
    
    default_encoders = {
        tuple: list,
        deque: list,
        set: list,
        frozenset: list,
        Counter: dict,
        OrderedDict: dict,
    }
    
    def __init__(self, encoders=None, defaults=True):
        'encoders is a dict [class]->converter, where class has a __name__ and'
        'converter converts class into a list or a dict'
        defaults = self.__class__.default_encoders if defaults else {}
        self.encoders = {**defaults, **(encoders or {})}
    
    def stringify(self, text):
        return json.dumps(self.pre_encode(text))
    
    def parse(self, text):
        return self.post_decode(json.loads(text))
    
    def pre_encode(self, obj):
        'convert obj into a JSON serializable object recursively'
        out = self._pre_encode(obj)
        if isinstance(out, dict):
            out = {k:self.pre_encode(v) for k,v in out.items()}
        elif isinstance(out, list):
            out = [self.pre_encode(v) for v in out]
        return out

    def _pre_encode(self, obj):
        'converts obj to a JSON serializable only on the first depth level'
        if isinstance(obj, dict):
            return {(f'**{k}' if k.startswith('*') else k):v for k,v in obj.items()}
        for cls, to_js in self.encoders.items():
            if isinstance(obj, cls):
                out = to_js(obj)
                if isinstance(out, dict):
                    out = {'*':cls.__name__, **self._pre_encode(out)}
                else:
                    out = {'*':cls.__name__, '**': out}
                return out
        if any(isinstance(obj, cls) for cls in (dict, list, int, str, float, bool)) or obj is None:
            return obj
        raise Exception(f'Unknown class {type(obj)} of object:\n{obj}')
    
    def post_decode(self, obj):
        decoders = {cls.__name__:cls for cls in self.encoders}
        
        def post_decode(obj):
            out = obj
            if isinstance(out, dict):
                out = {k:post_decode(v) for k,v in out.items()}
            elif isinstance(out, list):
                out = [post_decode(v) for v in out]
            if isinstance(out, dict) and '*' in out:
                clsname = out.pop('*')
                cls = decoders[clsname]
                out = cls(out.pop('**'))
            if isinstance(out, dict):
                out = {(k[2:] if k.startswith('**') else k):v for k,v in out.items()}
            return out
        
        return post_decode(obj)

JSON = CustomJSON()


AttributeError: type object 'CustomJSON' has no attribute 'defaults'

In [None]:

objs = [
    set(),
]

for obj in objs:
    r = JSON.parse(JSON.stringify(obj))
    print(obj == r)
    print(obj)
    print(JSON.stringify(obj))
    print(r)
    print()
