# DDL Statements

In [1]:
import os

import pandas as pd
import psycopg2

def connect_to_db():
    """Connect to the PostgreSQL database.

    Connection settings are taken from the environment variables ``PGHOST``,
    ``PGDATABASE``, ``PGUSER`` and ``PGPASSWORD`` so that credentials do not
    have to live in the notebook. Any variable that is unset falls back to the
    value previously hard-coded here for the local Docker Compose setup.

    Returns:
        The open psycopg2 connection, or ``None`` when the connection attempt
        fails (the error is printed rather than raised).
    """
    try:
        return psycopg2.connect(
            # Default "db" is the name of the PostgreSQL service in Docker Compose.
            host=os.environ.get("PGHOST", "db"),
            database=os.environ.get("PGDATABASE", "mydatabase"),
            user=os.environ.get("PGUSER", "postgres"),
            password=os.environ.get("PGPASSWORD", "password"),
        )
    except Exception as e:
        print(f"Error connecting to the database: {e}")
        return None

# Single connection shared by the helper functions below (they read this
# module-level name). It is None when connect_to_db() failed to connect.
conn = connect_to_db()

# Function to execute SQL query
def execute_query(query, params=None):
    """Execute a statement on the shared connection and commit it.

    Args:
        query: SQL text to run (may contain several ``;``-separated statements).
        params: Optional sequence or mapping of values bound to the query's
            placeholders by the driver. Leave as ``None`` for plain SQL.

    Errors are reported by printing them; the transaction is rolled back so
    the shared connection stays usable for later cells. Nothing is returned.
    """
    # connect_to_db() returns None on failure; without this guard the error
    # handler below would itself crash on `conn.rollback()`.
    if conn is None:
        print("An error occurred: no database connection")
        return
    try:
        with conn.cursor() as cur:
            cur.execute(query, params)
            conn.commit()
    except Exception as e:
        print("An error occurred:", e)
        conn.rollback()

# Function to fetch data as DataFrame
def fetch_query(query):
    """Run ``query`` on the shared connection and return the rows as a DataFrame."""
    result_frame = pd.read_sql_query(sql=query, con=conn)
    return result_frame

def load_data(file_path, table_name):
    """Replace the contents of ``table_name`` with the rows of a CSV file.

    The first line of the file is treated as a header and skipped. The table
    is truncated and then refilled with ``COPY ... FROM STDIN WITH CSV`` in a
    single transaction, so a failed load rolls back and leaves the previous
    rows in place.

    Args:
        file_path: Path of the CSV file to load.
        table_name: Target table. It is interpolated into the SQL text as-is,
            so it must be a trusted identifier, never user input. COPY is
            issued without a column list, so the CSV columns must be in the
            table's column order.

    Raises:
        Whatever ``open`` or the database driver raises; on a database error
        the transaction is rolled back before the exception propagates.
    """
    with open(file_path, 'r') as f:
        # Skip the header row; the default keeps an empty file from raising
        # StopIteration.
        next(f, None)
        try:
            with conn.cursor() as cur:
                cur.execute(f"TRUNCATE {table_name}")
                cur.copy_expert(f"COPY {table_name} FROM STDIN WITH CSV", f)
            conn.commit()
        except Exception:
            # Leave the shared connection usable for subsequent cells.
            conn.rollback()
            raise


In [2]:
# Create the two source tables (employees, timesheets) and the result table
# (salary_per_hours_sql) if they do not exist yet. The three statements are
# sent in one execute_query call. No primary keys or other constraints are
# declared on any of the tables.
execute_query("""CREATE TABLE IF NOT EXISTS employees (
                    employee_id INT,
                    branch_id INT,
                    salary DECIMAL(10, 2),
                    join_date DATE,
                    resign_date DATE);
                    
                  CREATE TABLE IF NOT EXISTS timesheets (
                    timesheet_id INT,
                    employee_id INT,
                    date DATE,
                    checkin TIME,
                    checkout TIME);
  
                  CREATE TABLE IF NOT EXISTS salary_per_hours_sql (
                    branch_id INT,
                    year INT,
                    month INT,
                    salary_per_hour DECIMAL(10,2))""")

In [3]:
# Fill each source table from its CSV export (load_data truncates the table first).
for csv_path, target_table in (
    ("data/employees.csv", "employees"),
    ("data/timesheets.csv", "timesheets"),
):
    load_data(csv_path, target_table)

