In [2]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [3]:
"""
Author: Ashyam Zubair
Created Date: 14-02-2019
"""
import json
import pandas as pd
import traceback
import sqlalchemy
import os

from MySQLdb._exceptions import OperationalError
from sqlalchemy import create_engine, exc
from time import time

# try:
#     from app.ace_logger import Logging
# except:
#     from ace_logger import Logging
     
# logging = Logging()

import logging
class DB(object):
    def __init__(self, database, host='127.0.0.1', user='root', password='', port='3306', tenant_id=None):
        """
        Initialization of databse object.

        Args:
            databse (str): The database to connect to.
            host (str): Host IP address. For dockerized app, it is the name of
                the service set in the compose file.
            user (str): Username of MySQL server. (default = 'root')
            password (str): Password of MySQL server. For dockerized app, the
                password is set in the compose file. (default = '')
            port (str): Port number for MySQL. For dockerized app, the port that
                is mapped in the compose file. (default = '3306')
        """

        if host in ["common_db","extraction_db", "queue_db", "template_db", "table_db", "stats_db", "business_rules_db", "reports_db"]:
            self.HOST = os.environ['HOST_IP']
            self.USER = 'root'
            self.PASSWORD = os.environ['LOCAL_DB_PASSWORD']
            self.PORT = '3306'
            self.DATABASE = f'{tenant_id}_{database}' if tenant_id is not None and tenant_id else database
        else:
            self.HOST = host
            self.USER = user
            self.PASSWORD = password
            self.PORT = port
            self.DATABASE = f'{tenant_id}_{database}' if tenant_id is not None and tenant_id else database
        
        logging.info(f'Host: {self.HOST}')
        logging.info(f'User: {self.USER}')
        logging.info(f'Password: {self.PASSWORD}')
        logging.info(f'Port: {self.PORT}')
        logging.info(f'Database: {self.DATABASE}')

        self.connect()

    def connect(self, max_retry=5):
        retry = 1

        try:
            start = time()
            logging.debug(f'Making connection to `{self.DATABASE}`...')
            config = f'mysql://{self.USER}:{self.PASSWORD}@{self.HOST}:{self.PORT}/{self.DATABASE}?charset=utf8'
            self.db_ = create_engine(config, connect_args={'connect_timeout': 2}, pool_recycle=300)
            logging.info(f'Engine created for `{self.DATABASE}`')
            while retry <= max_retry:
                try:
                    self.engine = self.db_.connect()
                    logging.info(f'Connection established succesfully to `{self.DATABASE}`! ({round(time() - start, 2)} secs to connect)')
                    break
                except Exception as e:
                    logging.warning(f'Connection failed. Retrying... ({retry}) [{e}]')
                    retry += 1
                    self.db_.dispose()
        except:
            logging.exception(f'Something went wrong while connecting. Check trace.')
            return

    def execute(self, query, database=None, **kwargs):
        """
        Executes an SQL query.

        Args:
            query (str): The query that needs to be executed.
            database (str): Name of the database to execute the query in. Leave
                it none if you want use database during object creation.
            params (list/tuple/dict): List of parameters to pass to in the query.

        Returns:
            (DataFrame) A pandas dataframe containing the data from the executed
            query. (None if an error occurs)
        """
        data = None

        # Use new database if a new databse is given
        if database is not None:
            try:
                config = f'mysql://{self.USER}:{self.PASSWORD}@{self.HOST}:{self.PORT}/{database}?charset=utf8'
                engine = create_engine(config, pool_recycle=300)
            except:
                logging.exception(f'Something went wrong while connecting. Check trace.')
                return False
        else:
            engine = self.engine

        try:
            logging.debug(f'Query: {query}')
            data = pd.read_sql(query, engine, index_col='id', **kwargs)
        except exc.ResourceClosedError:
            logging.warning('Query does not have any value to return.')
            return True
        except (exc.StatementError, OperationalError) as e:
            logging.warning(f'Creating new connection. Engine/Connection is probably None. [{e}]')
            self.connect()
            data = pd.read_sql(query, self.engine, index_col='id', **kwargs)
        except:
            logging.exception('Something went wrong executing query. Check trace.')
            params = kwargs['params'] if 'params' in kwargs else None
            return False

        return data.where((pd.notnull(data)), None)

    def execute_(self, query, database=None, **kwargs):
        """
        Executes an SQL query.

        Args:
            query (str): The query that needs to be executed.
            database (str): Name of the database to execute the query in. Leave
                it none if you want use database during object creation.
            params (list/tuple/dict): List of parameters to pass to in the query.

        Returns:
            (DataFrame) A pandas dataframe containing the data from the executed
            query. (None if an error occurs)
        """
        data = None

        # Use new database if a new databse is given
        if database is not None:
            try:
                config = f'mysql://{self.USER}:{self.PASSWORD}@{self.HOST}:{self.PORT}/{database}?charset=utf8'
                engine = create_engine(config, pool_recycle=300)
            except:
                logging.exception(f'Something went wrong while connecting. Check trace.')
                return False
        else:
            engine = self.engine

        try:
            data = pd.read_sql(query, engine, **kwargs)
        except exc.ResourceClosedError:
            return True
        except:
            logging.exception(f'Something went wrong while connecting. Check trace.')
            params = kwargs['params'] if 'params' in kwargs else None
            return False

        return data.replace({pd.np.nan: None})


    def insert(self, data, table, database=None, **kwargs):
        """
        Write records stored in a DataFrame to a SQL database.

        Args:
            data (DataFrame): The DataFrame that needs to be write to SQL database.
            table (str): The table in which the rcords should be written to.
            database (str): The database the table lies in. Leave it none if you
                want use database during object creation.
            kwargs: Keyword arguments for pandas to_sql function.
                See https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_sql.html
                to know the arguments that can be passed.

        Returns:
            (bool) True is succesfully inserted, else false.
        """
        logging.info(f'Inserting into `{table}`')

        # Use new database if a new databse is given
        if database is not None:
            try:
                config = f'mysql://{self.USER}:{self.PASSWORD}@{self.HOST}:{self.PORT}/{database}?charset=utf8'
                engine = create_engine(config, pool_recycle=300)
            except:
                logging.exception(f'Something went wrong while connecting. Check trace.')
                return False
        else:
            engine = self.engine

        try:
            data.to_sql(table, engine, **kwargs)
            try:
                self.execute(f'ALTER TABLE `{table}` ADD PRIMARY KEY (`id`);')
            except:
                pass
            return True
        except:
            logging.exception('Something went wrong inserting. Check trace.')
            return False

    def insert_dict(self, data, table):
        """
        Insert dictionary into a SQL database table.

        Args:
            data (DataFrame): The DataFrame that needs to be write to SQL database.
            table (str): The table in which the rcords should be written to.

        Returns:
            (bool) True is succesfully inserted, else false.
        """
        logging.info(f'Inserting dictionary data into `{table}`...')
        logging.debug(f'Data:\n{data}')

        try:
            column_names = []
            params = []

            for column_name, value in data.items():
                column_names.append(f'`{column_name}`')
                params.append(value)

            logging.debug(f'Column names: {column_names}')
            logging.debug(f'Params: {params}')

            columns_string = ', '.join(column_names)
            param_placeholders = ', '.join(['%s'] * len(column_names))

            query = f'INSERT INTO {table} ({columns_string}) VALUES ({param_placeholders})'

            return self.execute(query, params=params)
        except:
            logging.exception('Error inserting data.')
            return False

    def update(self, table, update=None, where=None, database=None, force_update=False):
        # Use new database if a new databse is given
        if database is not None:
            try:
                config = f'mysql://{self.USER}:{self.PASSWORD}@{self.HOST}:{self.PORT}/{database}?charset=utf8'
                self.engine = create_engine(config, pool_recycle=300)
            except:
                logging.exception(f'Something went wrong while connecting. Check trace.')
                return False

        logging.info(f'Updating table: `{table}`')
        logging.info(f'Update data: `{update}`')
        logging.info(f'Where clause data: `{where}`')
        logging.info(f'Force update flag: `{force_update}`')

        try:
            set_clause = []
            set_value_list = []
            where_clause = []
            where_value_list = []

            if where is not None and where:
                for set_column, set_value in update.items():
                    set_clause.append(f'`{set_column}`=%s')
                    set_value_list.append(set_value)
                set_clause_string = ', '.join(set_clause)
            else:
                logging.error(f'Update dictionary is None/empty. Must have some update clause.')
                return False

            if where is not None and where:
                for where_column, where_value in where.items():
                    where_clause.append(f'{where_column}=%s')
                    where_value_list.append(where_value)
                where_clause_string = ' AND '.join(where_clause)
                query = f'UPDATE `{table}` SET {set_clause_string} WHERE {where_clause_string}'
            else:
                if force_update:
                    query = f'UPDATE `{table}` SET {set_clause_string}'
                else:
                    message = 'Where dictionary is None/empty. If you want to force update every row, pass force_update as True.'
                    logging.error(message)
                    return False

            params = set_value_list + where_value_list
            self.execute(query, params=params)
            return True
        except:
            logging.exception('Something went wrong updating. Check trace.')
            return False

    def get_column_names(self, table, database=None):
        """
        Get all column names from an SQL table.

        Args:
            table (str): Name of the table from which column names should be extracted.
            database (str): Name of the database in which the table lies. Leave
                it none if you want use database during object creation.

        Returns:
            (list) List of headers. (None if an error occurs)
        """
        try:
            logging.info(f'Getting column names of table `{table}`')
            return list(self.execute(f'SELECT * FROM `{table}`', database))
        except:
            logging.exception('Something went wrong getting column names. Check trace.')
            return

    def execute_default_index(self, query, database=None, **kwargs):
        """
        Executes an SQL query.

        Args:
            query (str): The query that needs to be executed.
            database (str): Name of the database to execute the query in. Leave
                it none if you want use database during object creation.
            params (list/tuple/dict): List of parameters to pass to in the query.

        Returns:
            (DataFrame) A pandas dataframe containing the data from the executed
            query. (None if an error occurs)
        """
        data = None

        # Use new database if a new databse is given
        if database is not None:
            try:
                config = f'mysql://{self.USER}:{self.PASSWORD}@{self.HOST}:{self.PORT}/{database}?charset=utf8'
                engine = create_engine(config, pool_recycle=300)
            except:
                logging.exception(f'Something went wrong while connecting. Check trace.')
                return False
        else:
            engine = self.engine

        try:
            data = pd.read_sql(query, engine, **kwargs)
        except exc.ResourceClosedError:
            return True
        except:
            logging.exception(f'Something went wrong while executing query. Check trace.')
            params = kwargs['params'] if 'params' in kwargs else None
            return False

        return data.where((pd.notnull(data)), None)


    def get_all(self, table, database=None, discard=None):
        """
        Get all data from an SQL table.

        Args:
            table (str): Name of the table from which data should be extracted.
            database (str): Name of the database in which the table lies. Leave
                it none if you want use database during object creation.
            discard (list): columns to be excluded while selecting all
        Returns:
            (DataFrame) A pandas dataframe containing the data. (None if an error
            occurs)
        """
        logging.info(f'Getting all data from `{table}`')
        if discard:
            logging.info(f'Discarding columns `{discard}`')
            columns = list(self.execute_default_index(f'SHOW COLUMNS FROM `{table}`',database).Field)
            columns = [col for col in columns if col not in discard]
            columns_str = json.dumps(columns).replace("'",'`').replace('"','`')[1:-1]
            return self.execute(f'SELECT {columns_str} FROM `{table}`', database)

        return self.execute(f'SELECT * FROM `{table}`', database)

    def get_latest(self, data, group_by_col, sort_col):
        """
        Group data by a column containing repeated values and get latest from it by
        taking the latest value based on another column.

        Example:
        Get the latest products
            id     product   date
            220    6647     2014-09-01
            220    6647     2014-10-16
            826    3380     2014-11-11
            826    3380     2015-05-19
            901    4555     2014-09-01
            901    4555     2014-11-01

        The function will return
            id     product   date
            220    6647     2014-10-16
            826    3380     2015-05-19
            901    4555     2014-11-01

        Args:
            data (DataFrame): Pandas DataFrame to query on.
            group_by_col (str): Column containing repeated values.
            sort_col (str): Column to identify the latest record.

        Returns:
            (DataFrame) Contains the latest records. (None if an error occurs)
        """
        try:
            logging.info('Grouping data...')
            logging.info(f'Data: {data}')
            logging.info(f'Group by column: {group_by_col}')
            logging.info(f'Sort column: {sort_col}')
            return data.sort_values(sort_col).groupby(group_by_col).tail(1)
        except KeyError as e:
            logging.errot(f'Column `{e.args[0]}` does not exist.')
            return None
        except:
            logging.exception('Something went wrong while grouping data.')
            return None


