In [None]:
from datetime import datetime, timedelta
import random

import pandas as pd
from snowflake.snowpark import Session
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.types import StructType, StructField, DateType, IntegerType, FloatType
from snowflake.core import Root, CreateMode
from snowflake.core.table import Table, TableColumn

In [None]:
# Reuse the session provided by the notebook runtime and point it at the
# database and schema that every later cell works in.
session = get_active_session()
session.use_database("my_db")
session.use_schema("my_schema")

# Entry point of the Snowflake Python API, bound to the same session.
root = Root(session)

In [None]:
# Definition of the widget sales fact table: one row per sale date holding the
# dollar and unit totals for widgets A, B, and C. Every column is NOT NULL.
#
# Dollar columns are number(12, 2): 12 digits in total, 2 of them after the
# decimal point. The previous number(2, 2) left no digits before the decimal
# point, so the largest storable amount was 0.99.
sales_table = Table(
    name="fct_widget_sales",
    comment="Fact sales table for widgets A, B, and C",
    columns=[
        TableColumn(
            name="sale_date",
            datatype="date",
            nullable=False
        ),
        TableColumn(
            name="widget_a_total_sales_dollars",
            datatype="number(12, 2)",
            nullable=False
        ),
        TableColumn(
            name="widget_a_total_sales_units",
            datatype="int",
            nullable=False
        ),
        TableColumn(
            name="widget_b_total_sales_dollars",
            datatype="number(12, 2)",
            nullable=False
        ),
        TableColumn(
            name="widget_b_total_sales_units",
            datatype="int",
            nullable=False
        ),
        TableColumn(
            name="widget_c_total_sales_dollars",
            datatype="number(12, 2)",
            nullable=False
        ),
        TableColumn(
            name="widget_c_total_sales_units",
            datatype="int",
            nullable=False
        ),
    ]
)

In [None]:
# Navigate to the target schema through the API root, then take its table
# collection.
target_schema = root.databases["my_db"].schemas["my_schema"]
tables = target_schema.tables

# Create the table from the sales_table definition, replacing any existing
# table of the same name.
tables.create(sales_table, mode=CreateMode.or_replace)

In [None]:
-- Remove all rows from the sales fact table before it is repopulated
TRUNCATE TABLE fct_widget_sales;

In [None]:
def _generate_widget_sales_rows(days, rng):
    """
    Builds one tuple of random widget sales per day, newest day first.

    Each tuple is (sale_date, a_dollars, a_units, b_dollars, b_units,
    c_dollars, c_units). `rng` must provide `uniform` and `randint`
    (the `random` module itself or a `random.Random` instance).
    """
    today = datetime.today().date()
    rows = []
    for offset in range(days):
        row = [today - timedelta(days=offset)]
        for _ in range(3):  # widgets A, B, C, in column order
            row.append(round(rng.uniform(0, 1000), 2))  # dollars
            row.append(rng.randint(0, 100))             # units
        rows.append(tuple(row))
    return rows


def populate_fct_widget_sales(
    session: Session,
    days: int = 30,
    table_name: str = "fct_widget_sales",
    seed=None,
):
    """
    Populates a widget sales table with random daily sales data.

    One row is generated per calendar day, starting with today and going
    backwards. Dollar amounts are drawn uniformly from [0, 1000] and rounded
    to 2 decimals; unit counts are integers from 0 to 100 inclusive.

    Parameters:
        session (Session): An active Snowflake session.
        days (int): Number of daily rows to generate. Defaults to 30.
        table_name (str): Table to write to. Defaults to "fct_widget_sales".
        seed: Optional seed for a private random generator, so the same
            data can be regenerated. When None, the module-level `random`
            functions are used, exactly as before.

    Raises:
        ValueError: If `days` is less than 1.
    """
    if days < 1:
        raise ValueError(f"days must be at least 1, got {days}")

    # A private generator keeps seeded runs reproducible without touching
    # the global random state; without a seed, fall back to the module.
    rng = random if seed is None else random.Random(seed)
    data = _generate_widget_sales_rows(days, rng)

    # Define schema
    schema = StructType([
        StructField("sale_date", DateType()),
        StructField("widget_a_total_sales_dollars", FloatType()),
        StructField("widget_a_total_sales_units", IntegerType()),
        StructField("widget_b_total_sales_dollars", FloatType()),
        StructField("widget_b_total_sales_units", IntegerType()),
        StructField("widget_c_total_sales_dollars", FloatType()),
        StructField("widget_c_total_sales_units", IntegerType()),
    ])

    # Create a Snowpark DataFrame
    df = session.create_dataframe(data, schema)

    # NOTE(review): per the Snowpark docs, "overwrite" replaces an existing
    # table instead of inserting into it, so the resulting table takes its
    # column types from `schema` above rather than from any earlier table
    # definition. Confirm that this is intended.
    df.write.mode("overwrite").save_as_table(table_name)

# Load the generated sales rows into fct_widget_sales using the active session
populate_fct_widget_sales(session=session)

In [None]:
-- Start clean: remove any previous stage named my_stage
DROP STAGE IF EXISTS my_stage;

-- Recreate my_stage with Snowflake server-side encryption (SNOWFLAKE_SSE)
-- and the DIRECTORY option enabled
CREATE STAGE my_stage
    ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE')
    DIRECTORY = (ENABLE = TRUE);