In [4]:
from pprint import pprint
from pymongo import MongoClient
import time
from datetime import date, timedelta
import os
import datetime
from dateutil import tz
import pendulum
import config

import function as func

from schema.fact_document import FactDocumentModel
from schema.fact_performance import FactPerformanceModel
from schema.fact_data_extraction import FactDataExtractionModel
from db_connect import EngineConnect as DatabaseConnect

In [6]:
class LoadingData:
    def get_docs_and_trans(self):
        if self.environment == 'development':
            obj_docs = pickle.load(open('./backup/' + self.project_backup_dir + self.project_docs_dir + self.project_id + '.pickle', 'rb'))
            obj_trans = pickle.load(open('./backup/' + self.project_backup_dir + self.project_trans_dir + self.project_id + '.pickle', 'rb'))
        else: 
            obj_docs = pickle.load(open(self.backup_dir + self.project_backup_dir + self.project_docs_dir + str(self.start.strftime("%Y-%m-%d")) + self.backup_file_type, 'rb'))
            obj_trans = pickle.load(open(self.backup_dir + self.project_backup_dir + self.project_trans_dir + str(self.start.strftime("%Y-%m-%d")) + self.backup_file_type, 'rb'))
        data_docs = [item for item in obj_docs]
        data_trans = [item for item in obj_trans]
        return data_docs, data_trans
    
    def get_performance(self):
        if self.environment == 'development':
            obj_performance = pickle.load(open('./backup/' +self.project_backup_dir + self.project_performance_dir + self.project_id + '.pickle', 'rb'))
        else:
            obj_performance = pickle.load(open(self.backup_dir + self.project_backup_dir + self.project_performance_dir + str(self.start.strftime("%Y-%m-%d")) + self.backup_file_type, 'rb'))
        data_performance = [item for item in obj_performance]
        return data_performance
    
    def get_field(self):
        if self.environment == 'development':
            obj_load = pickle.load(open('./backup/' +self.project_backup_dir + self.project_field_dir + self.project_id + '.pickle', 'rb'))
        else:
            obj_load = pickle.load(open(self.backup_dir + self.project_backup_dir + self.project_field_dir + str(self.start.strftime("%Y-%m-%d")) + self.backup_file_type, 'rb'))
        obj_list = [item for item in obj_load]
        return obj_list

