In [1]:
#Imports and Declarations

In [2]:
# Credentials import
from config import *

In [3]:
# Imports
from snowflake.snowpark.session import Session
import snowflake.snowpark.functions as F
from snowflake.snowpark.types import *
from datetime import date

In [4]:
# Function to create a session, it uses the imported credntials values
def snowpark_session_create():
    connection_params = {
        "account": account_id,
        "user": username,
        "password": pasword,
        "role": role,
        "warehouse": warehouse
    }
    session = Session.builder.configs(connection_params).create()
    return session

In [5]:
# Creation of the session
demo_session = snowpark_session_create()

In [6]:
df = demo_session.sql('SELECT * FROM PRODUCTION_BIZ_OPS.AWS_EXTERNAL.CSV_ENTITLEMENT_TEST_2')
#df = demo_session.sql('SELECT * FROM PRODUCTION_BIZ_OPS.AWS_EXTERNAL.SNOWPIPEPOC')

In [7]:
df.show()

------------------------------------------------------------------------------
|"VALUE"         |"Field_1"  |"Field_2"  |"Field_3"  |"Field_4"  |"Field_5"  |
------------------------------------------------------------------------------
|{               |AAA        |BBB        |CCC        |DDD        |EEE        |
|  "c1": "AAA",  |           |           |           |           |           |
|  "c2": "BBB",  |           |           |           |           |           |
|  "c3": "CCC",  |           |           |           |           |           |
|  "c4": "DDD",  |           |           |           |           |           |
|  "c5": "EEE"   |           |           |           |           |           |
|}               |           |           |           |           |           |
|{               |aa1        |bb1        |cc1        |dd1        |ee1        |
|  "c1": "aa1",  |           |           |           |           |           |
|  "c2": "bb1",  |           |           |          

In [8]:
# Point session to the desired database (External table)
demo_session.use_database("PRODUCTION_BIZ_OPS")

# Point session to the desired schema
demo_session.use_schema("AWS_EXTERNAL")

In [9]:
# Define the Schema
schema = StructType(
    [StructField("Field_1", StringType()),
     StructField("Field_2", StringType()),
     StructField("Field_3", StringType()),
     StructField("Field_4", StringType()),
     StructField("Field_5", StringType())
    ]
)

# Read the CSV files
df_csv = demo_session.read.schema(schema).csv("@AWS_EXTERNAL_TABLES/csv/test/")

In [10]:
df_csv.show()

-------------------------------------------------------------
|"FIELD_1"  |"FIELD_2"  |"FIELD_3"  |"FIELD_4"  |"FIELD_5"  |
-------------------------------------------------------------
|Field_1    |Field_2    |Field_3    |Field_4    |Field_5    |
|AAA        |BBB        |CCC        |DDD        |EEE        |
|aa1        |bb1        |cc1        |dd1        |ee1        |
|aa2        |bb2        |NULL       |NULL       |NULL       |
|aa3        |bb3        |cc3        |dd3        |ee3        |
|aa4        |bb4        |cc4        |NULL       |NULL       |
|Field_1    |Field_2    |Field_3    |Field_4    |Field_5    |
|AAA        |BBB        |CCC        |DDD        |EEE        |
|aa1        |bb1        |cc1        |dd1        |ee1        |
|aa2        |bb2        |NULL       |NULL       |NULL       |
-------------------------------------------------------------



In [11]:
#df_csv_2 = demo_session.read.option("INFER_SCHEMA", True).option("PARSE_HEADER", True).option("ERROR_ON_COLUMN_COUNT_MISMATCH", False).option("EMPTY_FIELD_AS_NULL", True).csv("@AWS_EXTERNAL_TABLES/csv/test/")

In [12]:
#df_csv_2.show()

In [13]:
df = demo_session.table("CSV_ENTITLEMENT_TEST_2")

df.columns

df = df.select(
    "Value",
    '"Field_1"',
    '"Field_2"',
    '"Field_3"',
    '"Field_4"',
    '"Field_5"'
)#.filter((F.col("Field_1").contains("A")))

In [14]:
df.show()

------------------------------------------------------------------------------
|"VALUE"         |"Field_1"  |"Field_2"  |"Field_3"  |"Field_4"  |"Field_5"  |
------------------------------------------------------------------------------
|{               |AAA        |BBB        |CCC        |DDD        |EEE        |
|  "c1": "AAA",  |           |           |           |           |           |
|  "c2": "BBB",  |           |           |           |           |           |
|  "c3": "CCC",  |           |           |           |           |           |
|  "c4": "DDD",  |           |           |           |           |           |
|  "c5": "EEE"   |           |           |           |           |           |
|}               |           |           |           |           |           |
|{               |aa1        |bb1        |cc1        |dd1        |ee1        |
|  "c1": "aa1",  |           |           |           |           |           |
|  "c2": "bb1",  |           |           |          

In [15]:
df = demo_session.table("SNOWPIPEPOC")

df.columns

['"Field_1"', '"Field_2"', '"Field_3"', '"Field_4"', '"Field_5"']

In [16]:
df.show()

-------------------------------------------------------------
|"Field_1"  |"Field_2"  |"Field_3"  |"Field_4"  |"Field_5"  |
-------------------------------------------------------------
|AAA        |BBB        |CCC        |DDD        |EEE        |
|aa1        |bb1        |cc1        |dd1        |ee1        |
|aa2        |bb2        |NULL       |NULL       |NULL       |
|aa3        |bb3        |cc3        |dd3        |ee3        |
|aa4        |bb4        |cc4        |NULL       |NULL       |
|AAA        |BBB        |CCC        |DDD        |EEE        |
|aa1        |bb1        |cc1        |dd1        |ee1        |
|aa2        |bb2        |cc2        |dd2        |ee2        |
|aa3        |bb3        |cc3        |dd3        |ee3        |
|aa4        |bb4        |cc4        |dd4        |ee4        |
-------------------------------------------------------------



In [19]:
df_transformed = df.distinct()

for i in df_transformed.columns:
    df_transformed = df_transformed.withColumn(i, F.lower(df_transformed[i]))

#df_transformed.columns

df_transformed = df_transformed.sort(df_transformed['"Field_2"'].asc())

#df_transformed = df_transformed.filter(F.col('"Field_2"').contains('2'))

df_null = [F.count(F.when(F.col(i).isNull(), 1)).alias(i + '_null_count') for i in df_transformed.columns]
df_null_count = df_transformed.agg(*df_null)


#df_transformed.show()
df_null_count.show()

----------------------------------------------------------------------------------------------------------------------------------------
|"""Field_1""_null_count"  |"""Field_2""_null_count"  |"""Field_3""_null_count"  |"""Field_4""_null_count"  |"""Field_5""_null_count"  |
----------------------------------------------------------------------------------------------------------------------------------------
|0                         |0                         |1                         |2                         |2                         |
----------------------------------------------------------------------------------------------------------------------------------------



In [20]:
demo_session.close()