In [None]:
-- Session bootstrap: select the role and warehouse, then (re)create the
-- challenge database and switch to it.
use role sysadmin;
use warehouse gaku_wh;

-- NOTE: "or replace" discards any existing ff_week_10 database.
create or replace database ff_week_10;
use database ff_week_10;

In [None]:
-- XSMALL warehouse for the Week 10 exercise
-- (chosen by dynamic_warehouse_data_load for files with size < 10240).
create or replace warehouse ff_week10_xsmall_wh
    warehouse_size = XSMALL
    auto_suspend = 60
    auto_resume = true
    initially_suspended = true
    comment = 'Frosty Friday Week10 検証用'
    -- Abort any statement that runs longer than one hour.
    statement_timeout_in_seconds = 3600
;
    
-- SMALL warehouse for the Week 10 exercise
-- (chosen by dynamic_warehouse_data_load for files with size >= 10240).
create or replace warehouse ff_week10_small_wh
    warehouse_size = SMALL
    auto_suspend = 60
    auto_resume = true
    initially_suspended = true
    comment = 'Frosty Friday Week10 検証用'
    -- Abort any statement that runs longer than one hour.
    statement_timeout_in_seconds = 3600
;

In [None]:
-- CSV file format that skips one header line; attached to the stage
-- ff_week_10_frosty_stage.
create or replace file format ff_csv_format
    type = CSV
    skip_header = 1
    null_if = ('\\N', 'NULL', 'NUL', '')
    field_optionally_enclosed_by = '"'
    -- default false; skip blank lines
    skip_blank_lines = true
    -- default false; example: |" Hello world "| loads as > Hello world <,
    -- while | "Hello world" | loads as >Hello world<
    trim_space = true
    -- default true; boolean that controls whether a parse error is raised
    -- when the number of delimited columns (fields) in an input file does
    -- not match the number of columns in the target table
    error_on_column_count_mismatch = true
    -- default false
    replace_invalid_characters = true
    -- default true
    empty_field_as_null = true
;

-- External stage over the challenge S3 location with an inline, bare CSV
-- format; the preview queries in this file read from it.
create or replace stage ff_week_10_frosty_stage_
    url = 's3://frostyfridaychallenges/challenge_10/'
    file_format = (type = csv);

-- Show the files available under the stage location.
list @ff_week_10_frosty_stage_;

In [None]:
-- Preview the first two columns of a single staged file.
select
    $1,
    $2
from @ff_week_10_frosty_stage_/2022-07-01.csv
limit 5;

In [None]:
-- External stage over the same S3 location, bound to the named file format
-- ff_csv_format.
create or replace stage ff_week_10_frosty_stage
    url = 's3://frostyfridaychallenges/challenge_10/'
    file_format = (format_name = 'ff_csv_format');

-- Show the files available under the stage location.
list @ff_week_10_frosty_stage;

In [None]:
-- CSV file format that parses the header line into column names; used with
-- infer_schema and with copy into ... match_by_column_name in this file.
create or replace file format ff_csv_format_for_inferschema
    type = CSV
    parse_header = true
    null_if = ('\\N', 'NULL', 'NUL', '')
    field_optionally_enclosed_by = '"'
    -- default false; skip blank lines
    skip_blank_lines = true
    -- default false; example: |" Hello world "| loads as > Hello world <,
    -- while | "Hello world" | loads as >Hello world<
    trim_space = true
    -- default true; boolean that controls whether a parse error is raised
    -- when the number of delimited columns (fields) in an input file does
    -- not match the number of columns in the target table.
    -- Must be false in order to use include_metadata with copy into.
    error_on_column_count_mismatch = false
    -- default false
    replace_invalid_characters = true
    -- default true
    empty_field_as_null = true
;
-- Stage used for infer_schema: same S3 location, bound to the
-- header-parsing file format.
create or replace stage ff_week_10_frosty_stage_for_inferschema
    url = 's3://frostyfridaychallenges/challenge_10/'
    file_format = (format_name = 'ff_csv_format_for_inferschema');

-- Show the files available under the stage location.
list @ff_week_10_frosty_stage_for_inferschema;

