In [2]:
import pandas as pd
import json
import os
import re

path = 'C:\\Users\\natha\\PycharmProjects\\gambling_premises_uk\\config\\2024_premises-licence-register_metadata.json'

with open(path, 'r') as file:
    data = json.load(file)

data.keys()

dict_keys(['imputation_rules', 'deduplication_rules', 'replacement_rules'])

In [3]:
path ='C:\\Users\\natha\\.cache\\kagglehub\\datasets\\nathanhg\\uk-gam-datasets\\versions\\1'
premises = '2024_premises-licence-register.csv'

# Read files into memory 

premises_df = pd.read_csv(os.path.join(path,premises))

Check column count

In [4]:
df = premises_df
# log discrepencies
columns_included = len(data['imputation_rules'].keys())
columns_expected = len(df.columns)
if columns_expected - columns_expected != 0:
    print('Analyst has not included all columns in imputation rules')
else:
    print('All columns included in imputation rules')

All columns included in imputation rules


In [5]:
def header_standardization(df):
    """
    Standardizes column names based on BigQuery lexical structure and syntax:
    https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#column_names
    Column names are changed if they violate the rules.
    
    Parameters:
        column_names (list): The list of original column names to standardize.
    
    Returns:
        list: The list of standardized column names.
    """
    column_names = df.columns
    # Define banned prefixes
    banned_prefixes = ['_TABLE_', '_FILE_', '_PARTITION', '_ROW_TIMESTAMP', '__ROOT__', '_COLIDENTIFIER']
    banned_prefixes = [prefix.lower() for prefix in banned_prefixes]

    # Initialize storage for validated column names and duplicates
    validated = []
    seen = set()

    for i, name in enumerate(column_names):
        # Ensure name is a string and convert to lowercase
        name = str(name).strip().lower()

        # Replace spaces with underscores and keep valid characters only
        name = re.sub(r'[^a-z0-9_]', '', name.replace(' ', '_'))

        # Enforce max length of 300 characters
        name = name[:300]

        # Handle banned prefixes
        for prefix in banned_prefixes:
            if name.startswith(prefix):
                name = name.replace(prefix, 'default', 1)

        # Prepend "column" if the name starts with a digit or is empty
        if not name or name[0].isdigit():
            name = f"column_{name}"

        # Resolve duplicates by appending a counter
        original_name = name
        counter = 1
        while name in seen:
            name = f"{original_name}_{counter}"
            counter += 1

        # Add the standardized name to the result and mark it as seen
        validated.append(name)
        seen.add(name)
    df.columns = validated
    return None


In [6]:
import pandas as pd

def clean_and_impute_data(df, metadata):
    """
    Cleans and imputes data based on provided metadata.

    Args:
        df (pd.DataFrame): The DataFrame to process.
        metadata (dict): Metadata for cleaning, specifying column types and imputation strategies.
    
    Returns:
        pd.DataFrame: The cleaned and imputed DataFrame.
    """
    for column, rules in metadata.items():
        col_type = rules.get("type")
        impute_method = rules.get("method")
        reference_column = rules.get("reference_column")

        # Handle data type conversions based on rules
        if col_type == "categorical":
            if df[column].dtype != 'object':  # Only convert if not already a string
                df[column] = df[column].astype(str)
                print(f'{column} converted to string')
        elif col_type == "numeric":
            if df[column].dtype != 'float64':  # Only convert if not already numeric
                df[column] = pd.to_numeric(df[column], errors='coerce')
                print(f'{column} converted to numeric')
        elif col_type == "datetime":
            if df[column].dtype != 'datetime64[ns]':  # Check if already datetime
                df[column] = pd.to_datetime(df[column], format='%Y-%m-%d %H:%M:%S', errors='coerce')
                print(f'{column} converted to datetime')
        elif col_type == "date":
            if not pd.api.types.is_datetime64_any_dtype(df[column]):  # Check for datetime type first
                df[column] = pd.to_datetime(df[column], format='%Y-%m-%d', errors='coerce').dt.date
                print(f'{column} converted to date')

        # Impute missing values based on specified method
        if impute_method == "NMAR":
            df.fillna({column:"Missing"}, inplace=True)
            print(f'Filled nulls in {column} with "Missing"')
        elif impute_method == "MAR":
            if reference_column:
                # Use the reference column to fill missing values
                df[column] = df.groupby(reference_column)[column].transform(
                    lambda x: x.ffill().bfill() if x.isnull().any() else x
                )
                print(f'Filled nulls in {column} using forward/backward fill based on {reference_column}')
            else:
                print(f'Skipping imputation for {column} due to missing reference_column')
        else:
            print(f'No imputation strategy specified for {column}')
    
    return df



