# DATA ENGINEERING 

In [14]:
%pip install psycopg2 pandas openpyxl python-dotenv

Note: you may need to restart the kernel to use updated packages.


## RAW LAYER

### Read the tables from the source DB and store them in the raw layer

In [15]:
import psycopg2
import pandas as pd
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

# Fetching the credentials from the environment variables
dbname = os.getenv('DB_NAME')
user = os.getenv('DB_USER')
password = os.getenv('DB_PASSWORD')
host = os.getenv('DB_HOST')
port = os.getenv('DB_PORT')

# Raw-layer target directory; bound before the try block so it always exists
output_dir = './1.Raw'

# Pre-bind the handles so the `finally` clause can test them safely even when
# psycopg2.connect() itself raises (otherwise a NameError would mask the real error)
connection = None
cursor = None

try:
    # Connect to the PostgreSQL database
    connection = psycopg2.connect(
        dbname=dbname,
        user=user,
        password=password,
        host=host,
        port=port
    )

    # Create a cursor object
    cursor = connection.cursor()

    # List every table that lives in the public schema
    cursor.execute("""
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = 'public';
    """)

    tables = cursor.fetchall()

    os.makedirs(output_dir, exist_ok=True)

    # Each fetched row is a one-element tuple holding the table name
    for (table_name,) in tables:
        if table_name == 'User_Accounts':
            # Only these three columns are exported for User_Accounts
            # NOTE(review): presumably to keep credential columns out of the raw layer - confirm
            query = f'SELECT employee_id,email,role FROM public."{table_name}"'
        else:
            query = f'SELECT * FROM public."{table_name}"'
        df = pd.read_sql(query, connection)

        df = df.astype(str)  # Convert all columns to string

        # Save each table as a separate CSV file
        csv_file = os.path.join(output_dir, f"{table_name}.csv")
        df.to_csv(csv_file, index=False)  # Saving as CSV
        print(f"Saved: {csv_file}")

except Exception as e:
    print(f"An error occurred: {e}")

else:
    # Report success only when every table was exported without an exception
    print(f"Data has been saved to {output_dir} as separate CSV files.")

finally:
    # Close the cursor and connection if they were actually opened
    if cursor is not None:
        cursor.close()
    if connection is not None:
        connection.close()


Saved: ./1.Raw\_prisma_migrations.csv
Saved: ./1.Raw\Employees.csv
Saved: ./1.Raw\Courses.csv
Saved: ./1.Raw\Designation_Courses.csv
Saved: ./1.Raw\Course_Performances.csv
Saved: ./1.Raw\Project_Performances.csv
Saved: ./1.Raw\Resignation_Records.csv
Saved: ./1.Raw\User_Accounts.csv
Data has been saved to ./1.Raw as separate CSV files.


  df = pd.read_sql(query, connection)
  df = pd.read_sql(query, connection)
  df = pd.read_sql(query, connection)
  df = pd.read_sql(query, connection)
  df = pd.read_sql(query, connection)
  df = pd.read_sql(query, connection)
  df = pd.read_sql(query, connection)
  df = pd.read_sql(query, connection)


# Prep Layer

### Read the Data from Raw layer

In [16]:
import pandas as pd
# Folder that holds the raw-layer CSV extracts
input_dir = './1.Raw'

dataframes = []

# Read every raw extract into its own DataFrame, in a fixed order
(
    course_performances,
    courses,
    designation_courses,
    employees,
    project_performance,
    resignation_records,
    user_account,
) = map(pd.read_csv, (
    f'{input_dir}/Course_Performances.csv',
    f'{input_dir}/Courses.csv',
    f'{input_dir}/Designation_Courses.csv',
    f'{input_dir}/Employees.csv',
    f'{input_dir}/Project_Performances.csv',
    f'{input_dir}/Resignation_Records.csv',
    f'{input_dir}/User_Accounts.csv',
))

### Basic cleaning

In [17]:
import pandas as pd
import os

