In [None]:
import pyspark
import numpy as np
from pyspark.mllib.linalg.distributed import CoordinateMatrix
from pyspark.mllib.linalg.distributed import MatrixEntry
from pyspark.sql.types import *

In [None]:
# Explicit schema restricted to the four fields the analysis uses; every
# field is declared nullable.
minimal_fields = [
    StructField("author", StringType(), True),
    StructField("score", LongType(), True),
    StructField("controversiality", LongType(), True),
    StructField("subreddit", StringType(), True),
]
sj = sqlContext.read.json("s3a://insight-ohoidn/sample3.json", StructType(minimal_fields))

# The aggregation query later in this notebook selects `from test`, so the
# loaded comments must be registered under that name for the notebook to run
# on a fresh kernel.
sj.registerTempTable('test')

In [None]:
# Aggregate the comments per (subreddit, author) pair: `tally` is the summed
# score and `activity` the summed absolute score. `uid` is a dense rank over
# authors and `rid` a dense rank over subreddits (both in descending name
# order); pairs whose tally is exactly zero are dropped by the outer query.
# NOTE(review): this query requires a temp table named `test` to be registered
# beforehand (presumably from the DataFrame `sj`) - confirm on a fresh kernel.
# NOTE(review): `StorageLevel` is not imported in the import cell; presumably it
# is injected by the PySpark shell/kernel start-up - confirm.
occurrences = sqlContext.sql("""
select *, dense_rank() over (order by subreddit desc) as rid 
from (SELECT subreddit, author, sum(score) as tally,\
        sum(abs(score)) as activity, dense_rank() over (order by author desc) as uid
    from test
    group by subreddit, author)
where tally!=0
""").persist(StorageLevel.MEMORY_AND_DISK_SER)
occurrences.registerTempTable('occurrences')
 
# NOTE(review): `bare_occurrences` is not defined in any visible cell. It is
# presumably a projection of `occurrences` down to (row index, column index,
# value) triples, since each row is turned into a tuple and handed to
# CoordinateMatrix - TODO confirm which of rid/uid is the row index.
coordinate_matrix = CoordinateMatrix(bare_occurrences.rdd.map(tuple))

In [None]:
from operator import add

# TODO: check that zero entries are correctly filtered
def coordinateMatrixMultiply(leftmat, rightmat):
    """
    Sparse matrix product leftmat * rightmat of two CoordinateMatrix operands.

    Entries are paired on the shared inner index (column of leftmat, row of
    rightmat), every pair is multiplied, and the partial products that land
    on the same (row, column) cell are summed.
    """
    def key_left_by_column(e):
        return (e.j, (e.i, e.value))

    def key_right_by_row(e):
        return (e.i, (e.j, e.value))

    def partial_product(joined):
        left_part, right_part = joined[1]
        return ((left_part[0], right_part[0]), left_part[1] * right_part[1])

    def to_entry(cell):
        return MatrixEntry(cell[0][0], cell[0][1], cell[1])

    by_inner_left = leftmat.entries.map(key_left_by_column)
    by_inner_right = rightmat.entries.map(key_right_by_row)
    partials = by_inner_left.join(by_inner_right).map(partial_product)
    summed = partials.reduceByKey(add)

    return CoordinateMatrix(summed.map(to_entry))

def coordinateMatrixAdd(leftmat, rightmat, scalar):
    """
    Return leftmat + scalar * rightmat

    leftmat, rightmat : CoordinateMatrix
    scalar : number applied to every entry of rightmat before adding

    The two entry sets are combined with a full outer join on (i, j), so a
    cell present in only one operand is carried over unchanged (after scaling
    for rightmat).
    """
    def combine(tup):
        # tup is ((i, j), (left_value, right_value)); the outer join fills the
        # side that has no entry for this cell with None.
        left_value, right_value = tup[1]
        if left_value is None:
            total = right_value
        elif right_value is None:
            total = left_value
        else:
            total = left_value + right_value
        return MatrixEntry(tup[0][0], tup[0][1], total)

    m = leftmat.entries.map(lambda entry: ((entry.i, entry.j), entry.value))
    n = rightmat.entries.map(lambda entry: ((entry.i, entry.j), scalar * entry.value))
    # Explicit None handling replaces the bare `reduce(add, filter(...))`,
    # which raises NameError on Python 3 where `reduce` is not a builtin.
    matsum = m.fullOuterJoin(n).map(combine)

    return pyspark.mllib.linalg.distributed.CoordinateMatrix(matsum)

