***
# Распознание данных деклараций

Заказчику необходимо создать инструмент автоматизации составления отчетности по отходам для пунктов таможенных деклараций, попадающих под действие законодательства по экосборам. Заказчик предоставляет таможенные декларации.

### Задачи:
- Использовать модель OCR для распознания необходимых данных из предоставленных таможенных деклараций и Постановления Правительства РФ:
- Сопоставить перечень кодов ТН ВЭД с Постановлением Правительства РФ от 29.12.2023 № 2414 и выделить перечень товаров, попадающих по действие Постановления.

### Результат
- В формируемом отчете должны быть данные:
    - Имя файла декларации;
    - Количество товаров;
    - Код товара ТН ВЭД;
    - Суммарный вес товаров каждого типа без упаковки (нетто);
    - Суммарный вес товаров каждого типа в упаковке (брутто);
    - Типы упаковок и их количество;
    - Входит ли код товара в перечень Постановления.

Отчет должен быть предоставлен в вормате .xlsx.
***
В одном исходном файле могут содержаться несколько деклараций.
В некоторых декларациях поле с общим количеством товара не заполнено.

### Общий план работы:
- Обработка Постановления:
    - Границы необходимых данных Постановления Правительства РФ будут определяться путём привязки к линиям;
    - Распознание текста в указанных областях и формирование датасета;
- Обработка деклараций:
    - Объединение многостраничных деклараций в один файл;
    - Определение таблиц Деклараций (привязка к ключевым словам);
    - Разделение таблицы на ячейки (привязки к линиям);
    - Распознание текста в необходимых ячейках;
    - Очистка данных регулярными выражениями;
    - Формирование датасета;
    - Сравнение кодов ТН ВЭД;
    - Формирование отчета.

Не заполненные поля декларации будут заполняться пустыми строками.
***

In [1]:
import os
import re
from numpy import array, floor, concatenate, pi

import cv2
import easyocr
import matplotlib.pyplot as plt
from pymupdf import pymupdf
from PIL import Image, ImageDraw
from pandas import DataFrame, Series, concat, read_excel

In [2]:
DECLARATION_FOLDER = ''
DECISION_PATH = ''
CODES_PATH = ''
REPORT_PATH = ''
DEBUG_MODE = False

In [3]:
def logger(message) -> None:
    print(message)

In [4]:
def ocr_img(img: Image, crop_box: list = None, lang_list: list = None, detail: bool = True, paragraph: bool = False,
            smb_unity_width: float = 1.0, contrast: float = 0.7, max_size: int = 2580, min_smb_size: int = 5) -> list:
    """
    Распознаёт символы на изображении или его части.
    :param img: Исходное изображение;
    :param crop_box: Координаты участка изображения для сканирования;
    :param lang_list: Список языков для сканирования, согласно документации easyocr. Например, ["ru", "en"];
    :param detail: Возвращение только значений без координат и точности;
    :param paragraph: Объединение значений в параграфы;
    :param smb_unity_width: Максимальное расстояние между символами для объединения в одну строку;
    :param contrast: Значения с контрастностью ниже этого значения будут переданы в модель дважды. В изначальном виде
    и в высокой контрастности;
    :param max_size: Максимальный размер изображения. Изображение с размером выше этого значения будет уменьшено;
    :param min_smb_size: Минимальный размер определяемого значения, в пикселях;
    :return: Список найденных значений в формате [координаты][значение][точность].
    """
    if lang_list is None:
        lang_list = ["ru", "en"]
    reader = easyocr.Reader(lang_list)
#    img = Image.fromarray(img)

    if crop_box:
        try:
            img = img.crop(crop_box)
        except AttributeError as ex:
            if DEBUG_MODE:
                logger(f"Передан неверный объект. Функция ocr_img принимает только объекты PIL.Image. Текст ошибки: {ex})")
            raise AttributeError('Функция ocr_img принимает только объекты PIL.Image')

    ocr_page = reader.readtext(
        array(img),
        detail=detail,
        paragraph=paragraph,
        width_ths=smb_unity_width,
        contrast_ths=contrast,
        canvas_size=max_size,
        # Максимальный размер изображения. По-умолчанию 2580. Необходимо повысить, чтобы распознавать
        # большие изображения. Если распознавать большое изображение с меньшим значением, текст может
        # отсутствовать или будут появляться значительные искажения. При создании скрипта использовалось значение 7740.
        min_size=min_smb_size)
    
    return ocr_page if ocr_page else None

