In [1]:
!pip install openpyxl
!pip install pymorphy2[fast]

In [34]:
import re
from collections import Counter
from functools import partial
from typing import List, Dict, Union, Tuple

import numpy as np
import pandas as pd
import pymorphy2
from sklearn.preprocessing import MultiLabelBinarizer
from tqdm import tqdm

# from .prod_safety.parser import Parser
# from .prod_safety.predict import predict_group, predict_regl, predict_tn_ved
# from .prod_safety.dataset import get_ids
# from .prod_safety.config import counts

# Vocabulary sizes passed to predict_regl as `counts`: entry i is the number of most
# frequent words kept for the i-th regulation name in the `regl` list.
# NOTE(review): values look hand-tuned and the list needs at least len(regl) entries,
# otherwise predict_regl fails on indexing — confirm against the actual data.
regl_counts = [10, 20, 200, 2, 100, 50, 30, 40, 100, 25, 200, 150, 50, 20, 70, 30, 30, 200, 60, 60, 200, 80, 20, 20, 20, 100]


def get_ids(df: pd.DataFrame) -> Tuple[List[int], List[int], List[int]]:
    """Collect the row labels used by the rest of the pipeline.

    Args:
        df: frame with the columns 'Общее наименование продукции',
            'Коды ТН ВЭД ЕАЭС' and 'Технические регламенты'.

    Returns:
        A 3-tuple of index-label lists, each in frame order:
        rows with a product name; rows with a product name and a TN VED code;
        rows with a product name and a technical regulation.
    """
    # The product-name mask is shared by all three selections, so compute it once.
    has_name = df['Общее наименование продукции'].notna()
    has_tn_ved = has_name & df['Коды ТН ВЭД ЕАЭС'].notna()
    has_regl = has_name & df['Технические регламенты'].notna()

    not_nan_ids = df.index[has_name.to_numpy()].to_list()
    valid_tn_ved_ids = df.index[has_tn_ved.to_numpy()].to_list()
    regl_ids = df.index[has_regl.to_numpy()].to_list()
    return not_nan_ids, valid_tn_ved_ids, regl_ids


def predict_regl(df: pd.DataFrame, regl: List[str], counts: Union[List[int], None]=None) -> pd.DataFrame:
    """Predict one technical regulation per product by word overlap.

    For every regulation a vocabulary is built from the most frequent words of the
    'parsed_prod' texts of rows where that regulation's indicator column equals 1.
    Each row with a non-NaN 'parsed_prod' is then assigned the regulation whose
    vocabulary shares the most words with the row's text. Ties go to the
    regulation that comes first in `regl`.

    Args:
        df: frame with a 'parsed_prod' text column and one indicator column per
            name in `regl`.
        regl: regulation names (column names of `df`).
        counts: counts[i] is the number of most frequent words kept for regl[i];
            None or an empty list keeps every word.

    Returns:
        A single-column frame ('new_regl') indexed by the labels of the rows that
        have a non-NaN 'parsed_prod'. The value is None if `regl` is empty.

    Raises:
        IndexError: if `counts` is given but shorter than `regl`.
    """
    if not counts:
        counts = [None] * len(regl)

    parsed = df['parsed_prod']

    # Vocabulary per regulation, stored as sets so they are built once rather
    # than once per scored row.
    vocab = {}
    for i, name in enumerate(regl):
        # dropna(): a row can carry a regulation without having a parsed product
        # name; NaN would make ' '.join raise.
        texts = parsed[df[name] == 1].dropna().to_list()
        word_counts = Counter(' '.join(texts).split())
        vocab[name] = {word for word, _ in word_counts.most_common(counts[i])}

    # Score the rows that actually have text, taken from the frame itself instead
    # of relying on a `not_nan_ids` variable defined elsewhere in the kernel.
    valid = parsed.dropna()
    predictions = []
    for text in valid:
        words = set(text.split())
        scores = {name: len(words & vocab[name]) for name in regl}
        # max() returns the first best key, i.e. the earliest regulation on ties.
        predictions.append(max(scores, key=scores.get, default=None))
    return pd.DataFrame(predictions, index=valid.index, columns=['new_regl'])


def predict_group(df: pd.DataFrame) -> list:
    """Placeholder for product-group prediction.

    Not implemented: the argument is ignored and None is returned, despite the
    `list` annotation.
    """
    return None