In [4]:
import os
os.environ['HOST_IP'] = '35.173.139.208'
os.environ['LOCAL_DB_USER'] = 'root'
os.environ['LOCAL_DB_PASSWORD'] = 'AlgoTeam123'
os.environ['LOCAL_DB_PORT'] = '3306'

tenant_id = 'deloitte.acelive.ai'

db_config = {
    'host': os.environ['HOST_IP'],
    'user': os.environ['LOCAL_DB_USER'],
    'password': os.environ['LOCAL_DB_PASSWORD'],
    'port': os.environ['LOCAL_DB_PORT']
}

In [7]:
%load apply_business_rule.py
try:
    from ace_logger import Logging
    logging = Logging()
except:
    import logging 
    logger=logging.getLogger() 
    logger.setLevel(logging.DEBUG) 



import json
import os
from db_utils import DB 

try:
    from BusinessRules import BusinessRules
except:
    from .BusinessRules import BusinessRules


# one configuration
db_config = {
    'host': os.environ['HOST_IP'],
    'user': os.environ['LOCAL_DB_USER'],
    'password': os.environ['LOCAL_DB_PASSWORD'],
    'port': os.environ['LOCAL_DB_PORT']
}

def to_DT_data(parameters):
    """Amith's processing for parameters"""
    output = []
    try:
        for param_dict in parameters:
            print(param_dict)    
            if param_dict['column'] == 'Add_on_Table':
                output.append({'table': param_dict['table'],'column': param_dict['column'],'value': param_dict['value']})
                # Need to add a function to show this or tell Kamal check if its addon table and parse accordingly
            else:                
                output.append({'table': param_dict['table'],'column': param_dict['column'],'value': param_dict['value']})
    except:
        print("Error in to_DT_data()")
        traceback.print_exc()
        return []
    try:
        output = [dict(t) for t in {tuple(d.items()) for d in output}]
    except:
        print("Error in removing duplicate dictionaries in list")
        traceback.print_exc()
        pass
    return output

