Imported Libaries

In [4]:
import pandas as pd
import chardet
import os
from scipy.stats import zscore
pd.options.mode.chained_assignment = None
from sqlalchemy import create_engine, text, inspect
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
from IPython.display import display as original_display
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import matplotlib.colors as mcolors
import numpy as np
import seaborn as sns
import inspect
import re
import string
import warnings
warnings.filterwarnings('ignore')
pd.set_option('display.max_columns', None)

Standard Functions

In [None]:
# Function to clean labels in any plot functions
def clean_label(label):
    try:
        return label.replace('_', ' ').title()
    except AttributeError as e:
        print(f'Error cleaning label: {e}')
        return label
 
# Function for getting the name of a DataFrame
def get_var_name(var):
    try:
        for name, value in globals().items():
            if value is var:
                return name
    except Exception as e:
        print(f'Error getting variable name: {e}')
    return None

# Function to validate the data in a DataFrame
def validate_data(df, show_counts=True):
    try:
        df_name = get_var_name(df)
        print(f'#########################################################################################################################################################################################\nDataFrame: {df_name}')
        # Snapshot the dataset
        display(df)
        # Check for unique values
        unique_counts = pd.DataFrame(df.nunique())
        unique_counts = unique_counts.reset_index().rename(columns={0:'No. of Unique Values', 'index':'Field Name'})
        print("Unique values per field:")
        pd.set_option('display.max_rows', None)
        display(unique_counts)
        pd.reset_option('display.max_rows')
        # Checking for duplicates
        duplicate_count = df.duplicated().sum()
        print("\nNumber of duplicate rows:")
        print(duplicate_count,'\n')
        info = df.info(show_counts=show_counts)
        display(info)
        # Summary stats
        print("\nSummary statistics:")
        display(df.describe())
        print('End of data validation\n#########################################################################################################################################################################################\n')
    except Exception as e:
        print(f'Error validating data: {e}')
 
# Function to provide list for data sources as a DataFrame when conducting analysis
def header_list(df):
    try:
        df_list_ = df.copy()
        df_list = df_list_.columns.tolist()
        df_list = pd.DataFrame(df_list)
        new_header = df_list.iloc[0]  # Get the first row for the header
        df_list = df_list[1:]  # Take the data less the header row
        df_list.columns = new_header  # Set the header row as the df header
        df_list.reset_index(drop=True, inplace=True)  # Reset index
        return df_list
    except Exception as e:
        print(f'Error creating header list: {e}')
        return pd.DataFrame()
 
def query_data(schema, data):
    try:
        # Define the SQL query
        query = f'SELECT * FROM [{schema}].[{data}]'
        # Load data into DataFrame
        df = pd.read_sql(query, engine)
        print(f'Successfully imported {data}')
        return df
    except Exception as e:
        print(f'Error querying data: {e}')
        return pd.DataFrame()

def display(df):
    try:
        frame = inspect.currentframe().f_back
        name = "Unnamed DataFrame"
        for var_name, var_value in frame.f_locals.items():
            if var_value is df:
                name = var_name
                break
        if name not in {'df', 'Unnamed DataFrame', 'unique_counts'}:
            print(f"DataFrame: {name}")
        original_display(df)
    except Exception as e:
        print(f'Error displaying DataFrame: {e}')

def unique_values(df, display_df=True):
    try:
        unique_values = {col: df[col].unique() for col in df.columns}
        max_length = max(len(values) for values in unique_values.values())
        unique_df_data = {}
        for col, values in unique_values.items():
            unique_df_data[col] = list(values) + [None] * (max_length - len(values))
        unique_df = pd.DataFrame(unique_df_data)
        if display_df:
            pd.set_option('display.max_rows', None)
            display(unique_df.head(100))
            pd.reset_option('display.max_rows')
        return unique_df
    except Exception as e:
        print(f'Error extracting unique values: {e}')
        return pd.DataFrame()

