In [None]:
%sh
# Setup cell: install third-party packages used by the notebook.
# - pretty_html_table and openpyxl come from PyPI (openpyxl backs the Excel report).
# - dateinfer is cloned from GitHub and installed from source.
# NOTE(review): versions are unpinned, and `git clone` fails if ./dateinfer
# already exists from an earlier run — consider pinning and guarding the clone.

pip install pretty_html_table
pip install openpyxl


git clone https://github.com/nedap/dateinfer.git
cd dateinfer
pip install .

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import numpy as np
import time
import dateinfer
import unicodedata
import re
from numpy import nansum
from numpy import nanmean
import pandas as pd
from shutil import copyfile
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows


In [None]:
def normalize(column: str) -> str:
    """
    Build a normalized column name.

    The name is lower-cased. Every run of characters from the set
    (space , ; { } ( ) newline tab =) is collapsed into one underscore.
    Accents are then stripped by NFKD-decomposing the text and dropping
    any code point that is not ASCII.

    :param column: column name
    :return: normalized column name
    """
    lowered = column.lower()
    underscored = re.sub(r"[ ,;{}()\n\t=]+", '_', lowered)
    decomposed = unicodedata.normalize('NFKD', underscored)
    ascii_bytes = decomposed.encode('ASCII', 'ignore')
    return ascii_bytes.decode()

def get_most_freq_date_fmt(date_col) -> str:
    """
    Return the most common date format among the values of `date_col`.

    Each non-null value is passed individually to `dateinfer.infer`, which
    yields a strftime-style format string. The mode of those strings is
    returned. Ties resolve to whichever entry `Series.mode()` lists first.

    :param date_col: pandas Series of date strings
    :return: the most frequent inferred format string
    :raises ValueError: if `date_col` contains no non-null values
    """
    values = date_col.dropna()
    if values.empty:
        # mode()[0] on an empty Series would raise an opaque KeyError(0).
        raise ValueError("Cannot infer a date format from a column with no non-null values")
    inferred_formats = values.apply(lambda value: dateinfer.infer([value]))
    return inferred_formats.mode()[0]
  
# strftime directive -> Spark datetime pattern letters. Directives not listed are left unchanged.
_STRFTIME_TO_SPARK = {
    "%Y": "yyyy",
    "%y": "yy",
    "%m": "MM",
    "%d": "dd",
    "%B": "MMMM",
    "%b": "MMM",
    "%H": "HH",
    "%I": "hh",
    "%M": "mm",
    "%S": "ss",
    "%p": "a",
}


def _strftime_to_spark(fmt: str) -> str:
    """Translate a strftime-style format (as produced by dateinfer) into a Spark datetime pattern."""
    return re.sub(r"%[A-Za-z]", lambda m: _STRFTIME_TO_SPARK.get(m.group(0), m.group(0)), fmt)


def auto_date_parser(parse_df, date_cols, final_date_fmt):
    """
    Parse each column in `date_cols` as a date and re-render it with `final_date_fmt`.

    For every column, up to 100 non-null values are sampled to the driver and
    the most frequent format is inferred from them. `to_date` is then tried
    with these patterns, in this order:
      1. the format inferred for this column;
      2. formats inferred for previously processed columns;
      3. a fixed list of common fallback patterns.
    The first pattern that parses a value wins (`coalesce`). Values matching
    no pattern become null.

    Args:
        parse_df (DataFrame): Spark dataframe to transform
        date_cols (List): names of the columns holding dates
        final_date_fmt (str): Spark datetime pattern used to render the parsed dates
    Returns:
        DataFrame: `parse_df` with each column in `date_cols` replaced by the
        re-formatted date string
    """
    # Fallback Spark patterns. Order matters because the first match wins.
    base_fmt = ["yyyy-MM-dd", "yyyy MM dd", "yyyy MMMM dd", "dd-MM-yyyy",
                "MM/dd/yyyy", "MM-dd-yyyy", "dd/MM/yyyy"]
    # Spark patterns inferred for earlier columns, most recent first.
    inferred_fmts = []

    for column in date_cols:
        # Sample non-null values to the driver for format inference.
        sample = parse_df.filter(parse_df[column].isNotNull()).limit(100).toPandas()
        column_fmts = []
        if not sample.empty:
            column_fmts.append(_strftime_to_spark(get_most_freq_date_fmt(sample[column])))

        # dict.fromkeys de-duplicates while keeping the priority order
        # (a set would shuffle it, letting a wrong format win on ambiguous dates).
        formats = list(dict.fromkeys(column_fmts + inferred_fmts + base_fmt))
        inferred_fmts = list(dict.fromkeys(column_fmts + inferred_fmts))
        print(column, ":", formats)

        parsed = coalesce(*[to_date(column, f) for f in formats])
        parse_df = parse_df.withColumn(column, date_format(parsed, final_date_fmt))
    return parse_df

