<img src="../img/hu-logo.png" align="right" width="120">

# Product2Vec data streamer

This notebook contains an example of a Product2Vec data streamer (steps 1 and 2 of P2V-MAP) that generates batches of size $B$ containing center products, context products, and negative samples.

1. `DataStreamerP2V` produces batches of P2V-MAP samples
1. `NegativeSamplesGenerator` produces negative samples
1. The notebook uses the two classes to produce a batch of training samples

In [1]:
import collections
import itertools
from typing import Tuple

import numpy as np
import pandas as pd

In [2]:
class DataStreamerP2V:
    """
    Class for generating P2V training samples
        generate_batch: produce a batch of training samples
        reset_iterator: reset data streamer and empty sample cache

    Every basket is expanded into all ordered (center, context) product pairs of
    that basket; batches are cut from a cache of such pairs.
    """

    def __init__(
        self,
        data: pd.DataFrame,
        variable_basket: str,
        variable_product: str,
        batch_size: int = 8_192,
        shuffle: bool = True,
        n_negative_samples: int = 0,
        power: float = 0.75,
        allow_context_collisions: bool = False,
    ):
        """
        Initialize P2V data streamer
            data: must contain `variable_basket` and `variable_product`
            variable_basket: basket identifier in `data`
            variable_product: product identifier in `data`
            batch_size: size of a single batch
            shuffle: shuffle data when resetting streamer
            n_negative_samples: number of negative samples per positive sample
                (0 disables negative sampling)
            power: distortion factor for negative sample generator
            allow_context_collisions: allow that an id is a positive and a negative sample at the same time
        """
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.cached_samples = []
        self.basket_list = self._basket_df_to_list(
            x=data, variable_basket=variable_basket, variable_product=variable_product
        )
        self.reset_iterator()
        self.produce_negative_samples = n_negative_samples > 0
        # stored unconditionally so the attribute always exists
        self.allow_context_collisions = allow_context_collisions
        if self.produce_negative_samples:
            self.negative_samples_generator = NegativeSamplesGenerator(
                data=data,
                n_negative_samples=n_negative_samples,
                batch_size=self.batch_size,
                power=power,
                variable_product=variable_product,
            )

    def generate_batch(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Produce a batch of training samples containing center products, context products, and negative samples

        Returns `(center, context, negative_samples)`:
            center, context: int64 arrays of at most `batch_size` product ids; the batch
                is shorter once the baskets of the current pass run out (call
                `reset_iterator` to start a new pass)
            negative_samples: one row of `n_negative_samples` ids per center product when
                negative sampling is enabled, otherwise an empty (2, 0) array
        """
        self._fill_cache()

        # each cached sample is a pair of 1-element basket rows, so the stacked
        # array has shape (n, 2, 1)
        output_array = np.asarray(self.cached_samples[: self.batch_size])
        self.cached_samples = self.cached_samples[self.batch_size :]
        center = output_array[:, 0, 0].astype(np.int64)
        context = output_array[:, 1, 0].astype(np.int64)

        if self.produce_negative_samples:
            if self.allow_context_collisions:
                # the generator always draws a full `batch_size` worth of samples;
                # trim so a shorter final batch keeps one row per center product
                negative_samples = self.negative_samples_generator.get_negative_samples()[
                    : len(center)
                ]
            else:
                # generator returns (n_negative_samples, len(context)); transpose to
                # one row per center product
                negative_samples = self.negative_samples_generator.get_negative_samples(
                    context
                ).T
        else:
            # NOTE(review): (2, 0) shape kept as-is for backward compatibility
            negative_samples = np.empty(shape=(2, 0))

        return center, context, negative_samples

    def reset_iterator(self) -> None:
        """
        Reset data streamer and empty sample cache
        """
        if self.shuffle:
            np.random.shuffle(self.basket_list)
        self.basket_iterator = self._basket_iterator(self.basket_list)
        self.cached_samples = []

    def _basket_df_to_list(self, x, variable_basket, variable_product):
        """
        Turn a basket dataframe into a list of baskets

        Each basket is an array of shape (n_products_in_basket, 1) holding the
        basket's product ids in their original row order.
        """
        # stable sort keeps the original row order of products within a basket
        x_basket_values = (
            x[[variable_basket, variable_product]]
            .sort_values([variable_basket], kind="mergesort")
            .values
        )
        keys = x_basket_values[:, 0]
        # row index where each basket starts; the first start is 0, so np.split
        # yields a leading empty chunk that is dropped
        _, index = np.unique(keys, return_index=True)
        return np.split(x_basket_values[:, 1:], index)[1:]

    def _basket_iterator(self, basket_list):
        """
        Iterator yielding single baskets
        """
        for basket in basket_list:
            yield basket

    def _fill_cache(self) -> None:
        """
        Fill sample cache with center-context pairs until it holds at least
        `batch_size` pairs or all baskets of the current pass are consumed
        """
        while len(self.cached_samples) < self.batch_size:
            new_basket = next(self.basket_iterator, None)
            if new_basket is None:
                # baskets exhausted: the next batch will be partial (or empty)
                break
            self.cached_samples.extend(itertools.permutations(new_basket, 2))


class NegativeSamplesGenerator:
    """
    Class for generating negative samples
        get_negative_samples: produce negative samples

    Products are drawn with probability proportional to count ** power, using a
    cumulative count table scaled to the integer range [0, domain).
    """

    def __init__(
        self,
        data: pd.DataFrame,
        n_negative_samples: int,
        batch_size: int,
        power: float = 0.75,
        variable_product: str = "product",
    ) -> None:
        """
        Initialize negative samples generator (for given positive samples)
            data: basket data, must contain `variable_product`
            n_negative_samples: number of negative samples per positive sample
            batch_size: size of a single batch
            power: distortion factor for negative sample generator
            variable_product: product identifier in `data`
        """

        self.n_negative_samples = n_negative_samples
        self.batch_size = batch_size
        self.power = power
        self.n_draws = self.batch_size * self.n_negative_samples
        self.variable_product = variable_product
        # exclusive upper bound of the random integers; stored as a 1-tuple, which
        # numpy broadcasts both in `randint` and when scaling the count table
        self.domain = (2 ** 31 - 1,)
        self._build_product_counts(data)
        self._build_cumulative_count_table()
        self.products = np.array(list(self.counts.keys()))

    def get_negative_samples(self, context: np.ndarray = None) -> np.ndarray:
        """
        Produce negative samples (for given positive samples)
            context: context products that may not be used as negative samples

        Returns:
            with `context`: int32 array of shape (n_negative_samples, len(context));
                column j never contains context[j]
            without `context`: array of shape (batch_size, n_negative_samples)

        NOTE: with `context`, sampling is by rejection and never terminates if the
        only products with non-zero count all equal the excluded context product.
        """
        if context is None:
            return self._sample_products(self.n_draws).reshape(
                (self.batch_size, self.n_negative_samples)
            )

        # -1 marks slots that still need a valid draw
        negative_samples = np.full(
            (self.n_negative_samples, len(context)), -1, dtype=np.int32
        )
        pending = negative_samples == -1
        while pending.any():
            negative_samples[pending] = self._sample_products(int(pending.sum()))
            # reject draws equal to the context product of the same column
            negative_samples[negative_samples == context] = -1
            pending = negative_samples == -1
        return negative_samples

    def _sample_products(self, n_draws: int) -> np.ndarray:
        """
        Draw `n_draws` products according to the distorted count distribution
        """
        random_integers = np.random.randint(0, self.domain, n_draws)
        return self._integers_to_products(random_integers)

    def _integers_to_products(self, random_integers: np.ndarray) -> np.ndarray:
        """
        Map integers in [0, domain) to products via the cumulative count table

        side="right" assigns integer r to the first product whose cumulative bound
        exceeds r. Product i then owns exactly [table[i-1], table[i]), so products
        with a zero count own an empty interval and are never returned.
        """
        index = np.searchsorted(self.cumulative_count_table, random_integers, side="right")
        return self.products[index]

    def _build_product_counts(self, x: pd.DataFrame) -> None:
        """
        Count number of times products occur in basket data

        Stores `self.counts`, an OrderedDict mapping every id in 0..max(product) to
        its number of rows in `x`; ids that never occur get a count of 0.
        """
        n_products = x[self.variable_product].max() + 1
        product_counts = (
            x.groupby(self.variable_product)[self.variable_product].count().to_dict()
        )
        self.counts = collections.OrderedDict(
            (j, product_counts.get(j, 0)) for j in range(n_products)
        )

    def _build_cumulative_count_table(self) -> None:
        """
        Build count table (mapped to self.domain) for integer sampling of products
        """
        weights = np.array(list(self.counts.values())) ** self.power
        cumulative_relative_count_table = np.cumsum(weights / weights.sum())
        self.cumulative_count_table = np.int32(
            (cumulative_relative_count_table * self.domain).round()
        )
        # the last bound must cover the whole integer range
        assert self.cumulative_count_table[-1] == self.domain

## Load data

In [3]:
# read the basket data and preview its first rows
baskets = pd.read_parquet(path="market-baskets.parquet")
baskets.head(n=5)

Unnamed: 0,shopper,product,basket
0,0,51,0
1,0,113,0
2,0,154,0
3,0,165,0
4,0,185,0


## Create streamer

In [4]:
# small, unshuffled batches so the first batch can be read off the first basket
data_streamer_p2v = DataStreamerP2V(
    data=baskets,
    variable_product="product",
    variable_basket="basket",
    n_negative_samples=2,
    shuffle=False,
    batch_size=16,
)

## Run streamer

In [5]:
# NOTE(review): compares against the string "00000000"; confirm the dtype of the basket column
baskets.loc[baskets["basket"].eq("00000000")]

Unnamed: 0,shopper,product,basket
0,0,51,0
1,0,113,0
2,0,154,0
3,0,165,0
4,0,185,0
5,0,253,0
6,0,266,0


In [6]:
center, context, negative_samples = data_streamer_p2v.generate_batch()
# one (center, context) pair per row
np.stack((center, context), axis=1)

array([[ 51, 113],
       [ 51, 154],
       [ 51, 165],
       [ 51, 185],
       [ 51, 253],
       [ 51, 266],
       [113,  51],
       [113, 154],
       [113, 165],
       [113, 185],
       [113, 253],
       [113, 266],
       [154,  51],
       [154, 113],
       [154, 165],
       [154, 185]])

<br>
<br>

&mdash; <br>
Dr. Sebastian Gabel <br>
Machine Learning in Marketing &ndash; Homework 04 <br>
2020/21