# Snowpark RFM Project — Snowsight (Ready-to-run)

This notebook is prepared to run **inside Snowflake Worksheets (Python / Anaconda environment)**. It is self-contained and includes steps to create stages/file formats, load NDJSON/JSON files, run Snowpark DataFrame transformations, compute Top-10 products for October 2025, create RFM features, and persist results to Snowflake tables.

**IMPORTANT:** Upload your data files (`FCT_SALES_10000.ndjson` and `DIM_PRODUCT.json`) to the internal stage `@POS_DEMO.RAW.JSON_STAGE` (created in section 2.2) using the Snowsight stage upload UI before running the **COPY INTO** cell.

Generated: November 2025
Author: DAGPT

## 1) Prerequisites (Snowflake Worksheets - Python)

- Run this notebook in a **Python worksheet** within Snowsight (the Snowflake UI), which has Anaconda integration.
- No local `pip install` or credentials are needed; Snowflake manages the environment and credentials for you.
- Before running the ingestion cell, upload the files to the stage via **Data → Databases → POS_DEMO → RAW → Stages → JSON_STAGE → Upload** in Snowsight, or use SnowSQL to `PUT` the files to the stage from your local machine.

Files to upload:
- `FCT_SALES_10000.ndjson`
- `DIM_PRODUCT.json`


## 2.1) Snowpark Architecture — Client vs Server

**Client (Notebook / Snowpark API):**
- The client (your Snowsight Python worksheet) builds a logical plan using Snowpark DataFrame operations.
- It **does not** execute heavy computations locally; instead it creates a plan to send to Snowflake.

**Server (Snowflake Compute Engine):**
- Snowflake receives the logical plan and executes it inside its compute layer, pushing down SQL for efficient server-side processing.
- Advantages: reduced data transfer, secure execution, and scalable compute.

**Demo tips in this notebook:**
- Use `.explain()` on a DataFrame to view the SQL plan and confirm server-side pushdown.
- Use `.show()` to execute and preview results; avoid `.collect()` on large datasets.
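
The lazy behaviour described above can be illustrated with a toy model in plain Python. This is only a sketch of the idea (transformations record steps, actions run them). `LazyFrame` is a hypothetical class invented for illustration, not the Snowpark implementation, and the sample rows are made up.

```python
class LazyFrame:
    """Toy stand-in for a lazily evaluated DataFrame (NOT the Snowpark class)."""

    def __init__(self, source, steps=(), log=None):
        self._source = source
        self._steps = tuple(steps)
        # Shared list that records every time the plan is actually executed
        self._log = log if log is not None else []

    def filter(self, predicate):
        # Transformation: only extends the plan, touches no data
        return LazyFrame(self._source, self._steps + (("filter", predicate),), self._log)

    def select(self, *columns):
        # Transformation: only extends the plan, touches no data
        return LazyFrame(self._source, self._steps + (("select", columns),), self._log)

    def _run(self):
        self._log.append("executed")
        rows = list(self._source)
        for kind, arg in self._steps:
            if kind == "filter":
                rows = [r for r in rows if arg(r)]
            else:
                rows = [{c: r[c] for c in arg} for r in rows]
        return rows

    def count(self):
        # Action: triggers execution of the whole plan
        return len(self._run())

    def collect(self):
        # Action: triggers execution and returns all rows
        return self._run()


# Invented sample rows, for illustration only
sales = [
    {"SALE_ID": "S1", "TOTAL_AMOUNT": 120.0},
    {"SALE_ID": "S2", "TOTAL_AMOUNT": 0.0},
    {"SALE_ID": "S3", "TOTAL_AMOUNT": 45.5},
]

log = []
plan = LazyFrame(sales, log=log).filter(lambda r: r["TOTAL_AMOUNT"] > 0).select("SALE_ID")
runs_before_action = len(log)  # nothing has executed yet
positive_sales = plan.count()  # the plan runs here
```

In Snowpark the same split applies: `select` and `filter` extend the plan, while `show`, `count`, and `collect` send it to Snowflake for execution.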


In [None]:
# Get the Snowpark session inside a Snowsight notebook
# ----------------------------------------------------
# Newer Snowflake Notebook runtimes do not auto-create a `session` variable.
# Instead, get_active_session() returns the live, Snowflake-managed session.

try:
    from snowflake.snowpark.context import get_active_session

    # Fetch the active Snowflake-managed session
    session = get_active_session()

    print("Snowpark session acquired via get_active_session().")
    print("Connected to account:", session.get_current_account())

except Exception as e:
    print("ERROR: Could not obtain the Snowpark session using get_active_session().")
    print("Details:", e)
    # Chain the original exception so the root cause stays in the traceback
    raise RuntimeError(
        "Your notebook is not running under the Snowflake Python runtime.\n"
        "Check: Notebook Settings → Runtime → Snowflake Python Runtime"
    ) from e




## 2.2) Create database, schemas, and stage

This cell creates the `POS_DEMO` database, the `RAW` and `PROD` schemas, and the internal stage `@POS_DEMO.RAW.JSON_STAGE`. You only need to run it once.


In [None]:
# Create database and schemas
session.sql("CREATE DATABASE IF NOT EXISTS POS_DEMO").collect()
session.sql("CREATE SCHEMA IF NOT EXISTS POS_DEMO.RAW").collect()
session.sql("CREATE SCHEMA IF NOT EXISTS POS_DEMO.PROD").collect()

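# Create the internal stage under the RAW schema.
# IF NOT EXISTS (rather than OR REPLACE) keeps already-uploaded files if this cell is re-run.
session.sql("CREATE STAGE IF NOT EXISTS POS_DEMO.RAW.JSON_STAGE").collect()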

print("✅ Database, schemas, and stage ensured: POS_DEMO.RAW.JSON_STAGE")



## 3) Create file format and raw table

We create a JSON file format and a raw table with a single `VARIANT` column to hold the JSON rows. Upload the NDJSON file to `@POS_DEMO.RAW.JSON_STAGE` via the Snowsight UI before running the COPY command in the ingestion cell.
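
To make the difference between the two file layouts concrete, here is a plain-Python sketch of how NDJSON and a top-level JSON array turn into rows. It only mimics the effect of `STRIP_OUTER_ARRAY` and is not Snowflake's loader. The sample records are invented, and the real contents of `FCT_SALES_10000.ndjson` and `DIM_PRODUCT.json` are an assumption.

```python
import json

# Invented sample payloads, for illustration only
ndjson_text = '{"SALE_ID": "S1", "QUANTITY": 2}\n{"SALE_ID": "S2", "QUANTITY": 1}\n'
array_text = (
    '[{"PRODUCT_ID": "P1", "PRODUCT_NAME": "Widget"},'
    ' {"PRODUCT_ID": "P2", "PRODUCT_NAME": "Gadget"}]'
)


def rows_from_ndjson(text):
    """One JSON document per non-empty line, so each line becomes one row."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def rows_from_json_array(text, strip_outer_array):
    """Mimic the effect of STRIP_OUTER_ARRAY on a file holding one top-level array."""
    document = json.loads(text)
    if strip_outer_array and isinstance(document, list):
        return document  # each array element becomes its own row
    return [document]  # the whole array lands in a single row


sales_rows = rows_from_ndjson(ndjson_text)
product_rows = rows_from_json_array(array_text, strip_outer_array=True)
single_row = rows_from_json_array(array_text, strip_outer_array=False)
```

This is why the notebook uses `STRIP_OUTER_ARRAY = FALSE` for the line-delimited sales file and a second file format with `STRIP_OUTER_ARRAY = TRUE` for the product file, on the assumption that the latter is a single array.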


In [None]:
# Create JSON file format
session.sql("""
CREATE OR REPLACE FILE FORMAT POS_DEMO.RAW.FF_JSON_NDJSON
  TYPE = 'JSON'
  STRIP_OUTER_ARRAY = FALSE
""").collect()

# Create raw JSON table
session.sql("""
CREATE OR REPLACE TABLE POS_DEMO.RAW.RAW_JSON (raw VARIANT)
""").collect()

print("✅ File format 'POS_DEMO.RAW.FF_JSON_NDJSON' and table 'POS_DEMO.RAW.RAW_JSON' created successfully.")



## 4) Upload your files to the stage

Use the Snowsight UI: Data → Databases → POS_DEMO → RAW → Stages → JSON_STAGE → **Upload**. Upload both files: `FCT_SALES_10000.ndjson` and `DIM_PRODUCT.json`.

After uploading, run the next cell to copy both files into the raw table `POS_DEMO.RAW.RAW_JSON`.
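
Before running the COPY cell, it can help to confirm that both files actually reached the stage. The sketch below uses the `LIST` command through `session.sql`. The helper names `staged_file_names` and `missing_files` are hypothetical, and the substring check is an assumption chosen so that compressed uploads (for example a `.gz` suffix added by `PUT`) still count as present.

```python
def staged_file_names(session, stage="@POS_DEMO.RAW.JSON_STAGE"):
    """Return the file paths reported by LIST for the given stage."""
    rows = session.sql(f"LIST {stage}").collect()
    # The first column of LIST output is the staged file path
    return [row[0] for row in rows]


def missing_files(staged_names, expected=("FCT_SALES_10000.ndjson", "DIM_PRODUCT.json")):
    """Expected files for which no staged path contains the file name."""
    return [
        name
        for name in expected
        if not any(name in staged for staged in staged_names)
    ]
```

In the notebook this would be called as `missing_files(staged_file_names(session))`. An empty list means both files were found.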


In [None]:
# Copy FCT_SALES NDJSON lines into RAW_JSON
copy_sales = session.sql("""
COPY INTO POS_DEMO.RAW.RAW_JSON
FROM @POS_DEMO.RAW.JSON_STAGE
PATTERN = '.*FCT_SALES_10000[.]ndjson([.]gz)?'
FILE_FORMAT = (FORMAT_NAME = POS_DEMO.RAW.FF_JSON_NDJSON)
ON_ERROR = CONTINUE
""")
print("COPY INTO raw_json for sales started...")
print(copy_sales.collect()[:3])

# Create file format for JSON arrays (if DIM_PRODUCT.json is an array)
session.sql("""
CREATE OR REPLACE FILE FORMAT POS_DEMO.RAW.FF_JSON_ARRAY
  TYPE = 'JSON'
  STRIP_OUTER_ARRAY = TRUE
""").collect()

# Copy DIM_PRODUCT JSON array
copy_prod = session.sql("""
COPY INTO POS_DEMO.RAW.RAW_JSON
FROM @POS_DEMO.RAW.JSON_STAGE
PATTERN = '.*DIM_PRODUCT[.]json([.]gz)?'
FILE_FORMAT = (FORMAT_NAME = POS_DEMO.RAW.FF_JSON_ARRAY)
ON_ERROR = CONTINUE
""")
print("COPY INTO raw_json for products started...")
print(copy_prod.collect()[:3])



## 5) Normalize RAW JSON into structured tables

We extract fields from the `VARIANT` column `raw` and insert them into the structured tables `FCT_SALES` and `DIM_PRODUCT`.
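
Because sales and product records share one raw table, each `INSERT` needs a filter that separates the two record types. The plain-Python sketch below shows the routing idea on invented rows. Only the key names (`SALE_ID`, `PRODUCT_ID`, `PRODUCT_NAME`) come from the notebook; the values are made up, and `split_raw_rows` is a hypothetical helper.

```python
# Invented rows standing in for the contents of RAW_JSON
raw_rows = [
    {"SALE_ID": "S1", "CUSTOMER_ID": "C1", "PRODUCT_ID": "P1", "TOTAL_AMOUNT": 20.0},
    {"PRODUCT_ID": "P1", "PRODUCT_NAME": "Widget", "PRICE": 10.0},
    {"SALE_ID": "S2", "CUSTOMER_ID": "C2", "PRODUCT_ID": "P2", "TOTAL_AMOUNT": 5.0},
]


def split_raw_rows(rows):
    """Route rows by a key that only one record type carries."""
    sales = [r for r in rows if r.get("SALE_ID") is not None]
    products = [r for r in rows if r.get("PRODUCT_NAME") is not None]
    return sales, products


sales, products = split_raw_rows(raw_rows)

# Filtering products on PRODUCT_ID would not work: sales rows carry it too
rows_with_product_id = [r for r in raw_rows if r.get("PRODUCT_ID") is not None]
```

This mirrors the `WHERE raw:"SALE_ID" IS NOT NULL` and `WHERE raw:"PRODUCT_NAME" IS NOT NULL` predicates used in the cell that follows.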


In [None]:
# 5a) Create structured FCT_SALES and DIM_PRODUCT tables separately
session.sql("""
CREATE OR REPLACE TABLE POS_DEMO.RAW.FCT_SALES (
  SALE_ID STRING,
  CUSTOMER_ID STRING,
  PRODUCT_ID STRING,
  SALE_DATE DATE,
  QUANTITY NUMBER,
  UNIT_PRICE FLOAT,
  TOTAL_AMOUNT FLOAT,
  REGION STRING
)
""").collect()

session.sql("""
CREATE OR REPLACE TABLE POS_DEMO.RAW.DIM_PRODUCT (
  PRODUCT_ID STRING,
  PRODUCT_NAME STRING,
  CATEGORY STRING,
  BRAND STRING,
  PRICE FLOAT
)
""").collect()

print('✅ Structured tables POS_DEMO.RAW.FCT_SALES and POS_DEMO.RAW.DIM_PRODUCT created successfully.')


# 5b) Insert sales rows
insert_sales = session.sql("""
INSERT INTO POS_DEMO.RAW.FCT_SALES (SALE_ID, CUSTOMER_ID, PRODUCT_ID, SALE_DATE, QUANTITY, UNIT_PRICE, TOTAL_AMOUNT, REGION)
SELECT
  raw:"SALE_ID"::STRING,
  raw:"CUSTOMER_ID"::STRING,
  raw:"PRODUCT_ID"::STRING,
  TO_DATE(raw:"SALE_DATE"::STRING,'YYYY-MM-DD'),
  raw:"QUANTITY"::NUMBER,
  raw:"UNIT_PRICE"::FLOAT,
  raw:"TOTAL_AMOUNT"::FLOAT,
  raw:"REGION"::STRING
FROM POS_DEMO.RAW.RAW_JSON
WHERE raw:"SALE_ID" IS NOT NULL;
""")

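# session.sql() is lazy; the INSERT only runs when .collect() is called
insert_sales_result = insert_sales.collect()
print('✅ Inserted sales rows into FCT_SALES.')
print(insert_sales_result[:3])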


# 5c) Insert only product rows
insert_prod = session.sql("""
INSERT INTO POS_DEMO.RAW.DIM_PRODUCT (PRODUCT_ID, PRODUCT_NAME, CATEGORY, BRAND, PRICE)
SELECT
  raw:"PRODUCT_ID"::STRING,
  raw:"PRODUCT_NAME"::STRING,
  raw:"CATEGORY"::STRING,
  raw:"BRAND"::STRING,
  raw:"PRICE"::FLOAT
FROM POS_DEMO.RAW.RAW_JSON
WHERE raw:"PRODUCT_NAME" IS NOT NULL;   -- sales rows also carry PRODUCT_ID, so filter on PRODUCT_NAME
""")

# The INSERT runs on .collect(); report success only afterwards
insert_prod_result = insert_prod.collect()
print('✅ Inserted product rows into DIM_PRODUCT.')
print(insert_prod_result[:3])




## 6) Quick Validation

Check counts and a few sample rows to ensure data loaded correctly.


In [None]:
# Quick checks
print('Total sales rows:', session.sql('SELECT COUNT(*) FROM POS_DEMO.RAW.FCT_SALES').collect())
print('Distinct products:', session.sql('SELECT COUNT(DISTINCT PRODUCT_ID) FROM POS_DEMO.RAW.FCT_SALES').collect())
print('Sample sales rows:')
print(session.sql('SELECT * FROM POS_DEMO.RAW.FCT_SALES LIMIT 5').collect())


## 7) Snowpark DataFrame Examples — select, filter, join (Lazy vs Eager)

This section demonstrates building a lazy expression (no execution) and triggering execution with `.show()` and `.count()`.


In [None]:
from snowflake.snowpark.functions import col

# Load tables as DataFrames
sales_df = session.table('POS_DEMO.RAW.FCT_SALES')
prod_df = session.table('POS_DEMO.RAW.DIM_PRODUCT')

# SALE_DATE is already a DATE column, so no to_date() conversion is needed here

# Build lazy expression
expr = sales_df.select('SALE_ID','CUSTOMER_ID','PRODUCT_ID','SALE_DATE','QUANTITY','TOTAL_AMOUNT') \
               .filter(col('TOTAL_AMOUNT') > 0)

print('Built expression (lazy). Now trigger a small action:')
expr.show(5)
print('Count of expression (executes on server):', expr.count())


## 8) Aggregation — Top 10 Selling Products for October 2025

Compute total quantity and sales for each product during Oct 1–31, 2025 and join with DIM_PRODUCT to show names.
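
The aggregation can be checked by hand on a tiny dataset. The following plain-Python sketch applies the same steps (date filter, group by product, sum, sort by sales, limit) to invented rows. It is an illustration of the logic only, and `top_products` is a hypothetical helper, not part of Snowpark.

```python
from collections import defaultdict
from datetime import date

# Invented rows: (product_id, sale_date, quantity, total_amount)
sales = [
    ("P1", date(2025, 10, 3), 2, 40.0),
    ("P2", date(2025, 10, 15), 1, 75.0),
    ("P1", date(2025, 10, 31), 1, 20.0),
    ("P3", date(2025, 11, 1), 5, 500.0),  # after the window, ignored
    ("P2", date(2025, 9, 30), 3, 225.0),  # before the window, ignored
]


def top_products(rows, start, end, n=10):
    """Sum quantity and sales per product inside [start, end], highest sales first."""
    totals = defaultdict(lambda: [0, 0.0])
    for product_id, sale_date, quantity, amount in rows:
        if start <= sale_date <= end:  # both bounds inclusive, as in the notebook filter
            totals[product_id][0] += quantity
            totals[product_id][1] += amount
    ranked = sorted(totals.items(), key=lambda item: item[1][1], reverse=True)
    return [(pid, qty, amt) for pid, (qty, amt) in ranked[:n]]


top = top_products(sales, date(2025, 10, 1), date(2025, 10, 31))
```

Note that ranking is by total sales, not quantity: here `P2` ranks first with one unit sold because its sales amount is higher.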


In [None]:

from snowflake.snowpark.functions import sum as ssum, lit, col

last_month_start = '2025-10-01'
last_month_end = '2025-10-31'

top10_oct = (
    sales_df
    .filter((col('SALE_DATE') >= lit(last_month_start)) & (col('SALE_DATE') <= lit(last_month_end)))
    .group_by('PRODUCT_ID')
    .agg(
        ssum(col('QUANTITY')).alias('TOTAL_QTY'),
        ssum(col('TOTAL_AMOUNT')).alias('TOTAL_SALES')
    )
    .order_by(col('TOTAL_SALES').desc())
    .limit(10)
)

# Join with product names
top10_with_name = (
    top10_oct
    .join(prod_df, top10_oct['PRODUCT_ID'] == prod_df['PRODUCT_ID'], how='left')
    .select(top10_oct['PRODUCT_ID'], prod_df['PRODUCT_NAME'], col('TOTAL_QTY'), col('TOTAL_SALES'))
    .order_by(col('TOTAL_SALES').desc())
)

print('Top 10 selling products in Oct 2025:')
top10_with_name.show(10)   # Snowpark's show() has no `truncate` argument

# Optional: pretty full output
# print(top10_with_name.to_pandas())

# Persist top10
session.sql('CREATE SCHEMA IF NOT EXISTS POS_DEMO.PROD').collect()
top10_with_name.write.mode('overwrite').save_as_table('POS_DEMO.PROD.TOP10_PRODUCTS_OCT2025')
print('✅ Saved POS_DEMO.PROD.TOP10_PRODUCTS_OCT2025')


## 9) RFM Feature Engineering

Compute LAST_PURCHASE_DATE, RECENCY_DAYS (relative to analysis date), FREQUENCY, and MONETARY for each customer.
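
As a worked example of these definitions, the sketch below computes the four features in plain Python for a few invented sales rows, using the same analysis date as the notebook (2025-11-04). The `rfm` helper is hypothetical and only mirrors the logic; the Snowpark cell remains the actual implementation.

```python
from datetime import date

# Invented rows: (customer_id, sale_id, sale_date, total_amount)
sales = [
    ("C1", "S1", date(2025, 10, 1), 50.0),
    ("C1", "S2", date(2025, 10, 30), 25.0),
    ("C2", "S3", date(2025, 9, 1), 100.0),
]


def rfm(rows, analysis_date):
    """Per-customer last purchase date, recency in days, frequency, and monetary value."""
    features = {}
    for customer_id, _sale_id, sale_date, amount in rows:
        f = features.setdefault(
            customer_id,
            {"LAST_PURCHASE_DATE": sale_date, "FREQUENCY": 0, "MONETARY": 0.0},
        )
        f["LAST_PURCHASE_DATE"] = max(f["LAST_PURCHASE_DATE"], sale_date)
        f["FREQUENCY"] += 1  # one row per sale
        f["MONETARY"] += amount
    for f in features.values():
        # Whole days between the last purchase and the analysis date
        f["RECENCY_DAYS"] = (analysis_date - f["LAST_PURCHASE_DATE"]).days
    return features


result = rfm(sales, date(2025, 11, 4))
```

For these rows, `C1` bought last on 2025-10-30, which gives a recency of 5 days, a frequency of 2, and a monetary value of 75.0. `C2` has a recency of 64 days.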


In [None]:
from snowflake.snowpark.functions import col, max as smax, count as scount, sum as ssum, datediff, lit

analysis_date = '2025-11-04'

rfm_df = (
    sales_df
    .group_by('CUSTOMER_ID')
    .agg(
        smax(col('SALE_DATE')).alias('LAST_PURCHASE_DATE'),
        scount(col('SALE_ID')).alias('FREQUENCY'),
        ssum(col('TOTAL_AMOUNT')).alias('MONETARY')
    )
    .with_column('RECENCY_DAYS', datediff('day', col('LAST_PURCHASE_DATE'), lit(analysis_date)))
    .select('CUSTOMER_ID', 'LAST_PURCHASE_DATE', 'RECENCY_DAYS', 'FREQUENCY', 'MONETARY')
)

print('Preview RFM (top by monetary):')
rfm_df.order_by(col('MONETARY').desc()).show(10)

# Persist RFM results
rfm_df.write.mode('overwrite').save_as_table('POS_DEMO.PROD.RFM_CUSTOMER_FEATURES')
print('✅ Saved POS_DEMO.PROD.RFM_CUSTOMER_FEATURES')


## 10) Churn Label & Export for Modeling

We define a simple churn label: `CHURN_LABEL = 1 if RECENCY_DAYS > 30 else 0`.

**Note:** If you want to export to CSV for local model training, you can call `to_pandas()` on a small dataset. Avoid pulling large tables to the client in production.
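
The rule is strict at the boundary: a customer whose last purchase was exactly 30 days ago is not labeled as churned. A minimal plain-Python sketch of the rule, with `threshold_days` as a hypothetical parameter added for illustration:

```python
def churn_label(recency_days, threshold_days=30):
    """Return 1 when the last purchase is more than threshold_days old, else 0."""
    return 1 if recency_days > threshold_days else 0


# Label a few recency values, including the boundary cases 30 and 31
labels = {days: churn_label(days) for days in (5, 30, 31, 64)}
```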


In [None]:
# Add CHURN_LABEL server-side and show distribution
from snowflake.snowpark.functions import when

rfm_labeled = rfm_df.with_column('CHURN_LABEL', when(col('RECENCY_DAYS') > 30, 1).otherwise(0))
print('Churn label distribution:')
print(rfm_labeled.group_by('CHURN_LABEL').count().order_by('CHURN_LABEL').collect())

# Example: write labeled data to PROD for downstream ML
rfm_labeled.write.mode('overwrite').save_as_table('POS_DEMO.PROD.RFM_CUSTOMER_FEATURES_LABELED')
print('Saved POS_DEMO.PROD.RFM_CUSTOMER_FEATURES_LABELED')

# If you need CSV, unload the saved table to a stage with COPY INTO <location>
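
The unload step mentioned in the final comment of the cell above could look like the following sketch, which builds a `COPY INTO <location>` statement for a CSV export. The helper `build_unload_sql` and the target path `exports/rfm_labeled.csv` are hypothetical. Reusing `JSON_STAGE` as the export location is also an assumption, and a dedicated stage may be preferable.

```python
def build_unload_sql(table, stage_path, single_file=True):
    """Build a COPY INTO <location> statement that unloads a table as CSV."""
    return (
        f"COPY INTO {stage_path}\n"
        f"FROM {table}\n"
        "FILE_FORMAT = (TYPE = 'CSV' COMPRESSION = NONE FIELD_OPTIONALLY_ENCLOSED_BY = '\"')\n"
        "HEADER = TRUE\n"
        "OVERWRITE = TRUE\n"
        f"SINGLE = {'TRUE' if single_file else 'FALSE'}"
    )


# Hypothetical export target inside the existing stage
unload_sql = build_unload_sql(
    "POS_DEMO.PROD.RFM_CUSTOMER_FEATURES_LABELED",
    "@POS_DEMO.RAW.JSON_STAGE/exports/rfm_labeled.csv",
)
```

In the notebook the statement would be executed with `session.sql(unload_sql).collect()`, after which the file can be downloaded from the stage. `SINGLE = TRUE` suits small exports such as this feature table.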


## 11) Validation & Explain Plans

Run these quick checks and view the execution plan to confirm server-side pushdown.


In [None]:
# Validation checks
print('FCT_SALES count:', session.table('POS_DEMO.RAW.FCT_SALES').count())
print('Distinct products:', session.table('POS_DEMO.RAW.FCT_SALES').select('PRODUCT_ID').distinct().count())
print('RFM rows:', session.table('POS_DEMO.PROD.RFM_CUSTOMER_FEATURES').count())

# Show explain for top10 query
print('\nTop10 explain plan:')
top10_with_name.explain()  # explain() prints the plan itself and returns None


## 12) Next Steps & Cleanup

- Extend features: average order value (`MONETARY/FREQUENCY`), tenure, recency buckets, rolling features.
- For production, consider clustering on `CUSTOMER_ID` or `SALE_DATE` for performance.
- To clean up created objects (optional), run DROP statements for the tables/stage.
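
For the optional cleanup, one possible approach is to generate the `DROP` statements from a list of the objects this notebook creates. The helper names below are hypothetical. The object list is taken from the cells above and assumes nothing else was added to `POS_DEMO`.

```python
# Objects created by this notebook, dependents first
OBJECTS_TO_DROP = [
    ("TABLE", "POS_DEMO.PROD.RFM_CUSTOMER_FEATURES_LABELED"),
    ("TABLE", "POS_DEMO.PROD.RFM_CUSTOMER_FEATURES"),
    ("TABLE", "POS_DEMO.PROD.TOP10_PRODUCTS_OCT2025"),
    ("TABLE", "POS_DEMO.RAW.FCT_SALES"),
    ("TABLE", "POS_DEMO.RAW.DIM_PRODUCT"),
    ("TABLE", "POS_DEMO.RAW.RAW_JSON"),
    ("FILE FORMAT", "POS_DEMO.RAW.FF_JSON_NDJSON"),
    ("FILE FORMAT", "POS_DEMO.RAW.FF_JSON_ARRAY"),
    ("STAGE", "POS_DEMO.RAW.JSON_STAGE"),
]


def drop_statements(objects=OBJECTS_TO_DROP):
    """Build one idempotent DROP statement per object."""
    return [f"DROP {kind} IF EXISTS {name}" for kind, name in objects]


def run_cleanup(session, statements):
    """Execute each statement through the Snowpark session."""
    for statement in statements:
        session.sql(statement).collect()


statements = drop_statements()
```

Review the generated statements before calling `run_cleanup(session, statements)`, since dropping the stage also removes the uploaded files.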