def preprocess_data(df, numeric_cols):
    """
    Strip formatting characters from numeric columns so they can be cast later.

    All characters except digits, '.' and '-' are removed. A '-' is then kept
    only when it is the first character and is followed by a digit or '.'.
    As a result:
      - a negative value such as "-1,234.50" becomes "-1234.50" (previously the
        sign was dropped);
      - a placeholder such as " -   " becomes "".
    The cleaned columns are produced by `regexp_replace`, so they are strings.

    Args:
        df (DataFrame): Spark dataframe to clean
        numeric_cols (List): names of columns holding numeric values as text
    Returns:
        DataFrame: `df` with the listed columns cleaned
    """
    for column in numeric_cols:
        cleaned = regexp_replace(column, '[^0-9.-]', '')
        # Remove any '-' that is not a leading sign of a number.
        cleaned = regexp_replace(cleaned, '(?<=.)-|^-(?![0-9.])', '')
        df = df.withColumn(column, cleaned)
    return df

In [None]:
def get_null_perc(spark, df, null_cols):
    """ Get null/empty percentage for columns
    Args:
        spark (Spark): SparkSession object
        df (DataFrame): dataframe to perform null/empty analysis on
        null_cols (List): list of columns that need to be considered for analysis;
            names not present in `df` (compared case-insensitively) are skipped
    Returns:
        DataFrame: one row per analysed column, with `Column` and `NullPercentage`.
            A value counts as empty when it is null or equals ''.
            The percentage is a string with two decimals, e.g. "12.50%".
            An empty `df` reports "0.00%" instead of dividing by zero.
    """
    schema = StructType([
        StructField("Column", StringType(), True),
        StructField("NullPercentage", StringType(), True),
    ])
    existing = {name.upper() for name in df.columns}
    # The row count is the same for every column, so run that Spark job once.
    total_rows = df.count()

    rows = []
    for x in null_cols:
        if x.upper() not in existing:
            continue
        null_count = df.filter(col(x).isNull() | (col(x) == '')).count()
        null_pct = null_count * 100.0 / total_rows if total_rows else 0.0
        rows.append([x, f'{null_pct:.2f}%'])
    # Build the result once instead of unioning one-row DataFrames in a loop.
    return spark.createDataFrame(rows, schema=schema)
    

In [None]:
def get_summary_numeric(df, numeric_cols):
    """ Get Summary for numeric columns
    Args:
        df (DataFrame): dataframe to perform analysis on
        numeric_cols (List): list of columns that need to be considered for analysis.
            Names not present in `df` (compared case-insensitively) are ignored.
            The caller's list is not modified.
    Returns:
        DataFrame: result of `DataFrame.summary()` over the matching columns
    """
    existing = {name.upper() for name in df.columns}
    # Build a new list. Removing from `numeric_cols` while iterating over it
    # skips the element after each removal and mutates the caller's list.
    present_cols = [x for x in numeric_cols if x.upper() in existing]
    return df.select(present_cols).summary()