def get_data_sources(tenant_id, case_id, column_name, master=False):
    """Helper to get all the required table data for the businesss rules to apply
    """
    get_datasources_query = "SELECT * from `data_sources`"
    db_config['tenant_id'] = tenant_id
    business_rules_db = DB('business_rules', **db_config)
    data_sources = business_rules_db.execute(get_datasources_query)


    # sources
    sources = json.loads(list(data_sources[column_name])[0])
    
    
    data = {}
    for database, tables in sources.items():
        db = DB(database, **db_config)
        for table in tables:
            if master:
                query = f"SELECT * from `{table}`"
                try:
                    df = db.execute(query)
                except:
                    df = db.execute_(query)
                    
                data[table] = df.to_dict(orient='records')
            else:
                query = f"SELECT * from `{table}` WHERE case_id = %s"
                params = [case_id]
                df = db.execute(query, params=params)
                if not df.empty:
                    data[table] = df.to_dict(orient='records')[0]
                else:
                    data[table] = {}
    
    
    case_id_based_sources = json.loads(list(data_sources['case_id_based'])[0])
    
    return data
                
def get_rules(tenant_id, group):
    """Get the rules based on the stage, tenant_id"""
    db_config['tenant_id'] = tenant_id
    business_rules_db = DB('business_rules', **db_config)
    get_rules_query = "SELECT * from `sequence_rule_data` where `group` = %s"
    params = [group]
    rules = business_rules_db.execute(get_rules_query, params=params)
    return rules

