# Processing Flow

The processing flow is run on all of the review data once all the models are trained and validated. This is the flow that actually turns the raw data into processed output.

On each step, you will find a "Persistence Section" which is concerned with storing the information for use in the following steps. For the purpose of this exercise, we are using a Cloudant database instance. If you decide to use a different storage type, the Persistence section is the one you will need to modify.


# 1. Text Analysis

This step uses Alchemy to replace words in the review sentences with their semantic types (product, customer_service, company, etc.), to identify the sentiment of each sentence and the overall sentiment of a paragraph, and to identify the relationships between entities.

The semantic types and the relationships between entities were defined when the WKS model was trained, and they are usually specific to the domain of the data.


In [None]:
import configparser
import logging
import os
import re
import time

import cloudant
import nltk
from watson_developer_cloud import AlchemyLanguageV1

import utils
import cloudanthelper as ch

# Root logger (no name given), shared by every cell: only errors are reported.
logger = logging.getLogger()
logger.setLevel(logging.ERROR)

# Credentials are read from the .env file located in the parent directory
# of the current working directory.
curdir = os.getcwd()
logger.debug(curdir)
credFilePath = os.path.join(curdir, os.pardir, '.env')
config = configparser.ConfigParser()
config.read(credFilePath)
logger.debug(config.sections())

# Custom WKS model id passed to the Alchemy entity/relation calls, and the
# Alchemy client itself.
model_id = config['WKS']['WKS_MODEL_ID']
alchemy_api = AlchemyLanguageV1(api_key=config['ALCHEMY']['ALCHEMY_API_KEY'])

# get_relations splits any text longer than this before calling the service.
MAX_CHARS = 5024

def get_entities(review):
    """
    Get entities, along with sentiment, from Alchemy service.
    Input: text which contains entities.
    Output: json object with response from the service, or an empty
            string if the service call raised.
    """
    logger.debug(review)
    response = ''
    try:
        response = alchemy_api.entities(text=str(review), model=model_id, sentiment=True)
    except Exception:
        # Best effort: keep processing the remaining reviews, but record the
        # traceback. Unlike a bare except, this does not swallow
        # KeyboardInterrupt/SystemExit.
        logger.exception("Error when getting entities.")
    logger.debug("Result from entities call: " + str(response))
    return response


def get_relations(review, max_retries=5, retry_delay=60):
    """
    Get typed relations between entities from Alchemy service.

    Texts longer than MAX_CHARS are split at a point chosen by
    utils.find_middle; each part is sent separately and the partial
    results are merged back into a single response.

    Input: review: text which contains relations between entities.
           max_retries: how many times the request is re-sent while the
                        service keeps answering with status 'ERROR'.
           retry_delay: seconds to wait before each retry.
    Output: json object with response from the service, or None if the
            service call raised.
    """
    split = {}
    #splitting long excerpts to send to the service
    if len(review) > MAX_CHARS:
        mid = utils.find_middle(review)
        while mid >= MAX_CHARS:
            mid = utils.find_middle(review[:mid])
        half = review[mid:]
        review = review[:mid]
        # A failed recursive call returns None; treat it as nothing to merge.
        split = get_relations(half, max_retries, retry_delay) or {}
    #calling the Alchemy service
    try:
        response = alchemy_api.typed_relations(text=review, model=model_id)
        attempts = 0
        # The request must be re-sent on every retry, otherwise the status
        # can never change; the number of retries is bounded.
        while response.get('status') == 'ERROR' and attempts < max_retries:
            # Non-English text will not succeed on a retry; give up at once.
            if response.get('language', 'english') != 'english':
                break
            time.sleep(retry_delay)
            response = alchemy_api.typed_relations(text=review, model=model_id)
            attempts += 1
        #if excerpt was too long and split, make sure they are merged
        if 'typedRelations' in split:
            if 'typedRelations' in response:
                response['typedRelations'] = response['typedRelations'] + split['typedRelations']
                response['text'] = response.get('text', '') + split.get('text', '')
            else:
                response['typedRelations'] = split['typedRelations']
        return response
    except Exception:
        logger.exception("Error when getting relations.")
        return None
    

