# INFO3406 - Introduction to Data Analytics
## Assignment 2 - MapReduce

#### **Instructions**

#### The Kaggle Display Advertising Challenge dataset is used in this assignment. It contains 39 features of online ads, together with whether each ad was clicked, collected over a period of 7 days. The semantics of these features are undisclosed. The overall objective is to predict whether an ad will be clicked, given a query set of feature values.

#### Only 100,000 ads are used in this assignment. Download the dataset by following the instructions in part 1). The first column of *"dac_sample.txt"* indicates whether an ad was clicked (=1) or not (=0), while the remaining 39 columns contain feature values. For this assignment, you can treat all features as categorical. The values of some of these features have been hashed for anonymization purposes. Note that some features have missing values.
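
A minimal sketch of how one line of *dac_sample.txt* can be split into its label and 39 (featureID, value) pairs, assuming tab- or comma-separated fields and treating every feature, including the numeric ones, as a categorical string. The helper name `parse_line` is hypothetical; an empty string marks a missing value and is simply kept as its own category here.

```python
def parse_line(line):
    """Split one ad record into (label, [(featureID, value), ...])."""
    # Accept either tab- or comma-separated input, as the notebook does
    fields = line.rstrip('\n').replace('\t', ',').split(',')
    label = int(fields[0])  # 1 = clicked, 0 = not clicked
    # Feature IDs 0..38 follow the label; '' marks a missing value
    features = [(i, value) for i, value in enumerate(fields[1:])]
    return label, features


# Toy record: label, three values (one missing), then 36 placeholder values
example = '0\t1\t\t5' + '\tx' * 36
label, feats = parse_line(example)
print(label, len(feats))
print(feats[:3])
```

Keeping the feature ID in each pair matters: the same string (for example an empty value) in two different columns must become two different categories.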



You can view the Spark stages at
http://localhost:4040/stages/ 


Some other useful resources/references:
+ [Map-Reduce for Machine Learning on Multicore](http://papers.nips.cc/paper/3150-map-reduce-for-machine-learning-on-multicore.pdf)
+ [Spark RDD](http://www.cs.berkeley.edu/~matei/papers/2010/hotcloud_spark.pdf)
+ [Display Advertising Challenge](https://www.kaggle.com/c/criteo-display-ad-challenge)
+ [MLlib](https://spark.apache.org/docs/1.1.0/mllib-guide.html)
+ [MLlib: Scalable Machine Learning on Spark](http://stanford.edu/~rezab/sparkworkshop/slides/xiangrui.pdf)
+ [Scalable Machine Learning](https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x)

### **Preparing the dataset**

#### Upload the *"assignment2LoadData.py"* file to the [*jupyter* home](http://localhost:8001/tree) folder. Go to [http://labs.criteo.com/downloads/2014-kaggle-display-advertising-challenge-dataset/](http://labs.criteo.com/downloads/2014-kaggle-display-advertising-challenge-dataset/) to download the dataset. After you accept the agreement, you can obtain the dataset by clicking the ***"Download Sample"*** button. The file is 8.4 MB compressed (.tar.gz). Upload the compressed data file to the [*jupyter* home](http://localhost:8001/tree) folder as well. The script in *"assignment2LoadData.py"* then automatically extracts the file on the virtual machine (VM).
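
The contents of *assignment2LoadData.py* are not shown in this notebook. As a rough sketch of what such an extraction step can look like, assuming a standard .tar.gz archive whose top-level member is named *dac_sample.txt*, Python's `tarfile` module can copy that one member out and skip the work when the file already exists. The function name `extract_sample` and its arguments are hypothetical.

```python
import os
import shutil
import tarfile


def extract_sample(archive_path, out_dir, member='dac_sample.txt'):
    """Extract `member` from a .tar.gz archive into out_dir unless it already exists.

    Returns (path_to_file, extracted_now).
    """
    target = os.path.join(out_dir, member)
    if os.path.isfile(target):
        return target, False  # file is already available, nothing to do
    if not os.path.isdir(out_dir):
        os.makedirs(out_dir)
    with tarfile.open(archive_path, 'r:gz') as archive:
        # extractfile + copy avoids writing any other (possibly unsafe) paths
        src = archive.extractfile(member)
        with open(target, 'wb') as dst:
            shutil.copyfileobj(src, dst)
    return target, True
```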

In [13]:
import assignment2LoadData as ld
import os
import numpy as np
import os.path
import pyspark
from pyspark.mllib.linalg import SparseVector
from random import randint

url = os.getcwd()
ld.extractData(url)

fileName is data/Assignment2/dac_sample.txt
File is already available. Nothing to do.


In [14]:
# Load the raw data (assumes the notebook's SparkContext is available as `sc`)
baseDir = os.path.join('data')
inputPath = os.path.join('Assignment2', 'dac_sample.txt')
fileName = os.path.join(baseDir, inputPath)

partitions = 1
if os.path.isfile(fileName):
    rawData = (sc
               .textFile(fileName, partitions)
               .map(lambda x: x.replace('\t', ',')))  # work with either ',' or '\t' separated data
    print 'An example of rawData entry: ', rawData.take(1)
    nData = rawData.count()
    print '\nrawData count=', nData

An example of rawData entry:  [u'0,1,1,5,0,1382,4,15,2,181,1,2,,2,68fd1e64,80e26c9b,fb936136,7b4723c4,25c83c98,7e0ccccf,de7995b8,1f89b562,a73ee510,a8cd5504,b2cb9c98,37c9c164,2824a5f6,1adce6ef,8ba8b39a,891b62e7,e5ba7672,f54016b9,21ddcdc9,b1252a9d,07b5194c,,3a171ecb,c5c50484,e8b83407,9727dd16']

rawData count= 100000


In [3]:
weights = [.9, .1]
seed = 17
# Use randomSplit with weights and seed
rawTrainData, rawTestData = rawData.randomSplit(weights, seed)
# Cache the data
rawTrainData.cache()
rawTestData.cache()

nTrain = rawTrainData.count()
nTest = rawTestData.count()
print 'rawTrainData count=', nTrain
print 'rawTestData count=', nTest

rawTrainData count= 90027
rawTestData count= 9973
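
`randomSplit` assigns each record to a split independently at random, which is why the 90/10 weights give roughly, not exactly, 90,000 and 10,000 records (90,027 and 9,973 above). A minimal pure-Python sketch of the idea, assuming one seeded uniform draw per record compared against the cumulative normalized weights; this illustrates the technique and is not Spark's actual implementation, which samples partition by partition. The name `random_split` is hypothetical.

```python
import random


def random_split(records, weights, seed):
    """Assign each record to one split by comparing a uniform draw to cumulative weights."""
    total = float(sum(weights))
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for rec in records:
        u = rng.random()
        # First split whose cumulative bound exceeds u (last split catches rounding error)
        for i, b in enumerate(bounds):
            if u < b or i == len(bounds) - 1:
                splits[i].append(rec)
                break
    return splits


train, test = random_split(range(100000), [.9, .1], seed=17)
print(len(train), len(test))
```

Fixing the seed makes the split reproducible across runs, which is why the notebook passes `seed = 17`.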


## **Solutions**



In [None]:
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

# Create a list of (featureID, value) tuples for each ad (the label in column 0 is skipped)
def parsePoint(point):
    items = point.split(',')
    return [(i, item) for i, item in enumerate(items[1:])]

parsedTrainFeat = rawTrainData.map(parsePoint)

# Count the distinct values (categories) of each feature ID: [(featureID, count), ...]
numCategories = (parsedTrainFeat
                 .flatMap(lambda x: x)
                 .distinct().map(lambda x: (x[0], 1))
                 .reduceByKey(lambda x,y: x+y)
                 .sortByKey()
                 .collect())

# Create a One-hot dictionary from parsedTrainFeat
def createOneHotDict(inputData):
    distinctFeats = inputData.flatMap(lambda x: x).distinct()
    return distinctFeats.zipWithIndex().collectAsMap()

ctrOHEDict = createOneHotDict(parsedTrainFeat)

# Total number of OHE features (one per distinct (featureID, value) pair)
numCtrOHEFeats = len(ctrOHEDict.keys())

# Create a SparseVector from a list of features and the OHE dictionary
def oneHotEncoding(rawFeats, OHEDict, numOHEFeats):
    # SparseVector expects its indices in increasing order
    indices = sorted(OHEDict[f] for f in rawFeats)
    values = [1.0] * len(indices)
    return SparseVector(numOHEFeats, indices, values)

# Obtain the label and feature vector for this raw observation

def parseOHEPoint(point, OHEDict, numOHEFeats):
    parsedPoints = parsePoint(point)
    items = point.split(',')
    label = items[0]
    features = oneHotEncoding(parsedPoints, OHEDict, numOHEFeats)
    return LabeledPoint(label, features)

# Perform OHE on the raw training data
OHETrainData = rawTrainData.map(lambda point: parseOHEPoint(point, ctrOHEDict, numCtrOHEFeats))

# Cache the training data
OHETrainData.cache()

# Check the first encoded training point
print(OHETrainData.take(1))
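
The cell above builds the one-hot-encoding (OHE) dictionary with Spark's `flatMap`, `distinct` and `zipWithIndex`, and counts categories per feature with `reduceByKey`. The same logic on a small in-memory list, as a hedged pure-Python sketch: every distinct (featureID, value) pair gets its own column, and an ad becomes the sorted list of its active column indices (all values are 1). The helper names are hypothetical, and the column numbering differs from `zipWithIndex`, which numbers items in partition order rather than sorted order.

```python
from collections import Counter


def create_ohe_dict(parsed_points):
    """Map every distinct (featureID, value) pair to a unique column index."""
    distinct = sorted(set(pair for point in parsed_points for pair in point))
    return {pair: idx for idx, pair in enumerate(distinct)}


def one_hot_indices(raw_feats, ohe_dict):
    """Return the sorted column indices that are 1 for this ad."""
    return sorted(ohe_dict[pair] for pair in raw_feats)


def categories_per_feature(parsed_points):
    """Count distinct values per feature ID (the numCategories step)."""
    distinct = set(pair for point in parsed_points for pair in point)
    return sorted(Counter(feat_id for feat_id, _ in distinct).items())


points = [[(0, 'a'), (1, 'x')], [(0, 'b'), (1, 'x')], [(0, 'a'), (1, '')]]
ohe = create_ohe_dict(points)
print(ohe)
print(one_hot_indices(points[1], ohe))
print(categories_per_feature(points))
```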

In [None]:
# Buckets are defined by k random centroids: binary vectors with 39 active indices each
def centroid(indexList):
    valueList = np.ones(len(indexList))
    return SparseVector(numCtrOHEFeats, np.sort(indexList), valueList)

# Wrap a centroid in a LabeledPoint; label 2 is a placeholder, since real labels are 0 or 1
def parseCentroidPoint(indexList):
    label = 2
    features = centroid(indexList)
    return LabeledPoint(label, features)

# Choose random centroids: 39 distinct indices in [0, numCtrOHEFeats) per centroid
def createRandomCentroids(num_centroids):
    indexLists = [np.random.choice(numCtrOHEFeats, 39, replace=False)
                  for _ in range(num_centroids)]
    return [parseCentroidPoint(indexList) for indexList in indexLists]

k = 20

n_train_data = nTrain

centroids = createRandomCentroids(k)

# Collect all encoded training points to the driver for the bucketing step
train_data = OHETrainData.take(n_train_data)
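
The cell above turns each of the k = 20 centroids into a random binary vector with 39 active indices, one per raw feature. A hedged pure-Python sketch of that sampling step, drawing the indices without replacement so that no centroid repeats an index and every index stays inside `[0, num_feats)`. The name `random_centroids` is hypothetical, and `num_feats` stands in for `numCtrOHEFeats` (254,310 in the vector size printed for the first test point later in this notebook).

```python
import random


def random_centroids(num_centroids, num_feats, active=39, seed=None):
    """Return num_centroids sorted index lists, each with `active` distinct indices."""
    rng = random.Random(seed)
    # sample() draws without replacement, so no index appears twice in a centroid
    return [sorted(rng.sample(range(num_feats), active)) for _ in range(num_centroids)]


centroids = random_centroids(20, 254310, seed=0)
print(len(centroids), centroids[0][:5])
```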



In [None]:
# Hamming distance between two binary vectors, given their sorted index arrays:
# |A| + |B| - 2 * |A & B|, with the overlap |A & B| counted by a two-pointer merge
def hammingDistance(list_one, list_two):
    index_1, index_2 = 0, 0
    overlap = 0
    while index_1 < len(list_one) and index_2 < len(list_two):
        if list_one[index_1] == list_two[index_2]:
            overlap += 1
            index_1 += 1
            index_2 += 1
        elif list_one[index_1] > list_two[index_2]:
            index_2 += 1
        else:
            index_1 += 1
    return len(list_one) + len(list_two) - 2 * overlap

# Cosine similarity between two binary vectors, given their index arrays:
# |A & B| / sqrt(|A| * |B|), where |A & B| is the number of shared indices
def cosineLength(list_one, list_two):
    if len(list_one) == 0 or len(list_two) == 0:
        return 0.0
    overlap = np.intersect1d(list_one, list_two).size
    return overlap / np.sqrt(float(len(list_one) * len(list_two)))
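
Because every one-hot vector here is binary, the usual metrics reduce to set arithmetic on the active indices. Writing A and B for the index sets of two ads: the dot product is |A & B| (the number of shared indices), cosine similarity is |A & B| / sqrt(|A| |B|), Hamming distance is |A| + |B| - 2|A & B|, and Euclidean distance is the square root of the Hamming distance. A minimal sketch over plain index lists; the function names are hypothetical.

```python
import math


def overlap(a, b):
    """Number of indices active in both binary vectors (their dot product)."""
    return len(set(a) & set(b))


def cosine_similarity(a, b):
    """|A & B| / sqrt(|A| |B|); returns 0.0 if either vector is empty."""
    if len(a) == 0 or len(b) == 0:
        return 0.0
    return overlap(a, b) / math.sqrt(len(a) * len(b))


def hamming_distance(a, b):
    """Number of positions where the two binary vectors differ."""
    return len(a) + len(b) - 2 * overlap(a, b)


def euclidean_distance(a, b):
    """For 0/1 vectors the squared Euclidean distance equals the Hamming distance."""
    return math.sqrt(hamming_distance(a, b))


a, b = [1, 4, 7, 9], [1, 7, 8, 9]
print(overlap(a, b), cosine_similarity(a, b), hamming_distance(a, b))
```

For these two 4-index vectors the overlap is 3, so the cosine similarity is 3 / sqrt(16) = 0.75 and the Hamming distance is 4 + 4 - 6 = 2.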


In [7]:
# Cosine similarity between every training point and every centroid (n_train_data x k)
dist = np.zeros((n_train_data,k))
for index in range(0,n_train_data):
    dist[index,:] = map(lambda point: cosineLength(train_data[index].features.indices,point.features.indices), centroids)



In [8]:
# Assign each point to the centroid with the highest cosine similarity (a larger value means more similar)
max_dist = dist.argmax(axis=1)

# Set up the buckets for the training data: row c holds the indices of the
# training points assigned to centroid c, with -1 in every unused slot
bucket_train = np.zeros((k,n_train_data))
bucket_train.fill(-1)

for index, point in enumerate(max_dist):
    bucket_train[point,index] = index
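
The bucket matrix above reserves one row per centroid and one column per training point, with -1 marking empty slots. A hedged alternative sketch stores each bucket as a list of member indices instead, so later lookups never have to skip placeholders. `assign_buckets` is a hypothetical helper that takes the similarity matrix as a list of rows; ties go to the lowest-numbered centroid, as with `argmax`.

```python
def assign_buckets(similarity):
    """Group point indices by the centroid with the highest similarity score.

    `similarity` is a list of rows, one per point, with one score per centroid.
    Returns (best_centroid_per_point, {centroid: [point indices]}).
    """
    k = len(similarity[0])
    buckets = {c: [] for c in range(k)}
    best = []
    for point_index, row in enumerate(similarity):
        # max() keeps the first maximum on ties, matching numpy's argmax
        c = max(range(k), key=lambda j: row[j])
        best.append(c)
        buckets[c].append(point_index)
    return best, buckets


sim = [[0.1, 0.5, 0.2],
       [0.3, 0.3, 0.0],
       [0.0, 0.1, 0.9]]
best, buckets = assign_buckets(sim)
print(best, buckets)
```

Memory then grows with the number of points rather than with k times the number of points.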




In [9]:
# One-hot encode a test point; (featureID, value) pairs never seen in training are skipped
def testOneHotEncoding(rawFeats, OHEDict, numOHEFeats):
    validFeatureTuples = [(OHEDict[f], 1) for f in rawFeats if f in OHEDict]
    # The (index, value) pair form of SparseVector sorts the pairs by index
    return SparseVector(numOHEFeats, validFeatureTuples)

# Parse a raw test line into a LabeledPoint (label + OHE features);
# named separately so the training parser parseOHEPoint is not overwritten
def parseTestOHEPoint(point, OHEDict, numOHEFeats):
    parsedPoints = parsePoint(point)
    items = point.split(',')
    label = items[0]
    features = testOneHotEncoding(parsedPoints, OHEDict, numOHEFeats)
    return LabeledPoint(label, features)

# Perform OHE on the raw test data
OHETestData = rawTestData.map(lambda point: parseTestOHEPoint(point, ctrOHEDict, numCtrOHEFeats))

# Cache the test data
OHETestData.cache()

# Collect all encoded test points to the driver
n_test_data = nTest
test_data = OHETestData.take(n_test_data)

# Check the first encoded test point
print(test_data[0])

(0.0,(254310,[9418,23256,26962,34497,41035,47120,51298,51384,53140,54271,68347,73558,74217,88528,89594,100231,103630,121085,130235,131673,140921,142403,149508,151452,155024,155309,165333,189323,192945,198438,204764,215007,224556,234176,241192,241991,248964,251902],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))
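
`testOneHotEncoding` drops any (featureID, value) pair that never appeared in the training data, since the OHE dictionary has no column for it; the first test point printed above has 38 active indices rather than 39, so one of its values was unseen. A small hedged sketch of the same rule that also reports how many pairs were dropped, which can help judge how much of a test ad is visible to the model. `encode_known` is a hypothetical name.

```python
def encode_known(raw_feats, ohe_dict):
    """Return (sorted known column indices, number of unseen pairs dropped)."""
    indices = []
    dropped = 0
    for pair in raw_feats:
        if pair in ohe_dict:
            indices.append(ohe_dict[pair])
        else:
            dropped += 1  # category never seen during training
    return sorted(indices), dropped


ohe = {(0, 'a'): 0, (0, 'b'): 1, (1, 'x'): 2}
print(encode_known([(0, 'b'), (1, 'y'), (1, 'x')], ohe))
```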


In [10]:
# Assign each test point to the centroid with the highest cosine similarity
dist = np.zeros((n_test_data,k))
for index in range(0,n_test_data):
    dist[index,:] = map(lambda point: cosineLength(test_data[index].features.indices,point.features.indices), centroids)
    
test_bucket = dist.argmax(axis=1)


In [None]:
# Euclidean distance between two binary vectors, given their index arrays:
# sqrt(|A| + |B| - 2 * |A & B|), i.e. the square root of the Hamming distance
# (an alternative metric; the KNN below uses cosine similarity)
def euclideanLength(list_one, list_two):
    overlap = np.intersect1d(list_one, list_two).size
    return np.sqrt(len(list_one) + len(list_two) - 2 * overlap)

# Cosine similarity between a test point and each training point in its bucket
def calculateDist(currentBucket, train_data, test_point):
    # Skip the -1 placeholders; the remaining entries are training-set indices
    members = [int(i) for i in currentBucket if i != -1]
    sims = [cosineLength(test_point.features.indices, train_data[i].features.indices)
            for i in members]
    return members, np.array(sims)


# Predict each test label by a majority vote of its numNeighbors most similar
# training points from the same bucket
def calcKnn(train_bucket, test_bucket, train_data, test_data, numNeighbors=5):
    predict = np.zeros(test_bucket.size)
    for index, bucket_id in enumerate(test_bucket):
        members, sims = calculateDist(train_bucket[bucket_id], train_data, test_data[index])
        # argsort is ascending, so reverse it to put the most similar points first
        nearest = np.argsort(sims, kind='mergesort')[::-1][:numNeighbors]
        classes = np.zeros(2)
        for j in nearest:
            classes[int(train_data[members[j]].label)] += 1
        # Ties (and empty buckets) default to class 0 (not clicked)
        predict[index] = 0 if classes[0] >= classes[1] else 1
        print ("Prediction: ", int(predict[index]), "Actual: ", int(test_data[index].label))
    return predict
                
predict_label = calcKnn(bucket_train, test_bucket, train_data, test_data)

# Accuracy: percentage of test points whose predicted label matches the true label
count = 0

for i in range(0, predict_label.size):
    if int(test_data[i].label) == int(predict_label[i]):
        count += 1
print count
print predict_label.size
print (str(count*100.0/predict_label.size)+"%")

('Prediction: ', 0, 'Actual: ', 0)
('Prediction: ', 0, 'Actual: ', 1)
('Prediction: ', 0, 'Actual: ', 0)
('Prediction: ', 0, 'Actual: ', 0)
('Prediction: ', 0, 'Actual: ', 0)
('Prediction: ', 0, 'Actual: ', 1)
('Prediction: ', 0, 'Actual: ', 0)
('Prediction: ', 0, 'Actual: ', 0)
('Prediction: ', 0, 'Actual: ', 0)
('Prediction: ', 0, 'Actual: ', 0)
('Prediction: ', 0, 'Actual: ', 1)
('Prediction: ', 0, 'Actual: ', 0)
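
Each prediction above comes from a k-nearest-neighbour vote among training points in the test point's bucket. A hedged pure-Python sketch of such a vote, using cosine similarity on index sets and taking the most similar points first; ties go to label 0, matching the `>=` comparison in `calcKnn`. `knn_predict` and its argument layout are hypothetical.

```python
import math


def knn_predict(test_indices, bucket_members, train_points, num_neighbors=5):
    """Majority vote over the most similar training points in one bucket.

    train_points[i] is (label, active_indices); bucket_members lists indices into it.
    """
    test_set = set(test_indices)

    def cosine(indices):
        if not test_set or not indices:
            return 0.0
        return len(test_set & set(indices)) / math.sqrt(len(test_set) * len(indices))

    # Sort by descending similarity; sorted() is stable, so ties keep bucket order
    ranked = sorted(bucket_members, key=lambda i: -cosine(train_points[i][1]))
    votes = [0, 0]
    for i in ranked[:num_neighbors]:
        votes[int(train_points[i][0])] += 1
    return 0 if votes[0] >= votes[1] else 1


train = [(0, [1, 2, 3]), (1, [1, 2, 4]), (1, [1, 2, 5]), (0, [7, 8, 9])]
print(knn_predict([1, 2, 4], [0, 1, 2, 3], train, num_neighbors=3))
```

Because most ads in this dataset are not clicked, a vote that defaults to 0 on ties leans toward the majority class, which is consistent with the many `0` predictions in the output above.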

In [None]:
# from collections import defaultdict
# import hashlib

# def hashFunction(numBuckets, rawFeats, printMapping=False):
#     mapping = {}
#     for ind, category in rawFeats:
#         featureString = category + str(ind)
#         mapping[featureString] = int(int(hashlib.md5(featureString).hexdigest(), 16) % numBuckets)
#     if(printMapping): print mapping
#     sparseFeatures = defaultdict(float)
#     for bucket in mapping.values():
#         sparseFeatures[bucket] += 1.0
#     return dict(sparseFeatures)

# def parseHashPoint(point, numBuckets):
#     fields = point.split(',')
#     label = fields[0]
#     features = parsePoint(point)
#     return LabeledPoint(label, SparseVector(numBuckets, hashFunction(numBuckets, features)))

# numBucketsCTR = 2 ** 15
# hashTrainData = rawTrainData.map(lambda point: parseHashPoint(point, numBucketsCTR))
# hashTrainData.cache()
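
The commented-out cell above outlines feature hashing as an alternative to the OHE dictionary: each (featureID, value) pair is hashed into one of `numBuckets` columns, so no dictionary has to be built or shipped to the workers, at the cost of occasional collisions. A minimal runnable version of the same idea (MD5 of the value concatenated with the feature ID, modulo the bucket count), assuming Python 3 strings that must be encoded to bytes before hashing; `hash_features` is a hypothetical name.

```python
import hashlib
from collections import defaultdict


def hash_features(raw_feats, num_buckets):
    """Map (featureID, value) pairs to {bucket: count} using MD5."""
    counts = defaultdict(float)
    for feat_id, value in raw_feats:
        key = (value + str(feat_id)).encode('utf-8')
        bucket = int(hashlib.md5(key).hexdigest(), 16) % num_buckets
        counts[bucket] += 1.0  # colliding pairs add up in the same bucket
    return dict(counts)


feats = [(0, '68fd1e64'), (1, '80e26c9b'), (2, '')]
hashed = hash_features(feats, 2 ** 15)
print(hashed)
```

The resulting dictionary can be passed to `SparseVector(numBuckets, ...)`, which accepts a dict of index to value.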

