In [1]:

from pyspark.sql import SparkSession

from awsglue.context import GlueContext
spark = SparkSession.builder \
    .appName("Glue-Iceberg-Notebook") \
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.profile.ProfileCredentialsProvider") \
    .config("spark.hadoop.fs.s3a.profile", "default") \
    .config("spark.driver.extraJavaOptions", 
            "-Dcom.amazonaws.sdk.disableEc2Metadata=true "
            "-Daws.region=ap-southeast-1 ") \
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://scm-analytics-2025/iceberg/bronze/warehouse/") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.jars.packages","mysql:mysql-connector-java:8.0.33,") .getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/glue_user/spark/jars/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/spark/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/aws-glue-libs/jars/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/aws-glue-libs/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]


:: loading settings :: url = jar:file:/home/glue_user/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/glue_user/.ivy2/cache
The jars for the packages stored in: /home/glue_user/.ivy2/jars
mysql#mysql-connector-java added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b6c2359b-2bdd-40c8-ad27-6d821500f1b3;1.0
	confs: [default]
	found mysql#mysql-connector-java;8.0.33 in central
	found com.mysql#mysql-connector-j;8.0.33 in central
	found com.google.protobuf#protobuf-java;3.21.9 in central
:: resolution report :: resolve 397ms :: artifacts dl 6ms
	:: modules in use:
	com.google.protobuf#protobuf-java;3.21.9 from central in [default]
	com.mysql#mysql-connector-j;8.0.33 from central in [default]
	mysql#mysql-connector-java;8.0.33 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	-------------------------------------------------------------

In [38]:
query = f"""
SELECT * FROM glue_catalog.scm_bronze.customer 
"""
spark.sql(query)

DataFrame[CustomerID: int, FirstName: string, LastName: string, CompanyName: string, EmailAddress: string, Phone: string, AddressLine1: string, City: string, CountryRegion: string, CreatedAt: timestamp, UpdatedAt: timestamp, PostalCode: string]

In [3]:
query = f"""
CREATE DATABASE IF NOT EXISTS scm_bronze
LOCATION 's3://scm-analytics-2025/scm_bronze'
"""
spark.sql(query)

DataFrame[]

In [4]:
query = f"""
CREATE DATABASE IF NOT EXISTS scm_silver
LOCATION 's3://scm-analytics-2025/scm_silver'
"""
spark.sql(query)

DataFrame[]

In [5]:
query = f"""
CREATE DATABASE IF NOT EXISTS scm_gold
LOCATION 's3://scm-analytics-2025/scm_gold'
"""
spark.sql(query)

DataFrame[]

In [6]:
query = f"""
SHOW TABLES IN glue_catalog.scm_gold;
"""
spark.sql(query).show()

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



In [7]:
silver_tables_to_drop = [
    "glue_catalog.scm_silver.dim_customer",
    "glue_catalog.scm_silver.dim_product",
    "glue_catalog.scm_silver.dim_supplier",
    "glue_catalog.scm_silver.fact_sales_order",
    "glue_catalog.scm_silver.dim_warehouse"
]

gold_tables_to_drop=["glue_catalog.scm_gold.fact_sales_summary","glue_catalog.scm_gold.fact_sales_summary_by_country","glue_catalog.scm_gold.fact_sales_summary_by_supplier_daily","glue_catalog.scm_gold.fact_sales_summary_by_supplier_monthly"]


#for table in gold_tables_to_drop:
    #spark.sql(f"DROP TABLE {table} PURGE")

In [8]:
# =========================
# Customer Bronze Table
# =========================
query = f"""CREATE TABLE IF NOT EXISTS glue_catalog.scm_bronze.customer (
    CustomerID INT,
    FirstName STRING,
    LastName STRING,
    CompanyName STRING,
    EmailAddress STRING,
    Phone STRING,
    AddressLine1 STRING,
    City STRING,
    CountryRegion STRING,
    CreatedAt TIMESTAMP,
    UpdatedAt TIMESTAMP,
    PostalCode STRING
)
USING iceberg
LOCATION 's3://scm-analytics-2025/scm_bronze/customer'
TBLPROPERTIES (
    'format-version'='2',
    'write.format.default'='parquet',
    'write.metadata.compression-codec'='gzip'
)"""
spark.sql(query)

