# User Engagement Data

In [None]:
from elasticsearch import Elasticsearch

# Local development cluster. The credentials below are the hard-coded
# defaults from the original notebook; certificate verification is disabled.
es_client = Elasticsearch(
    "http://localhost:9200",
    basic_auth=("elastic", "password"),
    verify_certs=False,
    ssl_show_warn=False,
)

# Retrieve ABO product data: up to 1000 documents matching "desk".
q = {
    "size": 1000,
    "query": {
        "query_string": {
            "query": "desk",
        },
    },
}

# `body=` is deprecated in the 8.x Python client (the one that accepts
# `basic_auth`), so the request fields are passed as keyword arguments.
result = es_client.search(index="product", query=q["query"], size=q["size"])
hits = result['hits']
# Keep only the stored document of every hit.
products = [hit['_source'] for hit in hits['hits']]
products

In [None]:
# Python -> Logstash -> Elasticsearch pipeline.

import logging
import logstash
import random

# Logger whose records are shipped to Logstash over TCP.
test_logger = logging.getLogger('Feedback')
test_logger.setLevel(logging.DEBUG)
# Clear existing handlers first so that re-running this cell does not stack
# a second handler on the same logger.
if test_logger.hasHandlers():
    test_logger.handlers.clear()
test_logger.addHandler(logstash.TCPLogstashHandler('0.0.0.0', 5959, version=1))

sample = 100
# First column of up to `sample` rows, flattened into a plain Python list.
item_ids = df.limit(sample).select("item_id").rdd.flatMap(lambda x: x).collect()

# Iterate over what was actually collected: `df` may hold fewer than
# `sample` rows, and indexing with range(sample) would raise IndexError.
for i, item_id in enumerate(item_ids):
    # Synthetic engagement record: random click (0/1) and rating (0-5).
    feedback = {
        'user_id': str(i),
        'item_id': item_id,
        'click': random.randrange(2),
        'rating': random.randrange(6),
    }

    # The feedback fields travel as extra attributes of the log record.
    test_logger.info('INFO', extra=feedback)

In [None]:
# Fetch up to 100 documents from the "log" index.
q = {
    "size": 100,
    "query": {
        "match_all": {},
    },
}

# `body=` is deprecated in the 8.x Python client; pass the fields as keywords.
result = es_client.search(index="log", query=q["query"], size=q["size"])
hits = result['hits']
feedback = [hit['_source'] for hit in hits['hits']]

# Keep only the engagement fields. A document missing one of them simply
# yields a dict without that key.
filter_keys = {'item_id', 'user_id', 'click', 'rating'}
feature_matrix = [
    {key: elem[key] for key in elem.keys() & filter_keys}
    for elem in feedback
]
feature_matrix

In [None]:
# Replace nulls with 0, then drop rows that are exact duplicates.
# NOTE(review): the cells below read `rating`, `user_id` and `item_id`, so
# `df` is presumably the engagement DataFrame built from `feature_matrix`;
# that construction is not shown in this notebook - TODO confirm.
df = df.fillna(0).dropDuplicates()
df.show()

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, MinMaxScaler

# MinMaxScaler works on vector columns, so `rating` is first wrapped into a
# one-element vector and then rescaled.
assembler = VectorAssembler().setInputCols(["rating"]).setOutputCol("rating_vec")
scaler = MinMaxScaler().setInputCol("rating_vec").setOutputCol("scaled_rating")
pipeline = Pipeline().setStages([assembler, scaler])

# Fit on the data and apply the fitted stages to the same DataFrame.
scaler_model = pipeline.fit(df)
scaled_df = scaler_model.transform(df)
scaled_df.show()

In [None]:
from pyspark.ml.feature import StringIndexer

# Encode the string ids as numeric indices; "keep" assigns unseen labels an
# extra index instead of raising.
indexer_user = StringIndexer(inputCol='user_id', outputCol='user_index', handleInvalid="keep")
indexer_item = StringIndexer(inputCol='item_id', outputCol='item_index', handleInvalid="keep")

# Apply the user indexer first, then the item indexer on its output.
df_rec = scaled_df
for id_indexer in (indexer_user, indexer_item):
    df_rec = id_indexer.fit(df_rec).transform(df_rec)

# StringIndexer emits doubles; cast both index columns to integers.
df_rec_final = df_rec
for index_column in ('item_index', 'user_index'):
    df_rec_final = df_rec_final.withColumn(
        index_column, df_rec_final[index_column].cast('integer')
    )
df_rec_final.show()

## Collaborative Filtering

In [None]:
from pyspark.ml.recommendation import ALS

