In [1]:
from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import *

import pandas as pd

# import matplotlib.pyplot as plt

# %matplotlib inline
import datetime as dt
import numpy as np
import seaborn as sns

#Snowflake connection info is saved in config.py
from config import snowflake_conn_prop


# lets import some tranformations functions
from snowflake.snowpark.functions import udf, col, lit, translate, is_null, iff



In [2]:
from snowflake.snowpark import version
print(version.VERSION)
#session.close()
session = Session.builder.configs(snowflake_conn_prop).create()
session.sql("use role accountadmin").collect()
session.sql("create database if not exists  {}".format(snowflake_conn_prop['database'])).collect()
session.sql("use database {}".format(snowflake_conn_prop['database'])).collect()
session.sql("create schema if not exists {}".format(snowflake_conn_prop['schema'])).collect()
session.sql("use schema {}".format(snowflake_conn_prop['schema'])).collect()
session.sql("create warehouse if not exists {} with \
                WAREHOUSE_SIZE = XSMALL \
                AUTO_SUSPEND = 120 \
                AUTO_RESUME = TRUE".format(snowflake_conn_prop['warehouse'])).collect()
session.sql("use warehouse {}".format(snowflake_conn_prop['warehouse']))
print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())

(0, 8, 0)
[Row(CURRENT_WAREHOUSE()='DVD_FROSTY_FRIDAYS_SNOWPARK', CURRENT_DATABASE()='TEST_DB', CURRENT_SCHEMA()='DVD_FROSTY_FRIDAYS_SNOWPARK')]


In [3]:
session.get_current_database(), session.get_fully_qualified_current_schema()

('"TEST_DB"', '"TEST_DB"."DVD_FROSTY_FRIDAYS_SNOWPARK"')

In [4]:
def create_stage(session, database, schema, name, additional_info):
    sql = f'create stage if not exists {database}.{schema}.{name} {additional_info}'
    session.sql(sql).collect()

In [8]:
url_stage = 's3://frostyfridaychallenges/challenge_1/'
create_stage(session,
             session.get_current_database(), 
             session.get_current_schema(),
             "SNOWPARK_FFSP_01",
             f"url='{url_stage}' file_format=(type=csv SKIP_HEADER =1)")

##### Single column import to explore data

In [7]:
user_schema = StructType([StructField("VALUE", StringType())])
df = session.read.options({"field_delimiter": "\0", "skip_header": 1}).schema(user_schema).csv("@SNOWPARK_FF_01")
df.show()

--------------------
|"VALUE"           |
--------------------
|right             |
|NULL              |
|totally_empty     |
|congratulations!  |
|you               |
|have              |
|gotten            |
|it                |
--------------------



##### Proper types and columns

In [8]:
#user_schema = StructType([StructField("metadata$filename", StringType())])


In [9]:
user_schema = StructType([
    StructField("WORD", StringType())
])
# ,
#     StructField("FILE_ROW_NUMBER", StringType()),
#     StructField("WORD", StringType())
# ])
df = session.read.options({"field_delimiter": "\0", "skip_header": 1})\
            .schema(user_schema).csv("@SNOWPARK_FFSP_01")


In [17]:
q0 = df._plan.queries[1]
q0.sql

' SELECT $1::STRING AS "WORD" FROM @SNOWPARK_FF_01( FILE_FORMAT  => \'"TEST_DB"."DVD_FROSTYFRIDAYS_SPARK".SNOWPARK_TEMP_FILE_FORMAT_522CLB55WX\')'

In [23]:
session.sql(f'drop table if exists {session.get_fully_qualified_current_schema()}.CH_01').collect()

[Row(status='CH_01 successfully dropped.')]

In [51]:
df_lines.queries['queries'][1]

'SELECT metadata$filename AS "FILENAME", metadata$file_row_number AS "FILE_ROW_NUMBER", $1::STRING AS "WORD" FROM @SNOWPARK_FF_01/1.csv( FILE_FORMAT  => \'"TEST_DB"."DVD_FROSTYFRIDAYS_SPARK".SNOWPARK_TEMP_FILE_FORMAT_5PAUQXGYTC\')'

