In [1]:
import os

# Define the directory path
directory_path = './Data'

# Check if the directory exists
if not os.path.exists(directory_path):
    # If not, create it
    os.makedirs(directory_path)
    print(f"Directory {directory_path} created.")
else:
    print(f"Directory {directory_path} already exists.")


Directory ./Data created.


In [46]:
import pandas as pd

dz = pd.read_csv(r'.\Data\nycpayroll_2020.csv')

In [47]:
dz.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 19 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   FiscalYear             100 non-null    int64  
 1   PayrollNumber          100 non-null    int64  
 2   AgencyID               100 non-null    int64  
 3   AgencyName             100 non-null    object 
 4   EmployeeID             100 non-null    int64  
 5   LastName               100 non-null    object 
 6   FirstName              100 non-null    object 
 7   AgencyStartDate        100 non-null    object 
 8   WorkLocationBorough    100 non-null    object 
 9   TitleCode              100 non-null    int64  
 10  TitleDescription       100 non-null    object 
 11  LeaveStatusasofJune30  100 non-null    object 
 12  BaseSalary             100 non-null    float64
 13  PayBasis               100 non-null    object 
 14  RegularHours           100 non-null    float64
 15  Regular

In [44]:
import os
import psycopg2
import pandas as pd
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

class CSVLoader:
    def __init__(self):
        # Establish a connection to the PostgreSQL database
        self.conn = psycopg2.connect(
            host=os.getenv('PG_HOST'),
            dbname=os.getenv('PG_DB'),
            user=os.getenv('PG_USER'),
            password=os.getenv('PG_PASSWORD')
        )
        self.cursor = self.conn.cursor()

    def create_staging_tables(self):
        # SQL to create staging tables if they don't exist
        create_tables_queries = {
                        'payroll': '''  
                CREATE SCHEMA IF NOT EXISTS payroll;

                        ''',
            'staging_nycpayroll': '''
                CREATE TABLE IF NOT EXISTS payroll.staging_nycpayroll (
                    ID SERIAL PRIMARY KEY,
                                        fiscalyear INTEGER,
                                        PayrollNumber VARCHAR(255),
                                        AgencyID VARCHAR(255),
                                        AgencyName VARCHAR(255),
                                        EmployeeID VARCHAR(255),
                                        LastName VARCHAR(255),
                                        FirstName VARCHAR(255),
                                        AgencyStartDate DATE,
                                        WorkLocationBorough     VARCHAR(255),
                                        TitleCode               VARCHAR(255),
                                        TitleDescription        VARCHAR(255),
                                        LeaveStatusasofJune30   VARCHAR(255),
                                        BaseSalary              FLOAT,   
                                        PayBasis                VARCHAR(255),
                                        RegularHours            INTEGER,
                                        RegularGrossPaid        FLOAT,
                                        OTHours                 FLOAT,
                                        TotalOTPaid             FLOAT,
                                        TotalOtherPay           FLOAT
                );
            ''',
            'staging_empmaster': '''
                CREATE TABLE IF NOT EXISTS payroll.staging_empmaster (
                    EmployeeID VARCHAR(255),
                                            LastName VARCHAR(255),
                                            FirstName VARCHAR(255),
                                            -- Add additional columns as needed
                                            PRIMARY KEY (EmployeeID)
                );
            ''',
            'staging_agencymaster': '''
                CREATE TABLE IF NOT EXISTS payroll.staging_agencymaster (
                    AgencyID VARCHAR(255),
                                            AgencyName VARCHAR(255),
                                            AgencyAddress VARCHAR(255),
                                                -- Add additional columns as needed
                                            PRIMARY KEY (AgencyID)
                );
            ''',
            'staging_titlemaster': '''
                CREATE TABLE IF NOT EXISTS payroll.staging_titlemaster (
                    TitleCode VARCHAR(255),
                                            TitleDescription VARCHAR(255),
                                        -- Add additional columns as needed
                                            PRIMARY KEY (TitleCode)
                );
            '''
        }
        
        for table_name, create_query in create_tables_queries.items():
            try:
                self.cursor.execute(create_query)
                self.conn.commit()
                print(f"Table {table_name} created successfully.")
            except Exception as e:
                print(f"Error creating table {table_name}: {e}")

    def load_csv_to_table(self, csv_file, table_name):
        try:
            # Read the CSV file into a DataFrame
            df = pd.read_csv(csv_file)

            # Generate the SQL for inserting data
            columns = ', '.join(df.columns)
            values = ', '.join(['%s'] * len(df.columns))
            insert_query = f'INSERT INTO payroll.{table_name} ({columns}) VALUES ({values})'

            # Insert DataFrame rows into the table
            for row in df.itertuples(index=False, name=None):
                self.cursor.execute(insert_query, row)
            
            self.conn.commit()
            print(f"Data from {csv_file} loaded into {table_name} successfully.")
        
        except Exception as e:
            print(f"Error loading data from {csv_file} into {table_name}: {e}")

    def close_connection(self):
        # Close the cursor and the connection
        self.cursor.close()
        self.conn.close()

# Main script execution
if __name__ == '__main__':
    loader = CSVLoader()
    
    # Step 1: Create the staging tables
    loader.create_staging_tables()
    
    # Step 2: Load each CSV file into its corresponding staging table
    csv_to_table_mapping = {
        r'.\Data\nycpayroll_2020.csv': 'staging_nycpayroll',
        r'.\Data\EmpMaster.csv': 'staging_empmaster',
        r'.\Data\AgencyMaster.csv': 'staging_agencymaster',
        r'.\Data\TitleMaster.csv': 'staging_titlemaster'
    }
    
    for csv_file, table_name in csv_to_table_mapping.items():
        loader.load_csv_to_table(csv_file, table_name)
    
    # Step 3: Close the database connection
    loader.close_connection()


