# **Part-5: Streams & Tasks for Landing Layers**

## **Objectives of Part 5:**

* In Part 5, we set up the streams and tasks that move data from the landing layer to the curated layer.
* Streams are created on the tables in the landing zone, and tasks are created in the curated zone.
* As new data from the S3 location is loaded into the landing tables, the streams capture the inserted rows, and the tasks merge those changes into the curated tables.

**Stream Creation:**

* Create three streams on the landing tables: landing_item_stm, landing_customer_stm, and landing_order_stm.
* All three are append-only streams, so they record inserted rows only (not updates or deletes).



```sql
use schema chk.landing_zone;
```

```sql
-- Append-only streams record only the rows inserted into each landing table
create or replace stream landing_item_stm on table landing_item
    append_only = true;

create or replace stream landing_customer_stm on table landing_customer
    append_only = true;

create or replace stream landing_order_stm on table landing_order
    append_only = true;
```
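Before any task exists, you can check what a stream has captured. A plain SELECT from a stream does not advance its offset; only a DML statement that reads the stream in a committed transaction (such as the MERGE inside each curated task) consumes it. Besides the table's own columns, a stream exposes METADATA$ACTION and METADATA$ISUPDATE; for an append-only stream every row is an INSERT. The sketch below assumes the landing order table has the order_date, item_id, and customer_id columns used later in the order MERGE.

```sql
-- TRUE if the order stream holds unconsumed change records, otherwise FALSE
select system$stream_has_data('chk.landing_zone.landing_order_stm') as order_stream_has_data;

-- Peek at the captured rows; this SELECT does not consume the stream
select metadata$action,    -- always 'INSERT' for an append-only stream
       metadata$isupdate,  -- always FALSE for an append-only stream
       order_date,
       item_id,
       customer_id
from chk.landing_zone.landing_order_stm
limit 10;
```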

**Switching Context:**

* Change the context to the curated zone for task creation.


**Order Curated Task:**

* Create order_curated_tsk, which is scheduled to run every minute.
* At each scheduled run, the task executes its SQL only if the landing order stream has data (checked with SYSTEM$STREAM_HAS_DATA in the WHEN clause).
* The task uses a MERGE command to update matching records in the curated order table and insert new ones.
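The order MERGE matches on order_date, order_time, item_id, and item_desc. If the stream holds more than one row for the same key, the MERGE either fails (when the key already exists in curated_order, because Snowflake's ERROR_ON_NONDETERMINISTIC_MERGE parameter defaults to TRUE) or inserts duplicate rows (when it does not). A quick pre-check, assuming those four columns are the intended business key:

```sql
-- Merge keys that occur more than once among the unconsumed order stream rows
select order_date, order_time, item_id, item_desc, count(*) as row_count
from chk.landing_zone.landing_order_stm
group by order_date, order_time, item_id, item_desc
having count(*) > 1;
```

If this returns rows, one option is to deduplicate inside the MERGE's USING clause, for example with QUALIFY ROW_NUMBER() ... = 1 over the same key columns.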

**Customer Curated Task:**

* Create customer_curated_tsk, which is scheduled to run every 2 minutes.
* It runs independently of the order task and executes only when the landing customer stream has data.
* It uses a MERGE command, keyed on customer_id, to update matching records in the curated customer table and insert new ones.

**Item Curated Task:**

* Create item_curated_tsk, which is scheduled to run every 3 minutes.
* It executes only when the landing item stream has data.
* It uses a MERGE command to update matching records in the curated item table and insert new ones.

```sql
use schema chk.curated_zone;
```

```sql
-- Scheduled every minute; runs only when the landing order stream has captured new rows
create or replace task order_curated_tsk
    warehouse = compute_wh
    schedule  = '1 minute'
when
    system$stream_has_data('chk.landing_zone.landing_order_stm')
as
    -- Upsert stream rows into the curated order table, matching on
    -- order date, order time, item id, and item description
    merge into chk.curated_zone.curated_order curated_order
    using chk.landing_zone.landing_order_stm landing_order_stm on
        curated_order.order_date = landing_order_stm.order_date and
        curated_order.order_time = landing_order_stm.order_time and
        curated_order.item_id = landing_order_stm.item_id and
        curated_order.item_desc = landing_order_stm.item_desc
    when matched then update set
        curated_order.customer_id = landing_order_stm.customer_id,
        curated_order.salutation = landing_order_stm.salutation,
        curated_order.first_name = landing_order_stm.first_name,
        curated_order.last_name = landing_order_stm.last_name,
        curated_order.store_id = landing_order_stm.store_id,
        curated_order.store_name = landing_order_stm.store_name,
        curated_order.order_quantity = landing_order_stm.order_quantity,
        curated_order.sale_price = landing_order_stm.sale_price,
        curated_order.disount_amt = landing_order_stm.disount_amt,
        curated_order.coupon_amt = landing_order_stm.coupon_amt,
        curated_order.net_paid = landing_order_stm.net_paid,
        curated_order.net_paid_tax = landing_order_stm.net_paid_tax,
        curated_order.net_profit = landing_order_stm.net_profit
    when not matched then insert (
        order_date,
        order_time,
        item_id,
        item_desc,
        customer_id,
        salutation,
        first_name,
        last_name,
        store_id,
        store_name,
        order_quantity,
        sale_price,
        disount_amt,
        coupon_amt,
        net_paid,
        net_paid_tax,
        net_profit)
    values (
        landing_order_stm.order_date,
        landing_order_stm.order_time,
        landing_order_stm.item_id,
        landing_order_stm.item_desc,
        landing_order_stm.customer_id,
        landing_order_stm.salutation,
        landing_order_stm.first_name,
        landing_order_stm.last_name,
        landing_order_stm.store_id,
        landing_order_stm.store_name,
        landing_order_stm.order_quantity,
        landing_order_stm.sale_price,
        landing_order_stm.disount_amt,
        landing_order_stm.coupon_amt,
        landing_order_stm.net_paid,
        landing_order_stm.net_paid_tax,
        landing_order_stm.net_profit);
```
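To trigger a single run of the order task on demand instead of waiting for the next scheduled minute, EXECUTE TASK starts one asynchronous run (the role needs the EXECUTE TASK privilege and ownership or OPERATE on the task). This is a minimal sketch; the checks afterwards assume the run has finished and the MERGE committed.

```sql
-- Trigger one asynchronous run of the order task outside its schedule
execute task order_curated_tsk;

-- Once the run has completed, a successful MERGE has consumed the stream,
-- so this should return FALSE until new rows land in landing_order
select system$stream_has_data('chk.landing_zone.landing_order_stm') as order_stream_has_data;

-- Row count of the curated order table after the merge
select count(*) as curated_order_rows
from chk.curated_zone.curated_order;
```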



```sql
-- Scheduled every 2 minutes; runs only when the landing customer stream has new rows.
-- Only the customer stream is checked: the order stream is consumed by order_curated_tsk.
create or replace task customer_curated_tsk
    warehouse = compute_wh
    schedule  = '2 minute'
when
    system$stream_has_data('chk.landing_zone.landing_customer_stm')
as
    -- Upsert customer rows, matching on customer_id
    merge into chk.curated_zone.curated_customer curated_customer
    using chk.landing_zone.landing_customer_stm landing_customer_stm on
        curated_customer.customer_id = landing_customer_stm.customer_id
    when matched then update set
        curated_customer.salutation = landing_customer_stm.salutation,
        curated_customer.first_name = landing_customer_stm.first_name,
        curated_customer.last_name = landing_customer_stm.last_name,
        curated_customer.birth_day = landing_customer_stm.birth_day,
        curated_customer.birth_month = landing_customer_stm.birth_month,
        curated_customer.birth_year = landing_customer_stm.birth_year,
        curated_customer.birth_country = landing_customer_stm.birth_country,
        curated_customer.email_address = landing_customer_stm.email_address
    when not matched then insert (
        customer_id,
        salutation,
        first_name,
        last_name,
        birth_day,
        birth_month,
        birth_year,
        birth_country,
        email_address)
    values (
        landing_customer_stm.customer_id,
        landing_customer_stm.salutation,
        landing_customer_stm.first_name,
        landing_customer_stm.last_name,
        landing_customer_stm.birth_day,
        landing_customer_stm.birth_month,
        landing_customer_stm.birth_year,
        landing_customer_stm.birth_country,
        landing_customer_stm.email_address);
```
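The '1 minute' and '2 minute' schedules run independently, so they do not make the customer task wait for the order task. If the customer merge should run only after the order merge, a Snowflake task graph can express that ordering: a child task names its predecessor with AFTER instead of having a SCHEDULE, and it is triggered when the predecessor's run completes successfully. The sketch below assumes that design is wanted; the trade-off is that the child runs only when order_curated_tsk actually runs, so customer changes wait until order data arrives.

```sql
-- The root task must be suspended before the task graph can be changed
alter task order_curated_tsk suspend;

-- Recreate the customer task as a child of the order task (a child has no SCHEDULE)
create or replace task customer_curated_tsk
    warehouse = compute_wh
    after order_curated_tsk
when
    system$stream_has_data('chk.landing_zone.landing_customer_stm')
as
    merge into chk.curated_zone.curated_customer curated_customer
    using chk.landing_zone.landing_customer_stm landing_customer_stm on
        curated_customer.customer_id = landing_customer_stm.customer_id
    when matched then update set
        curated_customer.salutation = landing_customer_stm.salutation,
        curated_customer.first_name = landing_customer_stm.first_name,
        curated_customer.last_name = landing_customer_stm.last_name,
        curated_customer.birth_day = landing_customer_stm.birth_day,
        curated_customer.birth_month = landing_customer_stm.birth_month,
        curated_customer.birth_year = landing_customer_stm.birth_year,
        curated_customer.birth_country = landing_customer_stm.birth_country,
        curated_customer.email_address = landing_customer_stm.email_address
    when not matched then insert (
        customer_id, salutation, first_name, last_name,
        birth_day, birth_month, birth_year, birth_country, email_address)
    values (
        landing_customer_stm.customer_id,
        landing_customer_stm.salutation,
        landing_customer_stm.first_name,
        landing_customer_stm.last_name,
        landing_customer_stm.birth_day,
        landing_customer_stm.birth_month,
        landing_customer_stm.birth_year,
        landing_customer_stm.birth_country,
        landing_customer_stm.email_address);

-- Resume the child first, then the root task that drives the graph
alter task customer_curated_tsk resume;
alter task order_curated_tsk resume;
```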



```sql
-- Scheduled every 3 minutes; runs only when the landing item stream has new rows
create or replace task item_curated_tsk
    warehouse = compute_wh
    schedule  = '3 minute'
when
    system$stream_has_data('chk.landing_zone.landing_item_stm')
as
    -- start_date is part of the match key, so a new start_date for an
    -- existing item is inserted as a new row rather than updating the old one
    merge into chk.curated_zone.curated_item item
    using chk.landing_zone.landing_item_stm landing_item_stm on
        item.item_id = landing_item_stm.item_id and
        item.item_desc = landing_item_stm.item_desc and
        item.start_date = landing_item_stm.start_date
    when matched then update set
        item.end_date = landing_item_stm.end_date,
        item.price = landing_item_stm.price,
        item.item_class = landing_item_stm.item_class,
        item.item_category = landing_item_stm.item_category
    when not matched then insert (
        item_id,
        item_desc,
        start_date,
        end_date,
        price,
        item_class,
        item_category)
    values (
        landing_item_stm.item_id,
        landing_item_stm.item_desc,
        landing_item_stm.start_date,
        landing_item_stm.end_date,
        landing_item_stm.price,
        landing_item_stm.item_class,
        landing_item_stm.item_category);
```
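A newly created task stays suspended until it is explicitly resumed. Listing the three tasks before the resume step shows this in the state column of SHOW TASKS, together with each task's schedule and WHEN condition. RESULT_SCAN is used here only to project a few columns from the SHOW output.

```sql
-- List the curated tasks; "state" is 'suspended' until ALTER TASK ... RESUME
show tasks like '%curated_tsk' in schema chk.curated_zone;

-- Keep only the interesting columns of the SHOW output above
select "name", "state", "schedule", "condition"
from table(result_scan(last_query_id()));
```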



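```sql
-- Tasks are created suspended; resume them to start their schedules
alter task order_curated_tsk resume;
alter task customer_curated_tsk resume;
alter task item_curated_tsk resume;

-- Run history of the three tasks (task names are stored in upper case)
select * from table(information_schema.task_history())
where name in ('CUSTOMER_CURATED_TSK', 'ITEM_CURATED_TSK', 'ORDER_CURATED_TSK')
order by scheduled_time;
```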

**Task Execution and Monitoring:**

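* Tasks are created in a suspended state, so resume each one to start its schedule.
* Monitor the status of each run with the INFORMATION_SCHEMA.TASK_HISTORY table function.
* On each run whose WHEN condition is met, a task executes its MERGE against the stream it reads from.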
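TASK_HISTORY also accepts a time window and a result limit, which keeps the output manageable once the tasks have been running for a while, and the ERROR_MESSAGE column shows why a run failed. Because each resumed task keeps running on its schedule (using warehouse credits whenever its WHEN condition is met), suspending the tasks after testing is a sensible last step. A minimal sketch:

```sql
-- Runs of the three curated tasks scheduled in the last hour, newest first
select name,
       state,          -- e.g. SCHEDULED, SUCCEEDED, FAILED, SKIPPED
       scheduled_time,
       completed_time,
       error_message   -- populated for failed runs
from table(information_schema.task_history(
        scheduled_time_range_start => dateadd('hour', -1, current_timestamp()),
        result_limit => 100))
where name in ('ORDER_CURATED_TSK', 'CUSTOMER_CURATED_TSK', 'ITEM_CURATED_TSK')
order by scheduled_time desc;

-- When testing is finished, suspend the tasks so no further runs are scheduled
alter task order_curated_tsk suspend;
alter task customer_curated_tsk suspend;
alter task item_curated_tsk suspend;
```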

**Stream and Task Overview:**

* Stream and task objects can be visualized in Snowflake's Data Pipelines.
* The landing zone lists the customer, item, and order pipes alongside the corresponding streams: landing_customer_stm, landing_item_stm, and landing_order_stm.
* The curated zone contains the tasks order_curated_tsk, customer_curated_tsk, and item_curated_tsk.
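The same stream objects can also be listed with SQL. SHOW STREAMS reports each stream's source table in table_name, its mode (APPEND_ONLY for the three landing streams), and whether it has become stale, that is, whether its offset has fallen outside the source table's retention period.

```sql
-- Streams in the landing zone
show streams in schema chk.landing_zone;

-- Keep only the source table, mode, and staleness columns
select "name", "table_name", "mode", "stale"
from table(result_scan(last_query_id()));
```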

**Conclusion of Part 5:**

* Part 5 creates streams that capture changes in the landing tables and tasks that process those changes into the curated zone.
* The append-only streams capture the delta: rows inserted since the stream was last consumed.
* The tasks merge that delta into the curated tables, keeping them up to date.
* The landing schema holds the raw tables and their streams, while the curated schema holds the tasks and the curated tables they maintain.
* Part 6 continues by moving to the consumption zone and setting up views for reporting.





