Please read the README PDF for an overall understanding of the project.

## Labor Analysis

In [None]:
#Enter finger check API keys, link of API documentation https://developer.fingercheck.com/api/help 
# NOTE(review): IPython's %env presumably stores everything after "=" as the value, so the
# trailing "#Input API key" text below would become the value rather than being ignored as a
# comment -- replace the whole right-hand side with the real key. TODO confirm.

%env FINGERCHECK_APIKEY= #Input API key 
%env FINGERCHECK_CLIENT_SECRET= #Input API key


In [None]:
# Connecting to the fingercheck end point and pulling the data for the dates needed
# This end point contains most of the data needed for my analysis except the wages

import os
import requests
import pandas as pd
import time 
from datetime import datetime, timedelta


# Credentials are read from the process environment (None when a variable is unset)
# so that they never have to be written into the notebook itself.
APIKEY = os.environ.get('FINGERCHECK_APIKEY')
CLIENT_SECRET_KEY = os.environ.get('FINGERCHECK_CLIENT_SECRET')

# Function to make the API request and return a DataFrame
def get_all_time_cards_for_date_range(start_date, end_date):
    """Fetch Fingercheck time cards for a date range as a DataFrame.

    Sends a GET request to the ``GetAllTimeCardsForDateRange`` report endpoint,
    authenticating with the module-level ``APIKEY`` and ``CLIENT_SECRET_KEY``.

    Args:
        start_date: Value sent as the ``startDate`` query parameter.
        end_date: Value sent as the ``endDate`` query parameter.

    Returns:
        A ``pd.DataFrame`` built from the JSON response body, or ``None`` when
        a ``requests`` exception occurs (the error is printed, not raised).
    """
    base_url = "https://developer.fingercheck.com/api"
    endpoint = "/v1/Reports/GetAllTimeCardsForDateRange"
    query = {"startDate": start_date, "endDate": end_date}
    auth_headers = {"APIKEY": APIKEY, "ClientSecretKey": CLIENT_SECRET_KEY}

    try:
        reply = requests.get(base_url + endpoint, params=query, headers=auth_headers, timeout=60)
        # Turn an unsuccessful HTTP status code into an HTTPError
        reply.raise_for_status()
        frame = pd.DataFrame(reply.json())
    except requests.exceptions.HTTPError as exc:
        print(f"HTTP error occurred: {exc}")
    except requests.exceptions.ConnectionError as exc:
        print(f"Error Connecting: {exc}")
    except requests.exceptions.Timeout as exc:
        print(f"Timeout error: {exc}")
    except requests.exceptions.RequestException as exc:
        print(f"An error occurred: {exc}")
    else:
        return frame
    # Reached only after one of the handlers above printed its message
    return None

# Function to retry the API request in case of failure
def get_all_time_cards_for_date_range_with_retry(start_date, end_date, retries=3, delay=5):
    """Call ``get_all_time_cards_for_date_range`` up to ``retries`` times.

    Args:
        start_date: Passed through to ``get_all_time_cards_for_date_range``.
        end_date: Passed through to ``get_all_time_cards_for_date_range``.
        retries: Maximum number of attempts.
        delay: Seconds to wait between two consecutive attempts.

    Returns:
        The first ``pd.DataFrame`` returned by an attempt, or ``None`` when
        every attempt failed.
    """
    for attempt in range(retries):
        result = get_all_time_cards_for_date_range(start_date, end_date)
        if isinstance(result, pd.DataFrame):
            return result
        if attempt + 1 < retries:
            print(f"Attempt {attempt + 1} failed. Retrying in {delay} seconds...")
            time.sleep(delay)
        else:
            # Nothing follows the last attempt, so do not announce a retry or sleep
            print(f"Attempt {attempt + 1} failed. No retries left.")
    return None

# The pull covers a fixed start date up to yesterday ('YYYY-MM-DD' strings)
start_date = "2023-12-10"
end_date = f"{datetime.now() - timedelta(days=1):%Y-%m-%d}"

# Request the time cards, retrying on failure
data_df_LATEST = get_all_time_cards_for_date_range_with_retry(start_date, end_date)

# Report the outcome: a preview of the rows on success, a message otherwise
if not isinstance(data_df_LATEST, pd.DataFrame):
    print("Failed to retrieve data after multiple attempts.")
else:
    print(data_df_LATEST.head())


In [None]:
# Requesting a wide range of dates from the API results in errors, and it is unnecessary to
# extract the same data again and again, so the data from 2020 to 2023 was extracted once
# and saved to a CSV file. It is merged with the recent data pulled at regular intervals.

