In [1]:
import random

# Sample Mapper

Source: https://github.com/gamboviol/bpr/blob/master/bdoopr.py

Map-reduce algorithm to create a schedule of BPR (Bayesian Personalized
Ranking) training samples, i.e. triples (u, i, j) in which user u engaged
with item i but not with item j.
The probability of emitting a candidate positive item
in the first mapper is designed to give a uniform
probability of any item in the dataset being output
as the positive item in the final list of triples.

In [2]:
class Mapper:
    """First-pass mapper: scatter candidate BPR items to random indices.

    Called with one (user, item) pair from the data, it yields
    ``(index, (user, item, label))`` tuples.  In every oversampling round
    the pair is always proposed as a negative candidate (label '-') and,
    with a user-dependent probability (see ``sample_positive``), also as a
    positive candidate (label '+').  Pairs that land on the same random
    index are later combined by ``reducer``.

    Args:
        user_item_counts: mapping from user to that user's number of
            non-zero (user, item) entries in the data.
        oversampling: number of proposal rounds per data pair.

    Raises:
        ValueError: if ``user_item_counts`` is empty.
    """

    def __init__(self, user_item_counts, oversampling=1):
        if not user_item_counts:
            raise ValueError("user_item_counts must not be empty")
        self.N = sum(user_item_counts.values())  # number of non-zeros
        self.user_item_counts = user_item_counts
        self.max_item_count = max(user_item_counts.values())
        self.oversampling = oversampling

    def sample_positive(self, user):
        """Decide whether to also propose the current pair as a positive.

        The acceptance probability is
        ``(N - max_item_count) / (N - user_item_counts[user])``.  It is at
        most 1 because a user's count never exceeds the maximum count.  It
        is chosen so that every item in the data is equally likely to end
        up as the positive item of a final triple.
        """
        others = self.N - self.user_item_counts[user]
        if others <= 0:
            # Every non-zero belongs to this user, so no valid negative item
            # can ever be paired with it (the formula would be 0/0 here).
            return False
        alpha = float(self.N - self.max_item_count) / others
        return random.uniform(0, 1) < alpha

    def random_index(self):
        """Return a random index in [0, N * oversampling], both ends inclusive."""
        return random.randint(0, self.N * self.oversampling)

    # use yield to take less memory
    def __call__(self, user, item):
        """Yield ``(index, (user, item, label))`` candidates for one data pair."""
        # send candidate items to random indices
        for _ in range(self.oversampling):
            if self.sample_positive(user):
                # propose a candidate positive item
                yield self.random_index(), (user, item, '+')
            # propose a candidate negative item
            yield self.random_index(), (user, item, '-')

In [3]:
def reducer(index, values):
    """Turn the candidates that met at one random index into a candidate triple.

    ``values`` holds ``(user, item, label)`` tuples produced by ``Mapper``.
    When at least one '+' and one '-' candidate are present, one of each is
    drawn uniformly at random.  The result is yielded as ``((u, j), i)``:
    u and i come from the positive candidate, j from the negative one.
    Otherwise nothing is yielded.
    """
    buckets = {'+': [], '-': []}
    for who, what, sign in values:
        buckets[sign].append((who, what))
    positives = buckets['+']
    negatives = buckets['-']
    if not (positives and negatives):
        return
    # need at least one positive and one negative candidate; pick one of each
    pos_user, pos_item = random.choice(positives)
    _, neg_item = random.choice(negatives)
    yield (pos_user, neg_item), pos_item  # candidate triple as (u,j),i

In [4]:
# An indicator used to check whether j is an unpurchased/unengaged item.
# indicator_mapper tags every (user, item) pair of the data with it.  If
# indicator_reducer finds it among the values for a key (user, j), then j is
# one of user's own items and the candidate triples for that key are dropped.
# NOTE(review): it must never equal a real item id, or that id would be
# mistaken for the indicator.
J_IS_POSITIVE = '-'

In [5]:
def indicator_mapper(user, item):
    """Re-emit one data pair keyed by ``(user, item)`` and tagged with J_IS_POSITIVE.

    Using the same ``(user, j)`` key layout as the candidate triples lets
    indicator_reducer see whether a candidate negative j is actually one of
    the user's own items.
    """
    key = (user, item)
    yield key, J_IS_POSITIVE

