In [1]:
import pyspark
import sys

from pyspark.sql import SparkSession
from pyspark.context import SparkContext
import random

In [2]:
# Paths of the two input files: the training vectors and the topics (qrels) file.
# NOTE: `load_data` captures these values as default arguments when it is defined,
# and the name `data_train` is rebound further down to the loaded data. After a
# partial re-run, execute this cell again before re-defining `load_data`.
data_train = "lyrl2004_vectors_train.dat"
topic_files = "rcv1-v2.topics.qrels"

In [3]:
# Reuse the already-running SparkContext when there is one, so that re-running
# this cell in the same kernel does not fail with "Cannot run multiple
# SparkContexts at once".
sc = SparkContext.getOrCreate()

## Loading the data

In [4]:
def generate_dictionary(datapoint):
    '''
    Parses and generates a dictionary from one sparse datapoint.
    From Hogwild python implementation

    datapoint: iterable of "index:value" strings
    Returns: dict mapping the int feature index to its float value; key 0 is
             always present and set to 1.0 (the bias term).
    Raises: IndexError if a non-empty token has no ':' separator,
            ValueError if the index or the value is not numeric.
    '''
    d = {0: 1.0} # Adding the bias
    for elem in datapoint:
        # Splitting a line on single spaces leaves empty tokens behind for
        # doubled or trailing spaces; they carry no feature, so skip them
        # instead of crashing on the missing ':' part.
        if not elem.strip():
            continue
        parts = elem.split(':')
        d[int(parts[0])] = float(parts[1])
    return d

In [5]:
def load_data(sc,data_ = data_train, topics_path=topic_files, selected_cat='CCAT'):
    '''
    Load the datapoints with Spark and build binary labels for one category.

    sc : spark context
    data_ : path of the vectors file; each line starts with a document id
            followed by "index:value" tokens
    topics_path : path of the topics file, read by get_category_dict
    selected_cat : category treated as the positive class

    Returns (data, labels):
        data   : RDD of feature dictionaries (see generate_dictionary)
        labels : local list with 1 when the document belongs to selected_cat
                 and -1 otherwise, in the same order as the lines of the file
    Raises KeyError if a document id is missing from the topics file.
    '''
    # str.strip("") removes nothing; strip() really drops the surrounding
    # whitespace so a trailing blank cannot turn into a bogus empty token.
    # Blank lines are dropped because they have no document id to parse.
    rdd = (sc.textFile(data_)
             .map(lambda line: line.strip())
             .filter(lambda line: line)
             .map(lambda line: line.split(' ')))
    doc_ids = rdd.map(lambda line: int(line[0]))
    # line[2:] skips the document id and the token after it -- presumably the
    # empty token produced by a double space after the id; confirm against the
    # data file.
    data = rdd.map(lambda line: generate_dictionary(line[2:]))

    cat = get_category_dict(topics_path)
    labels = [1 if selected_cat in cat[doc_id] else -1
              for doc_id in doc_ids.toLocalIterator()]

    return data, labels

In [6]:
def get_category_dict(topics_path):
    ''' Builds a mapping from document id to the list of its categories, read
    from the topics file described at:
    http://www.ai.mit.edu/projects/jmlr/papers/volume5/lewis04a/lyrl2004_rcv1v2_README.htm
    From Hogwild python implementation

    Each line is split on single spaces: the first field is the category and
    the second field is the integer document id.
    '''
    categories = {}
    with open(topics_path) as f:
        rows = [raw.strip().split(' ') for raw in f.readlines()]
    for fields in rows:
        doc_id = int(fields[1])
        # Categories are kept in file order for each document.
        categories.setdefault(doc_id, []).append(fields[0])
    return categories

In [7]:
# Load with the default paths and category. From here on `data_train` is an RDD
# of feature dictionaries (no longer the file path) and `labels_train` is a
# local list of +1/-1 labels.
data_train, labels_train = load_data(sc)

In [8]:
# Bring every feature dictionary to the driver; `data_train` becomes a plain list.
# Not idempotent: running this cell twice fails because a list has no .collect().
data_train = data_train.collect()

In [9]:
# Largest feature index over all datapoints. Every dictionary contains key 0
# (the bias), so each per-datapoint maximum is >= 0; default=0 keeps the
# previous result for an empty dataset. Each maximum is computed only once.
m = max((max(d, key=int) for d in data_train), default=0)
print(m)

47236


In [10]:
from pyspark.mllib.linalg import SparseVector

In [11]:
# Turn each feature dictionary into a SparseVector of size m+1 (indices 0..m).
# Rebinds `data_train` again, so this cell cannot be run twice in a row.
data_train = [SparseVector(m+1,d) for d in data_train]

## SGD

In [62]:
# Initial weights: one zero per feature index 0..m.
w = [0] * (m+1)
# Number of training datapoints.
num_examples = len(data_train)
# Regularization strength, read as a global by train().
lambda_ = 0.001
# NOTE(review): n_workers is not referenced by any other visible cell.
n_workers = 5

In [63]:
# Number of passes of the outer training loop.
n_iterations = 100

In [64]:
def hinge_loss(y,x,w):
    '''
    Hinge loss of a single datapoint: 1 - y * <x, w>, floored at 0.
    x: sparse vector (anything exposing .dot(w))
    y: label
    w: weights vector
    Returns 0 when the raw value is negative, the raw value when it compares
    <= infinity, and infinity otherwise (the case where every comparison
    fails, e.g. NaN).
    '''
    margin = y * x.dot(w)
    loss = 1 - margin
    if loss < 0:
        return 0
    infinity = float('Inf')
    return loss if loss <= infinity else infinity