# Location of the raw-layer CSV extracts
input_dir = './1.Raw'

# Load CSV files into DataFrames, one per raw table
(
    course_performances,
    courses,
    designation_courses,
    employees,
    project_performance,
    resignation_records,
    user_account,
) = (
    pd.read_csv(raw_csv)
    for raw_csv in (
        f'{input_dir}/Course_Performances.csv',
        f'{input_dir}/Courses.csv',
        f'{input_dir}/Designation_Courses.csv',
        f'{input_dir}/Employees.csv',
        f'{input_dir}/Project_Performances.csv',
        f'{input_dir}/Resignation_Records.csv',
        f'{input_dir}/User_Accounts.csv',
    )
)

# Function to perform basic data cleaning
def clean_dataframe(df):
    """Return a cleaned copy of ``df``; the input frame is left untouched.

    Cleaning steps, in order:
      1. drop fully duplicated rows (original index labels are kept),
      2. normalise column names: strip, lower-case, spaces -> underscores,
      3. strip leading/trailing whitespace from string values in object columns.

    Per-column missing-value counts are printed before and after cleaning.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to clean. Column labels must be strings.

    Returns
    -------
    pandas.DataFrame
        A new, cleaned frame.
    """
    # Check for missing values
    print("Missing values before cleaning:")
    print(df.isnull().sum())

    # Remove duplicates on a copy so the caller's frame is never mutated
    cleaned = df.drop_duplicates().copy()

    # Normalise column names
    cleaned.columns = cleaned.columns.str.strip().str.lower().str.replace(' ', '_')

    # Trim whitespace from string values only. Using `.str.strip()` on a whole
    # object column would turn every non-string value in a mixed column into NaN,
    # so each value is checked individually and non-strings are passed through.
    string_cols = cleaned.select_dtypes(include=[object]).columns
    if len(string_cols) > 0:
        cleaned[string_cols] = cleaned[string_cols].apply(
            lambda col: col.map(lambda value: value.strip() if isinstance(value, str) else value)
        )

    # Check for missing values after cleaning
    print("Missing values after cleaning:")
    print(cleaned.isnull().sum())

    return cleaned

# Clean each DataFrame: run the shared cleaning routine over every raw frame, in order
(
    course_performances_stage,
    courses_stage,
    designation_courses_stage,
    employees_stage,
    project_performance_stage,
    resignation_records_stage,
    user_account_stage,
) = map(clean_dataframe, (
    course_performances,
    courses,
    designation_courses,
    employees,
    project_performance,
    resignation_records,
    user_account,
))


Missing values before cleaning:
id                 0
employee_id        0
course_id          0
course_status      0
score              0
completion_date    0
dtype: int64
Missing values after cleaning:
id                 0
employee_id        0
course_id          0
course_status      0
score              0
completion_date    0
dtype: int64
Missing values before cleaning:
id                    0
course_name           0
course_description    0
duration_hours        0
dtype: int64
Missing values after cleaning:
id                    0
course_name           0
course_description    0
duration_hours        0
dtype: int64
Missing values before cleaning:
designation_type    0
course_id           0
dtype: int64
Missing values after cleaning:
designation_type    0
course_id           0
dtype: int64
Missing values before cleaning:
id                   0
first_name           0
last_name            0
department           0
designation_type     0
hire_date            0
employment_status    0
dtype: i

### Cleaning Course Performance

In [18]:
# Cast score to integer; astype(int) raises if the column holds missing or non-numeric values
course_performances_stage['score'] = course_performances_stage['score'].astype(int)
# Parse completion_date; errors='coerce' turns unparseable values into NaT instead of raising
course_performances_stage['completion_date'] = pd.to_datetime(course_performances_stage['completion_date'], errors='coerce')

# Show the resulting dtypes to confirm the casts
print(course_performances_stage.dtypes)

# Nothing is written to disk here; the cleaned frame is saved by the save cell further down