def update_tables(case_id, tenant_id, updates):
    """Update the values in the database"""
    db_config['tenant_id'] = tenant_id
    extraction_db = DB('extraction', **db_config) # only in ocr or process_queue we are updating
    queue_db = DB('queues', **db_config) # only in ocr or process_queue we are updating
    
    for table, colum_values in updates.items():
        if table == 'ocr':
            extraction_db.update(table, update=colum_values, where={'case_id':case_id})
        if table == 'process_queue':
            queue_db.update(table, update=colum_values, where={'case_id':case_id})
    return "UPDATED IN THE DATABASE SUCCESSFULLY"

def run_chained_rules(case_id, tenant_id, chain_rules, start_rule_id=None, updated_tables=False, trace_exec=None, rule_params=None):
    """Execute the chained rules"""
    
    # get the mapping of the rules...basically a rule_id maps to a rule
    rule_id_mapping = {}
    for ind, rule in chain_rules.iterrows():
        rule_id_mapping[rule['rule_id']] = [rule['rule_string'], rule['next_if_sucess'], rule['next_if_failure'], rule['stage'], rule['description'], rule['data_source']]
    logging.info(f"\n rule id mapping is \n{rule_id_mapping}\n")
    
    # evaluate the rules one by one as chained
    # start_rule_id = None
    if start_rule_id is None:
        if rule_id_mapping.keys():
            start_rule_id = list(rule_id_mapping.keys())[0]
            trace_exec = []
            rule_params = {}
            
    # if start_rule_id then. coming from other service 
    # get the existing trace and rule params data
    db_config['tenant_id'] = tenant_id
    business_rules_db = DB('business_rules', **db_config)
    rule_data_query = "SELECT * from `rule_data` where `case_id`=%s"
    params = [case_id]
    df = business_rules_db.execute(rule_data_query, params=params)
    try:
        trace_exec = json.loads(list(df['trace_data'])[0])
        logging.info(f"\nexistig trace exec is \n{trace_exec}\n")
    except Exception as e:
        logging.info(f"no existing trace data")
        logging.info(f"{str(e)}")
        trace_exec = []
    
    try:
        rule_params = json.loads(list(df['rule_params'])[0])
        logging.info(f"\nexistig rule_params is \n{rule_params}\n")
    except Exception as e:
        logging.info(f"no existing rule params data")
        logging.info(f"{str(e)}")
        rule_params = {}
      
    # usually master data will not get updated...for every rule
    master_data_tables = get_data_sources(tenant_id, case_id, 'master', master=True)
 
    logging.info(f"\nStart rule id got is {start_rule_id}\n ")
    while start_rule_id != "END":
        # get the rules, next rule id to be evaluated
        rule_to_evaluate, next_if_sucess, next_if_failure, stage, description, data_source = rule_id_mapping[str(start_rule_id)]  
    
        logging.info(f"\nInside the loop \n rule_to_evaluate  {rule_to_evaluate}\n \
                      \nnext_if_sucess {next_if_sucess}\n \
                      \nnext_if_failure {next_if_failure}\n ")
        
        # update the data_table if there is any change
        case_id_data_tables = get_data_sources(tenant_id, case_id, 'case_id_based')
        master_updated_tables = {} 
        if updated_tables:
            master_updated_tables = get_data_sources(tenant_id, case_id, 'updated_tables')
        # consolidate the data into data_tables
        data_tables = {**case_id_data_tables, **master_data_tables, **master_updated_tables} 
        
        # evaluate the rule
        rules = [json.loads(rule_to_evaluate)] 
        BR  = BusinessRules(case_id, rules, data_tables)
        BR.tenant_id = tenant_id
        decision = BR.evaluate_rule(rules[0])
        
        logging.info(f"\n got the decision {decision} for the rule id {start_rule_id}")
        logging.info(f"\n updates got are {BR.changed_fields}")
        
        updates = {}
        # update the updates if any
        if BR.changed_fields:
            updates = BR.changed_fields
            update_tables(case_id, tenant_id, updates)

        
        # update the trace_data
        trace_exec.append(start_rule_id)

        logging.info(f"\n params data used from the rules are \n {BR.params_data}\n")
        # update the rule_params
        trace_dict = {
                        str(start_rule_id):{
                            'description' : description if description else 'No description available in the database',
                            'output' : "",
                            'input' : to_DT_data(BR.params_data['input'])
                            }
                        }
        rule_params.update(trace_dict)
        # update the start_rule_id based on the decision
        if decision:
            start_rule_id = next_if_sucess
        else:
            start_rule_id = next_if_failure
        logging.info(f"\n next rule id to execute is {start_rule_id}\n")
        
    
    # off by one updates...
    trace_exec.append(start_rule_id)
    
    # store the trace_exec and rule_params in the database
    update_rule_params_query = f"INSERT INTO `rule_data`(`id`, `case_id`, `rule_params`) VALUES ('NULL',%s,%s) ON DUPLICATE KEY UPDATE `rule_params`=%s"
    params = [case_id, json.dumps(rule_params,default=str), json.dumps(rule_params,default=str)]
    business_rules_db.execute(update_rule_params_query, params=params)
    
    update_trace_exec_query = f"INSERT INTO `rule_data` (`id`, `case_id`, `trace_data`) VALUES ('NULL',%s,%s) ON DUPLICATE KEY UPDATE `trace_data`=%s"
    params = [case_id, json.dumps(trace_exec), json.dumps(trace_exec)]
    business_rules_db.execute(update_trace_exec_query, params=params)
    
    logging.info("\n Applied chained rules successfully")
    return 'Applied chained rules successfully'