Table payroll created successfully.
Table staging_nycpayroll created successfully.
Table staging_empmaster created successfully.
Table staging_agencymaster created successfully.
Table staging_titlemaster created successfully.
Data from .\Data\nycpayroll_2020.csv loaded into staging_nycpayroll successfully.
Data from .\Data\EmpMaster.csv loaded into staging_empmaster successfully.
Data from .\Data\AgencyMaster.csv loaded into staging_agencymaster successfully.
Data from .\Data\TitleMaster.csv loaded into staging_titlemaster successfully.


In [17]:
import os
import psycopg2
import pandas as pd
from psycopg2 import sql
from dotenv import load_dotenv
import json

# Load environment variables from .env file
load_dotenv()

class DataQuarantine:
    def __init__(self):
        self.conn = psycopg2.connect(
            host=os.getenv('PG_HOST'),
            dbname=os.getenv('PG_DB'),
            user=os.getenv('PG_USER'),
            password=os.getenv('PG_PASSWORD')
        )
        self.cursor = self.conn.cursor()

    def create_quarantine_table(self):
        create_table_query = '''
        CREATE TABLE IF NOT EXISTS payroll.quarantine_data (
            quarantine_id SERIAL PRIMARY KEY,
            source_table_name VARCHAR(255) NOT NULL,
            original_data JSONB NOT NULL,
            rule_id INT,
            error_message TEXT,
            quarantined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        '''
        try:
            self.cursor.execute(create_table_query)
            self.conn.commit()
            print("Quarantine table created successfully.")
        except Exception as e:
            print(f"Error creating quarantine table: {e}")

    def create_staging_tables(self):
        # SQL to create staging tables if they don't exist
        create_tables_queries = {
                        'payroll': '''  
                CREATE SCHEMA IF NOT EXISTS payroll;

                        ''',
            'staging_nycpayroll': '''
                CREATE TABLE IF NOT EXISTS payroll.staging_nycpayroll (
                    ID SERIAL PRIMARY KEY,
                                        fiscalyear INTEGER,
                                        PayrollNumber VARCHAR(255),
                                        AgencyID VARCHAR(255),
                                        AgencyName VARCHAR(255),
                                        EmployeeID VARCHAR(255),
                                        LastName VARCHAR(255),
                                        FirstName VARCHAR(255),
                                        AgencyStartDate DATE,
                                        WorkLocationBorough     VARCHAR(255),
                                        TitleCode               VARCHAR(255),
                                        TitleDescription        VARCHAR(255),
                                        LeaveStatusasofJune30   VARCHAR(255),
                                        BaseSalary              FLOAT,   
                                        PayBasis                VARCHAR(255),
                                        RegularHours            INTEGER,
                                        RegularGrossPaid        FLOAT,
                                        OTHours                 FLOAT,
                                        TotalOTPaid             FLOAT,
                                        TotalOtherPay           FLOAT
                );
            ''',
            'staging_empmaster': '''
                CREATE TABLE IF NOT EXISTS payroll.staging_empmaster (
                    EmployeeID VARCHAR(255),
                                            LastName VARCHAR(255),
                                            FirstName VARCHAR(255),
                                            -- Add additional columns as needed
                                            PRIMARY KEY (EmployeeID)
                );
            ''',
            'staging_agencymaster': '''
                CREATE TABLE IF NOT EXISTS payroll.staging_agencymaster (
                    AgencyID VARCHAR(255),
                                            AgencyName VARCHAR(255),
                                            AgencyAddress VARCHAR(255),
                                                -- Add additional columns as needed
                                            PRIMARY KEY (AgencyID)
                );
            ''',
            'staging_titlemaster': '''
                CREATE TABLE IF NOT EXISTS payroll.staging_titlemaster (
                    TitleCode VARCHAR(255),
                                            TitleDescription VARCHAR(255),
                                        -- Add additional columns as needed
                                            PRIMARY KEY (TitleCode)
                );
            '''
        }
        for table_name, create_query in create_tables_queries.items():
            try:
                self.cursor.execute(create_query)
                self.conn.commit()
                print(f"Table {table_name} created successfully.")
            except Exception as e:
                print(f"Error creating table {table_name}: {e}")

    def load_and_validate_data(self, csv_file, table_name, validation_rules):
        try:
            df = pd.read_csv(csv_file)
            for index, row in df.iterrows():
                is_valid = True
                for rule_id, field_name, validation_logic, error_message in validation_rules:
                    try:
                        if not self.validate_data(row[field_name], validation_logic):
                            is_valid = False
                            self.quarantine_data(table_name, row.to_dict(), rule_id, error_message)
                            break
                    except KeyError:
                        print(f"Column {field_name} not found in CSV")
                        is_valid = False
                        break
                
                if is_valid:
                    self.insert_valid_data(table_name, row)
        
        except Exception as e:
            print(f"Error processing file {csv_file}: {e}")

    def validate_data(self, field_value, validation_logic):
        try:
            if validation_logic == "IS NULL":
                return field_value is None
            elif validation_logic == "< 0":
                return float(field_value) < 0
            else:
                print(f"Unknown validation logic: {validation_logic}")
                return False
        except ValueError as e:
            print(f"Validation error: {e}")
            return False

    def quarantine_data(self, table_name, record, rule_id, error_message):
        insert_query = '''
        INSERT INTO payroll.quarantine_data (source_table_name, original_data, rule_id, error_message)
        VALUES (%s, %s, %s, %s);
        '''
        try:
            self.cursor.execute(insert_query, (table_name, json.dumps(record), rule_id, error_message))
            self.conn.commit()
            print(f"Data quarantined from table {table_name}.")
        except Exception as e:
            print(f"Error quarantining data: {e}")

    def insert_valid_data(self, table_name, record):
        columns = record.index
        values = [record[col] for col in columns]
        insert_query = sql.SQL('''
        INSERT INTO {} ({})
        VALUES ({});
        ''').format(
            sql.Identifier('payroll', table_name),
            sql.SQL(', ').join(map(sql.Identifier, columns)),
            sql.SQL(', ').join(sql.Placeholder() * len(columns))
        )
        try:
            self.cursor.execute(insert_query, values)
            self.conn.commit()
            print(f"Valid data inserted into table {table_name}.")
        except Exception as e:
            print(f"Error inserting data into table {table_name}: {e}")

    def close_connection(self):
        self.cursor.close()
        self.conn.close()

