# EventsExport
This notebook looks into all the event sequences that appear often. The result is then saved into a JSON file for plotting.

In [None]:
%matplotlib inline
from pyspark import SparkContext, SparkConf, StorageLevel
from pyspark.sql import HiveContext, Row
from pyspark.sql.types import *
from datetime import datetime, date
import calendar
import time, logging
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import matplotlib.patches as patches
import matplotlib.path as path
import operator
import json

In [None]:
# --- HDFS locations ---
# Shared folder for storing Churn files.
hdfs_churn = '/user/hadoop/churn_project'
# '/sas_data' sub-folder of the shared folder.
hdfs_churn_sas = hdfs_churn + '/sas_data'
# Events table, relative to hdfs_churn (other names noted by the author:
# events_repart, eventsRedif).
tablePath = "/eventsMonthly"

# --- Run parameters ---
# Period label; in the visible cells it is only used by a commented-out filter.
period = '2015_04'

In [None]:
# Set the directory in which Spark stores RDD checkpoints.
# NOTE(review): `sc` is not created in the visible cells; presumably the
# notebook kernel provides the SparkContext — confirm.
sc.setCheckpointDir('checkpointSpark')

In [None]:
# Default matplotlib figure size for this notebook: 10 x 6 (width, height).
from pylab import rcParams
rcParams['figure.figsize'] = 10, 6

# Functions

In [None]:
# Load the event-definition table; it is used to convert event letters into
# display names.
cols = ["Conc", "Letter", "MDLevent", "Refined Names"]
EventsDef = pd.read_csv(
    '../EventDefExplanation.csv',
    delimiter=';',
    skiprows=1,  # skip the first line of the file; column names come from `cols`
    names=cols,
)

print(len(EventsDef))

# Lookup table: event letter -> refined (display) name.
conversionDict = dict(zip(EventsDef["Letter"], EventsDef["Refined Names"]))

# Add the churning event, which is appended to the sequences of churners.
conversionDict["CE"] = "ChurningEvent"

In [None]:
# Segments (clusters) of interest.
Segments = ['Cluster2','Cluster6','Cluster10','Cluster12','Cluster4']
def filterSegment(col):
    """Return a SQL condition matching rows whose ``col`` is one of ``Segments``.

    The condition is an ``or`` chain of ``<col> = '<segment>'`` tests, e.g.
    ``SEGMENT = 'Cluster2' or SEGMENT = 'Cluster6' or ...``. An empty
    ``Segments`` list gives an empty string.

    The values are pasted into the string unescaped, so ``col`` and
    ``Segments`` must come from trusted code, never from user input.
    """
    # str.join puts the separator only between terms, so no trailing
    # separator or space has to be trimmed afterwards.
    return " or ".join(col + " = '" + s + "'" for s in Segments)

# Loading the Event

In [None]:
# Load the events table from Parquet, keep only the rows with Churn=1,
# register it as the "events" temp table and time the whole cell.
start = time.time()
eventsParquet = sqlContext.read.parquet(hdfs_churn+tablePath)
#if not useVal: #Then filter Sample=Training ...
#eventsParquet = eventsParquet.filter("SEGMENT = 'Cluster10'")
#eventsParquet = eventsParquet.filter(filterSegment('SEGMENT')) # Filter only the segments we need
#eventsParquet = eventsParquet.filter("period='"+period+"'")
eventsParquet = eventsParquet.filter("Churn=1")
eventsParquet.persist(StorageLevel.MEMORY_ONLY)
eventsParquet.registerTempTable("events")
# printSchema() prints the schema itself and returns None, so wrapping it in
# print would only add a stray "None" line to the output.
eventsParquet.printSchema()
# Be careful: the sort order matters for the event definition.
eventsParquet = eventsParquet.sort(["period", "customer_number", "datetime", "letter"])
print(eventsParquet.take(1))

# Elapsed time of the cell, in seconds.
print(time.time() - start)

## Event definition

In [None]:
# SQL over the "events" temp table: keeps period, customer_number, letter
# (as event), Churn (as churn) and SEGMENT (as segment), ordered by period,
# customer_number, datetime and letter.
query_events = """
SELECT
    period,
    customer_number,
    letter as event,
    Churn as churn,
    SEGMENT as segment
FROM events a
ORDER BY period, customer_number, datetime, letter
"""
# Should be 284 different events