def run_group_rules(case_id,tenant_id, rules, data):
    """Run the rules"""
    rules = [json.loads(rule) for rule in rules] 
    BR  = BusinessRules(case_id, rules, data)
    BR.tenant_id = tenant_id
    updates = BR.evaluate_business_rules()
    
    logging.info(f"\n updates from the group rules are \n{updates}\n")
    return updates

def apply_business_rule(case_id, function_params, tenant_id, start_rule_id=None):
    """Run the business rules based on the stage in function params and tenant_id
    Args:
        case_id: Unique id that we pass
        function_params: Parameters that we get from the configurations
        tenant_id: Tenant on which we have to apply the rules
    Returns:
    """
    try:
        stage = function_params['stage'][0]
        logging.debug(f'Running business rules after {stage}')
    except:
        pass
    updates = {} # keep a track of updates that are being made by business rules
    try:
        # get the stage from the function_parameters...As of now its first ele..
        # need to make generic or key-value pairs
        logging.info(f"\n case_id {case_id} \nfunction_params {function_params} \ntenant_id {tenant_id}\n")
              
        # get the rules
        rules = get_rules(tenant_id, stage)
        
        # get the mapping of the rules...basically a rule_id maps to a rule.
        # useful for the chain rule evaluations
        rule_id_mapping = {}
        for ind, rule in rules.iterrows():
            rule_id_mapping[rule['rule_id']] = [rule['rule_string'], rule['next_if_sucess'], rule['next_if_failure'], rule['stage'], rule['description'], rule['data_source']]

        # making it generic takes to take a type parameter from the database..
        # As of now make it (all others  or chained) only
        is_chain_rule = '' not in rule_id_mapping
        
        # get the required table data on which we will be applying business_rules  
        case_id_data_tables = get_data_sources(tenant_id, case_id, 'case_id_based') 
        master_data_tables = get_data_sources(tenant_id, case_id, 'master', master=True)
        
        # consolidate the data into data_tables
        data_tables = {**case_id_data_tables, **master_data_tables} 
        
        logging.info(f"\ndata got from the tables is\n")
        logging.info(data_tables)
        
        updates = {}
        # apply business rules
        if is_chain_rule:
            run_chained_rules(case_id, tenant_id, rules, start_rule_id)
        else:
            updates = run_group_rules(case_id, tenant_id, list(rules['rule_string']), data_tables)
            
        # update in the database, the changed fields eventually when all the stage rules were got
        update_tables(case_id, tenant_id, updates)
        
        #  return the updates for viewing
        return {'flag': True, 'message': 'Applied business rules successfully.', 'updates':updates}
    except Exception as e:
        logging.exception('Something went wrong while applying business rules. Check trace.')
        return {'flag': False, 'message': 'Something went wrong while applying business rules. Check logs.', 'error':str(e)}


