<a href="https://colab.research.google.com/github/DrAdamDev/ETL-pipeline-for-UK-Employment-data/blob/main/ETL_pipeline_for_UK_regional_employment_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install mysql-connector-python

In [None]:
# Import necessary modules
import os
import socket
import sqlite3
import requests
import pandas as pd
import mysql.connector
from sqlite3.dbapi2 import OperationalError

In [None]:
# Define helper functions

def extract_excel_data(url):
    """Download an Excel workbook and load every sheet into a DataFrame.

    Args:
        url: Address of the workbook to download.

    Returns:
        The result of ``pd.read_excel(..., sheet_name=None)`` (a dict with
        one DataFrame per sheet), or None when the download or the parsing
        failed. The failure reason is printed.
    """
    # Function-scope import so the helper carries its own dependency.
    from io import BytesIO

    try:
        # A timeout stops a stalled server from hanging the pipeline forever
        response = requests.get(url, timeout=60)

        # Check the response was successful
        response.raise_for_status()

        # Load each sheet as a separate DataFrame. read_excel expects a path
        # or file-like object, so the downloaded bytes are wrapped in a buffer.
        dfs = pd.read_excel(BytesIO(response.content), sheet_name=None)
        return dfs
    except (requests.exceptions.RequestException, pd.errors.ParserError, ValueError) as e:
        # ValueError covers payloads that pandas cannot recognise as Excel
        print("Error occurred during data extraction:", str(e))
        return None

def delete_meta_data(data_dict):
    """Remove the 'Information' and 'Contents' entries from data_dict in place.

    Keys that are not present are silently skipped.
    """
    for meta_sheet in ('Information', 'Contents'):
        data_dict.pop(meta_sheet, None)

def assign_new_column_names(region, new_names, sheet):
    """Rename the columns of region in place and prepend a 'Region_Name' column.

    Args:
        region: DataFrame to modify in place.
        new_names: Mapping of current column label to new column label.
        sheet: Value written to every row of the new 'Region_Name' column.
    """
    try:
        region.rename(columns=new_names, inplace=True)
        # Position 0 makes 'Region_Name' the first column
        region.insert(0, 'Region_Name', sheet)
    except KeyError as err:
        print("Error occurred during assigning new column names:", str(err))
        print("Check that the key values of new_names correspond to the column names")

def drop_redundant_columns(region):
    """Drop, in place, every column whose label contains 'Unnamed'.

    Non-string labels (for example integer column headers) are kept rather
    than raising on the substring test.

    Args:
        region: DataFrame to modify in place.
    """
    redundant = [
        column
        for column in region.columns
        if isinstance(column, str) and 'Unnamed' in column
    ]
    # One drop call instead of one in-place drop per matching column
    region.drop(columns=redundant, inplace=True)

def drop_redundant_rows(region, redundant_rows):
    """Drop the rows labelled in redundant_rows from region in place.

    Labels that are not present in the index are ignored.
    """
    region.drop(index=redundant_rows, errors='ignore', inplace=True)

def commit_row_changes(region):
    """Renumber the rows of region in place as 0..len-1, discarding the old index."""
    region.index = pd.RangeIndex(len(region))

def process_non_numerical_values(region):
    """Replace placeholder markers in region, in place.

    Every cell equal to '-' becomes 0 and every cell equal to '*' becomes a
    missing value.

    Args:
        region: DataFrame to modify in place.
    """
    for column in region.columns:
        values = region[column]
        # Compute both masks before writing so the second lookup is not
        # affected by the first replacement.
        dash_mask = values == '-'
        star_mask = values == '*'

        # Write through .loc on the frame itself: chained indexing
        # (region[column][index] = ...) may update a temporary copy and
        # leave the frame unchanged.
        if dash_mask.any():
            region.loc[dash_mask, column] = 0
        if star_mask.any():
            region.loc[star_mask, column] = None