DataFrame[]

In [9]:



# =========================
# Product Bronze Table
# =========================
query = f"""CREATE TABLE IF NOT EXISTS glue_catalog.scm_bronze.product (
    ProductID INT,
    Name STRING,
    ProductNumber STRING,
    Color STRING,
    StandardCost DECIMAL(18,2),
    ListPrice DECIMAL(18,2),
    Size STRING,
    Weight DECIMAL(10,2),
    ProductCategoryID INT,
    ProductModelID INT,
    SellStartDate DATE,
    ThumbnailPhotoFileName STRING,
    CreatedAt TIMESTAMP,
    UpdatedAt TIMESTAMP,
    rowguid STRING
)
USING iceberg
LOCATION 's3://scm-analytics-2025/scm_bronze/product'
TBLPROPERTIES (
    'format-version'='2',
    'write.format.default'='parquet',
    'write.metadata.compression-codec'='gzip'
)"""
spark.sql(query)


# =========================
# Supplier Bronze Table
# =========================
query = f"""CREATE TABLE IF NOT EXISTS glue_catalog.scm_bronze.supplier (
    SupplierID STRING,
    SupplierName STRING,
    ContactName STRING,
    Email STRING,
    Phone STRING,
    Address STRING,
    City STRING,
    State STRING,
    Country STRING,
    PostalCode STRING,
    BusinessCategory STRING,
    CreatedAt TIMESTAMP,
    UpdatedAt TIMESTAMP
)
USING iceberg
LOCATION 's3://scm-analytics-2025/scm_bronze/supplier'
TBLPROPERTIES (
    'format-version'='2',
    'write.format.default'='parquet',
    'write.metadata.compression-codec'='gzip'
)"""
spark.sql(query)


# =========================
# Warehouse Bronze Table
# =========================
query = f"""CREATE TABLE IF NOT EXISTS glue_catalog.scm_bronze.warehouse (
    WarehouseID STRING,
    WarehouseName STRING,
    Location STRING,
    City STRING,
    State STRING,
    Country STRING,
    PostalCode STRING,
    Capacity INT,
    CreatedAt TIMESTAMP,
    UpdatedAt TIMESTAMP
)
USING iceberg
LOCATION 's3://scm-analytics-2025/scm_bronze/warehouse'
TBLPROPERTIES (
    'format-version'='2',
    'write.format.default'='parquet',
    'write.metadata.compression-codec'='gzip'
)"""
spark.sql(query)


DataFrame[]

In [10]:
query = f"""CREATE TABLE IF NOT EXISTS  glue_catalog.scm_silver.dim_customer (
    customer_id INT,
    customer_name STRING,
    email STRING,
    phone STRING,
    region STRING,
    country STRING,
    customer_type STRING,  -- 'Retail', 'Wholesale', etc.
    loyalty_status STRING, -- 'Gold', 'Silver', 'Bronze'
    start_date TIMESTAMP,  -- SCD2 Start Date
    end_date TIMESTAMP,    -- SCD2 End Date (NULL for active records)
    is_current BOOLEAN,    -- TRUE if the record is active
    created_at TIMESTAMP,
    updated_at TIMESTAMP
)
USING iceberg
LOCATION 's3://scm-analytics-2025/scm_silver/dim_customer'
TBLPROPERTIES (
    'format-version'='2',
    'write.format.default'='parquet',
    'write.metadata.compression-codec'='gzip'
)"""
spark.sql(query)


DataFrame[]

In [11]:
spark.sql("SELECT count(*) FROM glue_catalog.scm_silver.dim_customer ").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