id                          int64
employee_id                 int64
course_id                   int64
course_status              object
score                       int64
completion_date    datetime64[ns]
dtype: object


### Cleaning Course

In [19]:
# Cast duration_hours to integer; astype(int) raises if the column holds missing or non-numeric values
courses_stage['duration_hours'] = courses_stage['duration_hours'].astype(int)

# Show the resulting dtypes to confirm the cast
print(courses_stage.dtypes)

# Nothing is written to disk here; the cleaned frame is saved by the save cell further down

id                     int64
course_name           object
course_description    object
duration_hours         int64
dtype: object


### Cleaning Designation Courses

In [20]:
# Cast course_id to integer; astype(int) raises if the column holds missing or non-numeric values
designation_courses_stage['course_id'] = designation_courses_stage['course_id'].astype(int)

# Show the resulting dtypes to confirm the cast
print(designation_courses_stage.dtypes)

# Nothing is written to disk here; the cleaned frame is saved by the save cell further down

designation_type    object
course_id            int64
dtype: object


### Cleaning Employees and User Account

In [21]:
# clean employees: integer primary key and parsed hire date
employees_stage['id'] = employees_stage['id'].astype(int)
# errors='coerce' turns unparseable hire dates into NaT instead of raising
employees_stage['hire_date'] = pd.to_datetime(employees_stage['hire_date'], errors='coerce')

# clean user_account: integer key so it matches the dtype of employees_stage['id']
user_account_stage['employee_id'] = user_account_stage['employee_id'].astype(int)

# merge the tables: inner join keeps only employees that have a user account;
# both key columns ("id" and "employee_id") are kept in the result
# NOTE(review): employee_details_Stage is not referenced by any later cell visible in this
# notebook (the save cell writes employees_stage and user_account_stage separately) - confirm intent
employee_details_Stage = pd.merge(employees_stage, user_account_stage, left_on="id", right_on="employee_id", how="inner")

# Show the dtypes of the merged frame
print(employee_details_Stage.dtypes)


id                            int64
first_name                   object
last_name                    object
department                   object
designation_type             object
hire_date            datetime64[ns]
employment_status            object
employee_id                   int64
email                        object
role                         object
dtype: object


### Cleaning Project Performance

In [22]:
# Cast every column to integer in one call; this raises if any column holds
# missing or non-numeric values
project_performance_stage = project_performance_stage.astype(int) 

# Show the resulting dtypes to confirm the cast
print(project_performance_stage.dtypes)

# Nothing is written to disk here; the cleaned frame is saved by the save cell further down

id                           int64
employee_id                  int64
project_id                   int64
engagement_score             int64
teamwork_score               int64
punctuality_score            int64
overall_performance_score    int64
dtype: object


### Cleaning Resignation Records

In [23]:
# Cast both key columns to integer; astype(int) raises on missing or non-numeric values
resignation_records_stage['id'] = resignation_records_stage['id'].astype(int)
resignation_records_stage['employee_id'] = resignation_records_stage['employee_id'].astype(int)
# Parse resignation_date; errors='coerce' turns unparseable values into NaT instead of raising
resignation_records_stage['resignation_date'] = pd.to_datetime(resignation_records_stage['resignation_date'], errors='coerce')

# Show the resulting dtypes to confirm the casts
print(resignation_records_stage.dtypes)

# Nothing is written to disk here; the cleaned frame is saved by the save cell further down

id                           int64
employee_id                  int64
resignation_date    datetime64[ns]
reason                      object
dtype: object


### Save the Cleaned Data in the Prep Layer

In [24]:
# Persist every cleaned table as a CSV file in the prep-layer folder
output_dir = './2.Prep'

os.makedirs(output_dir, exist_ok=True)