# Alternating Least Squares (ALS) matrix factorization
# https://dl.acm.org/doi/10.1109/MC.2009.263
# Factorizes the sparse user x item rating matrix into a user-factor matrix
# and an item-factor matrix.
als_settings = {
    'userCol': 'user_index',
    'itemCol': 'item_index',
    'ratingCol': 'rating',
    # drop rows whose prediction is NaN (users/items unseen during fit)
    'coldStartStrategy': 'drop',
    # constrain the learned factors to be non-negative
    'nonnegative': True,
}
als = ALS(**als_settings)

model = als.fit(df_rec_final)
# model.save("collaborative_filtering_model")

In [None]:
# `col` is not imported anywhere else in the notebook.
from pyspark.sql.functions import col

# Top-10 recommendations for the user whose StringIndexer index is 1
# (`user_index`, which is not necessarily the raw `user_id` "1").
recs = model.recommendForAllUsers(10).filter(col('user_index') == 1).select("recommendations")
recs = recs.rdd.flatMap(lambda x: x).collect()
# No row comes back when the filter matches no user; show an empty list
# instead of failing with IndexError on recs[0].
[rec.asDict() for rec in recs[0]] if recs else []

In [None]:
# Evaluation metric

# Offline metrics

# Recall@k = relevant items in the top k / total relevant items
# Precision@k = relevant items in the top k / k
# MAP@k (Mean Average Precision) = (1 / total users) * sum(AP@k over all users)
# AP@k (Average Precision) = sum(Precision@i at each relevant position i <= k) / total relevant items

# other offline metrics
# Mean Absolute Error (MAE), Root Mean Squared Error (RMSE)
# Normalized Discounted Cumulative Gain (NDCG)

# Online metrics

# A/B Testing
# click-through rate (CTR), conversion rate (CR), and revenue per user (RPU)

from pyspark.mllib.evaluation import RankingMetrics

k = 5
# A single (predicted ranking, ground-truth items) pair.
prediction_and_labels = [(['3','2','1','0'], ['3','2','0','1'])]
rdd = spark.sparkContext.parallelize(prediction_and_labels)
metrics = RankingMetrics(rdd)

# Print MAP@k, precision@k and recall@k, in that order.
for metric_at_k in (
    metrics.meanAveragePrecisionAt,
    metrics.precisionAt,
    metrics.recallAt,
):
    print(metric_at_k(k))

In [None]:
# These types are not imported anywhere else in the notebook.
from pyspark.sql.types import StringType, StructField, StructType

# Explicit schema for the product documents: every field is a nullable string.
schema = StructType([
    StructField('item_id', StringType(), True),
    StructField('brand', StringType(), True),
    StructField('item_name', StringType(), True),
    StructField('item_keywords', StringType(), True),
])

# `products` is the list of `_source` dicts fetched from the "product" index.
df = spark.createDataFrame(data=products, schema=schema)
df = df.dropDuplicates()
df.show()

In [None]:
# These transformers are not imported anywhere else in the notebook.
from pyspark.ml.feature import Tokenizer, Word2Vec

# create feature with item_keywords

# tokenize item_keywords (lower-cases the text and splits it on whitespace)
tokenizer = Tokenizer(inputCol="item_keywords", outputCol="keywords")
tokenized_df = tokenizer.transform(df)
tokenized_df.show()

# vectorize using Word2Vec: each item gets one 100-dimensional vector, the
# average of its keyword token vectors; minCount=1 keeps every token.
word2vec = Word2Vec(vectorSize=100, minCount=1, inputCol="keywords", outputCol="keyword_vector")
word2vec_model = word2vec.fit(tokenized_df)
word_vectors_df = word2vec_model.transform(tokenized_df)
word_vectors_df.show()

In [None]:
# Keep only the item id and its keyword embedding.
item_feature = word_vectors_df.select('item_id', 'keyword_vector')
item_feature.show()

In [None]:
from pinecone import Pinecone, ServerlessSpec

import os

# The API key is a secret: read it from the environment instead of keeping it
# in the notebook source.
# NOTE(review): the key that used to be hard-coded here should be treated as
# leaked and rotated.
pinecone_api_key = os.environ.get("PINECONE_API_KEY")
if not pinecone_api_key:
    raise RuntimeError(
        "Set the PINECONE_API_KEY environment variable before running this cell."
    )
pc = Pinecone(api_key=pinecone_api_key)

# Create Index
index_name = "item-feature"

# Only create the index on the first run; later runs reuse it.
if not pc.has_index(index_name):
    pc.create_index(
        name=index_name,
        # must match the Word2Vec vectorSize (100) used for the item vectors
        dimension=100,
        metric="cosine",
        spec=ServerlessSpec(
            cloud='aws',
            region='us-east-1',
        ),
    )

# Handle used by the later cells to upsert and query vectors.
pc_client = pc.Index(index_name)

