In [6]:
"""
Author: Ashyam Zubair
Created Date: 14-02-2019
"""
import json
import pandas as pd
import traceback
import sqlalchemy
import os

from MySQLdb._exceptions import OperationalError
from sqlalchemy import create_engine, exc
from time import time

# try:
#     from app.ace_logger import Logging
# except:
#     from ace_logger import Logging
     
# logging = Logging()

import logging
class DB(object):
    def __init__(self, database, host='127.0.0.1', user='root', password='', port='3306', tenant_id=None):
        """
        Initialization of databse object.

        Args:
            databse (str): The database to connect to.
            host (str): Host IP address. For dockerized app, it is the name of
                the service set in the compose file.
            user (str): Username of MySQL server. (default = 'root')
            password (str): Password of MySQL server. For dockerized app, the
                password is set in the compose file. (default = '')
            port (str): Port number for MySQL. For dockerized app, the port that
                is mapped in the compose file. (default = '3306')
        """

        if host in ["common_db","extraction_db", "queue_db", "template_db", "table_db", "stats_db", "business_rules_db", "reports_db"]:
            self.HOST = os.environ['HOST_IP']
            self.USER = 'root'
            self.PASSWORD = os.environ['LOCAL_DB_PASSWORD']
            self.PORT = '3306'
            self.DATABASE = f'{tenant_id}_{database}' if tenant_id is not None and tenant_id else database
        else:
            self.HOST = host
            self.USER = user
            self.PASSWORD = password
            self.PORT = port
            self.DATABASE = f'{tenant_id}_{database}' if tenant_id is not None and tenant_id else database
        
        logging.info(f'Host: {self.HOST}')
        logging.info(f'User: {self.USER}')
        logging.info(f'Password: {self.PASSWORD}')
        logging.info(f'Port: {self.PORT}')
        logging.info(f'Database: {self.DATABASE}')

        self.connect()

    def connect(self, max_retry=5):
        retry = 1

        try:
            start = time()
            logging.debug(f'Making connection to `{self.DATABASE}`...')
            config = f'mysql://{self.USER}:{self.PASSWORD}@{self.HOST}:{self.PORT}/{self.DATABASE}?charset=utf8'
            self.db_ = create_engine(config, connect_args={'connect_timeout': 2}, pool_recycle=300)
            logging.info(f'Engine created for `{self.DATABASE}`')
            while retry <= max_retry:
                try:
                    self.engine = self.db_.connect()
                    logging.info(f'Connection established succesfully to `{self.DATABASE}`! ({round(time() - start, 2)} secs to connect)')
                    break
                except Exception as e:
                    logging.warning(f'Connection failed. Retrying... ({retry}) [{e}]')
                    retry += 1
                    self.db_.dispose()
        except:
            logging.exception(f'Something went wrong while connecting. Check trace.')
            return

    def execute(self, query, database=None, **kwargs):
        """
        Executes an SQL query.

        Args:
            query (str): The query that needs to be executed.
            database (str): Name of the database to execute the query in. Leave
                it none if you want use database during object creation.
            params (list/tuple/dict): List of parameters to pass to in the query.

        Returns:
            (DataFrame) A pandas dataframe containing the data from the executed
            query. (None if an error occurs)
        """
        data = None

        # Use new database if a new databse is given
        if database is not None:
            try:
                config = f'mysql://{self.USER}:{self.PASSWORD}@{self.HOST}:{self.PORT}/{database}?charset=utf8'
                engine = create_engine(config, pool_recycle=300)
            except:
                logging.exception(f'Something went wrong while connecting. Check trace.')
                return False
        else:
            engine = self.engine

        try:
            logging.debug(f'Query: {query}')
            data = pd.read_sql(query, engine, index_col='id', **kwargs)
        except exc.ResourceClosedError:
            logging.warning('Query does not have any value to return.')
            return True
        except (exc.StatementError, OperationalError) as e:
            logging.warning(f'Creating new connection. Engine/Connection is probably None. [{e}]')
            self.connect()
            data = pd.read_sql(query, self.engine, index_col='id', **kwargs)
        except:
            logging.exception('Something went wrong executing query. Check trace.')
            params = kwargs['params'] if 'params' in kwargs else None
            return False

        return data.where((pd.notnull(data)), None)

    def execute_(self, query, database=None, **kwargs):
        """
        Executes an SQL query.

        Args:
            query (str): The query that needs to be executed.
            database (str): Name of the database to execute the query in. Leave
                it none if you want use database during object creation.
            params (list/tuple/dict): List of parameters to pass to in the query.

        Returns:
            (DataFrame) A pandas dataframe containing the data from the executed
            query. (None if an error occurs)
        """
        data = None

        # Use new database if a new databse is given
        if database is not None:
            try:
                config = f'mysql://{self.USER}:{self.PASSWORD}@{self.HOST}:{self.PORT}/{database}?charset=utf8'
                engine = create_engine(config, pool_recycle=300)
            except:
                logging.exception(f'Something went wrong while connecting. Check trace.')
                return False
        else:
            engine = self.engine

        try:
            data = pd.read_sql(query, engine, **kwargs)
        except exc.ResourceClosedError:
            return True
        except:
            logging.exception(f'Something went wrong while connecting. Check trace.')
            params = kwargs['params'] if 'params' in kwargs else None
            return False

        return data.replace({pd.np.nan: None})


    def insert(self, data, table, database=None, **kwargs):
        """
        Write records stored in a DataFrame to a SQL database.

        Args:
            data (DataFrame): The DataFrame that needs to be write to SQL database.
            table (str): The table in which the rcords should be written to.
            database (str): The database the table lies in. Leave it none if you
                want use database during object creation.
            kwargs: Keyword arguments for pandas to_sql function.
                See https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_sql.html
                to know the arguments that can be passed.

        Returns:
            (bool) True is succesfully inserted, else false.
        """
        logging.info(f'Inserting into `{table}`')

        # Use new database if a new databse is given
        if database is not None:
            try:
                config = f'mysql://{self.USER}:{self.PASSWORD}@{self.HOST}:{self.PORT}/{database}?charset=utf8'
                engine = create_engine(config, pool_recycle=300)
            except:
                logging.exception(f'Something went wrong while connecting. Check trace.')
                return False
        else:
            engine = self.engine

        try:
            data.to_sql(table, engine, **kwargs)
            try:
                self.execute(f'ALTER TABLE `{table}` ADD PRIMARY KEY (`id`);')
            except:
                pass
            return True
        except:
            logging.exception('Something went wrong inserting. Check trace.')
            return False

    def insert_dict(self, data, table):
        """
        Insert dictionary into a SQL database table.

        Args:
            data (DataFrame): The DataFrame that needs to be write to SQL database.
            table (str): The table in which the rcords should be written to.

        Returns:
            (bool) True is succesfully inserted, else false.
        """
        logging.info(f'Inserting dictionary data into `{table}`...')
        logging.debug(f'Data:\n{data}')

        try:
            column_names = []
            params = []

            for column_name, value in data.items():
                column_names.append(f'`{column_name}`')
                params.append(value)

            logging.debug(f'Column names: {column_names}')
            logging.debug(f'Params: {params}')

            columns_string = ', '.join(column_names)
            param_placeholders = ', '.join(['%s'] * len(column_names))

            query = f'INSERT INTO {table} ({columns_string}) VALUES ({param_placeholders})'

            return self.execute(query, params=params)
        except:
            logging.exception('Error inserting data.')
            return False

    def update(self, table, update=None, where=None, database=None, force_update=False):
        # Use new database if a new databse is given
        if database is not None:
            try:
                config = f'mysql://{self.USER}:{self.PASSWORD}@{self.HOST}:{self.PORT}/{database}?charset=utf8'
                self.engine = create_engine(config, pool_recycle=300)
            except:
                logging.exception(f'Something went wrong while connecting. Check trace.')
                return False

        logging.info(f'Updating table: `{table}`')
        logging.info(f'Update data: `{update}`')
        logging.info(f'Where clause data: `{where}`')
        logging.info(f'Force update flag: `{force_update}`')

        try:
            set_clause = []
            set_value_list = []
            where_clause = []
            where_value_list = []

            if where is not None and where:
                for set_column, set_value in update.items():
                    set_clause.append(f'`{set_column}`=%s')
                    set_value_list.append(set_value)
                set_clause_string = ', '.join(set_clause)
            else:
                logging.error(f'Update dictionary is None/empty. Must have some update clause.')
                return False

            if where is not None and where:
                for where_column, where_value in where.items():
                    where_clause.append(f'{where_column}=%s')
                    where_value_list.append(where_value)
                where_clause_string = ' AND '.join(where_clause)
                query = f'UPDATE `{table}` SET {set_clause_string} WHERE {where_clause_string}'
            else:
                if force_update:
                    query = f'UPDATE `{table}` SET {set_clause_string}'
                else:
                    message = 'Where dictionary is None/empty. If you want to force update every row, pass force_update as True.'
                    logging.error(message)
                    return False

            params = set_value_list + where_value_list
            self.execute(query, params=params)
            return True
        except:
            logging.exception('Something went wrong updating. Check trace.')
            return False

    def get_column_names(self, table, database=None):
        """
        Get all column names from an SQL table.

        Args:
            table (str): Name of the table from which column names should be extracted.
            database (str): Name of the database in which the table lies. Leave
                it none if you want use database during object creation.

        Returns:
            (list) List of headers. (None if an error occurs)
        """
        try:
            logging.info(f'Getting column names of table `{table}`')
            return list(self.execute(f'SELECT * FROM `{table}`', database))
        except:
            logging.exception('Something went wrong getting column names. Check trace.')
            return

    def execute_default_index(self, query, database=None, **kwargs):
        """
        Executes an SQL query.

        Args:
            query (str): The query that needs to be executed.
            database (str): Name of the database to execute the query in. Leave
                it none if you want use database during object creation.
            params (list/tuple/dict): List of parameters to pass to in the query.

        Returns:
            (DataFrame) A pandas dataframe containing the data from the executed
            query. (None if an error occurs)
        """
        data = None

        # Use new database if a new databse is given
        if database is not None:
            try:
                config = f'mysql://{self.USER}:{self.PASSWORD}@{self.HOST}:{self.PORT}/{database}?charset=utf8'
                engine = create_engine(config, pool_recycle=300)
            except:
                logging.exception(f'Something went wrong while connecting. Check trace.')
                return False
        else:
            engine = self.engine

        try:
            data = pd.read_sql(query, engine, **kwargs)
        except exc.ResourceClosedError:
            return True
        except:
            logging.exception(f'Something went wrong while executing query. Check trace.')
            params = kwargs['params'] if 'params' in kwargs else None
            return False

        return data.where((pd.notnull(data)), None)


    def get_all(self, table, database=None, discard=None):
        """
        Get all data from an SQL table.

        Args:
            table (str): Name of the table from which data should be extracted.
            database (str): Name of the database in which the table lies. Leave
                it none if you want use database during object creation.
            discard (list): columns to be excluded while selecting all
        Returns:
            (DataFrame) A pandas dataframe containing the data. (None if an error
            occurs)
        """
        logging.info(f'Getting all data from `{table}`')
        if discard:
            logging.info(f'Discarding columns `{discard}`')
            columns = list(self.execute_default_index(f'SHOW COLUMNS FROM `{table}`',database).Field)
            columns = [col for col in columns if col not in discard]
            columns_str = json.dumps(columns).replace("'",'`').replace('"','`')[1:-1]
            return self.execute(f'SELECT {columns_str} FROM `{table}`', database)

        return self.execute(f'SELECT * FROM `{table}`', database)

    def get_latest(self, data, group_by_col, sort_col):
        """
        Group data by a column containing repeated values and get latest from it by
        taking the latest value based on another column.

        Example:
        Get the latest products
            id     product   date
            220    6647     2014-09-01
            220    6647     2014-10-16
            826    3380     2014-11-11
            826    3380     2015-05-19
            901    4555     2014-09-01
            901    4555     2014-11-01

        The function will return
            id     product   date
            220    6647     2014-10-16
            826    3380     2015-05-19
            901    4555     2014-11-01

        Args:
            data (DataFrame): Pandas DataFrame to query on.
            group_by_col (str): Column containing repeated values.
            sort_col (str): Column to identify the latest record.

        Returns:
            (DataFrame) Contains the latest records. (None if an error occurs)
        """
        try:
            logging.info('Grouping data...')
            logging.info(f'Data: {data}')
            logging.info(f'Group by column: {group_by_col}')
            logging.info(f'Sort column: {sort_col}')
            return data.sort_values(sort_col).groupby(group_by_col).tail(1)
        except KeyError as e:
            logging.errot(f'Column `{e.args[0]}` does not exist.')
            return None
        except:
            logging.exception('Something went wrong while grouping data.')
            return None


