In [35]:
import pandas as pd

from pyspark.sql import SQLContext

In [36]:
# Common prefix of the raw parquet datasets read below.
RAW_DATA_ROOT = 'gs://iskldl01-projectby-local-bucket/raw'


def load_raw_table(table_name):
    """Read the parquet dataset ``RAW_DATA_ROOT/<table_name>`` into a pandas DataFrame.

    NOTE(review): ``sqlContext`` is not created in the visible cells --
    presumably it is provided by the PySpark notebook kernel; confirm.
    """
    return sqlContext.read.parquet(RAW_DATA_ROOT + '/' + table_name).toPandas()


# The previous empty-DataFrame placeholders were overwritten immediately,
# so each table is now assigned exactly once.
carriers_data = load_raw_table('carriers')
flights_data = load_raw_table('flights')
airports_data = load_raw_table('airports')

In [37]:
def check_completeness_non_null(df, name):
    """Check that no column of ``df`` contains null values.

    Parameters
    ----------
    df : pandas.DataFrame
        Table to inspect. It is not modified.
    name : str
        Label of the table; returned as the first field of the result.

    Returns
    -------
    list
        ``[name, 'Completeness non null', 'All', status, bad_data]`` where
        ``status`` is ``'Passed'`` when no column has nulls and ``'Failed'``
        otherwise, and ``bad_data`` is the string form of a Series holding
        the null count of every offending column, largest count first.
    """
    null_counts = df.isna().sum()
    bad_data = null_counts[null_counts > 0].sort_values(ascending=False)
    status = 'Passed' if bad_data.empty else 'Failed'
    # The label is used directly instead of being stored as ``df.name``:
    # attribute assignment on a DataFrame writes into a column called
    # 'name' when one exists, which would corrupt the caller's data.
    return [name, 'Completeness non null', 'All', status, str(bad_data)]

In [38]:
def check_uniqueness_by_pk(df, name, keys):
    """Check that the primary key made of ``keys`` is unique in ``df``.

    All columns in ``keys`` are evaluated together as one composite key: a
    row is reported only when its full key combination already occurred in
    an earlier row.

    Parameters
    ----------
    df : pandas.DataFrame
        Table to inspect. It is not modified.
    name : str
        Label of the table; returned as the first field of the result.
    keys : list of str or str
        Column name(s) forming the primary key. A single column name may be
        given as a plain string.

    Returns
    -------
    list
        ``[name, 'Uniqueness by PK', keys, status, bad_data]`` where
        ``status`` is ``'Passed'`` when there are no duplicated keys and
        ``'Failed'`` otherwise, and ``bad_data`` is the string form of the
        key values of the duplicated rows (a Series for a single key
        column, a DataFrame for a composite key).

    Raises
    ------
    ValueError
        If ``keys`` is empty.
    """
    key_columns = [keys] if isinstance(keys, str) else list(keys)
    if not key_columns:
        raise ValueError('keys must contain at least one column name')

    duplicate_mask = df.duplicated(subset=key_columns)
    # A single-column key is selected by label so the reported bad data
    # stays a Series of key values.
    selector = key_columns[0] if len(key_columns) == 1 else key_columns
    bad_data = df.loc[duplicate_mask, selector]

    # A filtered selection is never None; emptiness is what signals that
    # no duplicate was found.
    status = 'Passed' if bad_data.empty else 'Failed'
    return [name, 'Uniqueness by PK', keys, status, str(bad_data)]