def token_replacement_entities(review_text):
    """
    Replaces the identified tokens by their
        semantic types.
    Input: text to replace identified tokens.
    Output: a two-item list [sentences, count]:
            - sentences: one dict per sentence with keys 'sentence' and
              'seqno', 'replaced_sentence' when at least one entity was
              found, plus one list per entity type holding the entities
              ({'name', optional 'sentiment'}) and relations
              ({'hasrel', 'rel_name', 'second'}) found in that sentence;
            - count: number of sentences.
    """
    review = get_relations(review_text)
    entity_info = utils.avg_sentiment(get_entities(review_text))
    entities = []
    if 'entities' in entity_info:
        entities = entity_info['entities']

    sentences = []
    try:
        sentences = nltk.tokenize.sent_tokenize(review_text)
    except Exception:
        logger.exception("Error when splitting text into sentences.")

    # Build each entity's whole-word pattern once instead of once per
    # sentence. The extra sub undoes escaped spaces (older Pythons escape ' ').
    patterns = []
    for entity in entities:
        token = re.sub(r'\\ ', ' ', re.escape(entity['text']))
        patterns.append(re.compile(r'\b%s\b' % token))

    result = []
    # Maps the original sentence text to its index in `result`, so that
    # relations (which report their sentence) can be attached below.
    sentence_dict = {}
    for seqno, sentence in enumerate(sentences):
        entry = {'sentence': sentence, 'seqno': seqno}
        sentence_dict[sentence] = seqno
        for entity, pattern in zip(entities, patterns):
            if pattern.search(sentence) is None:
                continue
            # Store the entity's real text, not the regex-escaped token.
            found = {'name': entity['text']}
            if 'sentiment' in entity:
                found['sentiment'] = [entity['sentiment']['type']]
            entry.setdefault(entity['type'], []).append(found)
            classification = "<" + entity['type'] + ">"
            # Replacements accumulate: later entities are matched against
            # the already-rewritten sentence.
            sentence = pattern.sub(classification, sentence, count=int(entity['count']))
            entry['replaced_sentence'] = sentence
        result.append(entry)

    #processing relationship between entities
    if review is not None and 'typedRelations' in review:
        for relation in review['typedRelations']:
            arguments = relation['arguments']
            rel_info = {
                'hasrel': relation['type'],
                'rel_name': arguments[0]['entities'][0]['text'],
                'second': arguments[1]['entities'][0]['text'],
            }
            typed_entity = arguments[0]['entities'][0]['type']
            index = sentence_dict.get(relation['sentence'])
            if index is not None:
                result[index].setdefault(typed_entity, []).append(rel_info)

    return [result, len(sentences)]

##---------------------------PERSISTENCE SECTION ------------------------------##

#Initializing Cloudant client
client = ch.getConnection()
db = client[config['CLOUDANT']['CLOUDANT_DB']]

try:
    try:
        status = db['tracker']
    except KeyError:
        # Everything below reads/writes the tracker; stop with a clear
        # message instead of a NameError on `status` further down.
        logger.error("Tracker document not found.")
        raise

    #Making sure documents are not added multiple
    #times to the database. Stale documents are collected first and deleted
    #afterwards, so the database is not modified while being paged through.
    stale_docs = [doc for doc in db if doc.get('type') == 'replaced']
    for doc in stale_docs:
        doc.delete()
    status['replace_switch'] = 0
    status['replaced'] = []
    status.save()

    #Adding token replacement to an augmented version of the
    #document so that history can be preserved
    if utils.add_review(status) < 2:
        status['replace_switch'] = 1
        status.save()

    for doc in db:
        doc_id = doc['_id']
        # Only raw reviews are processed: derived documents carry a 'type',
        # and neither the tracker nor design documents (e.g. _design/names)
        # have the review fields read below.
        if 'type' in doc or doc_id == 'tracker' or doc_id.startswith('_design/'):
            continue
        if doc_id in status['replaced']:
            continue
        try:
            new_doc = {
                'review_id': doc_id,
                'review': token_replacement_entities(doc['reviewText']),
                'type': 'replaced',
                'asin': doc['asin'],
                'helpful': doc['helpful'],
                'title': doc['title'],
            }
            db.create_document(new_doc)
            # Record progress only once the document actually exists.
            status['replaced'].append(doc_id)
            status.save()
        except Exception:
            logger.exception('Error saving token replaced document to Cloudant.')
    status['classify_switch'] = 1
    status.save()
finally:
    client.disconnect()

# 2. Creating views and indexes

NOTE: This step is required if:
    1. You are using Cloudant to store your data
    2. You want to execute each one of the following steps using Cloudant
    
To increase performance when using a Cloudant database, the use of views is recommended.

For the purpose of this exercise, four views are used and they should be created on the Cloudant tool. This speeds up the retrieval of documents allowing the application to get the needed data with a lower number of calls to the Cloudant service.