def predict_tn_ved(df: pd.DataFrame, ids_list: List[int], TN_VED_TAGS: Dict[str, List[str]]) -> List[str]:
    """Predict a TN VED code for each requested row by word overlap.

    Args:
        df: frame with a 'parsed_prod' text column (space-separated words).
        ids_list: index labels of the rows to score; each must hold a string.
        TN_VED_TAGS: mapping code -> list of words associated with that code.

    Returns:
        One code per entry of `ids_list`, in the same order: the code whose word
        list shares the most words with the row's text. Ties, including the case
        where nothing matches, go to the code that comes first in `TN_VED_TAGS`.

    Raises:
        ValueError: if `TN_VED_TAGS` is empty while `ids_list` is not.
    """
    # Build the word sets once; they do not depend on the row being scored.
    tag_sets = {code: set(words) for code, words in TN_VED_TAGS.items()}

    predicted_tn_ved = []
    for idx in tqdm(ids_list):
        words = set(df.loc[idx, 'parsed_prod'].split(' '))
        # Only real codes are scored. Unpacking the key strings themselves would
        # register their individual characters as candidate codes.
        points = {code: len(vocab & words) for code, vocab in tag_sets.items()}
        predicted_tn_ved.append(max(points, key=points.get))
    return predicted_tn_ved


class Parser:
    """Cleans and lemmatizes product names with a morphological analyzer.

    The analyzer only needs a `parse(word)` method whose first result exposes
    `.tag.POS` and `.normal_form`.
    """

    def __init__(self, morph: 'pymorphy2.MorphAnalyzer', bar: bool=True):
        """
        Args:
            morph: analyzer used for part-of-speech checks and lemmatization.
            bar: show a tqdm progress bar while parsing a batch of texts.
        """
        self.bar = bar
        self.morph = morph
        # Function-word parts of speech that are dropped from the text.
        self.func_pos = {'INTJ', 'PRCL', 'CONJ', 'PREP'}
        # 'ё'/'Ё' lie outside the а-я / А-Я code-point ranges and must be listed
        # explicitly, otherwise they are stripped out of words.
        self.ru_en = partial(re.sub, pattern=r'[^а-яА-ЯёЁa-zA-Z ]', repl='')
        self.ru = partial(re.sub, pattern=r'[^а-яА-ЯёЁ ]', repl='')

    def check_pos(self, word: str) -> bool:
        """Return True when `word` is not a function word (see `func_pos`)."""
        return self.morph.parse(word)[0].tag.POS not in self.func_pos

    def preproc_text(self, text: str, parser: str='ru') -> List[str]:
        """Strip unsupported characters and return the words worth lemmatizing.

        Args:
            text: raw text.
            parser: 'ru' keeps Cyrillic letters only, 'en' keeps Cyrillic and
                Latin letters.

        Returns:
            Words longer than two characters that are not function words.

        Raises:
            AssertionError: if `parser` is neither 'ru' nor 'en'.
        """
        assert parser in ('ru', 'en'), parser
        clean = self.ru if parser == 'ru' else self.ru_en
        text = clean(string=text)
        # Length check first: it is cheap and keeps empty tokens away from the analyzer.
        return [w for w in text.split(' ') if len(w) > 2 and self.check_pos(w)]

    def parse_text(self, text: str, parser: str='ru') -> str:
        """Return the space-joined normal forms of the words kept by `preproc_text`."""
        return ' '.join([self.morph.parse(w)[0].normal_form for w in self.preproc_text(text, parser)])

    def parse(self, texts: Union[str, List[str]], parser: str='ru') -> Union[str, List[str]]:
        """Lemmatize one text or a list of texts.

        Args:
            texts: a single string or a list of strings.
            parser: see `preproc_text`.

        Returns:
            A string for string input, otherwise a list with one parsed string per
            input text (an empty string when no word survives filtering).
        """
        is_str = isinstance(texts, str)
        if is_str:
            texts = [texts]
        parsed_texts = []
        for t in (tqdm(texts) if self.bar else texts):
            tokens = self.parse_text(t, parser).strip().split(' ')
            # Drop empty tokens so the result never contains repeated spaces.
            parsed_texts.append(' '.join([w for w in tokens if w]))
        return parsed_texts[0] if is_str else parsed_texts
    
def parse_products(df: pd.DataFrame, parser: Parser) -> List[str]:
    """Parse every non-NaN product name with `parser`.

    Returns one parsed string per non-NaN row of 'Общее наименование продукции',
    in frame order.
    """
    product_names = df['Общее наименование продукции'].dropna()
    return parser.parse(product_names.to_list())

def parse_tnved(tnved: str) -> List[str]:
    """Return the two-character prefix of each TN VED code in a cell.

    A cell is either a single integer code or a string of codes separated by
    '; '. Empty pieces are skipped.
    NOTE(review): an integer cell has already lost any leading zero, so its
    prefix is taken from the shortened number — confirm this is acceptable.
    """
    if isinstance(tnved, int):
        pieces = [str(tnved)]
    else:
        pieces = tnved.split('; ')
    return [piece[:2] for piece in pieces if piece]
    