In [65]:
def calculate_primal_objective(y,x,w,lambda_):
    """
    Compute the full cost (the primal objective), that is the total hinge
    loss over all datapoints plus the L2 regularizer.

    y: sequence of labels
    x: sequence of sparse vectors, aligned with y
    w: weights vector (sequence of numbers)
    lambda_: regularization strength
    Returns: sum_n hinge_loss(y_n, x_n, w) + lambda_ / 2 * ||w||^2
    """
    # hinge_loss works on one datapoint at a time, so accumulate it per pair.
    total_loss = sum(hinge_loss(y_n, x_n, w) for y_n, x_n in zip(y, x))
    # w is a plain list in this notebook, so square it element by element.
    squared_norm = sum(w_i ** 2 for w_i in w)
    return total_loss + lambda_ / 2 * squared_norm

def accuracy(y1, y2):
    """
    Fraction of positions at which y1 and y2 hold equal values.

    y1, y2: equally long sequences of labels (lists or 1-D arrays)
    Raises ValueError when the lengths differ and ZeroDivisionError when the
    sequences are empty.
    """
    if len(y1) != len(y2):
        raise ValueError("accuracy() needs sequences of equal length")
    # Compare element by element: `list == list` is a single bool, which
    # cannot be summed.
    return sum(a == b for a, b in zip(y1, y2))/len(y1)

def prediction(x, w):
    """
    Predict a label from the sign of <x, w>: 1 when the score is strictly
    positive, -1 otherwise.
    """
    return (x.dot(w) > 0) * 2 - 1

def calculate_accuracy(y, X, w):
    """
    compute the training accuracy on the training set (can be called for test set as well).

    y: sequence of true labels
    X: sequence of sparse vectors, aligned with y
    w: weights vector
    """
    # X is a collection of vectors, so predict one datapoint at a time.
    predicted_y = [prediction(x_n, w) for x_n in X]
    return accuracy(predicted_y, y)

In [66]:
def calculate_stochastic_gradient(x_n,y_n, w, lambda_, num_examples):
    """Stochastic gradient of the hinge loss plus regularizer for one datapoint.
    x_n: sparse vector of the datapoint (iterable, with .dot and len)
    y_n: label of the datapoint
    w: shape = (num_features)
    lambda_: regularization strength
    num_examples: N, the factor applied to the loss part of the gradient
    Returns a list with one entry per (loss gradient, weight) pair.
    """
    # The loss part only contributes while the margin y_n * <x_n, w> is below 1.
    inside_margin = y_n * x_n.dot(w) < 1
    if inside_margin:
        loss_grad = [-1*feature*y_n for feature in x_n]
    else:
        loss_grad = [0] * len(x_n)

    result = []
    for g, w_n in zip(loss_grad, w):
        result.append(num_examples * g + w_n*lambda_)
    return result

In [67]:
def train(iterator,w):
    """
    Sum the stochastic gradients of every datapoint in one partition.

    iterator: iterable of (sparse_vector, label) pairs, e.g. one Spark partition
    w: current weights vector
    Yields a single gradient list for the partition, or nothing at all when
    the partition is empty. Reads the global `lambda_`.
    """
    total = None
    for x in iterator:
        grad = calculate_stochastic_gradient(x[0],x[1],w,lambda_,1)
        # Accumulate instead of overwriting, so partitions holding more than
        # one datapoint contribute all of their gradients.
        total = grad if total is None else [a + b for a, b in zip(total, grad)]
    # An empty partition has nothing to contribute (and no gradient to yield).
    if total is not None:
        yield total

In [68]:
def batch_iter(x,y,batch_size = 20):
    """
    Draw `batch_size` distinct random positions and return the matching
    entries of x and y as two lists, in the order the positions were drawn.
    """
    picked = random.sample(range(len(x)),batch_size)
    x_batch = [x[k] for k in picked]
    y_batch = [y[k] for k in picked]
    return x_batch, y_batch

In [70]:
# Mini-batch SGD: every outer iteration performs `batch_size` update steps,
# each one on a freshly sampled mini-batch of `mini_batch_size` datapoints.
batch_size = 200
mini_batch_size = 10
for i in range(n_iterations):
    print(i)
    for j in range(batch_size):
        x_batch, y_batch = batch_iter(data_train,labels_train,batch_size= mini_batch_size)
        # One datapoint per partition; train() yields one gradient per
        # partition and the reduce adds them up component-wise.
        # (The former coalesce(20) could not raise the partition count above
        # mini_batch_size without a shuffle, so it was a no-op.)
        sgd = sc.parallelize(list(zip(x_batch,y_batch)),numSlices= mini_batch_size) \
            .mapPartitions(lambda it: train(it,w)) \
            .reduce(lambda x,y: [a+b for a,b in zip(x,y)])
        # Average gradient of the mini-batch.
        sgd = [elem/mini_batch_size for elem in sgd]
        # The reduce and the update now run for every sampled mini-batch;
        # previously they sat outside this loop, so only the last of the
        # `batch_size` sampled mini-batches was ever used.
        # Descent step: move AGAINST the gradient (step size 1/batch_size);
        # adding it would increase the objective.
        w = [x - y/batch_size for x,y in zip(w,sgd)]

0
1
2
3
4
5
6
7
8
9


KeyboardInterrupt: 

In [59]:
# Largest component of the weight vector w.
# NOTE(review): this cell's execution count (59) is lower than that of the
# cells above it, so the displayed value comes from an earlier run.
max(w)

0.010000700029500662