# Amazon tutorial
**Note**: This notebook runs on Python 3.6.

In this notebook we will show you:
- how to deploy a pipeline to UbiOps within seconds that selects the data and sends it to the model for prediction
- how to easily trigger the pipeline every day and output the results

For this example we will use a model trained on (parts of) the Amazon review dataset, taken from https://nijianmo.github.io/amazon/index.html*. The model takes a written product review as input and outputs a classification of whether the review is positive (1) or negative (0). This model can be used to automatically monitor the reviews of certain products. Using UbiOps, we can deploy a pipeline that is triggered every day and that predicts and monitors the review scores of products. If a score drops below a certain threshold, a signal is sent, indicating that there may be something wrong with the product. 
This mechanism can of course be extended to other use cases involving textual (sentiment) analysis.


*Justifying recommendations using distantly-labeled reviews and fine-grained aspects
Jianmo Ni, Jiacheng Li, Julian McAuley
Empirical Methods in Natural Language Processing (EMNLP), 2019


## Establishing a connection with your UbiOps environment
Add your API token and project name. You can also adapt the deployment name and deployment version name or leave the default values. Afterwards we initialize the client library, which establishes the connection with UbiOps.

In [None]:
API_TOKEN = '<INSERT API_TOKEN WITH PROJECT EDITOR RIGHTS>'
PROJECT_NAME = '<INSERT PROJECT NAME IN YOUR ACCOUNT>'

# Import all necessary libraries
import shutil
import os
import ubiops


In [None]:
client = ubiops.ApiClient(ubiops.Configuration(api_key={'Authorization': API_TOKEN}, 
                                               host='https://api.ubiops.com/v2.1'))
api = ubiops.CoreApi(client)
api.service_status()

In [None]:
# Deployment configurations
DATA_COLLECTOR_DEPLOYMENT='data-collector'
AMAZON_REVIEW_DEPLOYMENT='amazon-review-model'
DEPLOYMENT_VERSION='v1'
deployments_list = [DATA_COLLECTOR_DEPLOYMENT, AMAZON_REVIEW_DEPLOYMENT]

# Pipeline configurations
PIPELINE_NAME = "amazon-pipeline"


If you run this entire notebook after filling in your API token and project name, the pipeline will be deployed to your UbiOps (Free) environment. You can then explore the result in your environment. You can also go through the individual steps in this notebook to see exactly what was done and how you can adapt it to your own use case.

We recommend running the cells step by step, as some cells can take a few minutes to finish. You can also run everything in one go and it will work; just allow a few minutes for the individual deployments to build.

## Creating a pipeline in UbiOps

We will not go into depth on the technicalities of deploying a model or pipeline on UbiOps. However, we will show what the request functions of the data collector deployment and the Amazon review model look like. You can then run the steps in this notebook to create the deployments, upload the code and connect them in a pipeline.
  

### Data collector

The first step of a machine learning pipeline is often a data collection step. In our case, the data collector deployment takes care of that. Data can be collected by connecting to a database or other data storage; here, however, we read the data from a CSV file. To simulate a real-life use case as closely as possible, we only collect the reviews for a given day of the week or, if no day is given, for today. The function below shows how this works. 


In [None]:
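# This section is taken directly from data-collector/deployment.py
# (the imports and logger below are added so the excerpt is self-contained; the logger setup is an assumption)
import logging
from datetime import datetime

import pandas as pd

logger = logging.getLogger(__name__)

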
def return_days(n):
    return datetime.strptime(n, '%Y-%m-%d %H:%M:%S').strftime('%A')


class InvalidDateError(Exception):

    def __init__(self, day):
        self.message = f"No reviews found for day {day}"


def request(self, data):

    # Get today's day
    day = os.environ.get('DAY')  # Given through an environment variable; None if it is not set
    if not day:
        day = datetime.today().strftime('%A')
    day = day.title()

    # Raise an error if the day is invalid (there's no data for saturday/sunday)
    if day not in ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday']:
        raise InvalidDateError(day=day)

    # Read in the reviews as a dataframe
    reviews = pd.read_csv('reviews.csv')
    
    # Reformat the time of review into day_of_week so we can filter
    reviews['day_of_week'] = reviews['time_of_review'].apply(return_days)
    
    # Only collect data from `day`
    logger.info(f"Collecting data from {day}")
    day_reviews = reviews[reviews['day_of_week'] == day]

    logger.info(f"Data retrieved successfully, sending data to model")
    df_string = day_reviews.to_json()

    # Output the resulting dataframe (in JSON) to the model
    return df_string
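
The day-selection logic above can be tried in isolation, without the CSV file or a UbiOps deployment. The following is a minimal sketch rather than the deployment code itself: `resolve_day`, `WORKING_DAYS` and the `env` and `today` parameters are hypothetical names introduced here so that the behaviour can be checked with a plain dictionary instead of real environment variables.

```python
from datetime import date

WORKING_DAYS = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday']


class InvalidDateError(Exception):
    """Raised when no reviews exist for the requested day."""

    def __init__(self, day):
        super().__init__(f"No reviews found for day {day}")
        self.message = f"No reviews found for day {day}"


def resolve_day(env, today=None):
    """Return the capitalised weekday to collect, falling back to today's weekday.

    `env` is any mapping (for example os.environ); `today` is an optional
    datetime.date that replaces the real current date, which helps when testing.
    """
    day = env.get('DAY')  # None when the variable is not set
    if not day:
        day = (today or date.today()).strftime('%A')
    day = day.title()

    # There is no data for Saturday or Sunday
    if day not in WORKING_DAYS:
        raise InvalidDateError(day)
    return day


print(resolve_day({'DAY': 'monday'}))      # Monday
print(resolve_day({}, date(2021, 3, 3)))   # Wednesday (3 March 2021)
```

Passing the environment as an argument keeps the function free of global state, so weekend handling can be checked by calling `resolve_day({'DAY': 'sunday'})` and observing the `InvalidDateError`.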


### Amazon review model

The next deployment in the pipeline is the Amazon review model. This deployment predicts the review scores and monitors the resulting scores. It predicts a score (0 or 1) for each review in the received data, takes the average of the predicted scores per product and compares each average with a threshold score. The threshold is yesterday's average score or, if that is not available (because yesterday was a weekend day), a fixed threshold. If a product's average drops more than 0.2 below its threshold, the deployment writes a signal to the logs. Lastly, the model outputs some metadata and a CSV file named "review_scores_{day}.csv", which can be inspected.

In [None]:
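# This section is taken directly from amazon-review-model/deployment.py
# (the imports and logger below are added so the excerpt is self-contained; the logger setup is an assumption)
import logging
import os

import pandas as pd
import ubiops

logger = logging.getLogger(__name__)
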
def _connect_api(api_token):
    client = ubiops.ApiClient(
        ubiops.Configuration(api_key={'Authorization': api_token}, host='https://api.ubiops.com/v2.1')
    )
    return ubiops.CoreApi(client)

def request(self, data):

    # Load the data of the day into a dataframe
    df = pd.read_json(data)

    # From the dataframe we can get the day
    day = df['day_of_week'].iloc[0]

    # Let's get those predictions and take the averages of all review scores
    average_predictions = self.return_predictions(dataframe=df)

    # Get yesterday's day
    days = ["Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"]
    index = days.index(day)
    yester_day_index = index - 1
    yesterday = days[yester_day_index]

    # Then retrieve yesterday's average review scores
    # If those are not here (because it was a weekend day),
    # we will compare the predicted review score to a set threshold
    review_scores = self.retrieve_review_scores(yesterday=yesterday)

    below_threshold = 0
    products_below_threshold = []

    # Now we are ready to see if the review scores are stable
    # For every product in the predictions, compare the average review score to the threshold score
    for index, value in average_predictions.items():

        # A DataFrame has no unambiguous truth value, so test explicitly for None
        if review_scores is not None:
            review_threshold = review_scores.loc[review_scores['product_id'] == index, 'predictions'].iloc[0]
        else:
            # Load in the minimal threshold for the review
            review_threshold = float(os.environ['THRESHOLD'])
            
        # Prompt a signal if the value drops significantly (0.2) below the threshold
        if value < (review_threshold - 0.2):
            below_threshold += 1
            product_title = df.loc[df['product_id'] == index, 'product_name'].iloc[0]
            products_below_threshold.append(product_title)
            logger.error(f"Product with id {index} has not met its review standards")

    # Output the review_scores to a csv, so it will be available for the next request.
    average_predictions.to_csv(f"review_scores_{day.lower()}.csv", index=True)

    # Return the results
    return {
        'total_products': len(average_predictions),
        'below_threshold': below_threshold,
        'products_below_threshold': products_below_threshold,
        'review_scores': f"review_scores_{day.lower()}.csv"
    }

def return_predictions(self, dataframe):

    # The test column of the data is the 'review' column
    x_test = dataframe['review']

    # Because our X test data is text, it has to be transformed to vectors to be able to make predictions on
    x_test_transformed = self.count_vectorizer.transform(x_test)

    # Let's predict and paste the resulting array right onto the existing dataframe
    dataframe['predictions'] = self.model.predict(x_test_transformed)

    # Generate a new series with the average of the predictions per product
    return dataframe.groupby('product_id')['predictions'].mean()

def retrieve_review_scores(self, yesterday):

    # Connect to the API to retrieve the file from UbiOps
    api = self._connect_api(api_token=os.environ['API_TOKEN'])

    # If the review score of yesterday is there retrieve it from UbiOps
    project_name = os.environ['PROJECT_NAME']
    blobs_list = api.blobs_list(project_name=project_name)
    review_scores = None

    for blob in blobs_list:
        if blob.filename == f"review_scores_{yesterday.lower()}.csv":
            response = api.blobs_get(project_name=project_name, blob_id=str(blob.id))
            review_scores = pd.read_csv(response)

    return review_scores
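
The monitoring rule in `request` can also be illustrated with plain dictionaries instead of dataframes. This is a simplified sketch under stated assumptions: `previous_day`, `find_products_below` and the `margin` parameter are hypothetical names that do not appear in the deployment package, and the scores used are made-up illustration values.

```python
DAYS = ["Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"]


def previous_day(day):
    """Return the weekday before `day`; index -1 wraps Sunday around to Saturday."""
    return DAYS[DAYS.index(day) - 1]


def find_products_below(average_scores, yesterday_scores, fallback_threshold, margin=0.2):
    """Return ids of products whose average fell more than `margin` below the reference.

    average_scores:     dict of product_id -> today's average predicted score
    yesterday_scores:   dict of product_id -> yesterday's average, or None if unavailable
    fallback_threshold: reference used when yesterday's scores are unavailable
    """
    flagged = []
    for product_id, score in average_scores.items():
        if yesterday_scores is not None:
            reference = yesterday_scores[product_id]
        else:
            reference = fallback_threshold
        if score < reference - margin:
            flagged.append(product_id)
    return flagged


today = {'A1': 0.9, 'B2': 0.4, 'C3': 0.55}  # made-up averages for illustration
print(previous_day('Monday'))                                               # Sunday
print(find_products_below(today, None, fallback_threshold=0.7))             # ['B2']
print(find_products_below(today, {'A1': 0.95, 'B2': 0.5, 'C3': 0.9}, 0.7))  # ['C3']
```

The two calls show why the reference matters: product `B2` is flagged against the fixed threshold, while `C3` is flagged only when compared with its own higher score from the day before.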

### Create two deployments

Let's create our data collector and our predictor model in UbiOps.

In [None]:
# Create the data-collector deployment
# A structured input without input fields specified, signals to UbiOps that the deployment is a connector (it retrieves data)
deployment_template = ubiops.DeploymentCreate(
    name=DATA_COLLECTOR_DEPLOYMENT,
    description='Collect data from file according to day',
    input_type='structured',
    output_type='plain',
    input_fields=[],
    output_fields=[],
    labels={'demo': 'amazon-review-pipeline'}
)

api.deployments_create(project_name=PROJECT_NAME, data=deployment_template)

# Create the amazon-model deployment
deployment_template = ubiops.DeploymentCreate(
    name=AMAZON_REVIEW_DEPLOYMENT,
    description='Predicts on reviews of products, outputs if their review score is below a certain threshold',
    input_type='plain',
    output_type='structured',
    input_fields=[], # Plain type data does not require defined fields
    output_fields=[
        ubiops.DeploymentOutputFieldCreate(
            name='total_products',
            data_type='int'
        ),
        ubiops.DeploymentOutputFieldCreate(
            name='below_threshold',
            data_type='int'
        ),
        ubiops.DeploymentOutputFieldCreate(
            name='products_below_threshold',
            data_type='array_string'
        ),
        ubiops.DeploymentOutputFieldCreate(
            name='review_scores',
            data_type='blob'
        )
    ],
    labels={'demo': 'amazon-review-pipeline'}
)

api.deployments_create(project_name=PROJECT_NAME, data=deployment_template)

### Environment variables
The next step is to add some environment variables to the deployments. This allows us to pass global and/or secret parameters to a deployment. We use environment variables to pass a day (e.g. "monday") to our data-collector and to set a threshold for our amazon-review-model. We also pass the API_TOKEN and PROJECT_NAME to the model, so that it can connect to UbiOps and retrieve yesterday's review scores. 

In [None]:
api.deployment_environment_variables_create(
    project_name=PROJECT_NAME,        
    deployment_name=DATA_COLLECTOR_DEPLOYMENT,
    data= {
      "name": "DAY",
      "value": "monday",
      "secret": False
    }
)

api.deployment_environment_variables_create(
    project_name=PROJECT_NAME,        
    deployment_name=AMAZON_REVIEW_DEPLOYMENT,
    data= {
      "name": "THRESHOLD",
      "value": "0.7",
      "secret": False
    }
)

# Setting these environment variables allows the model to connect to 
# the UbiOps client libraries from within the model's code.
api.deployment_environment_variables_create(
    project_name=PROJECT_NAME,        
    deployment_name=AMAZON_REVIEW_DEPLOYMENT,
    data= {
      "name": "API_TOKEN",
      "value": API_TOKEN,
      "secret": True
    }
)

api.deployment_environment_variables_create(
    project_name=PROJECT_NAME,        
    deployment_name=AMAZON_REVIEW_DEPLOYMENT,
    data= {
      "name": "PROJECT_NAME",
      "value": PROJECT_NAME,
      "secret": False
    }
)
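
Environment variable values are always read back as strings in Python, which is why the model converts `THRESHOLD` with `float(...)`. The snippet below is a small sketch of reading such a value defensively; `read_threshold` and its `default` argument are hypothetical and not part of the deployment package.

```python
import os


def read_threshold(env=os.environ, default=0.7):
    """Read THRESHOLD from `env` as a float, using `default` when it is missing or empty."""
    raw = env.get('THRESHOLD')
    if not raw:
        return float(default)
    return float(raw)


print(read_threshold({'THRESHOLD': '0.7'}))   # 0.7
print(read_threshold({}))                     # 0.7
print(read_threshold({'THRESHOLD': '0.85'}))  # 0.85
```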

### Versions
It is possible to create multiple versions of the same deployment. Let's create the first version and deploy the models. 

In [None]:
# Create a version for both deployments
version_template = ubiops.VersionCreate(
    version=DEPLOYMENT_VERSION,
    language='python3.6',
    memory_allocation=1024,
    minimum_instances=0,
    maximum_instances=1,
    maximum_idle_time=1800 # = the model will wait for 30 minutes after the last request
)

for deployment_name in deployments_list:
    api.versions_create(
        project_name=PROJECT_NAME,
        deployment_name=deployment_name,
        data=version_template
    )

    # Upload a zipped deployment package
    file_upload_result = api.revisions_file_upload(
        project_name=PROJECT_NAME,
        deployment_name=deployment_name,
        version=DEPLOYMENT_VERSION,
        file=shutil.make_archive(f"{deployment_name}_package", 'zip', '.', deployment_name)
    )

    # Status of the version will be building
    version_status = api.versions_get(       
        project_name=PROJECT_NAME,        
        deployment_name=deployment_name,        
        version=DEPLOYMENT_VERSION    
    )    
    print(version_status.status)
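
The `shutil.make_archive` call in the loop above zips the folder named after the deployment, so the archive contains that folder at its top level. The sketch below reproduces this with a temporary stand-in folder to show the resulting layout; `build_package` is a hypothetical helper and the placeholder `deployment.py` is not the real deployment code.

```python
import os
import shutil
import tempfile
import zipfile


def build_package(parent_dir, deployment_name):
    """Zip the folder `deployment_name` found inside `parent_dir` and return the archive path."""
    archive_base = os.path.join(parent_dir, f"{deployment_name}_package")
    # root_dir is where archiving starts; base_dir is the folder that ends up in the zip
    return shutil.make_archive(archive_base, 'zip', root_dir=parent_dir, base_dir=deployment_name)


with tempfile.TemporaryDirectory() as workdir:
    # Create a stand-in deployment folder with a placeholder deployment.py
    os.makedirs(os.path.join(workdir, 'data-collector'))
    with open(os.path.join(workdir, 'data-collector', 'deployment.py'), 'w') as handle:
        handle.write('# placeholder\n')

    archive_path = build_package(workdir, 'data-collector')
    with zipfile.ZipFile(archive_path) as archive:
        names = archive.namelist()

print(os.path.basename(archive_path))           # data-collector_package.zip
print('data-collector/deployment.py' in names)  # True
```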

Both deployment packages have now been uploaded to your UbiOps environment, where they are being built. Go ahead and take a look at the Deployments tab in the UI to see for yourself. 


### Attaching the deployments in a pipeline

Now that we have both deployments and all environment variables ready, we can create a pipeline and add our deployments to it as objects. Before we can do that, both deployments need to have passed the building stage.

In [None]:
from time import sleep
status1 = 'building'
status2 = 'building'
while (status1 not in ['available', 'unavailable']) or (status2 not in ['available', 'unavailable']):    
    version_status1 = api.versions_get(       
        project_name=PROJECT_NAME,        
        deployment_name=DATA_COLLECTOR_DEPLOYMENT,        
        version=DEPLOYMENT_VERSION    
    )    
    status1 = version_status1.status
    version_status2 = api.versions_get(       
        project_name=PROJECT_NAME,        
        deployment_name=AMAZON_REVIEW_DEPLOYMENT,        
        version=DEPLOYMENT_VERSION    
    )   
    status2 = version_status2.status
    sleep(1)

print(f"{DATA_COLLECTOR_DEPLOYMENT}: {status1}")
print(f"{AMAZON_REVIEW_DEPLOYMENT}: {status2}")
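
The waiting loop above keeps polling until both versions leave the building stage. A possible variation, sketched here under the assumption that you want to stop waiting after a maximum time, wraps the polling in a helper with a timeout. `wait_for_status` and its parameters are hypothetical names, and the status source is simulated so the example runs without a UbiOps connection.

```python
import time


def wait_for_status(fetch_status, done_states=('available', 'unavailable'),
                    timeout=600, interval=1, sleep=time.sleep):
    """Call `fetch_status()` until it returns a state in `done_states`.

    Raises TimeoutError when `timeout` seconds pass without reaching a final state.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        if status in done_states:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still '{status}' after {timeout} seconds")
        sleep(interval)


# Stand-in for the real status lookup: two 'building' answers, then 'available'
fake_statuses = iter(['building', 'building', 'available'])
result = wait_for_status(lambda: next(fake_statuses), interval=0)
print(result)  # available
```

With the real client, `fetch_status` could be a lambda that returns `api.versions_get(project_name=PROJECT_NAME, deployment_name=deployment_name, version=DEPLOYMENT_VERSION).status`, the same call used in the loop above.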

In [None]:
# The input of the pipeline is the same as the input of the first deployment (data-collector): structured type, without input fields
pipeline_template = ubiops.PipelineCreate(
    name=PIPELINE_NAME,
    description='Pipeline that retrieves reviews of Amazon products '
                'from a file and predicts the review score. It will '
                'give a signal if a product does not meet its threshold review score.',
    input_type='structured',
    input_fields=[],
    labels={'demo': 'amazon-review-pipeline'}
)

api.pipelines_create(project_name=PROJECT_NAME, data=pipeline_template)

We have a pipeline; now we just need to add our two components to it and connect them.

**IMPORTANT**: If you get an error like "error":"Version is not available: The version is currently in the building stage",
your model is still building and not yet available. 
Check in the UI whether your model is ready and then rerun the block below.

In [None]:
# Add both deployments to the pipeline
for deployment_name in deployments_list:
    object_template = ubiops.PipelineObjectCreate(
        name=deployment_name,
        reference_name=deployment_name,
        version=DEPLOYMENT_VERSION
    )
    api.pipeline_objects_create(project_name=PROJECT_NAME, pipeline_name=PIPELINE_NAME, data=object_template)

In [None]:
# Connecting the components

# First connecting start --> data-collector
pipeline_source = ubiops.AttachmentSourcesCreate(
    source_name='pipeline_start', 
    mapping=[]
)

pipeline_attachment = ubiops.AttachmentsCreate(
    destination_name=DATA_COLLECTOR_DEPLOYMENT, 
    sources=[pipeline_source]
)

api.pipeline_object_attachments_create(
    project_name=PROJECT_NAME, 
    pipeline_name=PIPELINE_NAME, 
    data=pipeline_attachment
)

# Connection data-collector --> amazon-model
pipeline_source = ubiops.AttachmentSourcesCreate(
    source_name=DATA_COLLECTOR_DEPLOYMENT, 
    mapping=[]
)

pipeline_attachment = ubiops.AttachmentsCreate(
    destination_name=AMAZON_REVIEW_DEPLOYMENT, 
    sources=[pipeline_source]
)

api.pipeline_object_attachments_create(
    project_name=PROJECT_NAME, 
    pipeline_name=PIPELINE_NAME, 
    data=pipeline_attachment
)
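
Each attachment above is a directed link from a source to a destination, and together the links determine the order in which the pipeline objects run. The sketch below illustrates that idea for a simple linear chain only; `execution_order` is a hypothetical helper and says nothing about how UbiOps resolves pipelines internally.

```python
def execution_order(attachments, start='pipeline_start'):
    """Follow a linear chain of (source, destination) pairs beginning at `start`."""
    next_object = dict(attachments)  # maps each source to its destination
    order = []
    current = start
    while current in next_object:
        current = next_object[current]
        if current in order:
            raise ValueError(f"Cycle detected at '{current}'")
        order.append(current)
    return order


# The two links created above, listed in arbitrary order
attachments = [
    ('data-collector', 'amazon-review-model'),
    ('pipeline_start', 'data-collector'),
]
print(execution_order(attachments))  # ['data-collector', 'amazon-review-model']
```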

## Request schedules 
Now that everything is set up, we can create a request schedule. Request schedules are handy when you want to kick off requests at specified times. They are easy to set up using cron notation.

In [None]:
schedule = ubiops.ScheduleCreate(
    name="amazon-request",
    object_type="pipeline",
    object_name=PIPELINE_NAME,
    schedule="0 8 * * 1-5", # Run pipeline every working day (Mon-Fri) at 08:00 UTC (09:00 CET or 10:00 CEST in the Netherlands)
    batch=True, # Set to True, so the requests + results will be stored for a week
    enabled=False
)
api.request_schedules_create(project_name=PROJECT_NAME, data= schedule)
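
To make the cron expression `0 8 * * 1-5` more concrete, the sketch below checks in plain Python which moments match it and looks up the next run after a given time. It is an illustration of this one expression, not a general cron parser, and `matches_schedule` and `next_run` are hypothetical helper names.

```python
from datetime import datetime, timedelta, timezone


def matches_schedule(moment_utc, minute=0, hour=8, weekdays=range(1, 6)):
    """Return True when `moment_utc` falls on the schedule "0 8 * * 1-5".

    Cron counts Monday as 1 and Friday as 5, which matches datetime.isoweekday().
    """
    return (moment_utc.minute == minute
            and moment_utc.hour == hour
            and moment_utc.isoweekday() in weekdays)


def next_run(after_utc):
    """Return the first scheduled minute strictly after `after_utc`."""
    candidate = after_utc.replace(second=0, microsecond=0) + timedelta(minutes=1)
    while not matches_schedule(candidate):
        candidate += timedelta(minutes=1)
    return candidate


friday_noon = datetime(2021, 3, 5, 12, 0, tzinfo=timezone.utc)  # a Friday
print(next_run(friday_noon))  # 2021-03-08 08:00:00+00:00, the following Monday
```

Because the schedule skips Saturday and Sunday, a request made on Friday afternoon is followed by the next run on Monday morning, which lines up with the weekend handling in the data collector.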

## Making a request and exploring further
You can now go to the Web App and take a look in the user interface at what you have just built. If you want, you can create a request to the pipeline with empty input to see what happens.

So there we have it! We have made a pipeline that gets triggered each working day, with a model that monitors its own predictions. You can use this notebook as a reference for your own pipeline and use case. Just adapt the code in the deployment packages and alter the input and output fields as you wish and you should be good to go. 

For any questions, feel free to reach out to us via the customer service portal: https://ubiops.atlassian.net/servicedesk/customer/portals