# Quality assurance and automated service data review

This notebook reviews published service data for common mistakes. It relies on the outputs of gc-service-data-script to function.

In [1]:
import pandas as pd
import numpy as np
import requests, pytz, os, re
from tqdm.notebook import tqdm  # Use tqdm.notebook for JupyterLab progress bars
from concurrent.futures import ThreadPoolExecutor, as_completed


In [2]:
# Capture the moment this notebook runs, expressed in the America/Montreal timezone
timezone = pytz.timezone('America/Montreal')
current_datetime = pd.Timestamp.now(timezone)

# Text form of the run time: date and time joined by an underscore
current_datetime_str = current_datetime.strftime("%Y-%m-%d_%H:%M:%S")
print(f'Current datetime: {current_datetime_str}')

Current datetime: 2024-12-04_10:43:48


In [3]:
# Load the service inventory (si) and service standards (ss) extracts.
# Both files are semicolon-delimited; the literal 'NA' is read as missing,
# in addition to pandas' default missing-value markers.
csv_options = {'sep': ';', 'na_values': ['NA'], 'keep_default_na': True}
si = pd.read_csv('si.csv', **csv_options)
ss = pd.read_csv('ss.csv', **csv_options)

# The last row of si carries a timestamp in its first column. Take the segment
# after the first ':' and keep what precedes its first '_' (the date part),
# then parse that as the date the data was generated.
datestamp_text = si.iloc[-1, 0]
date = pd.to_datetime(datestamp_text.split(':')[1].split('_')[0])

# Drop the trailing datestamp row from both dataframes
si, ss = si.iloc[:-1], ss.iloc[:-1]

In [4]:
# Function to check if a URI's format is a problem
def is_problem_format(uri):
    """Return True when a URI's text format looks wrong, False otherwise.

    The value is converted to a string and stripped of surrounding whitespace
    before it is checked. It is reported as a problem when any of these hold:

    * it contains more than one 'http://' / 'https://' scheme marker in total
      (for example two addresses pasted together), including one of each;
    * it does not start with 'http://' or 'https://';
    * it contains a character outside the allowed set
      (letters, digits and ``-._~:/?#@!$&'()*+,;=%``).

    Parameters
    ----------
    uri : object
        The value to check; it is passed through ``str()`` first.

    Returns
    -------
    bool
        True if the format is problematic, False if no problem was found.
    """
    # Normalize URI by converting to string and stripping whitespace
    uri = str(uri).strip()

    # Count both scheme markers together: counting 'http://' and 'https://'
    # separately would let a mix of one of each slip through.
    if len(re.findall(r'https?://', uri)) > 1:
        return True  # Problematic if more than one scheme

    # Check that the input starts with a valid scheme
    if not uri.startswith(('http://', 'https://')):
        return True  # Problematic if it does not start with a valid scheme

    # Check for invalid characters
    invalid_characters_pattern = r'[^a-zA-Z0-9\-._~:/?#@!$&\'()*+,;=%]'
    if re.search(invalid_characters_pattern, uri):
        return True  # Problematic if invalid characters are found

    return False  # No problem


# Function to check if a URI's address is a problem
def is_problem_uri(uri):
    """Request a URI and report whether it fails to answer with HTTP 200.

    A HEAD request is sent with redirects followed and a 60 second timeout.

    Parameters
    ----------
    uri : str
        The address to request.

    Returns
    -------
    tuple
        ``(uri, is_problem, detail)``. ``is_problem`` is True when the final
        status code is not 200 or when the request raised a
        ``requests.RequestException``. ``detail`` is the status code (int) on
        a completed request, or the exception text (str) on failure.
    """
    # NOTE(review): any final status other than 200 counts as a problem, which
    # includes servers that reject HEAD requests -- confirm this is intended.
    # The context manager closes the session when the call finishes, so the
    # connections it pooled are released instead of accumulating per call.
    with requests.Session() as session:
        try:
            response = session.head(uri, allow_redirects=True, timeout=60)
            return uri, response.status_code != 200, response.status_code
        except requests.RequestException as e:
            return uri, True, str(e)

# Consolidate all URIs from DataFrames: each URI column name is mapped to the
# DataFrame (si or ss) that holds it
uri_cols = {
    'service_uri_en': si,
    'service_uri_fr': si,
    'standards_targets_uri_en': ss,
    'standards_targets_uri_fr': ss,
    'performance_results_uri_en': ss,
    'performance_results_uri_fr': ss
}

# Set up dictionaries to store validation results and descriptions of issues.
# Keys will be URIs, values will be results or details
validation_results = {}
details = {}

# Stack every URI column into one Series
all_uris = pd.concat([df[col] for col, df in uri_cols.items()], ignore_index=True)
print(f'total uri records: {len(all_uris)}')

# Generate a sorted list of unique, non-missing URIs to check
unique_uris = all_uris.dropna().sort_values().unique()
print(f'unique uris: {len(unique_uris)}')

