### HR Data Pipeline Automation

#### Objective: 
To design, build, and automate a data pipeline that collects HR data from multiple simulated sources, cleanses it, and loads it into a data warehouse (Google BigQuery) for analysis and reporting. 

#### Import library 

In [1]:
import pandas as pd
from google.cloud import bigquery
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Get the IDs from the environment variables
PROJECT_ID = os.getenv('GCP_PROJECT_ID')
DATASET_ID = os.getenv('BIGQUERY_DATASET_ID')
TABLE_ID = os.getenv('BIGQUERY_TABLE_ID')

# Set the project ID for the BigQuery client
os.environ['GOOGLE_CLOUD_PROJECT'] = PROJECT_ID

#### Extract

In [2]:
def extract_data():
    """Reads data from CSV files and returns pandas DataFrames."""
    print("Extracting data from CSV files...")
    ats_df = pd.read_csv('ats_data.csv')
    hris_df = pd.read_csv('hris_data.csv')
    survey_df = pd.read_csv('survey_data.csv')
    print("Data extraction complete.")
    return ats_df, hris_df, survey_df

#### Transform

In [3]:
def transform_data(ats_df, hris_df, survey_df):
    """Cleans, transforms, and merges the datasets."""
    print("Transforming data...")

    # Data Cleaning and Type Conversion 
    # Convert date columns to datetime objects for accurate calculations
    ats_df['application_date'] = pd.to_datetime(ats_df['application_date'])
    ats_df['hired_date'] = pd.to_datetime(ats_df['hired_date'])
    hris_df['start_date'] = pd.to_datetime(hris_df['start_date'])
    survey_df['survey_date'] = pd.to_datetime(survey_df['survey_date'])

    # Feature Engineering 
    # 1. Calculate time-to-hire from ATS data for successful hires
    ats_hired_df = ats_df[ats_df['hiring_outcome'] == 'Hired'].copy()
    ats_hired_df['time_to_hire_days'] = (ats_hired_df['hired_date'] - ats_hired_df['application_date']).dt.days

    # 2. Calculate employee tenure from HRIS data
    hris_df['tenure_days'] = (pd.to_datetime('today') - hris_df['start_date']).dt.days

    # Data Merging 
    # Merge HRIS and Survey data on employee_id
    unified_df = pd.merge(hris_df, survey_df, on='employee_id', how='left')
    
    # Now, join the HRIS and ATS data using the shared employee_id
    unified_df = pd.merge(unified_df, ats_hired_df[['candidate_id', 'hiring_outcome', 'time_to_hire_days']].rename(columns={'candidate_id': 'employee_id'}), on='employee_id', how='left')

    # Handle missing values after merge
    unified_df['engagement_score'].fillna(-1, inplace=True)
    unified_df['time_to_hire_days'].fillna(-1, inplace=True)
    unified_df['hiring_outcome'].fillna('Not Applicable', inplace=True)
    
    print("Data transformation complete. Unified dataset shape:", unified_df.shape)
    return unified_df

#### Load

In [4]:
def load_data(df, dataset_id, table_id):
    """Loads the DataFrame into a Google BigQuery table."""
    print(f"Loading data into BigQuery table {dataset_id}.{table_id}...")
    client = bigquery.Client()

    # Configure the table schema to ensure correct data types in BigQuery
    job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE") # WRITE_TRUNCATE overwrites the table

    # Load the DataFrame to BigQuery
    job = client.load_table_from_dataframe(df, f"{dataset_id}.{table_id}", job_config=job_config)
    job.result()  # Wait for the job to complete
    print("Data loaded successfully into BigQuery.")

# --- Main Execution Block ---
if __name__ == '__main__':
    ats_data, hris_data, survey_data = extract_data()
    unified_data = transform_data(ats_data, hris_data, survey_data)

    # Pass the variables to the load function
    load_data(unified_data, DATASET_ID, TABLE_ID)
    print("\nETL Pipeline execution finished successfully!")

Extracting data from CSV files...
Data extraction complete.
Transforming data...


The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  unified_df['engagement_score'].fillna(-1, inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  unified_df['time_to_hire_days'].fillna(-1, inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we

Data transformation complete. Unified dataset shape: (250, 12)
Loading data into BigQuery table people_analytics.hr_unified_data...




Data loaded successfully into BigQuery.

ETL Pipeline execution finished successfully!