if __name__ == '__main__':
    dq = DataQuarantine()
    
    # Step 1: Create quarantine and staging tables
    dq.create_quarantine_table()
    dq.create_staging_tables()
    
    # Define validation rules (examples)
    validation_rules = [
        ('payroll_schema.staging_nycpayroll', 'fiscalyear', 'NEW.fiscalyear IS NULL', 'FiscalYear cannot be null'),
            ('payroll_schema.staging_nycpayroll', 'PayrollNumber', 'NEW.PayrollNumber IS NULL', 'PayrollNumber cannot be null'),
            ('payroll_schema.staging_nycpayroll', 'AgencyID', 'NEW.AgencyID IS NULL', 'AgencyID cannot be null'),
            ('payroll_schema.staging_nycpayroll', 'AgencyName', 'NEW.AgencyName IS NULL', 'AgencyName cannot be null'),
            ('payroll_schema.staging_nycpayroll', 'EmployeeID', 'NEW.EmployeeID IS NULL', 'EmployeeID cannot be null'),
            ('payroll_schema.staging_nycpayroll', 'LastName', 'NEW.LastName IS NULL', 'LastName cannot be null'),
            ('payroll_schema.staging_nycpayroll', 'FirstName', 'NEW.FirstName IS NULL', 'FirstName cannot be null'),
            ('payroll_schema.staging_nycpayroll', 'AgencyStartDate', 'NEW.AgencyStartDate IS NULL', 'AgencyStartDate cannot be null'),
            ('payroll_schema.staging_nycpayroll', 'WorkLocationBorough', 'NEW.WorkLocationBorough IS NULL', 'WorkLocationBorough cannot be null'),
            ('payroll_schema.staging_nycpayroll', 'TitleCode', 'NEW.TitleCode IS NULL', 'TitleCode cannot be null'),
            ('payroll_schema.staging_nycpayroll', 'TitleDescription', 'NEW.TitleDescription IS NULL', 'TitleDescription cannot be null'),
            ('payroll_schema.staging_nycpayroll', 'LeaveStatusasofJune30', 'NEW.LeaveStatusasofJune30 IS NULL', 'LeaveStatusasofJune30 cannot be null'),
            ('payroll_schema.staging_nycpayroll', 'BaseSalary', 'NEW.BaseSalary IS NULL', 'BaseSalaryr cannot be null'),
            ('payroll_schema.staging_nycpayroll', 'PayBasis', 'NEW.PayBasis IS NULL', 'PayBasis cannot be null'),
            ('payroll_schema.staging_nycpayroll', 'RegularHours', 'NEW.RegularHours IS NULL', 'RegularHours cannot be null'),
            ('payroll_schema.staging_nycpayroll', 'RegularGrossPaid', 'NEW.RegularGrossPaid IS NULL', 'RegularGrossPaid cannot be null'),
            ('payroll_schema.staging_nycpayroll', 'OTHours', 'NEW.OTHours IS NULL', 'OTHours cannot be null'),
            ('payroll_schema.staging_nycpayroll', 'TotalOTPaid', 'NEW.TotalOTPaid IS NULL', 'TotalOTPaid cannot be null'),
            ('payroll_schema.staging_nycpayroll', 'TotalOtherPay', 'NEW.TotalOtherPay IS NULL', 'TotalOtherPay cannot be null'),
            ('payroll_schema.staging_nycpayroll', 'FiscalYear', 'NEW.FiscalYear < 0', 'FiscalYear must be non-negative'),
            ('payroll_schema.staging_nycpayroll', 'PayrollNumber', 'NEW.PayrollNumber < 0', 'PayrollNumber must be non-negative'),
            ('payroll_schema.staging_nycpayroll', 'AgencyID', 'NEW.AgencyID < 0', 'AgencyID must be non-negative'),
            ('payroll_schema.staging_nycpayroll', 'AgencyName', 'NEW.AgencyName < 0', 'AgencyName must be non-negative'),
            ('payroll_schema.staging_nycpayroll', 'EmployeeID', 'NEW.EmployeeID < 0', 'EmployeeID must be non-negative'),
            ('payroll_schema.staging_nycpayroll', 'LastName', 'NEW.LastName < 0', 'LastName must be non-negative'),
            ('payroll_schema.staging_nycpayroll', 'FirstName', 'NEW.FirstName < 0', 'FirstName must be non-negative'),
            ('payroll_schema.staging_nycpayroll', 'AgencyStartDate', 'NEW.AgencyStartDate < 0', 'AgencyStartDate must be non-negative'),
            ('payroll_schema.staging_nycpayroll', 'WorkLocationBorough', 'NEW.WorkLocationBorough < 0', 'WorkLocationBorough must be non-negative'),
            ('payroll_schema.staging_nycpayroll', 'TitleCode', 'NEW.TitleCode < 0', 'TitleCode must be non-negative'),
            ('payroll_schema.staging_nycpayroll', 'TitleDescription', 'NEW.TitleDescription < 0', 'TitleDescription must be non-negative'),
            ('payroll_schema.staging_nycpayroll', 'LeaveStatusasofJune30', 'NEW.LeaveStatusasofJune30 < 0', 'LeaveStatusasofJune30 must be non-negative'),
            ('payroll_schema.staging_nycpayroll', 'BaseSalary', 'NEW.BaseSalary < 0', 'BaseSalary must be non-negative'),
            ('payroll_schema.staging_nycpayroll', 'PayBasis', 'NEW.PayBasis < 0', 'PayBasis must be non-negative'),
            ('payroll_schema.staging_nycpayroll', 'RegularHours', 'NEW.RegularHours < 0', 'RegularHours must be non-negative'),
            ('payroll_schema.staging_nycpayroll', 'RegularGrossPaid', 'NEW.RegularGrossPaid < 0', 'RegularGrossPaid must be non-negative'),
            ('payroll_schema.staging_nycpayroll', 'OTHours', 'NEW.OTHours < 0', 'OTHours must be non-negative'),
            ('payroll_schema.staging_nycpayroll', 'TotalOTPaid', 'NEW.TotalOTPaid < 0', 'TotalOTPaid must be non-negative'),
            ('payroll_schema.staging_nycpayroll', 'TotalOtherPay', 'NEW.TotalOtherPay < 0', 'TotalOtherPay must be non-negative'),
            ('payroll_schema.staging_empmaster', 'EmployeeID', 'NEW.EmployeeID < 0', 'EmployeeID must be non-negative'),
            ('payroll_schema.staging_empmaster', 'EmployeeID', 'NEW.EmployeeID IS NULL', 'EmployeeID cannot be null'),
            ('payroll_schema.staging_empmaster', 'LastName', 'NEW.LastName < 0', 'LastName must be non-negative'),
            ('payroll_schema.staging_empmaster', 'LastName', 'NEW.LastName IS NULL', 'LastName cannot be null'),
            ('payroll_schema.staging_empmaster', 'FirstName', 'NEW.FirstName < 0', 'FirstName must be non-negative'),
            ('payroll_schema.staging_empmaster', 'FirstName', 'NEW.FirstName  IS NULL', 'FirstName cannot be null'),
            ('payroll_schema.staging_agencymaster', 'AgencyID', 'NEW.AgencyID < 0', 'AgencyID must be non-negative'),
            ('payroll_schema.staging_agencymaster', 'AgencyID', 'NEW.AgencyID  IS NULL', 'AgencyID cannot be null'),
            ('payroll_schema.staging_agencymaster', 'AgencyName', 'NEW.AgencyName < 0', 'AgencyName must be non-negative'),
            ('payroll_schema.staging_agencymaster', 'AgencyName', 'NEW.AgencyName IS NULL', 'AgencyName cannot be null'),
            ('payroll_schema.staging_titlemaster', 'TitleCode', 'NEW.TitleCode < 0', 'TitleCode must be non-negative'),
            ('payroll_schema.staging_titlemaster', 'TitleCode', 'NEW.TitleCode IS NULL', 'TitleCode cannot be null'),
            ('payroll_schema.staging_titlemaster', 'TitleDescription', 'NEW.TitleDescription < 0', 'TitleDescription must be non-negative'),
            ('payroll_schema.staging_titlemaster', 'TitleDescription', 'NEW.TitleDescription  IS NULL', 'TitleDescription cannot be null')
        # Add more rules for other fields and tables as needed
        ]

    
    
    # Step 2: Load and validate data for each CSV
    csv_to_table_mapping = {
        r'.\Data\nycpayroll_2020.csv': 'staging_nycpayroll',
        r'.\Data\EmpMaster.csv': 'staging_empmaster',
        r'.\Data\AgencyMaster.csv': 'staging_agencymaster',
        r'.\Data\TitleMaster.csv': 'staging_titlemaster'
    }

    for csv_file, table_name in csv_to_table_mapping.items():
        dq.load_and_validate_data(csv_file, table_name, validation_rules)
    
    # Step 3: Close the database connection
    dq.close_connection()