In [12]:
query = f"""CREATE TABLE IF NOT EXISTS  glue_catalog.scm_silver.dim_product (
    product_id INT,
    product_name STRING,
    category STRING,
    sub_category STRING,
    brand STRING,
    supplier_id STRING,
    unit_price DOUBLE,
    cost_price DOUBLE,
    sell_start_date TIMESTAMP,
    thumbnail_photo STRING,
    created_at TIMESTAMP,
    updated_at TIMESTAMP
)
USING iceberg
LOCATION 's3://scm-analytics-2025/scm_silver/dim_product'
TBLPROPERTIES (
    'format-version'='2',
    'write.format.default'='parquet',
    'write.metadata.compression-codec'='gzip'
)"""
spark.sql(query)


DataFrame[]

In [13]:
query = f"""CREATE TABLE IF NOT EXISTS  glue_catalog.scm_silver.dim_warehouse (
    warehouse_id STRING,
    warehouse_name STRING,
    location STRING,
    city STRING,
    state STRING,
    country STRING,
    postal_code STRING,
    capacity INT, -- Maximum storage capacity in units
    created_at TIMESTAMP,
    updated_at TIMESTAMP
)
USING iceberg
LOCATION 's3://scm-analytics-2025/scm_silver/dim_warehouse'
TBLPROPERTIES (
    'format-version'='2',
    'write.format.default'='parquet',
    'write.metadata.compression-codec'='gzip'
)"""
spark.sql(query)


DataFrame[]

In [14]:
query = f"""CREATE TABLE IF NOT EXISTS glue_catalog.scm_silver.dim_supplier (
    supplier_id STRING,
    supplier_name STRING,
    contact_name STRING,
    email STRING,
    phone STRING,
    address STRING,
    city STRING,
    state STRING,
    country STRING,
    postal_code STRING,
    business_category STRING, -- 'Electronics', 'Furniture', etc.
    start_date TIMESTAMP,  -- SCD2 Start Date
    end_date TIMESTAMP,    -- SCD2 End Date (NULL for active records)
    is_current BOOLEAN,    -- TRUE for active records
    created_at TIMESTAMP,
    updated_at TIMESTAMP
)
USING iceberg
LOCATION 's3://scm-analytics-2025/scm_silver/dim_supplier'
TBLPROPERTIES (
    'format-version'='2',
    'write.format.default'='parquet',
    'write.metadata.compression-codec'='gzip'
)"""
spark.sql(query)


DataFrame[]

In [15]:

query = f"""
    CREATE TABLE IF NOT EXISTS glue_catalog.scm_silver.fact_sales_order (
    salesordernumber STRING,
    createddate DATE,
    processeddate DATE,
    quantity INT,
    unitprice DOUBLE,
    taxamount DOUBLE,
    customer_id INT,
    product_id INT,
    supplier_id STRING,
    warehouse_id STRING
)
USING iceberg
PARTITIONED BY (createddate)
LOCATION 's3://scm-analytics-2025/scm_silver/fact_sales_order'
TBLPROPERTIES (
    'format-version'='2',          -- Iceberg format version
    'write.format.default'='parquet',  -- Storage format
    'write.merge.mode'='merge-on-read', -- Enable upserts
    'write.distribution-mode'='hash', -- Distribute data evenly
    'write.metadata.compression-codec'='gzip' -- Metadata compression for efficiency
);
"""
spark.sql(query)

DataFrame[]

In [16]:

query = f"""
CREATE TABLE IF NOT EXISTS glue_catalog.scm_silver.fact_inventory (
    inventory_id STRING,
    product_id STRING,
    warehouse_id STRING,
    stock_level BIGINT,
    last_updated TIMESTAMP,
    daily_usage_rate DOUBLE,
    inventory_status STRING,
    last_movement_date STRING,
    product_category STRING,
    reorder_point BIGINT,
    sku_name STRING,
    supplier_id STRING,
    unit_cost DOUBLE
)
USING iceberg
LOCATION 's3://scm-analytics-2025/scm_silver/fact_inventory'
TBLPROPERTIES (
    'format-version'='2',
    'write.format.default'='parquet',
    'write.metadata.compression-codec'='gzip'
);

"""
spark.sql(query)