In [5]:
def merge_pdf_pages(path: "pdf", key: str or list, neg_key: str or list, top: int = 0, down: int = 10) -> list:
    """
    Объединяет страницы pdf файла по наличию заданного ключа. Страница без ключа будет объединена с предыдущей страницей
    с ключом. Поиск критерия происходит между заданными верхней и нижней границей.
    :param path: Путь к pdf файлу;
    :param key: Ключ для объединения страниц;
    :param neg_key: Черный список ключей. Страницы с этими ключами будут игнорироваться;
    :param top: Верхняя граница поиска по оси "y", в процентах. Значение не может быть больше "down";
    :param down: Нижняя граница поиска по оси "y", в процентах. Значение не может быть меньше "top";
    :return: Список объединенных изображений в формате numpy.ndarray.
    """

    merged_image_list = None
    for page in pymupdf.open(path).pages():
        home_page = None

        image = page.get_pixmap(dpi=200).pil_image()
        header_crop_box = [0, floor(image.size[1] * (top / 100)), image.size[0], floor(image.size[1] * (down / 100))]
        page_head_data = ocr_img(image, header_crop_box, detail=False, max_size=7740)

        if isinstance(neg_key, str):
            if [header_data for header_data in page_head_data if neg_key in header_data.lower()]:
                continue
        elif isinstance(neg_key, list):
            for neg_crt in neg_key:
                if [header_data for header_data in page_head_data if neg_crt in header_data.lower()]:
                    break


        if isinstance(key, str):
            home_page = [header_data for header_data in page_head_data if key in header_data.lower()]
        elif isinstance(key, list):
            for crt in key:
                home_page = [header_data for header_data in page_head_data if crt in header_data.lower()]
                if home_page:
                    break

        image = array(image)
        if home_page:
            try:
                merged_image_list.append(image)
            except AttributeError:
                merged_image_list = [image]
        else:
            try:
                merged_image_list[-1] = concatenate((merged_image_list[-1], image), axis=0)
            except IndexError as ex:
                logger(F'Не найден критерий для слияния. Проверьте входные данные. Текст ошибки: {ex}')
#                merged_image_list = [image]
    return merged_image_list

In [6]:
def lines_recognition(img: 'PIL.Image' or "numpy.array", crop_box: list = None, tolerance: int = 10,
            weak_line_value: int = 50, strong_line_value: int = 100, threshold: int = 100, min_line_length: int = 100,
            max_line_gap: int = 5, rho: float = 0.1, aperture_size: int = 7, l2_gradient: bool = True) -> tuple[DataFrame, DataFrame, DataFrame]:
    """
    Распознаёт вертикальные и горизонтальные линии на исходном изображении;
    :param img: Исходное изображение;
    :param crop_box: Координаты области для распознания;
    :param tolerance: Порог близости точек. Координаты, находящиеся на расстоянии меньше этого значения, будут
     объединены;
    :param weak_line_value: Порог заметности слабых линий. Слабые линии распознаются только если соединены с сильными;
    :param strong_line_value: Порог заметности сильных линий;
    :param threshold: Количество "голосов", необходимое, чтобы подтвердить наличие линии;
    :param min_line_length: Минимальная длина линий;
    :param max_line_gap: Максимальное расстояние между линиями для их объединения;
    :param L2gradient: Использование L2gradient;
    :return: Списки вертикальных линий, горизонтальных линий и прочих линий.
    """
    nested_rows = []

    if crop_box:
        try:
            img = img.crop(crop_box)
        except AttributeError:
            img = Image.fromarray(img)
            img = img.crop(crop_box)
    img = array(img)
    lines_image = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)    
    canny_image = cv2.Canny(lines_image, 
                            threshold1=weak_line_value, 
                            threshold2=strong_line_value, 
                            apertureSize=aperture_size,
                            L2gradient=l2_gradient)
    lines = cv2.HoughLinesP(canny_image,
                       rho=rho,
                       theta=pi / 180,
                       threshold=threshold,
                       minLineLength=min_line_length,
                       maxLineGap=max_line_gap
                       )

    try:
        lines = lines.squeeze()
    except:
        if DEBUG_MODE:
            logger(lines)
        return None, None, None
    coordinates = DataFrame(lines)
    try:
        coordinates.columns = ['x1', 'y1', 'x2', 'y2']
    except ValueError:
        logger
        return None, None, None
    # слияние ближайших координат разных строк
    for _, source_row in coordinates.iterrows():
        for _, matching_row in coordinates.iterrows():
            for pos in coordinates.keys():
                if abs(source_row[pos] - matching_row[pos]) <= tolerance:
                    if source_row[pos] < matching_row[pos]:
                        matching_row[pos] = source_row[pos]

    # выпрямление линий
    for _, matching_row in coordinates.iterrows():
        if abs(matching_row['x1'] - matching_row['x2']) <= tolerance:
            matching_row['x2'] = matching_row['x1']
        if abs(matching_row['y1'] - matching_row['y2']) <= tolerance:
            matching_row['y2'] = matching_row['y1']

    # удаление вложенных линий
    for coordinates_id, source_row in coordinates.query('x1 == x2').iterrows():
        for _, matching_row in coordinates.query('x1 == x2').iterrows():
            if source_row['y1'] > matching_row['y1'] and source_row['y2'] < matching_row['y2']:
                nested_rows.append(coordinates_id)
    for coordinates_id, source_row in coordinates.query('y1 == y2').iterrows():
        for _, matching_row in coordinates.query('y1 == y2').iterrows():
            if source_row['x1'] > matching_row['x1'] and source_row['x2'] < matching_row['x2']:
                nested_rows.append(coordinates_id)
    
    coordinates.drop(nested_rows, axis=0)

    # исправление направления линий
    for _, coord_row in coordinates.iterrows():
        if coord_row['x1'] > coord_row['x2']:
            coord_row['x1'], coord_row['x2'] = coord_row['x2'], coord_row['x1']
        if coord_row['y1'] > coord_row['y2']:
            coord_row['y1'], coord_row['y2'] = coord_row['y2'], coord_row['y1']

    coordinates = coordinates.drop_duplicates().reset_index(drop=True)

    horizontal_lines = coordinates[coordinates['y1'] == coordinates['y2']]
    vertical_lines = coordinates[coordinates['x1'] == coordinates['x2']]
    other_lines = coordinates.query('y1 != y2 and x1!= x2').reset_index(drop=True)
    return horizontal_lines.sort_values('y1'), vertical_lines.sort_values('x1'), other_lines