In [6]:
def indicator_reducer(key, values):
    """Keep the candidate triples whose j really is a negative item for the user.

    ``key`` is a ``(user, j)`` pair.  ``values`` is expected to hold the
    positive items i of the candidate triples with that key.  It may also
    hold one J_IS_POSITIVE marker per occurrence of ``(user, j)`` in the
    data (see indicator_mapper).

    If no marker is present, yields ``(user, (i, j))`` once per distinct i,
    in first-seen order.  Otherwise yields nothing.
    """
    user, j = key
    values = list(values)
    if J_IS_POSITIVE in values:
        # (user, j) occurs in the data, so j is not a negative item for user
        return
    # Only the marker decides rejection: repeated identical candidates (possible
    # with oversampling) must not cause a true negative to be dropped.
    # dict.fromkeys dedups while keeping a deterministic, first-seen order.
    for i in dict.fromkeys(values):
        yield user, (i, j)

# Map Processor

Python map-reduce implementation for testing and
proof-of-concept experiments.
Use it like this. First define your mapper and reducer functions; they
should be generators, i.e. use `yield` and not `return`:

```python
def mapper(key, val):
    # can yield multiple k,v pairs for each input
    yield key, val**2
    yield key, val**3

def reducer(key, vals):
    yield key, sum(vals)
```

Now run the job!

```python
mapreduce(infile, outfile, mapper=mapper, reducer=reducer)
```
You can specify a custom parser to read your input,
and a custom formatter to format your output. The
default `default_parser` and `default_formatter` read and write
tab-separated values (TSV), with each field written as a Python literal.

In [7]:
from itertools import chain

In [10]:
def default_parser(line):
    """Decode one TSV line into its fields, evaluating each as a Python expression.

    Reads the format written by default_formatter and returns a lazy
    iterator over the decoded fields, e.g. ``"'u1'\\t7\\n"`` -> ``'u1', 7``.
    """
    # NOTE(review): eval() executes arbitrary code -- only feed this parser
    # trusted files (ast.literal_eval would be the safe choice for literals).
    stripped = line.strip()
    return map(eval, stripped.split(sep='\t'))

In [11]:
def default_formatter(key, val):
    """Format key, val as one tab-separated line of Python literals.

    Strings are written with ``repr``, so quotes, backslashes, tabs and
    newlines inside them are escaped and default_parser can read them back.
    Hand-wrapping a string in single quotes broke on e.g. ``"it's"``.
    Any other value is written as ``'{}'.format(value)`` (e.g. ints, tuples).
    """
    def _field(value):
        return repr(value) if isinstance(value, str) else value

    return '{0}\t{1}'.format(_field(key), _field(val))

In [12]:
def identity_mapper(key, val):
    """Default mapper: pass the parsed (key, val) pair through untouched."""
    pair = (key, val)
    yield pair

In [13]:
def identity_reducer(key, vals):
    """Reducer that re-emits a (key, val) pair for every value grouped under key."""
    yield from ((key, v) for v in vals)

In [15]:
def mapreduce(infile,
              outfile,
              parser = default_parser,
              formatter = default_formatter,
              mapper = identity_mapper,
              reducer = None):
    """Run a map-reduce job specified by mapper and reducer generator functions.

    Each non-blank line of every input file is turned into positional
    arguments for ``mapper`` by ``parser``; ``mapper`` yields (key, value)
    pairs.  If a ``reducer`` is given, the pairs are grouped by key with an
    in-memory sort, so keys must be mutually comparable.  ``reducer(key,
    values)`` is then called once per distinct key, with ``values`` in the
    order they were mapped.  Without a reducer the mapped pairs are written
    out directly.  Every output pair is rendered by ``formatter`` and written
    as one line of ``outfile``.

    Args:
        infile: path of an input file, or a list/tuple of paths.
        outfile: path of the output file (overwritten).
        parser: callable turning one input line into the mapper's arguments.
        formatter: callable turning (key, value) into one output line.
        mapper: generator function yielding (key, value) pairs.
        reducer: optional generator function yielding (key, value) pairs.
    """
    from itertools import groupby
    from operator import itemgetter

    infiles = list(infile) if isinstance(infile, (list, tuple)) else [infile]

    def _mapped_pairs():
        # Stream (key, value) pairs from every input file, closing each file
        # as soon as it has been fully read.
        for path in infiles:
            with open(path) as lines:
                for line in lines:
                    if not line.strip():
                        # a blank line carries no record (and would make the
                        # default parser fail on eval(''))
                        continue
                    yield from mapper(*parser(line))

    with open(outfile, 'w') as out:
        if reducer:
            # Sort on the key only.  Values sharing a key need not be mutually
            # comparable, e.g. item ids mixed with the str indicator in the
            # second BPR pass, and sorting whole (key, value) tuples would
            # compare them.  sorted() is stable, so values keep map order.
            by_key = itemgetter(0)
            for key, group in groupby(sorted(_mapped_pairs(), key=by_key), key=by_key):
                vals = [val for _, val in group]
                for k, v in reducer(key, vals):
                    print(formatter(k, v), file=out)
        else:
            for key, val in _mapped_pairs():
                print(formatter(key, val), file=out)