This notebook gives a user a step-by-step walkthrough for creating a collection
with a custom configuration and ingesting documents using Python's `requests` and
`multiprocessing` modules.  To run it, you must have valid service credentials for the
"discods@us.ibm.com" functional ID.

__Note:__ This is not meant to be a rigorous or production-ready workflow.  It is simply meant
to make it quick to get a new dataset into Watson Discovery Service (WDS).

In [None]:
import json
import requests
import multiprocessing
from numpy import array_split
import time
import pprint

# Shared pretty-printer used by later cells to display API responses.
pp = pprint.PrettyPrinter(width=60, indent=2)

In [None]:
# REQUIRED: Service credentials for functional ID "discods@us.ibm.com"
#
# The credentials are read from the WDS_USERNAME / WDS_PASSWORD environment
# variables when they are set, so they don't need to be typed into (and saved
# with) the notebook.  Otherwise, replace the '' defaults below.

import os

USERNAME = os.environ.get('WDS_USERNAME', '')
PASSWORD = os.environ.get('WDS_PASSWORD', '')

### Helper functions

In [None]:
def get_enrichment_template():
  '''
  Build and return a new, unpopulated enrichment description.

  The template targets the 'alchemy_language' enrichment. Its source and
  destination fields are blank and it has no options; callers fill those in.
  Each call returns a fresh dict, so editing one result never affects another.
  '''
  template = dict(
    destination_field='',
    source_field='',
    enrichment='alchemy_language',
  )
  template['options'] = {}
  return template

def render_configuration(config, name, enrichments, description=u''):
  '''
  Return a populated configuration dict built from the ``config`` template.

  ``config`` itself is not modified; a shallow copy is populated with
  ``name``, ``description`` and ``enrichments``.  Enrichments can be either
  a dictionary representing a one-step workflow, or a list of such objects
  indicating a multi-step workflow.
  '''
  # Work on a copy so that rendering again (e.g. after re-running a cell with
  # different settings) can't silently change the loaded template or a
  # previously rendered configuration.
  rendered = dict(config)
  rendered['name'] = name
  rendered['description'] = description

  # A single enrichment dict (including dict subclasses) becomes a
  # one-element workflow.
  if isinstance(enrichments, dict):
    enrichments = [enrichments]
  rendered['enrichments'] = enrichments

  return rendered

def doc_generator(filename):
  '''
  Lazily yield one parsed JSON value per line of ``filename``.

  A line that is not valid JSON yields ``False`` instead, so iteration
  continues past bad lines.  Note that a line containing the JSON literal
  ``false`` also yields ``False``.
  '''
  with open(filename, 'r') as f:
    for line in f:
      # Guard only the parse.  Wrapping the ``yield`` in a bare except would
      # also swallow GeneratorExit, making ``close()`` on an abandoned
      # generator raise RuntimeError.
      try:
        datum = json.loads(line)
      except ValueError:
        datum = False
      yield datum
        
def doc_iterator(filename):
  '''
  Return a list of JSON documents read from a newline-separated file.

  Lines that fail to parse as JSON are skipped, and their 1-based line
  number is printed so the user can find them.  Blank (whitespace-only)
  lines are skipped silently.
  '''
  data = []
  with open(filename, 'r') as f:
    # Count from 1 so the reported number matches what an editor shows.
    for lineno, line in enumerate(f, 1):
      if not line.strip():
        continue
      try:
        data.append(json.loads(line))
      except ValueError:
        print('failed to parse line {}'.format(lineno))
  return data
        
def delete(kind, guid, env_id=None):
  '''
  Delete a collection or configuration and return the parsed JSON reply.

  kind   -- either 'collections' or 'configurations'.
  guid   -- ID of the object to delete.
  env_id -- environment containing the object; when falsy, the notebook's
            ENVIRONMENT_ID is used instead.

  The reply is also printed.
  '''
  # Only these two resource kinds are supported by this helper.
  supported_kinds = ['collections', 'configurations']
  assert kind in supported_kinds

  environment = env_id or ENVIRONMENT_ID

  base = 'https://gateway.watsonplatform.net/discovery/api/v1/environments/'
  url = base + '{}/{}/{}?version=2016-12-01'.format(environment, kind, guid)

  response = requests.delete(url, auth=(USERNAME, PASSWORD))
  res = response.json()
  print(res)
  return res

