## Overview

This notebook shows how to use the Databand SDK to observe pipeline execution in Databand. It assumes that the **Spark Listener** has been configured on the cluster that runs this notebook. The Spark Listener configuration provides the URL and credentials for connecting to Databand, and the listener also tracks datasets automatically.

In [0]:
# Install the SDK
!pip install dbnd-spark

In [0]:
# Import Databand libraries
from dbnd import dbnd_tracking, task, dataset_op_logger

In [0]:
# Global variables

# Databand connection settings. Uncomment and fill these in to supply the
# Databand URL and access token explicitly.
# databand_url = 'insert_url'
# databand_access_token = 'insert_token'

# Suffix appended to the job and project names this notebook tracks in
# Databand. In a workshop many users run the same sample pipelines, so each
# user picks a distinct suffix (for example '_mi') to keep their runs apart.
unique_suffix = '_mi'

In [0]:
@task
def read_raw_data():
    """Read the retail products/customers CSV and return it as a DataFrame.

    The read runs inside ``dataset_op_logger`` so Databand records a "read"
    operation on the source path, including the schema and a preview. The
    CSV is parsed with a header row and inferred column types. The loaded
    data is printed with ``show()`` before it is returned.
    """
    source_path = "/FileStore/tables/Retail_Products_and_Customers.csv"

    # Log the data read
    with dataset_op_logger(source_path, "read", with_schema=True, with_preview=True) as op_log:
        retail_df = spark.read.csv(source_path, inferSchema=True, header=True, sep=",")
        op_log.set(data=retail_df)

    retail_df.show()
    return retail_df

In [0]:
@task
def filter_data(rawData):
    """Return only the rows of ``rawData`` whose ``LTV`` column is 'MEDIUM VALUE'.

    The filtered rows are printed with ``show()`` before being returned.
    """
    # Customers with medium LTV
    medium_ltv_rows = rawData.where("LTV = 'MEDIUM VALUE'")
    medium_ltv_rows.show()
    return medium_ltv_rows

In [0]:
def _write_and_log_csv(data_frame, path):
    """Overwrite ``path`` with ``data_frame`` as CSV and log the write to Databand.

    The same DataFrame that is written is the one passed to the logger, so
    the Databand record always matches the data actually saved.
    """
    with dataset_op_logger(path, "write", with_schema=True, with_preview=True) as logger:
        data_frame.write.format("csv").mode("overwrite").save(path)
        logger.set(data=data_frame)


@task
def write_data_by_state(filteredData):
    """Write the Oregon and Arizona subsets of ``filteredData`` to separate CSV outputs.

    Each subset is selected with a ``State = '<name>'`` filter and written in
    overwrite mode to its own DBFS path. Each write is recorded in Databand
    as a "write" operation with schema and preview.

    Args:
        filteredData: DataFrame to split; it must have a ``State`` column.
    """
    # (filter expression, output path) pairs. Each filtered subset is written
    # to, and logged against, its own path. Previously the Arizona path
    # received the Oregon rows.
    state_outputs = (
        ("State = 'Oregon'", "dbfs:/retail_data/us/oregon/oregon_sales.csv"),
        ("State = 'Arizona'", "dbfs:/retail_data/us/arizona/arizona_sales.csv"),
    )
    for condition, path in state_outputs:
        state_sales = filteredData.where(condition)
        _write_and_log_csv(state_sales, path)
             

In [0]:
def _databand_connection_conf():
    """Build an explicit ``conf`` mapping for ``dbnd_tracking``, if one was configured.

    ``databand_url`` and ``databand_access_token`` are looked up in the
    notebook's global namespace and included only when they are defined and
    not ``None``. They ship commented out, because the Spark Listener is
    expected to supply the connection.

    Returns:
        ``{"core": {...}}`` with whichever settings are defined, or ``None``
        when neither is defined.
    """
    namespace = globals()
    core = {
        key: namespace[key]
        for key in ("databand_url", "databand_access_token")
        if namespace.get(key) is not None
    }
    return {"core": core} if core else None


def prepare_retail_data():
    """Run the retail pipeline (read -> filter -> write by state) under Databand tracking.

    The job and project names carry ``unique_suffix`` so that concurrent
    workshop users do not collide. An explicit connection ``conf`` is passed
    only when ``databand_url`` / ``databand_access_token`` are defined.
    Referencing them unconditionally raised ``NameError`` when they were left
    commented out.
    """
    tracking_kwargs = {}
    connection_conf = _databand_connection_conf()
    if connection_conf is not None:
        tracking_kwargs["conf"] = connection_conf

    with dbnd_tracking(
        job_name="prepare_sales_data_spark_with_logger" + unique_suffix,
        run_name="weekly",
        project_name="Retail Analytics" + unique_suffix,
        **tracking_kwargs
    ):
        # Call the step job - read data
        rawData = read_raw_data()

        # Filter data
        filteredData = filter_data(rawData)

        # Write the filtered data split by state
        write_data_by_state(filteredData)

        print("Finished running the pipeline")


# Invoke the main function
prepare_retail_data()