In [None]:
def get_distinct_counts(spark, df, aggregate_cols):
    """ Get distinct count for columns
    Args:
        spark (Spark): SparkSession object
        df (DataFrame): dataframe to perform distinct count analysis on
        aggregate_cols (List): list of columns that need to be considered for analysis;
            names not present in `df` (compared case-insensitively) are skipped
    Returns:
        DataFrame: one row per analysed column, with `Column` and `DistinctCount`.
            `DistinctCount` is a string computed as `distinct().count()`, so a
            null is counted as one value.
    """
    schema = StructType([
        StructField("Column", StringType(), True),
        StructField("DistinctCount", StringType(), True),
    ])
    existing = {name.upper() for name in df.columns}

    rows = [
        [x, str(df.select(col(x)).distinct().count())]
        for x in aggregate_cols
        if x.upper() in existing
    ]
    # Build the result once instead of unioning one-row DataFrames in a loop.
    return spark.createDataFrame(rows, schema=schema)

In [None]:
def get_distribution_counts(spark, df, aggregate_cols):
    """ Get value frequency tables for columns
    Args:
        spark (Spark): SparkSession object (not used by this function)
        df (DataFrame): dataframe whose values are counted
        aggregate_cols (List): columns to analyse; names not present in `df`
            (compared case-insensitively) are skipped
    Returns:
        List: one DataFrame per analysed column, in `aggregate_cols` order.
            Each holds the column's values plus a `count` column, sorted by
            `count` descending.
    """
    upper_names = [name.upper() for name in df.columns]
    return [
        df.groupby(col(name)).count().sort(col("count").desc())
        for name in aggregate_cols
        if name.upper() in upper_names
    ]

In [None]:
def get_mismatch_perc(spark, df, data_quality_cols_regex):
    """ Get Mismatch Percentage for columns
    Args:
        spark (Spark): SparkSession object
        df (DataFrame): dataframe to perform the regex conformance analysis on
        data_quality_cols_regex (Dictionary): maps column name -> regular expression
            (used with Spark `rlike`) that valid values must match. Columns not
            present in `df` (compared case-insensitively) are skipped.
    Returns:
        DataFrame: one row per analysed column, with `Column` and `MismatchPercentage`.
            The percentage is a string with two decimals, e.g. "12.50%".
            An empty `df` reports "0.00%" instead of dividing by zero.
    """
    schema = StructType([
        StructField("Column", StringType(), True),
        StructField("MismatchPercentage", StringType(), True),
    ])
    existing = {name.upper() for name in df.columns}
    # The row count is the same for every column, so run that Spark job once.
    total_rows = df.count()

    rows = []
    for key, value in data_quality_cols_regex.items():
        if key.upper() not in existing:
            continue
        # `rlike` on a null yields null, which the filter drops: only non-null
        # values that fail the regex are counted as mismatches.
        mismatch_count = df.filter(~col(key).rlike(value)).count()
        mismatch_pct = mismatch_count * 100.0 / total_rows if total_rows else 0.0
        rows.append([key, f'{mismatch_pct:.2f}%'])
    # Build the result once instead of unioning one-row DataFrames in a loop.
    return spark.createDataFrame(rows, schema=schema)

In [None]:
def _write_df_to_sheet(ws, df: pd.DataFrame) -> None:
    """Write `df` into worksheet `ws` starting at A1: header row first, no index column."""
    rows = dataframe_to_rows(df, index=False, header=True)
    for r_idx, row in enumerate(rows, 1):
        for c_idx, value in enumerate(row, 1):
            ws.cell(row=r_idx, column=c_idx, value=value)


