<span style="color:red">**Insert a project token (use the menu in the top-right corner). If you skip this step, write_data_by_product_line() will fail with an error.**</span>

# Use Databand SDK with Spark data pipelines
**This notebook provides sample code for using the Databand SDK**

In [None]:
# Run once to install the Databand SDK together with its Spark integration
# (the leading "!" runs this as a shell command from the notebook).
!pip install dbnd-spark

In [None]:
# Import Databand libraries
from dbnd import dbnd_tracking, task, dataset_op_logger

<span style="color:red">**Before running the notebook, replace the code marked with TODO comments as described in those comments.**</span>

In [None]:
# Global variables
import os

# TODO: Update url and token.
# Alternatively, set the DATABAND_URL and DATABAND_ACCESS_TOKEN environment
# variables so the access token does not have to be pasted into the notebook;
# the placeholder values below are used only when those variables are unset.
databand_url = os.environ.get("DATABAND_URL", 'insert_url')
databand_access_token = os.environ.get("DATABAND_ACCESS_TOKEN", 'insert_token')

# Data used in this pipeline
# NOTE(review): no cell shown here references RETAIL_FILE -- read_raw_data()
# loads the CSV from Cloud Object Storage instead. Confirm whether it is still needed.
RETAIL_FILE = "https://raw.githubusercontent.com/elenalowery/data-samples/main/Retail_Products_and_Customers.csv"

# TODO: Provide a unique suffix that will be added to various assets tracked in Databand. We use this approach because
# in a workshop many users are running the same sample pipelines. For example '_mi'
unique_suffix = '_mi'

In [None]:
@task
def read_raw_data():
    """Load the Retail Products and Customers CSV from Cloud Object Storage.

    The dataset read is logged to Databand with its schema and a preview.

    Returns:
        A Spark DataFrame built from the CSV, using its header row for column names.

    Raises:
        RuntimeError: if the ``COS_API_KEY`` environment variable is not set.
    """
    import os

    import ibmos2spark
    from pyspark.sql import SparkSession

    # TODO: Replace code in this section with the code you generate with Insert to Code (for the Retail Products and Customers file)
    # The easiest way to accomplish this task is to insert a cell above and generate code in that cell

    # The API key is read from the environment rather than committed to the notebook.
    # NOTE(review): a key was previously hard-coded here; it should be revoked/rotated.
    api_key = os.environ.get("COS_API_KEY")
    if not api_key:
        raise RuntimeError(
            "Set the COS_API_KEY environment variable (or paste the credentials "
            "generated by Insert to Code into read_raw_data())."
        )

    # @hidden_cell
    credentials = {
        'endpoint': 'https://s3.private.us.cloud-object-storage.appdomain.cloud',
        'service_id': 'iam-ServiceId-8f035f08-7edb-4e66-b1f4-8c8ff3c2b837',
        'iam_service_endpoint': 'https://iam.cloud.ibm.com/oidc/token',
        'api_key': api_key,
    }

    # Obtain the session first so its SparkContext can be passed to ibmos2spark,
    # instead of relying on an implicit notebook-global `sc`.
    spark = SparkSession.builder.getOrCreate()

    configuration_name = 'os_1e498447d5f74cd6b90b92b35bb6514e_configs'
    cos = ibmos2spark.CloudObjectStorage(spark.sparkContext, credentials, configuration_name, 'bluemix_cos')

    retailData = spark.read\
      .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
      .option('header', 'true')\
      .load(cos.url('Retail_Products_and_Customers.csv', 'datapipelines-donotdelete-pr-wb8fasnklfoldc'))

    # End Replace section

    # Log the data read
    with dataset_op_logger("CPDaaS://Weekly_Sales/Retail_Products_and_Customers.csv", "read", with_schema=True, with_preview=True) as logger:
        logger.set(data=retailData)

    return retailData

In [None]:
@task
def filter_data(rawData):
    """Remove columns that downstream steps do not need and log the result.

    Args:
        rawData: Spark DataFrame produced by ``read_raw_data``.

    Returns:
        ``rawData`` without the ``Buy``, ``PROFESSION`` and ``EDUCATION`` columns.
    """
    unwanted_columns = ('Buy', 'PROFESSION', 'EDUCATION')
    trimmed = rawData.drop(*unwanted_columns)

    # Record the schema and a preview of this intermediate DataFrame in Databand.
    with dataset_op_logger(
        "script://WeeklySales/Filtered_spark_df",
        "read",
        with_schema=True,
        with_preview=True,
    ) as op_logger:
        op_logger.set(data=trimmed)

    return trimmed

In [None]:
def _save_product_line(filteredData, product_line, file_name):
    """Filter one product line, log it to Databand, and save it as a project CSV.

    Args:
        filteredData: Spark DataFrame containing a ``Product line`` column.
        product_line: value of ``Product line`` to keep, e.g. ``"Golf Equipment"``.
        file_name: name of the CSV asset written to the project.
    """
    from pyspark.sql.functions import col

    subset = filteredData.filter(col("Product line") == product_line)

    # Log the dataset being written
    with dataset_op_logger(f"CPDaaS://Weekly_Sales/{file_name}", "write", with_schema=True, with_preview=True) as logger:
        logger.set(data=subset)

    # Workaround - Spark dataframe write fails when using project lib, that's why we are converting it to pandas.
    # `project` is the global created by inserting the project token (see the notebook header).
    project.save_data(file_name, subset.toPandas().to_csv(index=False), overwrite=True)


@task
def write_data_by_product_line(filteredData):
    """Write the Camping Equipment and Golf Equipment product lines to separate CSV files.

    Each product line is filtered from ``filteredData``, logged to Databand as a
    "write" operation, and saved to the project as ``<Product_Line>.csv``.

    Args:
        filteredData: Spark DataFrame produced by ``filter_data``.
    """
    # Each file must be written from its own subset; previously the golf file
    # was accidentally populated with the camping-equipment rows.
    _save_product_line(filteredData, "Camping Equipment", "Camping_Equipment.csv")
    _save_product_line(filteredData, "Golf Equipment", "Golf_Equipment.csv")

In [None]:
# Call and track all steps in a pipeline

def prepareRetailData():
    """Run the read -> filter -> write pipeline under Databand tracking.

    Connection settings come from the globals ``databand_url`` and
    ``databand_access_token``. ``unique_suffix`` is appended to the job name so that
    runs from different workshop users sharing one Databand instance can be told apart.
    """
    tracking_conf = {
        "core": {
            "databand_url": databand_url,
            "databand_access_token": databand_access_token,
        }
    }

    with dbnd_tracking(
            conf=tracking_conf,
            job_name="prepare_sales_data_spark" + unique_suffix,
            run_name="weekly",
            project_name="Retail Analytics",
    ):

        # Call the step job - read data
        rawData = read_raw_data()

        # Filter data
        filteredData = filter_data(rawData)

        # Write data by product line
        write_data_by_product_line(filteredData)

        print("Finished running the pipeline")


In [None]:
# Invoke the main function: runs the tracked pipeline (read_raw_data -> filter_data -> write_data_by_product_line).
prepareRetailData()