def insert(args):
  '''
  Insert one document into a collection.

  args -- a (url, document) pair: the documents endpoint URL and a
          JSON-serializable document.  It is a single argument so the
          function can be mapped over a list of pairs by batch_insert().

  Returns the parsed JSON response.  Returns False if the request fails or
  the response body isn't valid JSON.
  '''
  url, doc = args[0], args[1]

  # Create dictionary for multi-part request.
  files = {'file': ('file', json.dumps(doc), 'application/json')}
  try:
    res = requests.post(url, files=files, auth=(USERNAME, PASSWORD))
    return res.json()
  except (requests.exceptions.RequestException, ValueError):
    # Only transport/HTTP-layer errors and undecodable bodies become False.
    # Programming errors and interrupts still propagate instead of being
    # silently hidden.
    return False
  
def batch_insert(args, processes=4):
  '''
  Insert documents in parallel and return the per-document results.

  args      -- sequence of (url, document) pairs, as consumed by insert().
  processes -- number of worker processes in the pool.

  Returns a list in the same order as ``args``.  The i-th element is False
  if the request raised for the i-th doc; otherwise it is the JSON response,
  which might still indicate failure via status code or message fields.
  Returns an empty list if the multiprocessing call itself fails.
  '''
  # NOTE(review): pool.map pickles ``insert`` by reference.  Under the
  # 'spawn' start method, functions defined in a notebook may not be
  # importable by the workers -- confirm on your platform.  If needed, switch
  # to multiprocessing.pool.ThreadPool, which has the same API; the work is
  # I/O-bound.
  pool = multiprocessing.Pool(processes=processes)
  try:
    # pool.map, not the builtin map: the builtin ran every insert serially in
    # this process (leaving the pool idle) and is lazy on Python 3.
    result = pool.map(insert, args)
  except Exception as err:
    # If this happens, something outside of requests went wrong
    # (insert() already converts request failures into False).
    print('fatal error in multiprocessing call: {!r}'.format(err))
    result = []
  finally:
    # Regardless of outcome, shut down the pool correctly.
    pool.close()
    pool.join()

  return result

### Select an environment

In [None]:
# Start by listing every environment these credentials can see.

url = ('https://gateway.watsonplatform.net/discovery/api/v1/'
       'environments?version=2016-12-01')
res = requests.get(url, auth=(USERNAME, PASSWORD)).json()
env_ids = [env['environment_id'] for env in res['environments']]

print(env_ids)

In [None]:
# Choose the environment that will hold the new collection.  The e627174c
# environment is for News, so don't use that one.
# NOTE(review): index 1 relies on the order of env_ids printed by the previous
# cell -- check that output and confirm which index you want before running.

ENVIRONMENT_ID = env_ids[1]

### Create a custom configuration

In [None]:
# Name and description for the new collection; the configuration created
# below reuses both.

NAME, DESCRIPTION = u'arxiv', u'Standard configuration for the arxiv dataset.'

In [None]:
# Read the configuration template from disk.  Later cells fill in the parts
# that are specific to this collection.

with open('config.json') as f:
  config_template = json.loads(f.read())

In [None]:
# Define the enrichment for this collection.  Setting just "source_field"
# and "destination_field" uses the default enrichments; specific enrichments
# are requested through the "options" object.

enrichment = get_enrichment_template()
enrichment.update({
  u'source_field': u'abstract',
  u'destination_field': u'enriched_abstract',
  u'options': {u'extract': u'keyword, concept, taxonomy'},
})

pp.pprint(enrichment)

In [None]:
# Combine the template, collection name, enrichment and description into the
# final configuration JSON.

custom_config = render_configuration(
  config=config_template,
  name=NAME,
  enrichments=enrichment,
  description=DESCRIPTION,
)

### Register configuration with your environment

In [None]:
# List the configurations that already exist in this environment, so we can
# make sure the new one's "name" field is unique.

config_url = (
  'https://gateway.watsonplatform.net/discovery/api/v1/environments/'
  '{env_id}/configurations?version=2016-12-01'
).format(env_id=ENVIRONMENT_ID)

res = requests.get(config_url, auth=(USERNAME, PASSWORD)).json()
pp.pprint(res)

In [None]:
# Try to register the new configuration and get its ID.  If that fails,
# print the response so the error is visible.

res = requests.post(config_url, json=custom_config, auth=(USERNAME, PASSWORD)).json()
if 'configuration_id' in res:
  CONFIGURATION_ID = res['configuration_id']
  print('New configuration ID: {}'.format(CONFIGURATION_ID))
