#### Session Connection

In [1]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col


connection_parameters = {
    "account": "<your snowflake account identifier>",
    "user": "<your snowflake username>",
    "password": "<your snowflake password>",
    "role": "<your snowflake role>",
    "warehouse": "<your snowflake warehouse>",  # optional
    "database": "<your snowflake database>",  # optional
    "schema": "<your snowflake schema>" # optional
  }

session = Session.builder.configs(connection_parameters).create()

#### Loading Required Tables

In [2]:
purchase_history = session.table("PURCHASE_HISTORY")
campaign_info = session.table("CAMPAIGN_INFO")
complain_info = session.table("COMPLAINT_INFO")
marketing_additional = session.table("MARKETING_ADDITIONAL")

## Data Engineering Pipelines

#### Step 1 -  Joining Purchase History table with Campaign Info

In [3]:
def combine_campaign_table(purchase_history,campaign_info):
    purchase_campaign = purchase_history.join(campaign_info, purchase_history.ID == campaign_info.ID\
                        ,lsuffix="_left", rsuffix="_right")
    purchase_campaign = purchase_campaign.drop("ID_RIGHT")
    return purchase_campaign

 #### Step 2 -  Joining Purchase Campaign With Complain Info Table

In [4]:
def combine_complain_table(purchase_campaign,complain_info):
    purchase_campaign_complain = purchase_campaign.join(complain_info, purchase_campaign["ID_LEFT"] == complain_info.ID)
    purchase_campaign_complain = purchase_campaign_complain.drop("ID_LEFT")
    return purchase_campaign_complain

#### Step 3 -  Union Additional Marketing Table with Purchase Campaign Complain Data

In [5]:
def union_marketing_additional_table(purchase_campaign_complain,marketing_additional):
    final_marketing_table = purchase_campaign_complain.union_by_name(marketing_additional)
    return final_marketing_table


#### Store Procedure - Data Preparation

In [6]:
from snowflake.snowpark.functions import sproc
import snowflake

def data_prep(session: Session):


    #### Loading Required Tables
    purchase_history = session.table("PURCHASE_HISTORY")
    campaign_info = session.table("CAMPAIGN_INFO")
    complain_info = session.table("COMPLAINT_INFO")
    marketing_additional = session.table("MARKETING_ADDITIONAL")

    #### Calling Step 1
    purchase_campaign = combine_campaign_table(purchase_history,campaign_info)

    #### Calling Step 2
    purchase_campaign_complain =  combine_complain_table(purchase_campaign,complain_info)

    #### Calling Step 3
    final_marketing_data = union_marketing_additional_table(purchase_campaign_complain,marketing_additional)

    #### Writing Combined Data To New Table
    final_marketing_data.write.save_as_table("FINAL_MARKETING_DATA")
    return "LOADED FINAL MARKETING DATA TABLE"



# Create an instance of StoredProcedure using the sproc() function
from snowflake.snowpark.types import IntegerType,StringType
data_prep_sproc = sproc(
                        func= data_prep,\
                        replace=True,\
                        return_type = StringType(),\
                        stage_location="@my_stage",\
                        packages=["snowflake-snowpark-python"]
                        )


#### Stored Procedure -  Data Transformation

In [28]:

def data_transform(session: Session):

    #### Loading Required Tables
    marketing_final = session.table("FINAL_MARKETING_DATA")
    market_subset = marketing_final.select("EDUCATION","MARITAL_STATUS","INCOME")
    market_pivot = market_subset.pivot("EDUCATION",["Graduation","PhD","Master","Basic","2n Cycle"]).sum("INCOME")


    #### Writing Transformed Data To New Table
    market_pivot.write.save_as_table("MARKETING_PIVOT")
    return "CREATED MARKETING PIVOT TABLE"


data_transform_sproc = sproc(
                        func= data_transform,\
                        replace=True,\
                        return_type = StringType(),\
                        stage_location="@my_stage",\
                        packages=["snowflake-snowpark-python"]
                        )
    

#### Stored Procedure -  Data Cleanup

In [29]:

def data_cleanup(session: Session):

    #### Loading Required Tables
    market_pivot = session.table("MARKETING_PIVOT")
    
    market_drop_null = market_pivot.dropna(thresh=5)


    #### Writing Cleaned Data To New Table
    market_drop_null.write.save_as_table("MARKET_PIVOT_CLEANED")
    return "CREATED CLEANED TABLE"


data_cleanup_sproc = sproc(
                        func= data_cleanup,\
                        replace=True,\
                        return_type = StringType(),\
                        stage_location="@my_stage",\
                        packages=["snowflake-snowpark-python"]
                        )
    

In [31]:
# session.sql("drop table FINAL_MARKETING_DATA;").show()
# session.sql("drop table MARKETING_PIVOT;").show()
# session.sql("drop table MARKET_PIVOT_CLEANED;").show()

#### Calling Stored Procedure

In [38]:

#### Calling Data Preparation Stored Procedure
data_prep_sproc()

#### Calling Data Transformation Stored Procedure
data_transform_sproc()

#### Calling Data Cleanup Stored Procedure
data_cleanup_sproc()