course_performances_stage.to_csv(f'{output_dir}/Course_Performances.csv', index=False)
courses_stage.to_csv(f'{output_dir}/Courses.csv', index=False)
designation_courses_stage.to_csv(f'{output_dir}/Designation_Courses.csv', index=False)
# NOTE(review): the file is named Employees_Details.csv but it holds employees_stage,
# not the merged employee_details_Stage built earlier - confirm which one is intended
employees_stage.to_csv(f'{output_dir}/Employees_Details.csv', index=False)
project_performance_stage.to_csv(f'{output_dir}/Project_Performances.csv', index=False)
resignation_records_stage.to_csv(f'{output_dir}/Resignation_Records.csv', index=False)
user_account_stage.to_csv(f'{output_dir}/User_Account.csv', index=False)

# The previous message mentioned only "Course Performances" although all seven tables are written
print(f"Cleaned tables have been saved to {output_dir} as CSV files.")

Course Performances has been saved to CSV.


# Report Layer

### Designation Count by Course and Status

In [30]:
def designation_count_by_course_and_status(course_performances_stage, employees_stage):
    """Count course-performance rows per course, designation type and course status.

    Each performance row is matched (inner join) to its employee through
    ``employee_id`` -> ``id`` to pick up ``designation_type``; the rows are then
    counted for every (course_id, designation_type, course_status) combination.

    Returns a DataFrame with the columns ``course_id``, ``designation_type``,
    ``course_status`` and ``employees_count``, sorted by course and designation.
    """
    designation_lookup = employees_stage[['id', 'designation_type']]
    performances_with_designation = course_performances_stage.merge(
        designation_lookup, left_on='employee_id', right_on='id'
    )

    group_keys = ['course_id', 'designation_type', 'course_status']
    counts = (
        performances_with_designation
        .groupby(group_keys)
        .agg(employees_count=('employee_id', 'count'))
        .reset_index()
    )

    return counts.sort_values(['course_id', 'designation_type'])

# Example usage with DataFrames: build the report from the cleaned frames
designation_count = designation_count_by_course_and_status(
    course_performances_stage=course_performances_stage,
    employees_stage=employees_stage,
)

# Preview the first rows of the report
designation_count.head()


Unnamed: 0,course_id,designation_type,course_status,employees_count
0,1,A,completed,11
1,1,A,failed,10
2,1,A,incomplete,2
3,2,D,completed,9
4,2,D,failed,9


### Course Status Count by Course

In [31]:
def course_status_count_by_course(course_performances_stage, courses_stage):
    """Count course-performance rows per course and course status.

    Performance rows are joined (inner) to the course table through
    ``course_id`` -> ``id`` so the course name, description and duration appear
    next to each count.

    Returns a DataFrame with the course attributes, ``course_status`` and
    ``status_count``, ordered by ``course_id`` ascending and, within a course,
    by ``status_count`` descending.
    """
    course_attributes = ['course_name', 'course_description', 'duration_hours']

    performances_with_course = course_performances_stage.merge(
        courses_stage[['id'] + course_attributes],
        left_on='course_id',
        right_on='id'
    )

    group_keys = ['course_id'] + course_attributes + ['course_status']
    status_counts = (
        performances_with_course
        .groupby(group_keys)
        .agg(status_count=('course_status', 'count'))
        .reset_index()
    )

    return status_counts.sort_values(['course_id', 'status_count'], ascending=[True, False])

# Build the per-course status report from the cleaned frames
course_status_count = course_status_count_by_course(
    course_performances_stage=course_performances_stage,
    courses_stage=courses_stage,
)

# Preview the first rows of the report
course_status_count.head()


Unnamed: 0,course_id,course_name,course_description,duration_hours,course_status,status_count
0,1,Course 1,Pm major always man open speech seat. Camera i...,26,completed,11
1,1,Course 1,Pm major always man open speech seat. Camera i...,26,failed,10
2,1,Course 1,Pm major always man open speech seat. Camera i...,26,incomplete,2
3,2,Course 2,Environment charge technology several data dre...,15,completed,9
4,2,Course 2,Environment charge technology several data dre...,15,failed,9


