In [None]:
import heapq
import random

In [None]:
import tqdm
import pandas as pd

from nile.api.v1 import Record
from nile.api.v1 import filters as nf
from nile.api.v1 import extractors as ne

In [None]:
# Show up to 100 columns when displaying wide frames.
pd.set_option('display.max_columns', 100)

In [None]:
from zoo.remote_quality_control.dashboard import project_config as config
from pymlaas.models.remote_quality_control import constants
from zoo.utils.nile_helpers import dates

try:
    # Python 3.4+: ``reload`` is no longer a builtin.
    from importlib import reload
except ImportError:
    pass  # Python 2: ``reload`` is a builtin.

# Re-import the project config so edits to it are picked up without a kernel restart.
reload(config)

In [None]:
# Value passed to nile's ``env(parallel_operations_limit=...)`` for this job.
PARALLEL_OPERATIONS_LIMIT = 10

cluster = config.get_project_cluster()
job = cluster.job().env(parallel_operations_limit=PARALLEL_OPERATIONS_LIMIT)

In [None]:
# Date range (YYYY-MM-DD) handed to ``dates.range_selector`` when building per-day tables.
START_DATE, FINISH_DATE = '2018-12-01', '2019-05-29'

In [None]:
def exam_group_reducer(groups):
    """Collapse all QC-log rows of one exam into a single summary record.

    Args:
        groups: iterable of ``(exam_id, records)`` pairs, as fed by
            ``.groupby('qc_id').reduce(...)``; every record supports ``.get``.

    Yields:
        One ``Record`` per exam with:
        * per-row lists: ``qc_type_list``, ``assessor_type_list``,
          ``resolution_list``, ``assessor_comment_list``, ``ml_list``;
        * ``final_resolution`` / ``final_resolver``;
        * ``country``: the first non-empty country of the exam (or None);
        * ``city`` (decoded, lower-cased; ``u''`` when missing) and the
          car / ``qc_date`` fields, all taken from the exam's last row.
    """
    for exam_id, records in groups:

        qc_type_list = []
        assessor_type_list = []
        resolution_list = []
        assessor_comment_list = []
        ml_list = []
        country_list = []

        final_resolution = None
        final_resolver = None

        for record in records:
            resolution = record.get('resolution')
            qc_type_list.append(record.get('qc_type'))
            assessor_type_list.append(record.get('assessor_type'))
            resolution_list.append(resolution)
            assessor_comment_list.append(record.get('assessor_comment'))
            ml_list.append(record.get('ml'))
            country_list.append(record.get('country'))

            # Rows are processed in order, so the last success/block wins.
            if resolution == u'success':
                final_resolution = u'success'
                final_resolver = record.get('assessor')
            elif resolution == u'block':
                final_resolution = u'block'

        # When the exam was resolved, attribute the decision to the kind of
        # participant involved.
        if config.RESOLUTION_SET.intersection(resolution_list):
            assessor_types = set(assessor_type_list)
            if u'human' in assessor_types:
                final_resolver = u'assessor'
            elif assessor_types == {u'ml', None}:
                final_resolver = u'ml'
            elif assessor_types != {u'ml'}:
                final_resolver = u'toloka'
            # NOTE(review): an ML-only set ({u'ml'} without None) keeps the
            # resolver chosen in the loop above -- confirm that is intended.

        country = next((item for item in country_list if item), None)

        # ``record`` is the exam's last row (a group is never empty).
        city = record.get('city')
        if city is None:
            # A missing city becomes an empty string instead of raising.
            city = u''
        elif isinstance(city, bytes):
            city = city.decode('utf-8')
        city = city.lower()
        car_brand = record.get('car_brand')
        car_model = record.get('car_model')
        car_color = record.get('car_color')
        car_number = record.get('car_number')
        car_year = record.get('car_year')
        qc_date = record.get('qc_date')

        yield Record(exam_id=exam_id,
                     qc_type_list=qc_type_list,
                     assessor_type_list=assessor_type_list,
                     resolution_list=resolution_list,
                     assessor_comment_list=assessor_comment_list,
                     ml_list=ml_list,
                     city=city,
                     car_brand=car_brand,
                     car_model=car_model,
                     car_color=car_color,
                     car_number=car_number,
                     car_year=car_year,
                     qc_date=qc_date,
                     final_resolution=final_resolution,
                     final_resolver=final_resolver,
                     country=country)