'CREATED MARKETING PIVOT TABLE'

## Logging & Trace In Snowpark

#### Creating and Setting Up Event Table

In [None]:
session.sql('''CREATE EVENT TABLE MY_EVENTS;''').show()
session.sql('''ALTER ACCOUNT SET EVENT_TABLE = SNOWPARK_DEFINITIVE_GUIDE.MY_SCHEMA.MY_EVENTS;''').show()
session.sql('''alter session set log_level = INFO;''').show()

#### Data Prep Stored Procedure with Info Logs

In [33]:
from snowflake.snowpark.functions import sproc
import logging

def data_prep(session: Session):

    ## Initializing Logger
    logger = logging.getLogger("My_Logger")
    logger.info("Data Preparation Pipeline Starts")
    
    #### Loading Required Tables
    logger.info("Loading Required Tables")

    purchase_history = session.table("PURCHASE_HISTORY")
    campaign_info = session.table("CAMPAIGN_INFO")
    complain_info = session.table("COMPLAINT_INFO")
    marketing_additional = session.table("MARKETING_ADDITIONAL")

    #### Calling Step 1
    purchase_campaign = combine_campaign_table(purchase_history,campaign_info)

    logger.info("Joined Purchase and Campaign Tables")

    #### Calling Step 2
    purchase_campaign_complain =  combine_complain_table(purchase_campaign,complain_info)

    logger.info("Joined Complain Table")

    #### Calling Step 3
    final_marketing_data = union_marketing_additional_table(purchase_campaign_complain,marketing_additional)

    logger.info("Final Marketing Data Created")

    #### Writing Combined Data To New Table
    final_marketing_data.write.save_as_table("FINAL_MARKETING_DATA")

    logger.info("Final Marketing Data Table Created")    
    return "LOADED FINAL MARKETING DATA TABLE"


##################################################################
## Register Stored Procedure in Snowflake

### Add packages and data types
from snowflake.snowpark.types import StringType
session.add_packages('snowflake-snowpark-python')

### Upload Stored Produre to Snowflake
session.sproc.register(
    func = data_prep
  , return_type = StringType()
  , input_types = []
  , is_permanent = True
  , name = 'DATA_PREP_SPROC_LOG'
  , replace = True
  , stage_location = '@MY_STAGE'
)


<snowflake.snowpark.stored_procedure.StoredProcedure at 0x2088c571940>

#### Excecuting Stored Procedure - Logging

In [37]:

session.sql(''' Call DATA_PREP_SPROC_LOG()''').show()

-------------------------------------
|"DATA_PREP_SPROC_LOG"              |
-------------------------------------
|LOADED FINAL MARKETING DATA TABLE  |
-------------------------------------



##### Retriving the Logs

In [44]:
session.sql("""SELECT RECORD['severity_text'] AS SEVERITY,VALUE AS MESSAGE
        FROM MY_EVENTS
        WHERE SCOPE['name'] = 'My_Logger'
        AND RECORD_TYPE = 'LOG'""").show()

------------------------------------------------------
|"SEVERITY"  |"MESSAGE"                              |
------------------------------------------------------
|"INFO"      |"Data Preparation Pipeline Starts"     |
|"INFO"      |"Loading Required Tables"              |
|"INFO"      |"Joined Purchase and Campaign Tables"  |
|"INFO"      |"Joined Complain Table"                |
|"INFO"      |"Final Marketing Data Created"         |
|"INFO"      |"Final Marketing Data Table Created"   |
------------------------------------------------------



#### Handling Exceptions

In [49]:
def data_transform(session: Session):
    try:

        ## Initializing Logger
        logger = logging.getLogger("Data_Transform_Logger")
        logger.info("Data Transformation Pipeline Starts")

        ## Pivoting Process 
        marketing_final = session.table("FINAL_MARKETING_DATA")
        market_subset = marketing_final.select("EDUCATION","MARITAL_STATUS","INCOME")
        market_pivot = market_subset.pivot("EDUCATION",["Graduation","PhD","Master","Basic","2n Cycle"]).sum("INCOME")

        #### Writing Transformed Data To New Table
        market_pivot.write.save_as_table("MAREKTING_PIVOT")
        logger.log("MARKETING PIVOT TABLE CREATED")
        return "CREATED MARKETING PIVOT TABLE"

    except Exception as err:
        logger.error("Logging an error from Python handler: ")
        logger.error(err)
        return "ERROR"


##################################################################
## Register Stored Procedure in Snowflake

### Add packages and data types
from snowflake.snowpark.types import StringType
session.add_packages('snowflake-snowpark-python')

### Upload Stored Produre to Snowflake
session.sproc.register(
    func = data_transform
  , return_type = StringType()
  , input_types = []
  , is_permanent = True
  , name = 'DATA_TRANSFORM_SPROC_LOG_ERROR'
  , replace = True
  , stage_location = '@MY_STAGE'
)

<snowflake.snowpark.stored_procedure.StoredProcedure at 0x192d394d190>

In [50]:
session.sql(''' Call DATA_TRANSFORM_SPROC_LOG_ERROR()''').show()

