In [16]:
import datetime
import os
import pandas as pd
import sqlalchemy
import xml.etree.ElementTree as ET
from typing import Callable
import logging
import warnings
import threading
import time
import logging
logging.basicConfig(filename='parsing.log', level=logging.INFO, filemode='w', format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logging.getLogger('').addHandler(console)
logging.getLogger('postgresql').setLevel(logging.WARNING)
warnings.filterwarnings("ignore") 

In [17]:
connection_string = 'postgresql+psycopg2://postgres:5555@db.mpkazantsev.ru/demo'
engine = sqlalchemy.create_engine(connection_string)

In [18]:
TAGS = ['MONTHLY_DETAIL', 'LOANS_OVERVIEW', 'LOAN', 'MAIN', 'NAME', 'SCORE', 'FRAUD']

TABLE_NAMES = ['singleformattype', 'monthlydetailtype', 'loansoverviewtype', 'loanstype', 'maintype', 'nametype', 'scoretype', 'fraudtype']

TAGS_TO_TABLE_NAMES_MAPPING = {
                              'MONTHLY_DETAIL': 'monthlydetailtype',
                              'LOANS_OVERVIEW':'loansoverviewtype',
                              'LOAN':'loanstype',
                              'MAIN':'maintype',
                              'NAME':'nametype',
                              'SCORE':'scoretype',
                              'FRAUD':'fraudtype',
                              }

FIELD_NAMES_TO_EXCLUDE = ['cbtypecode', 'nextpmtprincipal']

def get_table_name_by_tag(tag):
    return TAGS_TO_TABLE_NAMES_MAPPING[tag]

In [19]:
def recreate_tables(tables):
    for table in tables:
        query = "create table  "+prefix+table+"  as select * from ebp."+table+" where 1<>1"
        try:
            engine.execute(query)
            logging.info("Table "+table+" created")
        except:
            drop_query="drop table "+prefix+table
            try:
                engine.execute(drop_query)
                logging.info("Error:Table droped")
            except:
                pass
            engine.execute(query)
            logging.info("Table "+table+" recreated")
            

In [20]:
def get_df_names_to_db_names_dict(table_name, engine): # словарь для поиска имен полей по тегу в низком регистре и без _ через describe таблицы
    describe_query = "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{}';".format(table_name)
    df = pd.read_sql_query(describe_query, engine)
    result_dict = {}
    for _, row in df[['column_name']].iterrows():
        result_dict[row['column_name'].lower().replace('_','')] = row['column_name']
    return result_dict

TAG_TO_DF_NAMES_AND_DB_NAMES_DICT = {}
for tag in TAGS:
    TAG_TO_DF_NAMES_AND_DB_NAMES_DICT[tag] = get_df_names_to_db_names_dict("eb_" + get_table_name_by_tag(tag), engine)

In [21]:
def get_table_type_dict(tag): # получение словаря поле-тип по тегу (имена полей записаны в нижнем регистре без нижних подчеркиваний)
    xml_root = ET.parse(os.getcwd() + '/SingleFormat.xsd').getroot()
    table_type = xml_root.findall(".//{http://www.w3.org/2001/XMLSchema}element[@name='" + tag + "']")[0].attrib['type']
    result_dict = {}
    for element in xml_root.findall(".//{http://www.w3.org/2001/XMLSchema}complexType[@name='" + table_type + "']/{http://www.w3.org/2001/XMLSchema}sequence/{http://www.w3.org/2001/XMLSchema}element"):
        element_name = element.attrib['name']
        element_type = element.attrib['type']
        if element_type[:3] == "xs:":
            element_type = element_type[3:]
        result_dict[element_name.lower().replace('_','')] = element_type.lower()
    return result_dict

TAG_TO_TABLE_TYPES_DICT = {}
for tag in TAGS:
    TAG_TO_TABLE_TYPES_DICT[tag] = get_table_type_dict(tag)

In [22]:
def get_field_value(expected_type, field_name, str_value):
    if expected_type == None:
        logging.info("Не найдено значение по полю "+field+" в словаре,значение: "+value)
        return None
    elif expected_type == 'int' or field_name == 'recentlegalupdatedate':  # костыль на interestrate пока в БД поле int а не float
        return int(str_value)
    elif field_name == 'interestrate':
        return int(str_value.split('.')[0])
    elif expected_type == 'float' or expected_type == 'moneyvaluetype':
        return float(str_value)
    else:
        return str_value

In [23]:
def get_df_from_SF_item(SF_item, table_name, hjid, tables_current_hjid):
    field_values = []
    field_names = []

    for SF_subitem in SF_item:
        field_name = SF_subitem.tag.lower().replace('_','')
        if len(SF_subitem) > 0 or field_name in FIELD_NAMES_TO_EXCLUDE or TAG_TO_DF_NAMES_AND_DB_NAMES_DICT[SF_item.tag].get(field_name) is None:
            logging.info("Либо есть вложенная структура, либо отсутствует значение, либо поле есть в списке исключений")
            continue
        field_value = get_field_value(expected_type=TAG_TO_TABLE_TYPES_DICT[SF_item.tag].get(field_name), field_name=field_name, str_value=SF_subitem.text)
        if field_value != None:
            field_names.append(field_name)
            if isinstance(field_value, int) or isinstance(field_value, float):
                field_values.append(field_value)
            else:
                field_values.append("'" + field_value + "'")

    field_names.append('hdp_datetime')
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    field_values.append("'" + current_time + "'")

    if TABLES_HJID[table_name] != 'hjid':
        field_for_hjid_name = table_name + "_hjid"
        tables_current_hjid[field_for_hjid_name] = tables_current_hjid.get(field_for_hjid_name, 0) + 1
        field_names.append('hjid')
        field_values.append(tables_current_hjid.get(field_for_hjid_name))

    if table_name != 'monthlydetailtype':
        field_names.append(TABLES_HJID[table_name])
        field_values.append(hjid)
    else:
        field_names.append('loan_id')
        field_values.append(tables_current_hjid.get('loanstype_hjid'))
    return pd.DataFrame(data=[field_values], columns=field_names)

In [24]:
def parse_xml_file(path_to_xml_file, hjid, table_name_to_df_dict, tables_current_hjid):
    for tag in TAGS: # парсим данные по тегу и накапливаем во фрейм
        if tag == 'MONTHLY_DETAIL':
                continue
        table_name = get_table_name_by_tag(tag)
        xml_root = ET.parse(path_to_xml_file).getroot()
        SF_items = xml_root.findall(".//" + tag)
        iterr = 0
        for SF_item in SF_items:
            df_to_append = get_df_from_SF_item(SF_item, table_name, hjid, tables_current_hjid)
            table_name_to_df_dict[table_name] = pd.concat([table_name_to_df_dict[table_name], df_to_append], ignore_index=True)
            if tag == 'LOAN':
                MD_SF_items = SF_item.findall(".//MONTHLY_DETAIL")
                for MD_SF_item in MD_SF_items:
                    MD_table_name = get_table_name_by_tag(MD_SF_item.tag)
                    df_to_append = get_df_from_SF_item(MD_SF_item, MD_table_name, hjid, tables_current_hjid)
                    table_name_to_df_dict[MD_table_name] = pd.concat([table_name_to_df_dict[MD_table_name], df_to_append], ignore_index=True)
            iterr= iterr + 1
        logging.info('Вставлено '+str(iterr)+' строк в таблицу '+tag+'.')

In [36]:
def execute_insert_query(engine, insert_query, table_name, number_of_rows):
    engine.execute(insert_query)

In [32]:
def save_df_to_db(df, tag, table_name, engine, threads): # сохранение датафрема в таблицу
    if df.shape[0] > 0:
        names_dict = TAG_TO_DF_NAMES_AND_DB_NAMES_DICT[tag]
        str_for_columns = ", ".join(map(lambda x: names_dict.get(x, x), df.columns.values))
        values_list = []
        for _, row in df.iterrows():
            values_list.append("(" + ", ".join(map(str, row)).replace("nan","NULL") + ")")
        insert_query = "INSERT INTO {} ({}) VALUES {}".format(table_name, str_for_columns, ",".join(values_list))
        thread = threading.Thread(target=execute_insert_query, args=(engine, insert_query, table_name, len(values_list)))
        thread.start()
        threads.append(thread)
    else:
        logging.info("0 строк во фрейме " + table_name + " нечего записывать")

In [33]:
def save_singleformattype_to_db(hjids, prefix, engine,threads):  # сохранение данных из list в singleformattype      
    if len(hjids) > 0:
        values_list = []
        for hj in hjids:
            values_list.append("(" + ",".join([str(hj)] * 8) + ")")
        insert_query = '''INSERT INTO {}singleformattype (hjid, names_, loansoverview, loans, frauds, documents, scores, main)
                      VALUES {};'''.format(prefix, ",".join(values_list))
        thread = threading.Thread(target=execute_insert_query, args=(engine, insert_query, prefix+'singleformattype', len(values_list)))
        thread.start()
        threads.append(thread)
  
        logging.info("запись в singleformattype окончена")
    else:
        logging.info("len(hjids)=0")

In [34]:
def final_parsing(recreate_tables, engine, prefix, path_to_folder_with_xml_files, loading_size):

    tables_current_hjid = {}
    recreate_tables(TABLE_NAMES)

    table_name_to_df_dict = {}
    for tag in TAGS:
        table_name = get_table_name_by_tag(tag)
        table_name_to_df_dict[table_name] = pd.DataFrame() 
        table_name_to_df_dict[table_name + "_hjid"] = 0 
    threads = []
    hjids = []
    for filename in os.listdir(path_to_folder_with_xml_files):
        if filename.endswith(".xml"):
            path_to_xml_file = path_to_folder_with_xml_files + "/" + filename
            hjid = int(filename.split('.')[0])
            hjids.append(hjid)

            parse_xml_file(path_to_xml_file, hjid, table_name_to_df_dict, tables_current_hjid)

            if len(hjids) == loading_size:
                for tag in TAGS:
                    table_name = get_table_name_by_tag(tag)
                    save_df_to_db(df=table_name_to_df_dict[table_name], tag=tag, table_name=prefix+table_name, engine=engine,threads = threads) # сохраняем df в БД
                    table_name_to_df_dict[table_name] = pd.DataFrame() 
                save_singleformattype_to_db(hjids, prefix, engine, threads = threads) 
                hjids=[]        
    else:
        for tag in TAGS:
            table_name = get_table_name_by_tag(tag)
            save_df_to_db(df=table_name_to_df_dict[table_name], tag=tag, table_name=prefix+table_name, engine=engine, threads = threads) # сохраняем df в БД
            table_name_to_df_dict[table_name] = pd.DataFrame()
        save_singleformattype_to_db(hjids, prefix, engine, threads = threads) 

In [35]:
import time
start_time = time.time()
TABLES_HJID = {
              'monthlydetailtype': 'loan_id', 
              'loansoverviewtype': 'hjid',
              'loanstype': 'loanstypes_loan_hjid',
              'maintype': 'hjid',
              'nametype': 'nametypes_name__hjid',
              'scoretype': 'scoretypes_score_hjid',
              'fraudtype': 'fraudtypes_fraud_hjid',
              }

prefix = 'ebp.eb_'
path_to_folder_with_xml_files = os.getcwd() + '/parser_test' # Путь до наших файлов
loading_size = 5 # Размер батча  

final_parsing(recreate_tables=recreate_tables,engine=engine,prefix=prefix, path_to_folder_with_xml_files=path_to_folder_with_xml_files,
             loading_size = loading_size)
end_time = time.time()
print('Время выполнения: {} секунд'.format(end_time - start_time))

2023-05-04 09:04:09,624 - root - INFO - Error:Table droped
2023-05-04 09:04:09,624 - root - INFO - Error:Table droped
2023-05-04 09:04:09,629 - root - INFO - Table singleformattype recreated
2023-05-04 09:04:09,629 - root - INFO - Table singleformattype recreated
2023-05-04 09:04:09,632 - root - INFO - Error:Table droped
2023-05-04 09:04:09,632 - root - INFO - Error:Table droped
2023-05-04 09:04:09,637 - root - INFO - Table monthlydetailtype recreated
2023-05-04 09:04:09,637 - root - INFO - Table monthlydetailtype recreated
2023-05-04 09:04:09,641 - root - INFO - Error:Table droped
2023-05-04 09:04:09,641 - root - INFO - Error:Table droped
2023-05-04 09:04:09,646 - root - INFO - Table loansoverviewtype recreated
2023-05-04 09:04:09,646 - root - INFO - Table loansoverviewtype recreated
2023-05-04 09:04:09,651 - root - INFO - Error:Table droped
2023-05-04 09:04:09,651 - root - INFO - Error:Table droped
2023-05-04 09:04:09,656 - root - INFO - Table loanstype recreated
2023-05-04 09:04:09,

486 rows were inserted into table ebp.eb_monthlydetailtype
2 rows were inserted into table ebp.eb_loansoverviewtype
17 rows were inserted into table ebp.eb_loanstype
3 rows were inserted into table ebp.eb_nametype
2 rows were inserted into table ebp.eb_maintype
2 rows were inserted into table ebp.eb_scoretype
Время выполнения: 2.687958240509033 секунд
2 rows were inserted into table ebp.eb_singleformattype