In [4]:
# execute_query("""DROP TABLE IF EXISTS employees;
#                  DROP TABLE IF EXISTS timesheets;
#                  DROP TABLE IF EXISTS salary_per_hours_sql;""")

## EDA

## Timesheets Table

- Check whether the timesheets table contains duplicate `timesheet_id` values

In [5]:
# Compare the number of timesheet_id values with the number of distinct ones.
fetch_query("""
                SELECT 
                    COUNT(timesheet_id) AS raw_timesheet_id, 
                    COUNT(distinct(timesheet_id)) AS unique_timesheet_id
                FROM timesheets;
            """)
# Answer: both counts are equal, so there are no duplicate timesheet_id values.

  return pd.read_sql_query(query, conn)


Unnamed: 0,raw_timesheet_id,unique_timesheet_id
0,39714,39714


- Check whether the company runs two shifts

In [6]:
# Look at the earliest and latest check-in times recorded.
fetch_query("""
                SELECT max(checkin),min(checkin)
                FROM timesheets;
            """)
# Answer: check-in times span almost the whole day (00:00:14 to 23:59:00),
# which suggests the company runs more than one shift.

  return pd.read_sql_query(query, conn)


Unnamed: 0,max,min
0,23:59:00,00:00:14


- Check whether any rows are missing a value in the `checkin` or `checkout` column

In [7]:
# Sample up to five rows where either time is missing.
fetch_query("""
                SELECT *
                FROM timesheets
                WHERE checkin is null OR checkout is null
                LIMIT 5;
            """)
# Answer: yes, some rows are missing either checkin or checkout.

  return pd.read_sql_query(query, conn)


Unnamed: 0,timesheet_id,employee_id,date,checkin,checkout
0,23907435,63,2019-08-21,09:55:47,
1,23907445,60,2019-08-22,,18:04:33
2,23907459,31,2019-08-26,,17:57:45
3,23907468,22,2019-08-27,,18:25:52
4,23907470,21,2019-08-27,,18:35:22


- Check whether any rows are missing both `checkin` and `checkout`

In [8]:
# List rows where both times are missing.
fetch_query("""
                SELECT *
                FROM timesheets
                WHERE checkin is null AND checkout is null;
            """)
# Answer: the result is empty, so no row is missing both checkin and checkout.

  return pd.read_sql_query(query, conn)


Unnamed: 0,timesheet_id,employee_id,date,checkin,checkout


## Employees Table

- Check whether the employees table contains duplicate `employee_id` values

In [9]:
# Compare the number of distinct employee_id values with the number of rows that have one.
fetch_query("""
            SELECT 
                COUNT(DISTINCT(employee_id)) AS unique_total_employee,
                COUNT(employee_id) AS raw_total_employee
            FROM employees;
 --                       SELECT 
 --               COUNT(DISTINCT(employee_id)) AS unique_total_employee,
 --               COUNT(employee_id) AS raw_total_employee
 --           FROM employees_p;
            """)

# Answer: there is duplicate data (176 distinct ids against 177 rows, i.e. one extra row).

  return pd.read_sql_query(query, conn)


Unnamed: 0,unique_total_employee,raw_total_employee
0,176,177


# salary_per_hour_calculation

Steps:
- Remove duplicates from the employees table.
- Create new columns that fill in missing `checkin` and `checkout` values, under these assumptions:
      - There are two shifts
      - Salaries are at Indonesian salary rates
      - There are 22 working days per month
- Prorate the salary, because many employees did not work a complete month (based on their `join_date`).