In [None]:
# index item feature vector to Pinecone without Spark-Pinecone connector

# Collect ids and vectors in ONE job so every id stays paired with its own
# vector; two separate collect() calls are not guaranteed to return rows in
# the same order.
rows = item_feature.select("item_id", "keyword_vector").collect()
item_ids = [row["item_id"] for row in rows]
keyword_vector = [row["keyword_vector"] for row in rows]

# Pinecone expects plain lists of floats, so convert the Spark vectors
# explicitly.
vectors = [
    {"id": _id, "values": vec.toArray().tolist()}
    for _id, vec in zip(item_ids, keyword_vector)
]

# Upsert in small batches to stay below Pinecone's per-request size limit;
# with no vectors at all, nothing is sent.
upsert_batch_size = 100
for start in range(0, len(vectors), upsert_batch_size):
    pc_client.upsert(
        vectors=vectors[start:start + upsert_batch_size],
        namespace="ns1",
    )

In [None]:
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import concat_ws

# create mock user history
user_history = [
    {'user_id': '1', 'query': ['computer desk', 'office desk']},
]

history_df = spark.createDataFrame(user_history)
history_df.show()

# Embed the queries with the Word2Vec model that was fitted on the item
# keywords (`word2vec_model`). Fitting a second, independent model on the
# queries would place the user vector in an unrelated vector space, making its
# cosine similarity to the item vectors stored in Pinecone meaningless.
# The queries are joined and tokenized the same way as `item_keywords`, so the
# model sees single words rather than whole phrases.
query_tokenizer = Tokenizer(inputCol="query_text", outputCol="keywords")
tokenized_history_df = query_tokenizer.transform(
    history_df.withColumn("query_text", concat_ws(" ", "query"))
)

# The model reads `keywords` and writes `keyword_vector`; rename the output so
# downstream cells keep reading `query_vector`, and drop the helper columns.
word_vectors_df = (
    word2vec_model.transform(tokenized_history_df)
    .withColumnRenamed("keyword_vector", "query_vector")
    .drop("query_text", "keywords")
)
word_vectors_df.show()

In [None]:
# Keep only the user id and the embedding of that user's queries.
user_feature = word_vectors_df.select('user_id', 'query_vector')
user_feature.show()

In [None]:
from pyspark.ml.functions import vector_to_array

# Bring the first user's query embedding to the driver as a plain list.
first_user_row = user_feature.select("query_vector").collect()[0]
user_vector = first_user_row["query_vector"].tolist()

# Ask Pinecone for the 3 items closest to the user vector.
query_options = {
    "namespace": "ns1",
    "vector": user_vector,
    "top_k": 3,
    "include_values": False,
    "include_metadata": True,
}
results = pc_client.query(**query_options)


def _match_score(match):
    return match['score']


# Highest similarity score first.
sorted_matches = sorted(results['matches'], key=_match_score, reverse=True)
print(sorted_matches)

## Hybrid Recommendation

In [None]:
# convert item_index to item_id
def get_item_id(rec_list):
    """Map ALS recommendation entries back to their original ``item_id``.

    Args:
        rec_list: iterable of mappings that each carry an ``item_index`` key
            (the dicts produced from the ALS ``recommendations`` rows).

    Returns:
        The ``item_id`` of every entry, in the same order as ``rec_list``.

    Raises:
        IndexError: if an ``item_index`` has no row in ``df_rec_final``.
    """
    wanted = [item['item_index'] for item in rec_list]
    if not wanted:
        return []

    # `col` is not imported at notebook level, so bring it into scope here.
    from pyspark.sql.functions import col

    # One Spark job for the whole list instead of one job per item.
    rows = (
        df_rec_final
        .filter(col('item_index').isin(wanted))
        .select('item_index', 'item_id')
        .distinct()
        .collect()
    )
    id_by_index = {row['item_index']: row['item_id'] for row in rows}

    missing = [index for index in wanted if index not in id_by_index]
    if missing:
        raise IndexError(f"no item_id found for item_index value(s): {missing}")
    return [id_by_index[index] for index in wanted]

