# CHAPTER 11. Smarter Email Marketing with the Markov Model

구매이력을 기반으로 해서 고객 대상의 "스마트한 이메일 마켓팅"할 날짜를 예측해보자.

고객의 마지막 구매 날짜가 랜덤(설명) 변수로 주어지고, Markov Model은 이전 구매이력 데이터를 가지고 구매 날짜에 대한 분포를 구함.

## Markov Chains에 대한 설명

> 유한한 상태들의 집합을  \\(  S = ( S_1, S_2, S_3, ... )  \\)  라고 정의

> 우리는 다음과 같이 확률을 구할 수 있음.

![](chap11_01.jpg)

> Markov’s first-order 가정은 다음과 같음.

> t + 1 시간의 상태는 오직 t 시간의 상태에 의존함.

![](chap11_02.jpg)

> Markov’s second-order 가정은 다음과 같음.

![](chap11_03.jpg)

> 이것을 계속 확장하여  Markov 가정을  결합확률로 표현할 수 있음.

![](chap11_04.jpg)

> 예를 들어서 설명하면,  4가지 날씨 상태( sunny, cloudy, rainy, foggy )가 있다고 가정하고, 날씨패턴은 아래와 같음.

![](chap11_05.jpg)

> 오늘 날씨가 sunny가 주어진다면, 내일은 cloudy 이고 그 다음날은 foggy일 확률은 ?

![](chap11_06.jpg)


> 오늘 날씨가 foggy가 주어진다면, 오늘부터 이틀후에 rainy일 확률은?

![](chap11_07.jpg)

이번장의 목표는  모델( = transition table )을 만드는 것임.

## Spark Solution 

### Input Format

```
<customerID><,><transactionID><,><purchaseDate><,><amount>


V31E55G4FI,1381872898,2013-01-01,123
301UNH7I2F,1381872899,2013-01-01,148
PP2KVIR4LD,1381872900,2013-01-01,163
AC57MM3WNV,1381872901,2013-01-01,188
BN020INHUM,1381872902,2013-01-01,116
UP8R2SOR77,1381872903,2013-01-01,183
VD91210MGH,1381872904,2013-01-01,204
COI4OXHET1,1381872905,2013-01-01,78
76S34ZE89C,1381872906,2013-01-01,105
6K3SNF2EG1,1381872907,2013-01-01,214
```

### High-Level Steps

1) 입력 파라메터에 대한 처리 :  구매이력 데이터 파일의 경로를 입력값으로 필요하지만 여기서는 하드 코딩함.

2) Spark Context을 만들고 입력데이터로부터 RDD < String > 을 생성

3) RDD < String > 을 RDD < K, V > 로 변환.  K는 customerID,  V는 tuple < purchaseData, amount > 임.

4) customerID을 기준으로 구매이력을 그룹핑하며 RDD < K2 , V2 >결과로 나옴.  K2는 customerID,  V2는 Iterable < Tuple < purchaseDate, Amount > > 임.

5) Markov state sequence을 만듬 :  State1, State2, ..., StateN 
   - mapValues()으로 customerID에 해당하는 V2 값들을 가지고 와서 변환 작업을 수행
   
6) Markov state transition을 생성
  - Input :  PairRDD < K4, V4 > pairs
  - Output : 
![](chap11_08.jpg)

7) 최종 결과를 저장

## Spark Program

smart_email_training.txt : https://github.com/mahmoudparsian/data-algorithms-book/tree/master/src/main/java/org/dataalgorithms/chap11/resources

In [9]:
% ls -l  smart_email_training*

-rw-r--r-- 1 jovyan users 30921789 Nov 11  2016 smart_email_training.txt
-rw-r--r-- 1 jovyan users  9006004 Nov 11 02:27 smart_email_training.zip


#### Step 2: Create Spark context object and convert Input into RDD

In [10]:
from pyspark import SparkContext
sc = SparkContext() 
sc

records = sc.textFile('smart_email_training.txt', 1);