In [7]:
class GdaExecutor(LoadingData, BackupData):
    def __init__(
        self,
        *kwargs,
        environment: str,
        uri: str,
        database_name: str,
        docs_collection_name: str, 
        trans_collection_name: str,
        performance_collection_name: str,
        db: DatabaseConnect
    ):
        self.environment = environment
        self.uri = uri
        self.database_name = database_name
        self.docs_collection_name = docs_collection_name
        self.trans_collection_name = trans_collection_name
        self.performance_collection_name = performance_collection_name
        self.db = db
        self.maxSevSelDelay = 20000
        self.start = config.start
        self.query = config.GDA_QUERY
        self.performance_query = config.GDA_PERFORMANCE_QUERY
        self.project_id = config.GDA_PROJECT_ID
        self.project_name = config.GDA_PROJECT_NAME
        self.backup_dir = config.BACKUP_DIR
        self.project_backup_dir = config.GDA_BACKUP_DIR
        self.project_docs_dir = config.GDA_DOCS_DIR
        self.project_trans_dir = config.GDA_TRANS_DIR
        self.project_field_dir = config.GDA_FIELD_DIR
        self.project_performance_dir = config.GDA_PERFORMANCE_DIR
        self.backup_file_type = config.BACKUP_FILE_TYPE
        self.schema = config.DWH_ANALYTIC_SCHEMA
        self.fact_document_table = config.DWH_FACT_DOCUMENT_TABLE
        self.fact_performance_table = config.DWH_FACT_PERFORMANCE_TABLE
        self.fact_data_extraction_table = config.DWH_FACT_DATA_EXTRACTION_TABLE
        self.fact_report_table = config.DWH_FACT_REPORT_ETL_TABLE
        self.schedule_type = 'daily'
        self.schedule_date_key = 20200113
        self.schedule_time_key = 50000
        self.reports = []
        self.document_key_checks = []
        self.performance_key_checks = []
    
    def clean(self):
        report = initial_report('clean', self.project_id, self.schedule_type, self.schedule_date_key, self.schedule_time_key)
        start_run = time.time()
        try:
            if self.environment == 'development' or self.environment == 'production':
                now = self.start - timedelta(days=1)
                file_name = str(now.strftime("%Y-%m-%d"))
                list_path = [
                    self.backup_dir + self.project_backup_dir + self.project_docs_dir + file_name + self.backup_file_type,
                    self.backup_dir + self.project_backup_dir + self.project_trans_dir + file_name + self.backup_file_type,
                    self.backup_dir + self.project_backup_dir + self.project_performance_dir + file_name + self.backup_file_type,
                    self.backup_dir + self.project_backup_dir + self.project_field_dir + file_name + self.backup_file_type,
                ]
                for item in list_path:
                    if os.path.exists(item):
                        os.remove(item)
                    else:
                        report.description = "The {path} does not exist".format(path = item)
            report.status_code = 'PASSED'
        except Exception as e:
            report.status_code = 'FAILED'
            report.description = str(e)
        finally:
            report.total_time_run_second = time.time()-start_run
            self.reports.append(report)
   
    def fact_document(self):
        report = initial_report('fact_document', self.project_id, self.schedule_type, self.schedule_date_key, self.schedule_time_key)
        start_run = time.time()
        try:
            datas = []
            data_docs, data_trans = self.get_docs_and_trans()
            list_created = [{'doc_id': func.bson_object_to_string(data['_id']), 'created_date': data['created_date']} for data in data_docs]
            _id = self.db.get_max_id_table(schema = self.schema, table = self.fact_document_table, col = 'document_key')
            if _id == None:
                _id = 1
            else:
                _id+=1
            for data in data_trans:
                if len(data['records']) == 0:
                    continue
                created_date_utc_7  = func.created_date_of_docs_by_id(func.bson_object_to_string(data['doc_id']), list_created) + datetime.timedelta(hours = 7)
                last_modified_utc_7 = data['last_modified'] + datetime.timedelta(hours = 7)
                import_date_key_utc_7, import_time_key_utc_7 = func.handle_date_to_date_and_time_id(created_date_utc_7)
                export_date_key_utc_7, export_time_key_utc_7 = func.handle_date_to_date_and_time_id(last_modified_utc_7)
                document_id = func.bson_object_to_string(data['doc_id'])
                doc_set_id = func.bson_object_to_string(data['doc_set_id']),
                _obj = FactDocumentModel(
                    document_key = _id,
                    ori_document_id = func.bson_object_to_string(data['_id']),
                    project_id = self.project_id,
                    document_id = document_id,
                    doc_set_id =  doc_set_id,
                    remark_code = None,
                    remark_description = None,
                    import_date_key = import_date_key_utc_7,
                    import_time_key = import_time_key_utc_7,
                    export_date_key = export_date_key_utc_7,
                    export_time_key = export_time_key_utc_7,
                    import_timestamp = created_date_utc_7,
                    export_timestamp = last_modified_utc_7,
                )
                self.document_key_checks.append({'document_key': _id, 'document_id': document_id, 'doc_set_id': doc_set_id})
                datas.append(_obj)
                _id+=1
            if datas != []:
                pprint(datas[0].__dict__)
            self.db.create([item.__dict__ for item in datas], self.schema, self.fact_document_table)
            report.status_code = 'PASSED'
        except Exception as e:
            report.status_code = 'FAILED'
            report.description = str(e)
        finally:
            report.total_time_run_second = time.time()-start_run
            self.reports.append(report)

    def get_document_key_by_document_id(self, document_id: str):
        document_key = None
        for item in self.document_key_checks:
            if item['document_id'] == document_id:
                document_key = item['document_key']
                break
        return document_key
    
  
    def fact_performance(self):
        report = initial_report('fact_performance', self.project_id, self.schedule_type, self.schedule_date_key, self.schedule_time_key)
        start_run = time.time()
        try:
            datas = []
            data_performance = self.get_performance()
            _id = self.db.get_max_id_table(schema = self.schema, table = self.fact_performance_table, col ='performance_key')
            if _id == None:
                _id = 1
            else:
                _id+=1
            for performance in data_performance:
                captured_date_timestamp_utc_7 = performance['time'] + datetime.timedelta(hours = 7)
                document_key = self.get_document_key_by_document_id(performance['doc_id'])
                obj_ = FactPerformanceModel(
                        performance_key = _id,
                        ori_performance_id = func.bson_object_to_string(performance['_id']),
                        document_key = document_key,
                        project_id = self.project_id,  
                        group_id = performance['group_id'],  
                        document_id = performance['doc_id'],  
                        reworked = func.int_to_bool(performance['rework_count']),  
                        work_type_key = func.get_working_type_id_by_name(performance['work_type']),  
                        process_key = func.get_process_key_performance_gda(performance['type'], performance['task_def_key']),  
                        number_of_record = performance['records'],
                        number_of_item = performance['items'],  
                        number_of_field = performance['fields'],
                        number_of_character = performance['chars'],  
                        user_name = performance['username'], 
                        ip = performance['ip'], 
                        captured_date_timestamp = captured_date_timestamp_utc_7,  
                        captured_date_key = func.time_to_date_key(captured_date_timestamp_utc_7),  
                        captured_time_key = func.time_to_time_key(captured_date_timestamp_utc_7),  
                        total_time_second = performance['total_time']    
                )
                datas.append(obj_)
                self.performance_key_checks.append({'performance_key': _id, 'user_name': performance['username'], 
                                                    'document_key': document_key, 'module_type': performance['type'], 
                                                    'task_def_key': performance['task_def_key']})
                _id+=1
            if datas != []:
                pprint(datas[0].__dict__)
            self.db.create([item.__dict__ for item in datas], self.schema, self.fact_performance_table)
            report.status_code = 'PASSED'
        except Exception as e:
            report.status_code = 'FAILED'
            report.description = str(e)
            pprint(e)
        finally:
            report.total_time_run_second = time.time()-start_run
            self.reports.append(report)
    
    def get_performance_key(self, document_key: str, user_name: str, module_type: str, task_def_key: str):
        performance_key = None
        for item in self.performance_key_checks:
            if item['user_name'] == user_name and item['module_type'] == module_type and item['document_key'] == document_key and item['task_def_key'] == task_def_key:
                performance_key = item['performance_key']
                break
        return performance_key
                                                   
    def fact_data_extract(self):
        report = initial_report('fact_data_extraction', self.project_id, self.schedule_type, self.schedule_date_key, self.schedule_time_key)
        start_run = time.time()
        try:
            data_docs, data_trans = self.get_docs_and_trans()
            col_ignores = ['ImagePath']
            results = []
            for data in data_docs:
                records = data['records']
                document_id = func.bson_object_to_string(data['_id'])          
                document_key = self.get_document_key_by_document_id(document_id)
                doc_set_id = func.bson_object_to_string(data['doc_set_id'])
                for record in records:
                    for key, value in record.items():
                        if key == 'keyed_data':
                            for keyed_data in value:
                                source = keyed_data['source']
                                task_def_key = keyed_data['task_def_key']
                                task_id = keyed_data['task_id']
                                section = keyed_data['section']
                                reason = keyed_data['reason']
                                data_needed = keyed_data['data'][0].items()
                                last_modified_utc_7 = keyed_data['createdtime'] + datetime.timedelta(hours = 7)
                                user_name = keyed_data['keyer']
                                performance_key = None
                                if source != 'queue_transform' and task_def_key.startswith('Type'):
                                    process_key = 3 # human input keyed_data kpi
                                    performance_key = self.get_performance_key(document_key, user_name, 'keying', task_def_key) 
                                if source != 'queue_transform' and task_def_key == 'Verify_Hold_Type':
                                    process_key = 12 # human check bad_image keyed_data not kpi                               
                                elif source == 'queue_transform' and task_def_key.startswith('Type'):
                                    process_key = 4 # 'machine save input keyed_data'
                                elif source != 'queue_transform' and task_def_key.startswith('Proof'):
                                    process_key = 5 # human qc input keyed_data' kpi
                                    performance_key = self.get_performance_key(document_key, user_name, 'keying', task_def_key)                                  
                                elif source == 'queue_transform' and task_def_key.startswith('Proof'):
                                    process_key = 6 # 'machine save qc keyed_data'
                                for field_name, field_value_dict in data_needed:
                                    if field_name in col_ignores:
                                        continue
                                    _obj = FactDataExtractionModel(
                                        document_key = document_key,
                                        performance_key = performance_key,
                                        ori_document_id = document_id,
                                        project_id = self.project_id,
                                        document_id = document_id,
                                        doc_set_id =  doc_set_id,
                                        last_modified_date_key = func.time_to_date_key(last_modified_utc_7),
                                        last_modified_time_key = func.time_to_time_key(last_modified_utc_7),
                                        last_modified_timestamp = last_modified_utc_7,
                                        user_name = user_name,
                                        process_key = process_key,
                                        field_name = field_name,
                                        field_value = field_value_dict['text']
                                    )
                                    results.append(_obj)

                        elif key == 'final_data':
                            final_data = value[0]
                            data_needed = final_data['data'][0].items()
                            last_modified_utc_7 = final_data['createdtime'] + datetime.timedelta(hours = 7)
                            user_name = final_data['keyer']
                            process_key = 10
                            for field_name, field_value_dict in data_needed:
                                if field_name in col_ignores:
                                    continue
                                _obj = FactDataExtractionModel(
                                    document_key = document_key,
                                    performance_key = None,
                                    ori_document_id = document_id,
                                    project_id = self.project_id,
                                    document_id = document_id,
                                    doc_set_id =  doc_set_id,
                                    last_modified_date_key = func.time_to_date_key(last_modified_utc_7),
                                    last_modified_time_key = func.time_to_time_key(last_modified_utc_7),
                                    last_modified_timestamp = last_modified_utc_7,
                                    user_name = user_name,
                                    process_key = process_key,
                                    field_name = field_name,
                                    field_value = field_value_dict['text']
                                )
                                results.append(_obj)

                        elif key == 'qc_ed_data':
                            qc_ed_data = value[0][0]
                            if 'qc_fields_err' not in qc_ed_data.keys():
                                continue
                            qc_ed_data_err = qc_ed_data['qc_fields_err']
                            data_needed = qc_ed_data_err[0].items()
                            last_modified_utc_7 = qc_ed_data['createdtime'] + datetime.timedelta(hours = 7)
                            user_name = qc_ed_data['keyer']
                            process_key = 8
                            performance_key = None
                            performance_key = self.get_performance_key(document_key, user_name, 'qc', task_def_key) 
                            for field_name, field_value_dict in data_needed:
                                if field_name in col_ignores:
                                    continue
                                _obj = FactDataExtractionModel(
                                    document_key = document_key,
                                    performance_key = performance_key,
                                    ori_document_id = document_id,
                                    project_id = self.project_id,
                                    document_id = document_id,
                                    doc_set_id =  doc_set_id,
                                    last_modified_date_key = func.time_to_date_key(last_modified_utc_7),
                                    last_modified_time_key = func.time_to_time_key(last_modified_utc_7),
                                    last_modified_timestamp = last_modified_utc_7,
                                    user_name = user_name,
                                    process_key = process_key,
                                    field_name = field_name,
                                    field_value = field_value_dict['text']
                                )
                                results.append(_obj)

                        elif key == 'apr_ed_data':
                            print('here')
            if len(results) != 0:
                pprint(results[0].__dict__)
            self.db.create([item.__dict__ for item in results], self.schema, self.fact_data_extraction_table)
            report.status_code = 'PASSED'
        except Exception as e:
            report.status_code = 'FAILED'
            report.description = str(e)
            pprint(e)
        finally:
            report.total_time_run_second = time.time()-start_run
            self.reports.append(report)

    def report_upload(self):
        self.db.create([item.__dict__ for item in self.reports], self.schema, self.fact_report_table)
    
