In [1]:
from collections import defaultdict
import pyspark.sql.types as stypes
import operator
import math

In [None]:
d = sc.textFile("gs://lbanor/dataproc_example/data/2017-11-01").zipW

In [None]:
r = (sc.textFile("gs://lbanor/dataproc_example/data/2017-11-01").zipWithIndex()
     .filter(lambda x: x[1] > 0)
     .map(lambda x: x[0].split(','))
     .map(lambda x: (x[0], (x[1], 0.5 if x[2] == '1' else 2 if x[2] == '2' else 6)))
     .groupByKey().mapValues(list)
     .flatMap(lambda x: aggregate_skus(x)))

In [None]:
print(r.collect()[:10])

In [None]:
print(r.collect()[:10])

In [None]:
d2 = spark.read.csv("gs://lbanor/dataproc_example/data/2017-11-01", header=True)

In [None]:
t = sc.parallelize([('1', 'sku0', 1), ('2', 'sku2', 2), ('1', 'sku1', 1)])

In [None]:
t.zipWithIndex().map(lambda x: (x[0][0], (x[0][1], x[0][2]))).groupByKey().mapValues(list).collect()[:10]

In [8]:
def aggregate_skus(row):
    """Aggregates skus from customers and their respective scores
    :type row: list
    :param row: list having values [user, (sku, score)]
    :rtype: list
    :returns: `yield` on [user, (sku, sum(score))]
    """
    d = defaultdict(float)
    for inner_row in row[1]:
        d[inner_row[0]] += inner_row[1]
    yield (row[0], list(d.items()))

In [None]:
r = d2.rdd.collect()[:10]

In [None]:
r[0].user

In [None]:
print(r.flatMap(lambda x: aggregate_skus(x)).collect()[:10])

In [None]:
r.toDF(schema=_load_users_matrix_schema()).write.json('gs://lbanor/dataproc_example/intermediary/2017-11-01')

In [3]:
def _load_users_matrix_schema():
    """Loads schema with data type [user, [(sku, score), (sku, score)]]
    :rtype: `pyspark.sql.type.StructType`
    :returns: schema speficiation for user -> (sku, score) data.
    """
    return stypes.StructType(fields=[
        stypes.StructField("user", stypes.StringType()),
         stypes.StructField('interactions', stypes.ArrayType(
          stypes.StructType(fields=[stypes.StructField('item', 
           stypes.StringType()), stypes.StructField('score', 
            stypes.FloatType())])))])

In [None]:
dir()

In [None]:
t = sc.parallelize([[0, [1, 2]], [0, [3]]])

In [None]:
print(t.collect())

In [None]:
t.write.json?

In [4]:
t = spark.read.json('gs://lbanor/dataproc_example/intermediary/2017-11-02', schema=_load_users_matrix_schema())

In [15]:
t = spark.read.json('gs://lbanor/dataproc_example/intermediary/2017-11-02/*.gz')

In [16]:
t.rdd.map(lambda x: x).collect()[:10]

[Row(interactions=[Row(item='CL107APF50APL', score=0.5), Row(item='AU176APF23XJC', score=0.5), Row(item='BB228APF43WGQ', score=0.5), Row(item='CL107APF65QXA', score=0.5), Row(item='AG672APF39ZAQ', score=0.5), Row(item='MO131APF88TNN', score=0.5)], user='6935962925703084781'),
 Row(interactions=[Row(item='VI618SHF43ZUOQ', score=0.5), Row(item='VI618SHF39PAM', score=0.5)], user='3976706926628584656'),
 Row(interactions=[Row(item='FA258APF88WLN', score=0.5)], user='4833826057813210049'),
 Row(interactions=[Row(item='PA463SHM37YXI', score=1.0)], user='1283229667069910147'),
 Row(interactions=[Row(item='TR763APM70QXD', score=0.5), Row(item='RE189APM30XWN', score=0.5), Row(item='TR763APF14YKR', score=0.5), Row(item='RE189APM99XXS', score=0.5), Row(item='VO972SRM08MYT', score=0.5), Row(item='CO515APM31DRC', score=0.5), Row(item='FO743APF31PEM', score=1.0), Row(item='PR667APF28HXB', score=0.5), Row(item='LE886APM04FXN', score=0.5), Row(item='PR667APF44FHB', score=0.5), Row(item='DU964APF04DED'

In [None]:
t.head(3)

In [None]:
t.rdd.reduceByKey(operator.add).collect()[:10]

In [None]:
print(t.reduceByKey(operator.add).collect())

In [9]:
 data = (t.rdd
 .reduceByKey(operator.add)
 .flatMap(lambda x: aggregate_skus(x))
 .filter(lambda x: len(x[1]) > 1 and len(x[1]) < 10))

In [6]:
def _process_scores(row):
    """After all user -> score aggregation is done, this method loops
    through each sku for a given user and yields its squared score so
    that we can compute the norm ``||c||`` for each sku column.

    :type row: list
    :param row: list of type [(user, (sku, score))]

    :rtype: tuple
    :returns: tuple of type (sku, (score ** 2))
    """
    for inner_row in row[1]:
        yield (inner_row[0], inner_row[1] ** 2)


In [10]:
norms = {sku: norm for sku, norm in (data.flatMap(lambda x: _process_scores(x))
 .reduceByKey(operator.add)
 .map(lambda x: (x[0], math.sqrt(x[1])))
 .collect())}


In [13]:
data = (data
   .flatMap(lambda x: process_intersections(x, norms))
   .reduceByKey(operator.add)
   .collect()[:20])

In [14]:
data

[(('VI185ACF51CIA', 'EQ443ACF44NCD'), 0.23145502494313788),
 (('SA232SHF71FUY', 'DU277SHF61WLM'), 0.033806170189140665),
 (('UN967APF66VVD', 'RI464APF25OQG'), 0.002715196760230896),
 (('VA134SCF19LVC', 'VA134SCF94WCF'), 0.12309149097933275),
 (('CO515ACF64LQX', 'AC589APF97OZY'), 0.001290361251780358),
 (('CA878ACF83LIM', 'DO723ACF08VWD'), 0.5),
 (('VI618SHF13MTQ', 'VI618SHF91GTS'), 0.18057877962865385),
 (('VI185SHF11KRK', 'VI282SHF61OUW'), 0.2886751345948129),
 (('CO980HDU92APZ', 'AL505SHU83DJK'), 0.33333333333333337),
 (('PA026SHM19TEQ', 'GI259SHM96LDV'), 0.06154574548966636),
 (('RA132ACU52KSP', 'DU277SHF89ZFI'), 0.03706246583305506),
 (('VI618SHF79ULG', 'VI618SHF59MTG'), 0.1889822365046136),
 (('MO578SHF10RGN', 'MO578SHF35RFO'), 0.04141576832812911),
 (('LA906APF66LGV', 'LA906APF20GSP'), 0.022140372138502378),
 (('FI911SHF25QTA', 'FI911SHF16QTJ'), 0.15635262099882086),
 (('EQ443ACF67XZW', 'CO515APF17LCG'), 0.058722021951470346),
 (('CA497APM18YRL', 'CA497APM74YWZ'), 0.0819231920519

In [12]:
def process_intersections(row, norms):
    for i in range(len(row[1])):
        for j in range(i + 1, len(row[1])):
            #yield row[1][i]
            yield ((row[1][i][0], row[1][j][0]), row[1][i][1] * row[1][j][1] / (norms[row[1][i][0]] * norms[row[1][j][0]]))

In [None]:
re = t.flatMap(lambda x: process_intersections(x))