# TODO: set this to the path of the saved 2020-2023 export (the original placeholder
# "csv_file_path = #..." was a syntax error)
csv_file_path = "path/to/historical_time_cards.csv"

Old_data = pd.read_csv(csv_file_path)

In [None]:
# Stack the saved historical rows and the freshly pulled rows into one frame with a fresh 0..n-1 index
fingercheck_raw_data = pd.concat([Old_data, data_df_LATEST], ignore_index=True)

In [None]:
# Requesting rates/hourly wage information from API, give the dates for data needed. 

import requests
import pandas as pd
from datetime import datetime, timedelta


# Calculate yesterday's date as a 'YYYY-MM-DD' string; it is the end of the rates request below
end_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")


def get_all_time_cards_for_date_range(start_date, end_date):
    """Fetch employee active rates from Fingercheck as a DataFrame.

    NOTE: despite its name, this function calls the ``GetEmployeeActiveRates``
    endpoint, and defining it rebinds the name of the time-card fetcher
    defined earlier in the notebook. The name is kept so existing calls keep
    working; re-run the earlier cell before pulling time cards again.

    Args:
        start_date: Value sent as the ``startDate`` query parameter.
        end_date: Value sent as the ``endDate`` query parameter.

    Returns:
        A ``pd.DataFrame`` built from the JSON response body.

    Raises:
        requests.exceptions.HTTPError: If the response status code is not 200.
            Previously an error string was returned instead, which only
            failed later when the result was used as a DataFrame.
    """
    base_url = "https://developer.fingercheck.com/api"
    endpoint = "/v1/Reports/GetEmployeeActiveRates"
    params = {
        "startDate": start_date,
        "endDate": end_date
    }
    headers = {
        "APIKEY": APIKEY,
        "ClientSecretKey": CLIENT_SECRET_KEY
    }

    # Same timeout as the time-card request so a stalled connection cannot hang the notebook
    response = requests.get(base_url + endpoint, params=params, headers=headers, timeout=60)

    if response.status_code != 200:
        raise requests.exceptions.HTTPError(
            f"Error: {response.status_code}, {response.text}", response=response
        )
    return pd.DataFrame(response.json())

# Request the GetEmployeeActiveRates data from 2020-01-01 through end_date
rates_df = get_all_time_cards_for_date_range("2020-01-01", end_date)

In [None]:
# Lower-case every string cell of a copy of the raw data; non-string values pass through unchanged.
def _lower_if_text(value):
    """Return ``value.lower()`` for strings and ``value`` itself otherwise."""
    if isinstance(value, str):
        return value.lower()
    return value


fingercheck_data = fingercheck_raw_data.copy().applymap(_lower_if_text)

In [None]:
# Only these time-card columns are carried into the labor analysis
columns_to_keep = [
    "EmployeeID",
    "EmployeeNumber",
    "Position",
    "FirstName",
    "MiddleInitial",
    "LastName",
    "DateWorked",
    "Hours",
    "JobDescriptionPunch",
    'CostCenter1',
    'CostCenter2',
    'DivisionEmployeeStatus',
]

fingercheck_data = fingercheck_data.loc[:, columns_to_keep]

# Independent copy used as the "hours" side of the merge with the rates data
fingercheck_hours = fingercheck_data.copy()

In [None]:
# Merging data pulled from two end points: the time-card rows (fingercheck_hours) and the
# rates (rates_df), matched on 'EmployeeID'. how='outer' keeps rows from either side that
# have no match on the other.
# NOTE(review): if rates_df holds more than one row per EmployeeID, each time-card row is
# repeated once per rate row -- TODO confirm the rates endpoint returns one row per employee.

merged_df = fingercheck_hours.merge(rates_df, on='EmployeeID', how='outer')

We only recently started making fuller use of Fingercheck, so we do not have wages and labor classifications for
employees who are no longer working with us, and some of the projects started years ago.
For those cases we took an average wage rate and used it to fill in all the NaN values.

For all recent projects, however, the wage rates are accurate.

In [None]:
# Fill missing wage data with defaults: a rate of 30 and position 'l'
# (the position code that the per-project summary counts as labor)
merged_df['Rate'] = merged_df['Rate'].fillna(30)
merged_df['Position'] = merged_df['Position'].fillna('l')

In [None]:
# Convert the 'DateWorked' column to datetime values so that .day_name() can be
# called on each row's date in calculate_hours_paid below
merged_df['DateWorked'] = pd.to_datetime(merged_df['DateWorked'])

