# Services (hour)

In [207]:
import pandas as pd
import dask.dataframe as dd

## Database connection

In [208]:
import yaml
from sqlalchemy import create_engine


with open("../config.yml", "r") as file:
	config = yaml.safe_load(file)
	config_OLTP = config["OLTP"]
	config_OLAP = config["OLAP"]


url_OLTP = (f"{config_OLTP['drivername']}://{config_OLTP['user']}:{config_OLTP['password']}"
			f"@{config_OLTP['host']}:{config_OLTP['port']}/{config_OLTP['database_name']}")

url_OLAP = (f"{config_OLAP['drivername']}://{config_OLAP['user']}:{config_OLAP['password']}"
			f"@{config_OLAP['host']}:{config_OLAP['port']}/{config_OLAP['database_name']}")

OLTP_connection = create_engine(url_OLTP)
OLAP_connection = create_engine(url_OLAP)

## Extraction

In [209]:
# Load dimensions
time_dimension = pd.read_sql_table("TIME_DIMENSION", OLAP_connection)
courier_dimension = pd.read_sql_table("COURIER_DIMENSION", OLAP_connection)
customer_dimension = pd.read_sql_table("CUSTOMER_DIMENSION", OLAP_connection)
office_dimension = pd.read_sql_table("OFFICE_DIMENSION", OLAP_connection)
service_status_dimension = pd.read_sql_table("SERVICE_STATUS_DIMENSION", OLAP_connection)

In [228]:
# Load fact table data from OLTP
services = pd.read_sql_table("mensajeria_servicio", OLTP_connection)
service_statuses = pd.read_sql_table("mensajeria_estadosservicio", OLTP_connection)

## Transformation

In [211]:
# Process service statuses to get latest status per service
# Convert 'hora' from datetime.time to string to avoid TypeError
service_statuses["hora_str"] = service_statuses["hora"].astype(str)

In [212]:
# Combine 'fecha' and 'hora_str' into a single datetime column
service_statuses["datetime"] = pd.to_datetime(
		service_statuses["fecha"]
) + pd.to_timedelta(service_statuses["hora_str"])

In [213]:
# Sort by 'datetime' and drop duplicates to get the latest status per 'servicio_id'
latest_status = service_statuses.sort_values("datetime").drop_duplicates(
		"servicio_id", keep="last"
)

In [214]:
# Rename columns for merging
latest_status = latest_status.rename(
		columns={
				"id": "status_record_id",
				"estado_id": "status_id",
				"servicio_id": "service_id",
		}
)

In [233]:
# Rename columns in services for consistency
services = services.rename(
		columns={
				"id": "service_id",
				"cliente_id": "customer_id",
				"mensajero_id": "courier_id",
				"origen_id": "origin_office_id",
				"destino_id": "destination_office_id",
		}
)

In [236]:
null_couriers = services["courier_id"].isna().sum()
print(f"Número de registros con 'courier_id' vacío: {null_couriers}")

Número de registros con 'courier_id' vacío: 727


In [216]:
# Merge the latest status with services
services = services.merge(
		latest_status[["service_id", "status_id"]], on="service_id", how="left"
)

In [217]:
# Handle services without a status (optional)
# services['status_id'] = services['status_id'].fillna(default_status_id)

In [218]:
# Combine 'fecha_solicitud' and 'hora_solicitud' into 'request_time'
services["hora_solicitud_str"] = services["hora_solicitud"].astype(str)
services["request_time"] = pd.to_datetime(
		services["fecha_solicitud"]
) + pd.to_timedelta(services["hora_solicitud_str"])
services["request_time"] = services["request_time"].dt.floor("H")

In [219]:
# Create a mapping from datetime to time_id
# Ensure that 'time_dimension["date"]' is in datetime format
time_dimension["date"] = pd.to_datetime(time_dimension["date"])
time_mapping = dict(zip(time_dimension["date"], time_dimension["time_id"]))

In [220]:
# Map 'request_time' to 'time_id'
services["time_id"] = services["request_time"].map(time_mapping)

In [221]:
# Check for unmapped 'time_id's
unmapped_times = services["time_id"].isna().sum()
if unmapped_times > 0:
		print(f"Warning: {unmapped_times} records have unmapped 'request_time'.")

In [222]:
# Select necessary columns for the fact table
service_fact = services[
		[
				"service_id",
				"customer_id",
				"courier_id",
				"status_id",
				"origin_office_id",
				"destination_office_id",
				"time_id",
		]
]

In [223]:
# Add service_fact_table_id
service_fact = service_fact.reset_index().rename(
		columns={"index": "service_fact_hour_table_id"}
)

In [226]:
service_fact.head(10)
# service_fact.shape

Unnamed: 0,service_fact_hour_table_id,service_id,customer_id,courier_id,status_id,origin_office_id,destination_office_id,time_id
0,0,34,5,,1,241,214,55260
1,1,35,5,7.0,5,236,214,55380
2,2,36,5,,1,236,214,58740
3,3,41,5,,1,241,214,72540
4,4,42,5,,1,241,214,72540
5,5,43,5,,1,15,217,72600
6,6,46,4,12.0,5,262,238,75960
7,7,45,5,12.0,5,242,214,75900
8,8,47,5,12.0,5,239,214,78420
9,9,44,5,,1,241,231,74640


## Load

In [225]:
# Save the fact table to OLAP
service_fact.to_sql(
    "SERVICE_FACT_HOUR_TABLE", OLAP_connection, if_exists="replace", index=False
)

430