## Scoring Big Datasets - Batch Prediction API

### Scope

This notebook explains how to use DataRobot's batch prediction script to get predictions from a deployed DataRobot model.

### Background

The Batch Prediction API provides flexible options for intake and output when scoring large datasets using the prediction servers you have already deployed. The API is exposed through the DataRobot Public API and can be consumed using a REST-enabled client or the DataRobot Python Public API bindings.

The main features of the API include:

- Flexible options for intake and output.
- Support for streaming local files: scoring can start while the upload is still in progress, and results can be downloaded at the same time.
- Ability to score large datasets from, and to, Amazon S3 buckets.
- Connection to external data sources using JDBC with bidirectional streaming of scoring data and results.
- A mix of intake and output options; for example, scoring from a local file to an S3 target.
- Protection against prediction server overload with a concurrency control level option.
- Inclusion of Prediction Explanations (with an option to add thresholds).
- Support for passthrough columns to correlate scored data with source data.
- Addition of prediction warnings in the output.

(If you are a DataRobot customer, you can find more information in the in-app documentation for "Batch Prediction API.")

### Requirements

- Python version 3.7.3
- DataRobot API version 2.19.0

Small adjustments might be needed depending on the Python version and DataRobot API version you are using.

Full documentation of the Python package can be found here: https://datarobot-public-api-client.readthedocs-hosted.com/en/

It is assumed you already have a DataRobot <code>Deployment</code> object.

#### Scoring a local CSV file

For the script below to work, you must provide the <code>api_key</code> of your account, the <code>deployment_id</code>, and the <code>api_endpoint</code> (https://app.datarobot.com/api/v2/batchPredictions/ for NA Managed AI Cloud users, https://app.eu.datarobot.com/api/v2/batchPredictions/ for EU Managed AI Cloud users).

Lastly, you need to provide the CSV file that will be used as input.

By default, DataRobot will expect a CSV file with this format:

- delimiter: , (comma)
- quotechar: "
- encoding: utf-8
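
If you generate the scoring file yourself, the defaults listed above can be produced with Python's standard `csv` module. The following is a minimal sketch, not part of the DataRobot tooling; the helper name `write_default_csv` and its arguments are hypothetical and chosen only for illustration.

```python
import csv


def write_default_csv(path, header, rows):
    """Write rows as comma-delimited, double-quoted, UTF-8 encoded CSV."""
    # newline='' lets the csv module control line endings itself.
    with open(path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f, delimiter=',', quotechar='"')
        writer.writerow(header)
        writer.writerows(rows)
```

For example, `write_default_csv('./to_predict.csv', ['id', 'note'], [[1, 'a, b']])` quotes the second field because it contains the delimiter.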

In [None]:
import time
import requests

api_key = '...'
api_endpoint = 'https://app.datarobot.com/api/v2/batchPredictions/'

deployment_id = '...'

csv_input_file = './to_predict.csv'
csv_output_file = './predicted.csv'

session = requests.Session()
session.headers = {
    'Authorization': 'Bearer {}'.format(api_key),
}

job_details = {
    'deploymentId': deployment_id,
    'intakeSettings': {'type': 'local_file'},
    'outputSettings': {'type': 'local_file'},
}

# Initialize the job
resp = session.post(api_endpoint, json=job_details)
resp.raise_for_status()

job = resp.json()

print("created job: {}".format(job['links']['self']))

# Feed the CSV data
with open(csv_input_file, 'rb') as f:
    resp = session.put(job['links']['csvUpload'], data=f, headers={
        'Content-Type': 'text/csv'
    })
    resp.raise_for_status()

# Poll until the job is aborted or a download link becomes available
while not (job['status'] == 'ABORTED' or job['links'].get('download')):

    time.sleep(5)

    resp = session.get(job['links']['self'])
    resp.raise_for_status()
    job = resp.json()

if job['status'] == 'ABORTED':

    print('failed to complete batch predictions: {}'.format(job['statusDetails']))

else:

    # Download the results
    resp = session.get(job['links']['download'], stream=True)
    resp.raise_for_status()
    with open(csv_output_file, 'wb') as f:
        for chunk in resp.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)
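
The polling loop in the script above runs until the job reports a result, with no upper bound on the wait. One possible way to bound it is sketched below. The helper `wait_for_job` and its parameters are hypothetical; it only relies on the `get`, `raise_for_status`, and `json` calls already used in the script.

```python
import time


def wait_for_job(session, job_url, is_done, poll_seconds=5, timeout_seconds=3600):
    """Poll job_url until is_done(job) is truthy or timeout_seconds elapse."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        resp = session.get(job_url)
        resp.raise_for_status()
        job = resp.json()
        if is_done(job):
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(
                'job did not finish within {} seconds'.format(timeout_seconds)
            )
        time.sleep(poll_seconds)
```

For the local file case, the condition from the loop above can be passed as `is_done=lambda j: j['status'] == 'ABORTED' or j['links'].get('download')`.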

#### Requesting Prediction Explanations
To get Prediction Explanations alongside predictions, add the explanation parameters to the job configuration.

In [None]:
job_details = {
    'deploymentId': deployment_id,
    'intakeSettings': {'type': 'local_file'},
    'outputSettings': {'type': 'local_file'},
    'maxExplanations': 10,
    'thresholdHigh': 0.5,
    'thresholdLow': 0.15,
}
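
Because the explanation parameters are plain top-level keys, they can be layered onto any existing job configuration. The sketch below shows one way to do that without repeating the intake and output settings. The helper `with_explanations` is hypothetical, and treating the two thresholds as optional is an assumption based on the feature list, which describes thresholds as an option.

```python
def with_explanations(job_details, max_explanations,
                      threshold_high=None, threshold_low=None):
    """Return a copy of job_details with Prediction Explanation keys added."""
    updated = dict(job_details)  # shallow copy; the input dict is not modified
    updated['maxExplanations'] = max_explanations
    # Assumption: thresholds may be omitted, so only set them when given.
    if threshold_high is not None:
        updated['thresholdHigh'] = threshold_high
    if threshold_low is not None:
        updated['thresholdLow'] = threshold_low
    return updated
```

Calling `with_explanations(job_details, 10, threshold_high=0.5, threshold_low=0.15)` on the basic local file configuration yields the same dictionary as the cell above.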

#### Custom CSV Format
If your CSV file does not match the default CSV format, you can modify the expected CSV format by setting <code>csvSettings</code>:

In [None]:
job_details = {
    'deploymentId': deployment_id,
    'intakeSettings': {'type': 'local_file'},
    'outputSettings': {'type': 'local_file'},
    'csvSettings': {
        'delimiter': ';',
        'quotechar': '\'',
        'encoding': 'ms_kanji',
    },
}
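
An alternative to setting <code>csvSettings</code> is to convert the file to the default format before uploading it. The sketch below does this locally with Python's standard `csv` module. The helper `convert_to_default_csv` is hypothetical, and it assumes the source encoding name is one that Python itself recognizes.

```python
import csv


def convert_to_default_csv(src_path, dst_path, delimiter, quotechar, encoding):
    """Rewrite a CSV file as comma-delimited, double-quoted, UTF-8."""
    with open(src_path, newline='', encoding=encoding) as src, \
            open(dst_path, 'w', newline='', encoding='utf-8') as dst:
        reader = csv.reader(src, delimiter=delimiter, quotechar=quotechar)
        writer = csv.writer(dst, delimiter=',', quotechar='"')
        # Stream row by row so large files are not loaded into memory.
        for row in reader:
            writer.writerow(row)
```

For a file matching the settings shown above, the call would be `convert_to_default_csv('in.csv', 'out.csv', ';', "'", 'ms_kanji')`.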

#### End-to-end scoring of CSV files on S3 using Python requests

To score data that sits in an S3 bucket:

In [None]:
import time
import requests

api_key = '...'
api_endpoint = 'https://app.datarobot.com/api/v2/batchPredictions/'

deployment_id = '...'
credential_id = '...'

s3_csv_input_file = 's3://my-bucket/data/to_predict.csv'
s3_csv_output_file = 's3://my-bucket/data/predicted.csv'

session = requests.Session()
session.headers = {
    'Authorization': 'Bearer {}'.format(api_key),
}

job_details = {
    'deploymentId': deployment_id,
    'intakeSettings': {
        'type': 's3',
        'url': s3_csv_input_file,
        'credentialId': credential_id,
    },
    'outputSettings': {
        'type': 's3',
        'url': s3_csv_output_file,
        'credentialId': credential_id,
    }
}

# Send the job
resp = session.post(api_endpoint, json=job_details)
resp.raise_for_status()

job = resp.json()

print('queued batch job: {}'.format(job['links']['self']))

You can poll the status (self) endpoint to check for progress or wait until the job is done:

In [None]:
while job['status'] not in {'COMPLETED', 'ABORTED'}:

    time.sleep(5)
        
    resp = session.get(job['links']['self'])
    resp.raise_for_status()

    job = resp.json()

print('job finished with status: {}'.format(job['status']))
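
The loop above treats `COMPLETED` and `ABORTED` alike and only prints the final status. If later steps depend on the scored file existing, it may help to fail loudly instead. The following is a minimal sketch; the exception class `BatchJobAborted` and the helper `raise_if_aborted` are hypothetical names, and the `statusDetails` field is the one printed by the local file script earlier in this notebook.

```python
class BatchJobAborted(RuntimeError):
    """Raised when a batch prediction job ends with status ABORTED."""


def raise_if_aborted(job):
    """Return job unchanged unless its status is ABORTED."""
    if job['status'] == 'ABORTED':
        # Fall back to a generic message if statusDetails is missing.
        raise BatchJobAborted(job.get('statusDetails', 'no details provided'))
    return job
```

Calling `raise_if_aborted(job)` right after the polling loop stops the notebook at the failing cell rather than at a later one.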

#### Prediction Explanations

You can include Prediction Explanations by adding the desired Prediction Explanation parameters to the job configuration:

In [None]:
job_details = {
    'deploymentId': deployment_id,
    'intakeSettings': {
        'type': 's3',
        'url': s3_csv_input_file,
        'credentialId': credential_id,
    },
    'outputSettings': {
        'type': 's3',
        'url': s3_csv_output_file,
        'credentialId': credential_id,
    },
    'maxExplanations': 10,
    'thresholdHigh': 0.5,
    'thresholdLow': 0.15,
}

#### End-to-end scoring from a JDBC PostgreSQL database using Python requests
The following example reads a scoring dataset from the table <code>public.scoring_data</code> and writes the scored data back to <code>public.scored_data</code> (this assumes that table already exists).

In [None]:
import requests
import sys

api_key = '...'
api_endpoint = 'https://app.datarobot.com/api/v2/batchPredictions/'

deployment_id = '...'
credential_id = '...'
data_store_id = '...'

session = requests.Session()
session.headers = {
    'Authorization': 'Bearer {}'.format(api_key),
}

job_details = {
    'deploymentId': deployment_id,
    'intakeSettings': {
        'type': 'jdbc',
        'dataStoreId': data_store_id,
        'credentialId': credential_id,
        'table': 'scoring_data',
        'schema': 'public',
    },
    'outputSettings': {
        'type': 'jdbc',
        'dataStoreId': data_store_id,
        'credentialId': credential_id,
        'table': 'scored_data',
        'schema': 'public',
        'statementType': 'insert'
    }
}

# Send the job
resp = session.post(api_endpoint, json=job_details)

if resp.status_code > 299:
    print(resp.json())
    sys.exit(-1)

job = resp.json()

print('queued batch job: {}'.format(job['links']['self']))