def update_column_dtypes(region, new_dtypes):
    """Convert the columns of region, in place, to the dtypes in new_dtypes.

    On failure the reason is printed and region is left unchanged.

    Args:
        region: DataFrame to modify in place.
        new_dtypes: Argument forwarded to ``DataFrame.astype`` (here a
            mapping of column name to target dtype).
    """
    try:
        converted = region.astype(new_dtypes)
    except KeyError as e:
        print("Error occurred during updating column dtypes:", str(e))
        print("Check that the key values of new_dtypes correspond to the column names")
        return
    except (TypeError, ValueError) as e:
        print("Error occurred during updating column dtypes:", str(e))
        print("Check that every value can be converted to its target dtype")
        return

    # astype returns a new frame; copy the converted columns back so the
    # caller's DataFrame actually changes (rebinding the local name does not).
    for column in converted.columns:
        region[column] = converted[column]

def generate_indentifiers(dataframe):
    """Add 'Region_ID', 'BIG_ID' and 'Region_BIG_ID' columns to dataframe in place.

    Each ID is the factorize code of the corresponding name column (or of
    the (Region_Name, BIG_Name) pair). Any exception is printed instead of
    raised, so columns added before the failure remain in place.
    """
    try:
        # Region and BIG IDs come from their respective name columns
        for id_column, name_column in (('Region_ID', 'Region_Name'), ('BIG_ID', 'BIG_Name')):
            codes, _ = dataframe[name_column].factorize()
            dataframe[id_column] = codes

        # Region_BIG ID comes from the combination of both names
        name_pairs = pd.MultiIndex.from_frame(dataframe[['Region_Name', 'BIG_Name']])
        pair_codes, _ = name_pairs.factorize()
        dataframe['Region_BIG_ID'] = pair_codes
    except Exception as err:
        print("Error occurred during identifier generation:", str(err))

def _keyed_table(dataframe, id_column, value_columns, unique=False):
    """Return value_columns indexed by id_column, optionally one row per id."""
    table = dataframe[[id_column, *value_columns]]
    if unique:
        table = table.drop_duplicates(subset=id_column)
    return table.set_index(id_column)


def load_into_database(dataframe, connection=None):
    """Split the cleaned DataFrame into relational tables and write them.

    Writes the tables Region, BIG, Region_BIG, FT_Employees, PT_Employees,
    FTPT_Employees and All_Employees. Each table is keyed by the ID column
    already present in dataframe, so the IDs stored in the lookup tables
    match the IDs stored in Region_BIG and in the employee tables.

    Args:
        dataframe: DataFrame holding the name, ID and employee-count columns
            selected below.
        connection: Database connection to write to. When omitted, the
            module-level ``conn`` is used, as in the original one-argument
            call.

    Errors are printed rather than raised: a KeyError while selecting
    columns, or an OperationalError while writing.
    """
    db = conn if connection is None else connection

    employee_tables = {
        'FT_Employees': ['FT_Public', 'FT_Private', 'FT_Pub_Priv'],
        'PT_Employees': ['PT_Public', 'PT_Private', 'PT_Pub_Priv'],
        'FTPT_Employees': ['FTPT_Public', 'FTPT_Private', 'FTPT_Pub_Priv'],
        'All_Employees': ['All_Public', 'All_Private', 'All_Pub_Priv'],
    }

    try:
        # Lookup tables keep one row per ID; employee tables keep every row
        tables = {
            'Region': _keyed_table(dataframe, 'Region_ID', ['Region_Name'], unique=True),
            'BIG': _keyed_table(dataframe, 'BIG_ID', ['BIG_Name'], unique=True),
            'Region_BIG': _keyed_table(
                dataframe, 'Region_BIG_ID', ['Region_ID', 'BIG_ID'], unique=True
            ),
        }
        for table_name, value_columns in employee_tables.items():
            tables[table_name] = _keyed_table(dataframe, 'Region_BIG_ID', value_columns)
    except KeyError as e:
        print("Error occurred during DataFrame loading preparation:", str(e))
        return

    try:
        for table_name, table in tables.items():
            # NOTE(review): the explicit DROP is presumably needed because
            # create_database creates the employee tables with a different
            # letter case (e.g. 'FT_employees') - confirm before removing.
            db.execute(f"DROP TABLE IF EXISTS {table_name};")
            table.to_sql(
                table_name,
                db,
                if_exists='replace',
                index=True,
                index_label=table.index.name,
            )
    except OperationalError as e:
        print("Error occurred during database loading:", str(e))