db_connect = DatabaseConnect(uri = config.DWH_SQLALCHEMY_URI)
executor = GdaExecutor(
    environment=config.ENVIRONMENT,
    uri=config.ELROND_URI,
    database_name=config.ELROND_DATABASE,
    docs_collection_name= config.GDA_DOCS_COLLECTION, 
    trans_collection_name= config.GDA_TRANS_COLLECTION,
    performance_collection_name = config.GDA_PERFORMANCE_COLLECTION,
    db = db_connect
)
executor.clean()
executor.fact_document()
executor.fact_performance()
executor.fact_data_extract() 
executor.report_upload()

{'doc_set_id': ('5ff2df89474eb70010c1a4de',),
 'document_id': '5ff2df89474eb70010c1a5aa',
 'document_key': 1,
 'export_date_key': 20210111,
 'export_time_key': 85508,
 'export_timestamp': datetime.datetime(2021, 1, 11, 8, 55, 8, 149000),
 'import_date_key': 20210104,
 'import_time_key': 162737,
 'import_timestamp': datetime.datetime(2021, 1, 4, 16, 27, 37, 617000),
 'ori_document_id': '5ffbaffc489b01001ed5a29f',
 'project_id': '5e9e7ec598d753001b7efe6b',
 'remark_code': None,
 'remark_description': None}
{'captured_date_key': 2021111,
 'captured_date_timestamp': datetime.datetime(2021, 1, 11, 7, 2, 36, 9000),
 'captured_time_key': 7236,
 'document_id': '5ff2df8b474eb70010c1ad5a',
 'document_key': 1678,
 'group_id': None,
 'ip': '10.1.29.163',
 'number_of_character': 429,
 'number_of_field': 30,
 'number_of_item': 5,
 'number_of_record': 5,
 'ori_performance_id': '5ffb959caa3d950371f96ef3',
 'performance_key': 1,
 'process_key': 3,
 'project_id': '5e9e7ec598d753001b7efe6b',
 'reworked':

In [None]:
#             self.db.create([item.__dict__ for item in results], self.schema, self.fact_data_extraction_table)
#                     elif key == 'qc_ed_data':
#                         qc_ed_data = value[0][0]
#                         if 'qc_fields_err' not in qc_ed_data.keys():
#                             pass
#                         else:
#                             data_needed = qc_ed_data['data'].items()
#                             last_modified_utc_7 = qc_ed_data['createdtime'] + datetime.timedelta(hours = 7)
#                     elif key == 'apr_ed_data':
#                         pprint('here')
#                     elif key == 'final_data':
#                         final_data = value[0]
#                         last_modified_utc_7 = final_data['createdtime'] + datetime.timedelta(hours = 7)
#                         user_name = final_data['keyer']
#                         process_key = 7 # machine save final_data
#                         data_needed = final_data['data'][0].items()
#                         for field_name, field_value_dict in data_needed:
#                             if field_name in col_ignores:
#                                 _obj = FactDataExtractionModel(
#                                     project_id = self.project_id,
#                                     document_id = func.bson_object_to_string(data['_id']),
#                                     doc_set_id =  func.bson_object_to_string(data['doc_set_id']),
#                                     last_modified_date_key = func.time_to_date_key(last_modified_utc_7),
#                                     last_modified_time_key = func.time_to_time_key(last_modified_utc_7),
#                                     last_modified_timestamp = last_modified_utc_7,
#                                     user_name = user_name,
#                                     process_key = process_key,
#                                     field_name = field_name,
#                                     field_value = field_value_dict['text']
#                                 )
#                                 results.append(_obj)

In [None]:
dag_params = {
    'dag_id': "dwh_GDA_project_daily_tmp",
    'start_date': datetime.datetime(2021, 1, 6, tzinfo=config.LOCAL_TIME_ZONE),
    'schedule_interval': '20 5 * * *'
}

dag = DAG(**dag_params)

clean = PythonOperator(task_id='clean', python_callable=executor.clean, dag=dag)
check_connect = PythonOperator(task_id='check_connect', python_callable=executor.check_connect, dag=dag)
backup_docs_json = PythonOperator(task_id='backup_docs_json', python_callable=executor.backup_docs_json, dag=dag, trigger_rule=TriggerRule.ALL_SUCCESS)
backup_trans_json = PythonOperator(task_id='backup_trans_json', python_callable=executor.backup_trans_json, dag=dag, trigger_rule=TriggerRule.ALL_SUCCESS)
backup_performance = PythonOperator(task_id='backup_performance', python_callable=executor.backup_performance, dag=dag, trigger_rule=TriggerRule.ALL_SUCCESS)


fact_performance = PythonOperator(task_id='fact_performance', python_callable=executor.fact_performance, dag=dag, trigger_rule=TriggerRule.ALL_SUCCESS)
fact_document = PythonOperator(task_id='fact_document', python_callable=executor.fact_document, dag=dag, trigger_rule=TriggerRule.ALL_SUCCESS)


report = PythonOperator(task_id='report', python_callable=executor.report, dag=dag, trigger_rule=TriggerRule.ALL_DONE)

clean >> check_connect >> [backup_trans_json, backup_docs_json, backup_performance]

fact_performance.set_upstream(backup_performance)
fact_document.set_upstream([backup_trans_json, backup_docs_json])

[fact_performance, fact_document] >> report

In [None]:
trans = pickle.load(open('./backup/trans/' + '5db5c87345052400142992e9' + '.pickle', 'rb'))
docs = pickle.load(open('./backup/docs/' + '5db5c87345052400142992e9' + '.pickle', 'rb'))
performance = pickle.load(open('./backup/performance/' + '5db5c87345052400142992e9' + '.pickle', 'rb'))
# keyed_data
# qc

# pprint(performance)
x = []
for data in docs:
    pprint(data)
    break
    ocr_results = data['records'][0]['system_data'][0]['ocr_data'][0]['ocr_results']
    for ocr_result in ocr_results:
        field_name = ocr_result['field_name']
        if field_name not in x: 
            x.append(field_name)
print(x)
['address', 'birthday', 'expiry', 'home_town', 'id', 'issue_at', 'issue_date', 'name', 'sex']