# Data Engineering -- Data Analysis and Data PreparationThis Notebooe will focus on Data Engineering in Snowflake using Snowpark for Python.

Load data from Snowflake tables into Snowpark DataFrames
Perform Exploratory Data Analysis on Snowpark DataFrames
Pivot and Join data from multiple tables using Snowpark DataFrames
Demostrate how to automate data preparation using Snowflake Tasks

# Import Libraries

In [1]:
# Snowpark for Python
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import month,year,col,sum
from snowflake.snowpark.version import VERSION
from snowflake.core import Root
from snowflake.core.task import Task, StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.core import CreateMode

# Misc
from datetime import timedelta
import json
import logging 
logger = logging.getLogger("snowflake.snowpark.session")
logger.setLevel(logging.ERROR)

# Establish Secure Connection to Snowflake
Using the Snowpark Python API, itâ€™s quick and easy to establish a secure connection between Snowflake and Notebook.

In [2]:
# Create Snowflake Session object
connection_parameters = json.load(open('E:\Projects\Snowflake Project 2\Config\connection.json'))
session = Session.builder.configs(connection_parameters).create()
session.sql_simplifier_enabled = True

snowflake_environment = session.sql('select current_user(), current_version()').collect()
snowpark_version = VERSION

# Current Environment Details
print('User                        : {}'.format(snowflake_environment[0][0]))
print('Role                        : {}'.format(session.get_current_role()))
print('Database                    : {}'.format(session.get_current_database()))
print('Schema                      : {}'.format(session.get_current_schema()))
print('Warehouse                   : {}'.format(session.get_current_warehouse()))
print('Snowflake version           : {}'.format(snowflake_environment[0][1]))
print('Snowpark for Python version : {}.{}.{}'.format(snowpark_version[0],snowpark_version[1],snowpark_version[2]))

User                        : SIDDHANT20
Role                        : "ACCOUNTADMIN"
Database                    : "DASH_DB"
Schema                      : "DASH_SCHEMA"
Warehouse                   : "DASH_L"
Snowflake version           : 8.22.0
Snowpark for Python version : 1.16.0


# Automation: Run Campaign Spend Data Transformations As a Snowflake Task

In [4]:
def campaign_spend_data_pipeline(session: Session) -> str:
  # DATA TRANSFORMATIONS
  # Perform the following actions to transform the data

  # Load the campaign spend data
  snow_df_spend_t = session.table('campaign_spend')

  # Transform the data so we can see total cost per year/month per channel using group_by() and agg() Snowpark DataFrame functions
  snow_df_spend_per_channel_t = snow_df_spend_t.group_by(year('DATE'), month('DATE'),'CHANNEL').agg(sum('TOTAL_COST').as_('TOTAL_COST')).\
      with_column_renamed('"YEAR(DATE)"',"YEAR").with_column_renamed('"MONTH(DATE)"',"MONTH").sort('YEAR','MONTH')

  # Transform the data so that each row will represent total cost across all channels per year/month using pivot() and sum() Snowpark DataFrame functions
  snow_df_spend_per_month_t = snow_df_spend_per_channel_t.pivot('CHANNEL',['search_engine','social_media','video','email']).sum('TOTAL_COST').sort('YEAR','MONTH')
  snow_df_spend_per_month_t = snow_df_spend_per_month_t.select(
      col("YEAR"),
      col("MONTH"),
      col("'search_engine'").as_("SEARCH_ENGINE"),
      col("'social_media'").as_("SOCIAL_MEDIA"),
      col("'video'").as_("VIDEO"),
      col("'email'").as_("EMAIL")
  )

  # Save transformed data
  snow_df_spend_per_month_t.write.mode('overwrite').save_as_table('SPEND_PER_MONTH')


# Automation: Run Monthly Revenue Data Transformations As a Snowflake Task 

In [15]:
def monthly_revenue_data_pipeline(session: Session) -> str:
  # Load revenue table and transform the data into revenue per year/month using group_by and agg() functions
  snow_df_spend_per_month_t = session.table('spend_per_month')
  snow_df_revenue_t = session.table('monthly_revenue')
  snow_df_revenue_per_month_t = snow_df_revenue_t.group_by('YEAR','MONTH').agg(sum('REVENUE')).sort('YEAR','MONTH').with_column_renamed('SUM(REVENUE)','REVENUE')

  # Join revenue data with the transformed campaign spend data so that our input features (i.e. cost per channel) and target variable (i.e. revenue) can be loaded into a single table for model training
  snow_df_spend_and_revenue_per_month_t = snow_df_spend_per_month_t.join(snow_df_revenue_per_month_t, ["YEAR","MONTH"])

  # SAVE in a new table for the next task
  snow_df_spend_and_revenue_per_month_t.write.mode('overwrite').save_as_table('SPEND_AND_REVENUE_PER_MONTH')

# Create DAG

In [14]:
root = Root(session)
tasks = root.databases[session.get_current_database()].schemas[session.get_current_schema()].tasks

with DAG("de_pipeline_dag", schedule=timedelta(minutes=3)) as dag:
    # Create a task that runs our first pipleine
    dag_spend_task = DAGTask(name='campaign_spend_data_pipeline_task'
                        , definition=StoredProcedureCall(
                                    campaign_spend_data_pipeline, stage_location='@dash_sprocs'
                                )
                        ,warehouse='DASH_L'
                        )
    # Create a task that runs our second pipleine
    dag_revenue_task = DAGTask(name='monthly_revenue_data_pipeline'
                          , definition=StoredProcedureCall(
                                monthly_revenue_data_pipeline, stage_location='@dash_sprocs'
                            )
                        ,warehouse='DASH_L'
                        )
# Shift right and left operators can specify task relationships.
dag_spend_task >> dag_revenue_task  # dag_spend_task is a predecessor of dag_revenue_task

schema = root.databases[session.get_current_database()].schemas[session.get_current_schema()]
dag_op = DAGOperation(schema)

dag_op.deploy(dag,mode=CreateMode.or_replace)

# Run DAG

In [16]:
dag_op.run(dag)

# Resume DAG

In [17]:
root_task = tasks["DE_PIPELINE_DAG"]
root_task.resume()

# Suspend Tasks

In [18]:
root_task = tasks["DE_PIPELINE_DAG"]
root_task.suspend()