def clean_data(data_dict, cleaned=None):
    """Clean every regional sheet and combine them into one DataFrame.

    Args:
        data_dict: Mapping of sheet name to DataFrame. It is modified in
            place: the metadata sheets are removed and each remaining
            DataFrame is cleaned.
        cleaned: When truthy, nothing is done and None is returned.

    Returns:
        The concatenated regional data with identifier columns added, or
        None when cleaned is truthy.
    """
    # Nothing to do when the caller flags the data as already cleaned
    if cleaned:
        return None

    # Employee-count columns, in the order they appear in each sheet
    measure_columns = [
        f'{work_pattern}_{sector}'
        for work_pattern in ('FT', 'PT', 'FTPT', 'All')
        for sector in ('Public', 'Private', 'Pub_Priv')
    ]

    # Updated column names
    new_column_names = {
        'Table 4 - Regional level employment (thousands) by BIG (public/private sector split)': 'BIG_Name',
    }
    new_column_names.update(
        (f'Unnamed: {position}', name)
        for position, name in enumerate(measure_columns, start=1)
    )

    # Updated column dtypes
    new_column_dtypes = {'BIG_Name': str, **dict.fromkeys(measure_columns, float)}

    # Rows to be dropped
    redundant_rows = [0, 1, 2, *range(21, 29)]

    def clean_sheet(sheet, region):
        # Run the cleaning helpers on one sheet, in their original order
        assign_new_column_names(region, new_column_names, sheet)
        drop_redundant_columns(region)
        drop_redundant_rows(region, redundant_rows)
        commit_row_changes(region)
        process_non_numerical_values(region)
        update_column_dtypes(region, new_column_dtypes)
        return region

    delete_meta_data(data_dict)
    regional_dfs = [clean_sheet(sheet, region) for sheet, region in data_dict.items()]

    cleaned_regional_data = pd.concat(regional_dfs, ignore_index=True)
    generate_indentifiers(cleaned_regional_data)
    return cleaned_regional_data

def create_table(cursor, table_name, columns, primary_key=None, foreign_key=None, indexes=None):
    """Create a table (if absent) plus optional indexes, printing the DDL.

    Args:
        cursor: Database cursor used to execute the statements.
        table_name: Name of the table to create.
        columns: Comma-separated column definitions.
        primary_key: Optional column name(s) for the PRIMARY KEY clause.
        foreign_key: Optional iterable of (column, referenced_table) pairs;
            the referenced column carries the same name as the local one.
        indexes: Optional iterable of column names to index.
    """
    clauses = [f"{columns}"]

    if primary_key:
        clauses.append(f"PRIMARY KEY ({primary_key})")

    for reference in foreign_key or ():
        clauses.append(f"FOREIGN KEY ({reference[0]}) REFERENCES {reference[1]} ({reference[0]})")

    create_table_query = f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(clauses)})"
    cursor.execute(create_table_query)

    print(create_table_query)

    for indexed_column in indexes or ():
        cursor.execute(
            f"CREATE INDEX IF NOT EXISTS idx_{table_name}_{indexed_column} ON {table_name} ({indexed_column})"
        )

