In [1]:
from boto3.dynamodb.conditions import Attr
# Build the three leaf conditions first, then combine them: (test == 1 AND a < 2) OR b contains 'a'.
test_equals_one = Attr('test').eq(1)
a_below_two = Attr('a').lt(2)
b_contains_a = Attr('b').contains('a')
res = (test_equals_one & a_below_two) | b_contains_a

In [12]:
# Sample filter string used by the cells below: parenthesised `field__lookup:type:value`
# clauses joined by AND / OR. The `__lookup` suffix is optional (clause D has none).
test = "(( A__eq:str:1)OR(B__lt:str:2)AND(C__gt:int:3)AND(D:str:a))"

In [13]:
# STEP 1
# Split the raw filter string into a flat list of untyped string tokens.
import re

# Raw strings: '\(' and '\)' in a normal string literal are invalid escape
# sequences, which newer Python versions warn about.
separators = [r'\(', r'\)', ':']

# Wrapping the alternation in a capture group makes re.split keep the
# separators themselves in the result, so parentheses survive as tokens.
rstr = "({})".format('|'.join(separators))
splitted = re.split(rstr, test)

# ':' only delimits the parts of a clause and '' is split residue; neither is a token.
tokens_to_ignore = {':', ''}
no_type_tokens = [
    stripped
    for stripped in (piece.strip() for piece in splitted)
    if stripped not in tokens_to_ignore
]
no_type_tokens

['(',
 '(',
 'A__eq',
 'str',
 '1',
 ')',
 'OR',
 '(',
 'B__lt',
 'str',
 '2',
 ')',
 'AND',
 '(',
 'C__gt',
 'int',
 '3',
 ')',
 'AND',
 '(',
 'D',
 'str',
 'a',
 ')',
 ')']

In [14]:
# Get token primary and secondary types


class Token:
    """Base class for a typed lexical token.

    A token keeps the raw text it was built from (``literal``) together with a
    primary and a secondary type. ``str(token)`` yields the
    ``'<primary_type>:<secondary_type>'`` notation.

    Raises:
        Exception: if the primary/secondary pair is not present in
            ``registered_token_types``.
    """

    def __init__(self, literal, primary_type, secondary_type):
        if primary_type not in registered_token_types or secondary_type not in registered_token_types[primary_type]:
            raise Exception('Attempted to tokenize an unregistered primary/secondary type')

        self.literal = literal
        self.primary_type = primary_type
        self.secondary_type = secondary_type
        self.set_string_notation()

    def set_string_notation(self):
        """Recompute ``string_notation`` from the token's current types."""
        # Must read the instance attributes: there are no local or global
        # names `primary_type` / `secondary_type` in this scope.
        self.string_notation = f'{self.primary_type}:{self.secondary_type}'

    def update_secondary_type(self, new_secondary_type):
        """Replace the secondary type and keep ``string_notation`` in sync."""
        self.secondary_type = new_secondary_type
        self.set_string_notation()

    def __str__(self):
        return self.string_notation


class ContextToken(Token):
    """Token of primary type 'context' (the '(' and ')' literals); adds nothing to Token."""


import operator
# Callable that combines two operands for each logical secondary type.
token_operator = {
    'and': operator.and_,
    'or': operator.or_,
}

# Numeric precedence per logical secondary type.
# NOTE(review): presumably a higher number binds tighter -- confirm with the consumer.
token_precedence = {
    'and': 2,
    'or': 1,
}

class LogicalToken(Token):
    """Token for a logical keyword; carries its precedence and combining callable."""

    def __init__(self, literal, primary_type, secondary_type):
        super(LogicalToken, self).__init__(literal, primary_type, secondary_type)
        # Both lookups are keyed by the secondary type ('and' / 'or').
        logical_kind = self.secondary_type
        self.precedence = token_precedence[logical_kind]
        self.operator = token_operator[logical_kind]

from decimal import Decimal
from datetime import datetime, date
# Maps the type name written in a filter clause to the callable stored as its caster.
value_types = dict(
    int=int,
    str=str,
    float=float,
    decimal=Decimal,
    date=date,
    datetime=datetime,
)
    