def extract_regl(df: pd.DataFrame) -> List[str]:
    """Return the sorted distinct regulation names found in 'Технические регламенты'.

    Cells may list several regulations separated by '; '. NaN cells are ignored,
    and a column without any value yields an empty list.
    """
    cells = df['Технические регламенты'].dropna()
    # A set comprehension avoids Series.sum() over lists, which is quadratic and
    # returns 0 (not a list) when the series is empty.
    return sorted({name for cell in cells for name in cell.split('; ')})

def extract_groups(df: pd.DataFrame) -> List[str]:
    """Return the sorted distinct product groups found in 'Группа продукции'.

    Cells may list several groups separated by '; '. NaN cells are ignored, and a
    column without any value yields an empty list.
    """
    cells = df['Группа продукции'].dropna()
    # A set comprehension avoids Series.sum() over lists, which is quadratic and
    # returns 0 (not a list) when the series is empty.
    groups = sorted({name for cell in cells for name in cell.split('; ')})
    return groups
    
def create_one_hot(key: str, df: pd.DataFrame, one_hot: MultiLabelBinarizer) -> np.ndarray:
    """Fit `one_hot` on the '; '-separated labels of column `key` and encode them.

    Only rows where `key` is not NaN are used; the result has one row per such
    row, in frame order.
    """
    def to_labels(cell):
        # A cell holds either a single label or several joined by '; '.
        return cell.split('; ') if ';' in cell else [cell]

    label_lists = df[key].dropna().apply(to_labels)
    return one_hot.fit_transform(label_lists)

In [3]:
# Location of the input Excel workbook.
DATA_PATH = '../input/product-safety/15072022.xlsx'
df = pd.read_excel(DATA_PATH)
# Keep the original column set: helper columns are added to df later, and the
# final export selects only these columns plus the error column.
orig_columns = df.columns

In [35]:
# Create the morphological analyzer once and wrap it in the Parser helper.
morph = pymorphy2.MorphAnalyzer()
parser = Parser(morph)
# Index labels of: rows with a product name; rows that also have a TN VED code;
# rows that also have a technical regulation.
not_nan_ids, tn_ved_ids, regl_ids = get_ids(df)

In [8]:
# Lemmatize every non-NaN product name.
parsed_products = parse_products(df, parser)
# parse_products and not_nan_ids both select rows with a non-NaN product name in
# frame order, so the parsed texts line up with these labels.
df.loc[not_nan_ids, 'parsed_prod'] = parsed_products

In [9]:
# Two-character TN VED prefixes (one list per row) for rows that have both a product
# name and a TN VED code.
tnved_list = df.loc[sorted(set(tn_ved_ids).intersection(set(not_nan_ids)))]['Коды ТН ВЭД ЕАЭС'].apply(parse_tnved).to_list()
# The mask repeats the two non-NaN conditions, so it selects the same rows.
# NOTE(review): tnved_list is in sorted-label order while the mask is in frame order;
# this assumes df has a sorted index (e.g. the default RangeIndex) — confirm.
df.loc[~df['Общее наименование продукции'].isna() & (~df['Коды ТН ВЭД ЕАЭС'].isna()),
       'tn_ved_tags'] = tnved_list

In [10]:
# Flatten the per-row prefix lists, then keep the distinct prefixes in sorted order.
L = [prefix for prefixes in tnved_list for prefix in prefixes]

tn_ved_unique = sorted(set(L))

In [14]:
from collections import Counter

# Group the parsed product names by TN VED prefix: prefix -> list of texts.
tn_ved_tags = {k: [] for k in tn_ved_unique if bool(k)}
# A set gives O(1) membership tests instead of scanning the id list for every row.
not_nan_set = set(not_nan_ids)
# Iterating the list directly (not enumerate) lets tqdm show the total.
for idx in tqdm(tn_ved_ids):
    if idx not in not_nan_set:
        continue
    text = df.loc[idx, 'parsed_prod']
    # set(): a row that repeats a prefix still contributes its text once. Only the
    # row's own prefixes are visited rather than every known prefix.
    for k in set(df.loc[idx, 'tn_ved_tags']):
        if k in tn_ved_tags:
            tn_ved_tags[k].append(text)

In [15]:
# For each prefix: the distinct words of its product texts, most frequent first.
TN_VED_TAGS = {
    code: [word for word, _ in Counter(' '.join(texts).split(' ')).most_common()]
    for code, texts in tn_ved_tags.items()
}

In [18]:
# Predict a TN VED prefix for every row that has a parsed product name; the result
# list is in the order of not_nan_ids, so it can be assigned back by those labels.
predicted_tn_ved = predict_tn_ved(df, not_nan_ids, TN_VED_TAGS)
df.loc[not_nan_ids, 'new_tn_ved'] = predicted_tn_ved