Quarantine table created successfully.
Table payroll created successfully.
Table staging_nycpayroll created successfully.
Table staging_empmaster created successfully.
Table staging_agencymaster created successfully.
Table staging_titlemaster created successfully.
Column fiscalyear not found in CSV
Column fiscalyear not found in CSV
Column fiscalyear not found in CSV
Column fiscalyear not found in CSV
Column fiscalyear not found in CSV
Column fiscalyear not found in CSV
Column fiscalyear not found in CSV
Column fiscalyear not found in CSV
Column fiscalyear not found in CSV
Column fiscalyear not found in CSV
Column fiscalyear not found in CSV
Column fiscalyear not found in CSV
Column fiscalyear not found in CSV
Column fiscalyear not found in CSV
Column fiscalyear not found in CSV
Column fiscalyear not found in CSV
Column fiscalyear not found in CSV
Column fiscalyear not found in CSV
Column fiscalyear not found in CSV
Column fiscalyear not found in CSV
Column fiscalyear not found in CSV


In [19]:
import os
import psycopg2
import pandas as pd
from psycopg2 import sql
from dotenv import load_dotenv
import json

# Load environment variables from .env file
load_dotenv()

class DataQuarantine:
    def __init__(self):
        self.conn = psycopg2.connect(
            host=os.getenv('PG_HOST'),
            dbname=os.getenv('PG_DB'),
            user=os.getenv('PG_USER'),
            password=os.getenv('PG_PASSWORD')
        )
        self.cursor = self.conn.cursor()

    def create_quarantine_table(self):
        create_table_query = '''
        CREATE TABLE IF NOT EXISTS payroll.quarantine_data (
            quarantine_id SERIAL PRIMARY KEY,
            source_table_name VARCHAR(255) NOT NULL,
            original_data JSONB NOT NULL,
            rule_id INT,
            error_message TEXT,
            quarantined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        '''
        try:
            self.cursor.execute(create_table_query)
            self.conn.commit()
            print("Quarantine table created successfully.")
        except Exception as e:
            print(f"Error creating quarantine table: {e}")

    def create_staging_tables(self):
        create_tables_queries = {
                        'payroll': '''  
                CREATE SCHEMA IF NOT EXISTS payroll;

                        ''',
            'staging_nycpayroll': '''
                CREATE TABLE IF NOT EXISTS payroll.staging_nycpayroll (
                    ID SERIAL PRIMARY KEY,
                                        fiscalyear INTEGER,
                                        PayrollNumber VARCHAR(255),
                                        AgencyID VARCHAR(255),
                                        AgencyName VARCHAR(255),
                                        EmployeeID VARCHAR(255),
                                        LastName VARCHAR(255),
                                        FirstName VARCHAR(255),
                                        AgencyStartDate DATE,
                                        WorkLocationBorough     VARCHAR(255),
                                        TitleCode               VARCHAR(255),
                                        TitleDescription        VARCHAR(255),
                                        LeaveStatusasofJune30   VARCHAR(255),
                                        BaseSalary              FLOAT,   
                                        PayBasis                VARCHAR(255),
                                        RegularHours            INTEGER,
                                        RegularGrossPaid        FLOAT,
                                        OTHours                 FLOAT,
                                        TotalOTPaid             FLOAT,
                                        TotalOtherPay           FLOAT
                );
            ''',
            'staging_empmaster': '''
                CREATE TABLE IF NOT EXISTS payroll.staging_empmaster (
                    EmployeeID VARCHAR(255),
                                            LastName VARCHAR(255),
                                            FirstName VARCHAR(255),
                                            -- Add additional columns as needed
                                            PRIMARY KEY (EmployeeID)
                );
            ''',
            'staging_agencymaster': '''
                CREATE TABLE IF NOT EXISTS payroll.staging_agencymaster (
                    AgencyID VARCHAR(255),
                                            AgencyName VARCHAR(255),
                                            AgencyAddress VARCHAR(255),
                                                -- Add additional columns as needed
                                            PRIMARY KEY (AgencyID)
                );
            ''',
            'staging_titlemaster': '''
                CREATE TABLE IF NOT EXISTS payroll.staging_titlemaster (
                    TitleCode VARCHAR(255),
                                            TitleDescription VARCHAR(255),
                                        -- Add additional columns as needed
                                            PRIMARY KEY (TitleCode)
                );
            '''
        }

        for table_name, create_query in create_tables_queries.items():
            try:
                self.cursor.execute(create_query)
                self.conn.commit()
                print(f"Table {table_name} created successfully.")
            except Exception as e:
                print(f"Error creating table {table_name}: {e}")

    def load_and_validate_data(self, csv_file, table_name, validation_rules):
        try:
            df = pd.read_csv(csv_file)
            columns = df.columns.tolist()

            for index, row in df.iterrows():
                is_valid = True

                for rule_id, field_name, validation_logic, error_message in validation_rules:
                    if field_name in columns:
                        try:
                            if not self.validate_data(row[field_name], validation_logic):
                                is_valid = False
                                self.quarantine_data(table_name, row.to_dict(), rule_id, error_message)
                                break
                        except KeyError:
                            print(f"Column {field_name} not found in CSV")
                            is_valid = False
                            break
                    else:
                        print(f"Skipping validation for {field_name} as it is not present in CSV")
                
                if is_valid:
                    self.insert_valid_data(table_name, row)
        
        except Exception as e:
            print(f"Error processing file {csv_file}: {e}")

    def validate_data(self, field_value, validation_logic):
        try:
            if validation_logic == "IS NULL":
                return pd.isna(field_value)
            elif validation_logic == "< 0":
                return float(field_value) < 0
            else:
                print(f"Unknown validation logic: {validation_logic}")
                return False
        except ValueError as e:
            print(f"Validation error: {e}")
            return False

    def quarantine_data(self, table_name, record, rule_id, error_message):
        insert_query = '''
        INSERT INTO payroll.quarantine_data (source_table_name, original_data, rule_id, error_message)
        VALUES (%s, %s, %s, %s);
        '''
        try:
            self.cursor.execute(insert_query, (table_name, json.dumps(record), rule_id, error_message))
            self.conn.commit()
            print(f"Data quarantined from table {table_name}.")
        except Exception as e:
            print(f"Error quarantining data: {e}")

    def insert_valid_data(self, table_name, record):
        columns = record.index
        values = [record[col] for col in columns]
        insert_query = sql.SQL('''
        INSERT INTO {} ({})
        VALUES ({});
        ''').format(
            sql.Identifier('payroll', table_name),
            sql.SQL(', ').join(map(sql.Identifier, columns)),
            sql.SQL(', ').join(sql.Placeholder() * len(columns))
        )
        try:
            self.cursor.execute(insert_query, values)
            self.conn.commit()
            print(f"Valid data inserted into table {table_name}.")
        except Exception as e:
            print(f"Error inserting data into table {table_name}: {e}")

    def close_connection(self):
        self.cursor.close()
        self.conn.close()