In [8]:
case_id = 'DEL7B6DCE0'
function_params = {'stage':['extract']}
tenant_id = 'deloitte.acelive.ai'
r = apply_business_rule(case_id, function_params, tenant_id)

DEBUG:root:Running business rules after extract
INFO:root:
 case_id DEL7B6DCE0 
function_params {'stage': ['extract']} 
tenant_id deloitte.acelive.ai

INFO:root:Host: 35.173.139.208
INFO:root:User: root
INFO:root:Password: AlgoTeam123
INFO:root:Port: 3306
INFO:root:Database: deloitte.acelive.ai_business_rules
DEBUG:root:Making connection to `deloitte.acelive.ai_business_rules`...
INFO:root:Engine created for `deloitte.acelive.ai_business_rules`
INFO:root:Connection established succesfully to `deloitte.acelive.ai_business_rules`! (4.3 secs to connect)
DEBUG:root:Query: SELECT * from `sequence_rule_data` where `group` = %s
INFO:root:Host: 35.173.139.208
INFO:root:User: root
INFO:root:Password: AlgoTeam123
INFO:root:Port: 3306
INFO:root:Database: deloitte.acelive.ai_business_rules
DEBUG:root:Making connection to `deloitte.acelive.ai_business_rules`...
INFO:root:Engine created for `deloitte.acelive.ai_business_rules`
INFO:root:Connection established succesfully to `deloitte.acelive.ai_busi

INFO:root:Host: 35.173.139.208
INFO:root:User: root
INFO:root:Password: AlgoTeam123
INFO:root:Port: 3306
INFO:root:Database: deloitte.acelive.ai_business_rules
DEBUG:root:Making connection to `deloitte.acelive.ai_business_rules`...
INFO:root:Engine created for `deloitte.acelive.ai_business_rules`
INFO:root:Connection established succesfully to `deloitte.acelive.ai_business_rules`! (3.74 secs to connect)
DEBUG:root:Query: SELECT * from `data_sources`
INFO:root:Host: 35.173.139.208
INFO:root:User: root
INFO:root:Password: AlgoTeam123
INFO:root:Port: 3306
INFO:root:Database: deloitte.acelive.ai_extraction
DEBUG:root:Making connection to `deloitte.acelive.ai_extraction`...
INFO:root:Engine created for `deloitte.acelive.ai_extraction`
INFO:root:Connection established succesfully to `deloitte.acelive.ai_extraction`! (3.97 secs to connect)
DEBUG:root:Query: SELECT * from `vendor_master`
DEBUG:root:Query: SELECT * from `client_master`
DEBUG:root:Query: SELECT * from `gst_compliance`
DEBUG:root

True


INFO:root:Connection established succesfully to `deloitte.acelive.ai_extraction`! (3.96 secs to connect)
INFO:root:Host: 35.173.139.208
INFO:root:User: root
INFO:root:Password: AlgoTeam123
INFO:root:Port: 3306
INFO:root:Database: deloitte.acelive.ai_queues
DEBUG:root:Making connection to `deloitte.acelive.ai_queues`...
INFO:root:Engine created for `deloitte.acelive.ai_queues`
INFO:root:Connection established succesfully to `deloitte.acelive.ai_queues`! (3.86 secs to connect)
INFO:root:Updating table: `process_queue`
INFO:root:Update data: `{'queue': 'validation'}`
INFO:root:Where clause data: `{'case_id': 'DEL7B6DCE0'}`
INFO:root:Force update flag: `False`
DEBUG:root:Query: UPDATE `process_queue` SET `queue`=%s WHERE case_id=%s
INFO:root:
 params data used from the rules are 
 {'input': [{'type': 'from_table', 'table': 'ocr', 'column': 'total_value', 'value': '1,260,594.00'}]}

INFO:root:
 next rule id to execute is END

DEBUG:root:Query: INSERT INTO `rule_data`(`id`, `case_id`, `rule_