# Paid hours depend only on the row's 'Position' and the weekday of 'DateWorked'
def calculate_hours_paid(row):
    """Return the number of hours a row is paid for.

    Position 'l' is paid 9 hours on every day except Saturday; position 'l' on
    a Saturday and every other position are paid 8 hours.

    Args:
        row: Mapping-like row with 'Position' and, for position 'l', a
            'DateWorked' value that provides ``day_name()``.

    Returns:
        8 or 9.
    """
    is_labor = row['Position'] == 'l'
    # 'DateWorked' is only consulted for position 'l' (short-circuit evaluation)
    if is_labor and not (row['DateWorked'].day_name() == 'Saturday'):
        return 9
    return 8

# Add a new 'HoursPaidFor' column based on the 'Position' and 'DateWorked' columns;
# axis=1 passes each row to calculate_hours_paid in turn
merged_df['HoursPaidFor'] = merged_df.apply(calculate_hours_paid, axis=1)

In [None]:
# Convert the 'DateWorked' column to datetime
merged_df['DateWorked'] = pd.to_datetime(merged_df['DateWorked'])

# Identify rows that are Saturdays (dayofweek counts Monday as 0, so Saturday is 5)
merged_df['IsSaturday'] = merged_df['DateWorked'].dt.dayofweek == 5

# Remember the un-adjusted rate the first time this cell runs. The Saturday rate is always
# derived from it, so re-running the cell no longer multiplies the rate by 1.5 again.
if 'BaseRate' not in merged_df.columns:
    merged_df['BaseRate'] = merged_df['Rate']

# Update the rate for Saturdays: 1.5x the base rate; other days keep the base rate
merged_df['Rate'] = merged_df['BaseRate'].where(
    ~merged_df['IsSaturday'], merged_df['BaseRate'] * 1.5
)

In [None]:
# Calculating labor cost per row: paid hours ('HoursPaidFor') times the row's 'Rate'
merged_df['LaborCost'] = merged_df['HoursPaidFor'] * merged_df['Rate']


In [None]:
import pandas as pd
import numpy as np

# Every metric below is computed once per project, i.e. per 'JobDescriptionPunch' value
grouped = merged_df.groupby('JobDescriptionPunch')


def _position_values(project_rows, positions, column):
    """Return `column` for the rows of one project whose 'Position' is in `positions`."""
    return project_rows.loc[project_rows['Position'].isin(positions), column]


def _format_ratio(value):
    """Render a labor-per-mason ratio as '1:x.xx', or '1:∞' when it is missing."""
    if pd.notna(value):
        return f'1:{value:.2f}'
    return '1:∞'


_MASON_FOREMAN = ['m', 'f']
_LABOR = ['l']

# Masons and foremen: total paid hours and mean rate (one decimal)
mf_hours = grouped.apply(lambda rows: _position_values(rows, _MASON_FOREMAN, 'HoursPaidFor').sum())
avg_mf_rate = grouped.apply(lambda rows: round(_position_values(rows, _MASON_FOREMAN, 'Rate').mean(), 1))

# Labor: total paid hours and mean rate (one decimal)
l_hours = grouped.apply(lambda rows: _position_values(rows, _LABOR, 'HoursPaidFor').sum())
avg_l_rate = grouped.apply(lambda rows: round(_position_values(rows, _LABOR, 'Rate').mean(), 1))

# Labor hours per mason/foreman hour; zero mason/foreman hours become NaN so the
# division never divides by zero, and NaN ratios are shown as '1:∞'
m_vs_l_ratio = (l_hours / mf_hours.replace({0: np.nan})).apply(_format_ratio)

# Whole-project totals across all positions
total_hours = grouped['HoursPaidFor'].sum()
ave_rate = grouped['Rate'].mean().round(1)
total_cost = grouped['LaborCost'].sum()

# Most recent date worked on each project
last_date_worked = grouped['DateWorked'].max()

# Assemble the per-project summary; the project names come from the group index
labor_analysis = pd.DataFrame({
    'Project Name': mf_hours.index,
    'M&F_Hours': mf_hours.values,
    'Avg_M&F Rate': avg_mf_rate.values,
    'L_Hours': l_hours.values,
    'Avg_L Rate': avg_l_rate.values,
    'M vs L Ratio': m_vs_l_ratio.values,
    'Total Hours': total_hours.values,
    'Ave_Rate': ave_rate.values,
    'Total Labor Cost': total_cost.values,
    'LastDateWorked': last_date_worked.values,
})