if __name__ == '__main__':
    dq = DataQuarantine()
    
    # Step 1: Create quarantine and staging tables
    dq.create_quarantine_table()
    dq.create_staging_tables()
    
    # Define validation rules (examples)
    validation_rules = [
        (1, 'FiscalYear', 'IS NULL', 'FiscalYear cannot be null'),
        (2, 'PayrollNumber', 'IS NULL', 'PayrollNumber cannot be null'),
        (3, 'AgencyID', 'IS NULL', 'AgencyID cannot be null'),
        (4, 'EmployeeID', 'IS NULL', 'EmployeeID cannot be null'),
        (5, 'TitleCode', '< 0', 'TitleCode must be non-negative')
    ]
    
    # Step 2: Load and validate data for each CSV
    csv_to_table_mapping = {
        r'.\Data\nycpayroll_2020.csv': 'staging_nycpayroll',
        r'.\Data\EmpMaster.csv': 'staging_empmaster',
        r'.\Data\AgencyMaster.csv': 'staging_agencymaster',
        r'.\Data\TitleMaster.csv': 'staging_titlemaster'
    }

    for csv_file, table_name in csv_to_table_mapping.items():
        dq.load_and_validate_data(csv_file, table_name, validation_rules)
    
    # Step 3: Close the database connection
    dq.close_connection()