# Run the query and show the schema of the resulting DataFrame.
eventsForPatterns = sqlContext.sql(query_events)
eventsForPatterns.printSchema()

In [None]:
# Re-key every event row so it is in the right form for computing n-grams:
#   key   = (customer_number, (churn, segment), period)
#   value = event
def _toKeyedEvent(row):
    per, cn, e, f, s = row
    return ((cn, (f, s), per), e)

events = eventsForPatterns.rdd.map(_toKeyedEvent)
print(eventsForPatterns.take(1))
# Cache the re-keyed RDD (memory first, spilling to disk).
events.persist(StorageLevel.MEMORY_AND_DISK)

# Compute variables and histograms on Events
Variables are required for computing patterns

In [None]:
# Count the distinct customers, and how many of them carry the churn flag.
print(events.take(1))
# Reduce each record to (customer_number, (churn, segment)) and deduplicate.
customerDistinct = events.map(lambda rec: (rec[0][0], rec[0][1])).distinct()
nbrCustomer = customerDistinct.count()
# A customer is churning when the flag (first item of the pair) equals 1.
nbrChurningCustomer = customerDistinct.filter(lambda cust: cust[1][0] == 1).count()
nbrNonChurningCustomer = nbrCustomer - nbrChurningCustomer
print("Number of Total customer in the Event table:       " + str(nbrCustomer))
print("Number of Churning customer in the Event table:    " + str(nbrChurningCustomer))
print("Number of NonChurning customer in the Event table: " + str(nbrNonChurningCustomer))

 # Compute patterns (Prepare events list for each customer)

In [None]:
# Hybrid way for computing patterns: tag every event with its position in the
# RDD before grouping, so that each group can be put back in order afterwards
# (groupByKey does not guarantee the order of the grouped values).
eventsWithIndex = (
    events.zipWithIndex()
          # ((key, event), index) -> (key, (event, index))
          .map(lambda rec: (rec[0][0], (rec[0][1], rec[1])))
)
eventsGrouped = eventsWithIndex.groupByKey()
#eventsGrouped.persist(StorageLevel.MEMORY_AND_DISK)
    
def convIterable(it):
    """Return the events of one group as a tuple, in the RIGHT order.

    ``it`` is an iterable of ``(event, index)`` pairs. The pairs are sorted
    by index, the index is dropped, and the events are returned as a tuple
    (not a list) because the result needs to be hashable.
    """
    ordered = sorted(it, key=lambda pair: pair[1])
    return tuple(pair[0] for pair in ordered)

def addChurningEvent(eventList, flag):
    """Return ``eventList`` with the "CE" churning event appended when ``flag`` is 1.

    ``eventList`` is a tuple of events. For any other ``flag`` value the
    tuple is returned unchanged.
    """
    return (eventList + (u"CE", )) if flag == 1 else eventList

# Turn each group into an ordered tuple of events, then close the sequences
# whose churn flag is 1 with the "CE" event.
eventsGroupedList = eventsGrouped.mapValues(convIterable)
# Key layout: (customer_number, (churn, segment), period); the flag is key[1][0].
eventsGroupedList = eventsGroupedList.map(
    lambda rec: (rec[0], addChurningEvent(rec[1], rec[0][1][0])))
eventsGroupedList.persist(StorageLevel.MEMORY_AND_DISK)

In [None]:
# Show the first (key, tuple of events) record.
eventsGroupedList.take(1)

# Look for patterns

In [None]:
# Number of event sequences, i.e. of distinct
# (customer_number, (churn, segment), period) keys.
eventsGroupedList.count()

In [None]:
def createLinks(eventsList):
    """Return the links between consecutive events of one sequence.

    Each link is a ``(source, target, depth)`` tuple, where ``depth`` is the
    number of links that still follow it in the sequence (the last link has
    depth 0). A sequence with fewer than two events gives an empty list.
    """
    size = len(eventsList)
    return [
        (eventsList[pos], eventsList[pos + 1], size - pos - 2)
        for pos in range(size - 1)
    ]
        
# Replace every event sequence by its list of (source, target, depth) links.
eventsLinksList = eventsGroupedList.mapValues(createLinks)

In [None]:
# Show the first record: a key with its list of (source, target, depth) links.
eventsLinksList.take(1)

In [None]:
def f(x):
    """Identity function: hands each list of links to flatMapValues as is."""
    return x