DataFrame[]

In [17]:
#spark.sql(f"DROP TABLE glue_catalog.scm_silver.fact_shipments PURGE")

In [18]:

query = f"""
CREATE TABLE IF NOT EXISTS glue_catalog.scm_silver.fact_shipments (
    shipment_id STRING,
    salesordernumber STRING,
    shipment_date DATE,
    delivery_status STRING,
    expected_delivery_date DATE,
    actual_delivery_date DATE,
    carrier_name STRING,
    delivery_location STRING,
    product_id STRING,
    promised_delivery_date STRING,
    purchase_order_id STRING,
    quantity_shipped BIGINT,
    shipping_cost DOUBLE,
    supplier_id STRING
)
USING iceberg
LOCATION 's3://scm-analytics-2025/scm_silver/fact_shipments'
TBLPROPERTIES (
    'format-version'='2',
    'write.format.default'='parquet',
    'write.metadata.compression-codec'='gzip'
);

"""
spark.sql(query)

DataFrame[]

### Gold Tables 

In [19]:
query = f"""CREATE TABLE IF NOT EXISTS glue_catalog.scm_gold.fact_sales_summary (
    customer_name STRING,
    product_name STRING,
    supplier_name STRING,
    warehouse_name STRING,
    sale_date DATE,
    total_quantity_sold INT,
    total_sales_amount DOUBLE,
    total_tax_collected DOUBLE
)
USING iceberg
LOCATION 's3://scm-analytics-2025/scm_gold/fact_sales_summary'
"""
spark.sql(query)


DataFrame[]

In [20]:
query = f"""
CREATE TABLE IF NOT EXISTS  glue_catalog.scm_gold.fact_sales_summary_by_country (
    country STRING,
    total_sales_amount DECIMAL(18,2),
    total_quantity_sold BIGINT,
    total_orders BIGINT,
    last_updated TIMESTAMP
)
USING iceberg
LOCATION 's3://scm-analytics-2025/scm_gold/fact_sales_summary_by_country'
"""
spark.sql(query)

DataFrame[]

In [21]:
query = f"""CREATE TABLE IF NOT EXISTS glue_catalog.scm_gold.fact_sales_summary_by_supplier_daily (
    sales_date DATE,  -- Stores the sales date (YYYY-MM-DD)
    supplier_id STRING,
    supplier_name STRING,
    total_sales_amount DECIMAL(18,2),
    total_quantity_sold BIGINT,
    total_orders BIGINT,
    last_updated TIMESTAMP
)
USING iceberg
LOCATION 's3://scm-analytics-2025/scm_gold/fact_sales_summary_by_supplier_daily'
"""
spark.sql(query)


DataFrame[]

In [22]:
query = f"""CREATE TABLE IF NOT EXISTS glue_catalog.scm_gold.fact_sales_summary_by_supplier_monthly (
    sales_month STRING,  -- Format: 'YYYY-MM'
    supplier_id STRING,
    supplier_name STRING,
    total_sales_amount DECIMAL(18,2),
    total_quantity_sold BIGINT,
    total_orders BIGINT,
    last_updated TIMESTAMP
)
USING iceberg
LOCATION 's3://scm-analytics-2025/scm_gold/fact_sales_summary_by_supplier_monthly'
"""
spark.sql(query)

DataFrame[]

In [23]:
query = f"""CREATE TABLE IF NOT EXISTS glue_catalog.scm_gold.fact_supply_chain_summary (
    salesordernumber STRING,
    createddate DATE,
    processeddate DATE,
    quantity INT,
    unitprice DECIMAL(10, 2),
    taxamount DECIMAL(10, 2),
    customer_id STRING,
    customer_name STRING,
    product_id STRING,
    product_name STRING,
    supplier_id STRING,
    supplier_name STRING,
    warehouse_id STRING,
    warehouse_name STRING,
    current_stock_level INT,
    shipment_date DATE,
    delivery_status STRING
)
USING iceberg
LOCATION 's3://scm-analytics-2025/scm_gold/fact_supply_chain_summary'
"""
spark.sql(query)