# Filter out problematic formats so we don't validate them with requests
problem_format_uris = [uri for uri in unique_uris if is_problem_format(uri)]
print(f'problem format uris: {len(problem_format_uris)}')

# All problem format URIs are assigned the same description
for uri in problem_format_uris:
    validation_results[uri] = True
    details[uri] = "Incorrect URI format"

# Remove problematic format URIs from unique_uris. Membership is tested
# against a set so each lookup is constant-time rather than a list scan.
problem_format_set = set(problem_format_uris)
unique_uris = [uri for uri in unique_uris if uri not in problem_format_set]
print(f'valid format uris: {len(unique_uris)}')

# Check the remaining URIs concurrently on a thread pool and record, for each
# one, whether it is a problem and the accompanying detail
with ThreadPoolExecutor(max_workers=1000) as executor:
    for checked_uri, problem_flag, problem_detail in executor.map(is_problem_uri, unique_uris):
        validation_results[checked_uri] = problem_flag
        details[checked_uri] = problem_detail

print('done checks')

# Filter validation_results for only results with errors:
filtered_results = {uri: is_problem for uri, is_problem in validation_results.items() if is_problem}

# Tabulate the flagged URIs for export
filtered_results_df = pd.DataFrame(
    list(filtered_results.items()),
    columns=['uri', 'is_problem']
)

# Map validation results to DataFrames.
# The flag column is assigned directly on each DataFrame held in uri_cols, so
# it lands on si / ss themselves. (Merging into the loop variable only rebinds
# that variable and leaves si / ss unchanged, and casting a merged column that
# contains NaN to bool turns the NaN into True.) Rows whose URI is missing or
# was not flagged get False.
problem_uris = set(filtered_results)
for column, df in uri_cols.items():
    df[f'qa_{column}_is_problem'] = df[column].isin(problem_uris)

# Save the flagged URIs
filtered_results_df.to_csv('uri_validation_errors.csv', index=False)


total uri records: 67764
unique uris: 5880
problem format uris: 669
valid format uris: 5211
done checks


In [5]:
# Duplicate service ID conflict
# Step 1: mark every row whose (fiscal_yr, service_id) pair occurs more than once
si['qa_duplicate_sid'] = si.duplicated(subset=['fiscal_yr', 'service_id'], keep=False)

# Step 2: the distinct service IDs carried by the marked rows
duplicate_ids = si.loc[si['qa_duplicate_sid'], 'service_id'].unique()

# Step 3: over all rows that use one of those IDs, count how many distinct
# fiscal years each (service_id, department_en) pair appears in
rows_with_duplicate_ids = si.loc[
    si['service_id'].isin(duplicate_ids),
    ['fiscal_yr', 'service_id', 'department_en'],
]
duplicate_groups = (
    rows_with_duplicate_ids
    .groupby(['service_id', 'department_en'])['fiscal_yr']
    .nunique()
)

# Steps 4-5: the pairs seen in exactly one fiscal year are the problematic
# cases; keep just their 'service_id' and 'department_en' columns
single_year_groups = duplicate_groups.loc[duplicate_groups == 1]
problematic_duplicates = single_year_groups.reset_index()[['service_id', 'department_en']]

# Step 6: turn those pairs into a set of tuples for efficient lookup
problematic_set = set(problematic_duplicates.itertuples(index=False, name=None))

# Step 7: overwrite 'qa_duplicate_sid' so it is True only for rows whose
# (service_id, department_en) pair is one of the problematic pairs
si['qa_duplicate_sid'] = [
    pair in problematic_set
    for pair in zip(si['service_id'], si['department_en'])
]


# check:
# si.loc[:, ['fiscal_yr', 'department_en', 'service_id', 'qa_duplicate_sid']][si['qa_duplicate_sid']]

In [6]:
# Duplicate Service Standard ID conflict
# Step 1: mark every row whose (fiscal_yr, service_standard_id) pair occurs more than once
ss['qa_duplicate_stdid'] = ss.duplicated(subset=['fiscal_yr', 'service_standard_id'], keep=False)

# Step 2: the distinct service standard IDs carried by the marked rows
duplicate_ids = ss.loc[ss['qa_duplicate_stdid'], 'service_standard_id'].unique()

# Step 3: over all rows that use one of those IDs, count how many distinct
# fiscal years each (service_standard_id, department_en) pair appears in
rows_with_duplicate_ids = ss.loc[
    ss['service_standard_id'].isin(duplicate_ids),
    ['fiscal_yr', 'service_standard_id', 'department_en'],
]
duplicate_groups = (
    rows_with_duplicate_ids
    .groupby(['service_standard_id', 'department_en'])['fiscal_yr']
    .nunique()
)

# Steps 4-5: the pairs seen in exactly one fiscal year are the problematic
# cases; keep just their 'service_standard_id' and 'department_en' columns
single_year_groups = duplicate_groups.loc[duplicate_groups == 1]
problematic_duplicates = single_year_groups.reset_index()[['service_standard_id', 'department_en']]

