## Цикличный загрузчик данных по папке в одну таблицу

___
Реализованный функционал:
- Считывание файлов из папки.
- Перед загрузкой просматривать файл на ошибки в разметке столбцов. По возможности логировать этот процесс.
- Загрузка файлов в указанную таблицу.
___
Развитие:
- Рассмотреть возможность загрузки файлов из локальной папки или из HDFS.
___

In [1]:
import numpy as np
import pandas as pd
import os
import datetime
import contextlib

import ibis 
from impala.dbapi import connect
from impala.util import as_pandas

import warnings
warnings.filterwarnings('ignore')

from hdfs.ext.kerberos import KerberosClient

pd.set_option('max_columns', None)

from IPython.core.display import display, HTML
display(HTML("<style>.container { width:98% !important; }</style>"))

In [2]:
# Получение нового тикета доступа
login = 'gpbu16371'
password = '***'
!klist

os.system('echo ' + password + ' | kinit ' + login)
!klist

Ticket cache: FILE:/tmp/krb5cc
Default principal: gpbu16371@INT.GAZPROMBANK.RU

Valid starting       Expires              Service principal
07/01/2021 12:40:11  07/01/2021 22:40:11  krbtgt/INT.GAZPROMBANK.RU@INT.GAZPROMBANK.RU
	renew until 07/08/2021 12:40:11
Ticket cache: FILE:/tmp/krb5cc
Default principal: gpbu16371@INT.GAZPROMBANK.RU

Valid starting       Expires              Service principal
07/01/2021 12:40:29  07/01/2021 22:40:29  krbtgt/INT.GAZPROMBANK.RU@INT.GAZPROMBANK.RU
	renew until 07/08/2021 12:40:29


In [3]:
# Папка, из которой мы загружаем файлы
folder = '/work/data_loader/20210603_fraud'

# База, в которую грузим (в т.ч. временную таблицу)
database = 'sbx_dml'

# Название временной таблицы
temp_table = 'pde_temp_ext'

# Название итоговой таблицы
table = 'pde_fraud_temp'

# Разделитель в файлах
sep = ';'

In [8]:
# Считывание всех файлов из указанной папки
files = []

for file in os.listdir(folder):
    full_path = os.path.join(folder, file)
    if os.path.isfile(full_path):
        files.append(file)

len(files)

2

In [9]:
# Поиск строк с ошибочными разделителями
print('Начало анализа:', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

with open('log.txt', 'w') as log:
    # print записывает имя файла в лог
    with contextlib.redirect_stdout(log):
        # Если есть ошибка в разметке, она запишется в лог
        with contextlib.redirect_stderr(log):
            for file in sorted(files):
                print('Файл:', file)
                df = pd.read_csv(folder + '/' + file, sep = sep, encoding = 'Windows-1251', error_bad_lines = False, dtype = np.str)
                
print ('Конец анализа: ', datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

Начало анализа: 2021-07-01 12:46:05
Конец анализа:  2021-07-01 12:46:06


In [10]:
# Загружаем лог для анализа
with open('log.txt', 'r') as log:
    log_data = log.read()[:-1]

log_data = log_data.replace("'\nb'", "")       
log_data = log_data.replace("\\n", "; ")    
    
# Преобразуем его в DataFrame для удобства
df_log = pd.DataFrame(log_data.split('\n'))

# Удаляем строки, по которым нет ошибок
df_log_error = pd.concat([df_log.shift(periods = 1, fill_value = 0), df_log], axis = 1)[1:]
df_log_error.columns = ['data', 'shift_data']
df_log_error['shift_data_index'] = df_log_error['shift_data'].str.find('Файл')
df_log_error = df_log_error[df_log_error['shift_data_index'] < 0]
df_log_error.drop(['shift_data_index'], inplace = True, axis = 1)

# Разбиение DataFrame в случае нескольких ошибок для одной строки
df_log_error['shift_data'] = df_log_error['shift_data'].str[2:-3]
df_log_error = df_log_error.set_index(['data']).apply(lambda x: x.str.split('; ').explode()).reset_index()
df_log_error['data'] = df_log_error['data'].str.replace('Файл: ', '')
df_log_error.columns = ['file_name', 'error']

df_log_error

Unnamed: 0,file_name,error


In [11]:
for index, file in enumerate(sorted(files), 1):
    # Вывод даты начала загрузки
    start = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print('Начало загрузки: ' + start + ', файл: ' + str(index) + ". " + file)

    # Загрузка файла в память
    df_for_load = pd.read_csv(folder + '/' + file, sep = sep, encoding = 'Windows-1251', dtype = np.str)
    df_for_load['file_name'] = file


    # Сколько записей считано в файле
    print('Считано записей: ' + str(df_for_load.shape[0]))

    # Подключение к БД для запросов
    conn = connect(host = 'hdp-p3pml', 
                   port = 21050,  
                   use_ssl = 'true', 
                   auth_mechanism = 'GSSAPI',
                   database = database)
    cursor = conn.cursor()  
    cursor.execute("Drop Table If Exists " + database + "." + temp_table)

    # Загрузка данных во временную таблицу
    file_schema = ibis.expr.schema.infer(df_for_load)
    db_name = database
    table_name = temp_table
    impala_tbl_folder = '/data/sbx/' + database[database.find("_") + 1:]
    client = ibis.impala.connect(host = 'hdp-p3pml', 
                                 port = 21050,  
                                 use_ssl = 'true', 
                                 auth_mechanism = 'GSSAPI')
    db = client.database(db_name)
    db.create_table(table_name, schema = file_schema)
    df_for_load.to_parquet(table_name + '.parq', index = False)
    client = KerberosClient('https://hdp-p3pml:14000')
    client.upload(impala_tbl_folder + '/' + table_name + '/', table_name + '.parq')
    db.table(table_name).refresh()

    # Реально выполняется только для первого файла (из-за IF NOT EXISTS)
    # Создание итоговой таблицы, структура которой соотвествует структуре временной
    query = ''

    cursor.execute("SHOW CREATE TABLE " + database + '.' + temp_table)
    query_result = cursor.fetchall()
    for string in query_result:
        str_beg = string[0].find('(')             # Оставляем запрос, начиная с первой скобки
        str_end = string[0].find('PARQUET') + 7   # Обрезаем запрос до конца PARQUET
        query += "CREATE TABLE IF NOT EXISTS " + database + '.' + table + " " + string[0][str_beg:str_end].replace('  ', '\t')

    cursor.execute(query)

    # Перенос данных из временной таблицы
    cursor.execute("Insert Into " + database + "." + table + " Select * From " + database + "." + temp_table)

    del df_for_load

Начало загрузки: 2021-07-01 12:46:10, файл: 1. fraud 21y.csv
Считано записей: 6462
Without an HDFS connection, certain functionality may be disabled
Начало загрузки: 2021-07-01 12:46:30, файл: 2. fraud_100 (1).csv
Считано записей: 69658
Without an HDFS connection, certain functionality may be disabled