In [5]:
import os
# Local-testing setup: the connection settings are injected through the
# process environment so the code below can read them from `os.environ`.
# NOTE(review): the host address and a plaintext password are hard-coded
# here - confirm they are throwaway values and keep them out of version control.
os.environ['HOST_IP'] = '18.207.230.116'
os.environ['LOCAL_DB_USER'] = 'root'
os.environ['LOCAL_DB_PASSWORD'] = 'AlgoTeam123'
os.environ['LOCAL_DB_PORT'] = '3306'

# Tenant used for local testing.
tenant_id = 'test'

# Keyword arguments for `DB(...)`, built from the environment values set above.
db_config = {
    'host': os.environ['HOST_IP'],
    'user': os.environ['LOCAL_DB_USER'],
    'password': os.environ['LOCAL_DB_PASSWORD'],
    'port': os.environ['LOCAL_DB_PORT']
}

In [8]:
# %load apply_business_rule.py
# Deployed setup: the project logger. Keep these two lines commented out
# for local testing.
# from ace_logger import Logging
# logging = Logging()

# Local testing: use the standard library logger with DEBUG output enabled.
import logging 
logger=logging.getLogger() 
logger.setLevel(logging.DEBUG) 



import json
import os
# `DB` is defined earlier in this notebook, so the module import stays disabled.
# from db_utils import DB 

