# INFO3406 - Introduction to Data Analytics
## Assignment 2 -  Map Reduce

#### ** Instructions **

#### Kaggle Display Advertising Challenge dataset will be used in this assignment. It contains of 39 features of online ads and information of if each ad was clicked or not over a period of 7 days. The semantic of these features is undisclosed. The overall objective is to determine if an ad will be clicked or not, for a set of query of features.

#### For this assignment, only 100,000 ads will be used. The dataset should be downloaded according to the instructions in part 1). The first column of *"dac_sample.txt"*  indicates if an ad was clicked (=1) or not (=0) while rest of the 39 columns contain feature values. For this assignment you can consider all features are categorical. The values of some of these features have been hashed for anonymization purposes. Note that some features have missing values.



You may view spark stages from
http://localhost:4040/stages/ 


Some other useful resources/references:
+ [Map-Reduce for Machine Learning on Multicore](http://papers.nips.cc/paper/3150-map-reduce-for-machine-learning-on-multicore.pdf)
+ [Spark RDD](http://www.cs.berkeley.edu/~matei/papers/2010/hotcloud_spark.pdf)
+ [Display Advertising Challenge](https://www.kaggle.com/c/criteo-display-ad-challenge)
+ [MLlib](https://spark.apache.org/docs/1.1.0/mllib-guide.html)
+ [MLlib: Scalable Machine Learning on Spark](http://stanford.edu/~rezab/sparkworkshop/slides/xiangrui.pdf)
+ [Scalable Machine Learning](https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x)

### ** Preparing the dataset **

#### Import *"assignment2LoadData.py"* file to the [*jupyter*  home](http://localhost:8001/tree) folder.  Go to [http://labs.criteo.com/downloads/2014-kaggle-display-advertising-challenge-dataset/](http://labs.criteo.com/downloads/2014-kaggle-display-advertising-challenge-dataset/) to download the dataset. After you accept the agreement, you can obtain the dataset by clicking on the ***"Download Sample"*** button.  The file is 8.4 MB compressed (.tar.gz). Import the compressed data file to the  [*jupyter*  home](http://localhost:8001/tree) folder. The script in *"assignment2LoadData.py"* will automatically extract the file to the virtual machine (VM).

In [1]:
import assignment2LoadData as ld

ld.extractData("data/Assignment2/dac_sample.txt")

fileName is data/Assignment2/dac_sample.txt
File is already available. Nothing to do.


In [2]:
#Extract
import numpy as np
import os.path
import random
import time
from __future__ import division
t0 = time.time()
baseDir = os.path.join('data')
inputPath = os.path.join('Assignment2', 'dac_sample.txt')
fileName = os.path.join(baseDir, inputPath)

partitions = 1
if os.path.isfile(fileName):
    rawData_0 = (sc
               .textFile(fileName, partitions)
               .map(lambda x: x.replace('\t', ','))
              )
    # work with either ',' or '\t' separated data
    rawData_1 = rawData_0.map(lambda x: x.replace(',,', ',a,'))
    rawData = rawData_1.map(lambda x: x.replace(',,', ',a,'))  
    print 'An example of rawData entry: ', rawData.take(1)
    print rawData.collect()[0]
    nData = rawData.count()
    print '\nrawData count=', nData

An example of rawData entry:  [u'0,1,1,5,0,1382,4,15,2,181,1,2,a,2,68fd1e64,80e26c9b,fb936136,7b4723c4,25c83c98,7e0ccccf,de7995b8,1f89b562,a73ee510,a8cd5504,b2cb9c98,37c9c164,2824a5f6,1adce6ef,8ba8b39a,891b62e7,e5ba7672,f54016b9,21ddcdc9,b1252a9d,07b5194c,a,3a171ecb,c5c50484,e8b83407,9727dd16']
0,1,1,5,0,1382,4,15,2,181,1,2,a,2,68fd1e64,80e26c9b,fb936136,7b4723c4,25c83c98,7e0ccccf,de7995b8,1f89b562,a73ee510,a8cd5504,b2cb9c98,37c9c164,2824a5f6,1adce6ef,8ba8b39a,891b62e7,e5ba7672,f54016b9,21ddcdc9,b1252a9d,07b5194c,a,3a171ecb,c5c50484,e8b83407,9727dd16

rawData count= 100000


###Decrease or increase sample size

In [3]:
# decrease the sample size for quick testing and debugging purposes

# n values to take from rawData
n = 20000

# takes n values from rawData and converts it to list
rawData = rawData.take(n)

#convert the list back to RDD
rawData = sc.parallelize(rawData)

In [4]:
weights = [.9, .1]
seed = 10
# Use randomSplit with weights and seed
rawTrainData, rawTestData = rawData.randomSplit(weights, seed)
# Cache the data
#rawTrainData.cache()
#rawTestData.cache()

nTrain = rawTrainData.count()
nTest = rawTestData.count()

print 'rawTrainData count=', nTrain
print 'rawTestData count=', nTest
print rawTestData.collect()[0]

rawTrainData count= 18035
rawTestData count= 1965
0,2,0,44,1,102,8,2,2,4,1,1,a,4,68fd1e64,f0cf0024,6f67f7e5,41274cd7,25c83c98,fe6b92e5,922afcc0,0b153874,a73ee510,2b53e5fb,4f1b46f3,623049e6,d7020589,b28479f6,e6c5b5cd,c92f3b61,07c540c4,b04e4670,21ddcdc9,5840adea,60f6221e,a,3a171ecb,43f13e8b,e8b83407,731c3655


In [5]:
def parsePoint(point):
    """Converts a comma separated string into a list of (featureID, value) tuples. 
    Args: 
        point (str): A comma separated string where the first value is the label and the rest are features. 
    Returns: 
        list: A list of (featureID, value) tuples. "value" is a category from 0 to 38. 
    """
    items = point.split(',')
    return [(i, item) for i, item in enumerate(items[1:])]

parsedTrainData = rawTrainData.map(parsePoint)
parsedTestData = rawTestData.map(parsePoint)
parsedData = rawData.map(parsePoint)

#number of different things for each feature
numCategories = (parsedData
                 .flatMap(lambda x: x)
                 .distinct()
                 .map(lambda x: (x[0], 1))
                 .reduceByKey(lambda x, y: x + y)
                 .sortByKey()
                 .collect())

def createOneHotDict(inputData):
    """
    Creates a one-hot-encoder dictionary based on the input data.
    
    Args: 
        inputData (RDD of lists of (int, str)): An RDD of observations where each observation is made up of a list 
        of (featureID, value) tuples.
    Returns: 
        dict: A dictionary where the keys are (featureID, value) tuples and map to values that are unique integers. 
    """
    distinctFeats = inputData.flatMap(lambda x: x).distinct() #Get the flatmap of inputData, then distinct() 
    return distinctFeats.zipWithIndex().collectAsMap()

#Create one hot dictionary from parsedTrainFeat 
ctrOHEDict = createOneHotDict(parsedData) 

numOHEFeats = len(ctrOHEDict.keys()) 
print '\nNumber of OHE features =', numOHEFeats


Number of OHE features = 84859


### Min Hashing

Reference: http://mccormickml.com/2015/06/12/minhash-tutorial-with-python-code/

In [6]:
import random

# Min Hashing

# a and b are random coefficients
# c is a random prime number greater than numOHEFeats
numHashes = 10

a = random.sample(range(0, numOHEFeats), 10)
b = random.sample(range(0, numOHEFeats), 10)

# c = 273551 # c for whole data set
c = 84859 # c for 20000 sample

# hashes every unique id obtained from OHEDict

def hashFunction(ids):
    hashes=[]
    for i in range(0, numHashes):
        hashID = list( map(lambda x: 
                           (a[i] * x + b[i]) %c ,ids))
        minHash = reduce(lambda x,y:
                         x if x<y else y ,hashID)
        hashes.append(minHash)
    return hashes

# get ids for featureID value pairs from OHEDict and returns it
def getIDs(ad):
    ids = []
    for featureValuePair in ad:
        featureID = ctrOHEDict[featureValuePair]
        ids.append(featureID)
    return ids

In [7]:
# convert (featureID, value) pair to IDs
trainDataIDs = parsedTrainData.map(getIDs)
testDataIDs = parsedTestData.map(getIDs)

# hash the IDs using the hashFunction
hashedTrainIDs = trainDataIDs.map(hashFunction)
hashedTestIDs = testDataIDs.map(hashFunction)

### Classification

In [8]:
def convertToList(RDD):
    return RDD.collect()

#creat separate buckets function
trainIDsList = convertToList(hashedTrainIDs)
testIDsList = convertToList(hashedTestIDs)

def classifySignatures(hashedIDs):
    bucket = []
    adNumber = 0
    # loop through all the trainData
    i = 0
    while i != nTrain:
        # counter for number of hash matches
        counter = 0
        j = 0
        # loop though all the features in the ads comparing
        for hashedID in hashedIDs:
            if hashedID == trainIDsList[i][j]:
                j += 1
                counter += 1
        # number of hashes to match        
        if counter >= 5 : 
            bucket.append(adNumber)
        # move to next ad
        adNumber += 1
        i += 1
    return bucket

In [9]:
# classify the hashed ids into buckets
buckets = hashedTestIDs.map(classifySignatures)
  
# convert RDD to list for iteration
allBuckets = buckets.collect()
 

In [10]:
# convert RDD to list
parsedTestDataList = convertToList(parsedTestData)

# parses the list for joining
def parse(lists):
    index = 0
    List = []
    try:
        while index != len(lists):
            tmp = [index, lists[index]]
            index += 1
            List.append(tmp)
    except IndexError:
        pass
    
    return List

In [11]:
# parse the list
testDataList = parse(parsedTestDataList)
bucketsList = parse(allBuckets)

#convert back to RDD using spark parallelization technique
testDataRDD = sc.parallelize (testDataList)
bucketsRDD = sc.parallelize (bucketsList)

#join two RDD together
combinedList = testDataRDD.join (bucketsRDD)

### KNN

In [12]:
trainingFeatures = convertToList(parsedTrainData)

import operator

def calcKNN(row):
    k = 5
    dis = []
    if len(row[1][1]) == 0:
        for i in range(0,k):
            dis.append([1,0])#for all the neighbours initalizing the distances 
    else :    
        for item in row[1][1]:
            count = 0
            hammingDistance = 0
            for items in trainingFeatures[item]:
                if items != row[1][0][count]:#using the hamming distance metric to find the count of difference between 2 signatures
                    hammingDistance += 1
                count += 1
    
            dis.append([hammingDistance, item])#adding the hamming distance of the corresponding item
    dis.sort(key = operator.itemgetter(1))
    return dis

knn = combinedList.map(calcKNN)
print knn.take(1) 

[[[1, 0], [1, 0], [1, 0], [1, 0], [1, 0]]]


###Prediction

In [13]:
# splits the first column from rawData and returns it
# first column = (0,1) whether the ad was clicked or not
def  getClickedStatus(dec):
    items = dec.split(',')
    return [items[0]]

# Extract the clicked or not clicked values from the raw data
trainClickedStatus = rawTrainData.map(getClickedStatus)
testClickedStatus = rawTestData.map(getClickedStatus)

# convert RDD type to list
trainClickedList = convertToList(trainClickedStatus)
testClickedList = convertToList(testClickedStatus)

#predicting wether an advertisment was clicked or not by using the weight factors
def prediction(values):
    x = 0 #clicked weight factor
    y = 0 #unclicked weight factor
    for i in values:
        if trainClickedList[i[1]] == [u'1']:
            x=x+(1/(pow(i[0]+1,2)))
        else:
            y=y+(1/(pow(i[0]+1,2)))
    if x >= y:
        click=[u'1']
    else:
        click=[u'0']
    return click

guess = knn.map(prediction)
guessList = convertToList(guess)

### Calculate accuracy

In [14]:
# correct guess counter
correctGuesses = 0

# compares the guess against actual data
for i in range(0,nTest):
    if guessList[i] == testClickedList[i]:
        correctGuesses += 1

print "Accuracy = ", correctGuesses/nTest

Accuracy =  0.778117048346