def export_to_csv(df, **kwargs):
    try:
        # Obtaining wanted directory
        directory = kwargs.get('directory',r"C:\Users\jf79\OneDrive - Office Shared Service\Documents\H&F Analysis\Python CSV Repositry")
        
        # Obtaining name of DataFrame
        df_name = kwargs.get('df_name',get_var_name(df))
        if not isinstance(df_name, str) or df_name == '_':
                df_name = input('Dataframe not found in global variables. Please enter a name for the DataFrame: ')

        file_path = f'{directory}\\{df_name}.csv'

        print(f'Exproting {df_name} to CSV...\n@ {file_path}\n')
        df.to_csv(file_path, index=False)
        print(f'Successfully exported {df_name} to CSV')
    except Exception as e:
        print(f'Error exporting to CSV: {e}')

In [None]:

def apply_features(df, date='count_date', **kwargs):
    print('Applying features...')
    try:
        df[date] = pd.to_datetime(df[date])
        df['year'] = df[date].dt.year
        df['day_name'] = df[date].dt.dayofweek

        day_dict = {
            '0':['Monday','Weekday'],
            '1':['Tuesday','Weekday'],
            '2':['Wednesday','Weekday'],
            '3':['Thursday','Weekday'],
            '4':['Friday','Weekday'],
            '5':['Saturday','Weekend'],
            '6':['Sunday','Weekend']
        }

        df['day_name'] = df['day_name'].astype(str)
        df['week_name'] = df['day_name'].map(lambda x: day_dict[x][1])
        df['day_name'] = df['day_name'].map(lambda x: day_dict[x][0])
        
        time = kwargs.get('time', False)
        if time:
            try:
                time_dict = {
                    '00-03':'6pm-6am',
                    '03-06':'6pm-6am',
                    '06-09':'6am-6pm',
                    '09-12':'6am-6pm',
                    '12-15':'6am-6pm',
                    '15-18':'6am-6pm',
                    '18-21':'6pm-6am',
                    '21-24':'6pm-6am'
                }
                df['day_night'] = df[time].map(time_dict)
            except KeyError as e:
                print(f'Invalid time column: {e}')
        
        print('Features applied.')
        return df
    except Exception as e:
        print(f'Error applying features: {e}')
        return pd.DataFrame()


def detect_anomalies(df, **kwargs):
    print('Detecting anomalies...')
    try:
        used_keys = {
            'footfall_type','day_night',
            'agg','std','primary_key'
        }
        redundant_kwargs = set(kwargs.keys()) - used_keys
        if redundant_kwargs:
            print(f'Redundant kwargs: {redundant_kwargs}')
            return pd.DataFrame()
        kwargs = {key: kwargs.get(key, f'default_value_{key}') for key in used_keys}
        
        footfall_type = kwargs.get('footfall_type')
        agg = kwargs.get('agg')
        categories = [
            'count_date',f'{footfall_type}_{agg}',
            'zscore','year','is_anomaly?',
            'day_name','week_name','day_night'
        ]
        keywords = [
            kwargs.get('primary_key'),
        ]
        for keyword in keywords:
            if keyword:
                if type(keyword) is not list:
                    keyword = [keyword]
                for word in keyword:
                    categories = categories + [f'{word}']

        std = kwargs.get('std', 3)
        anomalies = df.copy()
        anomalies['zscore'] = anomalies.groupby(keywords)[f'{footfall_type}_{agg}'].transform(zscore)
        anomalies['is_anomaly?'] = (anomalies['zscore'] < -std) | (anomalies['zscore'] > std)
        num_anomalies = anomalies['is_anomaly'].sum()
        print(f'{num_anomalies} anomalies have been detected.')

        anomalies = anomalies[categories]
        anomalies['moving_average'] = anomalies.groupby(keywords)[f'{footfall_type}_{agg}'].transform(lambda x: x.rolling(window=7).mean())
        anomalies['corrected_value'] = np.where(
            anomalies['is_anomaly?'],
            anomalies['moving_average'],anomalies[f'{footfall_type}_{agg}']
        )
        anomalies['corrected_ma_monthly'] = anomalies.groupby(keywords)['corrected_value'].transform(lambda x: x.rolling(window=30).mean())
        anomalies['corrected_ma_weekly'] = anomalies.groupby(keywords)['corrected_value'].transform(lambda x: x.rolling(window=7).mean())

        print('Anomalies have been flagged and corrected.')
        return anomalies
    except Exception as e:
        print(f'Error detecting anomalies: {e}')
        return pd.DataFrame()

