In [1]:
import sys
# Add the parent directory to the module search path
# (presumably so the local meta_qa package imported further down resolves — confirm).
sys.path.append("..")


In [2]:
import time
import pickle as pkl
import numpy as np
import pandas as pd

In [13]:
from pandarallel import pandarallel
# Initialise pandarallel with 256 workers; run_qa_operators relies on the
# DataFrame.parallel_apply method, so this cell has to run before the pipeline.
pandarallel.initialize(nb_workers=256)

New pandarallel memory created - Size: 2000 MB
Pandarallel will run on 256 workers


In [14]:
import meta_qa

In [15]:
from meta_qa.integrations import BigQueryIntegration
from meta_qa.integrations import BigQueryColumnOperators

In [16]:
# BigQuery project and dataset: passed to BigQueryIntegration and to every
# column-operator lookup made in apply_operators.
project_id = "idwall-data"
dataset = "dw_idwall"

In [17]:
# Integration handle: read as a global by apply_operators and passed
# explicitly to run_qa_pipeline.
bqi = BigQueryIntegration(project_id, dataset)

In [18]:
def apply_operators(row):
    """
    Run the operator registered for a database column over every cell of a row.

    The table and column names are taken from the last two entries of the
    row label (``row.name``). The operator is looked up through the
    module-level ``bqi`` integration using the global ``project_id`` and
    ``dataset``, and its ``function`` is mapped over the row.
    """
    table, column = row.name[-2], row.name[-1]
    column_operator = bqi.column_operators(project_id, dataset, table, column)
    return row.map(column_operator.function)


def run_qa_operators(tasks):
    """
    Apply the QA operators to a tasks dataset and reshape the results.

    Parameters
    ----------
    tasks : pandas.DataFrame
        Tasks table (as returned by ``get_tasks``). Each row is handed to
        ``apply_operators`` through ``parallel_apply``, so pandarallel must
        have been initialised beforehand.

    Returns
    -------
    pandas.DataFrame
        One column per distinct operator name found in the results; each
        cell holds the result object produced by that operator for the
        corresponding index entry (missing where the operator produced
        nothing for that entry). An empty DataFrame is returned when no
        operator results were produced at all.
    """
    # Apply the QA functions associated with each column
    pipeline_out = tasks.parallel_apply(apply_operators, axis=1)

    # One result object per entry; the innermost index level introduced by
    # stack() is dropped so entries are keyed by the original row labels.
    pipeline_stack = pipeline_out.stack().droplevel(-1)
    operators = pipeline_stack.apply(lambda x: x["operator"])

    output = []
    # Null operator names are skipped: they can never match the equality
    # mask below, so they would only yield an empty selection that cannot
    # be given a column name.
    for operator in operators.dropna().unique():
        selected = pipeline_stack.where(operators == operator).dropna()
        operator_results = (pd.DataFrame(selected)
                            .assign(operator=operator)
                            .set_index("operator", append=True)
                            .rename(columns={0: "result_object"})
                            .unstack())
        operator_results.columns = [operator]
        output.append(operator_results)

    # pd.concat raises on an empty list; hand back an empty frame instead.
    if not output:
        return pd.DataFrame()
    return pd.concat(output, sort=False, axis=1)


def beautify_pipeline_cell(cell):
    """
    Transform a QA pipeline cell into something more human readable.

    Parameters
    ----------
    cell : dict or any
        A result mapping with at least the ``"result"`` and ``"raw_result"``
        keys, or any other value (e.g. the filler of a cell with no result).

    Returns
    -------
    str
        ``"<result> (<raw_result>)"`` for dict cells, where the raw result is
        replaced by the word ``error`` when it is an exception instance;
        an empty string for anything that is not a dict.
    """
    # isinstance (rather than an exact type match) also accepts dict
    # subclasses such as OrderedDict.
    if not isinstance(cell, dict):
        return ""
    raw_result = cell["raw_result"]
    # Exceptions are summarised instead of being rendered in full.
    detail = "error" if isinstance(raw_result, Exception) else raw_result
    return "{} ({})".format(cell["result"], detail)


def run_qa_pipeline(bqi):
    """
    Run the whole QA pipeline for an integration and return a readable table.

    Fetches the metadata from ``bqi``, derives the tasks from it, runs the
    QA operators over those tasks and finally formats every cell of the
    result with ``beautify_pipeline_cell``.
    """
    tasks = bqi.get_tasks(bqi.get_metadata())
    return run_qa_operators(tasks).applymap(beautify_pipeline_cell)

In [19]:
# Run the full QA pipeline and print how long it took, in seconds
# (difference between two time.time() readings).
tim1 = time.time()
out = run_qa_pipeline(bqi)
tim2 = time.time()
print(tim2 - tim1)

Error: {'result': False, 'raw_result': GenericGBQException('Reason: 400 No matching signature for operator < for argument types: BOOL, STRING. Supported signatures: ANY < ANY at [4:15]',), 'operator': 'not_window_percentile_null', 'project_id': 'idwall-data', 'dataset': 'dw_idwall', 'table_name': 'ft_bpo_services', 'column_name': 'sla_result'} 

Error: {'result': False, 'raw_result': GenericGBQException('Reason: 400 No matching signature for operator < for argument types: INT64, STRING. Supported signatures: ANY < ANY at [4:15]',), 'operator': 'not_window_percentile_null', 'project_id': 'idwall-data', 'dataset': 'dw_idwall', 'table_name': 'ft_bpo_operations', 'column_name': 'time_seconds'} 

Error: {'result': False, 'raw_result': GenericGBQException('Reason: 400 Name id_user not found inside t2 at [4:80]',), 'operator': 'related_to', 'project_id': 'idwall-data', 'dataset': 'dw_idwall', 'table_name': 'ft_bpo_operations', 'column_name': 'id_user'} 

Error: {'result': False, 'raw_result':


Error: {'result': False, 'raw_result': GenericGBQException('Reason: 400 GET https://www.googleapis.com/bigquery/v2/projects/idwall-data/queries/f01c3449-854f-474f-8368-fa122175b0b6?maxResults=0&timeoutMs=900&location=southamerica-east1: division by zero: 0 / 0',), 'operator': 'not_null', 'project_id': 'idwall-data', 'dataset': 'dw_idwall', 'table_name': 'lk_user_cs', 'column_name': 'desc_user'} 

Error: {'result': False, 'raw_result': GenericGBQException('Reason: 400 Name id_status_field_audit not found inside t2 at [4:98]',), 'operator': 'related_to', 'project_id': 'idwall-data', 'dataset': 'dw_idwall', 'table_name': 'ft_bpo_fields', 'column_name': 'id_status_field_audit'} 

Error: {'result': False, 'raw_result': GenericGBQException('Reason: 400 GET https://www.googleapis.com/bigquery/v2/projects/idwall-data/queries/f5aa414c-f96c-4dfb-9dd7-4356710d556a?maxResults=0&timeoutMs=900&location=southamerica-east1: division by zero: 3846 / 0',), 'operator': 'related_to', 'project_id': 'idwal

In [28]:
def convert(x):
    """
    Replace everything up to the first underscore of ``x`` with ``"lk_"``.

    For example ``"id_attempt_message_type"`` becomes
    ``"lk_attempt_message_type"``. A name without any underscore yields
    just ``"lk_"``.
    """
    # partition keeps the remainder after the first "_" intact, which is
    # what splitting on "_" and re-joining all but the first token produces.
    return "lk_" + x.partition("_")[2]

In [29]:
# Example: the leading "id" token of the name is swapped for the "lk_" prefix.
convert("id_attempt_message_type")

'lk_attempt_message_type'