In [1]:
import datetime
import glob
import math

import pandas as pd
import pymysql
from sqlalchemy import text
from sqlalchemy.engine import create_engine



# Data Processing for Dataset Creation

In this notebook, we will explore a data processing pipeline that extracts, transforms, and creates informative features from data related to employees of a fictional company. The pipeline uses Python libraries such as Pandas and SQLAlchemy to handle data from various sources. Our goal is to demonstrate the process of gaining meaningful insights from employee data, aiding in decision-making, and providing a deeper understanding of the business problem's context.

## Key Steps in the Pipeline:

### Data Extraction from the Database:
The pipeline begins by connecting to a MySQL database, from which it extracts employee-related information of the fictional company. Using SQLAlchemy, it establishes a connection and retrieves data from the "employees," "accident," and "salaries" tables.

### Feature Creation:
The pipeline proceeds to create several features based on the acquired data:

- **Performance Evaluation Data:**
  Imports performance evaluation data from a JSON file, capturing metrics related to employee performance.

- **Number of Projects per Employee:**
  Through an SQL query, the number of projects each employee of the fictional company has been associated with is calculated, providing insights into employee engagement.

- **Average Working Hours in the Last 3 Months:**
  Combines working hours data from various Excel files to calculate the average hours worked per employee in the last three months.

- **Tenure at the Company:**
  By comparing employees' hiring dates with a reference date, the pipeline calculates the duration of each employee's tenure at the fictional company.

- **Workplace Accident Incidence:**
  Identifies employees who have experienced workplace accidents, creating a binary feature indicating the occurrence or absence of accidents.

- **Department, Salary, and Employment Status:**
  Data from the "employees" and "salaries" tables are combined to generate a unified dataset regarding departments, salaries, and employment status of the employees.

**Note:** All the code in this notebook will be incorporated as tasks in Directed Acyclic Graphs (DAGs) in Apache Airflow. The corresponding DAG files are referenced throughout the notebook, so the practical implementation can be followed end to end.


In [2]:
# **Note:** For now, I will not connect directly to MinIO.
# This is just a test to verify the connection and inspect the files.

# --------------------------------------------------------------
# Connecting to the database (locally)
# --------------------------------------------------------------
# --------------------------------------------------------------

mysql_server = "127.0.0.1"  # IP address of the database server
mysql_login = "root"  # Login name of the database server
mysql_password = "0000"  # Password of the database server
mysql_name = "employees"  # Name of the database

engine = create_engine(
    f"mysql+pymysql://{mysql_login}:{mysql_password}@{mysql_server}:3307/{mysql_name}"
)
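
The cell above hard-codes the credentials, which is fine for a local test. As a minimal sketch of an alternative, the connection URL could be assembled from environment variables. The `MYSQL_*` variable names and the `build_mysql_url` helper are hypothetical and not part of the original project; the defaults mirror the local values used above.

```python
import os
from urllib.parse import quote_plus


def build_mysql_url(env=os.environ):
    """Assemble a SQLAlchemy-style MySQL URL from environment variables."""
    # The MYSQL_* names are hypothetical; defaults mirror the local test setup.
    user = env.get("MYSQL_USER", "root")
    password = env.get("MYSQL_PASSWORD", "")
    host = env.get("MYSQL_HOST", "127.0.0.1")
    port = env.get("MYSQL_PORT", "3307")
    database = env.get("MYSQL_DATABASE", "employees")
    # quote_plus escapes characters such as "@" or "/" that would break the URL.
    return f"mysql+pymysql://{user}:{quote_plus(password)}@{host}:{port}/{database}"


print(build_mysql_url({"MYSQL_PASSWORD": "p@ss/word"}))
# mysql+pymysql://root:p%40ss%2Fword@127.0.0.1:3307/employees
```

The resulting string can be passed to `create_engine` exactly as in the cell above.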


In [4]:
# --------------------------------------------------------------
# Creating the features: satisfaction_level and last_evaluation
# --------------------------------------------------------------

df_performance_evaluation = pd.read_json(
    "../datalake/landing/performance-evaluation/employee_performance_evaluation.json",
    orient="records",
    lines=True,
)

df_performance_evaluation.head()

Unnamed: 0,emp_id,satisfaction_level,last_evaluation
0,10001,38,53
1,10002,80,86
2,10003,11,88
3,10004,72,87
4,10005,37,52


Related to the following DAG: [etl_satisfaction_evaluation_att.py](../airflow/dags/etl_satisfaction_evaluation_att.py)
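
Because the file is read with `lines=True`, it is presumably stored in JSON Lines form, with one JSON object per line. The sketch below illustrates that layout with an in-memory sample; the two records reuse values from the output above, but the exact file layout is an assumption.

```python
import io

import pandas as pd

# Assumed JSON Lines layout: one JSON object per line.
sample = (
    '{"emp_id": 10001, "satisfaction_level": 38, "last_evaluation": 53}\n'
    '{"emp_id": 10002, "satisfaction_level": 80, "last_evaluation": 86}\n'
)

# io.StringIO stands in for the file on disk.
df_sample = pd.read_json(io.StringIO(sample), orient="records", lines=True)
print(df_sample)
```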

In [6]:
# --------------------------------------------------------------
# Defining query to return the number of projects per employee
# --------------------------------------------------------------

# The SQL query below calculates the number of projects for each employee (emp_id)
# in the "projects_emp" table.
# It groups the rows by emp_id, counts the PROJECT_ID values in each group, and
# returns the result as "number_projects".

query = """SELECT emp_id, Count(PROJECT_ID) as number_projects
FROM projects_emp
GROUP BY (emp_id);"""

df_number_projects = pd.read_sql_query(query, engine)

df_number_projects.head()

Unnamed: 0,emp_id,number_projects
0,10001,2
1,10002,5
2,10003,7
3,10004,5
4,10005,2


Related to the following DAG: [etl_number_projects_att.py](../airflow/dags/etl_number_projects_att.py)
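
To see what the aggregation returns without a MySQL server, the same `GROUP BY` can be run against an in-memory SQLite table. This is only a sketch: the rows are made up, and SQLite stands in for MySQL. Note that `COUNT(PROJECT_ID)` ignores rows where `PROJECT_ID` is `NULL`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE projects_emp (emp_id INTEGER, PROJECT_ID INTEGER)")
# Hypothetical rows: employee 10001 has 2 projects, employee 10002 has 3.
conn.executemany(
    "INSERT INTO projects_emp VALUES (?, ?)",
    [(10001, 1), (10001, 2), (10002, 1), (10002, 3), (10002, 4)],
)

query = """SELECT emp_id, COUNT(PROJECT_ID) AS number_projects
FROM projects_emp
GROUP BY emp_id
ORDER BY emp_id;"""

rows = conn.execute(query).fetchall()
conn.close()
print(rows)  # [(10001, 2), (10002, 3)]
```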

In [9]:
# --------------------------------------------------------------
# Creating the feature: mean_work_last_3_months
# --------------------------------------------------------------

# Average hours worked by each employee in the last 3 months
df_sistema_ponto = pd.DataFrame(data=None, columns=["emp_id", "data", "hora"])  # empty

# Read the .xlsx spreadsheet data
for sheet in glob.glob("../datalake/landing/working-hours/*.xlsx"):
    print(sheet)
    df_ = pd.read_excel(sheet)
    df_sistema_ponto = pd.concat([df_sistema_ponto, df_])


# Converting the "hora" (hours) attribute to a numeric type
df_sistema_ponto["hora"] = pd.to_numeric(df_sistema_ponto["hora"])
df_sistema_ponto.info()

../datalake/landing/working-hours\employee_date_hour_0.xlsx
../datalake/landing/working-hours\employee_date_hour_1.xlsx
../datalake/landing/working-hours\employee_date_hour_2.xlsx
../datalake/landing/working-hours\employee_date_hour_3.xlsx
../datalake/landing/working-hours\employee_date_hour_4.xlsx
../datalake/landing/working-hours\employee_date_hour_5.xlsx
<class 'pandas.core.frame.DataFrame'>
Int64Index: 3225000 entries, 0 to 537499
Data columns (total 3 columns):
 #   Column  Dtype         
---  ------  -----         
 0   emp_id  object        
 1   data    datetime64[ns]
 2   hora    int64         
dtypes: datetime64[ns](1), int64(1), object(1)
memory usage: 98.4+ MB
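
Calling `pd.concat` inside the loop copies the accumulated data on every iteration. A common alternative, sketched below, collects the frames in a list and concatenates once. The small in-memory frames are hypothetical stand-ins for the results of `pd.read_excel`. Starting from a list rather than an empty object-typed frame also lets `emp_id` keep its integer dtype, which may explain why it appears as `object` in the output above.

```python
import pandas as pd

# Hypothetical stand-ins for the frames returned by pd.read_excel for each file.
monthly_frames = [
    pd.DataFrame(
        {"emp_id": [10001, 10002], "data": pd.to_datetime(["2020-11-01"] * 2), "hora": [8, 9]}
    ),
    pd.DataFrame(
        {"emp_id": [10001, 10002], "data": pd.to_datetime(["2020-12-01"] * 2), "hora": [10, 7]}
    ),
]

frames = []
for df_ in monthly_frames:  # in the notebook this would iterate over the .xlsx files
    frames.append(df_)

# A single concatenation; ignore_index=True yields a clean 0..n-1 index.
df_combined = pd.concat(frames, ignore_index=True)
print(df_combined.dtypes)
```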


In [13]:
# Filtering only records of the last 3 months
df_last_3_month = df_sistema_ponto[
    (df_sistema_ponto["data"] > datetime.datetime(2020, 9, 30))
]
df_sistema_ponto.tail()

Unnamed: 0,emp_id,data,hora
537495,15000,2020-12-27,12
537496,15000,2020-12-28,5
537497,15000,2020-12-29,11
537498,15000,2020-12-30,14
537499,15000,2020-12-31,11


In [14]:
# Checking the number of records per employee
df_last_3_month.groupby("emp_id").count()

Unnamed: 0_level_0,data,hora
emp_id,Unnamed: 1_level_1,Unnamed: 2_level_1
10001,92,92
10002,92,92
10003,92,92
10004,92,92
10005,92,92
...,...,...
24996,92,92
24997,92,92
24998,92,92
24999,92,92


In [15]:
# Finally, calculating the average monthly hours worked: total hours in the last 3 months divided by 3.
mean_work_last_3_months = df_last_3_month.groupby("emp_id")["hora"].agg("sum") / 3
mean_work_last_3_months.head()

emp_id
10001    266.000000
10002    252.000000
10003    289.666667
10004    275.666667
10005    292.333333
Name: hora, dtype: float64

Related to the following DAG: [etl_mean_work_last_3_months_att.py](../airflow/dags/etl_mean_work_last_3_months_att.py)
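
The 92 records per employee correspond to the days of October, November, and December 2020 (31 + 30 + 31), and dividing the summed hours by 3 gives a monthly average. The sketch below reproduces both steps on a synthetic employee who works a constant 9 hours per day; the data is hypothetical.

```python
import datetime

import pandas as pd

# Hypothetical records: one entry per day from 2020-09-29 to 2020-12-31.
dates = pd.date_range("2020-09-29", "2020-12-31", freq="D")
df_hours = pd.DataFrame({"emp_id": 10001, "data": dates, "hora": 9})

# Strictly after 2020-09-30 keeps October, November, and December only.
df_recent = df_hours[df_hours["data"] > datetime.datetime(2020, 9, 30)]

records_per_employee = df_recent.groupby("emp_id")["hora"].count()
monthly_mean = df_recent.groupby("emp_id")["hora"].sum() / 3

print(records_per_employee.loc[10001])  # 92
print(monthly_mean.loc[10001])  # 276.0
```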

In [16]:
# --------------------------------------------------------------
# Creating the feature: time_in_company
# --------------------------------------------------------------

# Calculating the time each employee is in the company

# Defining a reference date
date_referencia = datetime.date(2021, 1, 1)

# Defining a query to return data from the "employees" table
query = """SELECT hire_date
FROM employees;"""

df_hire_date = pd.read_sql_query(query, engine)
df_hire_date.head()

Unnamed: 0,hire_date
0,2018-01-17
1,2015-02-02
2,2017-01-22
3,2016-01-28
4,2018-01-17


In [17]:
# Converting the type of data to Datetime.
df_hire_date["hire_date"] = pd.to_datetime(df_hire_date["hire_date"])
df_hire_date.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 14999 entries, 0 to 14998
Data columns (total 1 columns):
 #   Column     Non-Null Count  Dtype         
---  ------     --------------  -----         
 0   hire_date  14999 non-null  datetime64[ns]
dtypes: datetime64[ns](1)
memory usage: 117.3 KB


In [18]:
# Calculating the difference in days from the employee's hiring date to the reference date
days_diff = []
for d in df_hire_date["hire_date"]:  # Looping through the employees' hiring dates
    diff = date_referencia - d.date()  # Reference date - employee's hiring date
    days_diff.append(diff.days)

days_diff[:10]

[1080, 2160, 1440, 1800, 1080, 1080, 1440, 1800, 1800, 1080]

In [19]:
# Converting the number of days to years, rounded up
nyears = []
for ndays in days_diff:
    nyears.append(int(math.ceil(ndays / 365)))

nyears[:20]

[3, 6, 4, 5, 3, 3, 4, 5, 5, 3, 3, 4, 5, 3, 3, 3, 3, 6, 3, 5]

In [20]:
# Adding the new column to the dataframe
df_hire_date["time_in_company"] = nyears
df_hire_date.head()

Unnamed: 0,hire_date,time_in_company
0,2018-01-17,3
1,2015-02-02,6
2,2017-01-22,4
3,2016-01-28,5
4,2018-01-17,3


Related to the following DAG: [etl_time_in_company_att.py](../airflow/dags/etl_time_in_company_att.py)
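
The two loops above can also be expressed with vectorized pandas operations. The following is a sketch of an equivalent calculation, not the code used in the DAG; it reuses the first four hire dates shown above and the same reference date.

```python
import numpy as np
import pandas as pd

df_sample = pd.DataFrame(
    {"hire_date": pd.to_datetime(["2018-01-17", "2015-02-02", "2017-01-22", "2016-01-28"])}
)
reference_date = pd.Timestamp(2021, 1, 1)

# Timestamp minus a datetime Series yields timedeltas; .dt.days extracts whole days.
days_in_company = (reference_date - df_sample["hire_date"]).dt.days

# Same rounding as math.ceil(ndays / 365) in the loop version.
df_sample["time_in_company"] = np.ceil(days_in_company / 365).astype(int)

print(days_in_company.tolist())  # [1080, 2160, 1440, 1800]
print(df_sample["time_in_company"].tolist())  # [3, 6, 4, 5]
```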

In [22]:
# --------------------------------------------------------------
# Creating the feature: work_accident (binary)
# --------------------------------------------------------------

# Loading the data from the database
df_employees = pd.read_sql_table("employees", engine)
df_accident = pd.read_sql_table("accident", engine)

print(df_employees.head(2))
print(df_accident.head(2))

   emp_no birth_date first_name last_name gender department  left   hire_date
0   10001 1953-09-02     Georgi   Facello      M      sales     1  2018-01-17
1   10002 1964-06-02    Bezalel    Simmel      F      sales     1  2015-02-02
   emp_no                                  Event Description  \
0   10019  EMPLOYEE IS ELECTROCUTED, FALLS FROM A LADDER,...   
1   10069  EMPLOYEE FALLS THROUGH HOLE IN ROOF AND IS KILLED   

                        Event Keywords                       Human Factor  
0  RIB,ELECTRICAL,FRACTURE,LADDER,FALL                              Other  
1       SLIP,FALL PROTECTION,ROOF,FALL  Safety Devices Removed/Inoperable  


In [23]:
# Verifying which employees had an accident (1-yes; 0-no)
work_accident = []
# Iterating through each emp_no in the df_employees DataFrame
for emp in df_employees["emp_no"]:
    # Checking if the emp_no exists in the list of emp_no values from df_accident
    if emp in df_accident["emp_no"].to_list():
        # Appending 1 to the work_accident list if there is a work accident for the emp_no
        work_accident.append(1)
    else:
        # Appending 0 to the work_accident list if there is no work accident for the emp_no
        work_accident.append(0)

In [24]:
# Adding the new column to the new dataframe
df_work_accident = pd.DataFrame(data=None, columns=["work_accident"])
df_work_accident["work_accident"] = work_accident
df_work_accident.groupby(work_accident).count()

Unnamed: 0,work_accident
0,12830
1,2169


Related to the following DAG: [etl_work_accident_att.py](../airflow/dags/etl_work_accident_att.py)
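
The loop above rebuilds the list of accident `emp_no` values for every employee, which gets slow as the tables grow. A vectorized sketch with `Series.isin` produces the same binary flag; the sample frames below are hypothetical.

```python
import pandas as pd

# Hypothetical samples: 10019 and 10069 appear in the accident table.
df_emp = pd.DataFrame({"emp_no": [10001, 10019, 10069, 10002]})
df_acc = pd.DataFrame({"emp_no": [10019, 10069, 10019]})

# isin returns a boolean mask; astype(int) maps True/False to 1/0.
df_emp["work_accident"] = df_emp["emp_no"].isin(df_acc["emp_no"]).astype(int)

print(df_emp["work_accident"].tolist())  # [0, 1, 1, 0]
```

Repeated accidents for the same employee still yield a single flag of 1, matching the membership test in the loop.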

In [26]:
# --------------------------------------------------------------
# Creating the features: department, salary and left
# --------------------------------------------------------------

# This query combines data from two tables, "employees" and "salaries", using an
# inner join. The result will include the "department" column from the "employees"
# table, the "salary" column from the "salaries" table, and the "left" column
# (employment status) from the "employees" table. The join is established by matching
# the "emp_no" column from the "employees" table with the "emp_id" column from the
# "salaries" table. This way, the query presents a comprehensive view of employee
# department, salary, and employment status in a single result set.

query = """SELECT emp.department as department,sal.salary as salary, emp.left
FROM employees emp
INNER JOIN salaries sal
ON emp.emp_no = sal.emp_id;
"""

df_department_salary_left = pd.read_sql_query(query, engine)
df_department_salary_left.head()

Unnamed: 0,department,salary,left
0,sales,low,1
1,sales,medium,1
2,sales,medium,1
3,sales,low,1
4,sales,low,1


Related to the following DAG: [etl_department_salary_left_att.py](../airflow/dags/etl_department_salary_left_att.py)
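
For readers more familiar with pandas than SQL, the inner join described above corresponds to `DataFrame.merge` with `how="inner"`. The sketch below uses hypothetical sample rows; only the column names (`emp_no`, `emp_id`, `department`, `salary`, `left`) come from the query.

```python
import pandas as pd

# Hypothetical samples; employee 10003 has no salary record.
df_emp = pd.DataFrame(
    {"emp_no": [10001, 10002, 10003], "department": ["sales", "sales", "hr"], "left": [1, 1, 0]}
)
df_sal = pd.DataFrame({"emp_id": [10001, 10002], "salary": ["low", "medium"]})

# An inner join keeps only employees present in both tables.
df_joined = df_emp.merge(df_sal, left_on="emp_no", right_on="emp_id", how="inner")
df_joined = df_joined[["department", "salary", "left"]]

print(df_joined)
```

Employee 10003 is dropped from the result, which is the same behavior the SQL `INNER JOIN` has for employees without a matching row in "salaries".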

---

Final DAG, which combines all the previously generated dataframes into a single one: [etl_employees_dataset.py](../airflow/dags/etl_employees_dataset.py)

**Next step:** [`02_visualize.ipynb`](02_visualize.ipynb)