In [None]:
-- Database plus the three medallion-layer schemas used by this pipeline.
-- IF NOT EXISTS keeps this cell re-runnable instead of failing on objects that already exist.
CREATE DATABASE IF NOT EXISTS LEARNING;
CREATE SCHEMA IF NOT EXISTS LEARNING.RAW;
CREATE SCHEMA IF NOT EXISTS LEARNING.SILVER;
CREATE SCHEMA IF NOT EXISTS LEARNING.GOLD;

In [None]:
-- Session context for the unqualified stage / file-format names created in the following cells.
USE LEARNING.RAW;

In [None]:
-- Account-level S3 storage integration (needs ACCOUNTADMIN or the CREATE INTEGRATION privilege).
-- Replace the <accountarn>/<role> placeholders in the role ARN before running.
-- IF NOT EXISTS rather than OR REPLACE: stages bind to an integration's internal ID, so recreating
-- it breaks every stage that references it and can change the external ID in the IAM trust policy.
CREATE STORAGE INTEGRATION IF NOT EXISTS s3_int
    TYPE = EXTERNAL_STAGE
    STORAGE_PROVIDER = S3
    ENABLED = TRUE
    STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<accountarn>:role/<role>'
    STORAGE_ALLOWED_LOCATIONS = ('s3://aws-devtest-bucket2/snowflake/');

-- Copy STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID from this output into the
-- trust relationship of the IAM role above.
DESCRIBE INTEGRATION s3_int;

In [None]:
-- JSON parsing rules for the S3 stage. STRIP_OUTER_ARRAY turns a file holding a top-level
-- JSON array into one row per array element rather than a single row for the whole file.
CREATE OR REPLACE FILE FORMAT json_ff
    TYPE = JSON
    STRIP_OUTER_ARRAY = TRUE;

In [None]:
-- External stage over the bucket prefix permitted by s3_int; files default to the json_ff format.
-- Fully qualified so the cell does not depend on the session schema; IF NOT EXISTS keeps it
-- re-runnable without dropping the stage the pipe reads from.
CREATE STAGE IF NOT EXISTS LEARNING.RAW.s3_stage
    URL = 's3://aws-devtest-bucket2/snowflake/'
    STORAGE_INTEGRATION = s3_int
    FILE_FORMAT = (FORMAT_NAME = 'LEARNING.RAW.json_ff');

DESCRIBE STAGE LEARNING.RAW.s3_stage;

In [None]:
-- Snowpipe landing table: one VARIANT row per JSON record plus the time it was loaded.
-- OR REPLACE drops any data already landed and invalidates streams created on the old table.
-- NOTE(review): TIMESTAMP maps to TIMESTAMP_NTZ by default, so load_ts keeps CURRENT_TIMESTAMP
-- as a session-time-zone wall-clock value without an offset; confirm that is intended.
CREATE OR REPLACE TABLE LEARNING.RAW.orders
(
    payload  VARIANT,                               -- raw CDC event document
    load_ts  TIMESTAMP DEFAULT CURRENT_TIMESTAMP()  -- filled in when a load omits this column
);

In [None]:
-- Auto-ingest pipe: S3 event notifications trigger a COPY of each new file into RAW.orders.
-- Only payload is loaded, so load_ts is populated from its column DEFAULT. Loading a column
-- subset uses the documented COPY-with-transformation form; for JSON, $1 is the whole record
-- produced by the stage's json_ff format. Names are fully qualified so the pipe does not
-- depend on the session context it was created in.
CREATE OR REPLACE PIPE LEARNING.RAW.orders_pipe
    AUTO_INGEST = TRUE
AS
COPY INTO LEARNING.RAW.orders (payload)
FROM (
    SELECT $1
    FROM @LEARNING.RAW.s3_stage
);

-- notification_channel in this output is the SQS queue ARN for the bucket's event notification.
DESCRIBE PIPE LEARNING.RAW.orders_pipe;

-- After creating the auto-ingest pipe, copy its notification_channel (from DESCRIBE PIPE) and create an S3 event notification on the bucket that sends to that SQS queue ARN,
-- so that whenever a new file lands in the S3 bucket, Snowpipe is triggered automatically.

