# Install Snowpark

In [None]:
!pip install snowflake-snowpark-python



---



# Connect to Snowflake via SnowPark

In [6]:
import time


# --->  PYSPARK

# import pyspark.sql.functions as f
# from pyspark.sql import SparkSession
# from pyspark.sql.functions import udf,col
# from pyspark.sql.types import IntegerType
# spark = SparkSession.builder.appName("DataEngeering1").getOrCreate()

# <---  PYSPARK

import snowflake.snowpark.functions as f
from snowflake.snowpark import Session, DataFrame
from snowflake.snowpark.functions import udf, col
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import call_udf


# <----- Make these changes before running the notebook -------
# 1. Change Connection params to match your environment

# <----------------------------------------------------------------------------

Warehouse_Name = 'MY_ETL_WH'
DB_NAME = 'DEMO_SNOWPARK'

CONNECTION_PARAMETERS1 = {
    "host": "<YourAccount>.snowflakecomputing.com",
    'account': '<YourAccount>',
    'user': '<Your_UserID>',
    'password': '<Your_Password>',
    'role': 'SYSADMIN',
}



print("Connecting to Snowflake.....\n")
session = Session.builder.configs(CONNECTION_PARAMETERS).create()
print("Connected Successfully!...\n\n")



sql_cmd = "CREATE OR REPLACE WAREHOUSE {} WAREHOUSE_SIZE = 'X-Small' ".format(Warehouse_Name)
session.sql(sql_cmd).collect() 

sql_cmd = "CREATE OR REPLACE DATABASE {}".format(DB_NAME)
session.sql(sql_cmd).collect() 

sql_cmd = "USE SCHEMA {}.PUBLIC".format(DB_NAME)
session.sql(sql_cmd).collect() 

sql_cmd = "USE WAREHOUSE {}".format(Warehouse_Name)
session.sql(sql_cmd).collect() 


Connecting to Snowflake.....

Connected Successfully!...




[Row(status='Statement executed successfully.')]

## Start Data Engineering Process

In [7]:


# 1 - INCREASE COMPUTE TO 4 NODES
print("Resizing to from XS(1 Node) to MEDIUM(4 Nodes) ..\n")

sql_cmd = "ALTER WAREHOUSE {} SET WAREHOUSE_SIZE = 'LARGE' WAIT_FOR_COMPLETION = TRUE".format(Warehouse_Name)
session.sql(sql_cmd).collect()  

print("Completed!...\n\n")


# 2 - READ & JOIN 2 LARGE TABLES (600M & 1M rows)
print("Joining, Aggregating with 2 large tables(600M & 1M rows) & Writing results to new table(80M rows) ..\n")

dfLineItems = session.table("SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM")  # 600 Million Rows
dfSuppliers = session.table("SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.SUPPLIER")  # 1 Million Rows

print('Lineitems Table: %s rows' % dfLineItems.count())
print('Suppliers Table: %s rows' % dfSuppliers.count())

# 3 - JOIN TABLES
dfJoinTables = dfLineItems.join(dfSuppliers,
                                dfLineItems.col("L_SUPPKEY") == dfSuppliers.col("S_SUPPKEY"))  

# 4 - SUMMARIZE THE DATA BY SUPPLIER, PART, SUM, MIN & MAX
dfSummary = dfJoinTables.groupBy("S_NAME", "L_PARTKEY").agg([
    f.sum("L_QUANTITY").alias("TOTAL_QTY"),
    f.min("L_QUANTITY").alias("MIN_QTY"),
    f.max("L_QUANTITY").alias("MAX_QTY"),
])


Resizing to from XS(1 Node) to MEDIUM(4 Nodes) ..

Completed!...


Joining, Aggregating with 2 large tables(600M & 1M rows) & Writing results to new table(80M rows) ..

Lineitems Table: 600037902 rows
Suppliers Table: 1000000 rows


### **↑ Compute is NOT used** up to this point. (Lazy Execution Model) !!!

## 3. Storing the Results in Table or Showing results triggers the compute & previous steps.

In [8]:
start_time = time.time()
# 5 - WRITE THE RESULTS TO A NEW TABLE ( 80 Million Rows)
# <-- This is when all the previous operations are compiled & executed as a single job
dfSummary.write.mode("overwrite").saveAsTable("SALES_SUMMARY")
print("Completed!...\n\n")

# 6 - QUERY THE RESULTS (80 Million Rows)
print("Query the results..\n")
dfSales = session.table("SALES_SUMMARY")
dfSales.show()
end_time = time.time()

print("Completed!...\n\n")

# 7 - SCALE DOWN COMPUTE TO 1 NODE
print("Reducing the warehouse to XS..\n")
sql_cmd = "ALTER WAREHOUSE {} SET WAREHOUSE_SIZE = 'XSMALL'".format(Warehouse_Name)
session.sql(sql_cmd).collect()  

print("Completed!...\n")

print("--- %s seconds to Join, Summarize & Write Results to a new Table --- \n" % int(end_time - start_time))
print("--- %s Rows Written to SALES_SUMMARY table" % dfSales.count())

Completed!...


Query the results..

--------------------------------------------------------------------------
|"S_NAME"            |"L_PARTKEY"  |"TOTAL_QTY"  |"MIN_QTY"  |"MAX_QTY"  |
--------------------------------------------------------------------------
|Supplier#000754791  |5504785      |176.00       |6.00       |49.00      |
|Supplier#000276830  |2526823      |217.00       |11.00      |48.00      |
|Supplier#000313196  |9063186      |141.00       |9.00       |50.00      |
|Supplier#000098525  |17348473     |248.00       |6.00       |49.00      |
|Supplier#000548138  |6048125      |226.00       |7.00       |43.00      |
|Supplier#000602944  |13602943     |154.00       |1.00       |34.00      |
|Supplier#000654064  |2404061      |175.00       |4.00       |40.00      |
|Supplier#000708810  |11208787     |96.00        |8.00       |31.00      |
|Supplier#000716204  |9716203      |239.00       |7.00       |37.00      |
|Supplier#000267476  |17475        |129.00       |1.00       |3