# Step 6: turn those pairs into a set of tuples for efficient lookup
problematic_set = set(problematic_duplicates.itertuples(index=False, name=None))

# Step 7: overwrite 'qa_duplicate_stdid' so it is True only for rows whose
# (service_standard_id, department_en) pair is one of the problematic pairs
ss['qa_duplicate_stdid'] = [
    pair in problematic_set
    for pair in zip(ss['service_standard_id'], ss['department_en'])
]


# check:
# ss.loc[:, ['fiscal_yr', 'department_en', 'service_id', 'service_standard_id', 'qa_duplicate_stdid']][ss['qa_duplicate_stdid']]

In [7]:
# Record is reported for a fiscal year that is incomplete or in the future.
# For both dataframes: the part of 'fiscal_yr' after the '-' gets '-04-01'
# appended and is parsed as the fiscal year end date; a row is flagged when
# that date is on or after the data generation date.
for frame in (si, ss):
    end_year = frame['fiscal_yr'].str.split('-').str[1]
    frame['fiscal_yr_end_date'] = pd.to_datetime(end_year + '-04-01')
    frame['qa_fiscal_yr_in_future'] = frame['fiscal_yr_end_date'] >= date

In [8]:
# Record has contradiction between client feedback channels and online interaction points for feedback

# na=False treats a missing client_feedback_channel as "ONL not listed". This
# keeps the mask strictly boolean: without it a missing value yields NaN in the
# mask, and negating such a mask with ~ raises a TypeError.
feedback_lists_online = si['client_feedback_channel'].str.contains('ONL', na=False)
online_feedback = si['os_issue_resolution_feedback']

si['qa_client_feedback_contradiction'] = (
    # Service accepts client feedback via the online channel (ONL) but online
    # issue resolution or feedback is not applicable or not activated
    (feedback_lists_online & (online_feedback.isna() | (online_feedback == 'N')))
    |
    # Service has not listed the online channel (ONL) for client feedback but
    # online issue resolution or feedback is activated
    (~feedback_lists_online & (online_feedback == 'Y'))
)

# check:
# si[['client_feedback_channel', 'os_issue_resolution_feedback', 'qa_client_feedback_contradiction']].loc[si['qa_client_feedback_contradiction']]

In [9]:
# Service standards have volume but no volume indicated at service level

# Sum the service standard volume for each (fiscal_yr, service_id) pair
ss_vol_by_service = (
    ss.groupby(['fiscal_yr', 'service_id'])['total_volume']
    .sum()
    .reset_index()
    .rename(columns={'total_volume':'total_volume_ss'})
)

# Attach that total to the service inventory; services with no matching
# standards get 0. Any 'total_volume_ss' left over from an earlier run of this
# cell is dropped first, so re-running does not produce suffixed
# 'total_volume_ss_x' / 'total_volume_ss_y' columns and a KeyError below.
si = (
    si.drop(columns=['total_volume_ss'], errors='ignore')
    .merge(ss_vol_by_service, on=['fiscal_yr', 'service_id'], how='left')
    .fillna({'total_volume_ss': 0})
)

# Flag services whose standards report volume while the service itself reports zero applications
si['qa_ss_vol_without_si_vol'] = (
    (si['total_volume_ss'] > 0) & (si['num_applications_total'] == 0)
)

In [10]:
# Services whose recipient type is society are not expected to report a
# specific interaction volume, so flag those that report more than zero.
# Note that this assumption may be false
targets_society = si['service_recipient_type'].eq('SOCIETY')
has_interactions = si['num_applications_total'].gt(0)
si['qa_service_recipient_type_society_with_interactions'] = targets_society & has_interactions

In [11]:
# Services with 'PERSON' among their client target groups should not have a
# missing ('NA') value for SIN usage; flag the ones that do
targets_persons = si['client_target_groups'].str.contains('PERSON')
sin_usage_missing = si['sin_usage'].isna()
si['qa_use_of_sin_applicable'] = targets_persons & sin_usage_missing

In [12]:
# Services with 'ECONOM' (business) among their client target groups should not
# have a missing ('NA') value for CRA business number usage; flag the ones that do
targets_business = si['client_target_groups'].str.contains('ECONOM')
cra_bn_usage_missing = si['cra_bn_identifier_usage'].isna()
si['qa_use_of_cra_bn_applicable'] = targets_business & cra_bn_usage_missing

In [13]:
# DataFrames to write out, keyed by the output file name without its extension
csv_exports = {"si_qa": si, "ss_qa": ss}

for name, frame in csv_exports.items():
    # Output file is named after the dictionary key
    csv_path = f"{name}.csv"

    # Write the DataFrame as a semicolon-delimited CSV without the index
    frame.to_csv(csv_path, index=False, sep=';')

    # Reopen in append mode and add the run timestamp as the final line
    with open(csv_path, 'a') as handle:
        handle.write(f"\nTimestamp:{current_datetime_str}\n")
