# Snowflake Trail

In [None]:
-- Make BUILD_DEMOS the current database for this session.
USE DATABASE BUILD_DEMOS;

## Traces and Logs

In [None]:
-- Table holding the raw records read by the demo UDF and stored procedures.
-- record_id is also the value passed to the UDF for each row.
CREATE OR REPLACE TABLE Raw_Records (
    record_id   INT PRIMARY KEY,
    record_text VARCHAR(50000)
);

-- Seed the table with 25 records whose ids cover 176..200 exactly once each.
-- Snowflake does not enforce PRIMARY KEY, so drawing the ids with
-- uniform(176, 200, random()) would silently store duplicate keys.
-- ROW_NUMBER() over the generator rows yields a unique, gap-free sequence.
INSERT INTO Raw_Records (record_id, record_text)
SELECT
    175 + ROW_NUMBER() OVER (ORDER BY seq4()) AS record_id,
    'This is a Record'                        AS record_text
FROM TABLE(generator(rowCount => 25));

-- Show what was loaded.
SELECT * FROM Raw_Records;


In [None]:
-- UDF: simulates parsing a single record and reports progress as span events.
CREATE OR REPLACE FUNCTION BUILD_DEMOS.PUBLIC.UDF_PARSE_REVIEWS("INPUT" NUMBER(38,0))
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','snowflake-telemetry-python')
HANDLER = 'compute'
AS '
import logging
from snowflake import telemetry
import time

def compute(input):
    """Simulate parsing the record identified by input.

    Sleeps for input/1000 seconds, evaluates 1000/input, and records a span
    event before and after that work. If anything in that sequence raises,
    the error is logged together with its traceback and a NOT parsed event
    is recorded instead of failing the calling query.

    Always returns the string Done.
    """
    try:
        telemetry.set_span_attribute("executing", "udf_parse_reviews")
        telemetry.add_event(f"parsing record_id: (\\"{input}\\")")

        # Simulated work: the wait grows with the record id.
        time.sleep(int(input) / 1000)

        # The quotient itself is not used; the division fails when the id
        # is 0 and that failure is handled below.
        num = 1000 / int(input)
        telemetry.add_event(f"parsed record_id: (\\"{input}\\")")
    except Exception:
        # Catch Exception rather than everything so interpreter shutdown
        # signals still propagate; logging.exception keeps the traceback.
        logging.exception(f"Error occurred for record_id:(\\"{input}\\")")
        telemetry.add_event(f"NOT parsed record_id: (\\"{input}\\")")

    return "Done"
';


In [None]:
-- Main Stored Procedure (first version): runs the parse UDF over every raw
-- record and round-trips a tiny DataFrame through Snowpark.
-- pandas is declared explicitly because the handler imports it and calls
-- to_pandas().
CREATE OR REPLACE PROCEDURE BUILD_DEMOS.PUBLIC.SP_INGEST_RAW_USER_REVIEWS()
RETURNS VARCHAR(16777216)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','snowflake-telemetry-python','opentelemetry-api','joblib','numpy','pandas')
HANDLER = 'compute'
EXECUTE AS OWNER
AS '
from opentelemetry import trace
from snowflake import telemetry
from snowflake.snowpark.functions import call_udf, col
import logging
import time
import pandas as pd

# Recorded when the handler module is loaded, before compute() is invoked.
telemetry.add_event(''initializing stored procedure'')

def compute(session):
  """Handler entry point.

  Applies udf_parse_reviews to the record_id of every row in
  build_demos.public.raw_records, converts a three-row Snowpark DataFrame
  to pandas, and records span events at the start and end.

  Returns the string OK.
  """
  telemetry.add_event(''stored procedure starting'')
  # logging.warn is a deprecated alias; logging.warning is the supported call.
  logging.warning("Inside Analyze_JSON()")

  telemetry.set_span_attribute("executing", "SP_Ingest_Raw_User_Reviews")

  # Evaluate the UDF once per raw record; collect() forces execution and the
  # returned rows are intentionally discarded.
  raw_records = session.table(''build_demos.public.raw_records'')
  raw_records.select(call_udf(''udf_parse_reviews'', col(''record_id''))).collect()

  python_df = session.create_dataframe(["a", "b", "c"])

  pandas_df = python_df.to_pandas()

  telemetry.add_event(''stored procedure completed'')

  return "OK"

';


In [None]:
-- Main Stored Procedure (extended version): same name as the procedure
-- created in the previous cell, so running this cell replaces it. It builds a
-- larger DataFrame and calls the child procedure inside a custom span.
-- numpy and pandas are declared explicitly because the handler imports both.
CREATE OR REPLACE PROCEDURE BUILD_DEMOS.PUBLIC.SP_INGEST_RAW_USER_REVIEWS()
RETURNS VARCHAR(16777216)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','snowflake-telemetry-python','opentelemetry-api','joblib','numpy','pandas')
HANDLER = 'compute'
EXECUTE AS OWNER
AS '
from opentelemetry import trace
from snowflake import telemetry
from snowflake.snowpark.functions import call_udf, col
import logging
import time
import pandas as pd
import numpy as np