In [7]:
header_standardization(df)

In [8]:
df[df['address_line_2'].isna()].shape[0]

20

In [9]:
clean_and_impute_data(df,data['imputation_rules'])
df[df['address_line_2'].isna()].shape[0]

account_number converted to numeric
Filled nulls in account_number using forward/backward fill based on account_name
Filled nulls in account_name using forward/backward fill based on account_number
Filled nulls in premises_activity with "Missing"
Filled nulls in local_authority with "Missing"
Filled nulls in address_line_1 with "Missing"
Filled nulls in address_line_2 with "Missing"
Filled nulls in city using forward/backward fill based on account_number
Filled nulls in postcode using forward/backward fill based on account_number


0

In [10]:
data['deduplication_rules']

{'deduplicate': True}

In [11]:
df.shape[0]

2660

In [12]:
def deduplicate(df, metadata):
    """
    Handles deduplication for the entire dataset based on metadata.
    Checks for duplicates before attempting to drop any.
    
    Args:
        df (pd.DataFrame): Input DataFrame.
        metadata (dict): Metadata defining deduplication rules.
    
    Returns:
        pd.DataFrame: DataFrame after applying deduplication rules.
    """
    deduplication_required = metadata.get("deduplication_rules", {}).get("deduplicate", False)
    if deduplication_required:
        # Check if there are any duplicate rows in the DataFrame
        duplicate_count = df.duplicated().sum()
        
        if duplicate_count > 0:
            # If duplicates are found, drop them
            df = df.drop_duplicates(keep="first")
            print(f"Dropped {duplicate_count} duplicate rows.")
    else:
        print("No duplicates to drop.")
    
    return None

In [13]:
deduplicate(df,data['deduplication_rules'])

No duplicates to drop.


In [14]:
data['replacement_rules']