In [None]:
def create_exam_group_mapper(date_to_map, random_seed=None):
    """Build a nile mapper that turns per-exam records into daily counters.

    The returned ``exam_group_mapper(records)`` consumes the rows produced by
    ``exam_group_reducer`` (one row per ``qc_id``) and yields exactly one
    ``Record`` of counters tagged with ``fielddate=date_to_map``.

    Args:
        date_to_map: value written to the ``fielddate`` column.
        random_seed: optional seed for the random baseline of the ranking
            metrics (``*_bottom_<prop>_random``). ``None`` (default) draws
            from the global ``random`` module, so those counters differ
            between runs; pass an int to make them reproducible.

    Returns:
        The generator function ``exam_group_mapper(records)``.
    """
    # Verdict values an ML response field is counted under.
    ml_verdicts = ('success', 'block', 'unknown')

    # Helpers are nested so everything the mapper needs travels with its
    # closure.

    def ranking_key(verdict, prop, ranker):
        """Counter name for ``verdict`` among the bottom ``prop``% exams."""
        return verdict + '_bottom_' + str(prop) + '_' + ranker

    def init_counts():
        """Return the zero-initialised counter dict for ``date_to_map``."""
        counts = {'fielddate': date_to_map}

        # Overall metrics.
        for key in ('total', 'total_int', 'total_internat',
                    'total_ml', 'total_ml_int', 'total_ml_internat',
                    'no_ml', 'no_ml_int', 'no_ml_internat',
                    'use_ml_false', 'use_ml_false_int',
                    'use_ml_false_ext', 'use_ml_false_internat',
                    'total_no_full_ml_resp',
                    'total_exp', 'total_exp_ml_unknown',
                    'total_exp_asr_unknown', 'total_exp_int',
                    'total_exp_internat'):
            counts[key] = 0
        # ML verdicts: <verdict>_ml_{prod,back,ext,int,internat}.
        for verdict in ml_verdicts:
            for source in ('prod', 'back', 'ext', 'int', 'internat'):
                counts[verdict + '_ml_' + source] = 0

        # Reasons of blocks.
        for reason in config.BACKGROUND_BLOCK_REASON_SET:
            counts[u'back_' + reason.lower()] = 0
            counts[u'back_' + reason.lower() + '_internat'] = 0
        for reason in config.PROD_BLOCK_REASON_SET:
            counts[u'prod_' + reason.lower()] = 0
            counts[u'prod_' + reason.lower() + '_internat'] = 0

        # Experiment metrics.
        for metric in config.EXPERIMENT_METRICS:
            counts[metric] = 0
        for key in ('success_assessor_exp_ml', 'block_assessor_exp_ml',
                    'success_assessor_exp_noml', 'block_assessor_exp_noml'):
            counts[key] = 0

        # Ranking metrics.
        for prop in config.RANKING_DROP_PROPORTIONS:
            for verdict in ('success', 'block'):
                for ranker in ('ml', 'random'):
                    counts[ranking_key(verdict, prop, ranker)] = 0

        for rqc_type in config.RELEVANT_RQC_TYPES:
            counts[rqc_type] = 0
        return counts

    def bump_verdict(counts, verdict, suffix):
        """Increment ``<verdict><suffix>`` when ``verdict`` is a known one."""
        if verdict in ml_verdicts:
            counts[verdict + suffix] += 1

    def count_production_metrics(counts, ml_response, final_resolution,
                                 international_exam, interior_exam_enabled):
        """Update prod/back/ext/int verdict and block-reason counters."""
        ml_result = ml_response['result']
        # Without an 'exam_inspector' key the verdict is attributed to ML.
        decided_by_ml = ml_response.get('exam_inspector', u'ml') == u'ml'

        # Overall.
        if final_resolution == u'success' and ml_result == u'success':
            if decided_by_ml:
                counts['success_ml_prod'] += 1
                if international_exam:
                    counts['success_ml_internat'] += 1
        elif final_resolution == u'block' and ml_result == u'block':
            if decided_by_ml:
                counts['block_ml_prod'] += 1
                if international_exam:
                    counts['block_ml_internat'] += 1
        elif ml_result == u'unknown':
            counts['unknown_ml_prod'] += 1
            if international_exam:
                counts['unknown_ml_internat'] += 1

        bump_verdict(counts, ml_response['actual_result'], '_ml_back')

        # Exterior.
        if u'exterior_result' in ml_response:
            bump_verdict(counts, ml_response['exterior_result'], '_ml_ext')
        elif ml_response.get('actual_result') == u'success':
            counts['success_ml_ext'] += 1
        elif ml_response.get('result') == u'block':
            # NOTE(review): this fallback mixes 'actual_result' (success)
            # with 'result' (block) -- confirm that is intended.
            counts['block_ml_ext'] += 1
        else:
            counts['unknown_ml_ext'] += 1

        # Interior.
        if interior_exam_enabled:
            if u'interior_result' in ml_response:
                bump_verdict(counts, ml_response['interior_result'],
                             '_ml_int')
            else:
                counts['unknown_ml_int'] += 1

        # Reasons (only when 'incorrect' is a list of reason names).
        incorrect = ml_response['incorrect']
        if isinstance(incorrect, list):
            reason_groups = [constants.ReasonsNames.EXTERIOR]
            if interior_exam_enabled:
                reason_groups.append(constants.ReasonsNames.INTERIOR)
            for group in reason_groups:
                for reason in group:
                    if reason in incorrect:
                        counts['back_' + reason.lower()] += 1
                        if international_exam:
                            counts['back_' + reason.lower()
                                   + '_internat'] += 1

            if ml_response.get('result') == u'block':
                for reason in config.PROD_BLOCK_REASON_SET:
                    if reason in incorrect:
                        counts['prod_' + reason.lower()] += 1
                        if international_exam:
                            counts['prod_' + reason.lower()
                                   + '_internat'] += 1

    def count_experiment_metrics(counts, ml_response, final_resolution,
                                 ranking_pairs):
        """Update double-check experiment counters for assessor-routed exams.

        Appends ``(meta_score_full_exam, final_resolution)`` to
        ``ranking_pairs`` for exams ML left 'unknown' whose final resolution
        is in ``config.RESOLUTION_SET``.
        """
        ml_result = ml_response['result']
        # Confusion matrix with 'block' as the positive class.
        if ml_result == u'success' and final_resolution == u'success':
            counts['true_negative'] += 1
        elif ml_result == u'block' and final_resolution == u'success':
            counts['false_positive'] += 1
        elif ml_result == u'block' and final_resolution == u'block':
            counts['true_positive'] += 1
        elif ml_result == u'success' and final_resolution == u'block':
            counts['false_negative'] += 1
        if final_resolution not in config.RESOLUTION_SET:
            counts['total_exp_asr_unknown'] += 1

        if ml_result in config.RESOLUTION_SET:
            if final_resolution == u'success':
                counts['success_assessor_exp_ml'] += 1
            elif final_resolution == u'block':
                counts['block_assessor_exp_ml'] += 1
        elif ml_result == u'unknown':
            if final_resolution == u'success':
                counts['success_assessor_exp_noml'] += 1
            elif final_resolution == u'block':
                counts['block_assessor_exp_noml'] += 1
            counts['total_exp_ml_unknown'] += 1

            if (u'meta_scores' in ml_response.get('prediction', {})
                    and final_resolution in config.RESOLUTION_SET):
                ranking_pairs.append((
                    ml_response['prediction']['meta_scores']
                    ['meta_score_full_exam'],
                    final_resolution,
                ))

    def add_ranking_drop_counts(counts, ranking_pairs, rng):
        """Count final verdicts among the bottom N% of ranked exams.

        For every N in ``config.RANKING_DROP_PROPORTIONS`` compares the N%
        exams with the lowest meta score (``_ml``) against a random N% sample
        (``_random``).
        """
        for prop in config.RANKING_DROP_PROPORTIONS:
            drop_number = int(round((prop / 100.) * len(ranking_pairs)))
            dropped_by_ml = heapq.nsmallest(
                drop_number, ranking_pairs, key=lambda pair: pair[0])
            dropped_at_random = rng.sample(ranking_pairs, drop_number)
            for ranker, dropped in (('ml', dropped_by_ml),
                                    ('random', dropped_at_random)):
                for _, decision in dropped:
                    if decision in (u'success', u'block'):
                        counts[ranking_key(decision, prop, ranker)] += 1

    def exam_group_mapper(records):
        counts = init_counts()
        # Created per call so every invocation with a seed draws the same
        # random baseline.
        rng = random if random_seed is None else random.Random(random_seed)
        # (meta_score_full_exam, final_resolution) pairs for ranking metrics.
        ranking_pairs = []

        for record in records:
            # NOTE(review): exams with a missing country count as international.
            international_exam = (record.get('country')
                                  not in {'Россия', u'Россия'})
            interior_exam_enabled = (record.get('city').lower()
                                     in config.INTERIOR_CITIES)
            final_resolution = record.get('final_resolution')

            counts['total'] += 1
            if interior_exam_enabled:
                counts['total_int'] += 1
            if international_exam:
                counts['total_internat'] += 1

            for rqc_type in config.RELEVANT_RQC_TYPES:
                if rqc_type in record['qc_type_list']:
                    counts[rqc_type] += 1

            # The first non-empty entry of ml_list is the ML response.
            ml_responses = [response for response
                            in (record.get('ml_list') or []) if response]
            if not ml_responses:
                counts['no_ml'] += 1
                if interior_exam_enabled:
                    counts['no_ml_int'] += 1
                if international_exam:
                    counts['no_ml_internat'] += 1
                continue

            ml_response = ml_responses[0]
            if not all(field in ml_response
                       for field in config.REQUIRED_FIELDS):
                counts['total_no_full_ml_resp'] += 1
                continue

            use_ml_true = (ml_response.get('result_mismatch_reason')
                           != u'use_ml_false')
            if not use_ml_true:
                counts['use_ml_false'] += 1
                counts['use_ml_false_ext'] += 1
                if international_exam:
                    counts['use_ml_false_internat'] += 1
                if interior_exam_enabled:
                    counts['use_ml_false_int'] += 1

            counts['total_ml'] += 1
            if interior_exam_enabled:
                counts['total_ml_int'] += 1
            if international_exam:
                counts['total_ml_internat'] += 1

            experiment = (u'experiment_double_check'
                          in ml_response.get('experiments_list', {}))
            passed_to_assessor = (ml_response.get('exam_inspector')
                                  == u'assessor')
            if experiment:
                counts['total_exp'] += 1
                if interior_exam_enabled:
                    counts['total_exp_int'] += 1
                if international_exam:
                    counts['total_exp_internat'] += 1

            if use_ml_true and not passed_to_assessor:
                count_production_metrics(counts, ml_response,
                                         final_resolution,
                                         international_exam,
                                         interior_exam_enabled)

            if experiment and passed_to_assessor:
                count_experiment_metrics(counts, ml_response,
                                         final_resolution, ranking_pairs)

        add_ranking_drop_counts(counts, ranking_pairs, rng)

        yield Record(**counts)

    return exam_group_mapper