In [24]:
# Multi-label encoder used to build the regulation indicator columns.
one_hot = MultiLabelBinarizer()

In [25]:
# Add one indicator column per technical regulation for the rows that declare any.
# NOTE(review): relies on `regl` (sorted names) matching the column order produced
# by the binarizer — compare with one_hot.classes_ to confirm.
regl = extract_regl(df)
has_regl = ~df['Технические регламенты'].isna()
df.loc[has_regl, regl] = create_one_hot('Технические регламенты', df, one_hot)

In [26]:
# predict_regl returns a single-column frame; assigning it aligns on the index, so
# rows it does not cover get NaN in 'new_regl'.
df['new_regl'] = predict_regl(df, regl, regl_counts)

In [54]:
def check_regl_mistake(df, not_nan_ids, regl_ids):
    """Flag rows whose predicted regulation is not among the declared ones.

    Writes a 0/1 column 'regl_mistake' into `df` in place. Only labels present in
    both `not_nan_ids` and `regl_ids` are checked; every other row stays 0.

    Returns:
        The 'regl_mistake' column as a numpy array.
    """
    df['regl_mistake'] = 0
    shared_ids = sorted(set(not_nan_ids) & set(regl_ids))

    mismatched = []
    for row_id in tqdm(shared_ids):
        declared = df.loc[row_id, 'Технические регламенты'].split('; ')
        if df.loc[row_id, 'new_regl'] not in declared:
            mismatched.append(row_id)

    if mismatched:
        df.loc[mismatched, 'regl_mistake'] = 1
    return df['regl_mistake'].to_numpy()

In [55]:
def check_tnved_mistake(df, not_nan_ids, tn_ved_ids):
    """Flag rows whose predicted TN VED prefix is not among the declared ones.

    Writes a 0/1 column 'tnved_mistake' into `df` in place. Only labels present in
    both `not_nan_ids` and `tn_ved_ids` are checked; every other row stays 0.

    Returns:
        The 'tnved_mistake' column as a numpy array.
    """
    # One column name throughout: the flags must land in the same column that is
    # initialised here and that is summed into the overall error column.
    df['tnved_mistake'] = 0
    IDS = sorted(set(not_nan_ids).intersection(set(tn_ved_ids)))
    for idx in tqdm(IDS):
        predicted = df.loc[idx, 'new_tn_ved']
        # str(): a cell holding a single code may be numeric.
        declared = [code[:2] for code in str(df.loc[idx, 'Коды ТН ВЭД ЕАЭС']).split('; ')]
        if predicted not in declared:
            df.loc[idx, 'tnved_mistake'] = 1
    return df['tnved_mistake'].to_numpy()

In [56]:
def check_group_mistake(df, not_nan_ids, group_ids):
    """Placeholder product-group check: marks every row as correct.

    Writes an all-zero 'group_mistake' column into `df` in place and returns it as
    a numpy array. `not_nan_ids` and `group_ids` are currently unused.
    """
    # TODO: disabled comparison, kept for reference; it needs a 'new_group' column.
    #     IDS = sorted(set(not_nan_ids).intersection(set(group_ids)))
    #     for idx in tqdm(IDS):
    #         new_r = df.loc[idx, 'new_group']
    #         old_r = df.loc[idx, 'Группа продукции'].split('; ')
    #         if new_r not in old_r:
    #             df.loc[idx, 'group_mistake'] = 1
    df['group_mistake'] = 0
    flags = df['group_mistake']
    return flags.to_numpy()

In [68]:
# Run the three checks. Each adds its mistake column to df in place; the returned
# arrays are not used here.
check_regl_mistake(df, not_nan_ids, regl_ids)
check_tnved_mistake(df, not_nan_ids, tn_ved_ids)
check_group_mistake(df, not_nan_ids, df.index.to_list())

# Per-row sum of the three mistake columns.
df['Наличие ошибки'] = (df['tnved_mistake'] + df['regl_mistake'] + df['group_mistake'])

In [75]:
# Same expression as the last line of the previous cell; re-running it just
# recomputes the per-row sum of the three mistake columns.
df['Наличие ошибки'] = (df['tnved_mistake'] + df['regl_mistake'] + df['group_mistake'])

In [76]:
# Export frame: the workbook's original columns plus the aggregated error column.
# .copy() makes it independent of df.
new_df = df[orig_columns.to_list() + ['Наличие ошибки']].copy()

In [77]:
# Total of the per-row error counts over the whole sheet.
new_df['Наличие ошибки'].sum()

In [73]:
# Write the result workbook to the working directory. By default to_excel also
# writes the index as the first column.
new_df.to_excel('task1.xlsx')