In [None]:
# Python packages for the notebook's Python cells.
# NOTE(review): neither `st` nor `pd` is referenced in the cells visible here —
# confirm they are needed elsewhere before keeping them.
import streamlit as st
import pandas as pd

# Snowpark: attach to the session the notebook is already running under.
# Later cells read tables through this object (e.g. `session.table("week29")`).
from snowflake.snowpark.context import get_active_session
session = get_active_session()

In [None]:
-- CSV layout of the challenge files: one header row, fields optionally wrapped in double quotes.
CREATE OR REPLACE FILE FORMAT frosty_csv
    TYPE = CSV
    SKIP_HEADER = 1
    FIELD_OPTIONALLY_ENCLOSED_BY = '"';

-- External stage over the S3 location that holds the challenge 29 files.
CREATE OR REPLACE STAGE w29_stage
    URL = 's3://frostyfridaychallenges/challenge_29/'
    FILE_FORMAT = frosty_csv;

-- Show which files the stage exposes.
LIST @w29_stage;

-- Load only the files whose name contains "start_dates" into a typed table.
CREATE OR REPLACE TABLE week29 AS
SELECT
    t.$1::INT          AS id,
    t.$2::VARCHAR(100) AS first_name,
    t.$3::VARCHAR(100) AS surname,
    t.$4::VARCHAR(250) AS email,
    t.$5::DATETIME     AS start_date
FROM @w29_stage (PATTERN => '.*start_dates.*') t;

-- Stage used as stage_location by the permanent UDFs registered in the Python cells.
CREATE OR REPLACE STAGE func_stg;

## 解法１

In [None]:
import datetime

from snowflake.snowpark import functions as F

@F.udf(name="fiscal_year", replace=True, is_permanent=True, stage_location="func_stg")
def fiscal_year(date: datetime.date) -> int:
    """Return the fiscal year that ``date`` falls in.

    The fiscal year starts on 1 May and is labelled by the calendar year in
    which it ends: May-December map to ``date.year + 1``, January-April to
    ``date.year``.

    Registered in Snowflake as the permanent scalar UDF ``fiscal_year``
    (code stored on stage ``func_stg``) and invoked once per row.

    Args:
        date: Value of the argument column for one row; ``None`` when the SQL
            value is NULL.

    Returns:
        The fiscal year, or ``None`` (SQL NULL) when ``date`` is ``None``.
    """
    # The UDF is registered non-strict, so Snowflake still calls it for NULL
    # input; map NULL to NULL instead of failing the query on ``None.month``.
    if date is None:
        return None
    # May (5) through December roll into the following fiscal year.
    return date.year + 1 if date.month >= 5 else date.year

In [None]:
# Tag each WEEK29 row with the fiscal year computed by the scalar UDF
# "fiscal_year", then preview the result.
data = session.table("week29").select(
    "id",
    "first_name",
    "surname",
    "email",
    "start_date",
    # The UDF argument must stay a Column: a bare string would be sent as a literal.
    F.call_udf("fiscal_year", F.col("start_date")).alias("fiscal_year"),
)

data.show()

In [None]:
# Number of rows per fiscal year, ordered by fiscal year so the output is
# stable across runs and directly comparable between the solutions below.
data.group_by("fiscal_year").agg(F.count("*").alias("count")).sort("fiscal_year").show()

## Snowflake Trail（Trace）によるパフォーマンスチェック

In [None]:
-- Turn on trace capture for this session so the spans/events emitted through
-- snowflake.telemetry by the *_trace UDFs below are recorded.
-- NOTE(review): captured data lands in the account's active event table —
-- assumes one has been configured.
ALTER SESSION SET TRACE_LEVEL = ALWAYS;

### 解法１にTraceを追加

In [None]:
import datetime

from snowflake.snowpark import functions as F