class ExpressionToken(Token):
    """Token that is part of a `field:type:value` clause."""

    def get_field_and_lookup_expression(self):
        """Split a field literal into ``(field_name, lookup_expression)``.

        Returns None when this token's secondary type is not 'field'.
        The literal is split on '__': a single part means lookup 'eq'; with two
        or three parts the last part is the lookup (a middle part is ignored).

        Raises:
            Exception: if the literal has more than three '__'-separated parts.
        """
        if self.secondary_type != 'field':
            return None

        parts = self.literal.split('__')
        if len(parts) > 3:
            raise Exception('Invalid lookup expression!')

        # str.split always yields at least one part, so only 1, 2 or 3 remain here.
        lookup = 'eq' if len(parts) == 1 else parts[-1]
        return parts[0], lookup

    def get_value_type_casting_method(self):
        """Return the ``value_types`` entry named by this token's literal.

        Returns None when this token's secondary type is not 'value_type'.

        Raises:
            Exception: if the literal is not a key of ``value_types``.
        """
        if self.secondary_type != 'value_type':
            return None

        try:
            caster = value_types[self.literal]
        except KeyError:
            raise Exception('Invalid value type!')
        else:
            return caster
        


# Every primary type together with the secondary types it may be paired with.
registered_token_types = dict(
    context={'open', 'close'},
    logical={'and', 'or'},
    expression={'field', 'value_type', 'value'},
)

# Literals whose (primary, secondary) type does not depend on neighbouring tokens.
general_token_types = {
    '(': ('context', 'open'),
    ')': ('context', 'close'),
    'AND': ('logical', 'and'),
    'OR': ('logical', 'or'),
}

def field_lambda(_):
    """Resolver that always yields the field type, whatever the next literal is."""
    return ('expression', 'field')


def _type_after_field(next_literal):
    """Resolve the literal that follows a field.

    It is the value itself when nothing or a ')' comes next; otherwise it is a
    value type that announces the value still to come.
    """
    if next_literal in [None, ')']:
        return ('expression', 'value')
    return ('expression', 'value_type')


def _type_after_value_type(_):
    """Resolver for the literal that follows a value type: always the value."""
    return ('expression', 'value')


# Keyed by the previous token's 'primary:secondary' notation (None at the start);
# each entry takes the next literal and returns the (primary, secondary) type.
expression_token_types = {
    None: field_lambda,
    'context:open': field_lambda,
    'expression:field': _type_after_field,
    'expression:value_type': _type_after_value_type,
}

# Token class to instantiate for each primary type.
primary_type_to_Token = dict(
    context=ContextToken,
    logical=LogicalToken,
    expression=ExpressionToken,
)

def get_instanced_token(current_literal: str, previous_token_string_notation: str, next_literal: str):
    """Build the typed Token for ``current_literal``.

    Literals listed in ``general_token_types`` have a fixed type. Any other
    literal is typed from context: the resolver registered in
    ``expression_token_types`` for the previous token's notation is called
    with the next literal.

    Returns:
        An instance of the class registered in ``primary_type_to_Token`` for
        the resolved primary type, or None if either resolved type is None.

    Raises:
        Exception: if no resolver is registered for the previous token's notation.
    """
    fixed_type = general_token_types.get(current_literal)
    if fixed_type is not None:
        primary_type, secondary_type = fixed_type
    else:
        try:
            primary_type, secondary_type = expression_token_types[previous_token_string_notation](next_literal)
        except KeyError:
            raise Exception(f'Couldn\'t determine token type!\nCurrent -> {current_literal}\nPrevious -> {previous_token_string_notation}\nNext -> {next_literal}')

    if primary_type is None or secondary_type is None:
        return None

    token_class = primary_type_to_Token[primary_type]
    return token_class(current_literal, primary_type, secondary_type)
        
        


In [15]:
# Type every raw literal, giving each call the previous token's notation and the next literal.
instanced_tokens = []
previous_token = None

# Shift the literal list left by one (padded with None) to get each token's lookahead.
upcoming_literals = no_type_tokens[1:] + [None]
for current_token, next_token in zip(no_type_tokens, upcoming_literals):
    instanced_token = get_instanced_token(current_token, previous_token, next_token)
    instanced_tokens.append(instanced_token)
    previous_token = str(instanced_token)

print(test)
[f'{str(t)} -> {t.literal}' for t in instanced_tokens]

(( A__eq:str:1)OR(B__lt:str:2)AND(C__gt:int:3)AND(D:str:a))


