# Description

Databricks is well known as a data engineering platform where engineers can ingest multiple data sources and transform them into new data products, most often following the Medallion Architecture.

Databricks can ingest data from all kinds of sources: cloud storage, APIs, federated databases, streaming sources such as Kafka, and more. It can orchestrate ingestion at different frequencies depending on the trigger type, from continuous to batch, and can perform full or incremental loads.

# Boilerplate

## Dependencies

## Parameters

In [0]:
# Register the notebook's text widgets as (name, label) pairs; each one
# starts with an empty default value.
for widget_name, widget_label in (
  ("catalog_name", "00 - Catalog Name"),
  ("schema_name", "01 - Schema Name"),
):
  dbutils.widgets.text(widget_name, "", widget_label)

In [0]:
# Collect every widget value into a {name: value} mapping.
params = dbutils.widgets.getAll()

# Fail fast when any parameter was left empty. An explicit raise is used
# instead of `assert` (assert statements are skipped when Python runs with
# -O), and all empty parameters are reported together.
empty_params = [key for key, value in params.items() if value == ""]
if empty_params:
  raise ValueError(f"Parameter(s) empty: {', '.join(empty_params)}")

# Expose each parameter as a notebook-level variable (e.g. catalog_name,
# schema_name). globals() is used because writes through locals() are only
# guaranteed to take effect at module scope.
globals().update(params)

# Main

In [0]:
# Load the three source tables from the catalog and schema given by the
# notebook parameters.
source_schema = f"{catalog_name}.{schema_name}"

df_plans = spark.read.table(f"{source_schema}.plans")
df_customers = spark.read.table(f"{source_schema}.customers")
df_events = spark.read.table(f"{source_schema}.events")

## Customer Pricing

In [0]:
# Pair every customer with the matching plan row (inner join on plan_id).
same_plan = df_customers.plan_id == df_plans.plan_id
df_customer_pricing = df_customers.join(df_plans, on=same_plan, how="inner")

# Render the joined result in the notebook output.
df_customer_pricing.display()

## Device Event Summary

In [0]:
import pyspark.sql.functions as F
import pyspark.sql.types as T


# Derive the per-event measures that the monthly summary aggregates:
# the event's month, a message counter, and call minutes split by event_type.

# First day of the month the event timestamp falls in, as a date.
event_month_col = F.date_trunc("month", F.col("event_ts")).cast(T.DateType())

# 1 for an 'sms' event, 0 for any other event type.
message_count_col = F.expr("""case
                    when event_type='sms' then 1
                     else 0 end""")

# Minutes are rounded up (ceil) for the matching event type and are 0.0
# for every other event type.
ld_minutes_col = F.expr("""case
                     when event_type='ld call'
                     then cast(ceil(minutes) as decimal(18,3))
                     else 0.0 end""")
local_minutes_col = F.expr("""case when event_type='local call'
                     then cast(ceil(minutes) as decimal(18,3))
                     else 0.0 end""")
intl_minutes_col = F.expr("""case when event_type='intl call'
                     then cast(ceil(minutes) as decimal(18,3))
                     else 0.0 end""")

df_enriched_events = (
  df_events
  .withColumn("event_month", event_month_col)
  .withColumn("message_count", message_count_col)
  .withColumn("ld_minutes", ld_minutes_col)
  .withColumn("local_minutes", local_minutes_col)
  .withColumn("intl_minutes", intl_minutes_col)
)

# Aggregate the enriched events per device and month: megabytes
# transferred (bytes / 1,000,000, rounded to 3 places), message count,
# minutes per call type, and the number of events.
summary_query = """
  select
    device_id
    , event_month
    , round(sum(bytes_transferred) / 1000000.0, 3) as total_mb
    , sum(message_count) as total_messages
    , sum(ld_minutes) as total_ld_minutes
    , sum(local_minutes) as total_local_minutes
    , sum(intl_minutes) as total_intl_minutes
    , count(device_id) as event_count
    from {df_enriched_events}
    group by all"""