In [10]:
# Compute the average salary per hour for each (branch_id, year, month) in a
# single SQL statement and load the result into `df`.
#
# CTE pipeline:
#   employees_remove_duplicates / clean_employees
#       keep one row per employee_id (latest join_date, then highest salary).
#   timesheets_modify
#       derive checkin_new / checkout_new to fill in missing times.
#   timesheets_duration
#       hours per timesheet row, defaulting to 9.0 when no WHEN branch matches.
#   gross_total_hours
#       per employee / branch / year / month: row count, summed hours, salary.
#   prorated_salary
#       full salary when total_day > 22, otherwise salary * ROUND(total_day / 22.0, 2).
#   salary_per_hour_calculation / branch_salary_per_hour
#       per-employee rate, then AVG of those rates per branch / year / month.
#
# NOTE(review): every predicate of the form `x > '12:00:00' AND x < '00:00:00'`
#   can never be true (no TIME value is below 00:00:00), so the WHEN branches
#   that use it are dead. Rows with a missing checkin and an afternoon
#   checkout (or a missing checkout and an afternoon checkin) therefore keep
#   the NULL and end up with the ELSE 9.0 default in timesheets_duration.
#   Presumably `x > '12:00:00'` alone was intended - confirm before changing,
#   since it alters every downstream number.
# NOTE(review): the remaining night-shift branch computes checkin_new - checkout_new
#   without wrapping past midnight (17:00 -> 02:00 yields 15 h, not 9 h) - verify.
# NOTE(review): `WHERE salary is not null OR salary != 0` does not exclude
#   rows with salary = 0 (the first operand is already true for them);
#   `AND` looks like the intent - confirm.
# NOTE(review): prorated_salary / total_hours has no guard against
#   total_hours being zero or negative.
df = fetch_query("""
              WITH 
                    employees_remove_duplicates AS (
                        SELECT 
                            employee_id, 
                            branch_id, 
                            salary, 
                            join_date, 
                            resign_date,
                            ROW_NUMBER() OVER (PARTITION BY employee_id ORDER BY join_date DESC, salary DESC) AS row_num
                        FROM 
                            employees
                    ),
                    clean_employees AS (
                    SELECT 
                        employee_id, 
                        branch_id, 
                        salary, 
                        join_date, 
                        resign_date
                    FROM 
                        employees_remove_duplicates
                    WHERE 
                        row_num = 1),
                    timesheets_modify AS (
                        SELECT 
                        timesheet_id,
                        employee_id,
                        date,
                        checkin,
                        checkout,
                        CASE WHEN checkin is null AND checkout >'12:00:00' AND checkout <'00:00:00' THEN '08:00:00'::TIME
                            WHEN checkin is null AND checkout >'00:00:00' AND checkout <'12:00:00' THEN '17:00:00'::TIME
                            ELSE checkin END AS checkin_new,
                        CASE WHEN checkout is null AND checkin >'00:00:00' AND checkin <'12:00:00' THEN '17:00:00'::TIME
                            WHEN checkout is null AND checkin >'12:00:00'AND checkin < '00:00:00' THEN '08:00:00'::TIME
                            ELSE checkout END AS checkout_new
                        FROM timesheets
                        ),
                    timesheets_duration AS (
                        SELECT 
                            timesheet_id,
                            employee_id,
                            date,
                            CASE
                                WHEN checkin_new > '00:00:00' AND checkin_new < '12:00:00' THEN EXTRACT(EPOCH FROM (checkout_new - checkin_new))/3600.0
                                WHEN checkin_new > '12:00:00' AND checkin_new < '00:00:00' THEN EXTRACT(EPOCH FROM (checkin_new - checkout_new))/3600.0
                                WHEN checkout_new > '12:00:00' AND checkout_new < '00:00:00' THEN EXTRACT(EPOCH FROM (checkout_new - checkin_new))/3600.0
                                WHEN checkout_new > '00:00:00' AND checkout_new < '12:00:00' THEN EXTRACT(EPOCH FROM (checkin_new - checkout_new))/3600.0
                                ELSE 9.0 END AS total_hours 
                        FROM timesheets_modify),
                    gross_total_hours AS (
                        SELECT 
                            count(t1.employee_id) as total_day,
                            t1.employee_id,
                            t2.branch_id,
                            SUM(t1.total_hours) AS total_hours,
                            min(t2.salary) as salary,
                            EXTRACT(YEAR FROM date) AS year,
                            EXTRACT(MONTH FROM date) AS month
                            FROM timesheets_duration t1
                            LEFT JOIN clean_employees t2
                            ON t1.employee_id = t2.employee_id 
                        WHERE salary is not null OR salary != 0
                        GROUP BY t1.employee_id,t2.branch_id ,EXTRACT(YEAR FROM date), EXTRACT(MONTH FROM date)
                        ),
                    prorated_salary AS (
                        SELECT 
                            total_day,
                            employee_id,
                            branch_id,
                            total_hours,
                            salary,
                            year,
                            month,
                            CASE 
                                WHEN total_day > 22 THEN salary
                                ELSE ROUND((total_day/22.0),2) * salary
                                END AS prorated_salary
                        FROM gross_total_hours
                        ),
                        salary_per_hour_calculation AS ( 
                        SELECT 
                            total_day,
                            employee_id,
                            branch_id,
                            total_hours,
                            salary,
                            year,
                            month,
                            prorated_salary/total_hours AS salary_per_hour
                        FROM prorated_salary),
                        branch_salary_per_hour AS (
                            SELECT 
                                branch_id,
                                year::INT,
                                month::INT,
                                AVG(salary_per_hour) AS salary_per_hour
                            FROM salary_per_hour_calculation
                            GROUP BY branch_id, year, month)
                        SELECT *
                        FROM branch_salary_per_hour
                        --ORDER BY branch_id ASC
                    """)

  return pd.read_sql_query(query, conn)


