# Data Pipeline Flow

## 1. Data Integration Layer
### 1.1 MySQL Source Integration
- Implementation of `usp_loadMySQLData` stored procedure
- Loading of product, customer, order, inventory, and order item data from a MySQL database
- Using Python DB-API connections (`pymysql`)

### 1.2 File-based Data Integration
- JSON data integration for product reviews
- XML data integration for customer preferences and product specifications
- Implementation of `usp_loadProductReviews_json`, `usp_loadCustomerPreferences_xml`, and `usp_loadProductSpecification_xml`

## 2. Data Transformation Layer
### 2.1 Bronze to Silver Transformation
- Implementation of `usp_loadBronzeToSilver` stored procedure
- Data cleaning and standardization
- Telemetry integration for monitoring

### 2.2 Data Validation
- Integration with Great Expectations
- Using the Artifact Repository to install packages from PyPI
- Data quality checks and validation

### 2.3 Silver to Gold Transformation
- Implementation of `usp_loadSilverToGold` stored procedure
- Business logic implementation
- Performance optimization


## 3. Analytics Layer

### 3.1 Order Analytics
- Implementation of `usp_loadOrderAnalytics` stored procedure
- Creation of analytical tables for the Streamlit dashboard
- Business metrics and KPIs

### 3.2 Customer Analytics
- Implementation of `usp_load_customer_analytics` stored procedure
- Creation of analytical tables for the Streamlit dashboard


## 4. Pipeline Orchestration
### 4.1 Task DAG Implementation
- Creation of `usp_createTaskDAG` stored procedure
- Task dependencies and scheduling
- Pipeline monitoring and history

## 5. Dashboard
- Implementation of `usp_saveChartsToStage` stored procedure
- Chart generation and storage
- Integration with Streamlit dashboard


## 6. Cleanup



In [None]:
# Import python packages
import streamlit as st
import pandas as pd

# We can also use Snowpark for our analyses!
from snowflake.snowpark.context import get_active_session
session = get_active_session()


> **Note:** Ensure you copy all the `.py` files from the Git repository to the `CODEFILES` stage created in the setup section.
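
If you are working from a local clone, one way to copy the files is Snowpark's `session.file.put`, run from a local Python session rather than from this notebook. The sketch below assumes a local Snowpark `Session` connected to the same database and schema; the helper `upload_code_files` and its `repo_dir` argument are hypothetical. Files are uploaded with `auto_compress=False` so they keep their plain `.py` names, which is what the `IMPORTS` clauses below reference.

```python
from pathlib import Path


def upload_code_files(session, repo_dir: str, stage: str = "@CODEFILES") -> list[str]:
    """Upload every top-level .py file in repo_dir to the given stage.

    `session` is assumed to be a snowflake.snowpark.Session created locally;
    only its `file.put` method is used here.
    """
    uploaded = []
    for path in sorted(Path(repo_dir).glob("*.py")):
        # auto_compress=False keeps the staged name as <file>.py (no .gz suffix),
        # matching references such as '@CODEFILES/db_connections.py'.
        session.file.put(str(path), stage, auto_compress=False, overwrite=True)
        uploaded.append(path.name)
    return uploaded
```

For example, `upload_code_files(session, "path/to/cloned/repo")` followed by `LIST @CODEFILES;` lets you confirm the files arrived. Adjust the glob if the repository keeps its `.py` files in a subdirectory.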

## 1. Data Integration Layer

### 1.1 MySQL Source Integration



In [None]:
CREATE OR REPLACE PROCEDURE usp_loadMySQLData(
   source varchar 
   ,source_table varchar
   ,dest_table_name varchar
)
RETURNS VARCHAR 
LANGUAGE PYTHON
RUNTIME_VERSION = '3.10' 
ARTIFACT_REPOSITORY = snowflake.snowpark.pypi_shared_repository
artifact_repository_packages=('snowflake-snowpark-python[pandas]','pymysql','snowflake-telemetry-python')
IMPORTS = ('@CODEFILES/data_integration_for_databases.py','@CODEFILES/db_connections.py')
-- PACKAGES = ('snowflake-telemetry-python') -- Include necessary packages
HANDLER = 'data_integration_for_databases.main'
EXTERNAL_ACCESS_INTEGRATIONS = (DBMS_ACCESS_INTEGRATION)
SECRETS = ('mysql_cred'=MYSQL_DB_SECRET,'mysql_hostname'=MYSQL_HOSTNAME)
;

To execute the stored procedure, run one of the following:

For Python: `session.call("usp_loadMySQLData", 'mysql', 'products', 'BRONZE_PRODUCTS')`

For SQL: `CALL usp_loadMySQLData('mysql', 'products', 'BRONZE_PRODUCTS')`

Note that these stored procedures will also be called from the task DAG created at the end of this notebook.
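
The "Using DB APIs" step works because `pymysql` implements the Python DB-API 2.0 (PEP 249), so extraction logic can be written against any DB-API connection. The sketch below is not the contents of `data_integration_for_databases.py`: the helper `fetch_table` is hypothetical, and `sqlite3` stands in for a MySQL connection so the example runs locally.

```python
import re
import sqlite3
from typing import Any

# Hypothetical helper. pymysql and sqlite3 both follow DB-API 2.0 (PEP 249),
# so the same cursor calls work with either driver.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def fetch_table(conn: Any, source_table: str) -> tuple[list[str], list[tuple]]:
    """Read every row of source_table and return (column_names, rows)."""
    # Table names cannot be bound as query parameters, so validate the
    # identifier before interpolating it into the SQL text.
    if not _IDENTIFIER.match(source_table):
        raise ValueError(f"invalid table name: {source_table!r}")
    cur = conn.cursor()
    try:
        cur.execute(f"SELECT * FROM {source_table}")
        # cursor.description holds one 7-item sequence per column; item 0 is the name
        columns = [desc[0] for desc in cur.description]
        rows = cur.fetchall()
    finally:
        cur.close()
    return columns, rows


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE inventory (product_id INTEGER, quantity INTEGER)")
    conn.executemany("INSERT INTO inventory VALUES (?, ?)", [(1, 10), (2, 0)])
    print(fetch_table(conn, "inventory"))  # (['product_id', 'quantity'], [(1, 10), (2, 0)])
```

Inside a stored procedure, the returned columns and rows could then be turned into a pandas DataFrame and written to the destination table with Snowpark's `session.write_pandas`; that step is omitted here.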

In [None]:
-- To test, run the following statement:
CALL usp_loadMySQLData('mysql','inventory','BRONZE_INVENTORY');


### 1.2 File-based Data Integration

Ensure you have copied `data_integration_for_files.py` from your cloned repository to the `CODEFILES` stage that you created in the setup step. Also ensure you have copied the JSON and XML files from the cloned repository to the `SOURCE_FILES` stage you created.
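
To illustrate what JSON integration involves, here is a stdlib-only sketch that parses a JSON array of review objects into flat rows with upper-case column names (Snowflake's default for unquoted identifiers). The functions `flatten_record` and `flatten_reviews` and the sample field names are assumptions; the actual structure of `product_reviews.json` and the logic in `load_product_reviews_to_bronze` may differ.

```python
import json
from typing import Any


def flatten_record(obj: dict[str, Any], prefix: str = "") -> dict[str, Any]:
    """Flatten nested objects into one level, joining keys with '_' and upper-casing them."""
    flat: dict[str, Any] = {}
    for key, value in obj.items():
        name = f"{prefix}_{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten_record(value, name))
        else:
            # Lists and scalars are kept as-is (a list would map to a VARIANT/ARRAY column)
            flat[name.upper()] = value
    return flat


def flatten_reviews(raw_json: str) -> list[dict[str, Any]]:
    """Parse a JSON array of review objects into flat rows."""
    data = json.loads(raw_json)
    if not isinstance(data, list):
        raise ValueError("expected a JSON array of reviews")
    return [flatten_record(item) for item in data]


if __name__ == "__main__":
    # Hypothetical sample record
    sample = '[{"review_id": 1, "helpful_votes": 3, "verified_purchase": false, "author": {"name": "A"}}]'
    print(flatten_reviews(sample))
```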


In [None]:
# Create a stored procedure to load product reviews from JSON to bronze layer
# This procedure uses the DataIntegration class to parse and load JSON data

# Import required modules for data integration and Snowpark functionality

from snowflake.snowpark.functions import sproc
from snowflake.snowpark import Session
from snowflake import telemetry
import logging


@sproc(session=session,
       name="usp_loadProductReviews_json", 
       replace=True, 
       is_permanent=True, 
       stage_location='@CODEFILES',
       imports=["@CODEFILES/data_integration_for_files.py"],
       packages=["snowflake-snowpark-python", "pandas",'python-dotenv',"snowflake-telemetry-python"])
def loadProductReviews(session: Session) -> None:
    from data_integration_for_files import DataIntegration
    # Initialize data integration handler
    data_ingestion = DataIntegration(session)
    
    # Log the start of data loading process
    logging.info("Loading product reviews data")
    
    # Call the JSON loading function to parse and load data to bronze layer
    # The function handles JSON parsing, data transformation and loading to BRONZE_PRODUCT_REVIEWS table
    telemetry.set_span_attribute("usp_loadProductReviews_json", "begin")
    telemetry.add_event("event_with_attributes", {"source_data": "Product Reviews JSON"})
    data_ingestion.load_product_reviews_to_bronze('@SOURCE_FILES/product_reviews.json')
    
    # Log successful completion of data loading
    logging.info("Product reviews data loaded")



In [None]:
call usp_loadProductReviews_json();

In [None]:

# Create a stored procedure to load customer preferences from XML into the bronze layer.
# This procedure uses the DataIntegration class to parse and load the XML data.
from snowflake.snowpark.functions import sproc
from snowflake.snowpark import Session
import logging

@sproc(name="usp_loadCustomerPreferences_xml", replace=True, is_permanent=True, stage_location='@CODEFILES',
       imports=["@CODEFILES/data_integration_for_files.py"],
       packages=["snowflake-snowpark-python==1.31.1", "pandas","snowflake-telemetry-python"])
def loadCustomerPreferences(session: Session) -> None:
    from data_integration_for_files import DataIntegration
    from snowflake import telemetry

    # Private Snowpark flag: disable scoped temporary objects for this session
    session._use_scoped_temp_objects = False

    # Initialize the data integration handler
    data_ingestion = DataIntegration(session)

    # Parse the XML, transform it, and load it into the BRONZE_CUSTOMER_PREFERENCES table
    telemetry.set_span_attribute("usp_loadCustomerPreferences_xml", "begin")
    telemetry.add_event("event_with_attributes", {"source_data": "XML Files"})
    data_ingestion.load_cust_pref_xml_to_bronze('@SOURCE_FILES/customer_preferences.xml','BRONZE_CUSTOMER_PREFERENCES')

    # Log successful completion of data loading
    logging.info("Customer preferences data loaded")

In [None]:
call usp_loadCustomerPreferences_xml()
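
For the XML sources, a stdlib sketch using `xml.etree.ElementTree` shows the general idea of turning repeated XML elements into rows. The function `parse_preferences`, the element names (`customer`, `preference`), and the `id` attribute are hypothetical; the real layout of `customer_preferences.xml` and the logic in `load_cust_pref_xml_to_bronze` are not shown in this notebook.

```python
import xml.etree.ElementTree as ET


def parse_preferences(xml_text: str) -> list[dict[str, str]]:
    """Emit one row per <preference> element, tagged with its parent customer's id."""
    root = ET.fromstring(xml_text)
    rows = []
    for customer in root.iter("customer"):
        customer_id = customer.get("id")
        for pref in customer.findall("preference"):
            rows.append({
                "CUSTOMER_ID": customer_id,
                # Strip surrounding whitespace; empty elements become ""
                "PREFERENCE": (pref.text or "").strip(),
            })
    return rows


if __name__ == "__main__":
    # Hypothetical sample document
    doc = ('<customers><customer id="C1"><preference>electronics</preference>'
           '<preference> books </preference></customer><customer id="C2"/></customers>')
    print(parse_preferences(doc))
```

`ET.fromstring` loads the whole document into memory; for large files, `ET.iterparse` is the streaming alternative.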

In [None]:
# Create a stored procedure to load product specifications from XML into the bronze layer.
# This procedure uses the DataIntegration class to parse and load the XML data.
from snowflake.snowpark.functions import sproc
from snowflake.snowpark import Session
import logging

@sproc(name="usp_loadProductSpecification_xml", replace=True, is_permanent=True, stage_location='@CODEFILES',
       imports=["@CODEFILES/data_integration_for_files.py"],
       packages=["snowflake-snowpark-python==1.31.1", "pandas","snowflake-telemetry-python"])
def loadProductSpecification(session: Session) -> None:
    from data_integration_for_files import DataIntegration
    from snowflake import telemetry

    # Private Snowpark flag: disable scoped temporary objects for this session
    session._use_scoped_temp_objects = False

    # Initialize the data integration handler
    data_ingestion = DataIntegration(session)

    # Parse the XML, transform it, and load it into the BRONZE_PRODUCT_SPECIFICATIONS table
    telemetry.set_span_attribute("usp_loadProductSpecification_xml", "begin")
    telemetry.add_event("event_with_attributes", {"source_data": "Product Spec XML Files"})
    data_ingestion.load_product_specs_xml_to_bronze('@SOURCE_FILES/product_specifications.xml','BRONZE_PRODUCT_SPECIFICATIONS')

    # Log successful completion of data loading
    logging.info("Product Spec data loaded")

In [None]:
call usp_loadProductSpecification_xml()

In [None]:
select * from BRONZE_PRODUCT_SPECIFICATIONS limit 10;

## 2. Data Transformation Layer

### 2.1 Bronze to Silver Transformation

This stored procedure cleans and standardizes the data and loads it from the Bronze layer into the Silver layer.
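
Bronze-to-Silver cleaning commonly means trimming text, turning empty strings into nulls, and removing duplicate keys. The pure-Python sketch below illustrates those three operations on a list of rows; it is a generic illustration, not the logic in `transformations.py`, and the function `clean_rows` and its `key` parameter are hypothetical.

```python
from typing import Any, Iterable


def clean_rows(rows: Iterable[dict[str, Any]], key: str) -> list[dict[str, Any]]:
    """Trim strings, turn blank strings into None, and keep the last row seen per key."""
    latest: dict[Any, dict[str, Any]] = {}
    for row in rows:
        cleaned = {}
        for col, value in row.items():
            if isinstance(value, str):
                value = value.strip() or None  # "" after trimming becomes None
            cleaned[col.upper()] = value       # standardize column names to upper case
        k = cleaned.get(key.upper())
        if k is None:
            continue  # drop rows without a usable key
        latest[k] = cleaned  # a later duplicate replaces the earlier one
    # Dicts keep first-insertion order, so keys appear in first-seen order
    return list(latest.values())
```

Keeping the last occurrence assumes later rows are newer; a real pipeline would usually deduplicate on an explicit timestamp column instead.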

In [None]:
# Import required modules for data transformation and telemetry
from snowflake.snowpark.functions import sproc
from snowflake.snowpark import Session

# Create a stored procedure to transform data from Bronze to Silver layer
# This follows the Medallion architecture pattern for data transformation
@sproc(name="usp_loadBronzeToSilver", replace=True, is_permanent=True, stage_location='@CODEFILES',
       imports=["@CODEFILES/transformations.py"],
       packages=["snowflake-snowpark-python","snowflake-ml-python","snowflake-telemetry-python"])
def transform_bronze_to_silver(session: Session) -> None:
    from transformations import DataTransformations
    from snowflake import telemetry
    # Add telemetry for monitoring the transformation process
    telemetry.set_span_attribute("usp_loadBronzeToSilver", "begin")
    telemetry.add_event("event_with_attributes", {"source_data": "multiple tables", "transformation": "bronze to silver"})
    data_transformer = DataTransformations(session,'snowpark')
    data_transformer.bronze_to_silver_transformation()


Do not run this stored procedure manually unless all the MySQL tables have been loaded into Snowflake; otherwise it fails, because it reads from every table loaded from MySQL. Once they are loaded, you can run it with `CALL usp_loadBronzeToSilver();`.
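
Because the procedure fails when any MySQL-sourced Bronze table is missing, a pre-flight check can make that failure explicit. The helper `missing_tables` and the table list are hypothetical (only `BRONZE_PRODUCTS` and `BRONZE_INVENTORY` appear in this notebook); in Snowflake, the existing names could be taken from the `name` column of `SHOW TABLES LIKE 'BRONZE_%'`.

```python
from typing import Iterable

# Hypothetical list: adjust to the Bronze tables your usp_loadMySQLData calls create.
REQUIRED_BRONZE_TABLES = [
    "BRONZE_PRODUCTS",
    "BRONZE_INVENTORY",
    "BRONZE_CUSTOMERS",
    "BRONZE_ORDERS",
    "BRONZE_ORDER_ITEMS",
]


def missing_tables(required: Iterable[str], existing: Iterable[str]) -> list[str]:
    """Return the required table names not present in existing (case-insensitive)."""
    present = {name.upper() for name in existing}
    return [name for name in required if name.upper() not in present]
```

If the returned list is non-empty, raise an error listing those tables instead of calling `usp_loadBronzeToSilver`.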

### 2.2 Data Validation

We use the Great Expectations (GE) library to perform basic data validation. The Artifact Repository feature installs the GE package directly from PyPI, so no manual installation step is needed.
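
The two expectations configured in the procedure below can be misread, so here is a plain-Python sketch of what they check, under our reading of GE's semantics: `expect_column_values_to_be_in_set` passes only if every non-null value is in the allowed set, and `expect_column_min_to_be_between` passes if the column minimum lies within the inclusive range. The helpers `values_in_set` and `min_between` are illustrations, not part of the Great Expectations API.

```python
from typing import Any, Iterable, Sequence


def values_in_set(values: Iterable[Any], allowed: Sequence[Any]) -> bool:
    """Mirror of expect_column_values_to_be_in_set: null values are ignored."""
    allowed_set = set(allowed)
    return all(v in allowed_set for v in values if v is not None)


def min_between(values: Iterable[Any], low: float, high: float) -> bool:
    """Mirror of expect_column_min_to_be_between with inclusive bounds."""
    non_null = [v for v in values if v is not None]
    if not non_null:
        return False  # sketch's choice for an all-null column
    return low <= min(non_null) <= high
```

With the procedure's settings, the `HELPFUL_VOTES` check fails whenever the smallest value is below 12 or above 20, and the `VERIFIED_PURCHASE` check fails as soon as any non-null value other than `"FALSE"` appears.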

In [None]:
CREATE OR REPLACE   PROCEDURE usp_generateValidationResults()
  RETURNS VARCHAR
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.9'
  ARTIFACT_REPOSITORY = snowflake.snowpark.pypi_shared_repository
  PACKAGES = ('snowflake-snowpark-python')
  ARTIFACT_REPOSITORY_PACKAGES = ('great-expectations==0.15.14','pandas==2.1.4')
  HANDLER = 'generateValidationResults'
  AS
$$
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import DataContextConfig, DatasourceConfig, FilesystemStoreBackendDefaults
from snowflake.snowpark import Session
from snowflake.snowpark.types import BooleanType, StructField, StructType, VariantType

def generateValidationResults(session: Session) -> str:
    
    import json
    
    data_context_config = DataContextConfig(
    datasources={
        "dataframe_datasource": DatasourceConfig(
            class_name="PandasDatasource",
            batch_kwargs_generators={
                "subdir_reader": {
                    "class_name": "SubdirReaderBatchKwargsGenerator",
                    "base_directory": "/tmp/great_expectation/",
                }
            },
        )
    },
    store_backend_defaults=FilesystemStoreBackendDefaults(root_directory="/tmp/great_expectation"),
    )
    
    # Creating the GE context here
    context = BaseDataContext(project_config=data_context_config)
    
    # Datasource details: the data comes from a pandas DF, which is supplied later through the batch request
    
    datasource_config = {
    "name": "pandas_dataframe_datasource",
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "module_name": "great_expectations.execution_engine",
        "class_name": "PandasExecutionEngine",
    },
    "data_connectors": {
        "default_runtime_data_connector_name": {
            "class_name": "RuntimeDataConnector",
            "module_name": "great_expectations.datasource.data_connector",
            "batch_identifiers": ["default_identifier_name"],
        },
    },
    }

    # Adding the DS to the context
    context.add_datasource(**datasource_config)
    
    # Converting the Snowpark DF into Pandas DF.
    df=session.sql("select top 2000 * from SILVER_PRODUCT_REVIEWS").to_pandas()
    
    # Create the runtime batch request that passes the pandas DataFrame to GE
    batch_request = RuntimeBatchRequest(
                                datasource_name="pandas_dataframe_datasource",
                                data_connector_name="default_runtime_data_connector_name",
                                data_asset_name="PandasData",  # This can be anything that identifies this data_asset for you
                                runtime_parameters={"batch_data": df},  # df is your dataframe, you have created above.
                                batch_identifiers={"default_identifier_name": "default_identifier"},
                                )
    
    # Create the expectation suite
    context.create_expectation_suite(
    expectation_suite_name="pandas_expectation_suite", overwrite_existing=True)
    
    # Creating the validator which takes the batch request and expectation suite name
    validator = context.get_validator(
        batch_request=batch_request, expectation_suite_name="pandas_expectation_suite"
    )
    
    #Creating the required expectation. You can also create custom expectations as well. You can add additional inbuilt expectations as per the requirement
    validator.expect_column_values_to_be_in_set("VERIFIED_PURCHASE",["FALSE"])
    validator.expect_column_min_to_be_between("HELPFUL_VOTES",12,20)
    
    # Save the expectation suite
    validator.save_expectation_suite(discard_failed_expectations=False)

    # Create the checkpoint without writing it to the file system, then run it with the runtime batch request

    my_checkpoint_name = "pandas_checkpoint"
    checkpoint_config = {
                "name": my_checkpoint_name,
                "config_version": 1.0,
                "class_name": "SimpleCheckpoint",
                "run_name_template": "%Y%m%d-%H%M%S-my-pandas_run-name-template",
            }
            
    context.add_checkpoint(**checkpoint_config)

    # run expectation_suite against Pandas dataframe
    res = context.run_checkpoint(
            checkpoint_name = my_checkpoint_name,
            validations=[
                {
                    "batch_request": batch_request,
                    "expectation_suite_name": "pandas_expectation_suite",
                }
            ],
        )
    
        
    # Define the schema, create the Snowpark DF, and append the validation results to a table.
    schema = StructType([StructField("RunStatus", BooleanType()),StructField("RunId", VariantType()), StructField("RunValidation", VariantType())])

    df=session.create_dataframe([[res.success, json.loads(str(res.run_id)),json.loads(str(res.list_validation_results()))]], schema)

    df.write.mode('append').saveAsTable('GreatExpeactionValidationsResults')
    return 'SUCCESS'
$$;

### 2.3 Silver to Gold Transformation

This stored procedure loads the aggregated results into the Gold layer.
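
As a minimal illustration of a Silver-to-Gold aggregation, the sketch below groups rows by one column and sums another, roughly `SELECT g, SUM(x), COUNT(x) ... GROUP BY g`. The function `aggregate_by` and the column names in the test are hypothetical; `silver_to_gold_transformation` in `transformations.py` may aggregate different measures.

```python
from collections import defaultdict
from typing import Any, Iterable


def aggregate_by(rows: Iterable[dict[str, Any]], group_col: str, sum_col: str) -> list[dict[str, Any]]:
    """GROUP BY group_col with SUM(sum_col) and COUNT(sum_col); NULL measures are skipped."""
    totals: dict[Any, float] = defaultdict(float)
    counts: dict[Any, int] = defaultdict(int)
    for row in rows:
        value = row.get(sum_col)
        if value is None:
            continue  # like SQL SUM/COUNT(col), ignore NULL measures
        totals[row[group_col]] += value
        counts[row[group_col]] += 1
    # One output row per group, sorted by group value for a stable result
    return [
        {group_col: group, f"TOTAL_{sum_col}": totals[group], "VALUE_COUNT": counts[group]}
        for group in sorted(totals)
    ]
```

One difference from SQL: a group whose measures are all NULL is dropped here, whereas `GROUP BY` would return it with a NULL sum.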

In [None]:

from snowflake import telemetry
from snowflake.snowpark.functions import sproc
from snowflake.snowpark import Session
# session.add_import("transformations.py")
@sproc(name="usp_loadSilverToGold", replace=True, is_permanent=True, stage_location='@CODEFILES',imports=["@CODEFILES/transformations.py"]
       ,packages=["snowflake-snowpark-python","snowflake-ml-python","snowflake-telemetry-python"])
def transform_silver_to_gold(session: Session) -> None:
    from transformations import DataTransformations
    telemetry.set_span_attribute("usp_loadSilverToGold", "begin")
    telemetry.add_event("event_with_attributes", {"source_data": "Silver tables", "transformation": "silver to gold"})
    data_transformer = DataTransformations(session,'snowpark')
    data_transformer.silver_to_gold_transformation()

## 3. Analytics Layer

### 3.1 Order Analytics
- Implementation of `usp_loadOrderAnalytics` stored procedure
- Creation of order analytical tables for the Streamlit dashboard
- Business metrics and KPIs
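
Typical order KPIs such as total revenue, order count, and average order value can be computed from order-line rows as sketched below. The function `order_kpis`, the metric names, and the `ORDER_ID`/`LINE_AMOUNT` columns are assumptions; `order_analytics.py` may compute a different set of metrics.

```python
from typing import Any, Iterable


def order_kpis(order_lines: Iterable[dict[str, Any]]) -> dict[str, float]:
    """Compute total revenue, distinct order count, and average order value."""
    revenue_by_order: dict[Any, float] = {}
    for row in order_lines:
        # Sum line amounts per order so an order with several lines counts once
        oid = row["ORDER_ID"]
        revenue_by_order[oid] = revenue_by_order.get(oid, 0.0) + float(row["LINE_AMOUNT"])
    total = sum(revenue_by_order.values())
    count = len(revenue_by_order)
    return {
        "TOTAL_REVENUE": total,
        "ORDER_COUNT": count,
        # Guard against division by zero when there are no orders
        "AVG_ORDER_VALUE": total / count if count else 0.0,
    }
```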

In [None]:
from snowflake import telemetry
from snowflake.snowpark.functions import sproc
from snowflake.snowpark import Session
# session.add_import("order_analytics.py")
@sproc(name="usp_loadOrderAnalytics", replace=True, is_permanent=True, stage_location='@CODEFILES',imports=["@CODEFILES/order_analytics.py"]
       ,packages=["snowflake-snowpark-python","pandas","snowflake-ml-python","snowflake-telemetry-python"])
def load_order_analytics(session: Session) -> None:
    from order_analytics import OrderAnalytics,main
    telemetry.set_span_attribute("usp_loadOrderAnalytics", "begin")
    telemetry.add_event("event_with_attributes", {"source_data": "Gold tables", "transformation": "order analytics"})
    main(session)

### 3.2 Customer Analytics
- Implementation of `usp_load_customer_analytics` stored procedure
- Creation of analytical tables for the Streamlit dashboard

In [None]:
CREATE OR REPLACE PROCEDURE usp_load_customer_analytics()
RETURNS VARCHAR 
LANGUAGE PYTHON
RUNTIME_VERSION = '3.10' 
ARTIFACT_REPOSITORY = snowflake.snowpark.pypi_shared_repository
artifact_repository_packages=('snowflake-snowpark-python[modin]','pymysql','snowflake-telemetry-python')
-- PACKAGES = ('snowflake-telemetry-python','snowflake-snowpark-python','modin','pymysql','snowflake-telemetry-python') -- Include necessary packages
-- imports=["@CODEFILES/customer_analytics.py","@CODEFILES/data_integration_for_analytics.py"],
IMPORTS = ('@CODEFILES/customer_analytics.py','@CODEFILES/data_integration_for_analytics.py','@CODEFILES/db_connections.py')
HANDLER = 'customer_analytics.run'
EXTERNAL_ACCESS_INTEGRATIONS = (DBMS_ACCESS_INTEGRATION)
SECRETS = ('mysql_cred'=MYSQL_DB_SECRET,'mysql_hostname'=MYSQL_HOSTNAME)
;


## 4. Pipeline Orchestration


### 4.1 Task DAG Implementation
- Creation of `usp_createTaskDAG` stored procedure
- Task dependencies and scheduling
- Pipeline monitoring and history
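
Task dependencies in Snowflake are declared with the `AFTER` clause of `CREATE TASK`, and a child can only reference predecessors that already exist, so tasks must be created in dependency order. The sketch below uses Python's `graphlib.TopologicalSorter` (Python 3.9+) to derive that order and generate the DDL. The child task names, the dependency layout, the cron schedule, and the no-op root body are assumptions for illustration; the real graph is defined in `create_task_DAG.py`. Only `retail_root_task`, `retail_wh`, and the procedure names come from this notebook.

```python
from graphlib import TopologicalSorter

ROOT = "retail_root_task"

# Hypothetical graph: task name -> (task body, predecessor task names)
TASKS = {
    ROOT: ("SELECT 1", []),
    "t_load_reviews": ("CALL usp_loadProductReviews_json()", [ROOT]),
    "t_load_cust_prefs": ("CALL usp_loadCustomerPreferences_xml()", [ROOT]),
    "t_bronze_to_silver": ("CALL usp_loadBronzeToSilver()", ["t_load_reviews", "t_load_cust_prefs"]),
    "t_validate": ("CALL usp_generateValidationResults()", ["t_bronze_to_silver"]),
    "t_silver_to_gold": ("CALL usp_loadSilverToGold()", ["t_bronze_to_silver"]),
    "t_order_analytics": ("CALL usp_loadOrderAnalytics()", ["t_silver_to_gold"]),
    "t_customer_analytics": ("CALL usp_load_customer_analytics()", ["t_silver_to_gold"]),
}


def task_ddl(tasks: dict, warehouse: str = "retail_wh",
             schedule: str = "USING CRON 0 2 * * * UTC") -> list[str]:
    """Return CREATE TASK statements with every predecessor created before its children."""
    graph = {name: deps for name, (_, deps) in tasks.items()}
    statements = []
    # static_order() yields each node only after all of its predecessors
    # and raises graphlib.CycleError if the graph has a cycle.
    for name in TopologicalSorter(graph).static_order():
        body, deps = tasks[name]
        # Only the root task carries a schedule; children run AFTER their predecessors.
        clause = f"AFTER {', '.join(deps)}" if deps else f"SCHEDULE = '{schedule}'"
        statements.append(f"CREATE OR REPLACE TASK {name} WAREHOUSE = {warehouse} {clause} AS {body}")
    return statements
```

New tasks are created suspended. As we understand Snowflake's rules, the root task must be suspended while the graph is created or modified, and `SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('retail_root_task');` then resumes the root and all of its dependents.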



In [None]:
from snowflake import telemetry
from snowflake.snowpark.functions import sproc
from snowflake.snowpark import Session
# Create a stored procedure that builds and resumes the task graph
@sproc(name="usp_createTaskDAG", replace=True, is_permanent=True, stage_location='@CODEFILES'
       ,imports=["@CODEFILES/create_task_DAG.py"],packages=["snowflake-snowpark-python","snowflake-telemetry-python"])
def create_task_dag(session: Session) -> None:
    from create_task_DAG import create_task_dag,resume_tasks
    telemetry.set_span_attribute("usp_createTaskDAG", "begin")
    telemetry.add_event("event_with_attributes", {"source_data": "Gold tables and Analytics","status":"create dag"})
    create_task_dag(session,'retail_root_task')
    telemetry.add_event("event_with_attributes", {"source_data": "Gold tables and Analytics","status":"resume dag"})
    resume_tasks(session,'retail_root_task')


In [None]:
session.call('usp_createTaskDAG')

Go to **Tasks** under the `RETAIL_PIPELINE_DB` database in Snowsight and run the graph manually before moving on to the next step. Alternatively, run `EXECUTE TASK retail_root_task;`.






## 5. Dashboard
- Creation of Streamlit App
- Implementation of the `usp_saveChartsToStage` stored procedure, which the Streamlit app calls to save a specific chart as an HTML file


In [None]:

from snowflake.snowpark.functions import sproc
from snowflake.snowpark import Session
from snowflake.snowpark.types import StringType
@sproc(name="usp_saveChartsToStage", replace=True, is_permanent=True, stage_location='@CODEFILES'
       ,imports=["@CODEFILES/save_charts_to_stage.py"],packages=["snowflake-snowpark-python"],input_types=[StringType()])
def save_charts_to_stage(session: Session,html_file_name:str) -> None:
    from save_charts_to_stage import main
    main(session,html_file_name)


In [None]:
CREATE OR REPLACE FUNCTION UDF_SAVE_CUSTOMER_PREF_CHART_HTML("CSVFILE" VARCHAR)
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
PACKAGES = ('snowflake-snowpark-python','altair')
HANDLER = 'read_write'
AS '
from snowflake.snowpark.files import SnowflakeFile

import pandas as pd
import altair as alt
import sys
import io

  
def read_write(csvfile):
  dst = SnowflakeFile.open_new_result("w")

  with SnowflakeFile.open(csvfile, ''rb'') as f:
    df = pd.read_csv(f)
    df.columns = df.columns.str.lower()
    
    
  bar_chart = alt.Chart(df).mark_bar().encode(
      x=alt.X(''preference'', title=''preference'', sort=''-y''),
      y=alt.Y(''count'', title=''count''),
      color=''preference'',
      tooltip=[''preference'', ''count'']
  ).properties(
      width=1200,
      height=900
  ).configure_axis(
      labelFontSize=12,
      titleFontSize=14
  ).configure_legend(
      titleFontSize=14,
      labelFontSize=12
  ).configure_title(
      fontSize=16
  )
  # Save the chart to a string buffer
  chart_buffer = io.StringIO()
  bar_chart.save(chart_buffer, format=''html'')
  chart_html = chart_buffer.getvalue()
  dst.write(chart_html)
  return dst


';

In [None]:
CREATE OR REPLACE STREAMLIT RETAIL_ANALYTICS_DASHBOARD
  FROM @CODEFILES
  MAIN_FILE = 'dashboard.py'
  QUERY_WAREHOUSE = retail_wh
  ;


After you create the Streamlit app, go to **Projects > Streamlit** in Snowsight and open the `RETAIL_ANALYTICS_DASHBOARD` app.

## 6. Cleanup 

In [None]:
# Dropping the Task Graph created

root_task_name='retail_root_task'
session.sql(f'alter task {root_task_name} suspend').collect()
res= session.sql(f''' select name
    from table(information_schema.task_dependents(task_name => '{root_task_name}', recursive => true))''').collect()
for r in res:
    session.sql(f'drop task {r.NAME}').collect()
    print(f'Dropped Task {r.NAME}')

In [None]:
select current_role()

In [None]:
DROP DATABASE RETAIL_PIPELINE_DB;


In [None]:
--  Dropping the Event Table Database

USE ROLE ACCOUNTADMIN;
ALTER ACCOUNT UNSET EVENT_TABLE;
DROP DATABASE central_log_trace_db;
DROP ROLE DE_DEMO_ROLE;
