# In [None]:
### Set Spark Configurations

import sys, os

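'''
1. Sets SPARK_HOME and puts the bundled PySpark/py4j archives on sys.path
2. Creates a local, Hive-enabled SparkSession (the Hive warehouse-location setting is currently disabled)
'''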

def fnInitConfig():
    """Make the bundled PySpark importable and return a local, Hive-enabled SparkSession.

    Side effects: sets the SPARK_HOME and PYLIB environment variables, prepends the
    py4j and pyspark archives to ``sys.path`` and binds the module-level ``spark``.
    """
    global spark

    sparkHome = "/apps/opt/applicaitons/spark-2.1.0-bin-hadoop2.7/"
    os.environ["SPARK_HOME"] = sparkHome
    os.environ["PYLIB"] = sparkHome + "python/lib"

    # py4j is inserted first and pyspark second, so pyspark.zip ends up at sys.path[0].
    for archive in ("/py4j-0.10.4-src.zip", "/pyspark.zip"):
        sys.path.insert(0, os.environ["PYLIB"] + archive)

    # Imported only after the sys.path edits above so the bundled pyspark.zip is found.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName('chatCategorizer')
    builder = builder.config('spark.master', 'local[*]')
    # Options tried previously and currently disabled:
    #   spark.cores.max=8, spark.driver.memory=12g, spark.executor.cores=7,
    #   spark.driver.cores=2, spark.sql.warehouse.dir=<warehouse_location>,
    #   spark.master=spark://ip-10-34-42-44.ebiz.verizon.com:7077
    spark = builder.enableHiveSupport().getOrCreate()
    return spark

# Create (or reuse) the SparkSession that every step below runs on.
spark = fnInitConfig()
print('Spark Session created...good to go')

### Parameters to pass

from datetime import datetime

# Run parameters are hard-coded for the time being. The command-line values
# (sys.argv[1] = input CSV, sys.argv[2] = output folder) used to be read and then
# immediately overwritten here, which only made the script crash with IndexError
# when launched without arguments.
# TODO: restore CLI parsing once the hard-coded paths are no longer needed.
path = "/apps/opt/applicaitons/datasets/Datasets/feb_echat_for_vdsi_masked3.csv"
outputFolder = "/apps/opt/applicaitons/datasets/NLP/SuvroChatAnalysis"
# Expected column order: chat ID first, customer chat text second, category third.
columnsList = ['chatID', 'customercontent', 'chatcategory']
filename = 'SUVRO'

# Reference point for the "Time Taken" messages printed after each stage.
starttime = datetime.now()

### Creating Schema

from pyspark.sql.types import *

# Schema of the input CSV: three string columns (chat ID, customer chat text, category).
# A StructType describes a row as an ordered list of StructFields.
schema = StructType(list(
    StructField(colName, StringType())
    for colName in ("chatID", "customercontent", "chatcategory")
))

### Data Cleaning

from pyspark import SparkContext
import re
from string import strip
import numpy as np

def removeMasking(line):
    """Clean one (chatID, customercontent, chatcategory) row.

    The masking characters '*' and '-' are deleted outright. Every remaining run of
    non-alphanumeric characters then collapses to one space, and the text is stripped.
    All spaces are removed from the category.

    Returns:
        ``(chatID, content, category, errorFlag)``. errorFlag is 0 on success and 1
        when a field has no string methods (presumably a null CSV cell read as None).
        On error, each field is returned as far as it had been processed.
    """
    content = line[1]
    category = line[2]
    try:
        content = content.replace('*', '').replace('-', '')
        # Collapsing punctuation runs to single spaces also leaves no repeated
        # whitespace, so a separate r'\s+' pass would be a no-op.
        content = re.sub(r'[^a-zA-Z0-9]+', ' ', content).strip()
        category = category.replace(' ', '')
        return (line[0], content, category, 0)
    except AttributeError:
        return (line[0], content, category, 1)
    
def fnReadData(path, columnsList, spark, schema):
    """Read the chat CSV with ``schema`` and return an RDD of cleaned rows.

    Only the first three columns named in ``columnsList`` are kept. Each row goes
    through removeMasking, and rows whose error flag is non-zero are dropped.
    Result elements are ``(chatID, content, category, 0)``.
    """
    rawDf = spark.read.csv(path=path, inferSchema=False, header=True, schema=schema)
    rows = rawDf.select(columnsList[0], columnsList[1], columnsList[2]).rdd
    cleaned = rows.map(removeMasking)
    return cleaned.filter(lambda rec: rec[3] == 0)

# Load and clean the chats. cc_map_df is a second, independent handle on the same
# cleaned data (only referenced by the commented-out join near the end).
cc_df = fnReadData(path, columnsList, spark, schema)
cc_map_df = fnReadData(path, columnsList, spark, schema)

print(cc_df.count())
endtime = datetime.now()
timeElapsed = (endtime - starttime).total_seconds()
print('Time Taken : %d mins' % int(timeElapsed / 60))

## Sentiment Analysis

# give a sentiment intensity scores to sentences
from nltk.sentiment.vader import SentimentIntensityAnalyzer

# Single VADER analyzer instance; sentScore() reads this module-level name.
sid = SentimentIntensityAnalyzer()

def sentScore(line):
    """Replace the error flag of a cleaned row with the chat's VADER compound score.

    Returns ``(chatID, content, category, compoundScore)``.
    """
    chatId, content, category = line[0], line[1], line[2]
    compound = sid.polarity_scores(content)['compound']
    return (chatId, content, category, compound)

def fnSentimentAnalyzer(cc_df):
    """Return a new RDD with every row passed through sentScore."""
    return cc_df.map(sentScore)

# Attach a sentiment score to every cleaned chat.
cc_df = fnSentimentAnalyzer(cc_df)
print('---> Done Sentiment Analysis !!!')
print(cc_df.count())

endtime = datetime.now()
timeElapsed = (endtime - starttime).total_seconds()
print('Time Taken : %d min' % int(timeElapsed / 60.0))

### POS Tagging

from nltk import pos_tag
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords

##POS tagging
# Noun and adjective tags in the Penn Treebank tagset, which nltk.pos_tag uses by
# default. The original list only had 'ADJ' (a universal-tagset tag), so adjectives
# were never kept; 'ADJ' is retained for callers tagging with the universal tagset.
_NOUN_ADJ_TAGS = frozenset(['NN', 'NNS', 'NNP', 'NNPS', 'JJ', 'JJR', 'JJS', 'ADJ'])


def returnNnAdj(line):
    """Return the words of ``(word, tag)`` pairs tagged as a noun or adjective, in order."""
    return [pair[0] for pair in line if pair[1] in _NOUN_ADJ_TAGS]

##Tagging sentence as Complaint or Query
def sentLabel(x, threshold=0.25):
    """Label a compound sentiment score.

    Scores at or below ``threshold`` (default 0.25, not 0) are labelled 'complaint';
    anything above is labelled 'query'.
    """
    return 'complaint' if x <= threshold else 'query'
    
##Removing Junk words
# Customer filler responses that often arrive glued onto an adjacent word
# (e.g. "okaythanks", "helloi").
_BOGUS_PATTERNS = ['hi', 'ok', 'okay', 'yes', 'customer', 'hello']
_BOGUS_RE = re.compile('|'.join(_BOGUS_PATTERNS), flags=re.IGNORECASE)
# Filled on first use so the NLTK stop-word corpus is read once per process/task
# instead of once per chat.
_STOPWORD_CACHE = {}


def _englishStopwords():
    """Return the NLTK English stop words as a frozenset, loading them on first call."""
    stop = _STOPWORD_CACHE.get('english')
    if stop is None:
        stop = _STOPWORD_CACHE['english'] = frozenset(stopwords.words('english'))
    return stop


def removeBoguswords(tokens):
    """Lower-case ``tokens`` and drop stop words and tokens containing a filler pattern.

    NOTE(review): the filler check is a substring match, so real words such as
    'thing', 'machine', 'shipping' ('hi') or 'book', 'broken' ('ok') are dropped
    too. Kept as-is to preserve existing results.
    """
    stopEnglish = _englishStopwords()
    lowered = (token.lower() for token in tokens)
    return [word for word in lowered
            if word not in stopEnglish and not _BOGUS_RE.search(word)]

def fnPOSTagger(cc_df):
    """Turn sentiment-scored rows into keyword rows.

    Steps:
      * drop chats whose text is empty;
      * POS-tag the text and label the score as 'complaint' or 'query' (sentLabel);
      * keep nouns/adjectives (returnNnAdj), then drop stop/filler words (removeBoguswords);
      * keep only chats with more than two remaining tokens.

    Result elements are ``(chatID, tokens, chatcategory, label)``.
    """
    nonEmpty = cc_df.filter(lambda rec: rec[1] != '')
    tagged = nonEmpty.map(
        lambda rec: (rec[0], pos_tag(word_tokenize(rec[1])), rec[2], sentLabel(rec[3])))
    keywords = tagged.map(
        lambda rec: (rec[0], removeBoguswords(returnNnAdj(rec[1])), rec[2], rec[3]))
    return keywords.filter(lambda rec: len(rec[1]) > 2)

### Calling posTagging.py
# POS tagging is the costliest step, and cc_df feeds several later Spark actions
# (the count below, fnTfidf, fnQcrScoring). Caching avoids re-tagging every chat
# for each of them.
cc_df = fnPOSTagger(cc_df).cache()
print('---> Done POS Tagging !!!')
print(cc_df.count())
endtime = datetime.now()
timeElapsed = (endtime - starttime).total_seconds()
print('Time Taken : ' + str(int(timeElapsed) / 60.0) + ' min')

### TF-IDF

from pyspark.ml.feature import Tokenizer, IDF, CountVectorizer, IDFModel
import numpy as np
import pandas as pd

def fnTfidf(cc_df, idfPercentile=50):
    """Return the "high information" vocabulary: words whose IDF is at or above a percentile.

    ``cc_df`` is an RDD of ``(chatID, tokens, chatcategory, label)``. A CountVectorizer
    (vocabSize=100000, minDF=10) is fitted on the tokens, followed by an IDF model.
    Words whose IDF is >= the ``idfPercentile``-th percentile of all IDF values
    (default: the median) are returned as a list. An empty vocabulary gives ``[]``.
    """
    preTfIdfSchema = StructType([
        StructField(name='chatID', dataType=StringType()),
        StructField(name='tokens', dataType=ArrayType(elementType=StringType())),
        StructField(name='chatcategory', dataType=StringType()),
        StructField(name='label', dataType=StringType()),
    ])
    tagged_df = cc_df.toDF(schema=preTfIdfSchema)

    # Term counts per chat, e.g. countVec=SparseVector(270, {0: 1.0, 39: 2.0, ...}).
    cv = CountVectorizer(inputCol="tokens", outputCol="countVec", vocabSize=100000, minDF=10)
    cvModel = cv.fit(tagged_df)
    tagged_df = cvModel.transform(tagged_df)

    # Only the fitted IDF weights are needed; the TF-IDF vectors are never materialised.
    idfModel = IDF(inputCol="countVec", outputCol="tfIdfVec").fit(tagged_df)

    # vocabulary[i] and idf[i] refer to the same word, so zip them directly.
    vocabulary = cvModel.vocabulary
    idfScores = idfModel.idf.toArray()
    if len(vocabulary) == 0:
        return []

    cutoff_idf = np.percentile(idfScores, idfPercentile)
    return [word for word, score in zip(vocabulary, idfScores) if score >= cutoff_idf]

# Vocabulary of high-IDF words used by the QCR and topic steps.
highInfoWords = fnTfidf(cc_df)
print('---> Done TFIDF !!!')

endtime = datetime.now()
timeElapsed = (endtime - starttime).total_seconds()
print('Time Taken : %s min' % (int(timeElapsed) / 60.0))

### QCR : Query to Complaint Ratio

def nounCategorizer(line, highInfoWords):
    """Emit one ``(word, (complaint, query, total))`` count triple per high-info token.

    ``line`` is ``(chatID, tokens, chatcategory, label)``. Each token of ``line[1]``
    found in ``highInfoWords`` yields ``(1, 0, 1)`` when the label is 'complaint'
    and ``(0, 1, 1)`` otherwise. Token order and duplicates are preserved.
    """
    return [
        (word, (1, 0, 1) if line[3] == 'complaint' else (0, 1, 1))
        for word in line[1]
        if word in highInfoWords
    ]

def qcr(line):
    """Query-to-complaint ratio score for ``(word, (complaints, queries, total))``.

    ratio = queries / (complaints + 1). Ratios above 1 (query-leaning words) score
    log(ratio) > 0. Otherwise the score is -log(1 / (ratio + 0.01)), which is <= ~0.01
    for complaint-leaning words.
    """
    counts = line[1]
    queries = counts[1]
    complaints = counts[0]
    ratio = queries / float(complaints + 1)
    if not ratio > 1:
        return float(-np.log(1 / (ratio + 0.01)))
    return float(np.log(ratio))

## Function to score each sentence with qcr
def qcrCompoundScorer(sentence, searchDict, missingScore=9999.0):
    """Average the QCR scores of the tokens in ``sentence`` found in ``searchDict``.

    Tokens are lower-cased before lookup. A lower average means a more
    complaint-like chat. If no token is found, ``missingScore`` is returned.
    It is a float (9999.0) so the score column has a single numeric type.
    """
    scores = [searchDict[key]
              for key in (token.lower() for token in sentence)
              if key in searchDict]
    if not scores:
        return missingScore
    return float(np.sum(scores) / len(scores))
    
def normScore(x, mx, mn):
    """Min-max normalise ``x`` into [0, 1] given the observed maximum and minimum.

    Returns 0.0 when ``mx == mn`` (all scores identical), which would otherwise
    divide by zero. Float division is forced so integer inputs don't truncate on
    Python 2.
    """
    spread = mx - mn
    if spread == 0:
        return 0.0
    return (x - mn) / float(spread)

def fnQcrScoring(cc_df, highInfoWords):
    """Score chats by the query-to-complaint ratio (QCR) of their high-information words.

    ``cc_df`` is an RDD of ``(chatID, tokens, chatcategory, label)``. The steps are:

      1. count complaint/query/total occurrences of every high-info word,
         e.g. ``(u'code', (1, 25, 26))``;
      2. turn the counts into a word -> QCR dictionary (see qcr);
      3. for chats containing at least one high-info word, average the QCR of their
         tokens (qcrCompoundScorer), then min-max normalise across all chats.

    Returns:
        RDD of ``(chatID, tokens, chatcategory, normalisedScore)``. Despite the
        caller's variable name, it holds both complaint- and query-leaning chats.

    Raises:
        ValueError: if no chat contains any high-information word.
    """
    # Built once; it is shipped to the executors instead of being rebuilt per row.
    highInfoSet = set(highInfoWords)

    nounCounts_rdd = cc_df \
        .flatMap(lambda x: nounCategorizer(x, highInfoSet)) \
        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1], a[2] + b[2]))

    # word -> QCR score, e.g. {u'access': 2.079, u'acct': 1.674, ...}
    searchDict = nounCounts_rdd \
        .map(lambda x: (x[0], qcr(x))) \
        .collectAsMap()

    scoredChats = cc_df \
        .filter(lambda x: not highInfoSet.isdisjoint(x[1])) \
        .map(lambda x: (x[0], x[1], x[2], qcrCompoundScorer(x[1], searchDict)))

    # One pass over the scores gives both extremes for the normalisation.
    scoreStats = scoredChats.map(lambda x: x[3]).stats()
    if scoreStats.count() == 0:
        raise ValueError('fnQcrScoring: no chat contains any high-information word')
    mx = scoreStats.max()
    mn = scoreStats.min()

    return scoredChats.map(lambda x: (x[0], x[1], x[2], normScore(x[3], mx, mn)))

# QCR-scored chats (both complaint- and query-leaning, despite the name).
mostNegativeChats = fnQcrScoring(cc_df, highInfoWords)
print('---> Done QCR Computation !!!')
print(mostNegativeChats.count())

endtime = datetime.now()
timeElapsed = (endtime - starttime).total_seconds()
print('Time Taken : %s min' % (int(timeElapsed) / 60.0))


### Topic Extraction

from pyspark.ml.clustering import LDA, LocalLDAModel

def trainLDA(df, inputCol, k, maxIter=5, seed=0):
    """Fit an LDA model with ``k`` topics on the single feature column ``inputCol`` of ``df``."""
    estimator = LDA(k=k, maxIter=maxIter, featuresCol=inputCol)
    estimator.setSeed(seed)
    features = df.select(inputCol)
    return estimator.fit(features)
    
def pred_topic(x):
    """Return the index of the most probable topic in a topic distribution, as a Python int."""
    bestIndex = np.argmax(x)
    return int(bestIndex)

def fnGetWords(indexList, vocabDict=None):
    """Map vocabulary indices to words.

    ``vocabDict`` maps index -> word. The original relied on a module-level
    ``vocabDict`` that this file never defines. The global is still used when the
    argument is omitted, and its absence now gives a clear NameError.
    """
    if vocabDict is None:
        try:
            vocabDict = globals()['vocabDict']
        except KeyError:
            raise NameError("fnGetWords: pass vocabDict or define a module-level 'vocabDict'")
    return [vocabDict[i] for i in indexList]

def fnGoodWords(line, highInfoWords):
    """Return the tokens of ``line[1]`` that are in ``highInfoWords``, keeping order and duplicates."""
    return [token for token in line[1] if token in highInfoWords]

def fnCombWord(line):
    """Join the tokens in ``line[1]`` into one space-separated sentence.

    Uses ``str.join``/``str.strip`` instead of the Python 2-only ``string.strip``.
    """
    return ' '.join(line[1]).strip()

# Saved model loaded when fnLDAModel is called with a `train` value other than
# 'optimize' or 'BuildOnce'.
_DEFAULT_LDA_MODEL_PATH = '/apps/opt/applicaitons/datasets/chatAnalysis/ConsolidatedCode_V1/unsup_input_10k2017_04_11_2_50/lda_model/'


def _fnFindBestK(chatsDf):
    """Grow the topic count from 2 until log-perplexity stops improving; return the best k.

    A larger k is accepted only if its relative log-perplexity change is below
    -0.0099. The search stops at the first k that fails this test.
    """
    k_best = 2
    k_next = k_best
    cutOff_counter = 0
    perplexity = None

    while cutOff_counter < 1:
        lda_model = trainLDA(df=chatsDf, inputCol='countVec', k=k_next)
        current_perplexity = lda_model.logPerplexity(chatsDf.select('countVec'))

        if perplexity is None:
            perplexity = current_perplexity
            print("--> k_best=%d, k_curr=%d, cutoff_counter=%d, k_next_perp=%.4f, k_best_perp=%.4f"
                  % (k_best, k_next, cutOff_counter, current_perplexity, perplexity))
        else:
            # Relative change against the best k so far; more negative = larger drop.
            improvement = (current_perplexity - perplexity) / (perplexity * 1.0)
            if improvement >= -0.0099:
                cutOff_counter += 1
            else:
                perplexity = current_perplexity
                cutOff_counter = 0
                k_best = k_next
            print("--> k_best=%d, k_curr=%d, cutoff_counter=%d, improvement%%=%.2f, k_next_perp=%.4f, k_best_perp=%.4f"
                  % (k_best, k_next, cutOff_counter, improvement, current_perplexity, perplexity))
        k_next += 1

    return k_best


def fnLDAModel(mostNegativeChats, highInfoWords, folderToWrite, train, modelPath=_DEFAULT_LDA_MODEL_PATH):
    """Assign each scored chat to an LDA topic built on its high-information words.

    Args:
        mostNegativeChats: RDD of ``(chatID, tokens, chatcategory, avgQCRscore)``.
        highInfoWords: vocabulary to keep (tokens outside it are dropped).
        folderToWrite: the fitted/loaded model is saved to ``folderToWrite + '/lda_model'``.
        train: 'optimize' searches k (see _fnFindBestK). 'BuildOnce' fits k=5.
            Any other value loads the model at ``modelPath``.
        modelPath: saved LocalLDAModel to load in the last case.

    Returns:
        ``(k_best, transformed)``. ``transformed`` is a DataFrame with columns
        chatID, sentence, chatcategory, avgQCRscore, predictedTopic.
    """
    highInfoSet = set(highInfoWords)  # O(1) lookups in fnGoodWords
    goodChats = mostNegativeChats \
        .map(lambda x: (x[0], fnGoodWords(x, highInfoSet), x[2], x[3]))

    preCVSchema = StructType([StructField(name='chatID', dataType=StringType()),
                              StructField(name='tokens', dataType=ArrayType(elementType=StringType())),
                              StructField(name='chatcategory', dataType=StringType()),
                              StructField(name='avgQCRscore', dataType=FloatType()), ])
    chatsDf = goodChats.toDF(schema=preCVSchema)

    # Appends countVec, e.g. Row(chatID=..., tokens=[u'area', u'problem'], ...,
    # countVec=SparseVector(203, {14: 1.0, 26: 1.0})).
    cv = CountVectorizer(inputCol="tokens", outputCol="countVec", vocabSize=100000, minDF=10)
    cvModel = cv.fit(chatsDf)
    chatsDf = cvModel.transform(chatsDf)

    if train == 'optimize':
        k_best = _fnFindBestK(chatsDf)
        lda_model = trainLDA(df=chatsDf, inputCol='countVec', k=k_best, maxIter=10, seed=0)
        print('---> Model Built...!!!')
    elif train == 'BuildOnce':
        k_best = 5
        lda_model = trainLDA(df=chatsDf, inputCol='countVec', k=k_best, maxIter=5, seed=0)
        print('---> Model Built...!!!')
    else:
        # NOTE(review): countVec indices come from the CountVectorizer fitted above,
        # not from the run that produced the saved model, and k_best assumes it
        # has 5 topics -- confirm both match before relying on this path.
        lda_model = LocalLDAModel.load(modelPath)
        k_best = 5
        print('---> Model picked up from disk...!!!')

    # Columns after transform: chatID, tokens, chatcategory, avgQCRscore, countVec,
    # topicDistribution (index 5).
    transformed = lda_model.transform(chatsDf) \
        .rdd \
        .map(lambda x: (x[0], fnCombWord(x), x[2], x[3], pred_topic(x[5]))) \
        .toDF(schema=StructType([
            StructField("chatID", StringType()),
            StructField("sentence", StringType()),
            StructField("chatcategory", StringType()),
            StructField("avgQCRscore", FloatType()),
            StructField("predictedTopic", IntegerType())]))

    lda_model.write().overwrite().save(folderToWrite + '/lda_model')

    return k_best, transformed

# Output folder for this run: <outputFolder>/<fileName><date>_<hour>_<minute>/
outputFolder = "/apps/opt/applicaitons/datasets/NLP/SuvroChatAnalysis"
fileName = 'SUVRO'
runTime = datetime.now()  # sampled once so date, hour and minute are mutually consistent
folderToWrite = outputFolder + '/' + fileName + str(runTime.date()) + '_' + str(runTime.hour) + '_' + str(runTime.minute) + '/'

# The LDA model is saved inside this run's folder. It previously went to a stray
# relative 'testfolder' while every other artefact went to folderToWrite.
k_best, transformed = fnLDAModel(mostNegativeChats, highInfoWords, folderToWrite=folderToWrite, train='BuildOnce')
print('---> Done Topic Extraction. ' + str(k_best) + ' topics identified !!! ')
print(transformed.count())
endtime = datetime.now()
timeElapsed = (endtime - starttime).total_seconds()
print('Time Taken : ' + str(int(timeElapsed) / 60.0) + ' min')

### Trigram Extraction

import os, shutil, string, re, json
from collections import Counter
from nltk.util import ngrams

def remMiddleSpace(x):
    """Remove every space from ``x`` (e.g. 'Billing and Payment' -> 'BillingandPayment').

    The original joined only the first two space-separated pieces, silently dropping
    any further words. Inputs with at most one space give the same result as before.
    """
    return x.replace(' ', '')
    
try:
    _text_type = unicode  # Python 2
except NameError:  # Python 3
    _text_type = str


def remNone(val):
    """Return ``val`` as text, mapping None to an empty string.

    The original ``unicode(val) or u''`` turned None into the truthy u'None', so
    missing sentences/categories were counted as the word "None".
    """
    if val is None:
        return u''
    return _text_type(val)


def get_ngrams(text, min = 1, max = 4):
    """Return all n-grams of ``text`` for n in [min, max), each joined with '_'.

    Shorter n-grams come first, e.g. ['a', 'b', 'a_b'] for ['a', 'b'] with defaults.
    (``min``/``max`` shadow the builtins; kept for keyword compatibility.)
    """
    return [
        '_'.join(str(token) for token in gram)
        for size in range(min, max)
        for gram in ngrams(text, size)
    ]

def fnReturnQueryString(quantileCuts, predictedTopic):
    """Build one Spark SQL filter per quantile bucket of a topic's avgQCRscore.

    With cuts c0 < c1 < ... the buckets are: <= c0, (c0, c1], ..., > c_last,
    giving ``len(quantileCuts) + 1`` filter strings. An empty ``quantileCuts``
    raises IndexError.
    """
    lastIdx = len(quantileCuts)
    queries = []
    for idx in range(lastIdx + 1):
        if idx == 0:
            query = 'predictedTopic == {} and avgQCRscore <= {:.20f}'.format(predictedTopic, quantileCuts[0])
        elif idx == lastIdx:
            query = 'predictedTopic == {} and avgQCRscore >  {:.20f}'.format(predictedTopic, quantileCuts[lastIdx - 1])
        else:
            query = 'predictedTopic == {} and avgQCRscore > {:.20f} and avgQCRscore <= {:.20f}'.format(
                predictedTopic, quantileCuts[idx - 1], quantileCuts[idx])
        queries.append(query)
    return queries

def fnGetWordCounts(transformed, queryString):
    """Count sentence n-grams (n = 1..3) and chat categories for rows matching ``queryString``.

    Steps: filter ``transformed`` with the SQL ``queryString``; split each (None-safe)
    sentence on spaces and expand it to n-grams via get_ngrams; split each category
    (after remMiddleSpace) on spaces; count everything in a single Spark job.

    The original concatenated every matching chat into one giant string on a single
    key before counting. It also raised IndexError when no row matched. Empty tokens
    are not counted (downstream code discarded them anyway).

    Returns:
        ``(ngramCounts, categoryCounts)`` as two ``collections.Counter`` objects.
        Both are empty when no row matches.
    """
    def _taggedTokens(row):
        # Tag 0 = sentence n-gram, tag 1 = category token, so one job counts both.
        sentence, category = row
        for gram in get_ngrams(str(remNone(sentence)).split(" ")):
            if gram:
                yield (0, gram)
        for cat in remMiddleSpace(remNone(category)).split(' '):
            if cat:
                yield (1, cat)

    tagCounts = transformed \
        .filter(queryString) \
        .select('sentence', 'chatcategory') \
        .rdd \
        .flatMap(_taggedTokens) \
        .countByValue()

    ngramCounts = Counter()
    categoryCounts = Counter()
    for (tag, token), count in tagCounts.items():
        target = ngramCounts if tag == 0 else categoryCounts
        target[token] = count
    return ngramCounts, categoryCounts

def fnQuantileCuts(topic, transformed):
    """Return the approximate median of avgQCRscore among rows assigned to ``topic``.

    The value is a one-element list from ``DataFrame.approxQuantile`` with
    probabilities [0.5] and relative error 0.01.
    """
    topicRows = transformed.filter('predictedTopic == {}'.format(topic))
    return topicRows.approxQuantile(col='avgQCRscore', probabilities=[0.5], relativeError=0.01)

def _fnRecreateDir(path):
    """Delete directory ``path`` if it exists, then create it empty."""
    if os.path.isdir(path):
        shutil.rmtree(path)
    os.mkdir(path)


def fnExtractTrigrams(spark, k_best, folderToWrite, transformed, fileName):
    """Write the topic assignments and per-topic, per-quantile n-gram counts under ``folderToWrite``.

    Layout (``folderToWrite`` must end with '/'):
      outputData/                      -- ``transformed`` as a single CSV with header
      topic_trigram/topic<i>/wordCounts_<j>.json
                                       -- [ngramCounts, categoryCounts] for bucket j

    Topics with no chats get an empty directory and no JSON files; previously the
    empty quantile list caused an IndexError. ``spark`` and ``fileName`` are unused.

    Side effect: the process working directory is left at
    ``folderToWrite + 'topic_trigram/'``. The topic-distribution step relies on this.
    """
    # Write the topic assignments as one CSV file (coalesce to a single partition).
    transformed.coalesce(1).write.csv(path=folderToWrite + 'outputData/', header=True)

    print('---> Creating Folders and Directories')
    trigramRoot = folderToWrite + 'topic_trigram/'
    os.chdir(folderToWrite)
    _fnRecreateDir('topic_trigram')
    os.chdir(trigramRoot)

    for i in range(k_best):
        topicDir = 'topic' + str(i)
        _fnRecreateDir(topicDir)
        os.chdir(trigramRoot + topicDir)

        quantileCuts = fnQuantileCuts(i, transformed)
        if quantileCuts:
            queryStrings = fnReturnQueryString(quantileCuts, predictedTopic=i)
            for j, queryString in enumerate(queryStrings):
                with open('wordCounts_' + str(j) + '.json', 'w') as fp:
                    json.dump(fnGetWordCounts(transformed, queryString), fp)
        else:
            print('---> Topic ' + str(i) + ' has no chats; no n-gram files written')

        os.chdir(trigramRoot)

    print('---> Files written to ' + folderToWrite)
    
# Writes outputs and leaves the working directory inside topic_trigram/.
fnExtractTrigrams(spark, k_best, folderToWrite, transformed, fileName)

endtime = datetime.now()
timeElapsed = (endtime - starttime).total_seconds()
print('Time Taken : %d mins' % int(timeElapsed / 60.0))

### Topic Distribution

import glob
import operator

# List the per-topic n-gram files (paths relative to topic_trigram/).
# print() form works on both Python 2 and 3; the bare print statement was Python 2 only.
for name in glob.glob('*/wordCounts_*.json'):
    print(name)
    
def find_grams(x):
    """Return the n of an '_'-joined n-gram: 1, 2 or 3, and 100 for anything longer."""
    gramSize = x.count('_') + 1
    return gramSize if gramSize <= 3 else 100

# Assemble one table of n-gram counts from every topic/quantile JSON file.
# Each file holds [ngramCounts, categoryCounts]. The bucket's top-5 categories
# are attached as PageCategory_1..PageCategory_5 columns.
frames = []
for name in glob.glob('*/wordCounts_*.json'):
    with open(name) as json_data:
        data = json.load(json_data)
    data0 = dict((k, v) for k, v in data[0].items() if k)  # n-gram counts, empty key removed
    data1 = dict((k, v) for k, v in data[1].items() if k)  # category counts, empty key removed
    if not data0:
        # Nothing to report; an empty frame would also break the column rename below.
        continue

    data = pd.DataFrame.from_dict(data0, orient='index').reset_index()
    data.columns = ['ChatKeyWords', 'count']
    data['topic'] = str(name).split('/')[0]
    data['Quantile'] = re.findall(r'\d+', name)[1]  # the j in 'topic<i>/wordCounts_<j>.json'

    # Top 5 categories by count; fewer (or none) if the bucket has fewer categories.
    topCategories = sorted(data1.items(), key=operator.itemgetter(1), reverse=True)[0:5]
    for rank, (category, _count) in enumerate(topCategories, 1):
        data['PageCategory_' + str(rank)] = category

    frames.append(data.sort_values(by='count', ascending=False))

# DataFrame.append was removed in pandas 2.0; concatenate the per-file frames once.
if frames:
    fullData = pd.concat(frames)
else:
    fullData = pd.DataFrame(columns=['ChatKeyWords', 'count', 'topic', 'Quantile'])
fullData['n_grams'] = fullData['ChatKeyWords'].apply(lambda x: find_grams(x))

#cc_map_schema = StructType([StructField(name = 'chatID', dataType=StringType()),
#                              StructField(name = 'chatContent', dataType=StringType()),
#                              StructField(name = 'chatcategory', dataType=StringType()),
#                              StructField(name = 'ErrorCode', dataType=StringType()),])

#cc_map_df = cc_map_df.toDF(schema=cc_map_schema)
#cc_map_df = cc_map_df.select('chatID', 'chatContent')
#chat_topic_df = cc_map_df.join(transformed, on='chatID', how='inner')
#final_chat_df = chat_topic_df.select('chatID', 'chatcategory', 'avgQCRscore', 'predictedTopic')
# final_chat_df.write.csv('Suvro_final_chat_score.csv')

fullData.to_csv('Suvro_fullOutput_ngrams.csv', index=False)
# Collect the scored chats once: each toPandas() re-runs the Spark lineage, and the
# original wrote this same file twice.
mostNegativeChats.toDF().toPandas().to_csv('Suvro_chat_final_score.csv')