In [12]:
records.take( 10 )

[u'V31E55G4FI,1381872898,2013-01-01,123',
 u'301UNH7I2F,1381872899,2013-01-01,148',
 u'PP2KVIR4LD,1381872900,2013-01-01,163',
 u'AC57MM3WNV,1381872901,2013-01-01,188',
 u'BN020INHUM,1381872902,2013-01-01,116',
 u'UP8R2SOR77,1381872903,2013-01-01,183',
 u'VD91210MGH,1381872904,2013-01-01,204',
 u'COI4OXHET1,1381872905,2013-01-01,78',
 u'76S34ZE89C,1381872906,2013-01-01,105',
 u'6K3SNF2EG1,1381872907,2013-01-01,214']

### Step 3: Convert RDD into JavaPairRDD

In [54]:
import datetime
import time


def makeCustomerIdPurchaseData( rec ) :
    tokens = rec.split(",")
    if len( tokens ) != 4 :
        return None
    
    # tokens[0] = customer-id
    # tokens[1] = transaction-id
    # tokens[2] = purchase-date
    # tokens[3] = amount
    pdateStr =  datetime.datetime.strptime(tokens[2], "%Y-%m-%d")
    pdate = long( time.mktime(pdateStr.timetuple()) )
    
    amount = int( tokens[3] )
    V = (pdate, amount)    
    return (tokens[0], V) 
    
makeCustomerIdPurchaseData( u'V31E55G4FI,1381872898,2013-01-01,123' )

(u'V31E55G4FI', (1356998400L, 123))

In [55]:
kv = records.map( makeCustomerIdPurchaseData  ) 

In [56]:
kv.take(10)

[(u'V31E55G4FI', (1356998400L, 123)),
 (u'301UNH7I2F', (1356998400L, 148)),
 (u'PP2KVIR4LD', (1356998400L, 163)),
 (u'AC57MM3WNV', (1356998400L, 188)),
 (u'BN020INHUM', (1356998400L, 116)),
 (u'UP8R2SOR77', (1356998400L, 183)),
 (u'VD91210MGH', (1356998400L, 204)),
 (u'COI4OXHET1', (1356998400L, 78)),
 (u'76S34ZE89C', (1356998400L, 105)),
 (u'6K3SNF2EG1', (1356998400L, 214))]

### Step 4: Group transactions by customerID

In [57]:
customerRDD = kv.groupByKey();

In [70]:
for t3 in customerRDD.take(5) :
    print "KEY={}, VALUE=[{}]\n".format( t3[0],  " ".join([str(x) for x in t3[1]] )   )

KEY=LGXR5X2HCM, VALUE=[(1359849600L, 131) (1364169600L, 54) (1364774400L, 28) (1366070400L, 45) (1367020800L, 29) (1368921600L, 50) (1371686400L, 84)]

KEY=6KS0MMBMBR, VALUE=[(1358294400L, 49) (1359158400L, 26) (1359590400L, 51) (1360713600L, 28) (1361577600L, 44) (1363392000L, 26) (1364256000L, 53) (1365120000L, 30) (1368489600L, 92) (1369872000L, 31) (1372550400L, 101) (1372550400L, 27) (1373760000L, 52) (1374019200L, 30) (1374451200L, 47) (1374537600L, 25)]

KEY=6RTSY4IUVS, VALUE=[(1358294400L, 75) (1360454400L, 30) (1360627200L, 47) (1361750400L, 33) (1369526400L, 172) (1369699200L, 26) (1370217600L, 50) (1370649600L, 27) (1370649600L, 53) (1371772800L, 26) (1374624000L, 95)]

KEY=41W427090N, VALUE=[(1357776000L, 180) (1359158400L, 33) (1360108800L, 51) (1361404800L, 27) (1361664000L, 52) (1362614400L, 34) (1362960000L, 57) (1363564800L, 30) (1365552000L, 42) (1365724800L, 25) (1369440000L, 91) (1369958400L, 32) (1370217600L, 40) (1373155200L, 81) (1373241600L, 27)]

