In [1]:
import json

from snowflake.snowpark import Session
from snowflake.ml.modeling.preprocessing import OneHotEncoder

from snowflake.core import Root
from snowflake.core.task import Task, StoredProcedureCall, Cron
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, CreateMode

In [None]:
# Create session connection
# Load the connection parameters inside a `with` block so the file handle is
# closed deterministically instead of being left for the garbage collector.
with open("connection.json") as connection_file:
    connection_parameters = json.load(connection_file)

# Build the Snowpark session from the parameters read above.
session = Session.builder.configs(connection_parameters).create()

# Make Feature Engineering Function

In [None]:
def feature_engineering(session: Session) -> str:
    """
    One-hot encode the categorical columns of 'data_model_one' and write one
    feature table per category.

    Steps:
        1. Read the 'data_model_one' table through the given session.
        2. Fit a OneHotEncoder on the categorical columns ('site', 'storage',
           'start_date', 'type', 'end_date') and transform the same data.
           The encoder is configured with drop="first",
           handle_unknown="ignore" and drop_input_cols=True.
        3. Build three dataframes from the encoded data; each one drops the
           '*_pct' columns belonging to the other two categories.
        4. Overwrite the tables 'category_1_feats', 'category_2_feats' and
           'category_3_feats' with those dataframes.

    Args:
        session (Session): An active Snowflake session object used to read
                           the source table and write the feature tables.

    Returns:
        str: The literal "success" once all three tables have been written.
    """

    # Source data produced by the modeling step
    source_df = session.table('data_model_one')

    # Columns to one-hot encode (the same names are reused as output columns)
    categorical_columns = ['site', 'storage', 'start_date', 'type', 'end_date']

    encoder = OneHotEncoder(
        input_cols=categorical_columns,
        output_cols=categorical_columns,
        drop_input_cols=True,
        drop="first",
        handle_unknown="ignore",
    )

    # Fit and transform on the same dataframe
    encoder.fit(source_df)
    encoded_df = encoder.transform(source_df)

    # Target table -> percentage columns that do not belong to that category
    unused_pct_columns_by_table = {
        "category_1_feats": ['category_2_pct', 'category_3_pct'],
        "category_2_feats": ['category_1_pct', 'category_3_pct'],
        "category_3_feats": ['category_1_pct', 'category_2_pct'],
    }

    # Derive every per-category dataframe first, then persist them in order
    frames_by_table = {
        table_name: encoded_df.drop(unused_pct_columns)
        for table_name, unused_pct_columns in unused_pct_columns_by_table.items()
    }
    for table_name, category_df in frames_by_table.items():
        category_df.write.mode("overwrite").save_as_table(table_name)

    return "success"

In [None]:
# Publish feature_engineering to Snowflake as a permanent stored procedure.
# replace=True overwrites any procedure already registered under this name,
# so the cell can be re-run; the code is uploaded to the @ML stage.
procedure = session.sproc.register(
    name="feature_engineering",
    func=feature_engineering,
    replace=True,
    is_permanent=True,
    stage_location="@ML",
    packages=["snowflake-snowpark-python", "snowflake-ml-python"],
)

# Create feature engineering task with SQL

In [None]:
# SQL to create the task.
# The task runs on ml_warehouse and is chained AFTER data_engineering_task,
# so it is triggered by that task rather than by its own schedule.
create_task_sql = """
CREATE OR REPLACE TASK feature_engineering_task
WAREHOUSE = ml_warehouse
AFTER data_engineering_task
COMMENT = 'Run feature engineering after data engineering'
AS
CALL feature_engineering();
"""

# SQL to enable the task (tasks are created in a suspended state)
enable_task_sql = "ALTER TASK feature_engineering_task RESUME"

# Create, then enable, the task.
# `session.connection` is a property (not a method), so it must not be called.
# Only the cursor is used as a context manager: wrapping the connection itself
# in `with` would close it on exit and invalidate the Snowpark session that
# the rest of the notebook depends on.
with session.connection.cursor() as cursor:
    cursor.execute(create_task_sql)
    cursor.execute(enable_task_sql)

# Create feature engineering task with Python

In [None]:
# Create the task object.
# The task calls the registered feature_engineering stored procedure, so it is
# named after feature engineering (the previous name, 'data_engineering_task_py',
# described a different pipeline step and could collide with that step's task).
feature_engineering_task_py = Task(
    definition=StoredProcedureCall(procedure),
    name='feature_engineering_task_py',
    warehouse='fishtalk_ml_warehouse',
    comment='Run of feature_engineering',
)

# Create the task in the data.ml schema.
# CreateMode.or_replace keeps this cell re-runnable, matching the
# `replace=True` / `CREATE OR REPLACE` behaviour used elsewhere in the notebook;
# the default mode raises if the task already exists.
root = Root(session)
tasks = root.databases["data"].schemas["ml"].tasks
tasks.create(feature_engineering_task_py, mode=CreateMode.or_replace)

# Create simple DAG to run feature engineering after data engineering

In [None]:
# Define the schema the DAG's tasks are created in
schema = root.databases["data"].schemas["ml"]

# Create DAG object; the cron expression fires daily at 00:00 UTC
dag = DAG(
    name="ml_dag",
    schedule=Cron('0 0 * * *', 'UTC'),
)

# Create DAG tasks.
# Task names are plain identifiers (no spaces): they become part of the
# Snowflake task name, and unquoted identifiers cannot contain spaces.
with dag:
    # StoredProcedureCall expects a Python callable or a registered
    # StoredProcedure object, not a procedure name, so the existing procedure
    # is invoked through a SQL definition instead.
    # NOTE(review): assumes a zero-argument `data_engineering` procedure is
    # already registered and resolvable from this schema - confirm.
    data_eng_task = DAGTask(
        name="data_engineering",
        definition="CALL data_engineering()",
        warehouse="ML_WH",
    )

    feat_eng_task = DAGTask(
        name="feature_engineering",
        definition=StoredProcedureCall(procedure),
        warehouse="ML_WH",
    )

    # Set task dependencies: feature engineering runs after data engineering
    data_eng_task >> feat_eng_task

# Deploy DAG, replacing any previously deployed version
dag_op = DAGOperation(schema)
dag_op.deploy(dag, mode=CreateMode.or_replace)

# Trigger one immediate run of the DAG instead of waiting for the schedule
dag_op.run(dag)