------------------------------------
|"DATA_TRANSFORM_SPROC_LOG_ERROR"  |
------------------------------------
|ERROR                             |
------------------------------------



In [60]:
session.sql("""SELECT RECORD['severity_text'] AS SEVERITY,VALUE AS MESSAGE
        FROM MY_EVENTS
        WHERE SCOPE['name'] = 'Data_Transform_Logger'
        AND RECORD_TYPE = 'LOG'""").collect()

[Row(SEVERITY='"INFO"', MESSAGE='"Data Preparation Pipeline Starts"'),
 Row(SEVERITY='"ERROR"', MESSAGE='"Logging an error from Python handler: "'),
 Row(SEVERITY='"ERROR"', MESSAGE='"(1304): 01aea100-0001-0458-0001-9ba600015746: 002002 (42710): 01aea100-0001-0458-0001-9ba600015746: SQL compilation error:\\nObject \'MAREKTING_PIVOT\' already exists."'),
 Row(SEVERITY='"INFO"', MESSAGE='"Data Transformation Pipeline Starts"'),
 Row(SEVERITY='"ERROR"', MESSAGE='"Logging an error from Python handler: "'),
 Row(SEVERITY='"ERROR"', MESSAGE='"(1304): 01aea101-0001-0458-0001-9ba60001576a: 002002 (42710): 01aea101-0001-0458-0001-9ba60001576a: SQL compilation error:\\nObject \'MAREKTING_PIVOT\' already exists."')]

#### Setting Up Event Traces

In [34]:
#session.sql("alter session set trace_level = ON_EVENT;").show()
session.sql("ALTER SESSION SET TRACE_LEVEL = ALWAYS;").show()

------------------------------------
|"status"                          |
------------------------------------
|Statement executed successfully.  |
------------------------------------



In [40]:

def data_cleanup(session: Session):

    #### Loading Telemetry Package
    from snowflake import telemetry

    #### Loading Required Tables
    market_pivot = session.table("MARKETING_PIVOT")
    
    #### Adding Trace Event
    telemetry.add_event("data_cleanup", {"table_name": "MARKETING_PIVOT", "count": market_pivot.count()})

    #### Dropping Null
    market_drop_null = market_pivot.dropna(thresh=5)

    #### Writing Cleaned Data To New Table
    market_drop_null.write.save_as_table("MARKET_PIVOT_CLEANED")

    #### Adding Trace Event
    telemetry.add_event("data_cleanup", {"table_name": "MARKET_PIVOT_CLEANED", "count": market_drop_null.count()})

    return "CREATED CLEANED TABLE"


##################################################################
## Register Stored Procedure in Snowflake

### Add packages and data types
from snowflake.snowpark.types import StringType
session.add_packages('snowflake-snowpark-python', 'snowflake-telemetry-python')

### Upload Stored Produre to Snowflake
session.sproc.register(
    func = data_cleanup
  , return_type = StringType()
  , input_types = []
  , is_permanent = True
  , name = 'DATA_CLEANUP_SPROC_TRACE'
  , replace = True
  , stage_location = '@MY_STAGE'
)  

The version of package snowflake-telemetry-python in the local environment is 0.2.0, which does not fit the criteria for the requirement snowflake-telemetry-python. Your UDF might not work when the package version is different between the server and your local environment


<snowflake.snowpark.stored_procedure.StoredProcedure at 0x2088c66d340>

In [41]:
session.sql(''' Call DATA_CLEANUP_SPROC_TRACE()''').show()

------------------------------
|"DATA_CLEANUP_SPROC_TRACE"  |
------------------------------
|CREATED CLEANED TABLE       |
------------------------------



In [57]:
session.sql(""" SELECT
  TIMESTAMP as time,
  RESOURCE_ATTRIBUTES['snow.executable.name'] as handler_name,
  RESOURCE_ATTRIBUTES['snow.executable.type'] as handler_type,
  RECORD['name'] as event_name,
  RECORD_ATTRIBUTES as attributes
FROM
  MY_EVENTS
WHERE
  EVENT_NAME  ='data_cleanup'
""").show(2)

----------------------------------------------------------------------------------------------------------------------------------------------------------
|"TIME"                      |"HANDLER_NAME"                                  |"HANDLER_TYPE"  |"EVENT_NAME"    |"ATTRIBUTES"                            |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|2023-08-29 16:12:14.767718  |"DATA_CLEANUP_SPROC_TRACE():VARCHAR(16777216)"  |"PROCEDURE"     |"data_cleanup"  |{                                       |
|                            |                                                |                |                |  "count": 8,                           |
|                            |                                                |                |                |  "table_name": "MARKETING_PIVOT"       |
|                            |                                        

In [36]:
session.sql("drop table MARKET_PIVOT_CLEANED;").show()

-----------------------------------------
|"status"                               |
-----------------------------------------
|MARKETING_PIVOT successfully dropped.  |
-----------------------------------------

----------------------------------------------
|"status"                                    |
----------------------------------------------
|MARKET_PIVOT_CLEANED successfully dropped.  |
----------------------------------------------