In [7]:
def print_lines(cropped_image, lines=None) -> None:
    draw = ImageDraw.Draw(cropped_image, 'RGBA')

    try:
        for _, row in lines.iterrows():
            draw.rectangle(row.tolist(), outline=(255, 0, 0, 100), width=3)
    except AttributeError:
        pass
    img_array = array(cropped_image)
    plt.figure(figsize=(15, 15))
    plt.imshow(img_array)
    plt.show()

In [8]:
def get_actual_page_list(declaration_path: str):
    """
    Распознаёт данные о таре, коде товара, количестве товара, весе брутто и нетто на форме декларации.
    :param declaration_path: Абсолютный путь к файлу декларации.
    :return: Список распознанных данных декларации.
    """
    table = []
    actual_page_list = []
    for actual_image in merge_pdf_pages(declaration_path, ['добавочный лист', 'декларация', 'добавочный лист',
                                                           'декларации на'], 'дополнение'):
        actual_image = Image.fromarray(actual_image)
        table = [actual_image]
        start_coord, end_coord = None, None
        ocr_all_page_data = ocr_img(
            actual_image,
            smb_unity_width=0.5,
            detail=True,
            max_size=7740
        )
        if not ocr_all_page_data:
            if DEBUG_MODE:
                logger(f'В файле {declaration_path} на странице {actual_image_list.index(actual_image)} обнаружена\
                 предположительно пустая таблица. Она будет проигнорирована. Необходимо проверить достоверность данных')
            continue

        for (topleft, topright, downright, downleft), value, probability in ocr_all_page_data:
            value = value.lower()

            if (re.search('маркировка и количество', value)
                or re.search('маркировка', value)
                or re.search('грузовые места', value)
                or re.search('код товара', value)
                or re.search(r'\w{5,7} контейнеров', value)
                or re.search('отличительные особенности', value)
            ):
                if not start_coord:
                    start_coord = [0, downleft[1] - 5]

            if (re.search('цена товара', value)
                    or re.search('код мо', value)
                    or re.search(r'дополнит[. ]?единицы', value)):
                if start_coord:
                    end_coord = [actual_image.size[0], downleft[1] + 60]

            if start_coord and end_coord:
                crop_box = start_coord
                crop_box.extend(end_coord)
                table.append(crop_box)
                crop_box, start_coord, end_coord = None, None, None
        actual_page_list.append(table)
    return actual_page_list