def coordinate_matrix_elementwise_vector_division(mat, vec):
    """
    Divide every entry of a matrix by the vector entry sharing its row index.

    mat : CoordinateMatrix
    vec : CoordinateMatrix whose entries are matched to mat on row index i

    mat_{ij} -> mat_{ij}/vec_{i}
    """
    def key_by_row(e):
        return (e.i, (e.j, e.value))

    def divide(joined):
        row = joined[0]
        numerator, denominator = joined[1]
        # The numerator is coerced to float before dividing.
        return MatrixEntry(row, numerator[0], float(numerator[1]) / denominator[1])

    mat_by_row = mat.entries.map(key_by_row)
    vec_by_row = vec.entries.map(key_by_row)
    quotient_entries = mat_by_row.join(vec_by_row).map(divide)

    return CoordinateMatrix(quotient_entries)

def coordinate_matrix_elementwise_matrix_multiplication(mat1, mat2):
    """
    Element-wise product of two CoordinateMatrix operands.

    mat1, mat2 : CoordinateMatrix

    return matprod, where matprod_{ij} = mat1_{ij} * mat2_{ij}; only cells
    that have an entry in both operands appear in the result (inner join).
    """
    def key_by_cell(e):
        return ((e.i, e.j), e.value)

    def multiply(joined):
        cell = joined[0]
        first_value, second_value = joined[1]
        return MatrixEntry(cell[0], cell[1], first_value * second_value)

    cells_first = mat1.entries.map(key_by_cell)
    cells_second = mat2.entries.map(key_by_cell)

    return CoordinateMatrix(cells_first.join(cells_second).map(multiply))

def coordinate_matrix_sumj(mat):
    """
    Sum the entries of each row over the column index j.

    mat : CoordinateMatrix

    Returns a CoordinateMatrix holding one entry (i, 0, row_sum) for every
    row index i that has at least one entry in mat.
    """
    values_by_row = mat.entries.map(lambda e: (e.i, e.value))
    row_totals = values_by_row.reduceByKey(add)
    column_entries = row_totals.map(lambda pair: MatrixEntry(pair[0], 0, pair[1]))

    return CoordinateMatrix(column_entries)

def coordinate_matrix_row(mat, i):
    """
    Extract one row of a matrix as a row vector.

    mat : CoordinateMatrix
    i : index of the row to extract

    The selected entries are re-indexed so that they sit in row 0 of the
    returned CoordinateMatrix; column indices and values are kept.
    """
    def in_requested_row(e):
        return e.i == i

    def move_to_row_zero(e):
        return MatrixEntry(0, e.j, e.value)

    row_entries = mat.entries.filter(in_requested_row).map(move_to_row_zero)
    return CoordinateMatrix(row_entries)

def coordinate_vector_matrix_norm(vec):
    """
    Square root of the sum of squared entry values of a CoordinateMatrix.

    No check is made that `vec` is actually one-dimensional; every stored
    entry contributes.
    TODO: type checking, confusing name?
    """
    squared_values = vec.entries.map(lambda item: item.value ** 2)
    sum_of_squares = squared_values.sum()
    return np.sqrt(sum_of_squares)

def coordinate_matrix_vector_l2(vec1, vec2):
    """
    Given two vectors of the data type CoordinateMatrix, return L2 norm of vec1/|vec1| - vec2/|vec2|

    vec1, vec2 : CoordinateMatrix

    Each vector is scaled by the reciprocal of its own norm, the scaled
    vectors are subtracted, and the norm of the difference is returned.
    """
    norm1 = coordinate_vector_matrix_norm(vec1)
    norm2 = coordinate_vector_matrix_norm(vec2)

    # Use the scalar-multiplication helper that this notebook defines; the
    # previously referenced `coordinateMatrixScalarMult` is not defined in any
    # visible cell.
    vec1normed = coordinateMatrixElementwiseMultiplication(vec1, 1. / norm1)
    vec2normed = coordinateMatrixElementwiseMultiplication(vec2, 1. / norm2)

    diff = coordinateMatrixAdd(vec1normed, vec2normed, -1.)
    return coordinate_vector_matrix_norm(diff)

def sort_row_indices_by_distance(mat, vec):
    """
    Return a list of the row indices of mat sorted by the ascending L2 distance between normalized row vectors and vec/|vec|

    mat : CoordinateMatrix whose rows are compared against vec
    vec : CoordinateMatrix; the (j, value) pairs of all of its entries are
          gathered into a single comparison vector

    Rows are ordered by the squared distance
    |a/|a| - b/|b||^2 = 2 - 2 * (a . b) / (|a| |b|),
    which yields the same ordering as the distance itself. Rows whose norm is
    zero (or any row when vec has zero norm) cannot be normalized and are
    placed last.
    """
    # Imported locally because the notebook's import cell does not bring
    # `Vectors` into scope.
    from pyspark.mllib.linalg import Vectors

    size = mat.numCols()
    row_vectors = mat.entries.map(lambda entry: (entry.i, [(entry.j, entry.value)]))\
        .reduceByKey(add).map(lambda tup: (tup[0], Vectors.sparse(size, tup[1])))
    # TODO replace all 1D CoordinateMatrix instances by local sparse vectors
    compare_vector = vec.entries.map(lambda entry: ('', [(entry.j, entry.value)])).reduceByKey(add)\
        .map(lambda tup: Vectors.sparse(size, tup[1])).collect()[0]
    compare_norm = compare_vector.norm(2)

    def distance_key(tup):
        row_index, row_vector = tup
        denominator = row_vector.norm(2) * compare_norm
        if denominator == 0:
            # A zero vector has no direction; sort it after every real row.
            return (float('inf'), row_index)
        cosine = row_vector.dot(compare_vector) / denominator
        return (float(2.0 - 2.0 * cosine), row_index)

    return row_vectors.map(distance_key).sortByKey()\
        .map(lambda tup: tup[1]).collect()
        