def _build_data_overview(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Clean the numeric columns of `raw_df` and sum them per brand generation, product and week."""
    # Remove ',' from every string cell. `replace` returns a copy, so raw_df is untouched.
    overview = raw_df.replace(',', '', regex=True)
    week = pd.to_datetime(overview['Week'], infer_datetime_format=True)
    # %V is the ISO-8601 week number and must be paired with the ISO year (%G).
    # Using %Y would label 2021-01-01 (ISO week 53 of 2020) as "2021-w53".
    overview['Year-Week'] = week.dt.strftime('%G-w%V')
    overview['Week'] = week.dt.date
    # Cells containing ' -   ' are replaced with NaN before the float cast.
    overview['Net Spend (Local)'] = (
        overview['Net Spend (Local)']
        .replace(' -   ', np.nan, regex=True)
        .astype(float)
    )
    overview['Impressions'] = (
        overview['Impressions']
        .replace(' -   ', np.nan, regex=True)
        .replace(r"\ ", "", regex=True)
        .astype(float)
    )
    overview['Reach'] = (
        overview['Reach']
        .replace(' -   ', np.nan, regex=True)
        .replace(r"\%", "", regex=True)
        .replace("", np.nan, regex=True)
        .astype(float)
    )
    return (
        overview
        .groupby(['Brand Generation', 'Product', 'Week', 'Year-Week'])
        .agg({'Net Spend (Local)': nansum, 'Impressions': nansum, 'Reach': nansum})
        .reset_index()
    )


def generate_report(raw_df:pd.DataFrame,err_df:pd.DataFrame,destination_path:str)->str:
    """
    Write a three-sheet Excel data-quality report and copy it into `destination_path`.

    Sheets:
      - 'Raw Data': `raw_df` as-is.
      - 'Data Overview': numeric columns cleaned and summed per
        'Brand Generation' / 'Product' / 'Week' / 'Year-Week'.
      - 'DQ_Report': `err_df` as-is.

    Args:
        raw_df: raw input. It must contain the columns 'Week', 'Net Spend (Local)',
            'Impressions', 'Reach', 'Brand Generation' and 'Product'.
        err_df: rows that failed data-quality rules
        destination_path: directory that receives the file 'DQ_Report.xlsx'
    Returns:
        str: path of the copied report
    """
    import os
    import tempfile

    wb = Workbook()
    ws_raw = wb.active
    ws_raw.title = 'Raw Data'
    ws_overview = wb.create_sheet("Data Overview")
    ws_dq = wb.create_sheet("DQ_Report")

    _write_df_to_sheet(ws_raw, raw_df)
    _write_df_to_sheet(ws_overview, _build_data_overview(raw_df))
    _write_df_to_sheet(ws_dq, err_df)

    # Save into a private temporary directory, then copy to the destination.
    # A fixed /tmp path would be shared and overwritten by concurrent runs.
    with tempfile.TemporaryDirectory() as tmp_dir:
        local_path = os.path.join(tmp_dir, 'dq_report.xlsx')
        wb.save(local_path)
        dest_path = copyfile(local_path, os.path.join(destination_path, 'DQ_Report.xlsx'))
    return dest_path

In [None]:


def add_ingestion_date(input_df):
    """Return `input_df` with an extra `ingestion_date` column set to Spark's `current_timestamp()`."""
    return input_df.withColumn("ingestion_date", current_timestamp())

In [None]:
def df_null_percentage(df = None)->pd.Series:
    """
    Compute the percentage of missing values per column of a pandas DataFrame.

    Args:
        df (pd.DataFrame): frame to inspect (required despite the None default)
    Returns:
        pd.Series: column name -> percentage (0-100) of NA cells, sorted from
        most to least missing
    """
    non_missing_ratio = df.count() / len(df)
    missing_pct = (1 - non_missing_ratio) * 100
    return missing_pct.sort_values(ascending=False)

In [None]:
def get_null_dict(df:pd.DataFrame)->dict:
    """
    Map each column of `df` to its missing-value percentage.

    Values come from `df_null_percentage`. The keys keep that Series' order,
    i.e. most-missing first.
    """
    null_series = df_null_percentage(df)
    return {column: null_series[column] for column in null_series.index}

In [None]:
def add_row_index_to_df(df:pd.DataFrame)->pd.DataFrame:
    """
    Add a `ROW_ID` column equal to the index plus one.

    The frame is modified in place and the same object is returned.
    NOTE(review): assumes a numeric index (e.g. the default RangeIndex, giving
    1-based row numbers); `df.index + 1` is used as-is.
    """
    row_ids = df.index + 1
    df['ROW_ID'] = row_ids
    return df

In [None]:
def err_df_null_rule(null_check_columns:list, df:pd.DataFrame = None,threshold_per = 10)->pd.DataFrame:
    """
    Return the DataFrame for the DQ_Report tab (rows containing nulls).

    A column in `null_check_columns` fails the null rule when more than
    `threshold_per` percent of its values are missing. Every row that is null
    in a failing column is reported once for that column, so a row can appear
    several times.

    Args:
        null_check_columns (List): columns to check for null data. Each must
            exist in `df`, otherwise a KeyError is raised.
        df (Pandas DataFrame): dataframe to perform null/empty analysis on.
            NOTE: a `ROW_ID` column (index + 1) is added to it in place.
        threshold_per (float): allowed percentage of nulls per column
    Returns:
        DataFrame: columns `rule_name` ('null_value_for_the_<column>'), `ROW_ID`,
        then all columns of `df`. Rows are grouped by failing column in
        `null_check_columns` order. When no column exceeds the threshold, a
        message is printed and an empty DataFrame is returned.
    """
    percentage_null_dict = get_null_dict(df)
    df = add_row_index_to_df(df)

    failing_columns = [c for c in null_check_columns if percentage_null_dict[c] > threshold_per]
    if not failing_columns:
        print("Passed Columns have passed null checks")
        return pd.DataFrame()

    # Collect one frame per failing column and concatenate once (no quadratic concat).
    violations = []
    for column in failing_columns:
        null_mask = df[column].isnull().to_numpy()
        violations.append(pd.DataFrame({
            'rule_name': f'null_value_for_the_{column}',
            # index + 1 of the null rows: the same values df['ROW_ID'] received above.
            'ROW_ID': df.index[null_mask] + 1,
        }))
    return pd.merge(pd.concat(violations, axis=0), df, on='ROW_ID')


In [None]:
def reconcilliation_tool(config_str, get_fuzzy_match = False, tol_val = 0.1):
    """Compare key columns between two Spark dataframes and report how well they reconcile.

    The config's `reconcilation_check` mapping lists `left_col: right_col` pairs.
    For each pair:
      - The distinct values of `left_col` (from `alpha_df`) and `right_col`
        (from `beta_df`) are full-outer-joined.
      - Each joined row is labelled `beta_alias` (value only on the right),
        `alpha_alias` (only on the left) or "both".
      - Unique-value counts, the joined table and the match percentage are
        shown with `displayHTML`/`display`.

    Args:
        config_str: YAML string containing:
            - `datasources.alpha_df` / `datasources.beta_df`: Python expressions
              evaluated to obtain the dataframes;
            - `datasources.alpha_alias` / `datasources.beta_alias`: labels;
            - `reconcilation_check`: mapping of left column -> right column.
        get_fuzzy_match: also display value pairs whose lower-cased values are
            within Levenshtein distance 2 of each other.
        tol_val: tolerated fraction of unmatched values. A pair fails when its
            match percentage is below (1 - tol_val) * 100.
    Returns:
        None; results are displayed.
    Raises:
        ValueError: if the config cannot be parsed into a mapping, or a column
            pair's match percentage is below the tolerance.

    TODO :
        1. Return Flag if the data does not match for adding to automated data quality pipelines based on a tolerance value
        2. Add Reconcile checks between numerical columns
        3. ignore_records config, so that we can ignore some rows from getting checked

    """
    import yaml

    try:
        # NOTE(review): prefer yaml.safe_load if the config can come from an untrusted source.
        config = yaml.load(config_str, Loader=yaml.FullLoader)
    except Exception as exc:
        raise ValueError('Not Able to Parse Config String') from exc
    if not isinstance(config, dict):
        raise ValueError('Not Able to Parse Config String')

    # SECURITY: eval executes arbitrary code taken from the config; only use trusted configs.
    alpha_df = eval(config['datasources']['alpha_df'])
    beta_df = eval(config['datasources']['beta_df'])

    for left_col, right_col in config['reconcilation_check'].items():

        displayHTML('<h1> Matching between {0} and {1} </h1>'.format(left_col, right_col))
        left_df = alpha_df.select(left_col).drop_duplicates()
        right_df = beta_df.select(right_col).drop_duplicates()
        displayHTML('<b> Number of Unique Values in {0} : {1}'.format(left_col, str(left_df.count())))
        displayHTML('<b> Number of Unique Values in {0} : {1} </b>'.format(right_col, str(right_df.count())))

        # Join the de-duplicated right side. Joining the full beta_df would repeat
        # each matched key once per duplicate row and skew the match percentage.
        joined = left_df.join(right_df, left_df[left_col] == right_df[right_col], "outer")
        merged = joined.withColumn(
            "_merge",
            when(joined[left_col].isNull(), config['datasources']['beta_alias'])
            .when(joined[right_col].isNull(), config['datasources']['alpha_alias'])
            .otherwise("both"),
        )
        display(merged.select(left_col, right_col, '_merge'))

        counts = merged.groupBy("_merge").count().toPandas()
        total = counts['count'].sum()
        matched = counts.loc[counts['_merge'] == 'both', 'count'].sum()
        # No rows at all, or no "both" group, both mean a 0% match.
        match_pct = np.round(matched / total * 100, 2) if total else 0.0
        displayHTML('<b> Match Result Percentage : {0} </b>'.format(str(match_pct)))
        # Checked outside any try block, so the tolerance failure is raised once
        # and no misleading second percentage is displayed.
        if match_pct < (1 - tol_val) * 100:
            raise ValueError('Reconcilliation check failed due to not passing tolerance level')

        if get_fuzzy_match:
            displayHTML('<b> Fuzzy Match between {0} and {1} </b>'.format(left_col, right_col))
            fuzzydf = left_df.join(right_df, levenshtein(lower(left_df[left_col]), lower(right_df[right_col])) < 3)
            display(fuzzydf)

In [None]:
def stage_data_to_s3(src_file_path='/dbfs/FileStore/ROI_DQ/DQ_Report.xlsx',
                     bucket_name="demo-s3-stage",
                     region_name='us-east-1'):
    """
    Upload a local file (by default the DQ report) to S3.

    Credentials are not hard-coded. boto3 resolves them through its standard
    credential chain: environment variables such as AWS_ACCESS_KEY_ID /
    AWS_SECRET_ACCESS_KEY, shared config files, or an attached role.

    Args:
        src_file_path (str): local file to upload
        bucket_name (str): destination S3 bucket
        region_name (str): AWS region for the S3 client
    """
    import os
    import boto3

    # Do not pass aws_access_key_id/aws_secret_access_key. Passing the old
    # empty-string placeholders makes boto3 use blank credentials instead of
    # falling back to its default chain.
    s3 = boto3.resource('s3', region_name=region_name)

    file_name = os.path.basename(src_file_path)
    # NOTE(review): the object key is prefixed with the bucket name, so the file
    # lands at s3://<bucket>/<bucket>/<file> — confirm this is intended.
    s3_dest_path = f'{bucket_name}/{file_name}'

    bucket = s3.Bucket(bucket_name)
    bucket.upload_file(src_file_path, s3_dest_path)
    print(f'file loaded to s3 path {s3_dest_path}')
    