# Risk Analytics - Client Input File Quality Assuance Check

This code helps you identify and flag duplicates in an excel file. It includes:

-Importing the required libraries such as Pandas, Openpyxl, os and tkinter.

-Allows the user to select the excel file to analyze and extract the file name.

-Defines the possible names of the address and effective date columns.

-Checks if any of the address column names are in the DataFrame and extract the address components.

-Flag duplicates in the address column and count the number of unique addresses and the percentage of duplicate addresses.

-Concatenates latitude and longitude columns to create a lat_long column and flags duplicates in the lat_long column. It then counts the number of duplicates and calculates the percentage of duplicates.

-Checks if the effective date column exists in the DataFrame, converts it to datetime format and creates a new output flag column with values True if the effective date is prior to 2020, False otherwise. It then calculates the percentage of records with the output flag set to True and counts the number of records with effective date prior to 2020.

-Creates a DataFrame with the count and percentages of the address duplicates, lat/long duplicates, effective date duplicates, and records prior to 2020.

-The results are written to a new Excel file that contains both the original data and the duplicate report. The user can select the save location using the file dialog.

In [None]:
# Import and leverage the imported file for analysis
import pandas as pd
import openpyxl
import os
import re
import tkinter as tk
from tkinter import Tk
from tkinter import filedialog

Tk().withdraw()
file_path = filedialog.askopenfilename()

In [None]:
# Extract the base name of the file
file_name = os.path.basename(file_path)

# Remove the extension from the file name
file_name_without_extension = os.path.splitext(file_name)[0]

In [None]:
# Read the Excel file into a DataFrame
df = pd.read_excel(file_path)

print(file_name)
print(df.columns)

In [None]:
# Define the possible names of the address columns
address_columns = {
    'street_and_house_number': ['Addr1','Risk Address','Street','address'],
    'city': ['City','Risk City','city'],
    'state_abbreviation': ['State','StateProvCd','Risk State','state'],
    'postal_code': ['Risk Zip Code','PostalCd','Zip','Zip Code','PostalCode','zip','Risk Zip'],
    'latitude': ['Lat', 'Risk Lat', 'lat','Latitude','latitude','Risk Latitude'],
    'longitude': ['Long','Risk Long','long','Longitude','longitude','Risk Longitude']
    }
    
# Check if any of the address column names are in the DataFrame
address_column_mask = df.columns.isin(
    address_columns['street_and_house_number'] +
    address_columns['city'] +
    address_columns['state_abbreviation'] +
    address_columns['postal_code']
)

# Get the name of the column to use for each component
street_and_house_number_col = list(set(df.columns) & set(address_columns['street_and_house_number']))[0]
city_col = list(set(df.columns) & set(address_columns['city']))[0]
state_abbreviation_col = list(set(df.columns) & set(address_columns['state_abbreviation']))[0]
postal_code_col = list(set(df.columns) & set(address_columns['postal_code']))[0]

# Select the first column that exists in the DataFrame
for column, column_names in address_columns.items():
    match = df.columns[df.columns.isin(column_names)].tolist()
    if match:
        address_columns[column] = match[0]

if not any(address_column_mask):
    raise ValueError('None of the specified address columns were found in the DataFrame')

# Concatenate the address components into a single address string
df['address'] = (
    df[address_columns['street_and_house_number']].astype(str) + ', ' +
    df[address_columns['city']].astype(str) + ', ' +
    df[address_columns['state_abbreviation']].astype(str) + ' ' +
    df[address_columns['postal_code']].astype(str)
)

In [None]:
# Flag duplicates in the address column
df['address_duplicate'] = df['address'].duplicated()

# Count the number of duplicates
num_address_duplicates = df['address_duplicate'].sum()

# Count the number of unique addresses
num_unique_addresses = len(df['address'].unique())

# Calculate the percentage of duplicate addresses
if num_unique_addresses > 0:
    percent_address_duplicates = num_address_duplicates / num_unique_addresses * 100