KEY=KJWKMCKLRI

### Step 5: Create a Markov state sequence

In [161]:
def toStateSequence( mlist ) :
    if( len(mlist) < 2 ) :
        return None;
    
    stateSequence = [] ## LIST형
    prior = mlist[ 0 ]
    for idx in range( 1 , len(mlist) ) :
        current = mlist[idx]
        priorDate = prior[0]
        date = current[0]
        ## one day = 24*60*60*1000 = 86400000 milliseconds
        daysDiff = (date - priorDate) / 86400000; 
        priorAmount = prior[1]
        amount = current[1]
        amountDiff = amount - priorAmount;
        dd = "";
        if daysDiff < 30  :
            dd = "S"
        elif  daysDiff < 30  :
            dd = "M"
        else :
            dd = "L"
    
        ad = "";
        if priorAmount < 0.9 * amount  :
            ad = "L"
        elif  priorAmount < 1.9 * amount  :
            ad = "E"
        else :
            ad = "G"
            
        element = dd + ad
        stateSequence.append( element )
        prior = current
        
    return stateSequence

In [162]:
def getKey(item) :
    return item[0]

In [163]:
def genStateSeq( dateAndAmount ) :
    stateSequence = [] ## LIST형
    
    for v in dateAndAmount :
        stateSequence.append( v )

    sorted( stateSequence, key=getKey )
    
    retVal = toStateSequence( stateSequence )
    
    return retVal

In [164]:
stateSequence = customerRDD.mapValues( genStateSeq )

In [165]:
stateSequence.take(3)

[(u'LGXR5X2HCM', ['SG', 'SG', 'SL', 'SE', 'SL', 'SL']),
 (u'6KS0MMBMBR',
  ['SE',
   'SL',
   'SE',
   'SL',
   'SE',
   'SL',
   'SE',
   'SL',
   'SG',
   'SL',
   'SG',
   'SL',
   'SE',
   'SL',
   'SE']),
 (u'6RTSY4IUVS', ['SG', 'SL', 'SE', 'SL', 'SG', 'SL', 'SE', 'SL', 'SG', 'SL'])]

### Step 6: Generate a Markov state transition matrix

In [166]:
def genTransitionMatrix( s ) :
    mapperOutput = [] ## LIST형
    
    print s[0]
    print s[1]
    #print s[2]
    
    states = s[1]
    if ( states == None or len(states) < 2 ) :
        return mapperOutput
    
    prevState = None
    for state in states :
        if( prevState == None ) : 
            prevState = state
            continue
        
        fromState = prevState
        toState = state
        k = (prevState, state)
        mapperOutput.append( (k, 1)  )
        prevState = state
    
    return mapperOutput

In [167]:
model = stateSequence.flatMap( genTransitionMatrix )

In [168]:
model.take(5)

[(('SG', 'SG'), 1),
 (('SG', 'SL'), 1),
 (('SL', 'SE'), 1),
 (('SE', 'SL'), 1),
 (('SL', 'SL'), 1)]

## Combine/reduce frequent (fromState, toState)

In [170]:
markovModel = model.reduceByKey( lambda i1, i2 :  i1 + i2 )

In [171]:
markovModel.take( 10 )

[(('SL', 'SE'), 153189),
 (('SG', 'SE'), 4364),
 (('SL', 'SG'), 130437),
 (('SG', 'SL'), 179702),
 (('SG', 'SG'), 6530),
 (('SE', 'SL'), 140483),
 (('SL', 'SL'), 46608),
 (('SE', 'SG'), 8611),
 (('SE', 'SE'), 2381)]

### Step 7: Emit final output

```
<fromState><,><toState><TAB><frequency-count>
```

In [175]:
def formatted( t ) :
    retVal = "%s,%s\t%d" %( t[0][0], t[0][1], t[1] )
    return retVal

