In [2]:
from dotenv import load_dotenv
import os
import logging
import snowflake.connector
from pyspark.sql import SparkSession
import pandas as pd
import time

# Load environment variables from the .env file whose path is given by the
# ENV_PATH environment variable.
load_dotenv(dotenv_path=os.getenv("ENV_PATH"))

# Set the level of the "snowflake" logger to WARNING. This targets only that
# named logger; the root logger is not touched here.
logging.getLogger("snowflake").setLevel(logging.WARNING)

def get_snowflake_connection(retries: int = 3, delay: float = 2):
    """Open a connection to Snowflake, retrying when an attempt fails.

    The user and password are read from the SNOWFLAKE_USER and
    SNOWFLAKE_PASSWORD environment variables; account, warehouse, database
    and schema are fixed values.

    Args:
        retries: Maximum number of connection attempts.
        delay: Seconds to wait between two consecutive attempts.

    Returns:
        The connection object returned by ``snowflake.connector.connect``.

    Raises:
        Exception: If every attempt fails. The error raised by the last
            attempt is attached as ``__cause__`` so it is not lost.
    """
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            return snowflake.connector.connect(
                user=os.getenv("SNOWFLAKE_USER"),
                password=os.getenv("SNOWFLAKE_PASSWORD"),
                account="YUB81215.us-west-2",
                warehouse="COMPUTE_WH",
                database="SNOWFLAKE_SAMPLE_DATA",
                schema="TPCH_SF1",
            )
        except Exception as ex:
            last_error = ex
            print(f"Retry {attempt}: Unable to connect to Snowflake. Error: {ex}")
            # Waiting only makes sense when another attempt will follow.
            if attempt < retries:
                time.sleep(delay)
    raise Exception(
        "Failed to connect to Snowflake after multiple retries."
    ) from last_error

# Bind the handles up front so the cleanup in `finally` only ever closes what
# this run opened (and never a stale object left over from an earlier run).
conn = None
cur = None

try:
    # Establish Snowflake connection
    conn = get_snowflake_connection()
    cur = conn.cursor()

    # Resume warehouse if suspended
    cur.execute("SHOW WAREHOUSES LIKE 'COMPUTE_WH'")
    warehouse_status = cur.fetchall()
    if not warehouse_status:
        # Fail with a clear message instead of an IndexError on the lookup below.
        raise RuntimeError(
            "Warehouse COMPUTE_WH was not returned by SHOW WAREHOUSES."
        )
    # The state is read from the second column of the first returned row.
    if warehouse_status[0][1] == "SUSPENDED":
        cur.execute("ALTER WAREHOUSE COMPUTE_WH RESUME")

    # Test connection
    cur.execute("SELECT CURRENT_USER(), CURRENT_VERSION()")
    print("Connection Successful:", cur.fetchall())

    # Fetch data from Snowflake; column names come from the cursor description
    cur.execute("SELECT * FROM CUSTOMER LIMIT 10")
    data = pd.DataFrame(cur.fetchall(), columns=[col[0] for col in cur.description])

    # Load data into Spark
    spark = SparkSession.builder.appName("Retail Analytics").getOrCreate()
    spark_df = spark.createDataFrame(data)
    spark_df.show()

    # Filter and process data using Spark: keep rows with C_ACCTBAL above 1000
    filtered_df = spark_df.filter(spark_df["C_ACCTBAL"] > 1000)
    filtered_df.show()

except Exception as ex:
    # Top-level boundary of the script: report the failure and fall through
    # to cleanup.
    print("An error occurred:", ex)
finally:
    # Release the cursor first, then the connection; the nested `finally`
    # guarantees the connection is closed even if closing the cursor fails.
    try:
        if cur is not None:
            cur.close()
    finally:
        if conn is not None:
            conn.close()

Connection Successful: [('SNOWFLAY', '9.0.0')]
+---------+------------------+--------------------+-----------+---------------+--------------------+------------+--------------------+
|C_CUSTKEY|            C_NAME|           C_ADDRESS|C_NATIONKEY|        C_PHONE|           C_ACCTBAL|C_MKTSEGMENT|           C_COMMENT|
+---------+------------------+--------------------+-----------+---------------+--------------------+------------+--------------------+
|    60001|Customer#000060001|          9Ii4zQn9cX|         14|24-678-784-9652|9957.560000000000...|   HOUSEHOLD|l theodolites boo...|
|    60002|Customer#000060002|    ThGBMjDwKzkoOxhz|         15|25-782-500-8435|742.4600000000000...|    BUILDING| beans. fluffily ...|
|    60003|Customer#000060003|Ed hbPtTXMTAsgGhC...|         16|26-859-847-7640|2526.920000000000...|    BUILDING|fully pending dep...|
|    60004|Customer#000060004|NivCT2RVaavl,yUnK...|         10|20-573-674-7999|7975.220000000000...|  AUTOMOBILE| furiously above ...|
|    600