Quarantine table created successfully.
Table payroll created successfully.
Table staging_nycpayroll created successfully.
Table staging_empmaster created successfully.
Table staging_agencymaster created successfully.
Table staging_titlemaster created successfully.
Data quarantined from table staging_nycpayroll.
Data quarantined from table staging_nycpayroll.
Data quarantined from table staging_nycpayroll.
Data quarantined from table staging_nycpayroll.
Data quarantined from table staging_nycpayroll.
Data quarantined from table staging_nycpayroll.
Data quarantined from table staging_nycpayroll.
Data quarantined from table staging_nycpayroll.
Data quarantined from table staging_nycpayroll.
Data quarantined from table staging_nycpayroll.
Data quarantined from table staging_nycpayroll.
Data quarantined from table staging_nycpayroll.
Data quarantined from table staging_nycpayroll.
Data quarantined from table staging_nycpayroll.
Data quarantined from table staging_nycpayroll.
Data quarantine

In [23]:
import os
import psycopg2
import pandas as pd
from psycopg2 import sql
from dotenv import load_dotenv
import json

# Load environment variables from .env file
load_dotenv()

class DataQuarantine:
    def __init__(self):
        self.conn = psycopg2.connect(
            host=os.getenv('pg_host'),
            dbname=os.getenv('pg_dbname'),
            user=os.getenv('pg_user'),
            password=os.getenv('pg_password')
        )
        self.cursor = self.conn.cursor()

    def create_quarantine_table(self):
        create_table_query = '''
        CREATE TABLE IF NOT EXISTS payroll.quarantine_data (
            quarantine_id SERIAL PRIMARY KEY,
            source_table_name VARCHAR(255) NOT NULL,
            original_data JSONB NOT NULL,
            rule_id INT,
            error_message TEXT,
            quarantined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        '''
        try:
            self.cursor.execute(create_table_query)
            self.conn.commit()
            print("Quarantine table created successfully.")
        except Exception as e:
            print(f"Error creating quarantine table: {e}")
            self.conn.rollback()

    def create_staging_tables(self):
        create_tables_queries = {
                        'payroll': '''  
                CREATE SCHEMA IF NOT EXISTS payroll;

                        ''',
            'staging_nycpayroll': '''
                CREATE TABLE IF NOT EXISTS payroll.staging_nycpayroll (
                    ID SERIAL PRIMARY KEY,
                                        fiscalyear INTEGER,
                                        PayrollNumber VARCHAR(255),
                                        AgencyID VARCHAR(255),
                                        AgencyName VARCHAR(255),
                                        EmployeeID VARCHAR(255),
                                        LastName VARCHAR(255),
                                        FirstName VARCHAR(255),
                                        AgencyStartDate DATE,
                                        WorkLocationBorough     VARCHAR(255),
                                        TitleCode               VARCHAR(255),
                                        TitleDescription        VARCHAR(255),
                                        LeaveStatusasofJune30   VARCHAR(255),
                                        BaseSalary              FLOAT,   
                                        PayBasis                VARCHAR(255),
                                        RegularHours            INTEGER,
                                        RegularGrossPaid        FLOAT,
                                        OTHours                 FLOAT,
                                        TotalOTPaid             FLOAT,
                                        TotalOtherPay           FLOAT
                );
            ''',
            'staging_empmaster': '''
                CREATE TABLE IF NOT EXISTS payroll.staging_empmaster (
                    EmployeeID VARCHAR(255),
                                            LastName VARCHAR(255),
                                            FirstName VARCHAR(255),
                                            -- Add additional columns as needed
                                            PRIMARY KEY (EmployeeID)
                );
            ''',
            'staging_agencymaster': '''
                CREATE TABLE IF NOT EXISTS payroll.staging_agencymaster (
                    AgencyID VARCHAR(255),
                                            AgencyName VARCHAR(255),
                                            AgencyAddress VARCHAR(255),
                                                -- Add additional columns as needed
                                            PRIMARY KEY (AgencyID)
                );
            ''',
            'staging_titlemaster': '''
                CREATE TABLE IF NOT EXISTS payroll.staging_titlemaster (
                    TitleCode VARCHAR(255),
                                            TitleDescription VARCHAR(255),
                                        -- Add additional columns as needed
                                            PRIMARY KEY (TitleCode)
                );
            '''
        }

        for table_name, create_query in create_tables_queries.items():
            try:
                self.cursor.execute(create_query)
                self.conn.commit()
                print(f"Table {table_name} created successfully.")
            except Exception as e:
                print(f"Error creating table {table_name}: {e}")
                self.conn.rollback()

    def load_and_validate_data(self, csv_file, table_name, validation_rules):
        try:
            df = pd.read_csv(csv_file)
            columns = df.columns.tolist()

            for index, row in df.iterrows():
                is_valid = True

                for rule_id, field_name, validation_logic, error_message in validation_rules:
                    if field_name in columns:
                        try:
                            if not self.validate_data(row[field_name], validation_logic):
                                is_valid = False
                                self.quarantine_data(table_name, row.to_dict(), rule_id, error_message)
                                break
                        except KeyError:
                            print(f"Column {field_name} not found in CSV")
                            is_valid = False
                            break
                    else:
                        print(f"Skipping validation for {field_name} as it is not present in CSV")
                
                if is_valid:
                    self.insert_valid_data(table_name, row)
        
        except Exception as e:
            print(f"Error processing file {csv_file}: {e}")
            self.conn.rollback()

    def validate_data(self, field_value, validation_logic):
        try:
            if validation_logic == "IS NULL":
                return pd.isna(field_value)
            elif validation_logic == "< 0":
                return float(field_value) < 0
            else:
                print(f"Unknown validation logic: {validation_logic}")
                return False
        except ValueError as e:
            print(f"Validation error: {e}")
            return False

    def quarantine_data(self, table_name, record, rule_id, error_message):
        try:
            insert_query = '''
            INSERT INTO payroll.quarantine_data (source_table_name, original_data, rule_id, error_message)
            VALUES (%s, %s, %s, %s);
            '''
            self.cursor.execute(insert_query, (table_name, json.dumps(record), rule_id, error_message))
            self.conn.commit()
            print(f"Data quarantined from table {table_name}.")
        except Exception as e:
            print(f"Error quarantining data: {e}")
            self.conn.rollback()

    def insert_valid_data(self, table_name, record):
        columns = record.index
        values = [record[col] for col in columns]
        try:
            insert_query = sql.SQL('''
            INSERT INTO {} ({})
            VALUES ({});
            ''').format(
                sql.Identifier('payroll', table_name),
                sql.SQL(', ').join(map(sql.Identifier, columns)),
                sql.SQL(', ').join(sql.Placeholder() * len(columns))
            )
            self.cursor.execute(insert_query, values)
            self.conn.commit()
            print(f"Valid data inserted into table {table_name}.")
        except Exception as e:
            print(f"Error inserting data into table {table_name}: {e}")
            self.conn.rollback()

    def close_connection(self):
        self.cursor.close()
        self.conn.close()

