In [1]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel, StopWordsRemover}

In [2]:
val spark = SparkSession.builder.appName("intercom_tfidf").master("local[8]").getOrCreate

In [3]:
val documents = spark.sparkContext.wholeTextFiles("dataWithoutBot/*.txt")

I got 501 conversations from Intercom and used those for the proof of concept (PoC).

In [45]:
documents.count

501

I did some cleaning of the data: removing newline characters, getting rid of punctuation and digits, removing unnecessary spacing, making everything lowercase, tokenizing (turning each conversation from a single string into a list of words) and finally taking out stopwords, which would otherwise skew the results.
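A plain-Scala sketch of these steps on a single string may help make them concrete. It is not the notebook's Spark code: `CleaningSketch` and its tiny stop-word set are hypothetical stand-ins (the notebook relies on `StopWordsRemover`'s built-in English list), and the sketch strips punctuation before collapsing spaces so that removed characters cannot leave double spaces behind.

```scala
// Plain-Scala sketch of the cleaning steps, applied to one conversation string.
object CleaningSketch {
  // Hypothetical, deliberately tiny stop-word list for illustration only;
  // the notebook uses StopWordsRemover's built-in English list instead.
  val stopWords: Set[String] = Set("i", "the", "to", "my", "on", "a", "is")

  def clean(raw: String): Seq[String] = {
    // 1. Newlines, tabs and other control characters become spaces,
    //    so words on either side of a line break stay separate.
    val noControlChars = raw.map(c => if (c < ' ') ' ' else c)
    // 2. Punctuation and digits are removed, and everything is lowercased.
    val noPunctuation = noControlChars.replaceAll("\\p{Punct}|\\d", "").toLowerCase
    // 3. Leading/trailing spaces are trimmed and runs of spaces collapsed to one.
    val singleSpaced = noPunctuation.trim.replaceAll(" +", " ")
    // 4. Tokenize on single spaces (dropping any empty token) ...
    val tokens = singleSpaced.split(" ").toSeq.filter(_.nonEmpty)
    // 5. ... and drop stop words.
    tokens.filterNot(stopWords.contains)
  }

  def main(args: Array[String]): Unit =
    println(clean("Hi,\nI want to pay my VAT on Mondays!! Is the shortcode 12345 ready?"))
}
```

On the sample string in `main`, `clean` returns the tokens hi, want, pay, vat, mondays, shortcode and ready: the digits and punctuation disappear, and the line break after "Hi," becomes a word boundary.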

In [52]:
def remove_multiple_spacing(inputString: String) =
    inputString.trim.replaceAll(" +", " ")

In [53]:
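def remove_newline_characters(inputString: String) =
    // Replace newlines, tabs and other control characters with a space so that
    // the words on either side of a line break are not glued together
    inputString.map(c => if (c < ' ') ' ' else c)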

In [54]:
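// Note: this also strips the punctuation out of email addresses and links;
// should those be set aside first?

// Removes all punctuation and digits, then lowercases the text
def remove_punctuation_make_lowercase(inputString: String) =
    inputString.replaceAll("\\p{Punct}|\\d", "").toLowerCase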
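One possible answer to the question in the comment above, sketched under assumptions: replace links and email addresses with placeholder tokens before punctuation is stripped. The object name, the regular expressions and the `urltoken`/`emailtoken` placeholders are all hypothetical, and the patterns are intentionally simpler than real URL and email syntax.

```scala
// Hypothetical pre-step: mask links and email addresses with placeholder tokens
// so they survive punctuation removal as recognisable words.
object MaskLinksSketch {
  // Deliberately simple patterns (an assumption); real URL/email syntax is broader.
  private val UrlPattern = """https?://\S+|www\.\S+""".r
  private val EmailPattern = """[\w.+-]+@[\w-]+(\.[\w-]+)+""".r

  def maskLinksAndEmails(input: String): String = {
    // Replace URLs first, so an address embedded in a link goes with the link.
    val noUrls = UrlPattern.replaceAllIn(input, " urltoken ")
    EmailPattern.replaceAllIn(noUrls, " emailtoken ")
  }

  def main(args: Array[String]): Unit =
    println(maskLinksAndEmails("Email jane.doe@example.com or see https://example.com/docs."))
}
```

Because the placeholders contain no punctuation or digits, they would pass through `remove_punctuation_make_lowercase` unchanged. If the addresses themselves matter, they could instead be extracted into a separate column before cleaning.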

In [55]:
def tokenizer(inputString: String) =
    // Split on any run of whitespace and drop empty tokens
    inputString.split("\\s+").filter(_.nonEmpty).toSeq

In [56]:
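// wholeTextFiles returns (path, contents) pairs, so keep only the contents.
// Punctuation is stripped before spacing is collapsed, so removed characters
// cannot leave double spaces behind.
val cleanDocuments = documents
    .map(_._2)
    .map(remove_newline_characters)
    .map(remove_punctuation_make_lowercase)
    .map(remove_multiple_spacing)
    .map(tokenizer)
    // set the first token aside from the rest of the conversation
    .map(cleanConversation => (cleanConversation.head, cleanConversation.tail))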

In [57]:
import spark.implicits._
val stopWordsRemover = new StopWordsRemover().setInputCol("value").setOutputCol("withoutStopWords")
val withoutStopWords = stopWordsRemover.transform(cleanDocuments.map(_._2).toDS) 

Then comes the TF-IDF. 

TF is Term Frequency: the number of times a word appears in a document (each conversation is a document). Some definitions normalize this count by the number of words in the document, but Spark's `CountVectorizer`, used below, produces raw counts.

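IDF is Inverse Document Frequency, which measures how rare a term is across all the documents. Spark's `IDF` computes it as log((m + 1) / (df + 1)), where m is the total number of documents and df is the number of documents that contain the term, so a term that appears in almost every conversation gets a score close to 0.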

Multiplying the two values gives the TF-IDF weight of each term in each document. This process also turns every conversation into a vector of the same length, with one entry per term in the vocabulary. With a numerical representation of the conversations, we can start applying mathematical techniques to them, i.e. machine learning.
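To make the definitions concrete, here is a toy TF-IDF computation in plain Scala. It assumes Spark ML's conventions (TF is the raw count from `CountVectorizer`, IDF is log((m + 1) / (df + 1))); `TfIdfSketch` and the three-document corpus are hypothetical.

```scala
// Toy TF-IDF following Spark ML's conventions: TF is a raw term count and
// IDF(t) = log((m + 1) / (df(t) + 1)), with m documents and df(t) the number
// of documents containing term t.
object TfIdfSketch {
  type Doc = Seq[String]

  // Raw count of each term within one document.
  def termFrequencies(doc: Doc): Map[String, Int] =
    doc.groupBy(identity).map { case (term, occurrences) => term -> occurrences.size }

  def inverseDocumentFrequencies(docs: Seq[Doc]): Map[String, Double] = {
    val m = docs.size
    // Count each term at most once per document.
    val documentFrequency: Map[String, Int] =
      docs.flatMap(_.distinct).groupBy(identity).map { case (t, ds) => t -> ds.size }
    documentFrequency.map { case (t, df) => t -> math.log((m + 1.0) / (df + 1.0)) }
  }

  // One sparse TF-IDF vector (term -> weight) per document.
  def tfidf(docs: Seq[Doc]): Seq[Map[String, Double]] = {
    val idf = inverseDocumentFrequencies(docs)
    docs.map(doc => termFrequencies(doc).map { case (t, tf) => t -> tf * idf(t) })
  }

  def main(args: Array[String]): Unit = {
    val docs = Seq(
      Seq("mpesa", "paybill", "payments", "payments"),
      Seq("ussd", "shared", "shortcode"),
      Seq("payments", "sandbox")
    )
    tfidf(docs).foreach(println)
  }
}
```

Here `payments` appears in two of the three documents, so its IDF is log(4/3) ≈ 0.288, while terms found in only one document get log(4/2) ≈ 0.693. In the first document `payments` occurs twice, giving it a TF-IDF of about 0.575. A term present in every document would get log((m + 1) / (m + 1)) = 0.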

In [63]:
// setMinDF(40): only keep terms that appear in at least 40 different conversations
val countVectorizer = new CountVectorizer().setMinDF(40).setInputCol("withoutStopWords").setOutputCol("term_frequency")
val countVectorizerModel = countVectorizer.fit(withoutStopWords)

In [59]:
val vectorizedDocuments = countVectorizerModel.transform(withoutStopWords)

In [36]:
vectorizedDocuments.cache()
val idf = new IDF().setInputCol("term_frequency").setOutputCol("tfidf").fit(vectorizedDocuments)
val tfidf = idf.transform(vectorizedDocuments)

In [37]:
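// idf.idf is a Vector aligned with the CountVectorizer vocabulary; keep the scores
// as Doubles so that the sort below is numeric rather than string-based
val idfScoresList = idf.idf.toArray.toList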

In [38]:
val vocabularyList = countVectorizerModel.vocabulary.toList

In [39]:
val idfScoresLookUp = vocabularyList zip idfScoresList

I ordered the vocabulary terms in descending order of IDF score and printed the top 100. A higher IDF score means a term appears in fewer conversations, which is a rough signal of how specific it is to particular subjects rather than common to all of them.

In [67]:
import org.apache.spark.sql.functions._
idfScoresLookUp.toDF("word", "idf_score").orderBy(desc("idf_score")).show(100)

+------------+------------------+
|        word|         idf_score|
+------------+------------------+
|         way|2.5050280529874214|
|        take|2.5050280529874214|
|      revert|2.5050280529874214|
|     details|2.5050280529874214|
|applications|2.5050280529874214|
|        paid|2.5050280529874214|
|     talking|2.5050280529874214|
|     mondays| 2.480930501408361|
|   thursdays| 2.480930501408361|
|     provide| 2.480930501408361|
|    caroline| 2.480930501408361|
|        much| 2.457400003998167|
|       issue| 2.457400003998167|
|         vat| 2.434410485773468|
|       admin| 2.434410485773468|
|    platform| 2.434410485773468|
|      client| 2.434410485773468|
|        user|2.4119376299214093|
|   integrate|2.4119376299214093|
|         new|2.4119376299214093|
|         pay|2.4119376299214093|
|   shortcode|2.4119376299214093|
|    payments|2.4119376299214093|
|    possible|2.3684525179816704|
|     charges|2.3684525179816704|
|          go| 2.347399108783838|
|      trying|
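Because the table is sorted by IDF alone, each score depends only on how many conversations contain the term. Assuming Spark's formula log((m + 1) / (df + 1)) with m = 501, the scores at the top of the table are consistent with document frequencies of 40 to 43, i.e. terms sitting just above the `setMinDF(40)` cutoff. The sketch below computes those values; `IdfFromDocumentFrequency` is a hypothetical name.

```scala
// Assumes Spark's IDF formula log((m + 1) / (df + 1)) with m = 501 conversations.
// With setMinDF(40), every vocabulary term has df >= 40, so df = 40 gives the
// largest IDF any term in this vocabulary can reach.
object IdfFromDocumentFrequency {
  val numDocs = 501

  def idf(df: Int): Double = math.log((numDocs + 1.0) / (df + 1.0))

  def main(args: Array[String]): Unit =
    (40 to 43).foreach(df => println(f"df = $df%d -> idf = ${idf(df)}%.6f"))
}
```

So the top of an IDF-only ranking is dominated by terms just above the minimum document frequency. The per-document weights in the `tfidf` column are what combine a term's rarity with how often it occurs within each conversation.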

# Conclusion

The output is pretty encouraging at this point. I first checked whether the various products would emerge, and spotted payments, ussd and bulk, plus certain terms commonly used when talking about those products: paybill and mpesa point to payments, and shared points to ussd. I also spotted sandbox.

Something else I found really cool is how prominently Monday and Thursday are mentioned. These are the two days when sender IDs are raised. To be honest, even I didn't know this until I asked MC whether anything out of the ordinary happens on these days. This possibly points to a lot of customer enquiries on those days. It also tells us TF-IDF is a good place to start.

# Moving forward

I'm going to turn this into a more organised pipeline in preparation for the rest of the data. We have about 20,000 Intercom conversations and I did this with only 500, so we should get more representative results when we scale this up.

I also did some reading on what we are trying to achieve, which is essentially topic modelling.
I'd like to try clustering the documents before computing the TF-IDF scores. I want to see what vocabulary emerges around the different products and whether the data even clusters by product to start with, and then compare that method with computing TF-IDF scores over the entire corpus.
Once we are satisfied with the TF-IDF results, we can train a classification model that tags conversations in real time as they happen, and see how that goes.
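Both next steps come down to comparing TF-IDF vectors with each other. As one illustrative option (not necessarily the model we will end up using), here is a nearest-centroid classifier, also known as Rocchio classification: average the TF-IDF vectors of labelled conversations per tag, then tag a new conversation with the label whose centroid has the highest cosine similarity. The labels, weights and object name are hypothetical; real vectors would come from the `tfidf` column. Cosine similarity is also a common choice when clustering TF-IDF vectors.

```scala
// Nearest-centroid (Rocchio-style) tagging over sparse TF-IDF vectors.
// Labels and training vectors below are hypothetical illustrations.
object NearestCentroidSketch {
  type SparseVec = Map[String, Double]

  def dot(a: SparseVec, b: SparseVec): Double =
    a.iterator.map { case (t, w) => w * b.getOrElse(t, 0.0) }.sum

  def norm(a: SparseVec): Double = math.sqrt(dot(a, a))

  // Cosine similarity; defined as 0 when either vector is all zeros.
  def cosine(a: SparseVec, b: SparseVec): Double = {
    val denom = norm(a) * norm(b)
    if (denom == 0.0) 0.0 else dot(a, b) / denom
  }

  // Average the vectors of each label into one centroid per label.
  def centroids(labelled: Seq[(String, SparseVec)]): Map[String, SparseVec] =
    labelled.groupBy(_._1).map { case (label, rows) =>
      val vecs = rows.map(_._2)
      val summed = vecs.flatten.groupBy(_._1).map { case (t, ws) => t -> ws.map(_._2).sum }
      label -> summed.map { case (t, w) => t -> w / vecs.size }
    }

  // Tag a conversation with the label whose centroid is most similar.
  def tag(cents: Map[String, SparseVec], v: SparseVec): String =
    cents.maxBy { case (_, c) => cosine(c, v) }._1

  def main(args: Array[String]): Unit = {
    val training = Seq(
      "payments" -> Map("mpesa" -> 1.2, "paybill" -> 0.9),
      "payments" -> Map("paybill" -> 1.1, "charges" -> 0.4),
      "ussd" -> Map("shared" -> 1.0, "shortcode" -> 0.8)
    )
    val cents = centroids(training)
    println(tag(cents, Map("paybill" -> 0.7, "mpesa" -> 0.3)))
  }
}
```

With these made-up vectors, a conversation mentioning paybill and mpesa is tagged payments. Swapping in a trained classifier would keep the same input shape: one TF-IDF vector per conversation.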