In [24]:
staged_files = session.sql("list @SNOWPARK_FFSP_01")

file_names_list = staged_files.select(col('"name"')).collect()

mode_loop = {0:"overwrite"}
loop = 0
for file in file_names_list:
    print(file.asDict()['name'])
    df_lines = session.read.options({"field_delimiter": "\0", "skip_header": 1})\
        .schema(user_schema).csv("@SNOWPARK_FF_01/" + file.asDict()['name'].replace(url_stage,''))

#    df_lines = session.read.schema(user_schema).csv("@SNOWPARK_FF_01/" + file.asDict()['name'].replace(url_stage,''))

    for i,q in enumerate(df_lines._plan.queries):
        print('before')
        print(i, q.sql)
    df_lines._plan.queries[1].sql = df_lines.queries['queries'][1].replace('SELECT ', 'SELECT metadata$filename AS "FILENAME", metadata$file_row_number AS "FILE_ROW_NUMBER", ')
    for i,q in enumerate(df_lines._plan.queries):
        print('after')
        print(i, q.sql)
    df_lines.write.save_as_table([session.get_fully_qualified_current_schema(), 'CH_01'],
                                  mode=mode_loop.get(loop,'append')
                                )
    loop += 1

s3://frostyfridaychallenges/challenge_1/1.csv
before
0  CREATE  TEMPORARY  FILE  FORMAT  If  NOT  EXISTS "TEST_DB"."DVD_FROSTY_FRIDAYS_SNOWPARK".SNOWPARK_TEMP_FILE_FORMAT_RTTJXZVKWT TYPE  = csv  FIELD_DELIMITER = '\x00' SKIP_HEADER = 1 
before
1  SELECT $1::STRING AS "WORD" FROM @SNOWPARK_FF_01/1.csv( FILE_FORMAT  => '"TEST_DB"."DVD_FROSTY_FRIDAYS_SNOWPARK".SNOWPARK_TEMP_FILE_FORMAT_RTTJXZVKWT')
after
0  CREATE  TEMPORARY  FILE  FORMAT  If  NOT  EXISTS "TEST_DB"."DVD_FROSTY_FRIDAYS_SNOWPARK".SNOWPARK_TEMP_FILE_FORMAT_RTTJXZVKWT TYPE  = csv  FIELD_DELIMITER = '\x00' SKIP_HEADER = 1 
after
1 SELECT metadata$filename AS "FILENAME", metadata$file_row_number AS "FILE_ROW_NUMBER", $1::STRING AS "WORD" FROM @SNOWPARK_FF_01/1.csv( FILE_FORMAT  => '"TEST_DB"."DVD_FROSTY_FRIDAYS_SNOWPARK".SNOWPARK_TEMP_FILE_FORMAT_RTTJXZVKWT')
s3://frostyfridaychallenges/challenge_1/2.csv
before
0  CREATE  TEMPORARY  FILE  FORMAT  If  NOT  EXISTS "TEST_DB"."DVD_FROSTY_FRIDAYS_SNOWPARK".SNOWPARK_TEMP_FILE_FORMAT_

In [25]:
t = session.table([session.get_fully_qualified_current_schema(), 'CH_01'])
t.collect()

[Row(FILENAME='challenge_1/1.csv', FILE_ROW_NUMBER=1, WORD='you'),
 Row(FILENAME='challenge_1/1.csv', FILE_ROW_NUMBER=2, WORD='have'),
 Row(FILENAME='challenge_1/1.csv', FILE_ROW_NUMBER=3, WORD='gotten'),
 Row(FILENAME='challenge_1/2.csv', FILE_ROW_NUMBER=1, WORD='it'),
 Row(FILENAME='challenge_1/3.csv', FILE_ROW_NUMBER=1, WORD='right'),
 Row(FILENAME='challenge_1/3.csv', FILE_ROW_NUMBER=2, WORD='NULL'),
 Row(FILENAME='challenge_1/3.csv', FILE_ROW_NUMBER=3, WORD='totally_empty'),
 Row(FILENAME='challenge_1/3.csv', FILE_ROW_NUMBER=4, WORD='congratulations!')]