# INFO3406 - Introduction to Data Analytics
## Assignment 2 - MapReduce

#### **Instructions**

#### The Kaggle Display Advertising Challenge dataset is used in this assignment. It contains 39 features describing online ads, together with whether each ad was clicked, over a period of 7 days. The semantics of these features are undisclosed. The overall objective is to predict whether an ad will be clicked, given a query set of feature values.

#### For this assignment, only 100,000 ads are used. The dataset should be downloaded according to the instructions in part 1. The first column of *"dac_sample.txt"* indicates whether an ad was clicked (=1) or not (=0), while the remaining 39 columns contain feature values. For this assignment, all features can be treated as categorical. The values of some of these features have been hashed for anonymization purposes. Note that some features have missing values.
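To make the row layout concrete, here is a small sketch that splits one line into the click label and 39 `(feature index, value)` pairs. The sample line is synthetic (not taken from the dataset) and the helper name `parse_row` is hypothetical; a missing value simply shows up as an empty string.

```python
def parse_row(line):
    """Split one tab-separated line into (label, [(featureIndex, value), ...]).

    Assumes the layout described above: column 0 is the click label (0/1),
    columns 1..39 are feature values, and a missing value is an empty field.
    """
    fields = line.rstrip('\n').split('\t')
    label = int(fields[0])
    features = [(i, value) for i, value in enumerate(fields[1:])]
    return label, features

# Synthetic example row: label 1, then 39 feature fields (two filled in, the rest missing).
sample = '\t'.join(['1', '5', '', '68fd1e64'] + [''] * 36)
label, features = parse_row(sample)
print(label)           # 1
print(len(features))   # 39
print(features[:3])    # [(0, '5'), (1, ''), (2, '68fd1e64')]
```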



You can view the Spark stages in the Spark web UI at
http://localhost:4040/stages/


Some other useful resources/references:
+ [Map-Reduce for Machine Learning on Multicore](http://papers.nips.cc/paper/3150-map-reduce-for-machine-learning-on-multicore.pdf)
+ [Spark RDD](http://www.cs.berkeley.edu/~matei/papers/2010/hotcloud_spark.pdf)
+ [Display Advertising Challenge](https://www.kaggle.com/c/criteo-display-ad-challenge)
+ [MLlib](https://spark.apache.org/docs/1.1.0/mllib-guide.html)
+ [MLlib: Scalable Machine Learning on Spark](http://stanford.edu/~rezab/sparkworkshop/slides/xiangrui.pdf)
+ [Scalable Machine Learning](https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x)

### **Preparing the dataset**

#### Upload the *"assignment2LoadData.py"* file to the [*jupyter* home](http://localhost:8001/tree) folder. Run the following cell to download the dataset. After you accept the agreement, you can obtain the download URL by right-clicking the ***"Download Sample"*** button and selecting "Copy link address" or "Copy Link Location", depending on your browser. Paste the URL into the # TODO in the next cell. The file is 8.4 MB compressed (.tar.gz). The script in *"assignment2LoadData.py"* automatically downloads the file to the virtual machine (VM) and then extracts the data.
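The contents of *"assignment2LoadData.py"* are not shown in this notebook. As a rough idea of what a download-and-extract step such as `ld.extractData(url)` might do, here is a sketch using only the Python standard library. The helper names, the destination folder, the skip-if-present behaviour, and the assumption that the archive holds `dac_sample.txt` at its top level are all hypothetical, not the script's actual code.

```python
import os
import tarfile
import urllib.request

def download_if_missing(url, archive_path):
    """Download url to archive_path unless the file already exists (hypothetical helper)."""
    if not os.path.isfile(archive_path):
        os.makedirs(os.path.dirname(archive_path) or '.', exist_ok=True)
        urllib.request.urlretrieve(url, archive_path)
    return archive_path

def extract_if_missing(archive_path, dest_dir, member='dac_sample.txt'):
    """Extract a .tar.gz archive into dest_dir unless `member` is already there (hypothetical helper)."""
    target = os.path.join(dest_dir, member)
    if os.path.isfile(target):
        print('File is already available. Nothing to do.')
        return target
    os.makedirs(dest_dir, exist_ok=True)
    with tarfile.open(archive_path, 'r:gz') as tar:
        if hasattr(tarfile, 'data_filter'):
            tar.extractall(dest_dir, filter='data')   # safer extraction on newer Pythons
        else:
            tar.extractall(dest_dir)
    return target
```

A possible call, with assumed paths: `extract_if_missing(download_if_missing(url, 'data/dac_sample.tar.gz'), 'data/Assignment2')`.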

In [1]:
# Here is assignment 2 for INFO3406
# SID:430028388 Hongliang Chi
# Feel free to contact me if there is any problem, email: hchi3573@uni.sydney.edu.au
# [Attention] To run the code, the folder called nilsimsa must be uploaded.

from IPython.lib.display import IFrame

IFrame("http://labs.criteo.com/downloads/2014-kaggle-display-advertising-challenge-dataset/",
       800, 400)

In [1]:
url = 'http://labs.criteo.com/wp-content/uploads/2015/04/dac_sample.tar.gz' #It may be similar to 'https://s?-eu-west-1.amazonaws.com/criteo-labs/dac.tar.gz'

import assignment2LoadData as ld

ld.extractData(url)

File is already available. Nothing to do.


In [2]:
# Extract the raw data.
# `sc` is the SparkContext provided by the notebook environment.
import numpy as np
import os.path
baseDir = os.path.join('data')
inputPath = os.path.join('Assignment2', 'dac_sample.txt')
fileName = os.path.join(baseDir, inputPath)

partitions = 1
if os.path.isfile(fileName):
    rawData = (sc
               .textFile(fileName, partitions)
               .map(lambda x: x.replace('\t', ',')))  # work with either ',' or '\t' separated data
    #print 'An example of rawData entry: ', rawData.take(1)
    nData = rawData.count()
    #print '\nrawData count=', nData
else:
    # fail early instead of hitting a NameError on rawData in later cells
    raise IOError('Data file not found: ' + fileName)

In [4]:
# Different split weights were used in the experiments. For example, to investigate how the
# run time changes, 1000 training points and 1000 testing points were used, i.e.
# weights = [.01, .01, .98]; for cross-validation the weights were [0.95, 0.05] with varying seeds.
# Feel free to change the weights. Running the code on a small subset first is recommended
# to check that it works correctly.

weights = [.01,.01,.98]
seed = 17

# Use randomSplit with weights and seed; rawValidData holds the remaining 98% and is not used below
rawTrainData,rawTestData,rawValidData = rawData.randomSplit(weights, seed)
# Cache the training and test splits in memory, since later stages reuse them
rawTrainData.cache()
rawTestData.cache()

#nTrain = rawTrainData.count()
#nTest = rawTestData.count()
#print 'rawTrainData count=', nTrain
#print 'rawTestData count=', nTest

PythonRDD[6] at RDD at PythonRDD.scala:43
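`randomSplit` normalizes the weights and assigns each record to exactly one split at random, so split sizes are only approximately proportional to the weights (about 1,000 records each for `[.01, .01, .98]` on 100,000 ads, consistent with the 1030 test points reported further down). A pure-Python sketch of the idea, assuming a simple per-record uniform draw against cumulative weight bounds; Spark samples per partition, so its exact assignments differ. `random_split` is a hypothetical name.

```python
import random

def random_split(records, weights, seed):
    """Assign each record to exactly one split with probability proportional to its weight."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)          # cumulative upper bound of each split in [0, 1]
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for record in records:
        u = rng.random()
        for i, upper in enumerate(bounds):
            if u < upper or i == len(bounds) - 1:   # last split absorbs float rounding
                splits[i].append(record)
                break
    return splits

train_part, test_part, rest_part = random_split(range(100000), [.01, .01, .98], seed=17)
print(len(train_part), len(test_part), len(rest_part))   # roughly 1000, 1000, 98000
```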

### **Answers**



In [7]:
def parsePoint(point):
    """Convert a comma-separated string into a list of (featureIndex, value) tuples.
    The first field (the click label) is skipped."""
    items = point.split(',')
    return [(i, item) for i, item in enumerate(items[1:])]

In [8]:
parsedTrainFeat = rawTrainData.map(parsePoint)
parsedTestFeat = rawTestData.map(parsePoint)

In [9]:
def createOneHotDict(inputData):
    """Create a one-hot-encoder dictionary mapping each distinct (featureIndex, value)
    tuple in inputData to a unique integer index."""
    # uses the Spark operations flatMap(), distinct(), zipWithIndex() and collectAsMap()
    distinctFeats = inputData.flatMap(lambda x: x).distinct()
    return distinctFeats.zipWithIndex().collectAsMap()
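The `flatMap` → `distinct` → `zipWithIndex` → `collectAsMap` pipeline can be mimicked on a plain Python list to see what the resulting dictionary looks like. This sketch sorts the distinct features so that the indices are deterministic; Spark makes no such ordering promise, so the actual indices in `ctrOHEDict` will differ. The function name and toy rows are made up for illustration.

```python
def create_one_hot_dict(parsed_rows):
    """Map every distinct (featureIndex, value) tuple to a unique integer index."""
    distinct_feats = set(feat for row in parsed_rows for feat in row)        # flatMap + distinct
    return {feat: idx for idx, feat in enumerate(sorted(distinct_feats))}  # zipWithIndex + collectAsMap

rows = [[(0, 'mouse'), (1, 'black')],
        [(0, 'cat'), (1, 'tabby'), (2, 'mouse')],
        [(0, 'bear'), (1, 'black'), (2, 'salmon')]]
ohe_dict = create_one_hot_dict(rows)
print(len(ohe_dict))   # 7
```

Note that `(0, 'mouse')` and `(2, 'mouse')` get different indices: the feature position is part of the key.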

In [10]:
ctrOHEDict = createOneHotDict(parsedTrainFeat) #Create one hot dictionary from parsedTrainFeat
numCtrOHEFeats = len(ctrOHEDict)

In [11]:
from pyspark.mllib.linalg import SparseVector
def oneHotEncoding(rawFeats, OHEDict, numOHEFeats):
    """Produce a one-hot-encoding from a list of features and an OHE dictionary.
    Features missing from OHEDict (e.g. values that only occur in the test split)
    are ignored instead of raising a KeyError."""
    indices = sorted(OHEDict[f] for f in rawFeats if f in OHEDict)  # SparseVector needs sorted indices
    values = [1.0] * len(indices)
    return SparseVector(numOHEFeats, indices, values)
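A dependency-free sketch of the encoding step, representing the sparse vector as a `(size, indices, values)` triple rather than a `SparseVector`. The dictionary is a small hypothetical one; features absent from it (for example, values that only occur in the test split) are skipped rather than raising a KeyError.

```python
def one_hot_encode(raw_feats, ohe_dict, num_feats):
    """Return (size, sorted indices, values) for a one-hot encoding of raw_feats."""
    indices = sorted(ohe_dict[f] for f in raw_feats if f in ohe_dict)  # skip unseen features
    values = [1.0] * len(indices)
    return num_feats, indices, values

ohe_dict = {(0, 'bear'): 0, (0, 'cat'): 1, (1, 'black'): 2, (1, 'tabby'): 3, (2, 'salmon'): 4}
print(one_hot_encode([(0, 'cat'), (1, 'black'), (2, 'unseen')], ohe_dict, 5))
# (5, [1, 2], [1.0, 1.0])
```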

In [12]:
from pyspark.mllib.regression import LabeledPoint

def parseOHEPoint(point, OHEDict, numOHEFeats):
    """Obtain the label and one-hot-encoded feature vector for a raw observation."""
    parsedPoints = parsePoint(point)
    label = point.split(',')[0]  # LabeledPoint converts the label string to a float
    features = oneHotEncoding(parsedPoints, OHEDict, numOHEFeats)
    return LabeledPoint(label, features)



In [13]:
OHETrainData = rawTrainData.map(lambda point: parseOHEPoint(point, ctrOHEDict, numCtrOHEFeats)) #call parseOHEPoint for each ad in rawTrainData
OHETestData = rawTestData.map(lambda point: parseOHEPoint(point, ctrOHEDict, numCtrOHEFeats))
OHETrainData.cache()
OHETestData.cache()

PythonRDD[15] at RDD at PythonRDD.scala:43

In [14]:
from collections import defaultdict
import nilsimsa

# This is the main code for LSH.
# [Attention] The folder [nilsimsa] must be uploaded to run this part of the code.
# Each (featureIndex, value) pair is hashed with Nilsimsa, a locality-sensitive hash,
# into one of numBuckets buckets; LSH is used here as a dimension-reducing technique.

def NilsimsaHash(numBuckets, rawFeats, printMapping=False):
    """Calculate a feature dictionary {bucket: count} for an observation's features based on hashing."""
    mapping = {}
    for ind, category in rawFeats:
        featureString = category + str(ind)
        mapping[featureString] = int(int(nilsimsa.Nilsimsa(featureString).hexdigest(), 16) % numBuckets)
    if printMapping:
        print mapping
    sparseFeatures = defaultdict(float)
    for bucket in mapping.values():
        sparseFeatures[bucket] += 1.0   # features that collide in a bucket add up
    return dict(sparseFeatures)

def LSHparse(point, numBuckets):
    """Obtain the label and hashed feature vector for a raw observation."""
    parsedPoints = parsePoint(point)
    label = point.split(',')[0]
    features = NilsimsaHash(numBuckets, parsedPoints, printMapping=False)
    return LabeledPoint(label, SparseVector(numBuckets, features))

# Use LSH to reduce the dimension (signature) to 2**8 = 256
numBucketsCTR = 2 ** 8

# Apply the hashing to every record with a Spark map
hashTrainData = rawTrainData.map(lambda x: LSHparse(x, numBucketsCTR))
hashTrainData.cache()
hashTestData = rawTestData.map(lambda x: LSHparse(x, numBucketsCTR))
hashTestData.cache()

PythonRDD[17] at RDD at PythonRDD.scala:43
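Stripped of Spark and of the Nilsimsa dependency, the bucketing step above is the hashing trick: hash each feature string, take it modulo the number of buckets, and count how many features land in each bucket. This sketch swaps in `hashlib.md5` as the hash function purely so that it runs without the nilsimsa folder, so its bucket assignments will differ from `NilsimsaHash`. `hash_features` is a hypothetical name.

```python
import hashlib
from collections import defaultdict

def hash_features(num_buckets, raw_feats):
    """Map (featureIndex, value) pairs to {bucket: count} using the hashing trick.

    Uses MD5 as a stand-in hash function (the notebook uses Nilsimsa).
    """
    buckets = {}
    for ind, category in raw_feats:
        feature_string = category + str(ind)            # same key construction as NilsimsaHash
        digest = hashlib.md5(feature_string.encode('utf-8')).hexdigest()
        buckets[feature_string] = int(digest, 16) % num_buckets
    counts = defaultdict(float)
    for bucket in buckets.values():
        counts[bucket] += 1.0                            # colliding features add up
    return dict(counts)

feats = [(0, 'mouse'), (1, 'black'), (2, 'salmon')]
print(hash_features(2 ** 8, feats))
```

With a single bucket every feature collides, which makes the trade-off visible: fewer buckets means a smaller vector but more collisions.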

In [18]:
import numpy as np
import matplotlib.pyplot as plt
import time
print 'testing size=', hashTestData.count()
# Main code for KNN


def mdis(araw, braw):
    """Return the Manhattan distance between the features of araw and braw, plus araw's label."""
    a = araw.features.toArray()
    b = braw.features.toArray()
    distance = float(np.abs(a - b).sum())
    return distance, araw.label

def Knn(test, train, k):
    """Use MapReduce and KNN to return the most likely label among the k nearest
    training points. The parameter test is a single record of the testing set."""
    # distance from every training point to the test record, keeping the k smallest
    nearest = train.map(lambda s: mdis(s, test)).takeOrdered(k, key=lambda x: x[0])
    lst = [label for _, label in nearest]
    # majority vote (ties are broken arbitrarily)
    return max(set(lst), key=lst.count)

def setknn(test, train, k):
    """Run KNN on the whole testing set; return the confusion counts and the predictions."""
    pre = []
    tp = fp = tn = fn = 0.0
    for ins in test.collect():
        predict = Knn(ins, train, k)
        pre.append(predict)
        if predict == ins.label:
            if ins.label == 1: tp += 1
            else: tn += 1
        elif predict == 1: fp += 1
        else: fn += 1
    return tp, fp, tn, fn, pre


testing size= 1030
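The distributed `Knn` computes a distance from the test point to every training point (the `map`), keeps the `k` smallest (`takeOrdered`), and takes a majority vote. The same logic on in-memory lists is sketched below, with `heapq.nsmallest` standing in for `takeOrdered`; the function names and toy points are made up for illustration.

```python
import heapq
from collections import Counter

def manhattan(a, b):
    """Manhattan (L1) distance between two equal-length sequences."""
    return sum(abs(x - y) for x, y in zip(a, b))

def knn_predict(test_point, train, k):
    """Predict a label for test_point by majority vote over its k nearest training points.

    train is a list of (features, label) pairs.
    """
    distances = ((manhattan(features, test_point), label) for features, label in train)
    nearest = heapq.nsmallest(k, distances, key=lambda d: d[0])   # like RDD.takeOrdered
    votes = Counter(label for _, label in nearest)
    return votes.most_common(1)[0][0]

train = [([0, 0], 0), ([0, 1], 0), ([1, 0], 0), ([5, 5], 1), ([5, 6], 1)]
print(knn_predict([0.5, 0.5], train, 3))   # 0
print(knn_predict([5, 5.5], train, 3))     # 1
```

Because `nearest` is sorted by distance, a tied vote in this sketch goes to the label of the nearer neighbour, whereas `max(set(lst), key=lst.count)` in the notebook breaks ties arbitrarily.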


In [19]:
# Print run time, precision, recall and other statistics for K=5.
# time.time() is used to measure the elapsed time; the individual
# predictions are returned in `pre`.
startk5 = time.time()
[tp,fp,tn,fn,pre] = setknn(hashTestData,hashTrainData,5)
endk5 = time.time()
print 'time=', 'start=', startk5, 'end=', endk5, 'total=', endk5 - startk5
#print pre
precision = tp/(tp+fp) if tp+fp else 0.0
recall = tp/(tp+fn) if tp+fn else 0.0  # recall is the same quantity as TPR
TPR = recall
FPR = fp/(fp+tn) if fp+tn else 0.0
print 'precision=', precision, 'recall=', recall
print 'TPR=', TPR, 'FPR=', FPR


time= start= 1446173482.37 end= 1446173916.37 total= 434.000163078
precision= 0.318181818182 recall= 0.12389380531
TPR= 0.12389380531 FPR= 0.0746268656716
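Precision, recall (which equals TPR) and FPR follow directly from the four confusion counts returned by `setknn`. A small helper that also guards against division by zero, which happens when, for example, the classifier never predicts a click (tp + fp = 0); the function name and the return-0.0-on-empty convention are choices made for this sketch, and the counts in the example are made up.

```python
def classification_rates(tp, fp, tn, fn):
    """Return (precision, recall/TPR, FPR) from confusion counts; 0.0 when a denominator is 0."""
    def ratio(num, den):
        return float(num) / den if den else 0.0
    precision = ratio(tp, tp + fp)
    recall = ratio(tp, tp + fn)     # recall and TPR are the same quantity
    fpr = ratio(fp, fp + tn)
    return precision, recall, fpr

print(classification_rates(tp=10, fp=30, tn=150, fn=10))   # (0.25, 0.5, 0.16666666666666666)
```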


In [22]:
# The following code was used to produce the results in the report.
# It is wrapped in functions so that it only runs when called explicitly,
# because each call to setknn over the whole test set takes several minutes.

def evaluateK(k):
    """Run KNN with the given k; print run time, precision, recall, TPR and FPR."""
    start = time.time()
    tp, fp, tn, fn, pre = setknn(hashTestData, hashTrainData, k)
    end = time.time()
    print 'k=', k, 'time=', 'start=', start, 'end=', end, 'total=', end - start
    precision = tp/(tp+fp) if tp+fp else 0.0
    recall = tp/(tp+fn) if tp+fn else 0.0  # recall is the same quantity as TPR
    FPR = fp/(fp+tn) if fp+tn else 0.0
    print 'precision=', precision, 'recall=', recall
    print 'TPR=', recall, 'FPR=', FPR
    return recall, FPR

def runReportExperiments(kValues=(1, 10, 30, 100)):
    """Evaluate several values of k and plot their (FPR, TPR) points as an ROC curve."""
    ATPR, AFPR = [], []
    for k in kValues:
        TPR, FPR = evaluateK(k)
        ATPR.append(TPR)
        AFPR.append(FPR)
    # To change the number of Spark cores, relaunch with e.g.
    # --executor-cores 1 / --executor-cores 4 / --executor-cores 10
    points = sorted(zip(AFPR, ATPR))   # sort by FPR so the curve is drawn left to right
    plt.plot([p[0] for p in points], [p[1] for p in points], 'o-')
    plt.plot([0, 1], [0, 1], '--')     # diagonal of a random classifier
    plt.xlabel('FPR')
    plt.ylabel('TPR')
    plt.show()

#runReportExperiments()