# Keep a plain 0..n-1 row index
labor_analysis = labor_analysis.reset_index(drop=True)



# Material Analysis


In [None]:
# Load the CSV file downloaded from QuickBooks

import pandas as pd
# TODO: set this to the path of the QuickBooks CSV export (the original placeholder
# "file_path = # file path" was a syntax error)
file_path = "path/to/quickbooks_export.csv"

# Load the file into a DataFrame
inv = pd.read_csv(file_path)

# Lower-case every string cell; non-string values are left unchanged
inv = inv.applymap(lambda x: x.lower() if isinstance(x, str) else x)

In [None]:
# Columns of the QuickBooks export that the material analysis keeps
columns_to_keep = [
    'Trans #', 'Type', 'Date', 'Num', 'Name', 'Source Name',
    'Account', 'Billing Status', 'Split', 'Paid', 'Amount', 'Account Type',
]

# Restrict the export to those columns
Material_cost = inv.loc[:, columns_to_keep]

In [None]:
# Boolean mask: True for rows in which every kept column is missing
rows_with_all_nan = Material_cost.isnull().all(axis='columns')

# Count those rows and report the number
num_rows_with_all_nan = rows_with_all_nan.sum()
print(f"There are {num_rows_with_all_nan} rows with all NaN values.")


In [None]:
# Remove the rows in which every column is NaN (the rows counted in the previous cell)
Material_cost = Material_cost.dropna(how='all')

In [None]:
# Per 'Name': total 'Amount' (MaterialCost) and the maximum 'Date' (Latest_Date)
grouped = Material_cost.groupby('Name').agg(
    MaterialCost=('Amount', 'sum'),
    Latest_Date=('Date', 'max'),
)

# Turn 'Name' back into a column called 'Project Name', order the projects by
# Latest_Date (descending), then discard Latest_Date, which was only needed for ordering
Material_cost_byproject = (
    grouped
    .reset_index()
    .rename(columns={'Name': 'Project Name'})
    .sort_values(by='Latest_Date', ascending=False)
    .drop(columns=['Latest_Date'])
)

In [None]:
# Merging the labor and material data frames, using 'Project Name' as the join key.
# how='outer' keeps projects that appear in only one of the two frames.

Labor_Material = labor_analysis.merge(Material_cost_byproject, on='Project Name', how='outer')

In [None]:
# Display the merged frame ordered by 'LastDateWorked', newest first
# (the sorted result is only shown; Labor_Material itself is not reassigned)
Labor_Material.sort_values(by='LastDateWorked', ascending=False)

# Project Budget / Project Contract amount


In [None]:
# Loading the Textura data

# TODO: set this to the path of the Textura CSV export (the original placeholder
# "file_path = # file path" was a syntax error)
file_path = "path/to/textura_export.csv"
# Load the CSV file into a DataFrame; header=1 takes the column names from the second row
Tex = pd.read_csv(file_path, header=1)

In [None]:
# Number of distinct values in the 'Subcontract Number' column
len(Tex['Subcontract Number'].unique())

In [None]:
# Number of distinct values in the 'Project Name' column
len(Tex['Project Name'].unique())

In [None]:
# Find the unique ('Subcontract Number', 'Project Name') pairs by dropping repeated combinations
unique_pairs = Tex[['Subcontract Number', 'Project Name']].drop_duplicates()

In [None]:
import pandas as pd

# Calculate the change orders and revised budget.
# Series.add(..., fill_value=0) treats a value missing on one side as 0, so a blank
# change-order cell no longer turns the row's 'Revised Budget' into NaN (a NaN row would be
# skipped by the group sum below while its 'Original Budget' was still counted).
Tex['Change Orders'] = Tex['Previously Approved Change Orders'].add(
    Tex['Current Change Orders'], fill_value=0
)
Tex['Revised Budget'] = Tex['Original Budget'].add(Tex['Change Orders'], fill_value=0)

# Group by 'Project Name', 'Subcontract Number', and 'Draw Number', then aggregate the required columns
grouped = Tex.groupby(['Project Name', 'Subcontract Number', 'Draw Number']).agg({
    'Draw Date': 'first',
    'Subcontract Date': 'first',
    'Original Budget': 'sum',
    'Change Orders': 'sum',
    'Revised Budget': 'sum',
    'Total Work Completed and Material Stored to Date': 'sum',
}).reset_index()