def agg_footfall_data(df, **kwargs):
    print('Aggregating footfall data...')
    try:
        used_keys = {
            'category','day_night',
            'agg', 'footfall_type'
        }
        redundant_kwargs = set(kwargs.keys()) - used_keys
        if redundant_kwargs:
            print(f'Redundant kwargs: {redundant_kwargs}')
            return pd.DataFrame()
        unused_keys = set(used_keys) - set(kwargs.keys())
        if unused_keys:
            print(f'Missing kwargs: {unused_keys}\nThese args will be set to default values')
        
        df = apply_features(df, time='time_indicator')

        merge_list = ['day_name','week_name','day_night','count_date']
        new_categories = [
            'count_date','day_name','week_name','day_night',
            'corrected_ma_monthly','corrected_ma_weekly',
            'corrected_value',
        ]
        category = kwargs.get('category')
        keywords = [
            kwargs.get('category')
        ]
        for keyword in keywords:
            if keyword:
                if not isinstance(keyword, list):
                    keyword = [keyword]
                for word in keyword:
                    merge_list = [f'{word}'] + merge_list
                    new_categories = new_categories + [f'{word}']
        
        agg = kwargs.get('agg','sum')
        agg_data = df.groupby(merge_list + ['year']).agg(
            residents_sum = ('resident',f'{agg}'),
            workers_sum = ('worker',f'{agg}'),
            visitors_sum = ('visitor',f'{agg}'),
            loyalty = ('loyalty_percentage','mean'),
            dwell_time = ('dwell_time',f'{agg}')
        )
        agg_data = agg_data.reset_index()
        agg_data = agg_data.sort_values(
            ['count_date'],
            ascending=False
        )

        default_values = ['residents','workers','visitors']
        footfall_type = kwargs.get('footfall_type', default_values)
        anomalies = {}
        i = 0
        for footfall in footfall_type:
            if footfall not in default_values:
                raise KeyError(f'Invalid footfall type: [{footfall}]')
        for footfall in footfall_type:
            i = i + 1
            anomalies[f'{footfall}_z'] = detect_anomalies(agg_data,footfall_type=footfall,std=2.6,primary_key=category,agg=agg)
            if i > len(footfall_type)-1:
                new_categories = new_categories + ['year']
            anomalies[f'{footfall}_z'] = anomalies[f'{footfall}_z'][new_categories]

        footfall_data = pd.merge(
            anomalies['residents_z'], anomalies['workers_z'],
            how='left', on=merge_list,
            suffixes=['_residents','_workers']
        ).merge(
            anomalies['visitors_z'],
            how='left', on=merge_list,
        ).rename(columns={
                'corrected_value':'corrected_value_visitors',
                'corrected_ma_monthly':'corrected_ma_monthly_visitors',
                'corrected_ma_weekly':'corrected_ma_weekly_visitors'
            }
        )
        for footfall in footfall_type:
            if footfall not in default_values:
                raise KeyError(f'Invalid footfall type: [{footfall}]')
        for footfall in footfall_type:
            footfall_data['corrected_value_total'] = 0
            footfall_data['corrected_value_total'] = footfall_data['corrected_value_total'] + footfall_data[f'corrected_value_{footfall}']
        footfall_data['corrected_value_total'].fillna(0, inplace=True)
        footfall_data['corrected_ma_monthly_total'] = footfall_data.groupby(keywords)['corrected_value_total'].transform(lambda x: x.rolling(window=30).mean())
        footfall_data['corrected_ma_weekly_total'] = footfall_data.groupby(keywords)['corrected_value_total'].transform(lambda x: x.rolling(window=7).mean())

        print('Footfall Data Aggregated.')
        return footfall_data
    except Exception as e:
        print(f'Error aggregating footfall data: {e}')
        return pd.DataFrame()

