In [1]:
import os

from trino.dbapi import connect


# Step 1: Connect to Trino
# Connection settings are read from the environment when set (TRINO_HOST,
# TRINO_PORT, TRINO_USER) and otherwise fall back to the local development
# values, so the notebook can target another cluster without editing code.
conn = connect(
    host=os.environ.get("TRINO_HOST", "localhost"),
    port=int(os.environ.get("TRINO_PORT", "8080")),  # env values are strings
    user=os.environ.get("TRINO_USER", "admin"),
)
# Single cursor shared by every DDL / query cell below.
cursor = conn.cursor()

## Create the processed Iceberg tables

In [4]:
# Create the `roles` table in the iceberg.processed schema (Parquet data files
# stored under s3://asel/processed/roles).
# IF NOT EXISTS keeps this cell re-runnable: a plain CREATE TABLE fails once
# the table exists, which breaks Restart Kernel -> Run All.
# NOTE: an existing table is left untouched, so the message below is printed
# whether or not the table was created by this run.
cursor.execute("""CREATE TABLE IF NOT EXISTS iceberg.processed.roles (
  id                    VARCHAR,
  id_user               VARCHAR,
  role                  VARCHAR,
  parent_id             VARCHAR,
  first_seen_date       TIMESTAMP(6),
  ingestion_date        TIMESTAMP(6),
  transformation_date   TIMESTAMP(6),
  source_system         VARCHAR
)
WITH (
  format = 'PARQUET',
  location = 's3://asel/processed/roles'
)""")
print("table roles created !")

table roles created !


In [None]:
# Spot-check the roles table: read back at most 30 rows and print them.
print("=== Verifying  table ===")
preview_sql = "SELECT * FROM iceberg.processed.roles LIMIT 30"
cursor.execute(preview_sql)
rows = cursor.fetchall()

# One fetched row per output line; nothing is printed when no rows come back.
for row in rows:
    print(row)


In [None]:
# Create the `activation` table in the iceberg.processed schema (Parquet data
# files stored under s3://asel/processed/activation).
#
# Partitioning notes:
#   * Trino's Iceberg connector expresses identity partitioning as the bare
#     column name; 'identity(col)' is not an accepted partition declaration.
#   * Only one time-based transform is kept on createdAt: year/month/day on the
#     same source column are redundant (Iceberg rejects them), and day()
#     already lets queries prune by month or year.
#   * Unquoted identifiers are folded to lowercase by Trino, so the partition
#     fields reference the stored names `createdat` / `shopname`.
#
# IF NOT EXISTS keeps the cell re-runnable; an existing table is left
# untouched, so the message below is printed whether or not the table was
# created by this run.
#
# NOTE(review): the createdAt column comment says "millisecond precision" while
# the column type is TIMESTAMP(6) - confirm which one is intended.
cursor.execute("""CREATE TABLE IF NOT EXISTS iceberg.processed.activation (
  id                  VARCHAR COMMENT 'Unique activation record identifier',
  sim_id              VARCHAR COMMENT 'SIM card unique identifier',
  agent_id            VARCHAR COMMENT 'Identifier of the agent who processed the activation',
  customer_id         VARCHAR COMMENT 'Identifier of the customer who activated the SIM',
  mvno_id             VARCHAR COMMENT 'Mobile Virtual Network Operator identifier',
  createdAt           TIMESTAMP(6) COMMENT 'Activation timestamp (millisecond precision)',
  shopName            VARCHAR COMMENT 'Name of the retail location where activation occurred',
  first_seen_date     TIMESTAMP(6) COMMENT 'Date when the activation was first seen in the data source',
  ingestion_date      TIMESTAMP(6) COMMENT 'Date when the activation was ingested into the data lake',
  transformation_date TIMESTAMP(6) COMMENT 'Date when the activation was transformed in the lakehouse',
  source_system       VARCHAR COMMENT 'Source system from which this activation originated'
)
COMMENT 'Activation record table for SIM activations, with linkage to customers, agents, and MVNOs'
WITH (
  format = 'PARQUET',
  partitioning = ARRAY[
    'day(createdat)',
    'shopname'
  ],
  location = 's3://asel/processed/activation'
)
""")
print("table activation created !")



table activation created !


In [8]:
# Create the `bundle_price_history` table in the iceberg.processed schema
# (Parquet data files stored under s3://asel/processed/bundle_price_history).
# Each row holds one price for a bundle together with the start_date/end_date
# range it applies to.
# IF NOT EXISTS keeps the cell re-runnable; an existing table is left
# untouched, so the message below is printed whether or not the table was
# created by this run.
# NOTE(review): price is stored as DOUBLE; if exact monetary arithmetic is
# needed a DECIMAL type may be preferable - confirm with the data owners.
cursor.execute("""CREATE TABLE IF NOT EXISTS iceberg.processed.bundle_price_history (
    bundleid              VARCHAR COMMENT 'Unique identifier of the bundle',
    price                 DOUBLE COMMENT 'Price of the bundle during the specified time range',
    start_date            TIMESTAMP(6) COMMENT 'Start of the time range during which this price was valid (used for historical lookups)',
    end_date              TIMESTAMP(6) COMMENT 'End of the time range during which this price was valid (used for historical lookups)',
    first_seen_date       TIMESTAMP(6) COMMENT 'the date where the bundle price is present in the data source ',
    ingestion_date        TIMESTAMP(6) COMMENT 'the date where the bundle price is ingested to the data lake ',
    transformation_date   TIMESTAMP(6) COMMENT 'the date where the bundle price is transformed to the lakehouse',
    source_system         VARCHAR COMMENT 'the source name where this bundle price cames from'
)
COMMENT 'Historical pricing table for bundles, capturing price changes over time using validity periods'
WITH (
    format = 'PARQUET',
    location = 's3://asel/processed/bundle_price_history'
)
""")
print("table bundle_price_history created !")