{'type': 'from_table', 'table': 'ocr', 'column': 'total_value', 'value': '1,260,594.00'}


DEBUG:root:Query: INSERT INTO `rule_data` (`id`, `case_id`, `trace_data`) VALUES ('NULL',%s,%s) ON DUPLICATE KEY UPDATE `trace_data`=%s
INFO:root:
 Applied chained rules successfully
INFO:root:Host: 35.173.139.208
INFO:root:User: root
INFO:root:Password: AlgoTeam123
INFO:root:Port: 3306
INFO:root:Database: deloitte.acelive.ai_extraction
DEBUG:root:Making connection to `deloitte.acelive.ai_extraction`...
INFO:root:Engine created for `deloitte.acelive.ai_extraction`
INFO:root:Connection established succesfully to `deloitte.acelive.ai_extraction`! (9.58 secs to connect)
INFO:root:Host: 35.173.139.208
INFO:root:User: root
INFO:root:Password: AlgoTeam123
INFO:root:Port: 3306
INFO:root:Database: deloitte.acelive.ai_queues
DEBUG:root:Making connection to `deloitte.acelive.ai_queues`...
INFO:root:Engine created for `deloitte.acelive.ai_queues`
INFO:root:Connection established succesfully to `deloitte.acelive.ai_queues`! (3.78 secs to connect)