# The DataFrame is bound to the {df_enriched_events} placeholder by keyword.
df_summary = spark.sql(summary_query, df_enriched_events=df_enriched_events)

# count(device_id) ignores NULLs, so this only hides a group whose
# device_id is NULL.
df_summary.filter("event_count > 0").display()

## Customer Summary

In [0]:
# Attach the per-device monthly usage summary to each customer/plan row
# (inner join on device_id).
same_device = df_customer_pricing.device_id == df_summary.device_id
df_customer_summary = df_customer_pricing.join(df_summary, on=same_device, how="inner")

# Render the joined result in the notebook output.
df_customer_summary.display()

In [0]:
# Build one invoice row per customer and billing month.
#
# The inner query prices each usage measure (usage * unit cost, rounded to
# 2 decimal places); the outer query adds the five cost columns into
# total_invoice.
#
# Every cost column is cast to decimal(18,2) so the types agree with the
# 2-place rounding. internet_cost was previously cast to decimal(18,3),
# unlike the other four cost columns.
# NOTE(review): if an invoices table was already written with the old
# decimal(18,3) column, the overwrite below may need a schema overwrite or
# the table may need to be dropped first -- confirm before re-running.
df_invoices = spark.sql(
  """
  select
    *
    , internet_cost + sms_cost + ld_cost + local_cost + intl_cost as total_invoice
    from
    (
      select
        customer_id
        , event_month as billing_month
        , customer_name
        , phone_number
        , email
        , plan_name
        , cast(round(total_mb * cost_per_mb, 2) as decimal(18,2)) as internet_cost
        , cast(round(total_ld_minutes * ld_cost_per_minute, 2) as decimal(18,2)) as ld_cost
        , cast(round(total_local_minutes * cost_per_minute, 2) as decimal(18,2)) as local_cost
        , cast(round(total_intl_minutes * intl_cost_per_minute, 2) as decimal(18,2)) as intl_cost
        , cast(round(total_messages * cost_per_message, 2) as decimal(18,2)) as sms_cost
        from {df_customer_summary}
    )
    """,
    df_customer_summary = df_customer_summary
  )

# Render the invoices in the notebook output.
df_invoices.display()

In [0]:
# Persist the invoices to the invoices table in the configured catalog
# and schema, overwriting whatever the table held before.
invoices_table = f"{catalog_name}.{schema_name}.invoices"
df_invoices.write.saveAsTable(invoices_table, mode="overwrite")

### Add Primary Keys and Foreign Keys

In [0]:
%sql
-- The table name is assembled from the :catalog_name and :schema_name
-- parameter markers and resolved through IDENTIFIER().

-- Mark both key columns NOT NULL before the primary key is added.
-- NOTE(review): primary-key columns are expected to be non-nullable --
-- confirm against the ALTER TABLE documentation.
ALTER TABLE IDENTIFIER(:catalog_name || '.' || :schema_name || '.' || 'invoices')
  ALTER COLUMN customer_id SET NOT NULL;

ALTER TABLE IDENTIFIER(:catalog_name || '.' || :schema_name || '.' || 'invoices')
  ALTER COLUMN billing_month SET NOT NULL;

-- Composite primary key: one invoice per customer per billing month.
ALTER TABLE IDENTIFIER(:catalog_name || '.' || :schema_name || '.' || 'invoices')
  ADD CONSTRAINT invoices_pk PRIMARY KEY (customer_id, billing_month);

In [0]:
# Add a foreign key from invoices.customer_id to the customers table.
# No parent column is named in the REFERENCES clause.
invoices_table = f"{catalog_name}.{schema_name}.invoices"
customers_table = f"{catalog_name}.{schema_name}.customers"

spark.sql(f"""
ALTER TABLE {invoices_table}
  ADD CONSTRAINT invoices_customers_fk
    FOREIGN KEY(customer_id) REFERENCES {customers_table};""")