# Week #6 - Live Class
Data Pipeline Course - Sekolah Engineer - Pacmann Academy 



## Objective

Objective:
1. Create a data pipeline that integrates the clinic's data sources

## Case Description

1. `Problem` <br>
In the clinic's data infrastructure, multiple data sources need to be integrated and cleaned. Specifically, the data comes from three systems: the maintenance request spreadsheet, the Clinic database, and the Clinic Ops database. 
    - Maintenance request spreadsheet: contains maintenance requests for the clinic's equipment
    - Clinic database: contains patient appointment and prescription data
    - Clinic Ops database: contains operational data, such as employee salaries, leave requests, equipment, etc.

2. `Solution` <br>
To address these issues, an ETL (Extract, Transform, Load) pipeline will be developed. This pipeline will extract data from the different sources, apply the necessary transformations to clean and standardize the data, and then load it into a unified data warehouse. 
The pipeline has two layers, Staging and Warehouse, along with a logging system and a validation system. <br>

<img src='https://sekolahdata-assets.s3.ap-southeast-1.amazonaws.com/notebook-images/mde-data-ingestion-spark/data_pipeline_workflow.png' width="800"> <br>

## Preparation

`Source Dataset`: 
1. Duplicate maintenance_request data: [Link](https://docs.google.com/spreadsheets/d/1hahMgeJw_ki35tANErRzbxzFMJJKV3aetIboL4_vJ-o/edit?usp=sharing)
2. Restore Database Clinic: [Link](https://drive.google.com/file/d/1ClXTIIKaELOei7TB9eGBY0Y7hHY-_2It/view?usp=sharing)
3. Restore Database Clinic Ops: [Link](https://drive.google.com/file/d/1UlMTbWRLHtuss4huR4icnJDWlijPjdw1/view?usp=sharing)

`Target Storage`
1. Staging: [Link](https://drive.google.com/file/d/1KxLDIaYSHf8inbZ2fLN0sDeckSfemaXv/view?usp=sharing)
2. Warehouse: [Link](https://drive.google.com/file/d/18ShJnBZwIKO3CGFXlANa9xmnIB-lhrAb/view?usp=sharing)
3. Log: [Link](https://drive.google.com/file/d/1uSXglsJLVupIfIKnm2_6H31s7x5w5AYB/view?usp=sharing)

`Tools and Technologies`:
- Python: For building the data pipeline
- PostgreSQL: For the log, staging, and final data storage
- MinIO: For storing failed data loads, validation reports, and profiling reports
- Docker: For running MinIO

`Repository`:
https://github.com/Kurikulum-Sekolah-Pacmann/pipeline-clinic.git 


## Task

1. `Profiling` <br>
Profiling involves analyzing and understanding the structure, content, and quality of the data from multiple sources within the clinic.

2. `Building Data Pipeline EL Source to Staging` <br>
This step focuses on extracting data from the clinic’s source systems (the maintenance_request spreadsheet, the Clinic database, and the Clinic Ops database) and loading the extracted data into a staging area.

3. `Building Data Pipeline ETL Staging to Warehouse` <br>
In this phase, the data from the staging area is cleaned, transformed, validated, and loaded into the data warehouse.

### Profiling

In this task, you will conduct a profiling of each table or spreadsheet provided in the dataset. The profiling process involves the following steps:
1. Check Unique Values
2. Check Data Types
3. Check Percentage of Missing Values
4. Check Percentage of Valid Date Format
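The four checks above map directly onto pandas operations. The repository's `Profiling` class is not reproduced here; the sketch below is a hypothetical stand-in (`profile_columns` is an illustrative name) that returns a report in the same shape as the outputs shown later in this notebook. It assumes dates are stored as `YYYY-MM-DD` strings, and it counts missing values as invalid dates.

```python
import pandas as pd


def profile_columns(df, data_type_cols, unique_cols, missing_cols, date_cols,
                    date_format="%Y-%m-%d"):
    """Hypothetical stand-in for Profiling.reporting(): returns {'created_at', 'report'}."""
    report = {}

    # 1. Data type: the pandas dtype name, e.g. 'int64' or 'object'
    for col in data_type_cols:
        report.setdefault(col, {})["data_type"] = str(df[col].dtype)

    # 2. Unique values, in order of first appearance (missing values included)
    for col in unique_cols:
        report.setdefault(col, {})["unique_value"] = df[col].unique().tolist()

    # 3. Percentage of missing values
    for col in missing_cols:
        report.setdefault(col, {})["percentage_missing_value"] = float(df[col].isna().mean() * 100)

    # 4. Percentage of values that parse with the assumed date format;
    #    unparseable and missing values both count as invalid
    for col in date_cols:
        parsed = pd.to_datetime(df[col], format=date_format, errors="coerce")
        report.setdefault(col, {})["percentage_valid_date"] = float(parsed.notna().mean() * 100)

    return {"created_at": pd.Timestamp.today().strftime("%Y-%m-%d"), "report": report}


# Small example: one impossible month (13) and one missing date
df = pd.DataFrame({
    "request_date": ["2024-01-05", "2024-13-01", None, "2024-02-10"],
    "location": ["OPD", "OPD", "labour room", None],
})
result = profile_columns(
    df,
    data_type_cols=df.columns.tolist(),
    unique_cols=["location"],
    missing_cols=["request_date", "location"],
    date_cols=["request_date"],
)
print(result["report"]["request_date"])
```

Passing separate column lists for each check mirrors the "profiling rule" cells below, where only the columns where a check is meaningful are selected.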

The first step in the profiling process is to `extract the data` from the source systems. This involves gathering data from various tables or spreadsheets that are part of the clinic's operational and clinical data repositories.

In [5]:
from src.profiling.profiling import Profiling
from src.profiling.extract.extract_db import extract_database
from src.profiling.extract.extract_db import extract_list_table
from src.profiling.extract.extract_spreadsheet import extract_sheet

Profiling involves `selecting specific columns from each table` to examine their data types, unique values, percentage of missing values, and percentage of valid date formats.

#### Profiling Data Spreadsheet

In [23]:
# Extract data from spreadsheet
df_maintenance_request = extract_sheet('maintenance_request')

In [24]:
# create profiling object
maintenance_request_profiling = Profiling(data = df_maintenance_request, table_name='maintenance_request')

In [25]:
# get columns from the table
maintenance_request_profiling.get_columns()

['name', 'serial_number', 'request_date', 'location', 'request_note']

In [15]:
# Set Profiling Rule
# list check data type (all columns)
data_type_column = maintenance_request_profiling.get_columns()

#list check unique values
unique_values_column = ['location','request_note']

#list check percentage missing values
missing_values_column = ['request_date','location','request_note']

#list check valid date values
valid_date_column = ['request_date']

# Set Profiling rule to object
maintenance_request_profiling.selected_columns(data_type_column, unique_values_column, missing_values_column, valid_date_column)


In [16]:
# Create Reporting Profiling
report_maintenance_request = maintenance_request_profiling.reporting()

{'created_at': '2024-08-13', 'report': {'name': {'data_type': 'object'}, 'serial_number': {'data_type': 'object'}, 'request_date': {'data_type': 'object', 'percentage_missing_value': 0.0, 'percentage_valid_date': 100.0}, 'location': {'data_type': 'object', 'unique_value': ['labour room', 'Ophthalmic and ENT', 'OPD', 'General Instruments', 'Operation Theater', 'Ophthalmic and ENTm', 'Operation Theater or Manual'], 'percentage_missing_value': 0.0}, 'request_note': {'data_type': 'object', 'unique_value': ['Equipment inspection', 'Routine maintenance', 'Equipment calibration', 'Equipment cleaning', 'Equipment repair'], 'percentage_missing_value': 0.0}}}


In [17]:
report_maintenance_request

{'created_at': '2024-08-13',
 'report': {'name': {'data_type': 'object'},
  'serial_number': {'data_type': 'object'},
  'request_date': {'data_type': 'object',
   'percentage_missing_value': 0.0,
   'percentage_valid_date': 100.0},
  'location': {'data_type': 'object',
   'unique_value': ['labour room',
    'Ophthalmic and ENT',
    'OPD',
    'General Instruments',
    'Operation Theater',
    'Ophthalmic and ENTm',
    'Operation Theater or Manual'],
   'percentage_missing_value': 0.0},
  'request_note': {'data_type': 'object',
   'unique_value': ['Equipment inspection',
    'Routine maintenance',
    'Equipment calibration',
    'Equipment cleaning',
    'Equipment repair'],
   'percentage_missing_value': 0.0}}}
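The `location` values above contain what look like typos of each other (`Ophthalmic and ENT` vs `Ophthalmic and ENTm`), and the same set of values appears in the `equipment` table of Clinic Ops. One way to surface such near-duplicates during profiling is to compare every pair of distinct values with the standard library's `difflib`; the helper name and the 0.9 cutoff below are assumptions, not part of the repository.

```python
from difflib import SequenceMatcher
from itertools import combinations


def near_duplicate_pairs(values, cutoff=0.9):
    """Pairs of distinct values whose case-insensitive similarity ratio is >= cutoff."""
    distinct = sorted({v for v in values if isinstance(v, str)})
    pairs = []
    for a, b in combinations(distinct, 2):
        ratio = SequenceMatcher(None, a.lower(), b.lower()).ratio()
        if ratio >= cutoff:
            pairs.append((a, b, round(ratio, 3)))
    return pairs


# Unique `location` values from the maintenance_request profiling report
locations = ['labour room', 'Ophthalmic and ENT', 'OPD', 'General Instruments',
             'Operation Theater', 'Ophthalmic and ENTm', 'Operation Theater or Manual']
for a, b, ratio in near_duplicate_pairs(locations):
    print(f"{a!r} ~ {b!r} (ratio {ratio})")
# prints: 'Ophthalmic and ENT' ~ 'Ophthalmic and ENTm' (ratio 0.973)
```

At this cutoff `Operation Theater or Manual` is not flagged (its ratio against `Operation Theater` is about 0.77). Whether it is a duplicate or a genuinely different location is a business decision, so the output is a list to review rather than an automatic fix.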

#### Profiling Data Database Clinic

In [2]:
# Extract list of table in Database Clinic
list_table = extract_list_table(db_name='clinic')
print(list_table)

     table_name
0       patient
1   appointment
2    speciality
3        doctor
4  prescription
5    medication


In [4]:
# Profiling Table patient
df_patient = extract_database('clinic', 'patient')

# create profiling object
patient_profiling = Profiling(data = df_patient, table_name='patient')

# get columns from the table
patient_profiling.get_columns()

['patient_id',
 'name',
 'dob',
 'gender',
 'phone_number',
 'address',
 'state_code',
 'created_at']

In [5]:
# Set Profiling Rule
# list check data type (all columns)
data_type_column = patient_profiling.get_columns()

#list check unique values
unique_values = ['state_code']

#list check percentage missing values
missing_values = ['phone_number', 'address', 'state_code']

#list check valid date values
valid_date = ['dob']

# Set Profiling rule to object
patient_profiling.selected_columns(data_type_column, unique_values, missing_values, valid_date)

In [6]:
# Create Reporting Profiling
report_patient = patient_profiling.reporting()
report_patient

{'created_at': '2024-08-13', 'report': {'patient_id': {'data_type': 'int64'}, 'name': {'data_type': 'object'}, 'dob': {'data_type': 'object', 'percentage_valid_date': 100.0}, 'gender': {'data_type': 'object'}, 'phone_number': {'data_type': 'object', 'percentage_missing_value': 0.0}, 'address': {'data_type': 'object', 'percentage_missing_value': 0.0}, 'state_code': {'data_type': 'object', 'unique_value': ['WA', 'NSW', 'NT', 'VIC', 'QLD', 'TAS', 'FO', 'YQ', 'PY', 'ZV', 'RB', 'PU', 'BP', 'LK', 'SV', 'GB', 'HJ', 'JN', 'EC', 'WF', 'SD', 'NY', 'RD', 'LP', 'IU', 'BY', 'JT', 'KH', 'OL', 'VC', 'YG', 'AT', 'UT', 'JA', 'LD', 'EV', 'BD', 'TA', 'OZ', 'CT', 'LB', 'DR', 'FW', 'TL', 'BS', 'PT', 'TI', 'PN', 'QX', 'IJ', 'HV', 'PR', 'WV', 'IQ', 'TR', 'GZ', 'NV', 'EJ', 'NE', 'HB', 'BA', 'AS', 'OP', 'AN', 'UU', 'PG', 'DC', 'YP', 'AH', 'CO', 'MR', 'BX', 'ZN', 'NP', 'PB', 'EW', 'SA', 'WT', 'WL', 'KS', 'CH', 'YF', 'FA', 'AF', 'DN', 'MW', 'JM', 'XI', 'FU', 'MM', 'IF', 'IH', 'CB', 'IA', 'XK', 'FI', 'CV', 'LQ', 


In [7]:
# Profiling Table doctor
df_doctor = extract_database('clinic', 'doctor')

# create profiling object
doctor_profiling = Profiling(data = df_doctor, table_name='doctor')

# get columns from the table
doctor_profiling.get_columns()

['doctor_id', 'name', 'phone_number', 'speciality_id', 'created_at']

In [8]:
# Set Profiling Rule
# list check data type (all columns)
data_type_column = doctor_profiling.get_columns()

#list check unique values
unique_values = []

#list check percentage missing values
missing_values = ['phone_number']

#list check valid date values
valid_date = []

# Set Profiling rule to object
doctor_profiling.selected_columns(data_type_column, unique_values, missing_values, valid_date)


In [9]:
# Create Reporting Profiling
report_doctor = doctor_profiling.reporting()

{'created_at': '2024-08-13', 'report': {'doctor_id': {'data_type': 'int64'}, 'name': {'data_type': 'object'}, 'phone_number': {'data_type': 'object', 'percentage_missing_value': 0.0}, 'speciality_id': {'data_type': 'float64'}, 'created_at': {'data_type': 'datetime64[ns, UTC]'}}}


In [12]:
# Profiling Table speciality
df_specialty = extract_database('clinic', 'speciality')

# create profiling object
specialty_profiling = Profiling(data = df_specialty, table_name='speciality')

# get columns from the table
specialty_profiling.get_columns()

['speciality_id', 'name', 'created_at']

In [13]:
# Set Profiling Rule
# list check data type (all columns)
data_type_column = specialty_profiling.get_columns()

#list check unique values
unique_values = ['name']

#list check percentage missing values
missing_values = []

#list check valid date values
valid_date = []

# Set Profiling rule to object
specialty_profiling.selected_columns(data_type_column, unique_values, missing_values, valid_date)


In [14]:
# Create Reporting Profiling
report_specialty = specialty_profiling.reporting()


{'created_at': '2024-08-13', 'report': {'speciality_id': {'data_type': 'int64'}, 'name': {'data_type': 'object', 'unique_value': ['Cardiology', 'Dermatology', 'Endocrinology', 'Gastroenterology', 'Neurology', 'Ophthalmology', 'Pediatrics']}, 'created_at': {'data_type': 'datetime64[ns, UTC]'}}}


In [15]:
# Profiling Table medication
df_medication = extract_database('clinic', 'medication')

# create profiling object
medication_profiling = Profiling(data = df_medication, table_name='medication')

# get columns from the table
medication_profiling.get_columns()


['medication_id',
 'name',
 'manufacturer',
 'dosage_form',
 'strength',
 'description',
 'created_at']

In [16]:
#Set Profiling Rule
# list check data type (all columns)
data_type_column = medication_profiling.get_columns()

#list check unique values
unique_values = ['manufacturer', 'dosage_form', 'strength']

#list check percentage missing values
missing_values = ['manufacturer', 'dosage_form', 'strength', 'description']

#list check valid date values
valid_date = []

# Set Profiling rule to object
medication_profiling.selected_columns(data_type_column, unique_values, missing_values, valid_date)

In [17]:
# Create Reporting Profiling
report_medication = medication_profiling.reporting()

{'created_at': '2024-08-13', 'report': {'medication_id': {'data_type': 'int64'}, 'name': {'data_type': 'object'}, 'manufacturer': {'data_type': 'object', 'unique_value': ['ABC Pharma', 'XYZ Pharmaceuticals', 'MediCo', 'Pharmalife', 'HealthMeds', 'Wellness Drugs', 'BreathEasy', 'HealPharma', 'AllergyCare', 'HeartGuard', 'CardioHealth', 'ThyroidWell', 'CholesterolControl', 'ReliefMeds', 'MindBalance', 'BloodCare', 'SleepEase', 'KidneyCare', 'PainRelief', 'WaterBalance', 'MicroMed', 'GastroGuard', 'InflammationControl', 'AsthmaCare', 'HeartCare', 'MediCure', 'BreatheEasy', 'HappinessRx', 'MoodStabilize', 'JointRelief', 'StomachEase', 'SleepWell', 'EmotionBalance', 'NeuroCare', 'AnxietyControl', 'SleepAid', 'ThyroidCare', 'AllergyControl', 'AnxietyEase', 'LoveLife', 'AllergyRelief', 'MoodBalance', 'MentalHealth', 'AnxietyRelief', 'CalmEase'], 'percentage_missing_value': 0.0}, 'dosage_form': {'data_type': 'object', 'unique_value': ['Tablet', 'Capsule', 'Inhaler'], 'percentage_missing_value'

In [18]:
# Profiling Table appointment
df_appointment = extract_database('clinic', 'appointment')

# create profiling object
appointment_profiling = Profiling(data = df_appointment, table_name='appointment')

# get columns from the table
appointment_profiling.get_columns()


['appointment_id',
 'patient_id',
 'doctor_id',
 'appointment_date',
 'notes',
 'status',
 'created_at']

In [19]:
# Set Profiling Rule
# list check data type (all columns)
data_type_column = appointment_profiling.get_columns()

#list check unique values
unique_values = ['status']

#list check percentage missing values
missing_values = appointment_profiling.get_columns()

#list check valid date values
valid_date = ['appointment_date']

# Set Profiling rule to object
appointment_profiling.selected_columns(data_type_column, unique_values, missing_values, valid_date)


In [20]:
# Create Reporting Profiling
report_appointment = appointment_profiling.reporting()

{'created_at': '2024-08-13', 'report': {'appointment_id': {'data_type': 'int64', 'percentage_missing_value': 0.0}, 'patient_id': {'data_type': 'int64', 'percentage_missing_value': 0.0}, 'doctor_id': {'data_type': 'int64', 'percentage_missing_value': 0.0}, 'appointment_date': {'data_type': 'datetime64[ns]', 'percentage_missing_value': 0.0, 'percentage_valid_date': 100.0}, 'notes': {'data_type': 'object', 'percentage_missing_value': 0.0}, 'status': {'data_type': 'object', 'unique_value': ['completed', None, 'Cancelled'], 'percentage_missing_value': 35.095238095238095}, 'created_at': {'data_type': 'datetime64[ns, UTC]', 'percentage_missing_value': 0.0}}}


In [21]:
# Profiling Table prescription
df_prescription = extract_database('clinic', 'prescription')

# create profiling object
prescription_profiling = Profiling(data = df_prescription, table_name='prescription')

# get columns from the table
prescription_profiling.get_columns()

['prescription_id', 'appointment_id', 'medication_id', 'created_at']

In [22]:
# Set Profiling Rule
# list check data type (all columns)
data_type_column = prescription_profiling.get_columns()

#list check unique values
unique_values = []

#list check percentage missing values
missing_values = prescription_profiling.get_columns()

#list check valid date values
valid_date = []

# Set Profiling rule to object
prescription_profiling.selected_columns(data_type_column, unique_values, missing_values, valid_date)

In [23]:
# Create Reporting Profiling
report_prescription = prescription_profiling.reporting()

{'created_at': '2024-08-13', 'report': {'prescription_id': {'data_type': 'int64', 'percentage_missing_value': 0.0}, 'appointment_id': {'data_type': 'int64', 'percentage_missing_value': 0.0}, 'medication_id': {'data_type': 'int64', 'percentage_missing_value': 0.0}, 'created_at': {'data_type': 'datetime64[ns, UTC]', 'percentage_missing_value': 0.0}}}


#### Profiling Database Clinic Ops

In [6]:
# Extract list of tables in Database Clinic Ops
list_table = extract_list_table(db_name='clinic_ops')
print(list_table)

           table_name
0           equipment
1          speciality
2                role
3              salary
4      leave_requests
5            employee
6  maintenance_record


In [26]:
# Profiling Table employee
df_employee = extract_database('clinic_ops', 'employee')

# create profiling object  
employee_profiling = Profiling(data = df_employee, table_name='employee')

# get columns from the table
employee_profiling.get_columns()

['employee_id',
 'name',
 'phone_number',
 'speciality_id',
 'role_id',
 'created_at']

In [27]:
# Set Profiling Rule
# list check data type (all columns)
data_type_column = employee_profiling.get_columns()

#list check unique values
unique_values = []

#list check percentage missing values
missing_values = employee_profiling.get_columns()

#list check valid date values
valid_date = []

# Set Profiling rule to object
employee_profiling.selected_columns(data_type_column, unique_values, missing_values, valid_date)


In [28]:
# Create Reporting Profiling
report_employee = employee_profiling.reporting()

{'created_at': '2024-08-13', 'report': {'employee_id': {'data_type': 'int64', 'percentage_missing_value': 0.0}, 'name': {'data_type': 'object', 'percentage_missing_value': 0.0}, 'phone_number': {'data_type': 'object', 'percentage_missing_value': 0.0}, 'speciality_id': {'data_type': 'float64', 'percentage_missing_value': 89.46428571428572}, 'role_id': {'data_type': 'int64', 'percentage_missing_value': 0.0}, 'created_at': {'data_type': 'datetime64[ns, UTC]', 'percentage_missing_value': 0.0}}}


In [29]:
# Profiling Table speciality
df_speciality = extract_database('clinic_ops', 'speciality')

# create profiling object
speciality_profiling = Profiling(data = df_speciality, table_name='speciality')

# get columns from the table
speciality_profiling.get_columns()

['speciality_id', 'name', 'created_at']

In [30]:
# Set Profiling Rule
# list check data type (all columns)
data_type_column = speciality_profiling.get_columns()

#list check unique values
unique_values = ['name']

#list check percentage missing values
missing_values = []

#list check valid date values
valid_date = []

# Set Profiling rule to object
speciality_profiling.selected_columns(data_type_column, unique_values, missing_values, valid_date)

In [31]:
# Create Reporting Profiling
report_speciality = speciality_profiling.reporting()

{'created_at': '2024-08-13', 'report': {'speciality_id': {'data_type': 'int64'}, 'name': {'data_type': 'object', 'unique_value': ['Cardiology', 'Dermatology', 'Endocrinology', 'Gastroenterology', 'Neurology', 'Ophthalmology', 'Pediatrics']}, 'created_at': {'data_type': 'datetime64[ns, UTC]'}}}


In [32]:
# Profiling Table role
df_role = extract_database('clinic_ops', 'role')

# create profiling object
role_profiling = Profiling(data = df_role, table_name='role')

# get columns from the table
role_profiling.get_columns()

['role_id', 'name', 'description', 'created_at']

In [33]:
# Set Profiling Rule
# list check data type (all columns)
data_type_column = role_profiling.get_columns()

#list check unique values
unique_values = ['name']

#list check percentage missing values
missing_values = []

#list check valid date values
valid_date = []

# Set Profiling rule to object
role_profiling.selected_columns(data_type_column, unique_values, missing_values, valid_date)

In [34]:
# Create Reporting Profiling
report_role = role_profiling.reporting()

{'created_at': '2024-08-13', 'report': {'role_id': {'data_type': 'int64'}, 'name': {'data_type': 'object', 'unique_value': ['receptionist', 'nurse', 'physician', 'technician', 'administrator', 'doctor']}, 'description': {'data_type': 'object'}, 'created_at': {'data_type': 'datetime64[ns, UTC]'}}}


In [35]:
# Profiling Table salary
df_salary = extract_database('clinic_ops', 'salary')

# create profiling object
salary_profiling = Profiling(data = df_salary, table_name='salary')

# get columns from the table
salary_profiling.get_columns()

['salary_id', 'employee_id', 'amount', 'payment_date', 'created_at']

In [37]:
# Set Profiling Rule
# list check data type (all columns)
data_type_column = salary_profiling.get_columns()

#list check unique values
unique_values = []

#list check percentage missing values
missing_values = salary_profiling.get_columns()

#list check valid date values
valid_date = ['payment_date']

# Set Profiling rule to object
salary_profiling.selected_columns(data_type_column, unique_values, missing_values, valid_date)

In [38]:
# Create Reporting Profiling
report_salary = salary_profiling.reporting()

{'created_at': '2024-08-13', 'report': {'salary_id': {'data_type': 'int64', 'percentage_missing_value': 0.0}, 'employee_id': {'data_type': 'int64', 'percentage_missing_value': 0.0}, 'amount': {'data_type': 'float64', 'percentage_missing_value': 0.0}, 'payment_date': {'data_type': 'object', 'percentage_missing_value': 0.0, 'percentage_valid_date': 100.0}, 'created_at': {'data_type': 'datetime64[ns, UTC]', 'percentage_missing_value': 0.0}}}


In [40]:
# Profiling Table leave_request
df_leave_request = extract_database('clinic_ops', 'leave_requests')

# create profiling object
leave_request_profiling = Profiling(data = df_leave_request, table_name='leave_requests')

# get columns from the table
leave_request_profiling.get_columns()


['leave_id',
 'employee_id',
 'leave_type',
 'start_date',
 'end_date',
 'status',
 'created_at']

In [41]:
# Set Profiling Rule
# list check data type (all columns)
data_type_column = leave_request_profiling.get_columns()

#list check unique values
unique_values = ['leave_type','status']

#list check percentage missing values
missing_values = leave_request_profiling.get_columns()

#list check valid date values
valid_date = ['start_date','end_date']

# Set Profiling rule to object
leave_request_profiling.selected_columns(data_type_column, unique_values, missing_values, valid_date)

In [42]:
# Create Reporting Profiling
report_leave_request = leave_request_profiling.reporting()

{'created_at': '2024-08-13', 'report': {'leave_id': {'data_type': 'int64', 'percentage_missing_value': 0.0}, 'employee_id': {'data_type': 'int64', 'percentage_missing_value': 0.0}, 'leave_type': {'data_type': 'object', 'unique_value': ['Annual', 'Parental', 'Casual', 'Religous', 'Sick'], 'percentage_missing_value': 0.0}, 'start_date': {'data_type': 'object', 'percentage_missing_value': 0.0, 'percentage_valid_date': 100.0}, 'end_date': {'data_type': 'object', 'percentage_missing_value': 0.0, 'percentage_valid_date': 100.0}, 'status': {'data_type': 'object', 'unique_value': ['Approved', 'Rejected', 'Pending'], 'percentage_missing_value': 0.0}, 'created_at': {'data_type': 'datetime64[ns, UTC]', 'percentage_missing_value': 0.0}}}


In [43]:
# Profiling Table equipment
df_equipment = extract_database('clinic_ops', 'equipment')

# create profiling object
equipment_profiling = Profiling(data = df_equipment, table_name='equipment')

# get columns from the table
equipment_profiling.get_columns()


['equipment_id',
 'name',
 'serial_number',
 'purchase_date',
 'warranty_expiration',
 'location',
 'created_at']

In [44]:
# Set Profiling Rule
# list check data type (all columns)
data_type_column = equipment_profiling.get_columns()

#list check unique values
unique_values = ['location']

#list check percentage missing values
missing_values = equipment_profiling.get_columns()

#list check valid date values
valid_date = []

# Set Profiling rule to object
equipment_profiling.selected_columns(data_type_column, unique_values, missing_values, valid_date)

In [46]:
# Create Reporting Profiling
report_equipment = equipment_profiling.reporting()

{'created_at': '2024-08-13', 'report': {'equipment_id': {'data_type': 'int64', 'percentage_missing_value': 0.0}, 'name': {'data_type': 'object', 'percentage_missing_value': 0.0}, 'serial_number': {'data_type': 'object', 'percentage_missing_value': 0.0}, 'purchase_date': {'data_type': 'object', 'percentage_missing_value': 0.0}, 'warranty_expiration': {'data_type': 'object', 'percentage_missing_value': 0.0}, 'location': {'data_type': 'object', 'unique_value': ['General Instruments', 'OPD', 'Operation Theater', 'Operation Theater or Manual', 'Ophthalmic and ENT', 'Ophthalmic and ENTm', 'labour room'], 'percentage_missing_value': 0.0}, 'created_at': {'data_type': 'datetime64[ns, UTC]', 'percentage_missing_value': 0.0}}}


In [55]:
# Profiling Table maintenance_record
df_maintenance_report = extract_database('clinic_ops', 'maintenance_record')

# create profiling object
maintenance_report_profiling = Profiling(data = df_maintenance_report, table_name='maintenance_record')

# get columns from the table
maintenance_report_profiling.get_columns()

['record_id',
 'equipment_id',
 'maintenance_date',
 'description',
 'cost',
 'created_at']

In [56]:
# Set Profiling Rule
# list check data type (all columns)
data_type_column = maintenance_report_profiling.get_columns()

#list check unique values
unique_values = []

#list check percentage missing values
missing_values = maintenance_report_profiling.get_columns()

#list check valid date values
valid_date = ['maintenance_date']

# Set Profiling rule to object
maintenance_report_profiling.selected_columns(data_type_column, unique_values, missing_values, valid_date)


In [57]:
# Create Reporting Profiling
report_maintenance_report = maintenance_report_profiling.reporting()

{'created_at': '2024-08-13', 'report': {'record_id': {'data_type': 'int64', 'percentage_missing_value': 0.0}, 'equipment_id': {'data_type': 'int64', 'percentage_missing_value': 0.0}, 'maintenance_date': {'data_type': 'object', 'percentage_missing_value': 0.0, 'percentage_valid_date': 100.0}, 'description': {'data_type': 'object', 'percentage_missing_value': 0.0}, 'cost': {'data_type': 'object', 'percentage_missing_value': 0.0}, 'created_at': {'data_type': 'datetime64[ns, UTC]', 'percentage_missing_value': 0.0}}}


### Pipeline Source to Staging Area

In [15]:
from src.staging.extract.extract_database import extract_database
from src.staging.load.load import load_staging
from src.staging.extract.extract_spreadsheet import extract_spreadsheet

Solution: 
1. Pattern: EL
    - `Data Extraction` involves retrieving data from various sources
    - `Data Loading` involves transferring this raw data into staging systems
2. Data Extraction:
    - Sources: Extract data from the maintenance_request spreadsheet, the Clinic database, and the Clinic Ops database.
    - Techniques: Use both full and incremental extraction to retrieve data efficiently.
        - Full Ingestion: For spreadsheet data and for the initial load
        - Incremental Ingestion: For database data on subsequent runs
3. Data Load:
    - Staging: Load raw data into a staging database (PostgreSQL) without transformation.
    - Failure Handling: Log failed data loads to MinIO object storage
4. Data Staging Schema:

<img src='https://sekolahdata-assets.s3.ap-southeast-1.amazonaws.com/notebook-images/mde-data-ingestion-spark/staging_clinic_-_public.png' width="800"> <br>
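For the failure-handling step, a minimal sketch of writing rejected rows to MinIO as CSV might look like the following. It assumes the `minio` Python client (`Minio.bucket_exists`, `make_bucket`, `put_object`); the object naming scheme, the bucket name, and the helper names are hypothetical, not taken from the repository. The client is passed in as an argument, so the helpers can be exercised without a running MinIO server.

```python
import io
from datetime import datetime

import pandas as pd


def failed_object_name(source, table_name, run_time):
    """Hypothetical naming scheme: <source>/<table>/<YYYYmmdd_HHMMSS>.csv"""
    return f"{source.replace(' ', '_')}/{table_name}/{run_time:%Y%m%d_%H%M%S}.csv"


def dataframe_to_csv_bytes(df):
    """Serialize the failed rows as UTF-8 CSV without the pandas index."""
    return df.to_csv(index=False).encode("utf-8")


def save_failed_data(client, bucket, df, source, table_name, run_time=None):
    """Upload rows that could not be loaded so they can be reprocessed later.

    `client` is expected to be a minio.Minio instance (or anything with the same methods).
    """
    run_time = run_time or datetime.now()
    payload = dataframe_to_csv_bytes(df)
    object_name = failed_object_name(source, table_name, run_time)
    if not client.bucket_exists(bucket_name=bucket):
        client.make_bucket(bucket_name=bucket)
    client.put_object(
        bucket_name=bucket,
        object_name=object_name,
        data=io.BytesIO(payload),
        length=len(payload),
        content_type="text/csv",
    )
    return object_name


# Example wiring (needs `pip install minio` and the MinIO container started with Docker):
# from minio import Minio
# client = Minio(endpoint="localhost:9000", access_key="...", secret_key="...", secure=False)
# save_failed_data(client, "failed-data", failed_rows_df, "database clinic", "patient")
```

Keeping one object per source, table, and run makes it easy to find and replay exactly the rows that failed in a given run.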



#### Extraction


##### 1. Extract Data From Database Clinic

Extract Data From PostgreSQL

- Full Extraction: Initial Load
- Incremental Extraction: Get new and updated data

Steps:
1. Establish Database Connection: Connect to the PostgreSQL database named clinic.
2. Read Log Data: Read the existing log data from log.csv to determine the last successful extraction timestamp (etl_date).
3. Initial Load or Incremental Extraction:
    - If no previous extraction has been recorded (etl_date is empty), set etl_date to '1111-01-01' so that every row qualifies (initial load).
    - Otherwise, retrieve only the data added since the last successful extraction (etl_date).
4. Query Execution: Construct a SQL query that selects all columns from the specified table_name where created_at is greater than etl_date.
5. Data Extraction: Execute the SQL query with pd.read_sql to fetch the data into a Pandas DataFrame (df).
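The steps above can be sketched as follows. To keep the example self-contained it runs against an in-memory SQLite database instead of PostgreSQL (with `psycopg2` the placeholder would be `%s` instead of `?`), and the log layout (`table_name`, `status`, `etl_date` columns) is an assumption. Note that filtering on `created_at` alone only picks up newly inserted rows; capturing updated rows would additionally need a column such as `updated_at` in the source tables.

```python
import sqlite3

import pandas as pd

INITIAL_ETL_DATE = "1111-01-01"  # sentinel used for the initial (full) load


def get_last_etl_date(log_df, table_name):
    """Latest successful etl_date for a table, or the sentinel if there is none."""
    done = log_df[(log_df["table_name"] == table_name) & (log_df["status"] == "success")]
    if done.empty:
        return INITIAL_ETL_DATE
    return done["etl_date"].max()


def extract_incremental(conn, table_name, log_df):
    """Select rows created after the last successful run (all rows on the first run)."""
    etl_date = get_last_etl_date(log_df, table_name)
    # Table names cannot be bound as parameters: only pass trusted, known table names.
    query = f"SELECT * FROM {table_name} WHERE created_at > ?"
    return pd.read_sql(query, conn, params=(etl_date,))


# Demo with a throwaway SQLite table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE speciality (speciality_id INTEGER, name TEXT, created_at TEXT)")
conn.executemany("INSERT INTO speciality VALUES (?, ?, ?)", [
    (1, "Cardiology", "2024-08-01 10:00:00"),
    (2, "Neurology", "2024-08-12 10:00:00"),
])

empty_log = pd.DataFrame(columns=["table_name", "status", "etl_date"])
log = pd.DataFrame([{"table_name": "speciality", "status": "success",
                     "etl_date": "2024-08-10 00:00:00"}])

full = extract_incremental(conn, "speciality", empty_log)   # initial load: every row
incremental = extract_incremental(conn, "speciality", log)  # only rows after 2024-08-10
```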

In [16]:
# Extract Data
df_speciality = extract_database(db_name='clinic', table_name = "speciality")
df_doctor = extract_database(db_name='clinic', table_name = "doctor")
df_patient = extract_database(db_name='clinic', table_name = "patient")
df_medication = extract_database(db_name='clinic', table_name = "medication")
df_appointment = extract_database(db_name='clinic', table_name = "appointment")
df_prescription = extract_database(db_name='clinic', table_name = "prescription")


In [17]:
# Load Extracted Data to Staging
load_staging(data=df_speciality, table_name='speciality', schema='public', idx_name='speciality_id', source='database clinic')
load_staging(data=df_doctor, table_name='doctor', schema='public', idx_name='doctor_id', source='database clinic')
load_staging(data=df_patient, table_name='patient', schema='public', idx_name='patient_id', source='database clinic')
load_staging(data=df_medication, table_name='medication', schema='public', idx_name='medication_id', source='database clinic')
load_staging(data=df_appointment, table_name='appointment', schema='public', idx_name='appointment_id', source='database clinic')
load_staging(data=df_prescription, table_name='prescription', schema='public', idx_name='prescription_id', source='database clinic')
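`load_staging` receives an `idx_name` for every table, which suggests rows are matched on that key so that re-running an extraction does not create duplicates. A common way to do that is an upsert (`INSERT ... ON CONFLICT ... DO UPDATE`), which PostgreSQL supports and SQLite has supported since 3.24. The sketch below assumes that behaviour and that the staging table has a primary key or unique constraint on `idx_name`; it is an illustration, not the repository's implementation.

```python
import sqlite3

import pandas as pd


def build_upsert_sql(table_name, columns, idx_name, placeholder="?"):
    """INSERT ... ON CONFLICT (idx_name) DO UPDATE, valid in PostgreSQL and SQLite >= 3.24."""
    col_list = ", ".join(columns)
    values = ", ".join([placeholder] * len(columns))
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in columns if c != idx_name)
    action = f"DO UPDATE SET {updates}" if updates else "DO NOTHING"
    return (f"INSERT INTO {table_name} ({col_list}) VALUES ({values}) "
            f"ON CONFLICT ({idx_name}) {action}")


def _to_python(value):
    """Convert NaN/None to None (SQL NULL) and numpy scalars to plain Python values."""
    if pd.isna(value):
        return None
    return value.item() if hasattr(value, "item") else value


def upsert_dataframe(conn, df, table_name, idx_name):
    """Insert new rows and overwrite existing rows that share the same idx_name."""
    sql = build_upsert_sql(table_name, list(df.columns), idx_name)
    rows = [tuple(_to_python(v) for v in row) for row in df.itertuples(index=False, name=None)]
    with conn:  # sqlite3: commit on success, roll back on error
        conn.executemany(sql, rows)


# Demo: the second load updates doctor 2 and adds doctor 3 instead of duplicating rows
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE doctor (doctor_id INTEGER PRIMARY KEY, name TEXT, phone_number TEXT)")
upsert_dataframe(conn, pd.DataFrame({"doctor_id": [1, 2], "name": ["Dr. A", "Dr. B"],
                                     "phone_number": ["111", "222"]}), "doctor", "doctor_id")
upsert_dataframe(conn, pd.DataFrame({"doctor_id": [2, 3], "name": ["Dr. B", "Dr. C"],
                                     "phone_number": ["999", "333"]}), "doctor", "doctor_id")
stored = conn.execute("SELECT doctor_id, phone_number FROM doctor ORDER BY doctor_id").fetchall()
```

With `psycopg2` the same SQL works against PostgreSQL by passing `placeholder="%s"`.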

##### 2. Extract Data From Database Clinic Ops

In [11]:
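# Extract Data
# the source table in clinic_ops is `speciality` (see the table list above); it is loaded into staging as `speciality_ops`
df_speciality = extract_database(db_name='clinic_ops', table_name='speciality')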
df_role = extract_database(db_name='clinic_ops', table_name='role')
df_employee = extract_database(db_name='clinic_ops', table_name='employee')
df_salary = extract_database(db_name='clinic_ops', table_name='salary')
df_leave_requests = extract_database(db_name='clinic_ops', table_name='leave_requests')
df_equipment = extract_database(db_name='clinic_ops', table_name='equipment')
df_maintenance_record = extract_database(db_name='clinic_ops', table_name='maintenance_record')

In [12]:
# Load Extracted Data to Staging
load_staging(data=df_speciality, table_name='speciality_ops', schema='public', idx_name='speciality_id', source='database clinic_ops')
load_staging(data=df_role, table_name='role', schema='public', idx_name='role_id', source='database clinic_ops')
load_staging(data=df_employee, table_name='employee', schema='public', idx_name='employee_id', source='database clinic_ops')
load_staging(data=df_salary, table_name='salary', schema='public', idx_name='salary_id', source='database clinic_ops')
load_staging(data=df_leave_requests, table_name='leave_requests', schema='public', idx_name='leave_id', source='database clinic_ops')
load_staging(data=df_equipment, table_name='equipment', schema='public', idx_name='equipment_id', source='database clinic_ops')
load_staging(data=df_maintenance_record, table_name='maintenance_record', schema='public', idx_name='record_id', source='database clinic_ops')

##### 3. Extract Data From Spreadsheet

In [2]:
# Extract Data
df_maintenance_request = extract_spreadsheet('maintenance_request')

# set index as new column and identifier
df_maintenance_request = df_maintenance_request.reset_index()
df_maintenance_request.rename(columns={'index':'id'}, inplace=True)

In [3]:
# Load Extracted Data to Staging
load_staging(data=df_maintenance_request, table_name='maintenance_request', schema='public', idx_name='id', source='spreadsheet')

### Pipeline Staging to Warehouse

Solution:
1. Pattern: ETL
    - `Data Extraction` involves retrieving data from various sources.
    - `Data Transformation` involves converting the data to fit the desired format or structure.
    - `Data Validation` involves checking the transformed data for accuracy, completeness, and consistency.
    - `Data Loading` involves transferring the transformed data into the data warehouse.

2. Data Extraction:
    - Sources: Extract data from the staging area.
    - Techniques: Use both full and incremental extraction to retrieve data efficiently.
        - Full ingestion: used for the first run.
        - Incremental ingestion: used for subsequent runs.

3. Data Transformation:
    - Cleaning: Handle missing values, incorrect data formats, and other data quality issues.
    - Techniques: Joining, filtering, aggregation, deduplication, conversion, structuring, etc.

4. Data Validation:
    - Data validation is the process of ensuring that data is accurate, complete, and consistent.
    - Techniques:
        - checking for missing values,
        - verifying data types,
        - performing range checks,
        - applying any other rules or constraints that ensure the quality and integrity of the data.

5. Data Load:
    - Data Warehouse: Load the cleaned and transformed data to the final destination.
    - Failure Handling: Log failed data loads to MinIO object storage for reprocessing.
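
The full-versus-incremental split above is commonly driven by a watermark: the first run takes every row, and later runs take only rows changed since the last successful run. A minimal pandas sketch, assuming each staging table carries an `updated_at` timestamp and that the previous watermark is persisted between runs (both are assumptions; `filter_incremental` is a hypothetical helper, not part of `src.warehouse`):

```python
from typing import Optional, Tuple

import pandas as pd


def filter_incremental(df: pd.DataFrame,
                       last_watermark: Optional[pd.Timestamp],
                       ts_col: str = "updated_at") -> Tuple[pd.DataFrame, Optional[pd.Timestamp]]:
    """Return (rows to process, new watermark).

    last_watermark=None means there is no previous run, so everything is
    taken (full ingestion). ts_col is an assumed audit column.
    """
    if last_watermark is None:
        batch = df  # full ingestion: first run
    else:
        # incremental ingestion: only rows changed after the previous run
        batch = df[df[ts_col] > last_watermark]
    # Advance the watermark only when something new was processed
    new_watermark = batch[ts_col].max() if not batch.empty else last_watermark
    return batch, new_watermark


staging = pd.DataFrame({
    "doctor_id": [1, 2, 3],
    "updated_at": pd.to_datetime(["2024-08-01", "2024-08-10", "2024-08-12"]),
})
first, wm = filter_incremental(staging, None)                        # all 3 rows
later, wm2 = filter_incremental(staging, pd.Timestamp("2024-08-05"))  # rows 2 and 3
```

A strict `>` comparison is the usual choice, with the caveat that rows committed later but stamped with exactly the watermark time would be skipped; a small overlap window plus idempotent (upsert) loading is one way around that.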


#### Target Schema
In the data warehouse, the target schema is designed as a `dimensional model`.

Dimension tables:
- dim_date
- dim_time
- dim_doctor
- dim_patient
- dim_medication
- dim_employee
- dim_equipment
- dim_speciality

Fact tables:
- fact_appointment
- fact_salary
- fact_leave_requests
- fact_maintenance_request
- fact_maintenance_record
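
Date and time are modeled as their own dimensions so that facts can be sliced by calendar attributes without re-deriving them in every query. One common way to populate `dim_date` is to generate one row per calendar day. The sketch below assumes an integer `date_id` in `YYYYMMDD` form and a handful of attribute columns; these are illustrative choices and may differ from the actual warehouse schema in the diagram.

```python
import pandas as pd


def build_dim_date(start: str, end: str) -> pd.DataFrame:
    """Generate one row per calendar day between start and end (inclusive).

    Column names and the YYYYMMDD key format are assumptions.
    """
    dates = pd.date_range(start=start, end=end, freq="D")
    return pd.DataFrame({
        # Integer "smart" key, e.g. 2024-08-13 -> 20240813
        "date_id": dates.strftime("%Y%m%d").astype(int),
        "date_actual": dates.date,
        "day_name": dates.day_name(),
        "day_of_month": dates.day,
        "month": dates.month,
        "month_name": dates.month_name(),
        "quarter": dates.quarter,
        "year": dates.year,
        # Monday=0 ... Sunday=6, so 5 and 6 are the weekend
        "is_weekend": dates.dayofweek >= 5,
    })


dim_date = build_dim_date("2024-08-10", "2024-08-13")
```

A fact row can then reference a day by computing the same `YYYYMMDD` integer from its own timestamp, which avoids a lookup join during loading.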


<img src='https://sekolahdata-assets.s3.ap-southeast-1.amazonaws.com/notebook-images/mde-data-ingestion-spark/warehouse_clinic_-_public.png' width="800"> <br>


#### Source to Target Mapping
Source to Target Mapping Documentation: [Link](https://github.com/Kurikulum-Sekolah-Pacmann/pipeline-clinic/blob/main/target_mapping_warehouse.md)

#### Validation Rule

Validation Rule:
1. `Doctor Data Validation`
Checks:
- Phone Number Validation:
    - Criteria: Phone numbers must match the pattern of an optional leading '+' followed by 9 to 15 digits.
    - Purpose: To identify doctors with invalid phone numbers.
- Specialty Information Check:
    - Criteria: The speciality_id column should not contain null values.
    - Purpose: To ensure that each doctor is associated with a valid specialty.

2. `Patient Data Validation`
Checks:
- Phone Number Validation:
    - Criteria: Phone numbers must match the pattern of an optional leading '+' followed by 9 to 15 digits.
    - Purpose: To identify patients with invalid phone numbers.

3. `Employee Data Validation`
Checks:
- Phone Number Validation:
    - Criteria: Phone numbers must match the pattern of an optional leading '+' followed by 9 to 15 digits.
    - Purpose: To identify employees with invalid phone numbers.

4. `Leave Requests Data Validation`
Checks:
- Leave Type Validation:
    - Criteria: Leave types must be one of the following: "Parental", "Sick", "Religous", or "Annual".
    - Purpose: To identify invalid leave type entries.

5. `Equipment Data Validation`
Checks:
- Equipment Age Validation:
    - Criteria: Equipment purchased more than 8 years ago is considered old.
    - Purpose: To identify old equipment that may need replacement or review.
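
The rules above translate into simple pandas filters that return the offending rows. This is a sketch only: the column names (`phone_number`, `speciality_id`, `leave_type`, `purchase_date`) and the function names are hypothetical, and the real checks in `src.warehouse.validation.validation` may be implemented differently.

```python
from typing import Optional

import pandas as pd

# Hypothetical column names below; the real warehouse columns may differ.
PHONE_REGEX = r"\+?\d{9,15}"  # optional leading '+', then 9 to 15 digits
# Spelled exactly as in the Leave Type rule above; align with the source data.
VALID_LEAVE_TYPES = {"Parental", "Sick", "Religous", "Annual"}
OLD_EQUIPMENT_YEARS = 8


def invalid_phone(df: pd.DataFrame, col: str = "phone_number") -> pd.DataFrame:
    """Rows whose phone number is missing or does not fully match PHONE_REGEX.

    Assumes phone numbers are stored as text.
    """
    ok = df[col].str.fullmatch(PHONE_REGEX, na=False)
    return df[~ok]


def missing_speciality(df: pd.DataFrame, col: str = "speciality_id") -> pd.DataFrame:
    """Doctors without a speciality."""
    return df[df[col].isna()]


def invalid_leave_type(df: pd.DataFrame, col: str = "leave_type") -> pd.DataFrame:
    """Leave requests whose type is not in the allowed set."""
    return df[~df[col].isin(VALID_LEAVE_TYPES)]


def old_equipment(df: pd.DataFrame, col: str = "purchase_date",
                  today: Optional[str] = None) -> pd.DataFrame:
    """Equipment purchased more than OLD_EQUIPMENT_YEARS years before `today`."""
    ref = pd.Timestamp.today().normalize() if today is None else pd.Timestamp(today)
    cutoff = ref - pd.DateOffset(years=OLD_EQUIPMENT_YEARS)
    return df[pd.to_datetime(df[col]) < cutoff]
```

Each function returns a (possibly empty) DataFrame rather than raising, which matches the approach described below of loading everything and only reporting the problems.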

    
All data is loaded into the warehouse regardless of the validation results; each validation report is saved to MinIO as a JSON file.
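
Because invalid rows are still loaded, the validation step only has to produce a report. The sketch below shows one hypothetical shape for that JSON (table, run date, and per-check failure counts with the offending keys), named after the `<table>_<YYYY-MM-DD>.json` pattern visible in the cell outputs below (e.g. `doctor_2024-08-13.json`). `build_validation_report` and the report layout are assumptions, and the MinIO upload itself is left out.

```python
import json
from datetime import date
from typing import Dict, Tuple

import pandas as pd


def build_validation_report(table_name: str, checks: Dict[str, pd.DataFrame],
                            key_col: str, run_date: date) -> Tuple[str, str]:
    """Summarise failed rows per check; return (object_name, json_payload).

    `checks` maps a check name to the DataFrame of rows that failed it.
    The report structure is hypothetical.
    """
    report = {
        "table": table_name,
        "run_date": run_date.isoformat(),
        "checks": {
            name: {
                "failed_rows": int(len(failed)),
                "failed_keys": failed[key_col].tolist(),
            }
            for name, failed in checks.items()
        },
    }
    object_name = f"{table_name}_{run_date.isoformat()}.json"
    # default=str guards against values json cannot serialise natively
    return object_name, json.dumps(report, indent=2, default=str)


failed_phone = pd.DataFrame({"doctor_nk": [2, 4]})
name, payload = build_validation_report(
    "doctor", {"invalid_phone": failed_phone}, "doctor_nk", date(2024, 8, 13))
```

With the `minio` Python client, the payload could then be uploaded with `put_object` after wrapping the encoded string in `io.BytesIO` and passing its length.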

#### Pipeline Warehouse

In [1]:
from src.warehouse.extract.extract_database import extract_staging
from src.warehouse.load.load import load_warehouse

from src.warehouse.transformation.speciality import transform_speciality
from src.warehouse.transformation.doctor import transform_doctor
from src.warehouse.transformation.patient import transform_patient
from src.warehouse.transformation.medication import transform_medication
from src.warehouse.transformation.fact_appointment import transform_fact_appointment
from src.warehouse.transformation.employee import transform_employee
from src.warehouse.transformation.fact_salary import transform_fact_salary
from src.warehouse.transformation.fact_leave_request import transform_fact_leave_request
from src.warehouse.transformation.equipment import transform_equipment
from src.warehouse.transformation.fact_maintenance_request import transform_fact_maintenance_request
from src.warehouse.transformation.fact_maintenance_record import transform_fact_maintenance_record
from src.warehouse.validation.validation import report_validation_doctor, report_validation_patient, report_validation_employee, report_validation_leave_requests, report_validation_equipment

In [2]:
# Extract Data from Staging
df_doctor = extract_staging(table_name='doctor')
df_patient = extract_staging(table_name='patient')
df_speciality = extract_staging(table_name='speciality')
df_medication = extract_staging(table_name='medication')
df_appointment = extract_staging(table_name='appointment')
df_prescription = extract_staging(table_name='prescription')
df_employee = extract_staging(table_name='employee')
df_role = extract_staging(table_name='role')
df_salary = extract_staging(table_name='salary')
df_leave_requests = extract_staging(table_name='leave_requests')
df_equipment = extract_staging(table_name='equipment')
df_maintenance_record = extract_staging(table_name='maintenance_record')
df_maintenance_request = extract_staging(table_name='maintenance_request')
df_speciality_ops = extract_staging(table_name='speciality_ops')

In [3]:
# Transform speciality data and load it to the warehouse
speciality_transformed = transform_speciality(df_speciality, table_name='dim_speciality')
load_warehouse(data=speciality_transformed, table_name='dim_speciality', 
               schema='public', idx_name='speciality_nk', table_process='speciality', source='staging')
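
`load_warehouse` receives the natural-key column(s) through `idx_name`, which suggests rows are upserted on that key so that re-running the pipeline does not duplicate them. Assuming the warehouse is PostgreSQL (the `public` schema hints at this), one common way to express such a load is `INSERT ... ON CONFLICT ... DO UPDATE`. The hypothetical helper below only builds the statement with psycopg2-style `%(name)s` placeholders; it is not the course's `load_warehouse`.

```python
from typing import List, Sequence, Union


def build_upsert_sql(table: str, columns: Sequence[str],
                     key: Union[str, List[str]], schema: str = "public") -> str:
    """Build a PostgreSQL upsert keyed on the natural key column(s).

    Identifiers are interpolated unquoted, so only pass trusted names.
    """
    keys = [key] if isinstance(key, str) else list(key)
    col_list = ", ".join(columns)
    # Named placeholders, filled from one dict per row at execution time
    values = ", ".join(f"%({c})s" for c in columns)
    # Overwrite every non-key column with the incoming (EXCLUDED) value
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in columns if c not in keys)
    sql = (f"INSERT INTO {schema}.{table} ({col_list}) VALUES ({values}) "
           f"ON CONFLICT ({', '.join(keys)}) ")
    # If every column is part of the key there is nothing to update
    sql += f"DO UPDATE SET {updates}" if updates else "DO NOTHING"
    return sql


sql = build_upsert_sql("dim_speciality", ["speciality_nk", "speciality_name"], "speciality_nk")
```

`ON CONFLICT` only works when the target columns are covered by a unique index or constraint, so the natural-key column(s) in each warehouse table would need one.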

In [4]:
# Transform doctor data, create a validation report, and load it to the warehouse
doctor_transformed = transform_doctor(df_doctor, table_name='dim_doctor')
report_validation_doctor(doctor_transformed)
load_warehouse(data=doctor_transformed, table_name='dim_doctor', 
               schema='public', idx_name='doctor_nk', table_process='doctor', source='staging')

Save validation report as doctor_2024-08-13.json
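
The `_nk` suffix in `idx_name` suggests that dimension transforms rename the source primary key into a natural-key column. A minimal sketch of what a doctor-dimension cleanup could look like, assuming source columns `doctor_id`, `phone_number`, and `created_at` (all hypothetical; the real `transform_doctor` may do something different, including leaving phone numbers untouched):

```python
import pandas as pd


def transform_doctor_sketch(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical dimension cleanup; column names are assumptions."""
    # The source primary key becomes the natural key in the warehouse
    out = df.rename(columns={"doctor_id": "doctor_nk"})
    # Drop a source-side audit column if present (assumed name)
    out = out.drop(columns=["created_at"], errors="ignore")
    # Strip spaces, dashes and parentheses so formatting noise alone
    # does not fail the phone-number validation rule
    out["phone_number"] = out["phone_number"].str.replace(r"[\s\-()]", "", regex=True)
    # Keep one row per natural key; the last occurrence wins
    return out.drop_duplicates(subset="doctor_nk", keep="last").reset_index(drop=True)


raw = pd.DataFrame({
    "doctor_id": [1, 1, 2],
    "phone_number": ["0812-345-6789", "0812 345 6789", "+62 812 3456 7890"],
    "created_at": ["2024-01-01", "2024-02-01", "2024-01-15"],
})
clean = transform_doctor_sketch(raw)
```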


In [5]:
# Transform patient data, create a validation report, and load it to the warehouse
patient_transformed = transform_patient(df_patient, table_name='dim_patient')
report_validation_patient(patient_transformed)
load_warehouse(data=patient_transformed, table_name='dim_patient', 
               schema='public', idx_name='patient_nk', table_process='patient', source='staging')

Save validation report as patient_2024-08-13.json


In [6]:
# Transform medication data and load it to the warehouse
medication_transformed = transform_medication(df_medication, table_name='dim_medication')
load_warehouse(data=medication_transformed, table_name='dim_medication', 
               schema='public', idx_name='medication_nk', table_process='medication', source='staging')

In [7]:
# Build fact_appointment by transforming appointment and prescription data, then load it to the warehouse
fact_appointment_transformed = transform_fact_appointment(df_appointment, df_prescription, table_name='fact_appointment')
load_warehouse(data=fact_appointment_transformed, table_name='fact_appointment', 
               schema='public', idx_name=['appointment_nk','prescription_nk'], table_process='appointment', source='staging')
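
Here `idx_name` is a list because `fact_appointment` combines two sources, so a row is identified by the pair (`appointment_nk`, `prescription_nk`). A sketch of how that grain can arise, assuming both tables share an `appointment_id` column (hypothetical names; not the actual `transform_fact_appointment`):

```python
import pandas as pd


def build_fact_appointment_sketch(appointments: pd.DataFrame,
                                  prescriptions: pd.DataFrame) -> pd.DataFrame:
    """One row per (appointment, prescription); column names are hypothetical."""
    # Left join keeps appointments that have no prescription
    fact = appointments.merge(prescriptions, on="appointment_id", how="left")
    fact = fact.rename(columns={"appointment_id": "appointment_nk",
                                "prescription_id": "prescription_nk"})
    # Missing prescriptions become NaN; nullable Int64 keeps the ids integer-typed
    fact["prescription_nk"] = fact["prescription_nk"].astype("Int64")
    return fact


appointments = pd.DataFrame({"appointment_id": [10, 11], "patient_id": [1, 2]})
prescriptions = pd.DataFrame({"prescription_id": [1, 2],
                              "appointment_id": [10, 10],
                              "medication_id": [5, 6]})
fact = build_fact_appointment_sketch(appointments, prescriptions)
```

Appointment 10 yields two rows (one per prescription) and appointment 11 yields one row with a null `prescription_nk`. In PostgreSQL a unique constraint treats NULLs as distinct by default (PostgreSQL 15 added `NULLS NOT DISTINCT`), so such rows would not be matched by an upsert keyed on both columns unless a sentinel value or that option is used.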

In [8]:
# Transform employee data (with speciality and role), create a validation report, and load it to the warehouse
employee_transformed = transform_employee(df_employee, df_speciality_ops, df_role, table_name='dim_employee')
report_validation_employee(employee_transformed)
load_warehouse(data=employee_transformed, table_name='dim_employee', 
               schema='public', idx_name='employee_nk', table_process='employee', source='staging')

Save validation report as employee_2024-08-13.json
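
`transform_employee` receives the employee, speciality, and role tables, which fits the usual star-schema practice of denormalizing small lookup tables into the dimension instead of keeping them as separate snowflaked tables. A sketch of that join, assuming id/name column pairs such as `speciality_id`/`speciality_name` and `role_id`/`role_name` (hypothetical names):

```python
import pandas as pd


def build_dim_employee_sketch(employee: pd.DataFrame, speciality: pd.DataFrame,
                              role: pd.DataFrame) -> pd.DataFrame:
    """Denormalise lookup names into the employee dimension (hypothetical columns)."""
    return (
        employee
        # Left joins keep employees whose lookup id has no match
        .merge(speciality[["speciality_id", "speciality_name"]], on="speciality_id", how="left")
        .merge(role[["role_id", "role_name"]], on="role_id", how="left")
        # The descriptive names replace the foreign-key ids
        .drop(columns=["speciality_id", "role_id"])
        .rename(columns={"employee_id": "employee_nk"})
    )


employee = pd.DataFrame({"employee_id": [1, 2], "name": ["Ana", "Budi"],
                         "speciality_id": [1, 2], "role_id": [10, 20]})
speciality = pd.DataFrame({"speciality_id": [1, 2], "speciality_name": ["Cardiology", "Dentistry"]})
role = pd.DataFrame({"role_id": [10, 20], "role_name": ["Doctor", "Nurse"]})
dim_employee = build_dim_employee_sketch(employee, speciality, role)
```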


In [9]:
# Build fact_salary by transforming salary data, then load it to the warehouse
salary_transformed = transform_fact_salary(df_salary, table_name='fact_salary')
load_warehouse(data=salary_transformed, table_name='fact_salary', 
               schema='public', idx_name='salary_nk', table_process='salary', source='staging')
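
In a dimensional model, a fact row such as a salary payment usually points at the dimension's surrogate key rather than the source id. Where exactly this course resolves that link (in the transform or in SQL during loading) is not shown here, so the following is only a sketch of a natural-key to surrogate-key lookup; `attach_surrogate_key` and the `employee_sk` column are hypothetical.

```python
import pandas as pd


def attach_surrogate_key(fact: pd.DataFrame, dim: pd.DataFrame, fact_col: str,
                         dim_nk_col: str, dim_sk_col: str) -> pd.DataFrame:
    """Add dim_sk_col to fact by matching fact_col against the dimension's natural key.

    Unmatched keys (e.g. employees not yet loaded) become NaN.
    """
    # Series indexed by natural key -> surrogate key; assumes unique natural keys
    mapping = dim.set_index(dim_nk_col)[dim_sk_col]
    out = fact.copy()
    out[dim_sk_col] = out[fact_col].map(mapping)
    return out


dim_employee = pd.DataFrame({"employee_sk": [100, 101], "employee_nk": [1, 2]})
salary = pd.DataFrame({"salary_id": [1, 2, 3], "employee_id": [2, 1, 9],
                       "amount": [5_000_000, 6_000_000, 7_000_000]})
fact_salary = attach_surrogate_key(salary, dim_employee, "employee_id",
                                   "employee_nk", "employee_sk")
```

Rows that come back without a surrogate key (employee 9 here) are orphans; counting them is a cheap extra validation check, since loading them silently would break joins to `dim_employee`.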

In [10]:
# Build fact_leave_requests by transforming leave request data, create a validation report, and load it to the warehouse
leave_requests_transformed = transform_fact_leave_request(df_leave_requests, table_name='fact_leave_requests')
report_validation_leave_requests(leave_requests_transformed)
load_warehouse(data=leave_requests_transformed, table_name='fact_leave_requests', 
               schema='public', idx_name='leave_request_nk', table_process='leave_requests', source='staging')

Save validation report as fact_leave_requests_2024-08-13.json


In [3]:
# Transform equipment data, create a validation report, and load it to the warehouse
equipment_transformed = transform_equipment(df_equipment, table_name='dim_equipment')
report_validation_equipment(equipment_transformed)
load_warehouse(data=equipment_transformed, table_name='dim_equipment', 
               schema='public', idx_name='equipment_nk', table_process='equipment', source='staging')

Save validation report as equipment_2024-08-13.json


In [4]:
# Build fact_maintenance_request by transforming maintenance request data, then load it to the warehouse
maintenance_request_transformed = transform_fact_maintenance_request(df_maintenance_request, table_name='fact_maintenance_request')
load_warehouse(data=maintenance_request_transformed, table_name='fact_maintenance_request', 
               schema='public', idx_name='maintenance_request_nk', table_process='maintenance_request', source='staging')
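
Unlike the database tables, the maintenance requests come from a spreadsheet, where dates and numbers often arrive as free-form text. A sketch of the "Conversion" technique listed earlier, assuming hypothetical columns `request_date` (ISO `YYYY-MM-DD`) and `estimated_cost`; `convert_spreadsheet_types` is not part of the course code:

```python
from typing import Iterable

import pandas as pd


def convert_spreadsheet_types(df: pd.DataFrame, date_cols: Iterable[str],
                              numeric_cols: Iterable[str]) -> pd.DataFrame:
    """Cast text columns to proper types; unparseable values become NaT/NaN."""
    out = df.copy()
    for col in date_cols:
        # Assumes ISO dates; anything else is coerced to NaT instead of raising
        out[col] = pd.to_datetime(out[col], format="%Y-%m-%d", errors="coerce")
    for col in numeric_cols:
        out[col] = pd.to_numeric(out[col], errors="coerce")
    return out


raw = pd.DataFrame({"request_date": ["2024-08-13", "13/08/2024"],
                    "estimated_cost": ["150000", "abc"]})
typed = convert_spreadsheet_types(raw, ["request_date"], ["estimated_cost"])
```

Coercing instead of raising keeps the load running, in line with loading all data; the resulting NaT/NaN counts are natural candidates for the validation report.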

In [5]:
# Build fact_maintenance_record by transforming maintenance record data, then load it to the warehouse
maintenance_report_transformed = transform_fact_maintenance_record(df_maintenance_record, table_name='fact_maintenance_record')
load_warehouse(data=maintenance_report_transformed, table_name='fact_maintenance_record', 
               schema='public', idx_name='maintenance_record_nk', table_process='maintenance_record', source='staging')