if __name__ == '__main__':
    dq = DataQuarantine()
    
    # Step 1: Create quarantine and staging tables
    dq.create_quarantine_table()
    dq.create_staging_tables()
    
    # Define validation rules (examples)
    validation_rules = [
        (1, 'FiscalYear', 'IS NULL', 'FiscalYear cannot be null'),
        (2, 'PayrollNumber', 'IS NULL', 'PayrollNumber cannot be null'),
        (3, 'AgencyID', 'IS NULL', 'AgencyID cannot be null'),
        (4, 'EmployeeID', 'IS NULL', 'EmployeeID cannot be null'),
        (5, 'TitleCode', '< 0', 'TitleCode must be non-negative')
    ]
    
    # Step 2: Load and validate data for each CSV
    csv_to_table_mapping = {
        r'.\Data\nycpayroll_2020.csv': 'payroll.staging_nycpayroll',
        r'.\Data\EmpMaster.csv': 'payroll.staging_empmaster',
        r'.\Data\AgencyMaster.csv': 'payroll.staging_agencymaster',
        r'.\Data\TitleMaster.csv': 'payroll.staging_titlemaster'
    }

    for csv_file, table_name in csv_to_table_mapping.items():
        dq.load_and_validate_data(csv_file, table_name, validation_rules)
    
    # Step 3: Close the database connection
    dq.close_connection()


Quarantine table created successfully.
Table payroll created successfully.
Table staging_nycpayroll created successfully.
Table staging_empmaster created successfully.
Table staging_agencymaster created successfully.
Table staging_titlemaster created successfully.
Data quarantined from table payroll.staging_nycpayroll.
Data quarantined from table payroll.staging_nycpayroll.
Data quarantined from table payroll.staging_nycpayroll.
Data quarantined from table payroll.staging_nycpayroll.
Data quarantined from table payroll.staging_nycpayroll.
Data quarantined from table payroll.staging_nycpayroll.
Data quarantined from table payroll.staging_nycpayroll.
Data quarantined from table payroll.staging_nycpayroll.
Data quarantined from table payroll.staging_nycpayroll.
Data quarantined from table payroll.staging_nycpayroll.
Data quarantined from table payroll.staging_nycpayroll.
Data quarantined from table payroll.staging_nycpayroll.
Data quarantined from table payroll.staging_nycpayroll.
Data qu

