# Capstone ファイルストリーム設定

このノートブックは、S3バケットへのファイルストリーミングを設定するためのSQLクエリを含んでいます。`setup_capstone_file_stream.sql`の処理をステップごとに実行し、解説を加えます。

## 1. コンテキストの設定

まず、使用するロール、ウェアハウス、データベース、スキーマを設定します。ご自身の環境に合わせて値を変更してください。

In [None]:
use role sysadmin;  -- replace with your role
use warehouse compute_wh_xl;  -- replace with your warehouse
use database capstone; -- replace with your database
use schema capstone.public;  -- replace with your schema

## 2. ストアドプロシージャの作成

データストリーミングに必要な2つのストアドプロシージャを作成します。

### 2.1. `CHECK_S3_WRITE` ストアドプロシージャ

このプロシージャは、指定されたS3バケットにファイルを書き込むことができるかテストするために使用します。

In [None]:
create or replace procedure check_s3_write 
(v_kapa_offset integer, v_raw_table varchar, v_stage varchar, v_json_ff varchar, v_json_data varchar)
returns varchar(2000)
language sql
as
$$
declare
  dynamic_query varchar(2000);
  error_message varchar(2000);
--   kapa_offset number;
begin
  dynamic_query :=   
  'COPY INTO @'||v_stage||'/kapa-stream/ '||
  'FROM ('||
  'SELECT distinct OBJECT_CONSTRUCT_KEEP_NULL('||
  '''gs'', '||v_json_data||':"gs"::STRING '||  
  ', ''heading'', '||v_json_data||':"heading"::STRING'||
  ', ''baro_alt'', '||v_json_data||':"baro_alt"::STRING'||
  ', ''squawk'', '||v_json_data||':"squawk"::STRING'||
  ', ''alt'', '||v_json_data||':"alt"::STRING'||
  ', ''lon'', '||v_json_data||':"lon"::STRING'||
  ', ''facility_name'', '||v_json_data||':"facility_name"::STRING'||
  ', ''gps_alt'', '||v_json_data||':"gps_alt"::STRING'||
  ', ''pitr'', '||v_json_data||':"pitr"::STRING'||
  ', ''id'', '||v_json_data||':"id"::STRING'||
  ', ''hexid'', '||v_json_data||':"hexid"::STRING'||
  ', ''facility_hash'', '||v_json_data||':"facility_hash"::STRING'||
  ', ''ident'', '||v_json_data||':"ident"::STRING'||
  ', ''lat'', '||v_json_data||':"lat"::STRING'||
  ', ''type'', '||v_json_data||':"type"::STRING'||
  ', ''updateType'', '||v_json_data||':"updateType"::STRING'||
  ', ''air_ground'', '||v_json_data||':"air_ground"::STRING'||
  ', ''clock'', '||v_json_data||':"clock"::number+'||to_char(v_kapa_offset+28500)||
  ') AS flight_obj '||
  'FROM '||v_raw_table||
  ' where '||v_json_data||':clock::timestamp between timestampadd(''SECONDS'', '||to_char(-v_kapa_offset)||', current_timestamp())'||
  'and timestampadd(''SECONDS'', '||to_char(-v_kapa_offset+300)||', current_timestamp()))'||
  'PARTITION BY ( ''year=''|| date_part(''YEAR'', flight_obj:clock::timestamp)||''/month=''|| lpad(date_part(''MONTH'',flight_obj:clock::timestamp), ''2'', ''0'')||''/day=''|| lpad(date_part(''DAY'',flight_obj:clock::timestamp), ''2'', ''0'')) FILE_FORMAT = '''||v_json_ff||''';';

  begin
    execute immediate :dynamic_query;
    return 'COPY command executed successfully. Check your bucket for the file.';
  exception
    WHEN OTHER THEN
      -- If an exception occurs, capture the error message
      error_message := SQLERRM;
      RETURN 'Error executing query: ' || error_message;
  end
  ;
  -- return dynamic_query;
end;
$$
;


### 2.2. `CREATE_STREAM_TASK` ストアドプロシージャ

このプロシージャは、5〜6分ごとにS3バケットにファイルをストリーミングするタスクを作成します。

In [None]:
create or replace procedure create_stream_task 
(v_db_schema varchar, v_raw_table varchar, v_kapa_offset number, v_stage varchar, v_json_ff varchar, v_json_data varchar)
returns varchar(2000)
language sql
as
$$
declare
  dynamic_query varchar(2000);
  error_message varchar(2000);
begin
  dynamic_query := 
  'create or replace task '||v_db_schema||'.publish_kapa_adsb_data '||
  'SCHEDULE = ''6 MINUTES'' '||
  'USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = ''XSMALL'' '||
  'as '||
  'COPY INTO @'||v_stage||'/kapa-stream/ '||
  'FROM ('||
  'SELECT distinct OBJECT_CONSTRUCT_KEEP_NULL('||
  '''gs'', '||v_json_data||':"gs"::STRING '||  
  ', ''heading'', '||v_json_data||':"heading"::STRING'||
  ', ''baro_alt'', '||v_json_data||':"baro_alt"::STRING'||
  ', ''squawk'', '||v_json_data||':"squawk"::STRING'||
  ', ''alt'', '||v_json_data||':"alt"::STRING'||
  ', ''lon'', '||v_json_data||':"lon"::STRING'||
  ', ''facility_name'', '||v_json_data||':"facility_name"::STRING'||
  ', ''gps_alt'', '||v_json_data||':"gps_alt"::STRING'||
  ', ''pitr'', '||v_json_data||':"pitr"::STRING'||
  ', ''id'', '||v_json_data||':"id"::STRING'||
  ', ''hexid'', '||v_json_data||':"hexid"::STRING'||
  ', ''facility_hash'', '||v_json_data||':"facility_hash"::STRING'||
  ', ''ident'', '||v_json_data||':"ident"::STRING'||
  ', ''lat'', '||v_json_data||':"lat"::STRING'||
  ', ''type'', '||v_json_data||':"type"::STRING'||
  ', ''updateType'', '||v_json_data||':"updateType"::STRING'||
  ', ''air_ground'', '||v_json_data||':"air_ground"::STRING'||
  ', ''clock'', '||v_json_data||':"clock"::number+'||to_char(v_kapa_offset+28500)||
  ') AS flight_obj '||
  'FROM '||v_raw_table||
  ' where '||v_json_data||':clock::timestamp between timestampadd(''SECONDS'', '||to_char(-v_kapa_offset)||', current_timestamp())'||
  'and timestampadd(''SECONDS'', '||to_char(-v_kapa_offset+300)||', current_timestamp()))'||
  'PARTITION BY ( ''year=''|| date_part(''YEAR'', flight_obj:clock::timestamp)||''/month=''|| lpad(date_part(''MONTH'',flight_obj:clock::timestamp), ''2'', ''0'')||''/day=''|| lpad(date_part(''DAY'',flight_obj:clock::timestamp), ''2'', ''0'')) FILE_FORMAT = '''||v_json_ff||''';';

  begin
    execute immediate :dynamic_query;
    return 'CREATE TASK command executed successfully. Remember to Resume the Task and check status.';
  exception
    WHEN OTHER THEN
      error_message := SQLERRM;
      RETURN 'Error executing query: ' || error_message;
  end;
end;
$$;

## 3. SQL変数の設定

ワークシートで使用するSQL変数を設定します。ご自身の環境に合わせて値を変更してください。

In [None]:
set my_db_schema = 'capstone.public';  -- your capstone database and schema
set my_raw_table = 'capstone.public.kapa_raw';  -- fully qualified name of the table where your raw kapa data resides
set my_stage = 'CAPSTONE.PUBLIC.CAPSTONE_S3_STAGE';  -- fully qualified name of the external stage to your bucket
set my_json_data = 'v';  -- name of the variant column in your raw table that holds the kapa json data 
set my_json_ff = 'capstone.public.my_json_ff';  -- fully qualified name of your JSON file format
set kapa_offset = 0; -- kapa offset for resetting replay date

## 4. Kapaオフセット値の取得と設定

ストリーミングされるファイルは、バケットに既に存在する過去のKAPAファイルのリプレイです。このコードは、raw KAPAテーブルが既に作成され、生データがコピーされていることを前提としています。2022年5月13日からファイルのリプレイを開始し、KAPA_OFFSET変数を使用して、ファイルのリプレイをその時点に戻します。

In [None]:
-- Get the time difference between today and 13-May-2022.  You will
-- use this number as the time offset to start replaying files from that time.
-- Make the following changes to the sql below:
--   "v" = column name of the json variant column in your KAPA_RAW table
--   "kapa_raw" = the name of your raw kapa table
--   "file_name" = the name of the column that contains the File Name 
select timediff('SECONDS', min(v:clock::timestamp), current_timestamp())
from kapa_raw
where file_name = 'kapa-0001/year=2022/month=05/day=13/kapa-0001+0+0000017717.json.gz';

上記のクエリから得られた値をコピーして、以下の`kapa_offset`変数に貼り付けて実行してください。

In [None]:
set kapa_offset = 96990315; -- Paste the value from the query above

## 5. S3書き込みテストの実行

`CHECK_S3_WRITE`プロシージャを実行して、ファイルを書き込めるかテストします。

プロシージャが正常に実行されると、バケット内の `kapa-streams` フォルダに、`kapa-0001` と同じ `year/month/day` のフォルダ構造でファイルが作成されます。テストで作成されたファイルは、次のステップに進む前に削除してください。

In [None]:
call check_s3_write
(
  $kapa_offset,
  $my_raw_table,
  $my_stage,
  $my_json_ff,
  $my_json_data
);

## 6. ストリームタスクの作成

`CREATE_STREAM_TASK`プロシージャを実行して、ストリームタスクを作成します。

これを実行する前に、ご自身のロールに `EXECUTE MANAGED STREAM` と `EXECUTE STREAM` の権限があることを確認してください。正常に実行されると、6分ごとにバケットに新しいファイルを作成するタスクができます。

In [None]:
call create_stream_task
(
  $my_db_schema,
  $my_raw_table, 
  $kapa_offset, 
  $my_stage, 
  $my_json_ff,
  $my_json_data
);

## 7. タスクの管理

作成後のタスクを管理するには、以下のコマンドを使用します。

### タスクの状態確認

タスクは作成後に一時停止状態になります。`show` または `desc` コマンドを使用して、タスクの状態を確認します（`suspended`になっているはずです）。

In [None]:
show tasks like 'publish_kapa_adsb_data';
desc task publish_kapa_adsb_data;

### タスクの再開と一時停止

`alter task`コマンドを使用して、タスクを再開および一時停止します。

In [None]:
alter task publish_kapa_adsb_data resume;
alter task publish_kapa_adsb_data suspend;

### 次回実行までの時間確認

次のタスクが実行されるまでの時間を確認するには、このSQLを実行します。

In [None]:
select timestampdiff(second, current_timestamp, scheduled_time) next_run, scheduled_time, name, state
from table(information_schema.task_history())
where state = 'SCHEDULED' 
and name in ('PUBLISH_KAPA_ADSB_DATA')
order by completed_time desc;

### タスク履歴の確認

このクエリは、タスクに関するすべての情報を提供します。

In [None]:
select name, state, error_code, error_message, scheduled_time, query_start_time, 
next_scheduled_time, completed_time
from table(information_schema.task_history())
where name in ('PUBLISH_KAPA_ADSB_DATA')
-- order by completed_time desc;

### サーバーレスタスクのコピー履歴確認

In [None]:
select *
  from table(information_schema.serverless_task_history(
    date_range_start=>dateadd(D, -7, current_date),
    date_range_end=>current_date,
    task_name=>'PUBLISH_KAPA_ADSB_DATA'));

---------

# Capstone ファイルストリーム管理

このノートブックは、作成済みのファイルストリーミングタスクを管理およびリセットするためのSQLクエリを含んでいます。`manage_capstone_file_stream.sql`の処理をステップごとに実行し、解説を加えます。

## 1. コンテキストの設定

まず、使用するロール、ウェアハウス、データベース、スキーマを設定します。ご自身の環境に合わせて値を変更してください。

In [None]:
use role capstone26_role;  -- replace with your role
use warehouse cap26_wh ;  -- replace with your warehouse
use database capstone26_db; -- replace with your database
use schema capstone26_db.prod;  -- replace with your schema

## 2. タスクの管理

作成後のタスクを管理するには、以下のコマンドを使用します。

### 2.1. タスクの状態確認

タスクは作成後に一時停止状態になります。`show` または `desc` コマンドを使用して、タスクの状態を確認します（`suspended`になっているはずです）。

In [None]:
show tasks like 'publish_kapa_adsb_data';
desc task publish_kapa_adsb_data;

### 2.2. タスクの再開と一時停止

`alter task`コマンドを使用して、タスクを再開および一時停止します。

In [None]:
alter task publish_kapa_adsb_data resume;
alter task publish_kapa_adsb_data suspend;

### 2.3. 次回実行までの時間確認

次のタスクが実行されるまでの時間を確認するには、このSQLを実行します。

In [None]:
select timestampdiff(second, current_timestamp, scheduled_time) next_run, scheduled_time, name, state
from table(information_schema.task_history())
where state = 'SCHEDULED' 
and name in ('PUBLISH_KAPA_ADSB_DATA')
order by completed_time desc;

select timestampdiff(second, current_timestamp, scheduled_time) next_run, scheduled_time, name, state
from table(information_schema.task_history())
where state = 'SCHEDULED' 
and name in ('PUBLISH_KAPA_ADSB_DATA')
order by completed_time desc;

### 2.4. タスク履歴の確認

このクエリは、タスクに関するすべての情報を提供します。

In [None]:
select name, state, error_code, error_message, scheduled_time, query_start_time, 
next_scheduled_time, completed_time
from table(information_schema.task_history())
where name in ('PUBLISH_KAPA_ADSB_DATA')
-- order by completed_time desc
;

### 2.5. サーバーレスタスクのコピー履歴確認

In [None]:
select *
  from table(information_schema.serverless_task_history(
    date_range_start=>dateadd(D, -7, current_date),
    date_range_end=>current_date,
    task_name=>'PUBLISH_KAPA_ADSB_DATA'));

## 3. ファイルストリームのリセット

リセットするには、次の手順を実行します:
1. タスクを一時停止して削除します。
2. Snowpipeを一時停止します。
3. `kapa-stage-raw` テーブルからレコードを削除します。

### 3.1. タスクの一時停止と削除

In [None]:
alter task capstone26_db.prod.publish_kapa_adsb_data suspend;
drop task capstone26_db.prod.publish_kapa_adsb_data;

### 3.2. Snowpipeの一時停止

In [None]:
alter pipe capstone26_db.prod.dkp_pipe set pipe_execution_paused = true;

### 3.3. 生データテーブルのクリーンアップ

In [None]:
select count(*)
from capstone26_db.prod.kapa_stream_raw;

In [None]:
delete from capstone26_db.prod.kapa_stream_raw;