In [11]:
# Validate the data: summary statistics for each column of the query result.
df.describe()

Unnamed: 0,branch_id,year,month,salary_per_hour
count,239.0,239.0,239.0,239.0
mean,2868.891213,2019.736402,7.523013,49070.799028
std,2999.106036,0.441509,3.512058,7230.733891
min,1.0,2019.0,1.0,32618.770967
25%,2590.0,2019.0,4.5,44116.172459
50%,2629.0,2020.0,9.0,49162.377224
75%,2633.5,2020.0,10.5,53930.160015
max,12722.0,2020.0,12.0,67382.995588


Based on the summary statistics, the data looks reasonable: there are no negative values for salary_per_hour, and the figures make sense if we assume these are Indonesian salary rates.

In [12]:
# Display the result with fully duplicated rows removed. The returned frame is
# not assigned, so `df` itself is left unchanged.
df.drop_duplicates()

Unnamed: 0,branch_id,year,month,salary_per_hour
0,3092,2020,12,42176.091807
1,2623,2020,12,44472.127667
2,2635,2020,7,39458.843961
3,3092,2020,7,38028.653550
4,3092,2020,6,40642.517410
...,...,...,...,...
234,1,2020,9,46288.250603
235,2625,2020,1,55014.315394
236,2625,2020,6,50331.665013
237,2625,2019,10,52753.037964


# Save transform result

In [13]:
import os
import pandas as pd

directory = 'data/transform/sql'
file_name = 'salary_per_hours.csv'

# Create the output directory (and any missing parents). exist_ok=True makes
# this a no-op when it already exists, without a separate existence check.
os.makedirs(directory, exist_ok=True)

# Save the DataFrame to a CSV file in the directory (header row, no index column)
csv_file_path = os.path.join(directory, file_name)
df.to_csv(csv_file_path, index=False)

# Load to final table

In [14]:
# Load the saved CSV into salary_per_hours_sql. load_data truncates the table
# first and COPYs without a column list, so the CSV column order has to match
# the table definition (branch_id, year, month, salary_per_hour).
load_data(csv_file_path, 'salary_per_hours_sql')

In [15]:
# Read the loaded rows back from the final table.
final_df = fetch_query('SELECT year, month, branch_id, salary_per_hour FROM salary_per_hours_sql')

  return pd.read_sql_query(query, conn)


In [16]:
# Summary statistics of the rows read back from the final table; salary_per_hour
# is stored as DECIMAL(10,2), so values are rounded to two decimals.
final_df.describe()

Unnamed: 0,year,month,branch_id,salary_per_hour
count,239.0,239.0,239.0,239.0
mean,2019.736402,7.523013,2868.891213,49070.799163
std,0.441509,3.512058,2999.106036,7230.734259
min,2019.0,1.0,1.0,32618.77
25%,2019.0,4.5,2590.0,44116.17
50%,2020.0,9.0,2629.0,49162.38
75%,2020.0,10.5,2633.5,53930.16
max,2020.0,12.0,12722.0,67383.0


In [17]:
# Display the contents of the final table.
final_df

Unnamed: 0,year,month,branch_id,salary_per_hour
0,2020,12,3092,42176.09
1,2020,12,2623,44472.13
2,2020,7,2635,39458.84
3,2020,7,3092,38028.65
4,2020,6,3092,40642.52
...,...,...,...,...
234,2020,9,1,46288.25
235,2020,1,2625,55014.32
236,2020,6,2625,50331.67
237,2019,10,2625,52753.04