table bundle_price_history created !


In [9]:
# Create the `bundles` table in the iceberg.processed schema (Parquet data
# files stored under s3://asel/processed/bundles). Per the table comment,
# prices are obtained by joining bundle_price_history on bundleid.
# IF NOT EXISTS keeps the cell re-runnable; an existing table is left
# untouched, so the message below is printed whether or not the table was
# created by this run.
cursor.execute("""CREATE TABLE IF NOT EXISTS iceberg.processed.bundles (
    id                    VARCHAR COMMENT 'Unique internal identifier for the record',
    name                  VARCHAR COMMENT 'Human-readable name of the bundle',
    bundleid              VARCHAR COMMENT 'Business identifier for the bundle (shared across different versions or prices)',
    data_amount_gb        DOUBLE COMMENT 'Amount of data included in the bundle, in gigabytes',
    voice_amount_minutes  DOUBLE COMMENT 'Amount of voice call minutes included in the bundle',
    sms_amount            INTEGER COMMENT 'Number of SMS messages included in the bundle',
    validity_days         INTEGER COMMENT 'Number of days the bundle remains active once purchased',
    createdat             TIMESTAMP(6) COMMENT 'Timestamp when the bundle definition was created (system ingestion or publishing date)',
    first_seen_date       TIMESTAMP(6) COMMENT 'the date where the bundle  is present in the data source ',
    ingestion_date        TIMESTAMP(6) COMMENT 'the date where the bundle  is ingested to the data lake ',
    transformation_date   TIMESTAMP(6) COMMENT 'the date where the bundle  is transformed to the lakehouse',
    source_system         VARCHAR COMMENT 'the source name where this bundle  cames from'
)
COMMENT 'Table storing metadata and content definitions of asel bundles (data, voice, SMS, validity) to get the price of the bundle Join the bundle_price_history table with bundleid '
WITH (
    format = 'PARQUET',
    location = 's3://asel/processed/bundles'
)
""")
print("table bundles created !")

table bundles created !


In [15]:
# Create the `customers` table in the iceberg.processed schema (Parquet data
# files stored under s3://asel/processed/customers), bucketed into 16 buckets
# on the required `id` column.
# IF NOT EXISTS keeps the cell re-runnable; an existing table is left
# untouched, so the message below is printed whether or not the table was
# created by this run.
# NOTE(review): mvno_id is DOUBLE here but VARCHAR in iceberg.processed.activation;
# joins between the two need an explicit cast - confirm the intended type.
# NOTE(review): this table holds personal data (identity documents, contact
# details); avoid printing raw rows in shared notebook outputs.
cursor.execute("""CREATE TABLE IF NOT EXISTS iceberg.processed.customers (
    id VARCHAR NOT NULL COMMENT 'Required unique identifier for the customer record',

    DOB TIMESTAMP(6) COMMENT 'Date of birth of the customer (nullable)',
    POB VARCHAR COMMENT 'Place of birth of the customer (nullable)',
    address VARCHAR COMMENT 'Postal address of the customer (nullable)',

    arta_id DOUBLE COMMENT 'Internal ARTA system identifier (nullable)',

    cin VARCHAR COMMENT 'National identity card number (nullable)',
    cin_recto_path VARCHAR COMMENT 'File path to front side scan of the CIN (nullable)',
    cin_verso_path VARCHAR COMMENT 'File path to back side scan of the CIN (nullable)',

    city VARCHAR COMMENT 'City of residence (nullable)',

    creation_date TIMESTAMP(6) COMMENT 'Timestamp when the customer record was created (nullable)',

    email VARCHAR COMMENT 'Customer email address (nullable)',
    first_name VARCHAR COMMENT 'Customer first name (nullable)',
    gender VARCHAR COMMENT 'Gender of the customer (nullable) male or female',

    issue_date TIMESTAMP(6) COMMENT 'Timestamp of identity document issuance (nullable)',

    job VARCHAR COMMENT 'Occupation or job title of the customer (nullable)',
    last_name VARCHAR COMMENT 'Customer last name (nullable)',

    mvno_id DOUBLE COMMENT 'ID of the MVNO (Mobile Virtual Network Operator) associated with the customer (nullable)',

    passport VARCHAR COMMENT 'Passport number (nullable)',
    passport_path VARCHAR COMMENT 'File path to scanned passport copy (nullable)',

    postal_code VARCHAR COMMENT 'Postal or ZIP code (nullable)',
    region VARCHAR COMMENT 'Administrative region of residence (nullable)',
    first_seen_date       TIMESTAMP(6) COMMENT 'the date where the customer  is present in the data source ',
    ingestion_date        TIMESTAMP(6) COMMENT 'the date where the customer  is ingested to the data lake ',
    transformation_date   TIMESTAMP(6) COMMENT 'the date where the customer  is transformed to the lakehouse',
    source_system         VARCHAR COMMENT 'the source name where this customer  cames from'
)
COMMENT 'Customer master data table containing personal identification and contact details'
WITH (
    format = 'PARQUET',
    location = 's3://asel/processed/customers',
    partitioning = ARRAY['bucket(id, 16)']
)
""")
print("table customers created !")

table customers created !