In [39]:
def check_consistency_w_formula(df, name, column_name, arg1, arg2):
    """Check that ``df[column_name] == df[arg1] - df[arg2]`` on every row.

    Values whose string form is exactly ``'NA'`` are treated as ``0`` in all
    three columns before the columns are converted with ``pd.to_numeric``.
    Rows are compared with exact equality, so a row where either side is
    NaN is reported as inconsistent.

    Parameters
    ----------
    df : pandas.DataFrame
        Table to inspect. It is not modified.
    name : str
        Label of the table; returned as the first field of the result.
    column_name : str
        Column expected to hold the difference.
    arg1, arg2 : str
        Minuend and subtrahend columns.

    Returns
    -------
    list
        ``[name, 'Consistency', column_name, status, bad_data]`` where
        ``status`` is ``'Passed'`` when every row satisfies the formula and
        ``'Failed'`` otherwise, and ``bad_data`` is the string form of the
        list of 0-based row positions that violate it.
    """
    def _numeric(column):
        # Map the literal 'NA' marker to '0', then parse as numbers.
        cleaned = df[column].map(lambda value: '0' if str(value) == 'NA' else value)
        return pd.to_numeric(cleaned)

    observed = _numeric(column_name).to_numpy()
    expected = (_numeric(arg1) - _numeric(arg2)).to_numpy()

    # Compare plain arrays so rows are matched by position; looking rows up
    # by index label with a running counter is wrong (or raises KeyError)
    # whenever the frame's index is not exactly 0..n-1.
    mismatch = observed != expected
    bad_data = mismatch.nonzero()[0].tolist()

    status = 'Failed' if bad_data else 'Passed'
    return [name, 'Consistency', column_name, status, str(bad_data)]

In [40]:
def check_constistency(df, name, test_col, sample_df, sample_col):
    """Check that every distinct value of ``df[test_col]`` occurs in ``sample_df[sample_col]``.

    The (misspelt) function name is kept unchanged for existing callers.

    Parameters
    ----------
    df : pandas.DataFrame
        Table whose column is validated. It is not modified.
    name : str
        Label of the table; returned as the first field of the result.
    test_col : str
        Column of ``df`` whose values are validated.
    sample_df : pandas.DataFrame
        Table holding the allowed values.
    sample_col : str
        Column of ``sample_df`` listing the allowed values.

    Returns
    -------
    list
        ``[name, 'Consistency', test_col, status, bad_data]`` where
        ``status`` is ``'Passed'`` when every distinct tested value is
        present in the reference column and ``'Failed'`` otherwise, and
        ``bad_data`` is the string form of the list of distinct values that
        are missing, in order of first appearance in ``df[test_col]``.
    """
    allowed = sample_df.loc[:, sample_col].unique()
    tested = df.loc[:, test_col].unique()
    bad_data = [value for value in tested if value not in allowed]
    status = 'Failed' if bad_data else 'Passed'
    # The label is used directly instead of being stored as ``df.name``:
    # attribute assignment on a DataFrame writes into a column called
    # 'name' when one exists, which would corrupt the caller's data.
    return [name, 'Consistency', test_col, status, str(bad_data)]

In [41]:
# Run every data-quality check once; each call returns one five-field row
# (table, check, columns, status, bad data).
test_data = [
    check_completeness_non_null(df=carriers_data, name='Carrier'),
    check_uniqueness_by_pk(df=airports_data, name='Airports', keys=['iata']),
    check_consistency_w_formula(
        df=flights_data,
        name='Flights',
        column_name='DepDelay',
        arg1='DepTime',
        arg2='CRSDepTime',
    ),
    check_constistency(
        df=flights_data,
        name='Flights',
        test_col='UniqueCarrier',
        sample_df=carriers_data,
        sample_col='code',
    ),
]

In [42]:
# Build the report table, one row per executed check. Rows are numbered from
# 1, and the numbering is derived from test_data so that adding or removing a
# check does not require editing a hard-coded index list.
df = pd.DataFrame(
    test_data,
    index=list(range(1, len(test_data) + 1)),
    columns=['Table', 'DQ Check', 'Columns', 'Status', 'Bad Data'],
)
display(df)

Unnamed: 0,Table,DQ Check,Columns,Status,Bad Data
1,Carrier,Completeness non null,All,Passed,"Series([], dtype: int64)"
2,Airports,Uniqueness by PK,[iata],Failed,1686 Z08\n1687 Z09\n1688 Z13\n1689 ...
3,Flights,Consistency,DepDelay,Failed,"[0, 1, 2, 5, 15, 16, 17, 18, 19, 20, 36, 39, 4..."
4,Flights,Consistency,UniqueCarrier,Failed,"['AXX', '1B9']"
