# Snowpark for Python Advanced DEMO

## Imports

In [1]:
# Import login values from config
from config import *

In [2]:
# Import packages needed
from snowflake.snowpark import Session
from snowflake.snowpark.functions import *
from snowflake.snowpark.types import *
import pandas as pd

## Session

In [3]:
# Function for creating session object
def snowpark_session_create(warehouse="CHRISTOPHERMARLAND"):
    """Create and return a Snowpark session.

    The account, user, password and role are read from the module-level
    names ``account_id``, ``username``, ``password`` and ``role``
    (expected to be provided by the ``config`` star import at the top of
    the notebook).

    Args:
        warehouse: Name of the warehouse to connect with. Defaults to the
            value the demo originally hard-coded, so calling the function
            with no arguments behaves exactly as before.

    Returns:
        The created ``Session`` with ``sql_simplifier_enabled`` set to True.
    """
    connection_params = {
        "account": account_id,
        "user": username,
        "password": password,
        "role": role,
        "warehouse": warehouse,
    }
    session = Session.builder.configs(connection_params).create()
    session.sql_simplifier_enabled = True
    return session

In [4]:
# Creation of session using the above function.
# `demo_session` is the session object every later cell works through.
demo_session = snowpark_session_create()

In [5]:
# TODO: Replace with values relevant to you
# The empty strings are placeholders: fill in the database and schema
# you want this session to use before running the cell.
demo_session.use_database("")
demo_session.use_schema("")

## Look At Data

In [None]:
# TODO: Run the below after importing the data in Snowflake UI
# Reference the RAW_CREDITCO_SALES table as a Snowpark DataFrame and preview it
sales = demo_session.table("RAW_CREDITCO_SALES")
sales.show()

In [None]:
# Reference the RAW_CREDITCO_SALES_ITEMS table as a Snowpark DataFrame and preview it
sales_items = demo_session.table("RAW_CREDITCO_SALES_ITEMS")
sales_items.show()

## UDF

In [None]:
# Run the below to see the transformed data
# Flatten the `data` column, then project the wanted fields out of each
# flattened `value` in one select (the id is MD5-hashed on the way out).
sales_items_flat = sales_items.join_table_function("flatten", col("data")).select(
    "ingestion_id",
    "ingested_at",
    json_extract_path_text("value", lit("creditCardNumber")).alias("credit_card_number"),
    json_extract_path_text("value", lit("dateTime")).alias("date_time"),
    md5(json_extract_path_text("value", lit("id"))).alias("id"),
    json_extract_path_text("value", lit("items")).alias("items"),
)

sales_items_flat.show()

In [None]:
# TODO: Create an internal stage for our UDF


In [None]:
# TODO: Place our UDF file in the internal stage


In [9]:
# TODO: Register the UDF from the stage


In [26]:
# TODO: Repeat the transformation above with the UDF


In [None]:
# TODO: .show()


In [37]:
# TODO: Save as a table called `sales_items`


## Merge

In [34]:
# Read CSV
# Loads new_sales_item_data.csv (path as given, relative to the working
# directory) into a pandas DataFrame.
new_sales_item_data = pd.read_csv("new_sales_item_data.csv")

In [None]:
# Print the pandas DF
# (a bare expression as the last line of a notebook cell renders its value)
new_sales_item_data

In [None]:
# Upload the pandas frame into the temp table `new_sales_item_data`
# (created if missing, overwritten if present, identifiers left unquoted),
# then preview the Snowpark DataFrame that write_pandas hands back.
new_sales_item_data_sp = demo_session.write_pandas(
    new_sales_item_data,
    "new_sales_item_data",
    table_type="temp",
    auto_create_table=True,
    overwrite=True,
    quote_identifiers=False,
)

new_sales_item_data_sp.show()

In [38]:
# TODO: Merge `sales_items` with `new_sales_item_data`


In [None]:
# TODO: Run the below to see how many rows were inserted, updated and deleted
# NOTE(review): the original line read `rged`, which looks like a truncated
# `merged`; this assumes the merge cell above stores its result in `merged`.
merged

## Reading from Stages

In [None]:
# TODO: Put the two json files into our `demo stage`


In [None]:
# TODO: Create a DataFrameReader to read `new_sales_items.json`


## Automated ELT Snowpark

In [14]:
# Run the below to create our `transformed` table
# Sales: flatten `data` and project each field out of the flattened `value`
# in a single select; the id is MD5-hashed.
extract_sales = sales.join_table_function("flatten", col("data")).select(
    json_extract_path_text("value", lit("creditCardNumber")).alias("credit_card_number"),
    json_extract_path_text("value", lit("creditCardType")).alias("credit_card_type"),
    json_extract_path_text("value", lit("dateTime")).alias("date_time"),
    md5(json_extract_path_text("value", lit("id"))).alias("id"),
    json_extract_path_text("value", lit("saleAmount")).alias("sale_amount"),
)

# Sales items: same flatten-and-project shape as the sales extract; `items`
# is parsed as JSON and passed through `sort_alphabetically`, which must
# already be available in this notebook session.
extract_sales_items = sales_items.join_table_function("flatten", col("data")).select(
    json_extract_path_text("value", lit("creditCardNumber")).alias("credit_card_number"),
    json_extract_path_text("value", lit("dateTime")).alias("date_time"),
    md5(json_extract_path_text("value", lit("id"))).alias("id"),
    sort_alphabetically(
        parse_json(json_extract_path_text("value", lit("items")))
    ).alias("items"),
)

# Join sales to their items on the hashed id. Overlapping column names coming
# from the items side receive the "_si" suffix, so the unsuffixed names
# selected below do not collide with them.
join_sales_and_sales_items = extract_sales.join(
    extract_sales_items,
    extract_sales["id"] == extract_sales_items["id"],
    rsuffix="_si",
).select(
    ["credit_card_number", "date_time", "id", "items", "sale_amount"]
)

# Persist the joined result as `sales_data`, replacing any existing table
join_sales_and_sales_items.write.save_as_table("sales_data", mode="overwrite")

In [None]:
# TODO: Run and show
# Preview the joined sales / sales-items DataFrame built above
join_sales_and_sales_items.show()

In [44]:
# TODO: Create DataFrameReaders of the two .json files in our stage


In [None]:
# TODO: Transform the new data as you did the old data


In [None]:
# TODO: Merge the new data into the old


In [48]:
# TODO: Drop the transient tables


In [None]:
# TODO: Remove the files from the stage