def coordinatematrix_get_row(mat, i):
    """
    Keep only the entries of `mat` whose row index equals `i`.

    mat : CoordinateMatrix
    i : row index to keep

    The kept entries retain their original (i, j) coordinates and are
    returned wrapped in a new CoordinateMatrix.
    """
    def belongs_to_row(e):
        return e.i == i

    selected = mat.entries.filter(belongs_to_row)
    return CoordinateMatrix(selected)

def coordinatematrix_to_sparse_vector(mat):
    """
    Convert a one-row CoordinateMatrix into a local sparse vector.

    mat : CoordinateMatrix

    Mat is assumed to have non-zero entries in only the 0th row index; the
    row index of each entry is ignored and only (column, value) pairs are
    used. All entries are collected to the driver, and the vector length is
    mat.numCols().
    """
    # Imported locally because the notebook's import cell does not bring
    # `Vectors` into scope.
    from pyspark.mllib.linalg import Vectors

    size = mat.numCols()
    pairs = mat.entries.map(lambda entry: (entry.j, entry.value)).collect()
    return Vectors.sparse(size, pairs)

def coordinatematrix_multiply_vector_elementwise(mat, vec):
    """
    Scale each matrix entry by the vector entry that shares its row index.

    mat : CoordinateMatrix
    vec : CoordinateMatrix

    Entries are matched on row index i with an inner join, so matrix rows
    without a matching vector entry are absent from the result.
    """
    def scale(joined):
        row = joined[0]
        factor, cell = joined[1]
        return MatrixEntry(row, cell[0], factor * cell[1])

    cells_by_row = mat.entries.map(lambda e: (e.i, (e.j, e.value)))
    factors_by_row = vec.entries.map(lambda e: (e.i, e.value))
    scaled_entries = factors_by_row.join(cells_by_row).map(scale)

    return CoordinateMatrix(scaled_entries)


def coordinateMatrixElementwise(mat, op):
    """
    Apply `op` to the value of every stored entry of `mat`.

    elt -> op(elt) for each nonzero element elt of the matrix mat; the
    coordinates of each entry are left unchanged.
    """
    def transform(e):
        return MatrixEntry(e.i, e.j, op(e.value))

    return CoordinateMatrix(mat.entries.map(transform))


def coordinateMatrixElementwiseMultiplication(mat, scalar):
    """
    Multiply every stored entry of `mat` by a scalar.

    return scalar * mat
    """
    def scale(e):
        return MatrixEntry(e.i, e.j, scalar * e.value)

    scaled_entries = mat.entries.map(scale)
    return CoordinateMatrix(scaled_entries)

def coordinate_matrix_to_ndarr(mat):
    """
    Materialize a CoordinateMatrix as a dense 2-D numpy array on the driver.

    mat : CoordinateMatrix

    Returns a float array of shape (mat.numRows(), mat.numCols()) with zeros
    wherever mat has no entry. Every entry is brought to the driver, so this
    is only suitable for matrices that fit in local memory.
    """
    # A single collect() replaces the earlier count() followed by take(count),
    # which scanned the entries twice to fetch the same list.
    elts = mat.entries.collect()
    arr = np.zeros((mat.numRows(), mat.numCols()))
    for elt in elts:
        arr[elt.i, elt.j] = elt.value
    return arr

def ndarr_to_coord_array(arr):
    """
    Build a CoordinateMatrix from the non-zero cells of a 2-D array.

    arr : indexable as arr[i][j]; the column count is taken from its first row

    Each cell with arr[i][j] != 0 becomes an (i, j, value) triple; the triples
    are distributed with sc.parallelize and wrapped in a CoordinateMatrix.
    """
    nonzero_cells = [
        (row, col, arr[row][col])
        for row in range(len(arr))
        for col in range(len(arr[0]))
        if arr[row][col] != 0
    ]
    return CoordinateMatrix(sc.parallelize(nonzero_cells))