# Flatten: one (key, link) record per link instead of one list per key.
eventsLinksListFlat = eventsLinksList.flatMapValues(f)
eventsLinksListFlat.take(1)

In [None]:
# Re-key every link by (segment, link) with a count of 1, ready to be summed.
# Input records look like ((customer_number, (churn, segment), period), link).
linksRdd = eventsLinksListFlat.map(lambda rec: ((rec[0][1][1], rec[1]), 1))
linksRdd.take(1)

In [None]:
# Sum the per-record 1s: number of occurrences of each (segment, link).
linksReduced = linksRdd.reduceByKey(lambda a,b: a+b)

In [None]:
# Cache the per-segment link counts: they are reused for every exported segment.
linksReduced.cache()
print(linksReduced.take(1))
# Reminder of the segments of interest.
print(Segments)

## For each segment run from here

In [None]:
# Segment to export; "AllCluster" merges the links of every segment.
Segment = "Cluster4"

# Records of linksReduced look like ((segment, link), count); in both
# branches the result is keyed by link only.
if Segment != "AllCluster":
    # Keep the chosen segment, then drop the segment from the key.
    linksReducedSegment = (
        linksReduced.filter(lambda rec: rec[0][0] == Segment)
                    .map(lambda rec: (rec[0][1], rec[1]))
    )
else:
    # Drop the segment from the key, then add up the counts of identical links.
    linksReducedSegment = (
        linksReduced.map(lambda rec: (rec[0][1], rec[1]))
                    .reduceByKey(lambda left, right: left + right)
    )

In [None]:
# Bring the (link, count) pairs to the driver, most frequent first.
linksCollect = linksReducedSegment.sortBy(lambda pair: pair[1], ascending=False).collect()

# Filter and formatting

In [None]:
# Depth limit: links with a depth greater than maxDeep are dropped; it also
# scales the cap on the number of exported links (maxDeep * 30).
maxDeep = 7

In [None]:
# Max/min value, based on the maximum number of links.
# linksCollect is sorted by decreasing count, so the count found at index
# maxLink is the smallest count a link may have and still be exported; this
# caps the export at roughly maxDeep * 30 links.
maxLink = min(maxDeep * 30, len(linksCollect) - 1)
if linksCollect:
    minValue = linksCollect[maxLink][1]
else:
    # Nothing was collected (for instance a segment name matching no record):
    # indexing the empty list would raise IndexError, so fall back to
    # neutral values and let the following cells produce an empty export.
    print("linksCollect is empty: no link will be exported")
    maxLink = 0
    minValue = 0
print(maxLink)

In [None]:
print("Number linksCollect: " + str(len(linksCollect)))

# links:     exported links, expressed with node ids.
# eventDict: (event, depth) node -> integer id, in order of first appearance.
links = []
eventDict = {}
for (source, target, depth), value in linksCollect:
    # Skip the links that are too rare or too deep.
    if value < minValue or depth > maxDeep:
        continue

    # A node is an (event, depth) pair; the source sits one level above
    # its target.
    sourceD = (source, depth + 1)
    targetD = (target, depth)

    # Give every node seen for the first time the next free id.
    for node in (sourceD, targetD):
        eventDict.setdefault(node, len(eventDict))

    # Source and target switched for chronology: not required here.
    links.append({'source': eventDict[sourceD], 'target': eventDict[targetD], 'value': value})

print("Number links: " + str(len(links)))

In [None]:
# ((event, depth), id) pairs ordered by id.
eventDictSorted = sorted(eventDict.items(), key=operator.itemgetter(1))

In [None]:
# One node per (event, depth) pair, in id order; the event letter is
# converted to its display name through conversionDict.
nodes = [
    {'name': conversionDict[event], 'id': str(i), 'depth': depth}
    for (event, depth), i in eventDictSorted
]

In [None]:
# Display the node list.
nodes

# Save

In [None]:
# Size of the graph about to be saved.
print("Number of nodes: " + str(len(nodes)))
print("Number of links: " + str(len(links)))

In [None]:
# Assemble the nodes/links document and write it to the JSON file of the
# current segment.
finalJson = {'nodes': nodes, 'links': links}

outputPath = 'input/AllEvents' + Segment + '.json'
with open(outputPath, 'w') as outfile:
    json.dump(finalJson, outfile)

In [None]:
# Name of the segment that was just exported.
print(Segment)