In [1]:
import os
import time
import shutil
import pandas as pd
import numpy as np
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.edge.service import Service
from selenium.webdriver.edge.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC


**FUNCTION DEFINITIONS**

In [8]:
def setup_driver(download_dir):
    """
    Build an Edge WebDriver whose downloads are saved to `download_dir`.

    :param download_dir: Directory passed to Edge as its default download location
    :return: The started Edge WebDriver instance
    """
    options = Options()
    options.add_experimental_option(
        "prefs",
        {
            # Save downloads here instead of the browser's default folder
            "download.default_directory": download_dir,
            # Do not ask where to save each download
            "download.prompt_for_download": False,
            # Safe Browsing preference is switched on (value is True)
            "safebrowsing.enabled": True,
        },
    )

    # The EdgeDriver executable is looked up under this name
    return webdriver.Edge(
        service=Service(executable_path="msedgedriver.exe"),
        options=options,
    )

def preprocess_employee_data(data):
    """
    Load the employee workbook and normalise the columns used for payslips.

    The joining date is rendered as a 'YYYY-MM-DD' string and the listed
    salary / deduction columns are cast to integers.

    :param data: Path (or other source accepted by ``pd.read_excel``) of the Excel file
    :return: A pandas DataFrame with the processed data
    """
    employees = pd.read_excel(data)

    # Parse first, then format, so every date ends up in the same textual form
    joining_dates = pd.to_datetime(employees['Date Of Joining'])
    employees['Date Of Joining'] = joining_dates.dt.strftime('%Y-%m-%d')

    # Monetary columns that must be whole numbers
    integer_columns = (
        'Gross Salary', 'Bonus / Commission', 'Increment',
        'Reimbursment Amount', 'Compensation', 'Adjustments',
        'Absents Deduction', 'Lates Deduction',
    )
    return employees.astype({name: int for name in integer_columns})

def split_data_into_chunks(df, num_chunks):
    """
    Splits the DataFrame into the specified number of row-wise chunks.

    Chunk sizes follow ``np.array_split``: they differ by at most one row and
    the larger chunks come first. When ``num_chunks`` exceeds the number of
    rows, the trailing chunks are empty DataFrames.

    :param df: Input DataFrame
    :param num_chunks: Number of chunks to split the data into
    :return: List of DataFrame chunks, each with a fresh 0-based index
    :raises ValueError: If ``num_chunks`` is not larger than 0 (raised by ``np.array_split``)
    """
    # Split the row *positions* rather than the DataFrame itself: calling
    # np.array_split directly on a DataFrame goes through DataFrame.swapaxes,
    # which pandas has deprecated (it emits a FutureWarning).
    row_positions = np.array_split(np.arange(len(df)), num_chunks)

    # Reset each chunk's index so callers can address rows as 0..len(chunk)-1
    return [df.iloc[positions].reset_index(drop=True) for positions in row_positions]

# Function to clear input fields for the next iteration
def clear_form_fields(driver):
    """
    Empty the employee details textarea and every earnings / deductions
    amount input so the form is ready for the next employee.
    """
    def locate(xpath, timeout=10):
        """Return the element at `xpath` once it is present in the DOM."""
        return WebDriverWait(driver, timeout).until(
            EC.presence_of_element_located((By.XPATH, xpath))
        )

    def wipe(field):
        """Select the field's whole content, delete it, then tab out of the field."""
        field.send_keys(Keys.CONTROL + "a")
        field.send_keys(Keys.DELETE)
        field.send_keys(Keys.TAB)

    # Employee details textarea
    wipe(locate('/html/body/div[1]/main/div[2]/div/section[1]/div/div[1]/div[1]/div[4]/div[2]/textarea'))

    # Six earnings amounts followed by four deductions amounts
    amount_ids = [f"earnings.{position}.amount" for position in range(6)]
    amount_ids += [f"deductions.{position}.amount" for position in range(4)]
    for amount_id in amount_ids:
        time.sleep(0.2)
        wipe(locate(f'//*[@id="{amount_id}"]'))

# Function to populate static details on the form
def populate_static_details(driver):
    """
    Fill in the parts of the form that are identical for every payslip:
    logo, employer details, the serial / signature / currency inputs, and
    the extra earnings and deductions rows.

    Any exception is caught and printed; nothing is re-raised.
    """
    def locate(xpath, timeout=15):
        """Return the element at `xpath` once it is present in the DOM."""
        return WebDriverWait(driver, timeout).until(
            EC.presence_of_element_located((By.XPATH, xpath))
        )

    try:
        # Upload the logo by sending its absolute path to the file input
        locate('//*[@id="logo"]').send_keys(os.path.abspath("Logo.jpg"))

        # Employer block typed into the first details textarea
        employer_details = """
        Robust Support & Solutions
        Office No.404A, Fortune Tower
        PECHS Block 6, Karachi, Pakistan
        Phone & WhatsApp: 0311-3859635
        """
        locate('/html/body/div[1]/main/div[2]/div/section[1]/div/div[1]/div[1]/div[4]/div[1]/textarea').send_keys(employer_details)

        # Blank out the static inputs; only the currency input receives a new value
        for field_id in ("serial", "employeeSign", "employerSign", "currency"):
            field = locate(f'//*[@id="{field_id}"]')
            field.send_keys(Keys.CONTROL + "a")
            field.send_keys(Keys.DELETE)
            if field_id == "currency":
                field.send_keys("PKR")
            field.send_keys(Keys.TAB)
            time.sleep(0.2)

        # Click the add-row button in table rows 4-7, then in rows 11-13
        # (four clicks for earnings, three for deductions)
        for table_row in (*range(4, 8), *range(11, 14)):
            time.sleep(0.5)
            locate(f'/html/body/div[1]/main/div[2]/div/section[1]/div/div[1]/div[2]/table/tbody/tr[{table_row}]/td/button').click()

        print("Static details populated successfully.")

    except Exception as e:
        print(f"Error populating static details: {e}")

def update_field_descriptions(driver, field_xpath, description, clear_existing=False):
    """
    Updates the description fields for earnings or deductions.

    On a stale-element error the whole interaction is retried once; an error
    during that retry propagates to the caller. Any other error is printed
    and swallowed.

    :param driver: Selenium WebDriver controlling the payslip form
    :param field_xpath: Element id of the field to update (it is inserted into an ``@id`` XPath)
    :param description: Text to input in the field
    :param clear_existing: Whether to clear existing text before updating
    """
    # Imported locally because the notebook's import cell does not provide this
    # name; without it, evaluating the `except` clause below raises NameError
    # for every exception raised inside the `try`.
    from selenium.common.exceptions import StaleElementReferenceException

    def wait_for_element_to_be_interactable(driver, xpath, timeout=10):
        """Wait for an element to be clickable."""
        return WebDriverWait(driver, timeout).until(
            EC.element_to_be_clickable((By.XPATH, xpath))
        )

    def type_description():
        """Locate the field afresh, optionally clear it, type the text and tab out."""
        element = wait_for_element_to_be_interactable(driver, f'//*[@id="{field_xpath}"]')
        if clear_existing:
            element.send_keys(Keys.CONTROL + "a")  # Select all text
            element.send_keys(Keys.DELETE)        # Clear it
        element.send_keys(description)
        element.send_keys(Keys.TAB)
        time.sleep(0.2)  # Sleep to allow UI updates

    try:
        type_description()
    except StaleElementReferenceException:
        # Re-locating the element gives a fresh reference for the retry
        print(f"Stale element encountered for {field_xpath}. Retrying...")
        type_description()
    except Exception as e:
        print(f"Error updating field {field_xpath}: {e}")

# Update descriptions for earnings and deductions
def update_description(driver):
    """
    Write the label text into every earnings and deductions description field.
    """
    # (element id, label to type, whether existing text must be cleared first)
    description_plan = [
        # Earnings rows
        ("earnings.0.description", "Gross Salary", True),
        ("earnings.1.description", "Bonus / Commission", True),
        ("earnings.2.description", "Reimbursement", False),
        ("earnings.3.description", "Increment", False),
        ("earnings.4.description", "Compensation", False),
        ("earnings.5.description", "Adjustments", False),
        # Deductions rows
        ("deductions.0.description", "Absents", True),
        ("deductions.1.description", "Lates", False),
        ("deductions.2.description", "Payroll Tax", False),
        ("deductions.3.description", "Others", False),
    ]

    for element_id, label, needs_clearing in description_plan:
        update_field_descriptions(driver, element_id, label, clear_existing=needs_clearing)

    # Report completion once every field has been processed
    print("Updated descriptions populated successfully.")

# Function to input employee-specific details and generate payslips
def generate_payslips(driver,df,directory):
    """
    Iterates over the DataFrame to input employee details, generate payslips,
    and save the files with appropriate names.

    The page is loaded and its static parts are filled once; then, for every
    row, the employee block and the amount fields are typed in, the payslip is
    downloaded to `directory` as "Payslip.pdf", moved into the "Payslips"
    folder under the employee's name and month, and the form is cleared.

    :param driver: Selenium WebDriver whose download directory is `directory`
    :param df: DataFrame addressed by labels 0..len(df)-1 via ``df.loc``; the
               column names read are the ones used in the body below
    :param directory: Folder in which the browser saves "Payslip.pdf"
    :return: None. Returns early (after printing) if the page or the static
             sections cannot be prepared.
    """
    def wait_for_element_to_load(driver, xpath, timeout=15):
        """Wait for an element to load on the page."""
        return WebDriverWait(driver, timeout).until(
            EC.presence_of_element_located((By.XPATH, xpath))
        )

    def wait_for_file(path, timeout=15, poll_interval=0.25):
        """Poll until `path` exists or `timeout` seconds elapse; return whether it exists."""
        deadline = time.time() + timeout
        while not os.path.exists(path) and time.time() < deadline:
            time.sleep(poll_interval)
        return os.path.exists(path)

    # (amount input id, DataFrame column holding its value), in form order
    amount_fields = [
        ("earnings.0.amount", 'Gross Salary'),
        ("earnings.1.amount", 'Bonus / Commission'),
        ("earnings.2.amount", 'Reimbursment Amount'),
        ("earnings.3.amount", 'Increment'),
        ("earnings.4.amount", 'Compensation'),
        ("earnings.5.amount", 'Adjustments'),
        ("deductions.0.amount", 'Absents Deduction'),
        ("deductions.1.amount", 'Lates Deduction'),
        ("deductions.2.amount", 'Payroll Tax Deduction'),
        ("deductions.3.amount", 'Other Deductions'),
    ]

    # Name under which the browser saves every generated payslip
    original_file = os.path.join(directory, "Payslip.pdf")

    # Destination folder for the renamed payslips; create it up front so the
    # move below cannot fail merely because the folder is missing
    output_dir = "Payslips"
    os.makedirs(output_dir, exist_ok=True)

    # Load the page and wait for the static details section
    driver.get('https://printyourcopy.com/free-payslip-generator')
    print("Loading the page...")

    # Wait for the form container before touching any field
    try:
        wait_for_element_to_load(driver, '/html/body/div[1]/main/div[2]/div/section[1]/div', timeout=20)
        print("Page fully loaded.")
    except Exception as e:
        print(f"Error waiting for page to load: {e}")
        return

    # Populate static details
    try:
        populate_static_details(driver)
    except Exception as e:
        print(f"Error populating static details: {e}")
        return

    time.sleep(0.5)

    # Populate updated description
    try:
        update_description(driver)
    except Exception as e:
        print(f"Error populating updated description: {e}")
        return

    for i in range(df.shape[0]):
        # Populate employee details
        try:
            employee_details = f"""{df.loc[i, 'Full Name (As Per CNIC)']}
            {df.loc[i, 'CNIC Number']}
            {df.loc[i, 'Designation']}
            DOJ: {df.loc[i, 'Date Of Joining']}
            """
            textarea_xpath = '/html/body/div[1]/main/div[2]/div/section[1]/div/div[1]/div[1]/div[4]/div[2]/textarea'
            wait_for_element_to_load(driver, textarea_xpath)  # Wait for textarea to load
            driver.find_element(By.XPATH, textarea_xpath).send_keys(employee_details)
            print("Populated employee details successfully.")
        except Exception as e:
            print(f"Row {i} data: {df.loc[i]}")
            print(f"Error populating employee details: {e}")

        # Type each amount, then tab out of the field. The element is looked up
        # again before the TAB, exactly as the per-field statements used to do.
        for field_id, column in amount_fields:
            amount_xpath = f'//*[@id="{field_id}"]'
            driver.find_element(By.XPATH, amount_xpath).send_keys(str(df.loc[i, column]))
            driver.find_element(By.XPATH, amount_xpath).send_keys(Keys.TAB)
            time.sleep(0.3)

        # Remove a leftover download so the file moved below belongs to this row
        try:
            if os.path.exists(original_file):
                os.remove(original_file)
                print(f"Removed previous Payslip file!")
        except Exception as e:
            print(f"Error removing previous payslip: {e}")

        # Generate and rename the payslip
        try:
            # Click the button to generate the payslip
            driver.find_element(By.XPATH, '/html/body/div[1]/main/div[2]/div/section[1]/div/div[2]/button[2]').click()
            time.sleep(2)

            # A slow download may not have landed after the fixed pause; keep
            # polling for a while. If it never appears, the move below raises
            # and the failure is reported by the handler.
            wait_for_file(original_file)

            # Move and rename the file
            new_file = os.path.join(
                output_dir,
                f'{df.loc[i, "Full Name (As Per CNIC)"]} - {df.loc[i, "Month"]}.pdf',
            )
            shutil.move(original_file, new_file)
            print(f"File Directory change and Renamed: {original_file} -> {new_file}")

        except Exception as e:
            print(f"Error generating payslip for {df.loc[i, 'Full Name (As Per CNIC)']}: {e}")

        # Clear the form for the next employee even when generation failed;
        # otherwise the next row's values would be typed after the stale ones.
        try:
            clear_form_fields(driver)
            time.sleep(0.3)
        except Exception as e:
            print(f"Error clearing form fields: {e}")

**PARALLEL PROGRAMMING**

In [9]:
from concurrent.futures import ThreadPoolExecutor

# Load the workbook and divide its rows among the browser instances
df = preprocess_employee_data("Demo.xlsx")
num_chunks = 8
chunks = split_data_into_chunks(df, num_chunks)

# Start the clock before any browser is launched
start_time = time.time()

# Instance name -> the WebDriver serving it and that driver's download folder
drivers = {}
for instance_number in range(1, len(chunks) + 1):
    instance_name = f"Instance_{instance_number}"

    # Each instance downloads into its own folder
    chunk_dir = os.path.join(r"C:\Mushaf\PaySlip-Generator\Documents", instance_name)
    os.makedirs(chunk_dir, exist_ok=True)

    # Launch Edge with that folder as its download directory
    driver = setup_driver(chunk_dir)
    drivers[instance_name] = {"driver": driver, "directory": chunk_dir}

def generate_payslips_safe(instance_key, df_chunk):
    """
    Run generate_payslips for one browser instance, printing any exception
    instead of letting it escape the worker.

    :param instance_key: Key into the module-level `drivers` dictionary
    :param df_chunk: DataFrame chunk to process with that instance
    """
    try:
        instance = drivers[instance_key]
        generate_payslips(instance['driver'], df_chunk, instance['directory'])
    except Exception as e:
        print(f"Error in {instance_key}: {e}")

# Run one worker thread per chunk. The worker count and loop bound follow
# len(chunks) so they cannot drift from the number of chunks/drivers created.
# Leaving the `with` block waits for every submitted task to finish.
with ThreadPoolExecutor(max_workers=max(1, len(chunks))) as executor:
    for i, chunk in enumerate(chunks):
        instance_key = f'Instance_{i+1}'
        executor.submit(generate_payslips_safe, instance_key, chunk)

# Record the end time
end_time = time.time()

# Calculate the total duration for parallel execution
p_execution_duration = end_time - start_time

# Close the browsers after the timing is taken so shutdown is not measured;
# without this every run leaves its Edge/EdgeDriver processes running.
for instance in drivers.values():
    try:
        instance["driver"].quit()
    except Exception as e:
        print(f"Error closing driver: {e}")

  return bound(*args, **kwds)


Loading the page...
Loading the page...
Loading the page...
Loading the page...
Page fully loaded.
Loading the page...
Loading the page...
Page fully loaded.
Page fully loaded.
Page fully loaded.
Page fully loaded.
Loading the page...
Page fully loaded.
Page fully loaded.
Loading the page...
Page fully loaded.
Static details populated successfully.
Static details populated successfully.
Static details populated successfully.
Static details populated successfully.
Static details populated successfully.
Static details populated successfully.
Static details populated successfully.Static details populated successfully.

Updated descriptions populated successfully.
Populated employee details successfully.
Updated descriptions populated successfully.
Populated employee details successfully.
Updated descriptions populated successfully.
Populated employee details successfully.
Updated descriptions populated successfully.
Updated descriptions populated successfully.
Updated descriptions populat

In [14]:
# Break the elapsed seconds into whole minutes and the remaining seconds
p_minutes, p_seconds = divmod(p_execution_duration, 60)

# Report the parallel run's duration as minutes and seconds
print(f"Total execution time: {int(p_minutes)} minutes and {int(p_seconds):.0f} seconds")

Total execution time: 2 minutes and 27 seconds


**SERIAL PROGRAMMING**

In [17]:
from concurrent.futures import ThreadPoolExecutor

df = preprocess_employee_data("Demo.xlsx")

# Record the start time
start_time = time.time()

# Named `serial_dir` rather than `dir` so the builtin dir() is not shadowed
serial_dir = r"C:\Mushaf\PaySlip-Generator\Documents\serial_instance"
# Create the download folder up front, as the parallel run does for its folders
os.makedirs(serial_dir, exist_ok=True)

driver = setup_driver(serial_dir)
try:
    generate_payslips(driver, df, serial_dir)

    # Record the end time (before the browser is shut down)
    end_time = time.time()
finally:
    # Always close the browser so the Edge/EdgeDriver processes do not linger
    driver.quit()

# Calculate the total duration for the serial execution
s_execution_duration = end_time - start_time

Loading the page...
Page fully loaded.
Static details populated successfully.
Updated descriptions populated successfully.
Populated employee details successfully.
File Directory change and Renamed: C:\Mushaf\PaySlip-Generator\Documents\serial_instance\Payslip.pdf -> Payslips\Muhammad Haris Dilshad - OCTOBER' 2024.pdf
Populated employee details successfully.
File Directory change and Renamed: C:\Mushaf\PaySlip-Generator\Documents\serial_instance\Payslip.pdf -> Payslips\Fahad Bin Zahid - OCTOBER' 2024.pdf
Populated employee details successfully.
File Directory change and Renamed: C:\Mushaf\PaySlip-Generator\Documents\serial_instance\Payslip.pdf -> Payslips\Ibrahim Najmuddin - OCTOBER' 2024.pdf
Populated employee details successfully.
File Directory change and Renamed: C:\Mushaf\PaySlip-Generator\Documents\serial_instance\Payslip.pdf -> Payslips\Zuhaib Ghori - OCTOBER' 2024.pdf
Populated employee details successfully.
File Directory change and Renamed: C:\Mushaf\PaySlip-Generator\Documen

In [18]:
# Break the elapsed seconds into whole minutes and the remaining seconds
s_minutes, s_seconds = divmod(s_execution_duration, 60)

# Report the serial run's duration as minutes and seconds
print(f"Total serial execution time: {int(s_minutes)} minutes and {int(s_seconds):.0f} seconds")

Total serial execution time: 12 minutes and 21 seconds


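 - ~5.0x speedup from parallel execution (12 min 21 s serial vs. 2 min 27 s parallel, per the timings printed above)
 - i.e. the parallel run is roughly 400% faster than the serial run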