DataFrame[]

In [24]:
query = f"""
CREATE TABLE IF NOT EXISTS glue_catalog.scm_gold.late_deliveries_daily (
  salesordernumber STRING,
  createddate DATE,
  expected_delivery_date DATE,
  actual_delivery_date DATE,
  delivery_status STRING,
  run_date DATE
)

USING iceberg
LOCATION 's3://scm-analytics-2025/scm_gold/late_deliveries_daily'
"""
spark.sql(query)

DataFrame[]

In [25]:
query = f"""
CREATE TABLE IF NOT EXISTS glue_catalog.scm_gold.inventory_overstock_inactive (
  inventory_id STRING,
  product_id STRING,
  stock_level INT,
  reorder_point INT,
  last_movement_date DATE,
  days_since_last_movement INT,
  run_date DATE
)
USING iceberg
LOCATION 's3://scm-analytics-2025/scm_gold/inventory_overstock_inactive'
"""
spark.sql(query)

DataFrame[]

In [26]:
query = f"""
CREATE TABLE IF NOT EXISTS glue_catalog.scm_gold.high_shipping_orders (
  salesordernumber STRING,
  total_shipping_cost DOUBLE,
  shipments_per_order INT,
  run_date DATE
)
USING iceberg
LOCATION 's3://scm-analytics-2025/scm_gold/high_shipping_orders'
"""
spark.sql(query)

DataFrame[]

In [27]:
query = f"""
CREATE TABLE IF NOT EXISTS glue_catalog.scm_gold.order_stock_shortage (
  salesordernumber STRING,
  createddate DATE,
  processeddate DATE,
  order_processing_delay INT,
  stock_level INT,
  warehouse_id STRING,
  run_date DATE
)
USING iceberg
LOCATION 's3://scm-analytics-2025/scm_gold/order_stock_shortage'
"""
spark.sql(query)

DataFrame[]

In [28]:
query = f"""
CREATE TABLE IF NOT EXISTS glue_catalog.scm_gold.supplier_delivery_performance (
  supplier_id STRING,
  total_shipments INT,
  delayed_shipments INT,
  on_time_pct DOUBLE,
  run_date DATE
)
USING iceberg
LOCATION 's3://scm-analytics-2025/scm_gold/supplier_delivery_performance'
"""
spark.sql(query)

DataFrame[]

In [29]:
query = f"""
CREATE TABLE IF NOT EXISTS glue_catalog.scm_gold.upcoming_stock_shortage (
  product_id STRING,
  stock_level INT,
  total_demand INT,
  net_stock INT,
  run_date DATE
)

USING iceberg
LOCATION 's3://scm-analytics-2025/scm_gold/upcoming_stock_shortage'
"""
spark.sql(query)

DataFrame[]

In [30]:
query = f"""

CREATE TABLE IF NOT EXISTS glue_catalog.scm_gold.sales_order_lead_time (
  salesordernumber STRING,
  createddate DATE,
  last_delivery DATE,
  total_lead_time INT,
  run_date DATE
)
USING iceberg
LOCATION 's3://scm-analytics-2025/scm_gold/sales_order_lead_time'
"""
spark.sql(query)

DataFrame[]

In [31]:
query = f"""


SELECT 'scm_silver.dim_customer', COUNT(*) FROM glue_catalog.scm_silver.dim_customer
UNION ALL
SELECT 'scm_silver.dim_product', COUNT(*) FROM glue_catalog.scm_silver.dim_product
UNION ALL
SELECT 'scm_silver.dim_supplier', COUNT(*) FROM glue_catalog.scm_silver.dim_supplier
UNION ALL
SELECT 'scm_silver.dim_warehouse', COUNT(*) FROM glue_catalog.scm_silver.dim_warehouse
UNION ALL
SELECT 'scm_silver.fact_inventory', COUNT(*) FROM glue_catalog.scm_silver.fact_inventory
UNION ALL
SELECT 'scm_silver.fact_sales_order', COUNT(*) FROM glue_catalog.scm_silver.fact_sales_order
UNION ALL
SELECT 'scm_silver.fact_shipments', COUNT(*) FROM glue_catalog.scm_silver.fact_shipments;



"""
spark.sql(query).show(truncate=False)