In [None]:
-- Spot-check what Snowpipe has landed so far.
SELECT
    payload,
    load_ts
FROM LEARNING.RAW.orders;

In [None]:
-- Two independent streams on the same RAW table. A stream's offset advances whenever a DML
-- statement consumes it, so each SILVER task needs its own stream to see every change.
-- APPEND_ONLY = TRUE: in this pipeline RAW.orders is only appended to by Snowpipe; an
--   append-only stream is cheaper and never surfaces DELETE rows if old raw rows are purged.
-- SHOW_INITIAL_ROWS = TRUE: rows Snowpipe loaded before these streams existed are returned on
--   first consumption instead of never reaching SILVER. Consequence: re-running only this cell
--   against an already-populated RAW table replays every existing row (duplicating history).
CREATE OR REPLACE STREAM LEARNING.RAW.orders_stream
ON TABLE LEARNING.RAW.orders
APPEND_ONLY = TRUE
SHOW_INITIAL_ROWS = TRUE;

CREATE OR REPLACE STREAM LEARNING.RAW.orders_history_stream
ON TABLE LEARNING.RAW.orders
APPEND_ONLY = TRUE
SHOW_INITIAL_ROWS = TRUE;


In [None]:
-- SILVER.orders: current state of each order after CDC events are applied.
-- NOTE: Snowflake stores PRIMARY KEY as metadata only (just the implied NOT NULL is enforced),
-- so one-row-per-order_id depends on the MERGE in task_orders, not on this constraint.
-- NOTE(review): NUMBER without precision is NUMBER(38, 0); fractional amounts would be rounded.
-- Confirm amounts are whole units.
CREATE OR REPLACE TABLE LEARNING.SILVER.orders (
    operation_type      VARCHAR,
    operation_timestamp TIMESTAMP,
    order_id            VARCHAR PRIMARY KEY,
    customer_id         VARCHAR,
    amount              NUMBER(38, 0),
    order_ts            TIMESTAMP,
    updated_at          TIMESTAMP
);

-- SILVER.orders_history: one row per CDC event; no key, so repeated events are kept as-is.
CREATE OR REPLACE TABLE LEARNING.SILVER.orders_history (
    operation_type      VARCHAR,
    operation_timestamp TIMESTAMP,
    order_id            VARCHAR,
    customer_id         VARCHAR,
    amount              NUMBER(38, 0),
    order_ts            TIMESTAMP,
    updated_at          TIMESTAMP
);

In [None]:
-- Appends every CDC event from orders_history_stream to SILVER.orders_history.
-- Fires only when the stream has unconsumed rows; the INSERT consumes the stream,
-- advancing its offset when the statement commits.
CREATE OR REPLACE TASK LEARNING.SILVER.task_orders_history
WAREHOUSE = SNOWFLAKE_LEARNING_WH
WHEN SYSTEM$STREAM_HAS_DATA('LEARNING.RAW.orders_history_stream')
AS
BEGIN

  INSERT INTO LEARNING.SILVER.orders_history (
      operation_type,
      operation_timestamp,
      order_id,
      customer_id,
      amount,
      order_ts,
      updated_at
  )
  SELECT
      -- Presumably Debezium op codes: c = create, r = snapshot read, u = update, d = delete.
      -- Any other code maps to NULL.
      CASE payload:op::STRING
          WHEN 'c' THEN 'insert'
          WHEN 'r' THEN 'insert'
          WHEN 'u' THEN 'update'
          WHEN 'd' THEN 'delete'
      END                                                    AS operation_type,
      load_ts::TIMESTAMP                                     AS operation_timestamp,

      -- Delete events are expected to carry the key only in the "before" image.
      COALESCE(
          payload:after:order_id::STRING,
          payload:before:order_id::STRING
      )                                                      AS order_id,

      -- Read from the "after" image, so these are NULL for delete events.
      payload:after:customer_id::STRING                      AS customer_id,
      payload:after:amount::NUMBER                           AS amount,
      payload:after:order_ts::TIMESTAMP                      AS order_ts,
      NULL                                                   AS updated_at
  FROM LEARNING.RAW.orders_history_stream
  -- Only newly landed raw rows are CDC events. On a standard stream, deleting raw rows
  -- (e.g. purging old loads) would otherwise be replayed here as extra history entries.
  WHERE METADATA$ACTION = 'INSERT';