In [9]:
 def get_images_with_required_data(target_img: "PIL.Image", crop_box: list) -> tuple[Image, Image, Image, Image]:
    """
    Обрезает исходное изображение по координатам необходимых данных в декларациях
    :param target_img: Исходное изображение
    :param crop_box: Базовая область на изображении для распознания
    :param debug_mode: Предоставляет дополнительные данные для определения багов
    :return:
    """
    # Выделение таблицы
    cropped_image = target_img.crop(crop_box)
    _, v_lines, _ = lines_recognition(
        target_img,
        crop_box,
        tolerance=5,
        threshold=10,
        min_line_length=50,
        max_line_gap=1,
        weak_line_value=1,
        strong_line_value=1,
        rho=0.01,
        aperture_size=3,
        l2_gradient=False
    )
    v_lines['y1'] = 0
    v_lines['y2'] = cropped_image.size[1]
    v_lines = v_lines.drop_duplicates().reset_index(drop=True)
    if v_lines.shape[0] >= 2:
        if DEBUG_MODE:
            logger('Успех разделения на блоки товара и данных.')
    else:
        if DEBUG_MODE:
            logger('Провал разделения на блоки товара и данных.')
        return

    # Нарезка изображения на ячейки
    if v_lines.shape[0] >= 2:
        if DEBUG_MODE:
            logger('Грубая нарезка таблицы')
        # Грубая обрезка блоков
        data_img = cropped_image.crop((
            v_lines.loc[0, 'x1'] + 1, 
            v_lines.loc[0, 'y1'] + 1, 
            v_lines.loc[1, 'x1'], 
            v_lines.loc[1, 'y2']
        ))
        
        codes_weight_img = cropped_image.crop((
            v_lines.loc[1, 'x1'] + 3, 
            v_lines.loc[1, 'y1'] + 1, 
            cropped_image.size[0] * 0.9, 
            cropped_image.size[1]
        ))
            
        if DEBUG_MODE:
            print_lines(cropped_image, v_lines)
        
        # Распознание линий на блоке кодов и массы
        if DEBUG_MODE:
            logger('Распознание линий на блоке кодов и массы')
        try:
            codes_weight_h_lines, _, _ = lines_recognition(
                codes_weight_img,
                tolerance=10,
                threshold=10,
                min_line_length=200,
                max_line_gap=1,
                weak_line_value=1,
                strong_line_value=1,
                rho=0.001,
                aperture_size=3,
                l2_gradient=False
            )
            codes_weight_h_lines['x1'] = 0
            codes_weight_h_lines['x2'] = cropped_image.size[0]
            codes_weight_h_lines = codes_weight_h_lines.drop_duplicates().reset_index(drop=True)
        except:
            if DEBUG_MODE:
                logger('Ошибка при распознании линий на блоке кодов и массы: "NoneType" object does not support item assignment')
                logger(codes_weight_h_lines)
                print_lines(codes_weight_img)
            
        # Распознание линий на блоке описания товара
        if DEBUG_MODE:
            logger('Распознание линий на блоке описания товара')
        try:
            data_h_lines, _, _ = lines_recognition(
                data_img,
                tolerance=20,
                threshold=5,
                min_line_length=500,
                max_line_gap=1,
                weak_line_value=1,
                strong_line_value=1,
                rho=0.001,
                aperture_size=3,
                l2_gradient=False
            )
            data_h_lines['x1'] = 0
            data_h_lines['x2'] = data_img.size[0]
            data_h_lines = data_h_lines.drop_duplicates().reset_index(drop=True)

            if data_h_lines.shape[0] == 1:
                data_img = data_img.crop((
                    0, 
                    0, 
                    data_h_lines.loc[0, 'x2'], 
                    data_h_lines.loc[0, 'y2']
                ))
            else:
                if DEBUG_MODE:
                    print_lines(data_img, data_h_lines)
                    logger(f'Количество найденных строк отличается от ожидаемого. {data_h_lines.shape[0]}/1. \
    Необходимо изменить параметры распознания горизонтальных линий для изображения data_img.')
        except TypeError:
            if DEBUG_MODE:
                print_lines(data_img, data_h_lines)

        if DEBUG_MODE:
            logger('Нарезка блока кодов и массы')
        if codes_weight_h_lines.shape[0] >= 5:
            product_code_img = codes_weight_img.crop((
                0, 
                0,
                codes_weight_h_lines.loc[0, 'x2'] * 0.1,
                codes_weight_h_lines.loc[0, 'y2']
            ))
            brutto_img = codes_weight_img.crop((
                codes_weight_h_lines.loc[1, 'x2'] * 0.095,
                codes_weight_h_lines.loc[0, 'y1'] + (codes_weight_h_lines.loc[1, 'y2'] - codes_weight_h_lines.loc[0, 'y1']) / 3,
                codes_weight_h_lines.loc[1, 'x2'] * 0.2,
                codes_weight_h_lines.loc[1, 'y2']
            ))
            netto_img = codes_weight_img.crop((
                codes_weight_h_lines.loc[2, 'x2'] * 0.095,
                codes_weight_h_lines.loc[1, 'y1'] + (codes_weight_h_lines.loc[2, 'y2'] - codes_weight_h_lines.loc[1, 'y1']) / 3,
                codes_weight_h_lines.loc[2, 'x2'] * 0.2,
                codes_weight_h_lines.loc[2, 'y2']
            ))
            product_amount_img = codes_weight_img.crop((
                codes_weight_h_lines.loc[codes_weight_h_lines.index[-2], 'x1'] + 1,
                codes_weight_h_lines.loc[codes_weight_h_lines.index[-2], 'y1'] + (
                    (codes_weight_h_lines.loc[codes_weight_h_lines.index[-1], 'y2'] - 
                     codes_weight_h_lines.loc[codes_weight_h_lines.index[-2], 'y1']) / 4),
                codes_weight_h_lines.loc[codes_weight_h_lines.index[-1], 'x2'] * 0.1,
                codes_weight_h_lines.loc[codes_weight_h_lines.index[-1], 'y2']
            ))
            if DEBUG_MODE:
                print_lines(codes_weight_img, codes_weight_h_lines)
                print_lines(product_code_img)
                print_lines(brutto_img)
                print_lines(netto_img)
                print_lines(product_amount_img)
        else:
            if DEBUG_MODE:
                logger(codes_weight_h_lines)
                print_lines(codes_weight_img, codes_weight_h_lines)
                logger(f'Количество найденных строк отличается от ожидаемого. {codes_weight_h_lines.shape[0]}/6. \
Необходимо изменить параметры распознания горизонтальных линий для изображения codes_weight_img.')
    else:
        if DEBUG_MODE:
            logger('Не найдено достаточно линий на изображении.')
            logger(v_lines)
            print_lines(cropped_image, v_lines)
        else: 
            pass
    if data_img and product_code_img and brutto_img and netto_img and product_amount_img:
        return data_img, product_code_img, brutto_img, netto_img, product_amount_img
    else:
        return