{'premises_activity': {'Casino 2005': 'Casino'},
 'address_line_1': {"caesar's palace": 'caesars palace',
  'boylesport': 'boylesports',
  'coastal amusements limited': 'coastal amusements',
  'carousel amusements limited': 'carousel amusements',
  'blue anchor leisure limited': 'blue anchor leisure',
  'betting shop operations limited': 'betting shop operations',
  'betextra': 'bet extra',
  'betszone': 'betzone',
  'a & s leisure group limited': 'a & s leisure group',
  'a & s leisure': 'a & s leisure group',
  'cashino gaming limited': 'cashino gaming'},
 'address_line_2': {'coral island, unit 2 promenade': 'coral island, unit 2, promenade',
  '98a -99 high street, gorleston': '98a-99 high street, gorleston',
  '36 -42 marine terrace ': '36-42 marine terrace'},
 'city': {'Middlesborough': 'Middlesbrough',
  'Burton-on-trent': 'Burton-upon-trent',
  'Blaydon-777upon-tyne': 'Blaydon-on-tyne',
  'Bury': 'Bury st. edmunds',
  'Lytham': 'Lytham st. annes',
  'Richmond': 'Richmond-upon-th

In [15]:
def clean_strings(input_logger,df, df_name, metadata):
    """
    Cleans string columns in a DataFrame by applying deduplication rules and logging changes.
    Generates a cleaning audit DataFrame with details of changes.

    Args:
        input_logger (logging.Logger): Logger for logging info and debug messages.
        df (pd.DataFrame): The input DataFrame to clean.
        df_name (str): Name of the DataFrame for logging purposes.
        metadata (dict): Dictionary with deduplication rules per column.

    Returns:
        pd.DataFrame: Cleaned DataFrame.
        pd.DataFrame: Cleaning audit DataFrame with columns: ['column', 'old_value', 'new_value', 'occurrences', 'status'].
    """
    input_logger.info(f"----------- Checking for deduplication {df_name} ---------")
    audit_data = []

    for column, replacements in (metadata or {}).items():
        if column not in df.columns:
            input_logger.warning(f"Column '{column}' not found in DataFrame. Skipping...")
            continue

        df[column] = df[column].astype(str).apply(lambda x: x.strip().lower())

        for old_value, new_value in replacements.items():
            old_value = old_value.strip().lower()
            new_value = new_value.strip().lower()
            occurrences = (df[column] == old_value).sum()
            status = "Not Replaced" if occurrences == 0 else "Replaced"

            audit_data.append({
                "column": column,
                "old_value": old_value,
                "new_value": new_value,
                "occurrences": occurrences,
                "status": status
            })

            if occurrences > 0:
                df[column] = df[column].replace({old_value: new_value.title()}, regex=True)

    audit_df = pd.DataFrame(audit_data)
    input_logger.info('Completed all replacements.')

    return audit_df

In [16]:
from google.cloud import storage
import os
import json

def read_gcs_file(bucket_name, blob_name):
    """Write and read a blob from GCS using file-like IO"""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    print(f"----------- Reading {blob_name} from GCS ---------")
    with blob.open("r" ) as f:  #encoding="utf-8" add if needed here
        if ".csv" in blob_name:
            try:
                df = pd.read_csv(f,encoding="utf-8")
                print("Successfully read CSV file to pd.DataFrame")
                return df
            except Exception as e:
                 print(f"Unsuccessful CSV read to pd.DataFrame. {e}")
        
        elif ".xlsx" in blob_name:
            try:
                df = pd.read_excel(f,encoding="utf-8")
                print("Successfully read XLSX file to pd.DataFrame")
                return df
            except Exception as e:
                 print(f"Unsuccessful XLSX read to pd.DataFrame. {e}")
    
        elif ".json" in blob_name:
            try:
                json_data = json.load(f)
                print("Successfully read JSON file to memory")
                return json_data
            except Exception as e:
                print(f"Unsuccessful JSON read {e}")
        else:
            print(f"The blob_name must be a '.csv', an '.xlsx' or a '.json' file")
            return None


In [None]:
bucketname = ''
name = '2024_premises-licence-register'
meta_directory = "META"
meta_tag = "_metadata.json"

# read_gcs_file(bucketname,'META/2024_premises-licence-register_metadata.json')
meta = read_gcs_file(bucketname,os.path.join(meta_directory, f"{name}{meta_tag}").replace("\\", "/"))

----------- Reading META/2024_premises-licence-register_metadata.json from GCS ---------
Successfully read JSON file to memory


In [37]:
name = 'combined_audit'
meta_directory = "AUDITS" 
extension = '.csv'

print(os.path.join(meta_directory, f"{name}").replace("\\", "/"))
read_gcs_file(bucketname,os.path.join(meta_directory,f"{name}{extension}").replace("\\", "/"))

AUDITS/combined_audit
----------- Reading AUDITS/combined_audit.csv from GCS ---------
Successfully read CSV file to pd.DataFrame


Unnamed: 0,table,column,action,original_value,new_value,occurrences,status
0,2024_premises-licence-register,premises_activity,string_replacement,casino 2005,casino,1,Replaced
1,2024_premises-licence-register,address_line_1,string_replacement,caesar's palace,caesars palace,1,Replaced
2,2024_premises-licence-register,address_line_1,string_replacement,boylesport,boylesports,1,Replaced
3,2024_premises-licence-register,address_line_1,string_replacement,coastal amusements limited,coastal amusements,10,Replaced
4,2024_premises-licence-register,address_line_1,string_replacement,carousel amusements limited,carousel amusements,1,Replaced
5,2024_premises-licence-register,address_line_1,string_replacement,blue anchor leisure limited,blue anchor leisure,1,Replaced
6,2024_premises-licence-register,address_line_1,string_replacement,betting shop operations limited,betting shop operations,9,Replaced
7,2024_premises-licence-register,address_line_1,string_replacement,betextra,bet extra,1,Replaced
8,2024_premises-licence-register,address_line_1,string_replacement,betszone,betzone,1,Replaced
9,2024_premises-licence-register,address_line_1,string_replacement,a & s leisure group limited,a & s leisure group,1,Replaced
