In [51]:
from datetime import timedelta
import pandas as pd
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.core import Root, CreateMode
from snowflake.core.database import Database
from snowflake.core.schema import Schema
from snowflake.core.stage import Stage
from snowflake.core.table import Table, TableColumn, PrimaryKey
from snowflake.core.task import StoredProcedureCall, Task
from snowflake.core.task.dagv1 import DAGOperation, DAG, DAGTask
from snowflake.core.warehouse import Warehouse
from snowflake.core._common import CreateMode
from snowflake.core.exceptions import NotFoundError

In [52]:
connection_parms = {
    'connection_name' : 'default'
}

In [53]:
session = Session.builder.configs(connection_parms).create()

In [54]:
root = Root(session)

In [12]:
df = root.databases['ZOMATO_ETL'].schemas['DBO'].tables['CITY'].fetch()

In [5]:
df = session.table('ZOMATO_ETL.DBO.CITY').to_pandas()

In [None]:
df.head()

In [56]:
with DAG('Dag',schedule = timedelta(minutes = 1)) as dag:
    dag_task_1 = DAGTask(
        'usp_insStage_Task',
        'CALL ZOMATO_ETL.DBO.sp_insstagetable()'
        )
    dag_task_2 = DAGTask(
        'usp_InsStgtoMain_Task',
        'CALL ZOMATO_ETL.DBO.sp_ins_stagetomain()'
    )
    dag_task_3 = DAGTask(
        'usp_LkpValidateStageData_Task',
        'CALL ZOMATO_ETL.DBO.sp_restaurant_lkp_data()'
    )
    dag_task_4 = DAGTask(
        'insMainTable_Task',
        '''INSERT INTO
ZOMATO_ETL.DBO.Restaurant_Data_Mapping_Table(RESTAURANT_ID,RESTAURANT_AVERAGE_COST_FOR_TWO,RESTAURANT_CUISINE_ID,RESTAURANT_CURRENCY_ID,RESTAURANT_LOCATION_ID,HAS_ONLINE_DELIVERY,HAS_TABLE_BOOKING,RESTAURANT_RATING_ID,RESTAURANT_EVENT_ID,IS_ACTIVE,CREATED_BY,CREATED_DATE)
SELECT t.RESTAURANT_ID,t.RESTAURANT_AVERAGE_COST_FOR_TWO,t.RESTAURANT_CUISINE_ID,t.RESTAURANT_CURRENCY_ID,t.RESTAURANT_LOCATION_ID,t.HAS_ONLINE_DELIVERY,t.HAS_TABLE_BOOKING,t.RESTAURANT_RATING_ID,t.RESTAURANT_EVENT_ID,t.IS_ACTIVE,t.CREATED_BY,t.CREATED_DATE FROM TABLE(sp_getlkptrsfmddata()) t
LEFT JOIN ZOMATO_ETL.DBO.Restaurant_Data_Mapping_Table r
ON t.RESTAURANT_ID = r.restaurant_id
AND COALESCE(t.RESTAURANT_CUISINE_ID,0) = COALESCE(R.RESTAURANT_CUISINE_ID,0)
AND T.RESTAURANT_CURRENCY_ID = R.RESTAURANT_CURRENCY_ID
AND T.RESTAURANT_LOCATION_ID = R.RESTAURANT_LOCATION_ID
AND t.RESTAURANT_RATING_ID = R.RESTAURANT_RATING_ID
AND COALESCE(T.RESTAURANT_EVENT_ID,0) = COALESCE(R.RESTAURANT_EVENT_ID,0)
AND T.IS_ACTIVE = TRUE
WHERE R.RESTAURANT_ID IS NULL'''
    )
    dag_task_5 = DAGTask(
        'Execute_Task_Rating_Data_Track',
        '''INSERT INTO ZOMATO_ETL.DBO.USER_RATING_HISTORY(RESTAURANT_ID,AGGREGATE_RATING,RATING_TEXT,VOTES,IS_ACTIVE,START_DATE,END_DATE,CREATED_BY,CREATED_DATE)
    select RESTAURANT_ID,AGGREGATE_RATING,RATING_TEXT,VOTES,IS_ACTIVE,START_DATE,END_DATE,CURRENT_USER(),CURRENT_TIMESTAMP() 
    from RATING_HISTORY RH'''
    )
    dag_task_6 = DAGTask(
        'Execute_Task_Event_Update_Track',
        'CALL ZOMATO_ETL.DBO.Event_Stream_Insert()'
    )
    
dag_task_1>>dag_task_2>>dag_task_3>>dag_task_4>>dag_task_5>>dag_task_6
schema = root.databases['ZOMATO_ETL'].schemas['DBO']
dag_op = DAGOperation(schema)
try:
    dag_op.drop('dag')
except NotFoundError:
    pass
dag_op.deploy(dag)