# Recorded when the handler module is loaded, before compute() is invoked.
telemetry.add_event(''initializing stored procedure'')

def compute(session):
  """Handler entry point.

  Applies udf_parse_reviews to the record_id of every row in
  build_demos.public.raw_records, round-trips a 100000 x 10 random
  DataFrame through Snowpark, then calls
  build_demos.public.SP_Processing_Work inside a span named CUSTOM_SPAN.

  Returns the string OK.
  """
  telemetry.add_event(''stored procedure starting'')
  # logging.warn is a deprecated alias; logging.warning is the supported call.
  logging.warning("Inside Analyze_JSON()")

  telemetry.set_span_attribute("executing", "SP_Ingest_Raw_User_Reviews")

  # Evaluate the UDF once per raw record; collect() forces execution and the
  # returned rows are intentionally discarded.
  raw_records = session.table(''build_demos.public.raw_records'')
  raw_records.select(call_udf(''udf_parse_reviews'', col(''record_id''))).collect()

  # Create a DataFrame with 100000 rows and 10 columns
  data = np.random.rand(100_000, 10)
  columns = [f"mycolumn_{i}" for i in range(10)]
  pandas_df = pd.DataFrame(data, columns=columns)

  # Push the frame into Snowpark and pull it back as pandas.
  python_df = session.create_dataframe(pandas_df)
  pandas_df = python_df.to_pandas()

  # Run the child procedure under its own span; its return value is not used.
  tracer = trace.get_tracer(__name__)
  with tracer.start_as_current_span("CUSTOM_SPAN"):
    session.call(''build_demos.public.SP_Processing_Work'')

  telemetry.add_event(''stored procedure completed'')

  return "OK"

';

In [None]:
-- second (child) Stored Procedure: builds and concatenates large pandas
-- DataFrames inside two custom spans.
-- pandas is declared explicitly because the handler imports and uses it.
CREATE OR REPLACE PROCEDURE build_demos.PUBLIC.SP_PROCESSING_WORK()
RETURNS VARCHAR(16777216)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','snowflake-telemetry-python','opentelemetry-api','joblib','numpy','pandas')
HANDLER = 'compute'
EXECUTE AS OWNER
AS '
from opentelemetry import trace
from snowflake import telemetry
import logging
import time
import numpy as np
import pandas as pd
from snowflake.snowpark.functions import call_udf, col


def compute(session):
  """Handler entry point.

  Builds a 200000-row DataFrame of repeated values, then, inside two
  separate custom spans, creates a 1000000 x 600 DataFrame of random
  values and concatenates the two. Each span gets an executing attribute
  and a completion event. The resulting frames are not returned or stored.

  Returns the string OK.
  """
  input_data = [[1, 2, 3, 4, 5]] * 200_000
  df = pd.DataFrame(input_data)

  tracer = trace.get_tracer(__name__)

  with tracer.start_as_current_span("custom_span_create_pd_df"):
    # Create a large DataFrame with random data
    large_df = pd.DataFrame(np.random.rand(1_000_000, 600))
    telemetry.set_span_attribute("executing", "SP_Processing_Work custom span - custom_span_create_pd_df")
    telemetry.add_event("custom span custom_span_create_pd_df completed")

  with tracer.start_as_current_span("custom_span_concat_pd_df"):
    # Concatenate the input DataFrame with the large DataFrame
    concat_df = pd.concat([df, large_df], ignore_index=True)
    telemetry.set_span_attribute("executing", "SP_Processing_Work custom span - custom_span_concat_pd_df")
    telemetry.add_event("custom span custom_span_concat_pd_df completed")

  return "OK"
';


In [None]:
-- Enable log, metric and trace capture for the objects created above.
-- The database and schema are named directly because no earlier cell defines
-- the database_name / schema_name template variables, and every other cell
-- already targets BUILD_DEMOS.PUBLIC.
ALTER DATABASE BUILD_DEMOS SET LOG_LEVEL = DEBUG;
ALTER SCHEMA BUILD_DEMOS.PUBLIC SET LOG_LEVEL = DEBUG;

ALTER DATABASE BUILD_DEMOS SET METRIC_LEVEL = ALL;
ALTER SCHEMA BUILD_DEMOS.PUBLIC SET METRIC_LEVEL = ALL;

-- The handlers emit span events and span attributes; those are only
-- recorded when tracing is enabled.
ALTER DATABASE BUILD_DEMOS SET TRACE_LEVEL = ALWAYS;
ALTER SCHEMA BUILD_DEMOS.PUBLIC SET TRACE_LEVEL = ALWAYS;

ALTER SESSION SET METRIC_LEVEL = ALL;

In [None]:
-- Run the main procedure by its fully qualified name, matching how it was
-- created, so the call does not depend on the current schema of the session.
CALL BUILD_DEMOS.PUBLIC.SP_INGEST_RAW_USER_REVIEWS();

# Serverless Alerts and Notifications