In [10]:
def declaration_data_cleanup(df: DataFrame) -> DataFrame:
    container_types = []
    rep = {'С': 'C', 'Т': 'T', 'К': 'K', 'Р': 'P', 'В': 'B', 'Е': 'E', 'Н': 'H', 'М': 'M', 'О': 'O', 'Х': 'X', 'З': '3', 'О': '0', 'O': '0'}
    data = df['наименование']
    # Сжатие списка в списке
    data = data.apply(lambda value: value[0])
    data = data.dropna()
    # Очистка от знаков препинаний и объединение в одну строку
    data = data.apply(lambda values: ','.join([re.sub(r'[:;]', ' ', value) for value in values]))
    # Замена знаков на английские для простоты последующего выделенния
    data = data.apply(lambda value: value.upper().translate(str.maketrans(rep)))
    # Исправление некоторых ошибок распознания и разделение    
    data = data.apply(lambda value: re.sub(r'(\d{1,3})(,)([A-Z]{2})', r'\1-\3', value).split(','))
    # Выделение информации о упаковке
    data = data.apply(lambda value_list: [value.strip() for value in value_list if re.search(
        r'(\b[A-Z]{2}[- ]\d{1,3}\b)|(\b\d[|/\\ ]{2,3}[A-Z]{2}\b)|(\b\d{1,2}\.\d[ -]?[A-Z]{2}\b)', value)]) # <- соответственно: СТ-1 или 1 \ РХ или 2.1 РК
    # Очистка от лишних символов
    data = data.apply(lambda values: [re.sub(
        r'(.*)((\b[A-Z]{2}[- ]\d{1,3}\b)|(\b\d[|/\\ ]{2,3}[A-Z]{2}\b)|(\b\d{1,2}\.\d ?[A-Z]{2}\b))(.*)', r'\2', value.upper()) for value in values])
    # Приведение к единому формату данных тары
    data = data.apply(lambda values: [re.sub(r'(\b[A-Za-z]{2})([- ])(\d{1,5}\b)', r'\1-\3', value) for value in values])
    data = data.apply(lambda values: [re.sub(r'(\d)[|/\\ ]{2,3}([A-Za-z]{2})', r'\2-\1', value) for value in values])
    data = data.apply(lambda values: [re.sub(r'(\b\d{1,2}\.\d)([ -]?)([A-Z]{2}\b)', r'\3-\1', value) for value in values])
    if DEBUG_MODE:
        print(f'Приведено к единому формату данных\n{data}\n')
    # Создание списка уникальных наименований тары
    for entry in data:
        for value in entry:
            value = value.split('-')
            container_types.append(value[0])
    # добавление данных о упаковке в датафрейм
    data = DataFrame(data)
    for container in set(container_types):
        data[container] = data['наименование'].apply(lambda values: [value.split('-')[1] for value in values if value.split('-')[0] == container])
        data[container] = data[container].apply(lambda value: ' '.join(value))
        data[container] = data[container].apply(lambda value: value if value != '' else None)
    
    if DEBUG_MODE:
        logger(f'Контейнеры добавлены\n{data}')
    
    # Слияние с остальными данными
    data = data.drop('наименование', axis=1)
    df = concat((df, data), axis=1)
    return df