@F.udf(name="fiscal_year_trace", replace=True, is_permanent=True, stage_location="func_stg", packages=["snowflake-telemetry-python"])
def fiscal_year(date: datetime.date) -> int:
    """Return the fiscal year of ``date`` while emitting trace telemetry.

    Same rule as the plain ``fiscal_year`` UDF: the fiscal year starts on
    1 May and is labelled by the calendar year in which it ends, so
    May-December map to ``date.year + 1`` and January-April to ``date.year``.
    Registered in Snowflake as the permanent scalar UDF ``fiscal_year_trace``.

    NOTE(review): this rebinds the notebook-level Python name ``fiscal_year``
    already bound by the earlier solution-1 cell; SQL callers are unaffected
    because they use the registered name ``fiscal_year_trace``.

    Args:
        date: Value of the argument column for one row; ``None`` for SQL NULL.

    Returns:
        The fiscal year, or ``None`` (SQL NULL) when ``date`` is ``None``.
    """
    # Imported inside the handler, presumably because the package is only
    # guaranteed in the UDF runtime (supplied via ``packages=``).
    from snowflake import telemetry

    # NOTE(review): both set_span_attribute calls use the same key, so the
    # later "finish" value presumably overwrites "begin" on the span.
    telemetry.set_span_attribute("func.fiscal_year", "begin")
    telemetry.add_event("func.fiscal_year event")

    # The UDF is non-strict and is also called for NULL input; map it to NULL
    # instead of raising AttributeError on ``None.month``.
    if date is None:
        result = None
    elif date.month >= 5:
        result = date.year + 1
    else:
        result = date.year

    telemetry.set_span_attribute("func.fiscal_year", "finish")

    return result

In [None]:
# Tag each WEEK29 row with the fiscal year computed by the traced scalar UDF
# "fiscal_year_trace", then preview the result.
data = session.table("week29").select(
    "id",
    "first_name",
    "surname",
    "email",
    "start_date",
    # The UDF argument must stay a Column: a bare string would be sent as a literal.
    F.call_udf("fiscal_year_trace", F.col("start_date")).alias("fiscal_year"),
)

data.show()

In [None]:
# Number of rows per fiscal year, ordered by fiscal year so the output is
# stable across runs and directly comparable with solution 1.
data.group_by("fiscal_year").agg(F.count("*").alias("count")).sort("fiscal_year").show()

### 解法２：Vectorized UDF + Trace
スカラーUDFの代わりに、複数行をまとめて pandas Series として受け取る Vectorized UDF を使用してみる。

In [None]:
import datetime

import numpy as np
from snowflake.snowpark.types import PandasSeries
from snowflake.snowpark import functions as F

@F.udf(name="fiscal_year_batch_trace", replace=True, is_permanent=True, stage_location="func_stg", packages=["snowflake-telemetry-python"])
def fiscal_year_batch(date: PandasSeries[datetime.datetime]) -> PandasSeries[int]:
    """Vectorized fiscal-year UDF: map a batch of timestamps to fiscal years.

    The ``PandasSeries`` type hints make Snowpark register this as a
    vectorized UDF (``fiscal_year_batch_trace``) that receives a whole batch
    of rows as one pandas Series. The rule matches the scalar UDFs: the fiscal
    year starts on 1 May, so May-December map to ``year + 1`` and
    January-April to ``year``.

    Args:
        date: Batch of ``start_date`` values; NULL rows presumably arrive as
            ``NaT`` — confirm against Snowflake's vectorized UDF conversion.

    Returns:
        A Series aligned with ``date`` (same length and index) holding the
        fiscal years as nullable ``Int64``; ``NaT`` inputs yield ``<NA>``.
    """
    # Imported inside the handler, presumably because the package is only
    # guaranteed in the UDF runtime (supplied via ``packages=``).
    from snowflake import telemetry

    # NOTE(review): both set_span_attribute calls use the same key, so the
    # later "finish" value presumably overwrites "begin" on the span.
    telemetry.set_span_attribute("func.fiscal_year_batch", "begin")
    telemetry.add_event("func.fiscal_year_batch event")

    years = date.dt.year
    # Shift May-December into the next fiscal year. ``mask`` keeps the result a
    # Series aligned with the input, matching the PandasSeries[int] return hint.
    # The nullable Int64 cast keeps NaT rows as <NA> rather than float NaN.
    result = years.mask(date.dt.month >= 5, years + 1).astype("Int64")

    telemetry.set_span_attribute("func.fiscal_year_batch", "finish")

    return result

In [None]:
# Tag each WEEK29 row with the fiscal year computed by the vectorized UDF
# "fiscal_year_batch_trace", then preview the result.
data = session.table("week29").select(
    "id",
    "first_name",
    "surname",
    "email",
    "start_date",
    # The UDF argument must stay a Column: a bare string would be sent as a literal.
    F.call_udf("fiscal_year_batch_trace", F.col("start_date")).alias("fiscal_year"),
)

data.show()

In [None]:
# Number of rows per fiscal year, ordered by fiscal year so the output is
# stable across runs and directly comparable with the scalar solutions.
data.group_by("fiscal_year").agg(F.count("*").alias("count")).sort("fiscal_year").show()