['context:open -> (',
 'context:open -> (',
 'expression:field -> A__eq',
 'expression:value_type -> str',
 'expression:value -> 1',
 'context:close -> )',
 'logical:or -> OR',
 'context:open -> (',
 'expression:field -> B__lt',
 'expression:value_type -> str',
 'expression:value -> 2',
 'context:close -> )',
 'logical:and -> AND',
 'context:open -> (',
 'expression:field -> C__gt',
 'expression:value_type -> int',
 'expression:value -> 3',
 'context:close -> )',
 'logical:and -> AND',
 'context:open -> (',
 'expression:field -> D',
 'expression:value_type -> str',
 'expression:value -> a',
 'context:close -> )',
 'context:close -> )']

In [17]:
class Node:
    """Binary tree node with an optional left/right child and a sequential id."""

    # Class-wide counter; incremented once per constructed node (subclasses included).
    uid = 0

    def __init__(self, left_child=None, right_child=None):
        self.left_child, self.right_child = left_child, right_child
        Node.uid = Node.uid + 1
        self._uid = Node.uid

class ExpressionNode(Node):
    """Leaf node holding one `field / lookup / value` comparison.

    Built from two tokens (field, value) or three (field, value type, value).
    When a value type token is given, the value literal is passed through the
    casting callable it names; otherwise the literal is stored unchanged.

    Raises:
        Exception: if the number of tokens is not 2 or 3, or if the field or
            value token is not an ExpressionToken.
    """

    def __init__(self, *args):
        token_count = len(args)
        if token_count != 2 and token_count != 3:
            raise Exception(f'Couldn\'t build ExpressionNode! Tokens -> {args}')

        field_token, value_token = args[0], args[-1]
        value_type_token = args[1] if token_count == 3 else None

        for required_token in (field_token, value_token):
            if not isinstance(required_token, ExpressionToken):
                raise Exception('Cannot build expression node out of non ExpressionTokens!')

        self.field, self.lookup_expression = field_token.get_field_and_lookup_expression()

        if value_type_token:
            caster = value_type_token.get_value_type_casting_method()
        else:
            caster = None
        self.value = caster(value_token.literal) if caster else value_token.literal

        # Node.__init__ runs last, so children and uid are set after the expression fields.
        super().__init__()
        
class LogicalNode(Node):
    """Inner node that stores the combining callable taken from a logical token."""

    def __init__(self, logical_token):
        # Set before Node.__init__ so `operator` is the first instance attribute.
        self.operator = logical_token.operator
        super(LogicalNode, self).__init__()


# Grammar table: for each token notation (None = start of input), the set of
# notations that are allowed to come next.
next_expected_tokens = {
    # At the start or right after '(' : a nested group or a field.
    None: {'context:open', 'expression:field'},
    'context:open': {'context:open', 'expression:field'},

    # After ')' : close the enclosing group or chain with a logical keyword.
    'context:close': {'context:close', 'logical:and', 'logical:or'},

    # A logical keyword must be followed by a new group.
    'logical:and': {'context:open'},
    'logical:or': {'context:open'},

    # Inside a clause: field -> [value type] -> value -> ')'.
    'expression:field': {'expression:value_type', 'expression:value'},
    'expression:value_type': {'expression:value'},
    'expression:value': {'context:close'},
}



# build AST
from copy import deepcopy

# One dict per open parenthesis level; index 0 is the root context, which is
# never closed by a ')'. Each context tracks:
#   head         - root node of the sub-tree built so far in this context
#   last_node    - most recently attached node (expression, sub-tree head or logical)
#   last_logical - most recent LogicalNode, used to chain the next logical under it
contexts_stack = [{'head': None, 'last_node': None, 'last_logical': None}]
current_expression_tokens = []
expected_tokens = next_expected_tokens[None]