def product_code_cleanup(data: Series) -> Series:
    data = data.apply(lambda value: ' '.join(value[0]) if value[0] else None)
    return data.apply(lambda value: re.sub(r'(.*)(([ |/\\.,:;?]|\b)\w{10}([ |/\\.,:;?]|\b))(.*)', r'\2', value) if value else None)


def brutto_cleanup(data: Series) -> Series:
    data = data.apply(lambda value: ' '.join(value[0]) if value[0] else value[0])
    data = data.apply(lambda value: re.sub(r'(.*)(\b\d{1,7}\.\d{1,3}?\b)(.*)', r'\2', value) if value else None)
    return data.apply(lambda value: re.sub(r'\D', r'', value) if value else None)


def netto_cleanup(data: Series) -> Series:
    data = data.apply(lambda value: ' '.join(value[0]) if value[0] else value[0])
    data = data.apply(lambda value: re.sub(r'(\b\d{1,5}(\.\d{1,3})?[/|\\\[\]]){1,2}(\d{1,5}\.\d{1,3}\b)', r'\3', value) if value else None)
    return data.apply(lambda value: re.sub(r'\D', r'', value) if value else None)


def product_amount_cleanup(data: DataFrame, debug_mode: bool = False) -> Series:
    rep = {'З': '3', 'О': '0', 'O': '0'}
    
    data['количество'] = data['количество'].apply(lambda values: [value.upper().translate(str.maketrans(rep)) for value in values] if values else None)
    if debug_mode:
        logger(f'Начальный датафрейм количества товара\n{data}\n')
    data['количество'] = data['количество'].apply(lambda values: ' '.join([value for value in values if re.search(
        r'(\d{1,5}(\. ?\d{1,3})?)([/ ]{,3})([A-ZА-Я]{1,4})([/ ]{,3})([\d&?%$#]{2,4})', value)]) if values else None)
    data['количество'] = data['количество'].apply(lambda value: re.sub(
        r'(\d{1,5}(\. ?\d{1,3})?)([/ ]{,3})([A-ZА-Я]{1,4})([/ ]{,3})([\d&?%$#]{2,4})', r'\1 \4', value) if value else None)
    
    if DEBUG_MODE:
        logger(f'Количество добавлено\n{data}\n')
    for ind in data.index:
        if data.loc[ind, 'количество'] == '':
            try:
                temp = [value for value in data.loc[ind, 'наименование'][0] if re.search(
                    r'(\b\d{1,5}\.\d{1,3})( )?([А-ЯA-Z]{1,4})( )?(\(\d{2,4}\))', value)]
                if isinstance(temp, list):
                    try:
                        data.loc[ind, 'количество'] = re.sub(
                            r'(.*)(\b\d{1,5}\.\d{1,3})( \d){,3}([А-ЯA-Z ]{2,3})(.{1,5})?(\d{2,4})(.*)', r'\2 \4 <!>', temp[0])
                    except:
                        data.loc[ind, 'количество'] = temp
            except ValueError:
                pass
    return data['количество']

In [11]:
def get_declaration_data(declaration_path):
    actual_page_list = get_actual_page_list(declaration_path)
    result_string = []
    for page in actual_page_list:
        for crop_list in page[1:]:
            data_img, code_img, brutto_img, netto_img, product_amount_img = get_images_with_required_data(page[0], crop_list);
            if data_img:
                data = ocr_img(
                    data_img,
                    detail=False,
                    paragraph=False,
                    max_size = 7740,
                ), 
                product_code = ocr_img(
                    code_img,
                    detail=False,
                    paragraph=False,
                    max_size = 7740,
                ), 
                brutto = ocr_img(
                    brutto_img,
                    detail=False,
                    paragraph=False,
                    max_size = 7740,
                ),
                netto = ocr_img(
                    netto_img,
                    detail=False,
                    paragraph=False,
                    max_size = 7740,
                ),
                product_amount = ocr_img(
                    product_amount_img,
                    lang_list=['ru'],
                    detail=False,
                    paragraph=False,
                    max_size = 7740,
                )
                result_string.append([data, product_code, brutto, netto, product_amount]);
            else:
                if DEBUG_MODE:
                    logger('Отсутствует изображение с описанием товара')
                continue
    result_df = DataFrame(result_string)
    result_df.columns = ['наименование', 'код_продукта', 'вес_брутто', 'вес_нетто', 'количество'];
    result_df = declaration_data_cleanup(result_df)
    result_df['код_продукта'] = product_code_cleanup(result_df['код_продукта'])
    result_df['вес_брутто'] = brutto_cleanup(result_df['вес_брутто']).astype(float)
    result_df['вес_нетто'] = netto_cleanup(result_df['вес_нетто']).astype(float)
    result_df['количество'] = product_amount_cleanup(result_df.loc[:,['наименование', 'количество']]);
    result_df['наименование'] = re.sub(r'(.*[\\/])(.*)', r'\2', declaration_path)
    return result_df