else:
  # No ID in the reply means the configuration wasn't created (presumably an
  # error response).  CONFIGURATION_ID is not set here, so resolve the
  # problem before running the cells that use it.
  pp.pprint(res)

### Create a collection using the new configuration

In [None]:
# Start by listing the collections that already exist in this environment.

collections_url = (
  'https://gateway.watsonplatform.net/discovery/api/v1/environments/'
  '{env_id}/collections?version=2016-12-01'
).format(env_id=ENVIRONMENT_ID)

res = requests.get(collections_url, auth=(USERNAME, PASSWORD)).json()
pp.pprint(res)

In [None]:
# Creating a collection means POST-ing a small JSON object that describes it
# and names the configuration it should use.

collection_info = dict(
  name=NAME,
  description=DESCRIPTION,
  configuration_id=CONFIGURATION_ID,
)

In [None]:
# Post the collection JSON and grab the collection ID if successful;
# otherwise print the response so the error is visible.

res = requests.post(collections_url, auth=(USERNAME, PASSWORD), json=collection_info).json()
if 'collection_id' in res:
  COLLECTION_ID = res['collection_id']
  print('New collection ID: {}'.format(COLLECTION_ID))
else:
  # No ID in the reply means the collection wasn't created (presumably an
  # error response).  COLLECTION_ID is not set here.
  pp.pprint(res)

### Insert documents

In [None]:
# If you're jumping in here, set ENVIRONMENT_ID, COLLECTION_ID and
# CONFIGURATION_ID manually.  Otherwise the cells above set them (hopefully).

insert_url = (
  'https://gateway.watsonplatform.net/discovery/api/v1/'
  'environments/{env}/collections/{coll}/documents'
  '?version=2016-12-01&configuration_id={conf}'
).format(env=ENVIRONMENT_ID, coll=COLLECTION_ID, conf=CONFIGURATION_ID)

In [None]:
# Load every document into memory at once, which is fine for small datasets.
# For larger datasets where memory is an issue, modify this cell to use the
# doc_generator function instead.
#
# The doc file is assumed to contain newline-separated strings that each
# parse as JSON.

data = doc_iterator('arxiv-iso8602.json')
print('read {} documents'.format(len(data)))

In [None]:
# To use multiprocessing, pair the insert URL with each document: insert()
# takes a single (url, document) argument.  Run this cell only once after
# loading, since it replaces `data` with the paired version.

# A list (rather than zip, which is a lazy iterator on Python 3 and has no
# len) so it can be counted here and sliced per batch later.
data = [(insert_url, doc) for doc in data]
print(len(data))

In [None]:
# Ingestion settings.

# How many batches to split the documents into, and how many worker
# processes each batch is spread across.
num_batches = 100
processes = 8

# Precompute the contiguous document indices belonging to each batch.
slices = [list(chunk) for chunk in array_split(range(len(data)), num_batches)]

In [None]:
# Ingest!  Each batch sends only its own slice of `data` to the pool.

results = []
start = time.time()

for batch, indices in enumerate(slices):
  # array_split yields empty slices when there are fewer documents than
  # batches; there is nothing to send for those.
  if not indices:
    continue
  ia, ib = indices[0], indices[-1]
  # Slice out this batch's documents; passing all of `data` re-inserted the
  # whole dataset on every iteration.
  results.extend(batch_insert(data[ia:ib + 1], processes=processes))
  elapsed = time.time() - start
  print('finished batch {}/{} ({:1.0f}%), {:.1f} s'.format(
    batch + 1, num_batches, 100. * (batch + 1) / num_batches, elapsed))

### Scratch cells

In [None]:
#res = delete('configurations', CONFIGURATION_ID)
#res = delete('collections', '13fa2ffc-26b7-4911-87fb-2e5afaf2b75d')

In [None]:
# import numpy as np
# x = range(10)
# [list(z) for z in np.array_split(x, 3)]

In [None]:
# # Query document details

# doc_url = ('https://gateway.watsonplatform.net/discovery/api/v1/environments/' +
#            '{}/collections/{}/documents/{}?version=2016-12-01')

# for r in results:
#   url_tmp = doc_url.format(ENVIRONMENT_ID, COLLECTION_ID, r['document_id'])
#   print(requests.get(url_tmp, auth=(USERNAME, PASSWORD)).json())