# weighted recommender
def hybrid_recommendation(user_id, query_vector, explore=0.3, k=10):
    """Blend content-based and collaborative-filtering recommendations.

    Args:
        user_id: the user's ``user_index`` in the ALS model (the integer index
            produced by StringIndexer, not the raw ``user_id`` string).
        query_vector: embedding of the user's queries, used to search Pinecone.
        explore: fraction of the ``k`` slots given to collaborative filtering.
        k: total number of recommendations requested.

    Returns:
        A list of item ids: content-based matches (best score first) followed
        by collaborative-filtering items, with duplicates removed. It can hold
        fewer than ``k`` ids when the sources overlap or return fewer results.
    """
    explore_items = int(k * explore)
    similar_items = k - explore_items

    # collaborative filtering
    item_ids = []
    if explore_items > 0:
        # Score only the requested user instead of every user in the model.
        user_subset = spark.createDataFrame([(user_id,)], 'user_index int')
        recs = model.recommendForUserSubset(user_subset, explore_items).select("recommendations")
        recs = recs.rdd.flatMap(lambda x: x).collect()
        # An unknown user yields no row; fall back to content-based results
        # only instead of failing with IndexError.
        if recs:
            item_ids = get_item_id([rec.asDict() for rec in recs[0]])

    # content-based filtering
    content_ids = []
    if similar_items > 0:
        results = pc_client.query(
            namespace="ns1",
            vector=query_vector,
            top_k=similar_items,
            include_metadata=True
        )
        sorted_matches = sorted(results['matches'], key=lambda x: x['score'], reverse=True)
        content_ids = [match['id'] for match in sorted_matches]

    # An item can come from both sources; keep its first occurrence only.
    final_rec = list(dict.fromkeys(content_ids + item_ids))
    return final_rec

hybrid_recommendation(1, user_vector)

## Contextual Bandit


In [None]:
import numpy as np

class contextual_bandits():
    """One bandit arm with a Beta belief over its click-through rate.

    Typical use: ``update_observations`` records a batch of clicks and
    impressions, ``compute_posterior`` folds that batch into the Beta
    parameters, and ``sample`` draws from the resulting distribution
    (Thompson sampling).
    """

    def __init__(self, p):
        # Stored as given; no method of this class reads it.
        self.p = p
        # parameters of the Beta distribution (start as the Beta(1, 1) prior)
        self.prior_alpha = 1
        self.prior_beta = 1
        # positive observations (i.e. clicks) not yet folded into the Beta
        self.num_pos = 0
        # negative observations (impressions - clicks) not yet folded in
        self.num_neg = 0
        self.bandit_id = ""

    # thompson sampling
    def sample(self):
        """Draw one value from Beta(prior_alpha, prior_beta)."""
        return np.random.beta(self.prior_alpha, self.prior_beta)

    def compute_posterior(self):
        """Fold the pending observations into the Beta parameters.

        The pending counts are cleared afterwards, so calling this method
        again without new observations does not count the same batch twice.
        """
        self.prior_alpha = self.prior_alpha + self.num_pos
        self.prior_beta = self.prior_beta + self.num_neg
        self.num_pos = 0
        self.num_neg = 0

    def update_observations(self, clicks, impressions, curr_bandit_id):
        """Record a batch of observations and the id of this arm.

        Args:
            clicks: number of positive outcomes in the batch.
            impressions: number of trials in the batch (must be >= clicks).
            curr_bandit_id: identifier stored in ``bandit_id``.

        Raises:
            ValueError: if ``clicks`` is negative or exceeds ``impressions``;
                such counts would produce invalid Beta parameters.
        """
        if clicks < 0 or impressions < clicks:
            raise ValueError(
                "expected 0 <= clicks <= impressions, got "
                f"clicks={clicks}, impressions={impressions}"
            )
        self.num_pos = clicks
        self.num_neg = impressions - clicks
        self.bandit_id = curr_bandit_id

In [None]:
def runSimulation(trials=10000):
    """Simulate Thompson sampling over three arms and print traffic shares.

    Each trial draws one sample per arm and credits the arm with the largest
    draw. Every 500th trial prints the formatted draws; at the end the share
    of trials won by each arm is printed.

    Args:
        trials: number of sampling rounds to run.
    """
    # (clicks, impressions, bandit id) observed for each arm
    observed = [
        (10, 100, '1'),
        (9, 90, '2'),
        (12, 120, '3'),
    ]

    bandits = []
    for clicks, impressions, arm_id in observed:
        arm = contextual_bandits(.5)
        arm.update_observations(clicks, impressions, arm_id)
        arm.compute_posterior()
        bandits.append(arm)

    counts = {}
    for trial in range(trials):
        # draw once from every arm and remember which arm drew highest
        winner = None
        top_draw = -1
        formatted_draws = []
        for arm in bandits:
            draw = arm.sample()
            formatted_draws.append("%f" % draw)
            if draw > top_draw:
                top_draw, winner = draw, arm.bandit_id
        counts[winner] = counts.get(winner, 0) + 1
        if trial % 500 == 0:
            print("current samples: %s" % formatted_draws)

    # normalize the counts to get the traffic percentage
    normalized_counts = {
        arm.bandit_id: float(counts.get(arm.bandit_id, 0)) / trials
        for arm in bandits
    }
    print(normalized_counts)


runSimulation()