-- Preview the first five positional columns from the bare-CSV stage.
select
    $1,
    $2,
    $3,
    $4,
    $5
from @ff_week_10_frosty_stage_
limit 5;

-- Same preview, plus the file-level metadata columns.
select
    $1,
    $2,
    $3,
    $4,
    $5,
    metadata$filename,
    metadata$file_row_number,
    metadata$start_scan_time
from @ff_week_10_frosty_stage_
limit 5;

In [None]:
-- Inspect the schema that infer_schema detects for the staged files
-- (explicit column list rather than *).
select
    column_name,
    type,
    nullable,
    order_id
from table(
    infer_schema(
        location => '@ff_week_10_frosty_stage_for_inferschema',
        file_format => 'ff_csv_format_for_inferschema'
    )
);

In [None]:
-- Collapse the detected schema into a single array of column descriptors.
-- Only the needed columns are projected: using * can exceed 16MB.
-- https://docs.snowflake.com/en/sql-reference/functions/infer_schema
-- "Using * for ARRAY_AGG(OBJECT_CONSTRUCT()) may result in an error if the
-- returned result is larger than 16MB."
select
    array_agg(
        object_construct(
            'COLUMN_NAME', column_name,
            'TYPE', type,
            'NULLABLE', nullable,
            'ORDER_ID', order_id
        )
    )
from table(
    infer_schema(
        location => '@ff_week_10_frosty_stage_for_inferschema',
        file_format => 'ff_csv_format_for_inferschema'
    )
);

In [None]:
-- Detected schema as an array of column descriptors, with three extra
-- descriptors (filename, file_row_number, start_scan_time) appended one by one.
-- Only the needed columns are projected: using * can exceed 16MB.
-- https://docs.snowflake.com/en/sql-reference/functions/infer_schema
-- "Using * for ARRAY_AGG(OBJECT_CONSTRUCT()) may result in an error if the
-- returned result is larger than 16MB."
select
    array_append(
        array_append(
            array_append(
                array_agg(
                    object_construct(
                        'COLUMN_NAME', column_name,
                        'TYPE', type,
                        'NULLABLE', nullable
                    )
                ),
                {'COLUMN_NAME':'filename', 'TYPE':'string', 'NULLABLE':true}::variant
            ),
            {'COLUMN_NAME':'file_row_number', 'TYPE':'number', 'NULLABLE':true}::variant
        ),
        {'COLUMN_NAME':'start_scan_time', 'TYPE':'TIMESTAMP_LTZ', 'NULLABLE':true}::variant
    )
from table(
    infer_schema(
        location => '@ff_week_10_frosty_stage_for_inferschema',
        file_format => 'ff_csv_format_for_inferschema'
    )
);

In [None]:
-- Create the target table from the schema detected by infer_schema, plus
-- three extra columns (FILENAME, FILE_ROW_NUMBER, START_SCAN_TIME) that the
-- copy into ... include_metadata statements in this file populate.
create or replace transient table week10_tbl
    using template (
        select
            array_cat(
                -- Only the needed columns are projected: using * can exceed 16MB.
                -- https://docs.snowflake.com/en/sql-reference/functions/infer_schema
                -- "Using * for ARRAY_AGG(OBJECT_CONSTRUCT()) may result in an error
                -- if the returned result is larger than 16MB."
                --
                -- Aggregating in order_id order makes the generated column order
                -- deterministic; without it array_agg gives no ordering guarantee.
                array_agg(
                    object_construct('COLUMN_NAME', column_name, 'TYPE', type, 'NULLABLE', nullable)
                ) within group (order by order_id)
                , [
                    {'COLUMN_NAME':'FILENAME', 'TYPE':'STRING', 'NULLABLE':true}
                    , {'COLUMN_NAME':'FILE_ROW_NUMBER', 'TYPE':'NUMBER', 'NULLABLE':true}
                    , {'COLUMN_NAME':'START_SCAN_TIME', 'TYPE':'TIMESTAMP_LTZ', 'NULLABLE':true}
                ]::variant
            )
        from
            table(infer_schema(
                location => '@ff_week_10_frosty_stage_for_inferschema'
                , file_format => 'ff_csv_format_for_inferschema'
                , ignore_case => true -- case-insensitive detection: all column names become uppercase
                , max_records_per_file => 10000
            ))
    )