def create_database(cursor):
    """Create the lookup, link and employee tables through create_table.

    Tables are created in this order: Region, BIG, Region_BIG, then the
    FT, PT, FTPT and All employee tables.

    Args:
        cursor: Database cursor passed on to create_table.
    """
    # (table name, column definitions, primary key, foreign keys, indexes)
    table_specs = [
        ('Region', ['Region_ID int NOT NULL', 'Region_Name varchar(255) NOT NULL'],
         'Region_ID', None, ['Region_Name']),
        ('BIG', ['BIG_ID int NOT NULL', 'BIG_Name varchar(255) NOT NULL'],
         'BIG_ID', None, ['BIG_Name']),
        ('Region_BIG', ['Region_BIG_ID int NOT NULL', 'Region_ID int NOT NULL', 'BIG_ID int NOT NULL'],
         'Region_BIG_ID', [['Region_ID', 'Region'], ['BIG_ID', 'BIG']], None),
    ]

    # The four employee tables share one layout and differ only by prefix
    for prefix in ('FT', 'PT', 'FTPT', 'All'):
        table_specs.append((
            f'{prefix}_employees',
            ['Region_BIG_ID int NOT NULL', f'{prefix}_Public float',
             f'{prefix}_Private float', f'{prefix}_Pub_Priv float'],
            'Region_BIG_ID',
            [['Region_BIG_ID', 'Region_BIG']],
            None,
        ))

    # Create tables
    for table_name, column_defs, key, references, indexed in table_specs:
        create_table(cursor, table_name, ', '.join(column_defs),
                     primary_key=key, foreign_key=references, indexes=indexed)

In [None]:
def main(cursor):
    """Run the extract, clean and load steps for the regional employment data.

    Args:
        cursor: Database cursor used to create the tables.

    If the workbook cannot be extracted, a message is printed and the
    remaining steps are skipped.
    """
    # Office of National Statistics data source
    employment_data_url = 'https://www.ons.gov.uk/file?uri=/employmentandlabourmarket/peopleinwork/employmentandemployeetypes/datasets/regionbybroadindustrygroupsicbusinessregisterandemploymentsurveybrestable4/2021provisional/table42021p.xlsx'

    # Other ONS sources, listed for reference only (not loaded by this function):
    #   jobs:     https://www.ons.gov.uk/file?uri=/employmentandlabourmarket/peopleinwork/employmentandemployeetypes/datasets/workforcejobsbyindustryjobs02/current/jobs02mar2023.xls
    #   earnings: https://www.ons.gov.uk/file?uri=/employmentandlabourmarket/peopleinwork/earningsandworkinghours/datasets/regionbyindustry2digitsicashetable5/2022provisional/ashetable52022provisional.zip

    # Load employment data into DataFrame dictionary by region
    employment_data_dict = extract_excel_data(employment_data_url)

    # extract_excel_data returns None on failure; stop here instead of
    # passing None into the cleaning step.
    if employment_data_dict is None:
        print("Extraction returned no data; skipping cleaning and loading.")
        return

    # Clean data and flatten into a single DataFrame
    cleaned_regional_data = clean_data(employment_data_dict, cleaned=False)

    # Create SQLite3 database
    create_database(cursor)

    # Split data into dataframe tables and load into relational tables
    load_into_database(cleaned_regional_data)

In [None]:
if __name__ == '__main__':

    # Open the database connection. `conn` and `cursor` stay module-level
    # names because load_into_database reads them as globals.
    conn = sqlite3.connect('regional_UK_employment.db')
    try:
        cursor = conn.cursor()

        # Execute ETL process for regional UK employment data
        main(cursor)

        # Commit changes only when the ETL run finished without raising
        conn.commit()
    finally:
        # Always release the connection, even if the ETL run failed
        conn.close()

In [None]:
# Open a connection to the database file for a test query
conn = sqlite3.connect('regional_UK_employment.db')
cursor = conn.cursor()

# Test query: summed All_Pub_Priv per broad industry group (BIG), rounded,
# largest first. Each result row is printed as it is read.
for row in cursor.execute('''
    SELECT b.BIG_Name, ROUND(SUM(a.All_Pub_Priv)) AS UK_Employee_Count
    FROM All_Employees a
    JOIN Region_BIG rb ON a.Region_BIG_ID = rb.Region_BIG_ID
    JOIN BIG b ON rb.BIG_ID = b.BIG_ID
    GROUP BY b.BIG_Name
    ORDER BY UK_Employee_Count DESC;
'''):
    print(row)

# Commit changes and close the connection
conn.commit()
conn.close()