In [25]:
import os
import psycopg2
import pandas as pd
from psycopg2 import sql
from dotenv import load_dotenv
import json

# Load environment variables from .env file
load_dotenv()

class DataLoader:
    def __init__(self):
        self.conn = psycopg2.connect(
            host=os.getenv('PG_HOST'),
            dbname=os.getenv('PG_DB'),
            user=os.getenv('PG_USER'),
            password=os.getenv('PG_PASSWORD')
        )
        self.cursor = self.conn.cursor()

    def create_staging_tables(self):
        create_tables_queries = {
                        'payroll': '''  
                CREATE SCHEMA IF NOT EXISTS payroll;

                        ''',
            'staging_nycpayroll': '''
                CREATE TABLE IF NOT EXISTS payroll.staging_nycpayroll (
                    ID SERIAL PRIMARY KEY,
                                        fiscalyear INTEGER,
                                        PayrollNumber VARCHAR(255),
                                        AgencyID VARCHAR(255),
                                        AgencyName VARCHAR(255),
                                        EmployeeID VARCHAR(255),
                                        LastName VARCHAR(255),
                                        FirstName VARCHAR(255),
                                        AgencyStartDate DATE,
                                        WorkLocationBorough     VARCHAR(255),
                                        TitleCode               VARCHAR(255),
                                        TitleDescription        VARCHAR(255),
                                        LeaveStatusasofJune30   VARCHAR(255),
                                        BaseSalary              FLOAT,   
                                        PayBasis                VARCHAR(255),
                                        RegularHours            INTEGER,
                                        RegularGrossPaid        FLOAT,
                                        OTHours                 FLOAT,
                                        TotalOTPaid             FLOAT,
                                        TotalOtherPay           FLOAT
                );
            ''',
            'staging_empmaster': '''
                CREATE TABLE IF NOT EXISTS payroll.staging_empmaster (
                    EmployeeID VARCHAR(255),
                                            LastName VARCHAR(255),
                                            FirstName VARCHAR(255),
                                            -- Add additional columns as needed
                                            PRIMARY KEY (EmployeeID)
                );
            ''',
            'staging_agencymaster': '''
                CREATE TABLE IF NOT EXISTS payroll.staging_agencymaster (
                    AgencyID VARCHAR(255),
                                            AgencyName VARCHAR(255),
                                            AgencyAddress VARCHAR(255),
                                                -- Add additional columns as needed
                                            PRIMARY KEY (AgencyID)
                );
            ''',
            'staging_titlemaster': '''
                CREATE TABLE IF NOT EXISTS payroll.staging_titlemaster (
                    TitleCode VARCHAR(255),
                                            TitleDescription VARCHAR(255),
                                        -- Add additional columns as needed
                                            PRIMARY KEY (TitleCode)
                );
            '''
        }
        for table_name, create_query in create_tables_queries.items():
            try:
                self.cursor.execute(create_query)
                self.conn.commit()
                print(f"Table {table_name} created successfully.")
            except Exception as e:
                print(f"Error creating table {table_name}: {e}")
                self.conn.rollback()

    def load_data_into_table(self, csv_file, table_name):
        try:
            df = pd.read_csv(csv_file)
            columns = df.columns.tolist()

            insert_query = sql.SQL('''
                INSERT INTO {} ({})
                VALUES ({});
            ''').format(
                sql.Identifier('payroll', table_name),
                sql.SQL(', ').join(map(sql.Identifier, columns)),
                sql.SQL(', ').join(sql.Placeholder() * len(columns))
            )

            for index, row in df.iterrows():
                try:
                    self.cursor.execute(insert_query, tuple(row))
                except Exception as e:
                    print(f"Error inserting data into table {table_name}: {e}")
                    self.conn.rollback()
                    continue

            self.conn.commit()
            print(f"Data from {csv_file} loaded into {table_name} successfully.")

        except Exception as e:
            print(f"Error loading data from {csv_file}: {e}")
            self.conn.rollback()

    def close_connection(self):
        self.cursor.close()
        self.conn.close()

if __name__ == '__main__':
    loader = DataLoader()

    # Step 1: Create staging tables
    loader.create_staging_tables()

    # Step 2: Load data into tables
    csv_to_table_mapping = {
        r'.\Data\nycpayroll_2020.csv': 'staging_nycpayroll',
        r'.\Data\EmpMaster.csv': 'staging_empmaster',
        r'.\Data\AgencyMaster.csv': 'staging_agencymaster',
        r'.\Data\TitleMaster.csv': 'staging_titlemaster'
    }

    for csv_file, table_name in csv_to_table_mapping.items():
        loader.load_data_into_table(csv_file, table_name)

    # Step 3: Close the database connection
    loader.close_connection()


Table payroll created successfully.
Table staging_nycpayroll created successfully.
Table staging_empmaster created successfully.
Table staging_agencymaster created successfully.
Table staging_titlemaster created successfully.
Error inserting data into table staging_nycpayroll: column "FiscalYear" of relation "staging_nycpayroll" does not exist
LINE 2: ...      INSERT INTO "payroll"."staging_nycpayroll" ("FiscalYea...
                                                             ^

Error inserting data into table staging_nycpayroll: column "FiscalYear" of relation "staging_nycpayroll" does not exist
LINE 2: ...      INSERT INTO "payroll"."staging_nycpayroll" ("FiscalYea...
                                                             ^

Error inserting data into table staging_nycpayroll: column "FiscalYear" of relation "staging_nycpayroll" does not exist
LINE 2: ...      INSERT INTO "payroll"."staging_nycpayroll" ("FiscalYea...
                                                             