;

-- Show the column definitions of the table.
desc table week10_tbl;

-- Load every staged file, matching header names to table columns
-- case-insensitively and filling the three metadata columns.
copy into week10_tbl
from @ff_week_10_frosty_stage_for_inferschema
match_by_column_name = case_insensitive
include_metadata = (
    filename = METADATA$FILENAME,
    file_row_number = METADATA$FILE_ROW_NUMBER,
    start_scan_time = METADATA$START_SCAN_TIME
);

In [None]:
-- Ad-hoc preview of the loaded rows (all columns).
select *
from week10_tbl
limit 10;

In [None]:
-- dynamic_warehouse_data_load(stage_name, table_name)
--
-- Loads each file found on the stage into the table with its own COPY INTO,
-- choosing the warehouse per file from the size reported by LIST:
--   size <  10240 -> ff_week10_xsmall_wh
--   size >= 10240 -> ff_week10_small_wh
--
-- Parameters:
--   stage_name : stage name without the leading '@'
--   table_name : target table; must have FILENAME, FILE_ROW_NUMBER and
--                START_SCAN_TIME columns for include_metadata
-- Returns:
--   a one-column table (value) with a single row 'loaded_total :<n>', where
--   <n> is the row count of table_name after all files have been loaded.
--
-- NOTE(review): the procedure runs as caller and issues USE WAREHOUSE, so the
-- session's active warehouse is presumably left on whichever warehouse was
-- selected last - confirm whether callers need it restored.
-- NOTE(review): stage_name and table_name are concatenated into dynamic SQL
-- unescaped; pass trusted identifiers only. The generated statement text is
-- kept as-is because the query-history check in this file filters on its prefix.
create or replace procedure dynamic_warehouse_data_load(stage_name string, table_name string)
    returns table(value string)
    language sql
    execute as caller
as
    $$
        declare
            log_array array default array_construct();
            loaded_total number default 0;
        begin

            -- Fetch name and size of every file on the stage.
            execute immediate 'list @' || :stage_name;
            let result_set_ls resultset := (select $1 as name , $2 as size from table(result_scan(last_query_id())));
            let cur cursor for result_set_ls;

            for t in cur do
                let file_path string := t.name;
                let file_size number := t.size;

                -- Pick the warehouse for this file based on its size.
                if (file_size < 10240 ) then
                    use warehouse ff_week10_xsmall_wh;
                else
                    use warehouse ff_week10_small_wh;
                end if;

                -- Load just this file (last path segment of the listed name),
                -- matching columns by header name and adding file metadata.
                let copy_sql string := 'copy into ' || :table_name || ' from @' || :stage_name
                || ' files = (''' || split_part(:file_path, '/', -1) || ''' )'
                || ' match_by_column_name = case_insensitive'
                || ' include_metadata = (filename = METADATA$FILENAME, file_row_number = METADATA$FILE_ROW_NUMBER, start_scan_time = METADATA$START_SCAN_TIME)';
                execute immediate :copy_sql;
            end for;

            -- Count rows in the table that was actually loaded (the table_name
            -- argument), not a hard-coded table.
            select count(*) into :loaded_total from identifier(:table_name);
            log_array := array_append(:log_array, 'loaded_total :' || :loaded_total );

            let rs resultset := (select value::string as value from table(flatten(input => :log_array)));
            return table(rs);
        end;
    $$
;

In [None]:
-- Empty the table, then reload it file by file through the procedure.
truncate table week10_tbl;

call dynamic_warehouse_data_load(
    'ff_week_10_frosty_stage_for_inferschema',
    'week10_tbl'
);

In [None]:
-- Check which warehouse ran each COPY statement issued in this session,
-- newest first.
select
    query_id,
    query_text,
    warehouse_name
from table(information_schema.query_history_by_session())
where startswith(query_text, 'copy into week10_tbl from')
order by start_time desc
limit 100;