# NOTE(review): logical operators are chained strictly right-to-left as they are
# met; `precedence` on LogicalToken is not consulted here, so e.g. `x AND y OR z`
# would nest as `x AND (y OR z)` -- confirm whether that is intended.
for tok in instanced_tokens:
    tok_repr = str(tok)
    if tok_repr not in expected_tokens:
        raise Exception(f'Unexpected token encountered -> {tok_repr}, one of the following was expected -> {expected_tokens}')

    if isinstance(tok, ContextToken):
        if tok.secondary_type == 'open':
            contexts_stack.append({'head': None, 'last_node': None, 'last_logical': None})
        elif tok.secondary_type == 'close':
            # Closing needs a context of its own plus a parent to merge into;
            # with only the root left, the ')' is unbalanced.
            if len(contexts_stack) < 2:
                raise Exception('Attempted to close unexistent context!')

            current_context = contexts_stack[-1]
            if current_expression_tokens:
                # The tokens collected since the last '(' form one expression.
                try:
                    node_to_add = ExpressionNode(*current_expression_tokens)
                except Exception as e:
                    raise Exception(f'Couldn\'t parse expression node! -> {e}') from e
                current_expression_tokens = []

                if current_context['head'] is None:
                    current_context['head'] = node_to_add
                elif isinstance(current_context['last_node'], LogicalNode):
                    current_context['last_node'].right_child = node_to_add
                else:
                    raise Exception('Found an unexpected expression??')

                current_context['last_node'] = node_to_add

            # Fold the finished context into its parent.
            context_to_merge = contexts_stack.pop()
            parent_context = contexts_stack[-1]
            if parent_context['head'] is None:
                parent_context['head'] = context_to_merge['head']
            elif isinstance(parent_context['last_node'], LogicalNode):
                parent_context['last_node'].right_child = context_to_merge['head']
            parent_context['last_node'] = context_to_merge['head']

    elif isinstance(tok, LogicalToken):
        current_context = contexts_stack[-1]
        if current_context['last_node'] is None:
            raise Exception('No previous node for logical operator!')

        logical_node = LogicalNode(tok)
        logical_node.left_child = current_context['last_node']
        if current_context['last_node'] is current_context['head']:
            # First logical of this context: it becomes the new sub-tree root.
            current_context['head'] = logical_node
        else:
            # Later logicals hang off the right side of the previous logical.
            current_context['last_logical'].right_child = logical_node

        current_context['last_node'] = logical_node
        current_context['last_logical'] = logical_node

    elif isinstance(tok, ExpressionToken):
        current_expression_tokens.append(tok)

    expected_tokens = next_expected_tokens[str(tok)]

# Only the root context may remain, and no expression may be left half-read.
if len(contexts_stack) != 1 or current_expression_tokens:
    raise Exception('Unexpected end of string')

print(test)
print(contexts_stack[0]['head'].__dict__)
print(contexts_stack[0]['head'].left_child.__dict__)
print(contexts_stack[0]['head'].right_child.left_child.__dict__)
print(contexts_stack[0]['head'].right_child.right_child.left_child.__dict__)
print(contexts_stack[0]['head'].right_child.right_child.right_child.__dict__)

(( A__eq:str:1)OR(B__lt:str:2)AND(C__gt:int:3)AND(D:str:a))
{'operator': <built-in function or_>, 'left_child': <__main__.ExpressionNode object at 0x7f15406e0080>, 'right_child': <__main__.LogicalNode object at 0x7f15406c9b38>, '_uid': 2}
{'field': 'A', 'lookup_expression': 'eq', 'value': '1', 'left_child': None, 'right_child': None, '_uid': 1}
{'field': 'B', 'lookup_expression': 'lt', 'value': '2', 'left_child': None, 'right_child': None, '_uid': 3}
{'field': 'C', 'lookup_expression': 'gt', 'value': 3, 'left_child': None, 'right_child': None, '_uid': 5}
{'field': 'D', 'lookup_expression': 'eq', 'value': 'a', 'left_child': None, 'right_child': None, '_uid': 7}


In [7]:
class NewUser:
    """User whose identity is a sequential id assigned at construction time.

    Two NewUser objects compare equal when their ``_uid`` values match.
    """

    # Class-wide counter; each new instance takes the next value.
    uid = 0

    def __init__(self, username):
        self.username = username
        NewUser.uid += 1
        self._uid = NewUser.uid

    def __eq__(self, other):
        # Returning NotImplemented (rather than raising) lets Python fall back
        # to its default handling, so `user == None` or `user in some_list`
        # evaluates normally instead of blowing up.
        if not isinstance(other, NewUser):
            return NotImplemented
        return self._uid == other._uid

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable; hash on the
        # same key that equality uses.
        return hash(self._uid)
 
# Create three users in order so they receive consecutive ids, then show each one.
x, y, z = (NewUser(name) for name in ("User1", "User2", "User3"))

for user in (x, y, z):
    print(user.username)
    print(user._uid)

# A user always equals itself.
x == x


User1
1
User2
2
User3
3


True