else:
    percent_address_duplicates = 0.0

# Check if any latitude and longitude columns exist in the dataframe
if not any(df.columns.isin([address_columns['latitude']])):
    missing_latitude_cols = [address_columns['latitude']]
else:
    missing_latitude_cols = []

if not any(df.columns.isin([address_columns['longitude']])):
    missing_longitude_cols = [address_columns['longitude']]
else:
    missing_longitude_cols = []

missing_categories = []
if missing_latitude_cols:
    missing_categories.append('latitude')
if missing_longitude_cols:
    missing_categories.append('longitude')

if missing_categories:
    print("The following categories are missing:", missing_categories)
    num_lat_long_duplicates = 0
    percent_lat_long_duplicates = 0
else:
    # Select the first column that exists in the DataFrame for latitude and longitude
    lat_col = df.columns[df.columns.isin([address_columns['latitude']])].tolist()[0]
    long_col = df.columns[df.columns.isin([address_columns['longitude']])].tolist()[0]

    # Concatenate latitude and longitude columns to create a lat_long column
    df['lat_long'] = df[lat_col].astype(str) + ', ' + df[long_col].astype(str)

    # Flag duplicates in the lat_long column
    df['lat_long_duplicate'] = df['lat_long'].duplicated()

    # Count the number of duplicates
    num_lat_long_duplicates = df['lat_long_duplicate'].sum()

    # Calculate the percentage of duplicates
    percent_lat_long_duplicates = num_lat_long_duplicates / df.shape[0] * 100

In [None]:
lower_48_states = ['AL', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DC', 'DE', 'FL', 'GA', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD', 'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ', 'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC', 'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY']

# Define a regular expression pattern to match any string that is not a lower 48 state abbreviation
pattern = re.compile(r'\b(?!(DC|PR|VI|GU|AS))[A-Z]{2}\b')

# Check if the state_abbreviation column exists in the DataFrame
if state_abbreviation_col not in df.columns:
    raise ValueError('The specified state_abbreviation column was not found in the DataFrame')

# Convert the state_abbreviation column to string
df[state_abbreviation_col] = df[state_abbreviation_col].astype(str)

# Count the number of non-lower-48 state abbreviations in the state_abbreviation column
non_lower_48_count = df[state_abbreviation_col].apply(lambda x: pattern.match(x) is not None).sum()

# Flag all instances of non-lower-48 state abbreviations in the state_abbreviation column
df['is_non_lower_48_state'] = df[state_abbreviation_col].apply(lambda x: x not in lower_48_states)

# Count the number of lower-48 state abbreviations in the state_abbreviation column
lower_48_count = df[state_abbreviation_col].apply(lambda x: x in lower_48_states).sum()

# Print the total and lower-48 state counts
total_count = len(df[state_abbreviation_col])
print('total_count:', total_count)
print('lower_48_count:', lower_48_count)

In [None]:
# Print the results
print('Number of duplicate addresses:', num_address_duplicates)
print('Percentage of duplicate addresses:', percent_address_duplicates)
print('Number of duplicate lat/long pairs:',num_lat_long_duplicates)
print('Percentage of duplicate lat/long pairs:', percent_lat_long_duplicates)
print('Out of Contiguous 48 States :', total_count - lower_48_count)

In [None]:
# Define the possible names of the effective date columns
effective_date_columns = ['Effective Date', 'EffDate', 'eff_date','effective_date']

# Check if any of the effective date column names are in the DataFrame
effective_date_column_mask = df.columns.isin(effective_date_columns)

if any(effective_date_column_mask):
    # Get the name of the column to use for the effective date
    effective_date_col = list(set(df.columns) & set(effective_date_columns))[0]
else:
    print('No effective date column found in the DataFrame. Count and percentages will be reported as 0.')
    effective_date_col = None

# Flag duplicates in the effective date column
if effective_date_col:
    df['effective_date_duplicate'] = df[effective_date_col].duplicated()