In [9]:
r

{'flag': True,
 'message': 'Applied business rules successfully.',
 'updates': {}}

In [135]:
import json

mount_replace = {  'rule_type': 'static',
                        'function': 'Replace',
                        'parameters':{
                        'data':{'source':'input_config','table':'ocr', 'column':'total_value'},
                        'to_replace':{'source':'input','value':','},
                        'replace_with':{'source':'input','value':''}
    }
}

x = json.dumps(mount_replace)
print(x)

{"rule_type": "static", "function": "Replace", "parameters": {"data": {"source": "input_config", "table": "ocr", "column": "total_value"}, "to_replace": {"source": "input", "value": ","}, "replace_with": {"source": "input", "value": ""}}}


In [183]:
a = "1,23,"
b = "1,00,000"
operator = ">="

In [184]:
a = a.replace(',','')
b = b.replace(',','')
if operator == ">=":
    print(float(a) >= float(b))

False


In [None]:
@register_method
def AmountCompare(self,parameters):
    left_param, operator, right_param = parameters['left_param'], parameters['operator'], parameters['right_param'] 
    left_param_value, right_param_value = self.get_param_value(left_param), self.get_param_value(right_param)
    logging.debug(f"left param value is {left_param_value} and type is {type(left_param_value)}")
    logging.debug(f"right param value is {right_param_value} and type is {type(right_param_value)}")
    logging.debug(f"operator is {operator}")
    try:
        left_param_value = left_param_value.replace(',','')
        right_param_value = right_param_value.replace(',','')
        if operator == ">=":
            print(float(left_param_value) >= float(right_param_value))
            return (float(left_param_value) >= float(right_param_value))
    except Exception as e:
        logging.debug(f"error in compare key value {left_param_value} {operator} {right_param_value}")
        logging.debug(str(e))
        return False
    