### Top Scorer and Average Score by Designation

In [32]:
def top_scorer_and_avg_by_designation(course_performances_stage, employees_stage, courses_stage):
    """Report the top scorer and the average score per course and designation type.

    Only rows whose ``course_status`` is ``'completed'`` are considered. For every
    (course_id, designation_type) pair the employee with the highest score is
    picked (ties are broken by row order) and reported with the mean score of
    that pair.

    Parameters
    ----------
    course_performances_stage : pandas.DataFrame
        Needs ``employee_id``, ``course_id``, ``course_status`` and ``score``.
    employees_stage : pandas.DataFrame
        Needs ``id``, ``first_name``, ``last_name`` and ``designation_type``.
    courses_stage : pandas.DataFrame
        Needs ``id`` and ``course_name``.

    Returns
    -------
    pandas.DataFrame
        Columns ``course_id``, ``course_name``, ``Department``,
        ``Department_topper``, ``top_score`` and ``avg_score``. Note that the
        ``Department`` column carries the ``designation_type`` value.
    """
    completed_courses = course_performances_stage[course_performances_stage['course_status'] == 'completed']

    # Join completed rows to their employee once; this single frame feeds both the
    # average and the ranking (previously the same two frames were merged twice)
    completed_with_employee = completed_courses.merge(
        employees_stage[['id', 'first_name', 'last_name', 'designation_type']],
        left_on='employee_id',
        right_on='id'
    )

    # Average score by course_id, designation_type
    average_score = (
        completed_with_employee
        .groupby(['course_id', 'designation_type', 'course_status'])
        .agg(avg_score=('score', 'mean'))
        .reset_index()
    )

    # Row numbers by score descending within each (course, designation) group;
    # method='first' gives every row a distinct rank, so exactly one row has rank 1
    score_rank = completed_with_employee.groupby(
        ['course_id', 'designation_type']
    )['score'].rank(method='first', ascending=False)

    top_score = completed_with_employee[score_rank == 1]

    # Final result by merging top scorer, average score and course name
    final_table = top_score.merge(
        average_score[['course_id', 'designation_type', 'avg_score']],
        on=['course_id', 'designation_type'],
        how='left'
    ).merge(
        courses_stage[['id', 'course_name']],
        left_on='course_id',
        right_on='id',
        how='left'
    )

    final_table['Department_topper'] = final_table['first_name'] + ' ' + final_table['last_name']

    final_columns = ['course_id', 'course_name', 'designation_type', 'Department_topper', 'score', 'avg_score']
    return final_table[final_columns].rename(columns={'designation_type': 'Department', 'score': 'top_score'})

# Build the top-scorer report from the cleaned frames
top_scorer_avg = top_scorer_and_avg_by_designation(
    course_performances_stage=course_performances_stage,
    employees_stage=employees_stage,
    courses_stage=courses_stage,
)

# Preview the first rows of the report
top_scorer_avg.head()


Unnamed: 0,course_id,course_name,Department,Department_topper,top_score,avg_score
0,5,Course 5,D,Lisa Moses,99,81.5
1,11,Course 11,A,Kristin Lowery,90,72.333333
2,8,Course 8,C,Arthur Fitzpatrick,100,68.5
3,9,Course 9,C,Arthur Fitzpatrick,94,68.857143
4,10,Course 10,C,Steven Hart,98,74.857143


### Fact and Dimension Tables

In [33]:
import pandas as pd