from BusinessRules import BusinessRules

# Single connection configuration shared by every `DB(...)` call below.
# Raises KeyError at import time if any of the environment variables is unset.
db_config = {
    'host': os.environ['HOST_IP'],
    'user': os.environ['LOCAL_DB_USER'],
    'password': os.environ['LOCAL_DB_PASSWORD'],
    'port': os.environ['LOCAL_DB_PORT']
}

def get_data_sources(tenant_id, case_id, master=False):
    """Helper to get all the required table data for the business rules to apply.

    The `case_id_based` value of the first row of `data_sources` is parsed
    as JSON and iterated as a mapping of database name to table names.
    Every listed table is queried and only its first row is kept.

    Args:
        tenant_id: Tenant whose databases are queried.
        case_id: Case to select rows for (ignored when `master` is True).
        master: When True, read each table without the `case_id` filter.

    Returns:
        (dict) Table name mapped to the first matching row as a dict, or to
        an empty dict when the table has no matching row.
    """
    get_datasources_query = "SELECT * from `data_sources`"
    business_rules_db = DB('business_rules', tenant_id=tenant_id, **db_config)
    data_sources = business_rules_db.execute(get_datasources_query)

    # case_id based
    case_id_based_sources = json.loads(data_sources['case_id_based'].iloc[0])

    data = {}
    for database, tables in case_id_based_sources.items():
        db = DB(database, tenant_id=tenant_id, **db_config)
        for table in tables:
            if master:
                df = db.execute(f"SELECT * from `{table}`")
            else:
                df = db.execute(f"SELECT * from `{table}` WHERE case_id = %s", params=[case_id])

            # Only the first record of each table is handed to the rules.
            data[table] = df.to_dict(orient='records')[0] if not df.empty else {}

    return data
                
