In [29]:
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
import pandas as pd
from dotenv import load_dotenv
import os

In [30]:
def configure():
    load_dotenv()

configure()

In [31]:
def snowpark_session():
    connection_parameters = {
        "account": os.getenv('SNOWFLAKE_ACCOUNT_NAME'),
        "user": os.getenv('SNOWFLAKE_USER_NAME'),
        "password": os.getenv('SNOWFLAKE_PASSWORD'),
        "warehouse": os.getenv('SNOWFLAKE_WAREHOUSE'),
        "database": os.getenv('SNOWFLAKE_DATABASE'),
    }
    session = Session.builder.configs(connection_parameters).create()
    session.sql_simplifier_enabled = True
    return session

In [32]:
# def check_snowpark_connection():
#     try:
#         # Create a Snowpark session
#         session = Session.builder.configs(connection_parameters).create()

#         # If connection is successful, execute a simple query
#         result = session.sql("SELECT CURRENT_USER(), CURRENT_ACCOUNT(), CURRENT_REGION()").collect()
        
#         # Print the result
#         print(f"Connection successful! Current User: {result[0][0]}, Account: {result[0][1]}, Region: {result[0][2]}")

#         # Close the session
#         session.close()

#     except Exception as e:
#         print(f"Failed to connect to Snowflake using Snowpark: {e}")

# # Call the function to check the connection
# check_snowpark_connection()

In [33]:
#transform columns
app_df = pd.read_csv('C:/Users/User/Desktop/Apple Store API/appstore_sales_data.csv', delimiter='\t')
app_df.columns = [col.replace(" ", "_") for col in app_df.columns]

In [34]:
#Generate primary key
app_df['Composite_Key'] = app_df['Product_Type_Identifier'].astype(str) + app_df['Units'].astype(str) + app_df['Begin_Date'].astype(str) + app_df['Country_Code'].astype(str) + app_df['Device'].astype(str) + app_df['Version'].astype(str) + app_df['Developer_Proceeds'].astype(str)
app_df['Composite_Key'] = app_df['Composite_Key'].str.replace('/', '', regex=False).str.replace('.', '', regex=False).str.lower()

# Check if the new composite column is unique
if app_df['Composite_Key'].is_unique:
    print("Composite key is unique")
else:
    print("Composite key is not unique")

Composite key is unique


In [35]:
session_sp = snowpark_session()


session_sp.sql(
"""
CREATE OR REPLACE TABLE demo.hiring_temp.sales_rg (
    Provider VARCHAR(5),
    Provider_Country VARCHAR(2),
    SKU VARCHAR(100),
    Developer VARCHAR(4000),
    Title VARCHAR(600),
    Version VARCHAR(100),
    Product_Type_Identifier VARCHAR(20),
    Units DECIMAL(18,2),
    Developer_Proceeds DECIMAL(18,2),
    Begin_Date DATE,
    End_Date DATE,
    Customer_Currency CHAR(3),
    Country_Code CHAR(2),
    Currency_of_Proceeds CHAR(3),
    Apple_Identifier DECIMAL(18,0),
    Customer_Price DECIMAL(18,2),
    Promo_Code VARCHAR(10),
    Parent_Identifier VARCHAR(100),
    Subscription VARCHAR(10),
    Period VARCHAR(30),
    Category VARCHAR(50),
    CMB VARCHAR(5),
    Device VARCHAR(10),
    Supported_Platforms VARCHAR(12),
    Proceeds_Reason VARCHAR(20),
    Preserved_Pricing VARCHAR(3),
    Client VARCHAR(30),
    Order_Type VARCHAR(30),
    Composite_Key VARCHAR(100) NOT NULL,
    PRIMARY KEY (Composite_Key)
);

"""
).collect()


You might have more than one threads sharing the Session object trying to update sql_simplifier_enabled. Updating this while other tasks are running can potentially cause unexpected behavior. Please update the session configuration before starting the threads.


[Row(status='Table SALES_RG successfully created.')]

In [36]:
session_sp.sql(
"""
CREATE OR REPLACE TABLE demo.hiring_prod.sales_rg (
    Provider VARCHAR(5),
    Provider_Country VARCHAR(2),
    SKU VARCHAR(100),
    Developer VARCHAR(4000),
    Title VARCHAR(600),
    Version VARCHAR(100),
    Product_Type_Identifier VARCHAR(20),
    Units DECIMAL(18,2),
    Developer_Proceeds DECIMAL(18,2),
    Begin_Date DATE,
    End_Date DATE,
    Customer_Currency CHAR(3),
    Country_Code CHAR(2),
    Currency_of_Proceeds CHAR(3),
    Apple_Identifier DECIMAL(18,0),
    Customer_Price DECIMAL(18,2),
    Promo_Code VARCHAR(10),
    Parent_Identifier VARCHAR(100),
    Subscription VARCHAR(10),
    Period VARCHAR(30),
    Category VARCHAR(50),
    CMB VARCHAR(5),
    Device VARCHAR(10),
    Supported_Platforms VARCHAR(12),
    Proceeds_Reason VARCHAR(20),
    Preserved_Pricing VARCHAR(3),
    Client VARCHAR(30),
    Order_Type VARCHAR(30),
    Composite_Key VARCHAR(100) NOT NULL,
    PRIMARY KEY (Composite_Key)
);

"""
).collect()

[Row(status='Table SALES_RG successfully created.')]

In [37]:
#Insert to temp table from pandas dataframe

df_sp = session_sp.write_pandas(

df = app_df,
table_name="sales_rg",
database = "demo",
schema = "hiring_temp",
quote_identifiers = False,
overwrite = True
)

In [38]:
#merge into prod

session_sp.sql(
"""

MERGE INTO demo.hiring_prod.sales_rg AS prod
USING demo.hiring_temp.sales_rg AS temp
ON prod.Composite_Key = temp.Composite_Key
WHEN NOT MATCHED THEN
    INSERT (
        Composite_Key, 
        Provider, 
        Provider_Country, 
        SKU, 
        Developer, 
        Title, 
        Version, 
        Product_Type_Identifier, 
        Units, 
        Developer_Proceeds, 
        Begin_Date, 
        End_Date, 
        Customer_Currency, 
        Country_Code, 
        Currency_of_Proceeds, 
        Apple_Identifier, 
        Customer_Price, 
        Promo_Code, 
        Parent_Identifier, 
        Subscription, 
        Period, 
        Category, 
        CMB, 
        Device, 
        Supported_Platforms, 
        Proceeds_Reason, 
        Preserved_Pricing, 
        Client, 
        Order_Type
    )
    VALUES (
        temp.Composite_Key, 
        temp.Provider, 
        temp.Provider_Country, 
        temp.SKU, 
        temp.Developer, 
        temp.Title, 
        temp.Version, 
        temp.Product_Type_Identifier, 
        temp.Units, 
        temp.Developer_Proceeds, 
        temp.Begin_Date, 
        temp.End_Date, 
        temp.Customer_Currency, 
        temp.Country_Code, 
        temp.Currency_of_Proceeds, 
        temp.Apple_Identifier, 
        temp.Customer_Price, 
        temp.Promo_Code, 
        temp.Parent_Identifier, 
        temp.Subscription, 
        temp.Period, 
        temp.Category, 
        temp.CMB, 
        temp.Device, 
        temp.Supported_Platforms, 
        temp.Proceeds_Reason, 
        temp.Preserved_Pricing, 
        temp.Client, 
        temp.Order_Type
    );

"""
).collect()

[Row(number of rows inserted=414)]