END;

In [None]:
-- Applies CDC events from orders_stream to SILVER.orders so it holds the latest state per order_id.
-- Fires only when the stream has unconsumed rows; the MERGE consumes the stream, advancing its
-- offset when the statement commits.
CREATE OR REPLACE TASK LEARNING.SILVER.task_orders
WAREHOUSE = SNOWFLAKE_LEARNING_WH
WHEN SYSTEM$STREAM_HAS_DATA('LEARNING.RAW.orders_stream')
AS
BEGIN

  MERGE INTO LEARNING.SILVER.orders t
  USING (
    -- One batch can contain several events for the same order (e.g. create then update).
    -- MERGE needs at most one source row per key, otherwise the outcome is nondeterministic
    -- (or the statement errors), so keep only the latest event per order_id.
    SELECT *
    FROM (
      SELECT
        payload:op::STRING                                   AS operation_type,
        load_ts::TIMESTAMP                                   AS operation_timestamp,

        -- Delete events are expected to carry the key only in the "before" image.
        COALESCE(
            payload:after:order_id::STRING,
            payload:before:order_id::STRING
        )                                                    AS order_id,

        payload:after:customer_id::STRING                    AS customer_id,
        payload:after:amount::NUMBER                         AS amount,
        payload:after:order_ts::TIMESTAMP                    AS order_ts,
        CURRENT_TIMESTAMP                                    AS updated_at,
        -- NOTE(review): assumes a Debezium-style top-level payload:ts_ms event time, used only
        -- to order events that arrived in the same file (same load_ts); NULL if absent.
        payload:ts_ms::NUMBER                                AS event_ts_ms
      FROM LEARNING.RAW.orders_stream
      -- Only newly landed raw rows are CDC events; ignore op codes this MERGE does not apply.
      WHERE METADATA$ACTION = 'INSERT'
        AND payload:op::STRING IN ('c', 'r', 'u', 'd')
    )
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY order_id
        ORDER BY operation_timestamp DESC, event_ts_ms DESC NULLS LAST
    ) = 1
  ) s
  ON t.order_id = s.order_id

  WHEN MATCHED AND s.operation_type = 'd' THEN DELETE

  -- c / r / u all describe the order's current state, so apply them as an upsert. This also
  -- keeps updates whose create event was never seen instead of silently dropping them.
  WHEN MATCHED AND s.operation_type IN ('c', 'r', 'u') THEN
    UPDATE SET
      t.operation_type      = CASE s.operation_type WHEN 'u' THEN 'update' ELSE 'insert' END,
      t.operation_timestamp = s.operation_timestamp,
      t.customer_id         = s.customer_id,
      t.amount              = s.amount,
      t.order_ts            = s.order_ts,
      -- Time this task applied the change (previously overwritten with order_ts).
      t.updated_at          = s.updated_at

  WHEN NOT MATCHED AND s.operation_type IN ('c', 'r', 'u') THEN
    INSERT (operation_type, operation_timestamp, order_id, customer_id, amount, order_ts, updated_at)
    VALUES (
      CASE s.operation_type WHEN 'u' THEN 'update' ELSE 'insert' END,
      s.operation_timestamp,
      s.order_id,
      s.customer_id,
      s.amount,
      s.order_ts,
      s.order_ts
    );
END;

In [None]:
-- Tasks are created in a suspended state; resuming them starts evaluation of their
-- SYSTEM$STREAM_HAS_DATA conditions. The two tasks are independent, so order does not matter.
ALTER TASK LEARNING.SILVER.task_orders RESUME;
ALTER TASK LEARNING.SILVER.task_orders_history RESUME;

In [None]:
-- Inspect the SILVER outputs: the full event log, then the current per-order state.
SELECT operation_type, operation_timestamp, order_id, customer_id, amount, order_ts, updated_at
FROM LEARNING.SILVER.orders_history;

SELECT operation_type, operation_timestamp, order_id, customer_id, amount, order_ts, updated_at
FROM LEARNING.SILVER.orders;