The views are:
    _design/names/replaced
    
    _design/names/classified
    
    _design/names/clustered
    
    _design/names/final
    
Please refer to the [service tutorial](https://cloudant.com/learning-center/#getstarted) to learn how to create views. The map functions that you should enter when creating the views are the following:

    - for replaced:
        function (doc) 
          {if(doc.type == "replaced")
            {emit(doc._id, doc);
          }
        }
    - for classified:
        function (doc) 
          {if(doc.type == "classified")
            {emit(doc._id, doc);
          }
        }
    - for clustered:
        function (doc) 
          {if(doc.type == "clustered")
            {emit(doc._id, doc);
          }
        }
    - for final:
        function (doc) 
          {if(doc.type == "final")
            {emit(doc._id, doc);
          }
        }

No reduce functions are needed.

Some steps perform queries on the database. These queries are specific to the products being retrieved, so no views are created for them. Instead, in order to query the documents, indexes should be created in the database. The indexes can be created in the Query section of the tools and should cover the document fields:
    "type"
    "asin"
    "review_id"
    
The content of the new index should look like:

    {
      "index": {
        "fields": [
          "type",
          "asin",
          "review_id"
        ]
      },
      "type": "json"
    }

# 3. Classification of reviews

This step uses the Natural Language Classifier (NLC) created in the Training notebook. It classifies each sentence of a review and adds the result of the classification to a new document with type = 'classified' in the database.

In [None]:
import json
import os
import logging
import configparser
import cloudant
import cloudanthelper as ch
from watson_developer_cloud import NaturalLanguageClassifierV1


# Root logger, errors only.
logger = logging.getLogger()
logger.setLevel(logging.ERROR)

curdir = os.getcwd()
logger.debug(curdir)

# NLC credentials and classifier id come from the .env file one level up.
credFilePath = os.path.join(curdir, os.pardir, '.env')
config = configparser.ConfigParser()
config.read(credFilePath)
NLC_USERNAME = config['NLC']['NLC_USERNAME']
NLC_PASSWORD = config['NLC']['NLC_PASSWORD']
NLC_CLASSIFIER = config['NLC']['NLC_CLASSIFIER']

# Client used by classify() below.
nlc = NaturalLanguageClassifierV1(username=NLC_USERNAME, password=NLC_PASSWORD)

def classify(review):
    """
    Classifies a sentence based on the trained classifier.
    Input: text to classify.
    Output: top class returned by the service, or 'no class' if the call
            fails or its response has no 'top_class'.
    """
    logger.debug(review)
    #Classify sentence
    try:
        response = nlc.classify(NLC_CLASSIFIER, review)
        logger.debug(response)
        return response['top_class']
    except Exception:
        # Best effort per sentence; keep the traceback for diagnosis and do
        # not swallow KeyboardInterrupt/SystemExit as a bare except would.
        logger.exception('Failed at sentence classification')
        return 'no class'


##---------------------------PERSISTENCE SECTION ------------------------------##
# utils.add_review is used below but utils is not imported in this cell.
import utils

#Initializing Cloudant client
client = ch.getConnection()
db = client[config['CLOUDANT']['CLOUDANT_DB']]

try:
    try:
        status = db['tracker']
    except KeyError:
        # Everything below reads/writes the tracker; fail with a clear message.
        logger.error("Tracker document not found.")
        raise

    #Making sure documents are not added multiple
    #times to the database. Ids are collected before deleting so the view
    #is not modified while being read.
    classified = ch.getResultsfromView("classified", "names", db)
    stale_ids = [row['value']['_id'] for row in classified.result]
    for stale_id in stale_ids:
        stale = ch.convertToDocument(db, stale_id)
        stale.fetch()
        stale.delete()
    status['cluster_switch'] = 0
    status['classified'] = []
    status.save()

    if utils.add_review(status) == 2:
        replaced = ch.getResultsfromView("replaced", "names", db)
        for row in replaced.result:
            doc = row['value']
            rev_id = doc['review_id']
            if rev_id in status['classified']:
                continue
            logger.debug(doc)
            # A fresh dict per review, so no fields leak between documents.
            new_doc = {
                'type': 'classified',
                'review_id': rev_id,
                'asin': doc['asin'],
                'review': doc['review'],
                'helpful': doc['helpful'],
                'title': doc['title'],
            }
            # 'review' is the [sentences, count] pair built in step 1;
            # classify each sentence, preferring its entity-replaced text.
            for line in new_doc['review'][0]:
                try:
                    if 'replaced_sentence' in line:
                        sentence = line['replaced_sentence']
                    else:
                        sentence = line['sentence']
                    if len(sentence) < 1024:
                        line['layer1type'] = classify(sentence)
                    else:
                        line['layer1type'] = 'Sentence too long to Classify'
                except Exception:
                    logger.exception('Error classifying review.')
            db.create_document(new_doc)
            # Track by review_id, the same key checked above.
            status['classified'].append(rev_id)
            status.save()
        status['cluster_switch'] = 1
        # Save so the switch is actually persisted to the tracker.
        status.save()
finally:
    client.disconnect()


# 4. Clustering features

This step allows features of a given product that are semantically similar to be grouped together. They are listed with the set of synonyms and the respective sentiment expressed by the reviewers when talking about them.

Please note that this step is only required in order to have the data needed to populate the application presented in the demo. Feel free to skip this and the following steps if you have already accomplished what you wanted with your data in the previous steps.

In [None]:
import cloudant
import logging
import os
import json
import configparser
import utils
import cloudanthelper as ch
from cloudant.query import Query

# Root logger, errors only.
logger = logging.getLogger()
logger.setLevel(logging.ERROR)

curdir = os.getcwd()
logger.debug(curdir)

# Cloudant credentials are read from the .env file one level up.
credFilePath = os.path.join(curdir, os.pardir, '.env')
config = configparser.ConfigParser()
config.read(credFilePath)

# Binary word2vec model used by cluster(). Point this at the model you
# created; by default it is the sample model shipped in ../data.
W2V_MODEL = os.path.join(curdir, os.pardir, 'data', 'sample_model.bin')


# Loaded word2vec models, keyed by path, so the (large) model file is read
# once per session instead of once per product.
_W2V_CACHE = {}


def _load_w2v_model(path):
    """Return the binary word2vec model at *path*, loading it only once."""
    if path not in _W2V_CACHE:
        # NOTE(review): `word2vec` is not imported in any visible cell;
        # presumably `from gensim.models import word2vec`. Newer gensim
        # releases expose load_word2vec_format on KeyedVectors instead;
        # verify against the installed version.
        _W2V_CACHE[path] = word2vec.Word2Vec.load_word2vec_format(path, binary=True)
    return _W2V_CACHE[path]


def cluster(asin, doc, db):
    """
    Cluster features based on their word vector similarity.
    Input: asin of the product, document (unused; kept for callers) and
           database connection.
    Output: dict with 'product_name' and, when feature vectors were
            produced, 'features': the ten clusters with the highest
            'keyword_count'.
    Raises IndexError if the product has no 'replaced' documents.
    """
    query = Query(db, selector={'asin': asin, 'type': 'replaced'},
                  fields=["helpful", "title", "review_id"])
    # Read the query results once and reuse them.
    replaced_docs = list(query.result)
    name = replaced_docs[0]['title']
    rev_ids = []
    helpful = {}
    for data in replaced_docs:
        rev_ids.append(data['review_id'])
        helpful[data['review_id']] = data['helpful'][0] if 'helpful' in data else 0

    keys = []
    local_dump = {}
    for rev in rev_ids:
        query_id = Query(db, selector={'review_id': rev, 'type': 'classified'})
        # result[0] is a list with the first matching document, or empty.
        for res in query_id.result[0]:
            text = res['review']
            local_dump[res['review_id']] = text
            for obj in text[0]:
                for data in obj.get('Feature', []):
                    # Relation entries stored under the same type key carry
                    # no 'name'; only entity entries are clustered.
                    if 'name' not in data:
                        continue
                    keys.append({
                        'word': data['name'],
                        'sentiment': data.get('sentiment', ['neutral']),
                        'review_id': res['review_id'],
                        'sentence_id': obj['seqno'],
                    })

    model = _load_w2v_model(W2V_MODEL)
    [vecs, mapping] = utils.generate_vectors(keys, model)
    featureDict = {}
    # len() rather than truthiness: vecs may be an array type.
    if len(vecs) > 0:
        clusters = utils.cluster_try(vecs)
        cluster_data = []
        features = utils.create_json(clusters, cluster_data, mapping, keys, helpful, local_dump)
        features = sorted(features, key=lambda k: k['keyword_count'], reverse=True)
        featureDict['features'] = features[:10]
    featureDict['product_name'] = name
    return featureDict

##---------------------------PERSISTENCE SECTION ------------------------------##

#Initializing Cloudant client
client = ch.getConnection()
db = client[config['CLOUDANT']['CLOUDANT_DB']]

try:
    try:
        status = db['tracker']
    except KeyError:
        # Everything below reads/writes the tracker; fail with a clear message.
        logger.error("Tracker document not found.")
        raise

    #Making sure documents are not added multiple
    #times to the database. Ids are collected before deleting so the view
    #is not modified while being read.
    clustered = ch.getResultsfromView("clustered", "names", db)
    stale_ids = [row['value']['_id'] for row in clustered.result]
    for stale_id in stale_ids:
        stale = ch.convertToDocument(db, stale_id)
        stale.fetch()
        stale.delete()
    status['final_switch'] = 0
    status['clustered'] = []
    status.save()

    if utils.add_review(status) == 3:
        classified = ch.getResultsfromView("classified", "names", db)
        for row in classified.result:
            doc = row['value']
            # Classified documents carry the product asin themselves; only
            # fetch the raw review when it is missing.
            if 'asin' in doc:
                asin = doc['asin']
            else:
                asin = db[doc['review_id']]['asin']
            # One clustered document per product, not per review.
            if asin in status['clustered']:
                continue
            processed = cluster(asin, doc, db)
            processed['product_id'] = asin
            processed['type'] = 'clustered'
            db.create_document(processed)
            # Record progress only once the document actually exists.
            status['clustered'].append(asin)
            status.save()
        status['final_switch'] = 1
        # Save so the switch is actually persisted to the tracker.
        status.save()
finally:
    client.disconnect()

# 5. Aggregating information

This step creates a JSON document with the compiled information about feature clustering. It allows the sentences that are about specific features of a product to be grouped together, and the sentiment associated with them to be assessed.

It compiles the sentences based on features that were identified as synonyms by the similarity between their word vectors and saves the information to the database with type = 'final'.

In [None]:
import json
import utils
import logging
import os
import configparser
import cloudanthelper as ch
from cloudant.query import Query
from cloudant.client import Cloudant

# Root logger, errors only.
logger = logging.getLogger()
logger.setLevel(logging.ERROR)

curdir = os.getcwd()
logger.debug(curdir)

# Cloudant credentials are read from the .env file one level up.
credFilePath = os.path.join(curdir, os.pardir, '.env')
config = configparser.ConfigParser()
config.read(credFilePath)

##---------------------------PERSISTENCE SECTION ------------------------------##

#Initializing Cloudant client
client = ch.getConnection()
db = client[config['CLOUDANT']['CLOUDANT_DB']]

try:
    try:
        status = db['tracker']
    except KeyError:
        # Everything below reads/writes the tracker; fail with a clear message.
        logger.error("Tracker document not found.")
        raise

    #Making sure documents are not added multiple
    #times to the database. Ids are collected before deleting so the view
    #is not modified while being read.
    final_view = ch.getResultsfromView("final", "names", db)
    stale_ids = [row['value']['_id'] for row in final_view.result]
    for stale_id in stale_ids:
        stale = ch.convertToDocument(db, stale_id)
        stale.fetch()
        stale.delete()
    status['finished_switch'] = 0
    status['final'] = []
    status.save()

    if utils.add_review(status) == 4:
        clustered = ch.getResultsfromView("clustered", "names", db)
        for row in clustered.result:
            doc = row['value']
            asin = doc['product_id']
            if asin in status['final']:
                continue
            name = doc['product_name']
            # NOTE(review): _id is stripped before handing the document to
            # utils.make_final, presumably so it is not copied over; confirm.
            del doc['_id']
            final_doc = utils.make_final(db, doc)
            final_doc['type'] = 'final'
            final_doc['product_id'] = asin
            final_doc['product_name'] = name
            db.create_document(final_doc)
            # Record progress only once the document actually exists.
            status['final'].append(asin)
            status.save()
        status['finished_switch'] = 1
        status.save()
finally:
    client.disconnect()

# Summary

At this point you should have been able to:

1. Analyze the data you will be using for the application by:
    - Replacing words by their semantic types;
    - Identifying relationships between entities;
    - Identifying the sentiment associated with the entities;
2. Classify sentences of your data based on the NLC model trained previously;
3. Cluster sentences, along with their overall sentiments, associated with the entity classes you trained on your
    linguistic model;
4. Deploy the sample app locally, pointing it to the Cloudant instance you created (if you took this route), or
    use your own UI to point to your persisted data.