In [12]:
def declaration_to_excel(current_df: DataFrame, output_file_path: str) -> None:
        try:
            if os.path.isfile(output_file_path) or len(read_excel(output_file_path).index) != 0:
                file_data = read_excel(output_file_path)
                united_data = concat([file_data, current_df], axis=0)
                united_data.to_excel(
                    output_file_path,
                    index=False)
        except FileNotFoundError:
            current_df.to_excel(output_file_path, index=False)

In [13]:
def update_codes_from_decision(decision_path, output_file_name: str = 'Список_ТН_ВЭД_ЕАЭС.xlsx'):
    """Функция управления обработкой Постановления.
    Принимает:
    - список из изображений страниц документа;
    Возвращает:
    - список кодов ТН ВЭД ЕАЭС"""
    tnved_codes = []
    kpes_codes = []
    crop_box_list = []
    image_size = []
    for page in pymupdf.open(decision_path).pages(start=4, stop=7):
        def data_uniqueness_filter(data):
            """Очищает строковые данные кодов ТН ВЭД ЕАЭС для 10ти, 9ти, 6ти и 4х значных кодов в формате (хххх хх ххх х)"""
            code10 = re.search(r'([а-яА-Яa-zA-Z])*(\d)?(\d{4})( \d{2})( \d{3})( \d)', data)
            code9 = re.search(r'([а-яА-Яa-zA-Z])*(\d)?(\d{4})( \d{2})( \d{3})(\d)?', data)
            code6 = re.search(r'([а-яА-Яa-zA-Z]{,2}?\d? ?)?(\d{4})( \d{2})(\d)?', data)
            code4 = re.search(r'([а-яА-Яa-zA-Z]{,2}?\d? ?)?(\d{4})(\d)?', data)
            if code10:
                return re.sub(r'([а-яА-Яa-zA-Z])*(\d)?(\d{4})( \d{2})( \d{3})( \d)', r'\2\3\4\5\6',
                              code10.group())
            if code9:
                return re.sub(r'([а-яА-Яa-zA-Z])*(\d)?(\d{4})( \d{2})( \d{3})(\d)?', r'\2\3\4', code9.group())
            if code6:
                return re.sub(r'([а-яА-Яa-zA-Z]{,2}?\d? ?)?(\d{4})( \d{2})(\d)?', r'\2\3', code6.group())
            if code4:
                return re.sub(r'([а-яА-Яa-zA-Z]{,2}?\d? ?)?(\d{4})(\d)?', r'\2', code4.group())

        image = page.get_pixmap(dpi=400).pil_image()
        if image.size[1] > image.size[0]:
            continue

        image_size = image.size
        table_lines_coordinates = lines_recognition(image, tolerance=50)
        # self._draw_ocr_img(image, table_lines_coordinates)
        crop_box_list.append([
            table_lines_coordinates['x2'].min(),
            table_lines_coordinates['y2'].max(),
            table_lines_coordinates['x1'].max(),
            image.size[1]
        ])
    crop_box_list = DataFrame(crop_box_list, columns=['x1', 'y1', 'x2', 'y2'])

    crop_box = ([
        crop_box_list['x1'].min() if (crop_box_list['x1'].min() < image_size[0] / 2) else (image_size[0] / 3),
        crop_box_list['y1'].max(),
        crop_box_list['x2'].max() if (crop_box_list['x2'].max() > image_size[0] / 2) else (2 * image_size[0] / 3),
        crop_box_list['y2'].max()
    ])
    for page in pymupdf.open(decision_path).pages():
        image = page.get_pixmap(dpi=200).pil_image()
        if image.size[1] > image.size[0]:
            continue
        ocr_raw_result = ocr_img(image, crop_box, detail=False, max_size=7740, paragraph=False)
        if ocr_raw_result:
            for value in ocr_raw_result:
                tnved_code = re.search(r'(\w{1,2})? ?\d{4,5} ?(\d{2})? ?(\d{3})? ?\d?', value)
                if tnved_code:
                    tnved_codes.append(
                        re.sub(
                            r'([а-яА-Я]{,2}?[0-9]? ?)(\d{4})( \d{2})?( \d{3})?( \d)?',
                            r'\2\3\4\5',
                            tnved_code.group()
                        )
                    )
                kpes_code = re.search(r'\d{2}[.,]\d{2}[.,]\d{2}[.,]\d{3}', value)
                if kpes_code:
                    kpes_codes.append(kpes_code.group())

    tnvd_df = DataFrame(
        DataFrame(
            tnved_codes[2:],  # На первой странице определяет 2 года в заголовках таблицы
            columns=['код_тн_вэд_еаэс']
        ).apply(data_uniqueness_filter).unique()).sort_values(key=lambda x: x.str.len(),
                                                              ascending=False).reset_index(drop=True)
    kpes_df = DataFrame(list(set(kpes_codes)), columns=['код_кпес']).sort_values()

    try:
        concat([tnvd_df, kpes_df], axis=1).to_excel(re.sub(r'(.*)([/\\])(.*)', r'\1\2', decision_path) + output_file_name, index=False)
    except PermissionError as ex:
        logger(f'ОШИБКА ДОСТУПА: Доступ к файлу запрещён. Текст ошибки:\n{ex}.')