In [None]:
def get_metrics(job, date):
    """Build the nile graph computing dashboard metrics for one day.

    Reads the daily QC log ``config.YT_QUALITY_CONTROL_LOG_PREFIX + date``,
    keeps rows whose ``qc_type`` is in ``config.RELEVANT_RQC_TYPES``,
    left-joins country info on city name, collapses rows per ``qc_id`` with
    ``exam_group_reducer`` and aggregates the exams with the mapper from
    ``create_exam_group_mapper(date)``.

    Returns:
        The mapped stream (presumably lazy -- the notebook calls
        ``job.run()`` separately).

    NOTE(review): if nile invokes the ``.map`` function once per map job,
    a large day split across several jobs yields several partial counter
    rows (and per-chunk ranking metrics). Confirm the input fits one job,
    or aggregate by ``fielddate`` downstream.
    """
    countries_by_city = job.table(config.YT_CITY_COUNTRY_TABLE_PATH).project(
        'name', 'country', 'country_id')

    def is_relevant_qc_type(qc_type):
        return qc_type in config.RELEVANT_RQC_TYPES

    exams = (job.table(config.YT_QUALITY_CONTROL_LOG_PREFIX + date)
             .filter(nf.custom(is_relevant_qc_type, 'qc_type'))
             .join(countries_by_city,
                   by_left='city',
                   by_right='name',
                   type='left',
                   assume_small_right=True)
             .project(ne.all(exclude=config.COLUMNS_TO_DROP_YT))
             .groupby('qc_id')
             .reduce(exam_group_reducer))

    return exams.map(create_exam_group_mapper(date), memory_limit=4096)