# Get the index of the row with the maximum draw number for each subcontract within each project
latest_draw_indices = grouped.groupby(['Project Name', 'Subcontract Number'])['Draw Number'].idxmax()

# Keep only the latest draw of each subcontract in each project. The explicit .copy() makes
# Textura_req an independent frame, so the column assignment below is unambiguous.
Textura_req = grouped.loc[latest_draw_indices].copy()

# Convert the 'Draw Date' column to datetime format
Textura_req['Draw Date'] = pd.to_datetime(Textura_req['Draw Date'])

# Sort by 'Project Name' and 'Subcontract Number' ascending, then 'Draw Date' newest first
Textura_req = Textura_req.sort_values(
    by=['Project Name', 'Subcontract Number', 'Draw Date'],
    ascending=[True, True, False],
)



In [None]:
# Loading the manual requisition file

# TODO: set this to the path of the manual requisition Excel file (the original placeholder
# "file_path = # file path" was a syntax error)
file_path = "path/to/manual_requisitions.xlsx"

# Load the Excel file into a DataFrame; header=1 takes the column names from the second row
Manual = pd.read_excel(file_path, header=1)

# Drop the two unnamed columns; errors='ignore' keeps the cell working when the workbook
# does not contain them
Manual = Manual.drop(['Unnamed: 8', 'Unnamed: 9'], axis=1, errors='ignore')
Manual

Combining the manual requisition data with the Textura data

In [None]:
# Append the manual requisition rows to the Textura rows, renumbering the index from 0
Textura = pd.concat([Textura_req, Manual], ignore_index=True)
Textura

In [None]:
# Lower-case every string cell (non-string values are left unchanged)
Textura = Textura.applymap(lambda x: x.lower() if isinstance(x, str) else x)

In [None]:
# Merging material, labor and budget on project name.
# how='outer' keeps projects that appear on only one side of the merge.

Labor_Material_Textura = Labor_Material.merge(Textura, on='Project Name', how='outer')

In [None]:
# Display the combined frame ordered by 'Draw Date', newest first
# (the sorted result is only shown; Labor_Material_Textura itself is not reassigned)
Labor_Material_Textura.sort_values(by='Draw Date', ascending=False)

In [None]:
# Work on a copy so that the changes made to Project_report do not alter Labor_Material_Textura
Project_report = Labor_Material_Textura.copy()

In [None]:
# List of specified projects
# TODO: fill in the names of the projects needed in the report. (The original placeholder
# "[# ...]" commented out the closing bracket, which was a syntax error.)
specified_projects = []

# Normalize case for comparison (optional, remove if exact match is needed)
specified_projects = [project.lower() for project in specified_projects]
Project_report['Project Name'] = Project_report['Project Name'].str.lower()

# Filter the DataFrame. The explicit .copy() makes the result an independent frame, so
# columns can be added to it later without pandas warning about writing to a slice.
Filtered_Project_report = Project_report[Project_report['Project Name'].isin(specified_projects)].copy()

# Display the new DataFrame
Filtered_Project_report


In [None]:

# List of projects which have CCIP/OCIP insurance
# TODO: fill in the project names covered by CCIP/OCIP. (The original placeholder "[#]"
# commented out the closing bracket, which was a syntax error.)
ccip_ocip_projects = []


# Function to determine the insurance type
def determine_insurance_type(project_name):
    """Classify a project as 'CCIP/OCIP' or 'Traditional'.

    The project name and every entry of ``ccip_ocip_projects`` are stripped of
    surrounding whitespace and lower-cased before comparison, so entries may
    be written in any case. (Previously only the project name was normalized,
    so an entry containing capitals could never match.)

    Args:
        project_name: Project name string.

    Returns:
        'CCIP/OCIP' if the normalized name is listed in ``ccip_ocip_projects``,
        otherwise 'Traditional'.
    """
    normalized_name = project_name.strip().lower()
    # Normalized at call time so later edits to the list are picked up
    covered = {entry.strip().lower() for entry in ccip_ocip_projects}
    if normalized_name in covered:
        return 'CCIP/OCIP'
    return 'Traditional'

# Apply determine_insurance_type to every 'Project Name' value and store the result
# ('CCIP/OCIP' or 'Traditional') in a new 'InsuranceType' column
Filtered_Project_report['InsuranceType'] = Filtered_Project_report['Project Name'].apply(determine_insurance_type)

# Display the updated DataFrame
Filtered_Project_report


In [None]:

# Work on a copy so the filtered report itself is left unchanged
Project_final = Filtered_Project_report.copy()