In [14]:
%%time
# Загрузка кодов ТНВЭД
try:
    codes = read_excel(CODES_PATH)
    logger('Коды найдены')
except FileNotFoundError:
    logger('Не найден файл с кодами ТНВЭД и КПЕС. Идёт извлечение ->')
    codes = update_codes_from_decision(DECISION_PATH)

# Обработка деклараций
logger('Извлечение данных из деклараций')
if os.path.isdir(DECLARATION_FOLDER):
    for file in os.listdir(DECLARATION_FOLDER):
        if os.path.isfile(f'{DECLARATION_FOLDER}/{file}'):
            current_file_df = get_declaration_data(f'{DECLARATION_FOLDER}/{file}')
            declaration_to_excel(current_file_df, REPORT_PATH)

# Загрузка уникальных кодов и данных деклараций из файла отчета
declaration_df = read_excel(REPORT_PATH)
declaration_df['код_продукта'] = declaration_df['код_продукта'].astype(object);
declaration_df = declaration_df.drop(declaration_df.query('код_продукта.isna() and вес_брутто.isna()').index)
declaration_df = declaration_df.fillna('')
decision_df = read_excel(CODES_PATH)['код_тн_вэд_еаэс'].dropna()

codes = DataFrame()
codes[10] = decision_df.apply(lambda x: x if len(x) == 10 else None)
codes[9] = decision_df.apply(lambda x: x if len(x) == 9 else None)
codes[6] = decision_df.apply(lambda x: x if len(x) == 6 else None)
codes[4] = decision_df.apply(lambda x: x if len(x) == 4 else None)

# Проверка на наличие товаров из деклараций в РОП и сохранение результатов в файл отчета
for code in codes.keys().tolist():
    declaration_df['в_роп'] = declaration_df['код_продукта'].apply(
        lambda value: 'да' if value and (str(value)[:code] in codes[code].values) else 'нет')
declaration_df.to_excel(REPORT_PATH, index=False)

Коды найдены
Извлечение данных из деклараций
CPU times: total: 27min 34s
Wall time: 16min 54s


In [15]:
display(declaration_df)

Unnamed: 0,наименование,код_продукта,вес_брутто,вес_нетто,количество,NG,CT,PX,PK,CN,BX,в_роп
0,10131010_281223_3426654.pdf,6103420001,27000.0,27000.0,32.00 ШТ,,,1.0,,,,да
1,10131010_281223_3426654.pdf,6104620000,8400.0,8400.0,10.00 ШТ,,,1.0,,,,да
2,10131010_281223_3426654.pdf,6109100000,6600.0,6600.0,8.00 ШТ,3.0,,1.0,,,,да
3,10131010_281223_3426654.pdf,6109100О00,9100.0,9100.0,11.00 ШТ,,1.0,,,,,да
4,10131010_281223_3426654.pdf,6109100000,10100.0,10100.0,12.00 ШТ,5.0,1.0,,,,,да
...,...,...,...,...,...,...,...,...,...,...,...,...
409,GTD_10702070_060324_3096871 выпуск.pdf,9603909900,987000.0,900000.0,,,,,,2.0,90.0,нет
412,GTD_10702070_060824_3264842[1].pdf,8467292000,651000.0,551000.0,600 ШТ,,100.0,,2.1,1.0,,да
413,GTD_10702070_060824_3264842[1].pdf,8467292000,986000.0,786000.0,,,,,,1.0,,да
414,GTD_10702070_060824_3264842[1].pdf,8467292000,996530.0,863000.0,,,125.0,,2.1,,,да
