In [None]:
import json
from datetime import date, datetime, timedelta

import pytz
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, DateType, TimestampType, IntegerType, FloatType

In [None]:
# Column layout of a single metric record written by this notebook.
# Every column is declared nullable; the fields are appended in the same
# order in which they appear in the record built further down.
schema = (
    StructType()
    .add("table_name", StringType(), True)
    .add("date_time", DateType(), True)
    .add("start_time", TimestampType(), True)
    .add("end_time", TimestampType(), True)
    .add("table_state", StringType(), True)
    .add("rows_inserted", IntegerType(), True)
    .add("metric_value", FloatType(), True)
    .add("metric_name", StringType(), True)
    .add("metric_config", StringType(), True)
)

In [None]:
# Job parameters, read from the notebook widgets.
table_name = dbutils.widgets.get("table_name")
table_state = dbutils.widgets.get("table_state")
# Kept as the raw widget value here; it is converted to an int when the
# metric record is assembled.
rows_inserted = dbutils.widgets.get("rows_inserted")
# The widget carries an ISO-8601 string; parse it straight into a datetime.
last_update_time = datetime.fromisoformat(dbutils.widgets.get("last_update_time"))

In [None]:
def calculate_ingestion_delay(table_name, end_time):
    """Compute how far apart a table's latest ingestion and ``end_time`` are.

    The latest ingestion time is read as ``max(timestamp_ingestion)`` from
    ``prod.rwd.<table_name>``. Both timestamps have their ``tzinfo`` dropped
    (no timezone conversion is performed) before being subtracted.

    Args:
        table_name: Name of the table inside ``prod.rwd`` to inspect.
        end_time: ``datetime`` to compare against, or ``None``.

    Returns:
        A ``(delay_minutes, context)`` tuple. ``delay_minutes`` is the absolute
        difference between the two timestamps in minutes as a ``float``, or
        ``None`` when either timestamp is unavailable. ``context`` is a dict
        with the (naive) ``timestamp_ingestion`` and ``end_time`` that were used.
    """
    try:
        # NOTE(review): table_name is interpolated directly into the SQL text;
        # make sure it only ever comes from a trusted source.
        timestamp_ingestion = spark.sql(
            f"select max(timestamp_ingestion) from prod.rwd.{table_name}"
        ).collect()[0][0]
    except Exception:
        # Best effort: any failure to read the table is reported as "unknown"
        # rather than aborting the run. `Exception` (instead of a bare except)
        # lets KeyboardInterrupt / SystemExit propagate.
        timestamp_ingestion = None

    # NOTE(review): tzinfo is stripped, not converted - both values are
    # presumably expected to be in the same timezone already; confirm.
    if timestamp_ingestion is not None:
        timestamp_ingestion = timestamp_ingestion.replace(tzinfo=None)
    if end_time is not None:
        end_time = end_time.replace(tzinfo=None)

    metric_config = {"timestamp_ingestion": timestamp_ingestion, "end_time": end_time}

    if timestamp_ingestion is None or end_time is None:
        return None, metric_config

    # total_seconds() covers the whole interval; `.seconds` is only the
    # sub-day remainder and would under-report delays of one day or more.
    delay_minutes = abs(end_time - timestamp_ingestion).total_seconds() / 60
    return float(delay_minutes), metric_config

In [None]:
# Run timestamps are taken as Sao Paulo wall-clock time and stored naive
# (tzinfo removed) to match the timestamp columns of the schema.
sao_paulo_tz = pytz.timezone("America/Sao_Paulo")
start_time = datetime.now(sao_paulo_tz).replace(tzinfo=None)

# Derive the run date once, from start_time, so that the date_time column and
# the output folder always agree with each other and with start_time.
# (date.today() uses the driver's local timezone and, being called twice,
# could return different days around midnight.)
run_date = start_time.date()

metric_value, metric_config = calculate_ingestion_delay(table_name, last_update_time)

obj_to_save = {
    "table_name": table_name,
    "date_time": run_date,
    "start_time": start_time,
    "end_time": datetime.now(sao_paulo_tz).replace(tzinfo=None),
    "table_state": table_state,
    # The widget value may look like "12.0", hence float() before int().
    "rows_inserted": int(float(rows_inserted)),
    "metric_value": metric_value,
    "metric_name": "ingestion_delay",
    # metric_config is a dict holding datetimes while the column is a string:
    # serialise it explicitly as JSON (datetimes rendered via str) instead of
    # relying on Spark's implicit object-to-string conversion.
    "metric_config": json.dumps(metric_config, default=str),
}

# Single-row DataFrame appended to the folder of the run date.
objs = Row(**obj_to_save)
df = spark.createDataFrame([objs], schema=schema)
path = f"s3://recargapay-databricks-prod/timeliness_metrics_teste/{run_date.isoformat()}"
df.write.parquet(path, mode="append")