## TPC-DS Snowflake Import
Import the TPC-DS data files from Google Cloud Storage (GCS) into Snowflake tables.

In [1]:
import config, sf

### Start Snowflake WAREHOUSE

In [2]:
# Benchmark selection: test type 'ds' (TPC-DS) at the 100GB dataset size.
test_type = 'ds'
dataset_size = '100GB'

# Build the helper from the benchmark selection and the shared config module.
sf_helper = sf.SnowflakeHelper(test_type, dataset_size, config)

# Resume the warehouse so the cells that follow can run queries.
sf_helper.warehouse_start()

preparing to open connection to Snowflake
connection opened
running query: ALTER WAREHOUSE TEST1 RESUME;
warehouse start: Statement executed successfully.
running query: USE WAREHOUSE TEST1
result: Statement executed successfully.
running query: USE DATABASE SF_TUTS
result: Statement executed successfully.
running query: USE ROLE ACCOUNTADMIN
result: Statement executed successfully.


### Set up STAGE: Link to GCS data source

In [3]:
# Create the GCS storage integration and stage only when the helper reports
# that no integration exists yet.
# is_dry_run=True: in the recorded run this printed the CREATE STORAGE
# INTEGRATION / CREATE STAGE statements shown in the cell output, presumably
# without executing them; TODO confirm against SnowflakeHelper.create_integration.
# NOTE(review): integration_id is assigned only inside this branch, yet later
# cells read it. If is_integrated() returns True on a fresh kernel, those cells
# will raise NameError; confirm how the id should be obtained in that case.
if not sf_helper.is_integrated():
    integration_id = sf_helper.create_integration(is_dry_run=True)
    print(f'integrated with gcs: {integration_id}')

create or replace file format csv_file_format
            type = csv
            field_delimiter = '|'
            skip_header = 1
            null_if = ('NULL', 'null')
            empty_field_as_null = true
            compression = none;


--integrating "gcs_ds_100GB_integration" ... 
CREATE STORAGE INTEGRATION gcs_ds_100GB_integration TYPE=EXTERNAL_STAGE STORAGE_PROVIDER=GCS ENABLED=TRUE STORAGE_ALLOWED_LOCATIONS=('gcs://tpc-benchmark-5947/');
GRANT USAGE on INTEGRATION gcs_ds_100GB_integration to ROLE ACCOUNTADMIN;
CREATE STAGE gcs_ds_100GB_integration_stage URL='gcs://tpc-benchmark-5947' STORAGE_INTEGRATION=gcs_ds_100GB_integration FILE_FORMAT=csv_file_format;
list  @gcs_ds_100GB_integration_stage;
--finished staging "gcs_ds_100GB_integration"


integrated with gcs: gcs_ds_100GB_integration


### Test STAGE

In [4]:
# Ask the helper which staged files belong to each table, then print every
# table's files followed by how far its count falls short of
# sf_helper.gcs_file_range.
db_files = sf_helper.list_integration(integration_id)
for table_name, staged_files in db_files.items():
    print(f'{table_name}:')
    for staged_file in staged_files:
        print(f'\t{staged_file}')
    missing_count = sf_helper.gcs_file_range - len(staged_files)
    print(f'\tmissing {missing_count} files\n\n')

call_center:
	gcs://tpc-benchmark-5947/ds_100GB_call_center_1_96.dat
	missing 95 files


catalog_returns:
	gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_10_96.dat
	gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_13_96.dat
	gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_15_96.dat
	gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_16_96.dat
	gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_17_96.dat
	gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_18_96.dat
	gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_19_96.dat
	gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_20_96.dat
	gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_21_96.dat
	gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_22_96.dat
	gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_24_96.dat
	gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_25_96.dat
	gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_29_96.dat
	gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_30_96.dat
	gcs://tpc-benchmark-5947/ds_100GB_catal

### Import Data from STAGE to target table

In [None]:
# Walk the table -> files mapping built by the listing cell and hand each
# staged file to the helper for loading into its target table, logging
# progress per table and per file.
for target_table, staged_files in db_files.items():
    print(f'Starting to import table: {target_table}')
    for staged_file in staged_files:
        print(f'\timporting file: {staged_file}')
        sf_helper.import_data(target_table, staged_file, integration_id)

Starting to import table: call_center
	importing file: gcs://tpc-benchmark-5947/ds_100GB_call_center_1_96.dat
running query: copy into call_center from 'gcs://tpc-benchmark-5947/ds_100GB_call_center_1_96.dat' storage_integration=gcs_ds_100GB_integration file_format=(format_name=csv_file_format);
result Copy executed with 0 files processed.
Starting to import table: catalog_returns
	importing file: gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_10_96.dat
running query: copy into catalog_returns from 'gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_10_96.dat' storage_integration=gcs_ds_100GB_integration file_format=(format_name=csv_file_format);
result Copy executed with 0 files processed.
	importing file: gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_13_96.dat
running query: copy into catalog_returns from 'gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_13_96.dat' storage_integration=gcs_ds_100GB_integration file_format=(format_name=csv_file_format);
result Copy executed with

result gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_50_96.dat
	importing file: gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_53_96.dat
running query: copy into catalog_returns from 'gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_53_96.dat' storage_integration=gcs_ds_100GB_integration file_format=(format_name=csv_file_format);
result gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_53_96.dat
	importing file: gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_56_96.dat
running query: copy into catalog_returns from 'gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_56_96.dat' storage_integration=gcs_ds_100GB_integration file_format=(format_name=csv_file_format);
result gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_56_96.dat
	importing file: gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_57_96.dat
running query: copy into catalog_returns from 'gcs://tpc-benchmark-5947/ds_100GB_catalog_returns_57_96.dat' storage_integration=gcs_ds_100GB_integration file_format=(format_nam

result gcs://tpc-benchmark-5947/ds_100GB_catalog_sales_21_96.dat
	importing file: gcs://tpc-benchmark-5947/ds_100GB_catalog_sales_28_96.dat
running query: copy into catalog_sales from 'gcs://tpc-benchmark-5947/ds_100GB_catalog_sales_28_96.dat' storage_integration=gcs_ds_100GB_integration file_format=(format_name=csv_file_format);
result gcs://tpc-benchmark-5947/ds_100GB_catalog_sales_28_96.dat
	importing file: gcs://tpc-benchmark-5947/ds_100GB_catalog_sales_30_96.dat
running query: copy into catalog_sales from 'gcs://tpc-benchmark-5947/ds_100GB_catalog_sales_30_96.dat' storage_integration=gcs_ds_100GB_integration file_format=(format_name=csv_file_format);
result gcs://tpc-benchmark-5947/ds_100GB_catalog_sales_30_96.dat
	importing file: gcs://tpc-benchmark-5947/ds_100GB_catalog_sales_35_96.dat
running query: copy into catalog_sales from 'gcs://tpc-benchmark-5947/ds_100GB_catalog_sales_35_96.dat' storage_integration=gcs_ds_100GB_integration file_format=(format_name=csv_file_format);
resu

### Suspend WAREHOUSE

In [6]:
# Suspend the warehouse that was resumed at the top of the notebook, now that
# the import cells have run.
sf_helper.warehouse_suspend()

warehouse suspend: Statement executed successfully.
