In [3]:
import argparse
import logging
import os
import sys

from redbaron import RedBaron

from toolz.curried import keyfilter

# Loading

In [4]:
# Root of the model package. The trailing slash matters: file names are made
# relative by stripping exactly this prefix.
source_dir = '../../openfisca-france/openfisca_france/model/'
filenames = []

# Collect every file below source_dir, as a path relative to source_dir.
for root, directories, files in os.walk(source_dir):
    for filename in files:
        complete_filename = os.path.join(root, filename)
        # os.walk builds `root` on top of source_dir, so the prefix must match.
        assert complete_filename.startswith(source_dir)
        complete_filename = complete_filename[len(source_dir):]
        filenames.append(complete_filename)
filenames

['mesures.py',
 'datatrees.py',
 'base.py',
 '__init__.py',
 'revenus/__init__.py',
 'revenus/autres.py',
 'revenus/capital/plus_value.py',
 'revenus/capital/foncier.py',
 'revenus/capital/__init__.py',
 'revenus/capital/financier.py',
 'revenus/activite/salarie.py',
 'revenus/activite/non_salarie.py',
 'revenus/activite/__init__.py',
 'revenus/remplacement/chomage.py',
 'revenus/remplacement/indemnites_journalieres_securite_sociale.py',
 'revenus/remplacement/__init__.py',
 'revenus/remplacement/retraite.py',
 'caracteristiques_socio_demographiques/demographie.py',
 'caracteristiques_socio_demographiques/__init__.py',
 'caracteristiques_socio_demographiques/logement.py',
 'prelevements_obligatoires/taxe_habitation.py',
 'prelevements_obligatoires/isf.py',
 'prelevements_obligatoires/__init__.py',
 'prelevements_obligatoires/impot_revenu/charges_deductibles.py',
 'prelevements_obligatoires/impot_revenu/credits_impot.py',
 'prelevements_obligatoires/impot_revenu/reductions_impot.py',
 '

In [5]:
# Files that are not parsed by the rest of the notebook.
# list.remove raises ValueError if one of them is missing from `filenames`.
for excluded_filename in (
        'base.py',
        'datatrees.py',
        'prelevements_obligatoires/prelevements_sociaux/cotisations_sociales/preprocessing.py',
        ):
    filenames.remove(excluded_filename)

In [6]:
# Maps each relative file name to the RedBaron tree of its source code.
redbaron_trees = {}
for filename in filenames:
    # source_dir ends with a slash, so plain concatenation yields the path.
    with open(source_dir + filename) as source_file:
        source_code = source_file.read()
    red = redbaron_trees[filename] = RedBaron(source_code)
    print('{} parsed'.format(filename))

mesures.py parsed
__init__.py parsed
revenus/__init__.py parsed
revenus/autres.py parsed
revenus/capital/plus_value.py parsed
revenus/capital/foncier.py parsed
revenus/capital/__init__.py parsed
revenus/capital/financier.py parsed
revenus/activite/salarie.py parsed
revenus/activite/non_salarie.py parsed
revenus/activite/__init__.py parsed
revenus/remplacement/chomage.py parsed
revenus/remplacement/indemnites_journalieres_securite_sociale.py parsed
revenus/remplacement/__init__.py parsed
revenus/remplacement/retraite.py parsed
caracteristiques_socio_demographiques/demographie.py parsed
caracteristiques_socio_demographiques/__init__.py parsed
caracteristiques_socio_demographiques/logement.py parsed
prelevements_obligatoires/taxe_habitation.py parsed
prelevements_obligatoires/isf.py parsed
prelevements_obligatoires/__init__.py parsed
prelevements_obligatoires/impot_revenu/charges_deductibles.py parsed
prelevements_obligatoires/impot_revenu/credits_impot.py parsed
prelevements_obligatoires

# Custom exceptions

In [7]:
# Debugging slots: ParsingException overwrites these with the node and the
# contexts it was raised with, so they can be inspected after a failure.
angry_rbnode = angry_global_context = angry_local_context = None

In [8]:
class ParsingException(Exception):
    '''Base error of the parser.

    Creating an instance stores the offending node and both contexts in the
    module-level ``angry_*`` variables; only `message` is kept on the
    exception itself.
    '''

    def __init__(self, message, rbnode, global_context, local_context):
        global angry_rbnode, angry_global_context, angry_local_context

        # Keep the failing state reachable from the notebook for inspection.
        angry_rbnode, angry_global_context, angry_local_context = (
            rbnode, global_context, local_context)

        super(ParsingException, self).__init__(message)

In [9]:
class NotImplementedParsingError(ParsingException):
    '''Raised when the parser meets a construct it has no handler for.'''

In [10]:
class AssertionParsingError(ParsingException):
    '''Raised by parsing_assert when a condition does not hold.'''

In [11]:
def parsing_assert(cond, rbnode, global_context, local_context):
    '''Raise AssertionParsingError (with an empty message) unless `cond` is truthy.

    `rbnode`, `global_context` and `local_context` are only forwarded to the
    exception.
    '''
    if not cond:
        raise AssertionParsingError('', rbnode, global_context, local_context)

# Helpers

In [12]:
import unicodedata

def rbnode_to_exception(rbnode):
    '''Return the source of `rbnode` as an ASCII-only byte string.

    Exceptions are ASCII only in Python 2, so accented characters are
    decomposed (NFKD) and whatever is still not ASCII is dropped.

    `rbnode.dumps()` may return either UTF-8 encoded bytes or an already
    decoded text string; both are accepted.
    '''
    source = rbnode.dumps()
    if isinstance(source, bytes):
        # Python 2 `str` (an alias of `bytes`) holding UTF-8 data.
        source = source.decode('utf-8')

    # NFKD splits e.g. an accented 'e' into 'e' plus a combining accent; the
    # accent is then discarded by the 'ignore' error handler.
    return unicodedata.normalize('NFKD', source).encode('ascii', 'ignore')

In [13]:
def parse_date(atomtrailer, global_context, local_context):
    '''Parse a ``date(<int>, <int>, <int>)`` call node.

    Returns a dict with the keys 'year', 'month' and 'day', holding the
    `value` of the three int nodes as found in the tree.

    Any deviation from that exact shape (other callee name, keyword
    arguments, non-int arguments, wrong arity) fails a parsing_assert.
    '''
    def check(cond):
        parsing_assert(cond, atomtrailer, global_context, local_context)

    check(atomtrailer.type == 'atomtrailers')
    check(len(atomtrailer.value) == 2)

    callee = atomtrailer.value[0]
    check(callee.type == 'name')
    check(callee.value == 'date')

    call_node = atomtrailer.value[1]
    check(call_node.type == 'call')
    check(len(call_node.value) == 3)

    components = []
    for position in range(3):
        argument = call_node.value[position]
        check(argument.type == 'call_argument')
        check(not argument.target)  # positional arguments only
        check(argument.value.type == 'int')
        components.append(argument.value.value)

    return dict(zip(('year', 'month', 'day'), components))

In [14]:
def parse_enum(atomtrailers, global_context, local_context):
    '''Parse an ``Enum([<unicode string>, ...])`` call node.

    Returns the list of the `value` of each unicode string node, in order.

    The node must be a call to ``Enum`` with one positional argument that is
    a list of unicode strings; anything else fails a parsing_assert, which
    reports `atomtrailers` as the offending node.
    '''
    def check(cond):
        # Report the node being parsed rather than relying on a global named
        # `rbnode` happening to exist.
        parsing_assert(cond, atomtrailers, global_context, local_context)

    check(atomtrailers.type == 'atomtrailers')

    check(len(atomtrailers.value) == 2)
    check(atomtrailers.value[0].type == 'name')
    check(atomtrailers.value[0].value == 'Enum')

    call_node = atomtrailers.value[1]
    check(call_node.type == 'call')
    check(len(call_node.value) == 1)
    check(call_node.value[0].type == 'call_argument')
    check(not call_node.value[0].target)  # positional argument only

    enum_list_node = call_node.value[0].value
    check(enum_list_node.type == 'list')

    enum_list = []
    for element in enum_list_node.value:
        check(element.type == 'unicode_string')
        enum_list.append(element.value)

    return enum_list

In [15]:
def parse_parameter_path(atomtrailers, first_index, global_context, local_context):
    '''Return the names found in `atomtrailers` from position `first_index` on.

    Every component from `first_index` to the end must be a name node; its
    `value` is appended to the returned list. The list is empty when
    `first_index` is past the last component.
    '''
    parsing_assert(atomtrailers.type == 'atomtrailers', atomtrailers, global_context, local_context)

    parameter_path = []
    for path_component in atomtrailers.value[first_index:]:
        # Report `atomtrailers` itself: there is no `rbnode` in this scope.
        parsing_assert(path_component.type == 'name', atomtrailers, global_context, local_context)
        parameter_path.append(path_component.value)

    return parameter_path

In [198]:
def parse_arguments(call, global_index, local_index):
    '''Parse the arguments of a call node.

    `global_index` and `local_index` are the global and local parsing
    contexts (the parameter names are kept as they are for existing callers).

    Returns ``(arg_list, arg_dict)``: the parsed positional arguments in
    order, and the parsed keyword arguments keyed by keyword name. Both are
    empty for a call without arguments.

    Positional arguments must come before keyword arguments and must be
    plain names; violations fail a parsing_assert reporting `call`.
    '''
    global_context = global_index
    local_context = local_index

    def check(cond):
        parsing_assert(cond, call, global_context, local_context)

    check(call.type == 'call')

    arg_list = []
    arg_dict = {}

    no_target = True  # stays True until the first keyword argument is seen
    for arg in call.value:
        check(arg.type == 'call_argument')
        if arg.target:
            no_target = False
            check(arg.target.type == 'name')
        else:
            check(no_target)
            # NOTE(review): positional arguments are restricted to names, as
            # in the original code - confirm this is intended.
            check(arg.value.type == 'name')

        # Evaluate the argument in its own context; the visitor hands its
        # result back through 'tmp_var'.
        child_context = {
            'keyword': 'expression',
            'local_variables': local_context['local_variables'],
        }
        visit_function_rbnode(arg.value, global_context, child_context)
        value = child_context['tmp_var']

        if arg.target:
            arg_dict[arg.target.value] = value
        else:
            arg_list.append(value)

    return arg_list, arg_dict

# Module traversal functions

In [16]:
def visit_module_rbnode(rbnode, global_context, local_context):
    '''Dispatch a module-level node to ``visit_module_<node type>``.

    Returns whatever the visitor returns. Raises NotImplementedParsingError
    when no global with that name exists.
    '''
    # Look the visitor up by name at call time: visitors defined in later
    # cells are picked up, and globals() is not copied and filtered for
    # every node.
    visitor = globals().get('visit_module_' + rbnode.type)
    if visitor is None:
        raise NotImplementedParsingError(
            'Module visitor not declared for type="{type}"'.format(
                type=rbnode.type,
                ), rbnode, global_context, local_context)
    return visitor(rbnode, global_context, local_context)


In [17]:
def visit_module_endl(rbnode, global_context, local_context):
    '''Module-level end-of-line nodes carry nothing to record.'''
    return None

In [18]:
def visit_module_from_import(rbnode, global_context, local_context):
    '''Store a ``from ... import ...`` node, untouched, in the module's 'imports'.'''
    in_module_scope = local_context['keyword'] == 'module'
    parsing_assert(in_module_scope, rbnode, global_context, local_context)

    # unmodified (TODO)
    imports = local_context['imports']
    imports.append(rbnode)

In [19]:
def visit_module_import(rbnode, global_context, local_context):
    '''Store an ``import ...`` node, untouched, in the module's 'imports'.'''
    in_module_scope = local_context['keyword'] == 'module'
    parsing_assert(in_module_scope, rbnode, global_context, local_context)

    # unmodified (TODO)
    imports = local_context['imports']
    imports.append(rbnode)

In [20]:
def visit_module_comment(rbnode, global_context, local_context):
    '''Module-level comments are discarded for the moment (TODO).'''
    return None

In [21]:
def visit_module_class(rbnode, global_context, local_context):
    '''Record a module-level class in ``local_context['classes']``.

    The recorded dict holds the class name, the names of its base classes
    and its still unparsed body. The class named 'rsa_ressource_calculator'
    is skipped. Decorated classes and base classes that are not plain names
    fail a parsing_assert.
    '''
    def check(cond):
        parsing_assert(cond, rbnode, global_context, local_context)

    check(local_context['keyword'] == 'module')

    class_name = rbnode.name
    if class_name == 'rsa_ressource_calculator':
        return

    check(not rbnode.decorators)

    base_names = []
    for base in rbnode.inherit_from:
        check(base.type == 'name')
        base_names.append(base.value)

    local_context['classes'].append({
        'type': 'class',
        'name': class_name,
        'upper_classes': base_names,
        'content': rbnode.value,
    })

In [22]:
def visit_module_def(rbnode, global_context, local_context):
    '''Store a module-level function node, untouched, in 'auxiliary_functions'.

    The functions named '_revprim' and 'preload_zone_apl' are skipped.
    '''
    parsing_assert(local_context['keyword'] == 'module', rbnode, global_context, local_context)

    if rbnode.name not in ['_revprim', 'preload_zone_apl']:
        # unmodified (TODO)
        local_context['auxiliary_functions'].append(rbnode)

In [23]:
def visit_module_assignment(rbnode, global_context, local_context):
    '''Record a module-level ``name = value`` assignment.

    * int values and ``None`` are appended to ``local_context['constants']``;
    * ``Enum([...])`` calls are appended to ``local_context['enums']``;
    * the name 'zone_apl_by_depcom' and values starting with ``logging`` are
      ignored.

    Anything else raises ParsingException. Augmented assignments and targets
    that are not plain names fail a parsing_assert.
    '''
    def check(cond):
        parsing_assert(cond, rbnode, global_context, local_context)

    check(local_context['keyword'] == 'module')
    check(rbnode.operator == '')
    check(rbnode.target.type == 'name')
    name = rbnode.target.value

    if name in ['zone_apl_by_depcom']:
        return

    value_type = rbnode.value.type

    if value_type == 'int':
        local_context['constants'].append({
            'name': name,
            'type': 'int',
            'value': rbnode.value.value,
        })

    elif value_type == 'name':
        # The only bare name accepted as a value is None.
        check(rbnode.value.value == 'None')
        local_context['constants'].append({
            'name': name,
            'type': 'None',
            'value': None,
        })

    elif value_type == 'atomtrailers':
        atomtrailers = rbnode.value
        check(atomtrailers.value[0].type == 'name')
        function_name = atomtrailers.value[0].value

        if function_name == 'Enum':
            local_context['enums'].append({
                'name': name,
                'enum_list': parse_enum(atomtrailers, global_context, local_context),
            })
        elif function_name != 'logging':  # logging is ignored
            raise ParsingException('Unknown atomtrailers', rbnode, global_context, local_context)

    else:
        raise ParsingException('Unknown type', rbnode, global_context, local_context)


# Module parsing

In [24]:
# What each module visit collects, and what is kept per module afterwards.
MODULE_RESULT_KEYS = ('imports', 'classes', 'enums', 'auxiliary_functions', 'constants')

global_context = {}

for name in filenames:
    print('Visiting ' + name)
    red = redbaron_trees[name]

    # Fresh accumulators for this module; the visitors append to them.
    local_context = {'keyword': 'module', 'module_name': name}
    local_context.update((key, []) for key in MODULE_RESULT_KEYS)

    for rbnode in red:
        visit_module_rbnode(rbnode, global_context, local_context)

    global_context[name] = dict(
        (key, local_context[key]) for key in MODULE_RESULT_KEYS)

# Keep a handle on the result: `global_context` is rebound by the next stage.
parsed_modules = global_context

Visiting mesures.py
Visiting __init__.py
Visiting revenus/__init__.py
Visiting revenus/autres.py
Visiting revenus/capital/plus_value.py
Visiting revenus/capital/foncier.py
Visiting revenus/capital/__init__.py
Visiting revenus/capital/financier.py
Visiting revenus/activite/salarie.py
Visiting revenus/activite/non_salarie.py
Visiting revenus/activite/__init__.py
Visiting revenus/remplacement/chomage.py
Visiting revenus/remplacement/indemnites_journalieres_securite_sociale.py
Visiting revenus/remplacement/__init__.py
Visiting revenus/remplacement/retraite.py
Visiting caracteristiques_socio_demographiques/demographie.py
Visiting caracteristiques_socio_demographiques/__init__.py
Visiting caracteristiques_socio_demographiques/logement.py
Visiting prelevements_obligatoires/taxe_habitation.py
Visiting prelevements_obligatoires/isf.py
Visiting prelevements_obligatoires/__init__.py
Visiting prelevements_obligatoires/impot_revenu/charges_deductibles.py
Visiting prelevements_obligatoires/impot_rev

# Class traversal functions

In [25]:
def visit_class_rbnode(rbnode, global_context, local_context):
    '''Dispatch a class-body node to ``visit_class_<node type>``.

    Returns whatever the visitor returns. Raises NotImplementedParsingError
    when no global with that name exists.
    '''
    # Look the visitor up by name at call time: visitors defined in later
    # cells are picked up, and globals() is not copied and filtered for
    # every node.
    visitor = globals().get('visit_class_' + rbnode.type)
    if visitor is None:
        raise NotImplementedParsingError(
            'Class visitor not declared for type="{type}"'.format(
                type=rbnode.type,
                ), rbnode, global_context, local_context)
    return visitor(rbnode, global_context, local_context)


In [26]:
def visit_class_endl(rbnode, global_context, local_context):
    '''End-of-line nodes inside a class body carry nothing to record.'''
    return None

In [27]:
def visit_class_assignment(rbnode, global_context, local_context):
    '''Record one class-level ``name = value`` assignment.

    ``column`` is written to ``local_context['class_variables']`` (as
    'column', plus 'column_args' when the column is a call). Every other
    known attribute is written directly on `local_context`, under its own
    name. Assigning the same attribute twice fails a parsing_assert; an
    unknown attribute raises NotImplementedParsingError.

    NOTE(review): only 'column' and 'column_args' end up in
    'class_variables'; the other attributes live on `local_context` itself.
    Confirm this asymmetry is intended.
    '''
    def check(cond):
        parsing_assert(cond, rbnode, global_context, local_context)

    # Attributes stored as `rbnode.value.value` once the value node has the
    # expected type.
    expected_value_type = {
        'entity_class': 'name',
        'operation': 'string',
        'variable': 'name',
        'base_function': 'name',
        'calculate_output': 'name',
        'set_input': 'name',
        'role': 'name',
    }

    check(local_context['keyword'] == 'class')
    check(rbnode.operator == '')
    check(rbnode.target.type == 'name')
    target = rbnode.target.value

    if target == 'column':
        column_node = rbnode.value

        if column_node.type == 'atomtrailers':
            # column = SomeCol(keyword=value, ...)
            check(len(column_node.value) == 2)
            check(column_node.value[0].type == 'name')
            column_name = column_node.value[0].value

            call_node = column_node.value[1]
            check(call_node.type == 'call')
            column_args = {}
            for arg in call_node.value:
                check(arg.target.type == 'name')
                column_args[arg.target.value] = arg.value

            check('column' not in local_context['class_variables'].keys())
            local_context['class_variables']['column'] = column_name
            local_context['class_variables']['column_args'] = column_args

        elif column_node.type == 'name':
            # column = SomeCol
            column_name = column_node.value

            check('column' not in local_context['class_variables'].keys())
            local_context['class_variables']['column'] = column_name

        else:
            raise NotImplementedParsingError('Unknown type', rbnode, global_context, local_context)

    elif target in expected_value_type:
        check(rbnode.value.type == expected_value_type[target])
        check(target not in local_context)
        local_context[target] = rbnode.value.value

    elif target == 'url':
        # No type check: can be a tuple, see revnet (TODO)
        check(target not in local_context)
        local_context[target] = rbnode.value.value

    elif target in ('label', 'cerfa_field'):
        # No type check, the node itself is stored:
        # - label can be unicode_string or string_chain (TODO)
        # - cerfa_field can be a unicode string or a dict
        check(target not in local_context)
        local_context[target] = rbnode.value

    elif target in ('start_date', 'stop_date'):
        date = parse_date(rbnode.value, global_context, local_context)

        check(target not in local_context)
        local_context[target] = date

    elif target == 'is_permanent':
        check(rbnode.value.type == 'name')
        check(rbnode.value.value in ['True', 'False'])

        check(target not in local_context)
        local_context[target] = rbnode.value.value == 'True'

    else:
        raise NotImplementedParsingError('Unknown class variable {}'.format(target), rbnode, global_context, local_context)
            


In [28]:
def visit_class_def(rbnode, global_context, local_context):
    '''Record a method of the class in ``local_context['class_functions']``.

    The entry, keyed by the method name, holds the argument names, the
    decorators and the still unparsed body. Arguments with a default value
    and a second method with the same name fail a parsing_assert.
    '''
    def check(cond):
        parsing_assert(cond, rbnode, global_context, local_context)

    check(local_context['keyword'] == 'class')
    function_name = rbnode.name
    decorators = rbnode.decorators

    argument_names = []
    for argument in rbnode.arguments:
        check(argument.type == 'def_argument')
        check(argument.target.type == 'name')
        argument_names.append(argument.target.value)
        check(not argument.value)  # no default values

    body = rbnode.value  # unmodified (TODO)

    check(function_name not in local_context['class_functions'])
    local_context['class_functions'][function_name] = {
        'arguments': argument_names,
        'decorators': decorators,
        'instructions': body,
    }

In [29]:
def visit_class_comment(rbnode, global_context, local_context):
    '''Comments inside a class body are ignored (TODO).'''
    return None

In [30]:
def visit_class_string(rbnode, global_context, local_context):
    '''Bare strings inside a class body are ignored (TODO).'''
    return None

# Class parsing

In [31]:
# Second stage: visit the body of every class collected per module.
global_context = {}

for module_name, module in parsed_modules.items():
    print('Visiting module {} to parse its classes.'.format(module_name))

    global_context[module_name] = {
        'parsed_classes': {},
    }

    for cl in module['classes']:
        class_name = cl['name']
        print('Visiting class {}'.format(class_name))

        local_context = {
            'keyword': 'class',
            # Use the class being visited, not the `name` left over from the
            # module-parsing loop (which is the last file name visited).
            'class_name': class_name,
            'class_variables': {},
            'class_functions': {},
            }

        for rbnode in cl['content']:
            visit_class_rbnode(rbnode, global_context, local_context)

        # Only these two entries of the local context are kept per class.
        global_context[module_name]['parsed_classes'][class_name] = {
            'class_variables': local_context['class_variables'],
            'class_functions': local_context['class_functions'],
            }

parsed_classes = global_context

Visiting module prestations/minima_sociaux/__init__.py to parse its classes.
Visiting module revenus/capital/__init__.py to parse its classes.
Visiting module prelevements_obligatoires/prelevements_sociaux/cotisations_sociales/contrat_professionnalisation.py to parse its classes.
Visiting class professionnalisation
Visiting class remuneration_professionnalisation
Visiting class exoneration_cotisations_employeur_professionnalisation
Visiting module revenus/activite/__init__.py to parse its classes.
Visiting module prelevements_obligatoires/impot_revenu/charges_deductibles.py to parse its classes.
Visiting class f6de
Visiting class f6gi
Visiting class f6gj
Visiting class f6el
Visiting class f6em
Visiting class f6gp
Visiting class f6gu
Visiting class f6eu
Visiting class f6ev
Visiting class f6dd
Visiting class f6ps
Visiting class f6rs
Visiting class f6ss
Visiting class f6aa
Visiting class f6cc
Visiting class f6eh
Visiting class f6da
Visiting class f6cb
Visiting class f6hj
Visiting class f6

# Function traversal visitors

In [32]:
def visit_function_rbnode(rbnode, global_context, local_context):
    '''Dispatch a function-body node to ``visit_function_<node type>``.

    Returns whatever the visitor returns. Raises NotImplementedParsingError
    when no global with that name exists.
    '''
    # Look the visitor up by name at call time: visitors defined in later
    # cells are picked up, and globals() is not copied and filtered for
    # every node.
    visitor = globals().get('visit_function_' + rbnode.type)
    if visitor is None:
        raise NotImplementedParsingError(
            'Function visitor not declared for type="{type}"'.format(
                type=rbnode.type,
                ), rbnode, global_context, local_context)
    return visitor(rbnode, global_context, local_context)


In [33]:
def visit_function_endl(rbnode, global_context, local_context):
    '''End-of-line nodes inside a function body carry nothing to record.'''
    return None

In [34]:
def visit_function_assignment(rbnode, global_context, local_context):
    '''Parse ``name = expression`` and bind the result in 'local_variables'.

    The right-hand side is visited with `local_context` temporarily switched
    to the 'expression' keyword; its result is read from 'tmp_var', which is
    removed again before the keyword is set back to 'function'.
    '''
    parsing_assert(local_context['keyword'] == 'function', rbnode, global_context, local_context)
    parsing_assert(rbnode.target.type == 'name', rbnode, global_context, local_context)

    variable_name = rbnode.target.value
    expression_node = rbnode.value

    local_context['keyword'] = 'expression'
    visit_function_rbnode(expression_node, global_context, local_context)

    parsed_expression = local_context['tmp_var']
    local_context['local_variables'][variable_name] = parsed_expression
    del local_context['tmp_var']

    local_context['keyword'] = 'function'

In [219]:
def visit_function_atomtrailers(rbnode, global_context, local_context):
    '''Parse a dotted / called expression (``a.b``, ``a.b(c)``, ``f(x)``...).

    The first component is evaluated, and the 'type' of its result selects
    how the rest is interpreted:

    * 'period': ``.this_month`` / ``.n_2`` / ``.start``;
    * 'simulation': ``.calculate`` / ``.compute`` / ``.calculate_add`` calls
      and ``.legislation_at(instant)`` followed by a parameter path;
    * 'self': ``.split_by_roles(...)`` / ``.sum_by_entity(...)``;
    * 'parameter': a further parameter path;
    * 'arithmetic_operation_tmp': a call to ``round`` / ``sum`` / ``not_``;
    * 'apply_thresholds_tmp': kept unparsed.

    The parsed expression is written to ``local_context['tmp_var']``.
    Anything else raises NotImplementedParsingError.
    '''
    def check(cond):
        parsing_assert(cond, rbnode, global_context, local_context)

    def evaluate(expression_node):
        # Sub-expressions are visited in a throw-away context that shares the
        # local variables and hands the result back through 'tmp_var'.
        scratch_context = {
            'keyword': 'expression',
            'local_variables': local_context['local_variables'],
        }
        visit_function_rbnode(expression_node, global_context, scratch_context)
        return scratch_context['tmp_var']

    def not_implemented(message):
        return NotImplementedParsingError(message, rbnode, global_context, local_context)

    check(local_context['keyword'] == 'expression')

    items = rbnode.value
    head = evaluate(items[0])
    head_type = head['type']

    if head_type == 'period':
        check(len(items) == 2)
        check(items[1].type == 'name')
        period_op = items[1].value

        if period_op in ['this_month', 'n_2']:
            local_context['tmp_var'] = {
                'type': 'period',
                'nodetype': 'period-operation',
                'operator': period_op,
                'operands': [head],
            }
        elif period_op == 'start':
            local_context['tmp_var'] = {
                'type': 'instant',
                'nodetype': 'period-to-instant',
                'operator': 'start',
                'operands': [head],
            }
        else:
            raise not_implemented('Unknown period operand')
        return

    if head_type == 'simulation':
        check(items[1].type == 'name')
        simulation_op = items[1].value

        if simulation_op in ['calculate', 'compute', 'calculate_add']:
            check(len(items) == 3)
            check(items[2].type == 'call')
            args = items[2].value
            check(len(args) in [1, 2])

            # First argument: the name of the variable, as a plain string.
            variable_arg = args[0]
            check(variable_arg.type == 'call_argument')
            check(not variable_arg.target)
            check(variable_arg.value.type == 'string')
            operands = [variable_arg.value.value]

            # Optional second argument: an expression of type 'period'.
            if len(args) == 2:
                period_arg = args[1]
                check(period_arg.type == 'call_argument')
                check(not period_arg.target)
                arg_period = evaluate(period_arg.value)
                check(arg_period['type'] == 'period')
                operands.append(arg_period)

            local_context['tmp_var'] = {
                'type': 'value',
                'nodetype': 'variable_for_period',
                'operator': simulation_op,
                'operands': operands,
            }
            return

        if simulation_op == 'legislation_at':
            check(items[2].type == 'call')
            args = items[2].value
            check(len(args) == 1)
            rb_instant_arg = args[0]
            check(rb_instant_arg.type == 'call_argument')
            check(not rb_instant_arg.target)

            instant_arg = evaluate(rb_instant_arg.value)
            check(instant_arg['type'] == 'instant')

            # Whatever follows the call is the path of the parameter.
            parameter_path = parse_parameter_path(rbnode, 3, global_context, local_context)

            local_context['tmp_var'] = {
                'type': 'parameter',
                'nodetype': 'parameter',
                'instant': instant_arg,
                'path': parameter_path,
            }
            return

        raise not_implemented('Unknown simulation op.')

    if head_type == 'self':
        check(items[1].type == 'name')
        self_op = items[1].value

        if self_op not in ['split_by_roles', 'sum_by_entity']:
            raise not_implemented('Unknown self op.')

        check(len(items) == 3)
        args = evaluate(items[2])

        local_context['tmp_var'] = {
            'type': 'value',
            'nodetype': 'self_operation',
            'operator': self_op,
            'operands': args,
        }
        return

    if head_type == 'parameter':
        # Extend the path of an already parsed parameter.
        parameter_path = parse_parameter_path(rbnode, 1, global_context, local_context)

        local_context['tmp_var'] = {
            'type': 'value',
            'nodetype': 'parameter',
            'instant': head['instant'],
            'path': head['path'] + parameter_path,
        }
        return

    if head_type == 'arithmetic_operation_tmp':
        if head['op'] not in ['round', 'sum', 'not_']:
            raise not_implemented('Unknown arithmetic_operation_tmp.')

        check(len(items) == 2)
        check(items[1].type == 'call')
        args = items[1].value

        if (len(args) == 1) and args[0].type == 'argument_generator_comprehension':
            # Generator argument: kept unparsed (TODO)
            local_context['tmp_var'] = {
                'type': 'value',
                'nodetype': 'arithmetic_operation_comprehension',
                'op': head['op'],
                'rb_argument_generator_comprehension': args[0],
            }
            return

        parsed_args = []
        for arg in args:
            check(arg.type == 'call_argument')
            check(not arg.target)
            parsed_args.append(evaluate(arg.value))

        local_context['tmp_var'] = {
            'type': 'value',
            'nodetype': 'arithmetic_operation',
            'op': head['op'],
            'operands': parsed_args,
        }
        return

    if head_type == 'apply_thresholds_tmp':
        local_context['tmp_var'] = {
            'type': 'value',
            'nodetype': 'apply_thresholds',
            'rbnode': rbnode,
        }
        return

    raise not_implemented('Unknown first item of an atomtrailers.')
    

In [213]:
def visit_function_binary_operator(rbnode, global_context, local_context):
    """Parse a binary-operator node into an ``arithmetic_operation`` value.

    Both sides of the node (``rbnode.first`` then ``rbnode.second``) are visited,
    each in its own fresh ``'expression'`` context that shares the caller's
    ``local_variables``. The resulting operand descriptions, together with the
    operator found in ``rbnode.value``, are stored in ``local_context['tmp_var']``.
    """
    operator = rbnode.value

    def parse_operand(operand_node):
        # Every operand gets a new context so its 'tmp_var' cannot leak into siblings.
        operand_context = {
            'keyword': 'expression',
            'local_variables': local_context['local_variables'],
        }
        visit_function_rbnode(operand_node, global_context, operand_context)
        return operand_context['tmp_var']

    operands = [parse_operand(side) for side in (rbnode.first, rbnode.second)]

    local_context['tmp_var'] = {
        'type': 'value',
        'nodetype': 'arithmetic_operation',
        'op': operator,
        'operands': operands,
    }


In [214]:
def visit_function_name(rbnode, global_context, local_context):
    """Resolve a bare name node and store the result in ``local_context['tmp_var']``.

    Resolution order:
      1. a name present in ``local_context['local_variables']`` yields that entry;
      2. ``round``, ``sum`` and ``not_`` yield an ``arithmetic_operation_tmp`` marker
         carrying the name as ``op``;
      3. ``apply_thresholds`` yields an ``apply_thresholds_tmp`` marker
         (to be dealt with specifically -- TODO).

    Raises:
        NotImplementedParsingError: for any other name.
    """
    identifier = rbnode.value
    known_locals = local_context['local_variables']

    if identifier in known_locals:
        local_context['tmp_var'] = known_locals[identifier]
    elif identifier in ('round', 'sum', 'not_'):
        local_context['tmp_var'] = {
            'type': 'arithmetic_operation_tmp',
            'nodetype': 'arithmetic_operation_tmp',
            'op': identifier,
        }
    elif identifier == 'apply_thresholds':
        local_context['tmp_var'] = {
            'type': 'apply_thresholds_tmp',
            'nodetype': 'apply_thresholds',
        }
    else:
        raise NotImplementedParsingError('Unknown name {}'.format(identifier), rbnode, global_context, local_context)


In [215]:
def visit_function_int(rbnode, global_context, local_context):
    """Store an ``int`` value description for ``rbnode`` in ``local_context['tmp_var']``.

    ``rbnode.value`` is copied as-is into the ``'value'`` field; ``global_context``
    is not used.
    """
    literal = rbnode.value
    local_context['tmp_var'] = {'type': 'value', 'nodetype': 'int', 'value': literal}


In [216]:
def visit_function_float(rbnode, global_context, local_context):
    """Store a ``float`` value description for ``rbnode`` in ``local_context['tmp_var']``.

    ``rbnode.value`` is copied as-is into the ``'value'`` field; ``global_context``
    is not used.
    """
    literal = rbnode.value
    local_context['tmp_var'] = {'type': 'value', 'nodetype': 'float', 'value': literal}


In [204]:
def visit_function_associative_parenthesis(rbnode, global_context, local_context):
    """Visit the expression wrapped by a parenthesis node.

    The parentheses themselves add nothing: the inner node (``rbnode.value``) is
    visited with the very same ``local_context``, so whatever it produces ends up
    directly in the caller's context.
    """
    inner_expression = rbnode.value
    visit_function_rbnode(inner_expression, global_context, local_context)

In [205]:
def visit_function_return(rbnode, global_context, local_context):
    """Parse a ``return <period>, <value>`` statement into ``local_context['return']``.

    The returned expression must be a 2-tuple. Its first item must parse to a
    ``'period'`` and its second item to a ``'value'``; each item is visited in a
    fresh ``'expression'`` context sharing the caller's ``local_variables``.
    A second ``return`` for the same ``local_context`` is rejected.

    Every check goes through ``parsing_assert`` with this node and both contexts.
    """
    def check(condition):
        parsing_assert(condition, rbnode, global_context, local_context)

    def evaluate(expression_node):
        # Visit one tuple item in isolation and hand back what it produced.
        expression_context = {
            'keyword': 'expression',
            'local_variables': local_context['local_variables'],
        }
        visit_function_rbnode(expression_node, global_context, expression_context)
        return expression_context['tmp_var']

    check(rbnode.value.type == 'tuple')

    tuple_node = rbnode.value
    check(len(tuple_node.value) == 2)

    period_part = evaluate(tuple_node.value[0])
    check(period_part['type'] == 'period')

    value_part = evaluate(tuple_node.value[1])
    check(value_part['type'] == 'value')

    parsed_return = {
        'type': 'return',
        'nodetype': 'return',
        'period': period_part,
        'value': value_part,
    }
    check('return' not in local_context)
    local_context['return'] = parsed_return

    


In [206]:
def visit_function_comparison(rbnode, global_context, local_context):
    """Parse a comparison node into an ``arithmetic_operation`` value.

    ``rbnode.value`` must be a ``comparison_operator`` node with an empty
    ``second`` part; its ``first`` part is used as the operation. Both compared
    sides (``rbnode.first`` then ``rbnode.second``) are visited, each in a fresh
    ``'expression'`` context sharing the caller's ``local_variables``, and the
    result is stored in ``local_context['tmp_var']``.
    """
    def check(condition):
        parsing_assert(condition, rbnode, global_context, local_context)

    def parse_operand(operand_node):
        operand_context = {
            'keyword': 'expression',
            'local_variables': local_context['local_variables'],
        }
        visit_function_rbnode(operand_node, global_context, operand_context)
        return operand_context['tmp_var']

    check(rbnode.value.type == "comparison_operator")
    operator = rbnode.value.first
    check(not rbnode.value.second)

    operands = [parse_operand(side) for side in (rbnode.first, rbnode.second)]

    local_context['tmp_var'] = {
        'type': 'value',
        'nodetype': 'arithmetic_operation',
        'op': operator,
        'operands': operands,
    }


In [207]:
def visit_function_comment(rbnode, global_context, local_context):
    """Visitor for comment nodes: currently a no-op (TODO).

    Neither context is read or modified.
    """
    return None

In [208]:
def visit_function_list(rbnode, global_context, local_context):
    """Store an unparsed ``list`` placeholder in ``local_context['tmp_var']``.

    The list content is not analysed yet (TODO): the raw node is simply kept
    under the ``'rbnode'`` key.
    """
    placeholder = {'type': 'list', 'nodetype': 'list', 'rbnode': rbnode}
    local_context['tmp_var'] = placeholder


In [209]:
def visit_function_for(rbnode, global_context, local_context):
    """Visitor for ``for`` nodes: currently a no-op (TODO).

    The loop is neither visited nor recorded; both contexts are left untouched.
    """
    return None

# Function parsing

In [220]:
# Walk every parsed module -> class -> function, run the visitors over each
# instruction of the function, and keep the parsed 'return' of every function in
# global_context[module]['classes'][class]['parsed_functions'][function].
global_context = {}

for module_name, module in parsed_classes.items():
    print('Visiting module {}'.format(module_name))

    classes_of_module = {}
    global_context[module_name] = {'classes': classes_of_module}

    for class_name, cl in module['parsed_classes'].items():
        print('Visiting class {} to parse its function(s)'.format(class_name))

        functions_of_class = {}
        classes_of_module[class_name] = {'parsed_functions': functions_of_class}

        for function_name, fn in cl['class_functions'].items():
            print('Visiting function {}'.format(function_name))

            # Names available in every formula before any statement is visited.
            builtin_variables = {
                'period': {'type': 'period', 'nodetype': 'builtin-period'},
                'simulation': {'type': 'simulation'},
                'self': {'type': 'self'},
            }
            local_context = {
                'keyword': 'function',
                'module_name': module_name,
                'class_name': class_name,
                'function_name': function_name,
                'local_variables': builtin_variables,
            }

            for rbnode in fn['instructions']:
                visit_function_rbnode(rbnode, global_context, local_context)

            functions_of_class[function_name] = {'return': local_context['return']}


Visiting module prestations/minima_sociaux/__init__.py
Visiting module prestations/education.py
Visiting class bourse_lycee to parse its function(s)
Visiting function function


NotImplementedParsingError: Function visitor not declared for type="call"

In [221]:
# Debugging cell: display `angry_rbnode` (defined outside this section;
# presumably the node that triggered the last parsing error -- TODO confirm).
angry_rbnode

In [187]:
# Debugging cell: print the structural description of `angry_rbnode` via its
# `.help()` method (presumably RedBaron's node dump -- TODO confirm).
# NOTE(review): the saved output below comes from an earlier execution (In [187])
# and may not match the error raised by the parsing cell above.
angry_rbnode.help()

ForNode()
  # identifiers: for, for_, fornode
  iterator ->
    NameNode()
      # identifiers: name, name_, namenode
      value='age_interval'
  target ->
    NameNode()
      # identifiers: name, name_, namenode
      value='salaire_en_smic'
  else ->
    None
  value ->
    * AssignmentNo

In [None]:
# Debugging cell: display `angry_local_context` (defined outside this section;
# presumably the local context captured when the last parsing error was raised
# -- TODO confirm).
angry_local_context

SyntaxError: cannot mix bytes and nonbytes literals (<ipython-input-90-573816f18ef9>, line 1)

In [121]:
# Inspection cell: show the instruction list recorded for one concrete formula
# (class `patnat`, function `function_20100101_20101231` in reductions_impot.py).
parsed_classes['prelevements_obligatoires/impot_revenu/reductions_impot.py']['parsed_classes'][
    'patnat']['class_functions']['function_20100101_20101231']['instructions']

Index,node
0,'''  Dépenses de protections du patrimoine naturel (case 7KA)  2010  '''
1,period = period.this_year
2,"f7ka = simulation.calculate('f7ka', period)"
3,P = simulation.legislation_at(period.start).ir.reductions_impots.patnat
4,'\n '
5,max1 = P.max
6,"return period, P.taux * min_(f7ka, max1)"
7,'\n '