In [None]:
# dates.range_selector output is parsed as a '{d1,d2,...}' string of days.
day_list = dates.range_selector(START_DATE, FINISH_DATE).strip('{}').split(',')
tables = [get_metrics(job, day) for day in day_list]

job.concat(*tables).put(config.YT_DASHBOARD_TABLE_PATH)

In [None]:
# job.flow_graph

In [None]:
# Run the nile job assembled above: the per-day metric streams concatenated
# and `.put` into config.YT_DASHBOARD_TABLE_PATH.
job.run()

In [None]:
# Load the dashboard table written by the job into pandas
# (ignore_missing=True is passed through to nile's job.table).
table = job.table(config.YT_DASHBOARD_TABLE_PATH, ignore_missing=True)
df = table.read().as_dataframe()

### Scratch: inspect one day of the raw QC log

In [None]:
# Raw QC log of a single day, loaded into pandas for ad-hoc inspection.
SAMPLE_LOG_DATE = '2019-05-27'
df = cluster.read(
    config.YT_QUALITY_CONTROL_LOG_PREFIX + SAMPLE_LOG_DATE
).as_dataframe()

In [None]:
# Keep only the QC types the dashboard counts (vectorised membership test
# instead of a per-row Python lambda).
df_rqc = df[df['qc_type'].isin(config.RELEVANT_RQC_TYPES)]

In [None]:
# Preview the first rows of the filtered log.
df_rqc.head(n=5)