def get_rules(tenant_id, group):
    """Fetch the rows of `sequence_rule_data` that belong to `group`.

    Args:
        tenant_id: Tenant whose `business_rules` database is queried.
        group: Value matched against the `group` column.

    Returns:
        The result of `DB.execute` for the query.
    """
    connection = DB('business_rules', tenant_id=tenant_id, **db_config)
    return connection.execute(
        "SELECT * from `sequence_rule_data` where `group` = %s",
        params=[group],
    )

def update_tables(case_id, tenant_id, updates):
    """Update the values in the database.

    Only the `ocr` table (database `extraction`) and the `process_queue`
    table (database `queues`) are written; any other table in `updates` is
    ignored. A database connection is opened only when one of its tables
    actually has updates.

    Args:
        case_id: Case whose rows are updated (`WHERE case_id = ...`).
        tenant_id: Tenant whose databases are written to.
        updates (dict): Table name mapped to a dict of column -> new value.

    Returns:
        (str) A fixed confirmation message. Failed updates are logged.
    """
    # Table -> database it lives in.
    table_databases = {'ocr': 'extraction', 'process_queue': 'queues'}
    connections = {}

    for table, column_values in updates.items():
        database = table_databases.get(table)
        if database is None:
            continue

        # Connect lazily and reuse the connection for the same database.
        if database not in connections:
            connections[database] = DB(database, tenant_id=tenant_id, **db_config)

        updated = connections[database].update(table, update=column_values, where={'case_id': case_id})
        if not updated:
            logging.error(f'Could not update table `{table}` for case_id `{case_id}`.')

    return "UPDATED IN THE DATABASE SUCCESSFULLY"