def transform_to_daynight(df, **kwargs):
    print('Transforming to daynight...')
    try:
        category = kwargs.get('category',False)
        index = ['count_date','year','day_name','week_name']
        if category:
            index = index + [category]
        transform = df.pivot_table(
            index =index,
            columns='day_night',
            values='corrected_value_total'
        ).reset_index()
        return transform
    except Exception as e:
        print(f'Error transforming to daynight: {e}')
        return pd.DataFrame()

In [78]:
def typical_footfall(footfall_data, start, end, **kwargs):
    print('Calculating typical daily footfall...\nFor Weedays and Weekends and Weekly averages...')
    columns = [
        'OID_','Col_ID','Row_ID','Hex_ID',
        'Centroid_X','Centroid_Y','area',
        'Shape_Length','Shape_Area'
    ]
    for column in columns:
        if column in footfall_data.columns:
            footfall_data = footfall_data.drop(columns=column)

    footfall_data['count_date'] = pd.to_datetime(footfall_data['count_date'])

    footfall_data = footfall_data[
        (footfall_data['count_date'] <= pd.to_datetime(end)) &
        (footfall_data['count_date'] >= pd.to_datetime(start))]
    

    columns_to_fill = [
        'resident','worker','visitor',
        'loyalty_percentage','dwell_time'
    ]
    footfall_data.loc[:, columns_to_fill] = footfall_data[columns_to_fill].applymap(lambda x: np.nan if x < 0 else x)
    footfall_data[columns_to_fill] = footfall_data[columns_to_fill].fillna(0)
    footfall_data = footfall_data.sort_values(by=['count_date','time_indicator','hex_id'])
    

    footfall_data = agg_footfall_data(
        footfall_data,
        category=kwargs.get('category','hex_id'),
        agg=kwargs.get('agg','sum'),
        footfall_type=kwargs.get('footfall_type',['residents','workers','visitors'])
    )
    
    if kwargs.get('day_night',False):
        footfall_data = transform_to_daynight(footfall_data, category='hex_id')
        averages = footfall_data.copy()
        averages = averages.groupby(['year','week_name','hex_id']).agg(
            daytime_mean = ('6am-6pm','mean'),
            nighttime_mean = ('6pm-6am','mean')
        ).reset_index()
        weekday = averages[averages['week_name'] == 'Weekday']
        weekend = averages[averages['week_name'] == 'Weekend']
        typical = footfall_data.groupby(['year','hex_id']).agg(
            daytime_mean = ('6am-6pm','mean'),
            nighttime_mean = ('6pm-6am','mean')
        ).reset_index()
    else:
        averages = footfall_data.copy()
        averages = averages.groupby(['year','week_name','hex_id']).agg(
            averages = ('corrected_value_total','mean'),
        ).reset_index()
        weekday = averages[averages['week_name'] == 'Weekday']
        weekend = averages[averages['week_name'] == 'Weekend']
        typical = footfall_data.groupby(['year','hex_id']).agg(
            averages = ('corrected_value_total','mean'),
        ).reset_index()

    typical_footfall = {
        0 : typical,
        1 : weekday,
        2 : weekend
    }

    return typical_footfall

Database and CWD setup and connection

In [8]:
# Database credentials
db_host = 'LBHHLWSQL0001.lbhf.gov.uk'
db_port = '1433'
db_name = 'IA_ODS'

# Create the connection string for SQL Server using pyodbc with Windows Authentication
connection_string = f'mssql+pyodbc://@{db_host}:{db_port}/{db_name}?driver=ODBC+Driver+17+for+SQL+Server&Trusted_Connection=yes'

# Create the database engine
engine = create_engine(connection_string)

# Define the current working directory
cwd = r'C:\Users\jf79\OneDrive - Office Shared Service\Documents\H&F Analysis\Footfall and Spend Analysis\Footfall Data\LSOA Based'
os.chdir(cwd)
files = os.listdir(os.getcwd())
print("Files in %r: %s" % (cwd, files))

Files in 'C:\\Users\\jf79\\OneDrive - Office Shared Service\\Documents\\H&F Analysis\\Footfall and Spend Analysis\\Footfall Data\\LSOA Based': ['lsoa_hourly_counts_2022_H1.csv', 'lsoa_hourly_counts_2022_H2.csv', 'lsoa_hourly_counts_2024_H2.csv']