In [176]:
markovModelFormatted = markovModel.map( formatted )

In [177]:
markovModelFormatted.take(10)

['SL,SE\t153189',
 'SG,SE\t4364',
 'SL,SG\t130437',
 'SG,SL\t179702',
 'SG,SG\t6530',
 'SE,SL\t140483',
 'SL,SL\t46608',
 'SE,SG\t8611',
 'SE,SE\t2381']

https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap11/statemodel/StateTransitionTableBuilder.java


```
/**
 * Markov state transition probability matrix builder
 *
 */
public class StateTransitionTableBuilder {

	//
	// model.states=SL,SE,SG,ML,ME,MG,LL,LE,LG
	//
	// states<key, value>: key is the state and value is row/column in table
	//
	private Map<String, Integer> states = null;
	private double[][] table = null;
    private int numberOfStates;
	private int scale = 100;

	private void initStates(){
	    states = new HashMap<String, Integer>();
		states.put("SL", 0);	
		states.put("SE", 1);	
		states.put("SG", 2);	
		states.put("ML", 3);	
		states.put("ME", 4);	
		states.put("MG", 5);	
		states.put("LL", 6);	
		states.put("LE", 7);	
		states.put("LG", 8);	
	}
			
	public StateTransitionTableBuilder(int numberOfStates) {
		this.numberOfStates = numberOfStates;
		table = new double[numberOfStates][numberOfStates];
		initStates();
	}
	
	public StateTransitionTableBuilder(int numberOfStates, int scale) {
		this(numberOfStates);
		this.scale = scale;
	}

    public void add(String fromState, String toState, int count) {
    	int row = states.get(fromState);
    	int column = states.get(toState);
        table[row][column] = count;
    }
    
	public void normalizeRows() {
		// Laplace correction: the usual solution is to do a 
		// Laplacian correction by upping all the counts by 1
		// see: http://cs.nyu.edu/faculty/davise/ai/bayesText.html		
		for (int r = 0; r < numberOfStates; r++) {
			boolean gotZeroCount = false;
			for (int c = 0; c < numberOfStates; c++) {
				if(table[r][c] == 0) {
					gotZeroCount = true;
					break;
				}
			}
			
			if (gotZeroCount) {
				for (int c = 0; c < numberOfStates; c++) {
					 table[r][c] += 1;
				}			
			}
		}		
		
		//normalize
		for (int r = 0; r < numberOfStates; r++) {
			double rowSum = getRowSum(r);
			for (int c = 0; c < numberOfStates; c++) {
				table[r][c] = table[r][c] / rowSum;
			}
		}
	}
	
    public double getRowSum(int rowNumber) {
        double sum = 0.0;
        for (int column = 0; column < numberOfStates; column++) {
            sum += table[rowNumber][column];
        }
        return sum;
    }

    public String serializeRow(int rowNumber) {
        StringBuilder builder = new StringBuilder();
        for (int column = 0; column < numberOfStates; column++) {
        	double element = table[rowNumber][column];
        	builder.append(String.format("%.4g", element));
            if (column < (numberOfStates-1)) {
            	builder.append(",");
            }
        }
        return builder.toString();
    }

    public void persistTable() {
		for (int row = 0; row < numberOfStates; row++) {
        	String serializedRow = serializeRow(row);
        	System.out.println(serializedRow);
        }
    }
   
	public static void generateStateTransitionTable(String hdfsDirectory) {
		List<TableItem> list = ReadDataFromHDFS.readDirectory(hdfsDirectory);
	    StateTransitionTableBuilder tableBuilder = new StateTransitionTableBuilder(9);
	    for (TableItem item : list) {
	    	tableBuilder.add(item.fromState, item.toState, item.count);
	    }
	    
	    tableBuilder.normalizeRows();
	    tableBuilder.persistTable();
	}
	
	public static void main(String[] args) {
		String hdfsDirectory = args[0];
		generateStateTransitionTable(hdfsDirectory);
	}	
}

```