# Create Dimension Tables
def create_dimension_tables(employees_stage, courses_stage, designation_courses_stage, resignation_records_stage, user_account_stage):
    """Derive the five dimension tables from the cleaned frames.

    Every returned frame is a new object, so the inputs are not modified.

    Returns a 5-tuple, in this order:
      * dim_employee           - employee attributes, ``id`` renamed to ``employee_id``
      * dim_course             - course attributes, ``id`` renamed to ``course_id``
      * dim_designation_course - copy of ``designation_courses_stage``
      * dim_resignation_record - copy of ``resignation_records_stage``
      * dim_user_account       - ``employee_id``, ``email`` and ``role`` only
    """
    employee_columns = [
        'id', 'first_name', 'last_name', 'department',
        'designation_type', 'hire_date', 'employment_status',
    ]
    course_columns = ['id', 'course_name', 'course_description', 'duration_hours']
    account_columns = ['employee_id', 'email', 'role']

    # Selecting a column list and renaming both yield new frames
    dim_employee = employees_stage[employee_columns].rename(columns={'id': 'employee_id'})
    dim_course = courses_stage[course_columns].rename(columns={'id': 'course_id'})

    return (
        dim_employee,
        dim_course,
        designation_courses_stage.copy(),
        resignation_records_stage.copy(),
        user_account_stage[account_columns].copy(),
    )

# Create Fact Table (Combining course performances and project performances)
def create_fact_table(course_performances_stage, project_performance_stage):
    """Build the performance fact table.

    Left-joins the project scores onto the course-performance rows using
    ``employee_id`` as the only key. Every course row is kept; when an employee
    has several project rows, each of that employee's course rows is repeated
    once per project row.
    NOTE(review): confirm this row multiplication is intended before aggregating
    over the fact table.

    Returns the joined DataFrame: all course-performance columns followed by
    ``project_id`` and the four project score columns.
    """
    project_columns = [
        'employee_id',
        'project_id',
        'engagement_score',
        'teamwork_score',
        'punctuality_score',
        'overall_performance_score',
    ]
    project_scores = project_performance_stage[project_columns]

    # Fact_Performance Table
    return course_performances_stage.merge(project_scores, how='left', on='employee_id')

# Example usage with DataFrames: build the dimension tables from the cleaned frames
(
    dim_employee,
    dim_course,
    dim_designation_course,
    dim_resignation_record,
    dim_user_account,
) = create_dimension_tables(
    employees_stage=employees_stage,
    courses_stage=courses_stage,
    designation_courses_stage=designation_courses_stage,
    resignation_records_stage=resignation_records_stage,
    user_account_stage=user_account_stage,
)

# Build the fact table from course and project performances
fact_performance = create_fact_table(
    course_performances_stage=course_performances_stage,
    project_performance_stage=project_performance_stage,
)

# Print the structure for testing
print("Dimension - Employee Table")
print(dim_employee.head())

print("\nDimension - Course Table")
print(dim_course.head())

print("\nFact - Performance Table")
print(fact_performance.head())


Dimension - Employee Table
   employee_id first_name last_name               department designation_type  \
0            1      Susan      Snow       Furniture designer                C   
1            2       Lisa     Moses          Hospital doctor                D   
2            3     Autumn   Bridges  Education administrator                A   
3            4    Kristin    Lowery          Patent examiner                A   
4            5       Mark      Cruz         Theatre director                C   

   hire_date employment_status  
0 2017-07-28            active  
1 2022-04-06            active  
2 2019-04-04          rejected  
3 2023-04-27          rejected  
4 2022-11-12          rejected  

Dimension - Course Table
   course_id course_name                                 course_description  \
0          1    Course 1  Pm major always man open speech seat. Camera i...   
1          2    Course 2  Environment charge technology several data dre...   
2          3    Course 3 

In [34]:
# Preview the first rows of the fact table built in the previous cell
fact_performance.head()

Unnamed: 0,id,employee_id,course_id,course_status,score,completion_date,project_id,engagement_score,teamwork_score,punctuality_score,overall_performance_score
0,2011,1,6,completed,74,2024-03-02,1,96,42,89,23
1,2011,1,6,completed,74,2024-03-02,2,67,37,52,36
2,2011,1,6,completed,74,2024-03-02,3,60,92,30,10
3,2012,1,10,incomplete,0,2020-09-22,1,96,42,89,23
4,2012,1,10,incomplete,0,2020-09-22,2,67,37,52,36