else:
    df['effective_date_duplicate'] = False

# Count the number of duplicates
num_effective_date_duplicates = df['effective_date_duplicate'].sum()

# Calculate the percentage of duplicates
if effective_date_col:
    percent_effective_date_duplicates = num_effective_date_duplicates / df.shape[0] * 100
else:
    percent_effective_date_duplicates = 0.0

In [None]:
# Convert the effective date column to datetime format if it exists in the DataFrame
if effective_date_col in df.columns:
    df[effective_date_col] = pd.to_datetime(df[effective_date_col], errors='coerce')

    # Find the index of the effective date column
    col_index = df.columns.get_loc(effective_date_col)

    # Create a new output flag column with values True if the effective date is prior to 2020, False otherwise
    df.insert(loc=col_index + 1, column='effective_date_prior_to_2020', value=df[effective_date_col] < '2020-01-01')

    # Calculate the percentage of records with the output flag set to True
    percent_records = df['effective_date_prior_to_2020'].mean() * 1

    print(f"Percentage of records with effective date prior to 2020: {percent_records}")

    # Count the number of records with effective date prior to 2020
    count_records = df[df['effective_date_prior_to_2020'] == True].shape[0]

    print(f"Count of records with effective date prior to 2020: {count_records}")
else:
    print('No effective date column found in the DataFrame. Count and percentages will be reported as 0.')
    effective_date_col = None

In [None]:
# Create a dataframe with the results
duplicate_results = {
    'Duplicate Addresses': [num_address_duplicates],
    'Percentage of Duplicate Addresses': [percent_address_duplicates / 100],
    'Duplicate Lat/Long Pairs': [num_lat_long_duplicates],
    'Percentage of Duplicate Lat/Long Pairs': [percent_lat_long_duplicates / 100],
    'Duplicate Effective Dates': [num_effective_date_duplicates],
    'Percentage of Effective Date Duplicates': [percent_effective_date_duplicates / 100],
    'Out of Contiguous 48 States' :[total_count - lower_48_count],
}

if effective_date_col is not None:
    duplicate_results['Percentage Before 2020'] = [percent_records]
    duplicate_results['Count Before 2020'] = [count_records]

In [None]:
# Check if 'Effective Date' exists and concatenate it with the address
if 'Effective Date' in df.columns:
    df['Full_Address_with_Date'] = df['Concatenated Address'].astype(str) + ' ' + df['Effective Date'].astype(str)
    df['Address_with_Date_Duplicate'] = df.duplicated(subset=['Full_Address_with_Date'], keep=False)
    num_address_date_duplicates = df['Address_with_Date_Duplicate'].sum()
    percent_address_date_duplicates = (num_address_date_duplicates / total_count) * 100
else:
    df['Address_with_Date_Duplicate'] = False
    num_address_date_duplicates = 0
    percent_address_date_duplicates = 0

# Add the new calculations to the duplicate_results dictionary
duplicate_results['Duplicate Address with Date'] = [num_address_date_duplicates]
duplicate_results['Percentage of Duplicate Address with Date'] = [percent_address_date_duplicates / 100]

# Create a dataframe with the results
duplicate_results_df = pd.DataFrame(duplicate_results)


In [None]:
# Create a file dialog to allow the user to select the save location
root = tk.Tk()
root.withdraw()
default_file_name = file_name_without_extension + "_duplicate_QA_Check.xlsx"
file_path = filedialog.asksaveasfilename(defaultextension='.xlsx', initialfile=default_file_name)

print("Original sheet with new name created...wait for QA data to process")

# Write the original data and the results to separate sheets in a new Excel file
with pd.ExcelWriter(file_path, engine='openpyxl') as writer:
    df.to_excel(writer, sheet_name='Original Data', index=False)
    duplicate_results_df.to_excel(writer, sheet_name='Duplicate Results Report', index=False)

print("QA data processing...")      
print("Excel write task completed.")