+---------------------------+--------+
|scm_silver.dim_customer    |count(1)|
+---------------------------+--------+
|scm_silver.dim_customer    |0       |
|scm_silver.dim_product     |0       |
|scm_silver.dim_supplier    |0       |
|scm_silver.dim_warehouse   |0       |
|scm_silver.fact_inventory  |0       |
|scm_silver.fact_sales_order|0       |
|scm_silver.fact_shipments  |0       |
+---------------------------+--------+



In [32]:
query = f"""
SELECT 'scm_gold.fact_sales_summary' AS table_name, COUNT(*) AS row_count FROM glue_catalog.scm_gold.fact_sales_summary
UNION ALL
SELECT 'scm_gold.fact_sales_summary_by_country', COUNT(*) FROM glue_catalog.scm_gold.fact_sales_summary_by_country
UNION ALL
SELECT 'scm_gold.fact_sales_summary_by_supplier_daily', COUNT(*) FROM glue_catalog.scm_gold.fact_sales_summary_by_supplier_daily
UNION ALL
SELECT 'scm_gold.fact_sales_summary_by_supplier_monthly', COUNT(*) FROM glue_catalog.scm_gold.fact_sales_summary_by_supplier_monthly
UNION ALL
SELECT 'scm_gold.fact_supply_chain_summary', COUNT(*) FROM glue_catalog.scm_gold.fact_supply_chain_summary
UNION ALL
SELECT 'scm_gold.high_shipping_orders', COUNT(*) FROM glue_catalog.scm_gold.high_shipping_orders
UNION ALL
SELECT 'scm_gold.inventory_overstock_inactive', COUNT(*) FROM glue_catalog.scm_gold.inventory_overstock_inactive
UNION ALL
SELECT 'scm_gold.late_deliveries_daily', COUNT(*) FROM glue_catalog.scm_gold.late_deliveries_daily
UNION ALL
SELECT 'scm_gold.order_stock_shortage', COUNT(*) FROM glue_catalog.scm_gold.order_stock_shortage
UNION ALL
SELECT 'scm_gold.sales_order_lead_time', COUNT(*) FROM glue_catalog.scm_gold.sales_order_lead_time
UNION ALL
SELECT 'scm_gold.supplier_delivery_performance', COUNT(*) FROM glue_catalog.scm_gold.supplier_delivery_performance



"""
spark.sql(query).show(truncate=False)

+-----------------------------------------------+---------+
|table_name                                     |row_count|
+-----------------------------------------------+---------+
|scm_gold.fact_sales_summary                    |0        |
|scm_gold.fact_sales_summary_by_country         |0        |
|scm_gold.fact_sales_summary_by_supplier_daily  |0        |
|scm_gold.fact_sales_summary_by_supplier_monthly|0        |
|scm_gold.fact_supply_chain_summary             |0        |
|scm_gold.high_shipping_orders                  |0        |
|scm_gold.inventory_overstock_inactive          |0        |
|scm_gold.late_deliveries_daily                 |0        |
|scm_gold.order_stock_shortage                  |0        |
|scm_gold.sales_order_lead_time                 |0        |
|scm_gold.supplier_delivery_performance         |0        |
+-----------------------------------------------+---------+



In [33]:
spark.sql("SELECT customer_name,region,country,start_date,end_date,is_current FROM glue_catalog.scm_silver.dim_customer where customer_id ='29951' ").show()

+-------------+------+-------+----------+--------+----------+
|customer_name|region|country|start_date|end_date|is_current|
+-------------+------+-------+----------+--------+----------+
+-------------+------+-------+----------+--------+----------+