completed_to_date = Project_final['Total Work Completed and Material Stored to Date']
is_traditional = Project_final['InsuranceType'] == 'Traditional'

# All ratio columns below are stored as fractions (0.25 means 25%), not multiplied by 100.

# Share of the revised budget that has been completed
Project_final["% completed to date"] = completed_to_date / Project_final['Revised Budget']

# Workmen's comp: 18% of labor cost for 'Traditional' projects, 0 otherwise.
# Computed column-wise with .where() instead of a row-wise apply, which also works
# when the filter leaves no rows.
Project_final['Workmen comp 18% of labor'] = (
    0.18 * Project_final['Total Labor Cost']
).where(is_traditional, 0)

# Insurance: 10% of work completed for 'Traditional' projects, 0 otherwise
Project_final['Insurance-10% of work completed'] = (0.10 * completed_to_date).where(is_traditional, 0)

# Overhead: 10% of work completed for every project
Project_final['Overhead-10% of work completed'] = 0.10 * completed_to_date

# Profit: work completed minus labor, material, workmen's comp, overhead and insurance.
# A missing value in any of these columns makes the profit NaN for that row.
Project_final['Profit'] = (completed_to_date
                           - Project_final['Total Labor Cost']
                           - Project_final['MaterialCost']
                           - Project_final['Workmen comp 18% of labor']
                           - Project_final['Overhead-10% of work completed']
                           - Project_final['Insurance-10% of work completed'])

# Profit as a fraction of the work completed to date
Project_final['Profit %'] = Project_final['Profit'] / completed_to_date



In [None]:
# Labor and material cost as a share of the work completed to date.
# Both columns hold fractions (not multiplied by 100).
work_completed = Project_final["Total Work Completed and Material Stored to Date"]

Project_final['Total Labor Cost for work completed %'] = Project_final['Total Labor Cost'].div(work_completed)
Project_final['Total Material Cost for work completed %'] = Project_final['MaterialCost'].div(work_completed)

In [None]:
# Loading the manual change order log
# TODO: set this to the path of the change order log CSV (the original placeholder
# "csv_file_path = # file path" was a syntax error)
csv_file_path = "path/to/change_order_log.csv"

CO_log = pd.read_csv(csv_file_path)


In [None]:
# The change order log contributes only the project name, the unpaid CO total
# and the date that total was last updated
columns_to_keep1 = [
    'Project Name',
    'Work completed-Unpaid CO Total',
    "Unpaid CO's amount last updated",
]

# Restrict the log to those columns
CO_log = CO_log.loc[:, columns_to_keep1]

In [None]:
# The 'Project Name' values in Project_final were lower-cased earlier in the notebook, so
# lower-case the change order log's names before joining; otherwise the same project
# fails to match on letter case alone and shows up as two separate rows.
CO_log_normalized = CO_log.assign(**{'Project Name': CO_log['Project Name'].str.lower()})
Project_final = Project_final.merge(CO_log_normalized, on='Project Name', how='outer')

In [None]:

# Remove common non-numeric characters (like $ and ,). The pattern is a raw string so that
# "\$" reaches the regex engine as written instead of being an invalid string escape.
Project_final["Work completed-Unpaid CO Total"] = Project_final["Work completed-Unpaid CO Total"].replace(r'[\$,]', '', regex=True)

# Then convert to numeric; values that cannot be parsed become NaN
Project_final["Work completed-Unpaid CO Total"] = pd.to_numeric(Project_final["Work completed-Unpaid CO Total"], errors='coerce')

# Potential profit if the unpaid change orders are paid.
# NOTE(review): rows without an unpaid CO total end up with NaN here rather than the plain
# profit -- TODO confirm whether a missing total should count as 0.
Project_final['Potential Profit including CO'] = Project_final['Profit'] + Project_final["Work completed-Unpaid CO Total"]

# Potential profit as a fraction (not multiplied by 100) of work completed plus unpaid COs
Project_final['Potential Profit including CO %'] = (
    Project_final['Potential Profit including CO']
    / (Project_final["Work completed-Unpaid CO Total"] + Project_final['Total Work Completed and Material Stored to Date'])
)

In [None]:
# Display the final report frame before it is written to Excel
Project_final

In [None]:

# Specify the path where you want to save the Excel file
# TODO: set the real output path (the original placeholder "file_path = # file path"
# was a syntax error)
file_path = "path/to/project_report.xlsx"

# Save the dataframe to an Excel file, without writing the row index as a column
Project_final.to_excel(file_path, index=False)