def run_group_rules(case_id, rules, data):
    """Evaluate a group of rules and return the updates they produce.

    Args:
        case_id: Case the rules are evaluated for.
        rules: Rule rows; each entry of `rules['rule_string']` is a JSON string.
        data: Table data handed to `BusinessRules`.

    Returns:
        The result of `BusinessRules.evaluate_business_rules()`.
    """
    parsed_rules = []
    for rule_string in list(rules['rule_string']):
        parsed_rules.append(json.loads(rule_string))

    evaluator = BusinessRules(case_id, parsed_rules, data)
    updates = evaluator.evaluate_business_rules()

    logging.info(f"\n updates from the group rules are \n{updates}\n")
    return updates

def apply_business_rule(case_id, function_params, tenant_id):
    """Run the business rules based on the stage in function params and tenant_id
    Args:
        case_id: Unique id that we pass
        function_params: Parameters that we get from the configurations;
            the first element of `function_params['stage']` selects the rules
        tenant_id: Tenant on which we have to apply the rules
    Returns:
        (dict) On success: `flag` True, a `message` and the `updates` made.
        On any exception: `flag` False, a `message` and the `error` text.
    """
    # Track the updates produced by the business rules.
    updates = {}
    try:
        logging.info(f"\n case_id {case_id} \nfunction_params {function_params} \ntenant_id {tenant_id}\n")

        # For now the stage is simply the first element of the list.
        stage = function_params['stage'][0]
        rules = get_rules(tenant_id, stage)

        # rule_id -> rule details; useful for the chain rule evaluations.
        rule_id_mapping = {
            rule['rule_id']: [
                rule['rule_string'],
                rule['next_if_sucess'],
                rule['next_if_failure'],
                rule['stage'],
                rule['description'],
                rule['data_source'],
            ]
            for _, rule in rules.iterrows()
        }

        # A group counts as chained unless one of its rules has an empty rule_id.
        is_chain_rule = '' not in rule_id_mapping

        # Table data on which the business rules are applied.
        data_tables = get_data_sources(tenant_id, case_id)

        logging.info(f"\ndata got from the tables is\n")
        logging.info(data_tables)

        # Chained rules are not evaluated here yet, so `updates` stays empty for them.
        if not is_chain_rule:
            updates = run_group_rules(case_id, rules, data_tables)

        # Persist the changed fields once all the stage rules have run.
        update_tables(case_id, tenant_id, updates)

        return {
            'flag': True,
            'message': 'Applied business rules successfully.',
            'updates': updates,
        }
    except Exception as e:
        logging.exception('Something went wrong while applying business rules. Check trace.')
        return {
            'flag': False,
            'message': 'Something went wrong while applying business rules. Check logs.',
            'error': str(e),
        }


ModuleNotFoundError: No module named 'ace_logger'

In [9]:
a = {1:2}

In [10]:
b = {3:4}


In [16]:
c = {}


In [18]:
c.update(a)
c.update(b)
c

{3: 4, 1: 2}