In [9]:
footfall_2024_Hex = pd.read_csv('C:/Users/jf79/OneDrive - Office Shared Service/Documents/H&F Analysis/Footfall and Spend Analysis/Footfall Data/Hex Based/Footfall Counts/hex_3hourly_counts_2024.csv')
relevant_hexes = pd.read_csv('C:/Users/jf79/OneDrive - Office Shared Service/Documents/H&F Analysis/Footfall and Spend Analysis/Footfall Data/Hex Based/Relevant Hexes/Relevant Hexes.csv')

In [10]:
footfall_data_2024 = footfall_2024_Hex.copy()
relevant_hexes_data = relevant_hexes.copy()

footfall_data_2024 = pd.merge(
    relevant_hexes_data,
    footfall_2024_Hex,
    left_on='Hex_ID',
    right_on='hex_id',
    how='left'
)

In [96]:
# Transform the data to Annual/Quarterly averages
# def annual_quarterly_averages(df, start, end):
#     typical_day_averages = typical_footfall(
#         df, f'{start}', f'{end}'
#     )
#     for i in range(len(typical_day_averages)):
#         display(typical_day_averages[i].sort_values(by='averages', ascending=False))

typical_day_averages = typical_footfall(
    footfall_data_2024, '2024-01-01', '2024-12-31',
    footfall_type=['residents','workers','visitors'],
    day_night=True
)
for i in range(len(typical_day_averages)):
    if 'averages' in typical_day_averages[i].columns.to_list():
        display(typical_day_averages[i].sort_values(by='averages', ascending=False))
    elif 'daytime_mean' in typical_day_averages[i].columns.to_list():
        display(typical_day_averages[i].sort_values(by='daytime_mean', ascending=False))
    else:
        display(typical_day_averages[i])

Calculating typical daily footfall...
For Weedays and Weekends and Weekly averages...
Aggregating footfall data...
Missing kwargs: {'day_night'}
These args will be set to default values
Applying features...
Detecting anomalies...
Detecting anomalies...
Detecting anomalies...
Transforming to daynight...


Unnamed: 0,year,hex_id,daytime_mean,nighttime_mean
114,2024,11281238,34128.001171,15796.983607
92,2024,11271229,10545.346995,5125.666667
91,2024,11271227,10249.332943,7134.401249
180,2024,11341220,5803.598361,3177.352459
73,2024,11261228,5639.672131,2039.199454
...,...,...,...,...
53,2024,11241248,19.455894,8.875878
86,2024,11271217,9.000390,1.226776
87,2024,11271219,8.975020,3.305621
67,2024,11251247,5.265027,0.535909


Unnamed: 0,year,week_name,hex_id,daytime_mean,nighttime_mean
114,2024,Weekday,11281238,29883.679389,15217.362595
92,2024,Weekday,11271229,10925.576336,5139.297710
91,2024,Weekday,11271227,10133.927481,7074.477099
107,2024,Weekday,11281224,6506.441658,1749.984733
63,2024,Weekday,11251239,6353.250273,1563.369138
...,...,...,...,...,...
53,2024,Weekday,11241248,22.681570,10.139586
87,2024,Weekday,11271219,8.763904,4.140676
86,2024,Weekday,11271217,8.085605,1.465649
67,2024,Weekday,11251247,7.037077,0.702835


Unnamed: 0,year,week_name,hex_id,daytime_mean,nighttime_mean
319,2024,Weekend,11281238,44820.427198,17257.182692
296,2024,Weekend,11271227,10540.065934,7285.364011
297,2024,Weekend,11271229,9587.461538,5091.326923
318,2024,Weekend,11281236,5818.270604,3531.192308
385,2024,Weekend,11341220,5739.125000,3284.516484
...,...,...,...,...,...
258,2024,Weekend,11241248,11.329670,5.692308
291,2024,Weekend,11271217,11.304945,0.625000
292,2024,Weekend,11271219,9.506868,1.201923
272,2024,Weekend,11251247,0.800824,0.115385


In [99]:
export_to_csv(typical_day_averages[2])

None
Successfully exported weekend_hf (annual) (day_night) to CSV


: 