In [11]:
# --- 0
import boto3
import pandas as pd

import numpy as np
import numpy.ma as ma

import tensorflow.compat.v2 as tf
import tensorflow_probability as tfp

tf.enable_v2_behavior()

In [23]:
# --- 1
params = {
    "region" : "us-west-1",
    "database" : "events",
    "bucket" : "clash-prod-datalake",
    "result-path" : "query-results",
    # query returns progress on the first users interaction with a video
    "query" : "SELECT v1.video_SLASH_id, v1.user_SLASH_id, v1.video_SLASH_progress FROM view_video v1 JOIN (SELECT MIN(event_SLASH_timestamp) as event_SLASH_timestamp, user_SLASH_id, video_SLASH_id FROM view_video GROUP BY user_SLASH_id, video_SLASH_id) v2 ON v1.event_SLASH_timestamp = v2.event_SLASH_timestamp WHERE dt = '2020-11-03' ORDER BY video_SLASH_id, user_SLASH_id;"
}

session = boto3.Session()

def athena_query(client, params):
    response = client.start_query_execution(
        QueryString = params["query"],
        QueryExecutionContext = {
            'Database': params['database']
        },
        ResultConfiguration = {
            'OutputLocation': 's3://' + params['bucket'] + '/' + params['result-path']
        }
    )
    return response

client = session.client('athena', region_name = params["region"])
execution = athena_query(client, params)
execution_id = execution['QueryExecutionId']
execution_id

'4d07edcf-4b4b-4190-8cea-38daece8d4aa'

In [25]:
# --- 2
client.get_query_execution(QueryExecutionId = execution_id)

{'QueryExecution': {'QueryExecutionId': '4d07edcf-4b4b-4190-8cea-38daece8d4aa',
  'Query': "SELECT v1.video_SLASH_id, v1.user_SLASH_id, v1.video_SLASH_progress FROM view_video v1 JOIN (SELECT MIN(event_SLASH_timestamp) as event_SLASH_timestamp, user_SLASH_id, video_SLASH_id FROM view_video GROUP BY user_SLASH_id, video_SLASH_id) v2 ON v1.event_SLASH_timestamp = v2.event_SLASH_timestamp WHERE dt = '2020-11-03' ORDER BY video_SLASH_id, user_SLASH_id",
  'StatementType': 'DML',
  'ResultConfiguration': {'OutputLocation': 's3://clash-prod-datalake/query-results/4d07edcf-4b4b-4190-8cea-38daece8d4aa.csv'},
  'QueryExecutionContext': {'Database': 'events'},
  'Status': {'State': 'SUCCEEDED',
   'SubmissionDateTime': datetime.datetime(2020, 11, 4, 14, 34, 54, 758000, tzinfo=tzlocal()),
   'CompletionDateTime': datetime.datetime(2020, 11, 4, 14, 34, 59, 552000, tzinfo=tzlocal())},
  'Statistics': {'EngineExecutionTimeInMillis': 4620,
   'DataScannedInBytes': 12378049,
   'TotalExecutionTimeInMi

In [51]:
# --- 3
response = client.get_query_execution(QueryExecutionId = execution_id)
s3_path = response['QueryExecution']['ResultConfiguration']['OutputLocation']
data = pd.read_csv(s3_path)
data.head (10)

Unnamed: 0,video_slash_id,user_slash_id,video_slash_progress
0,00201d96-fa3d-11ea-8000-02a4a06c46e9,db8f8da9-0a5d-11eb-8000-02a4a06c46e9,2
1,005c6ecc-f821-11ea-8000-02a4a06c46e9,75232c0e-f81d-11ea-8000-02a4a06c46e9,13
2,007fede6-0e47-11eb-8000-02a4a06c46e9,6ce366d5-1de2-11eb-9726-028b985194d3,7
3,0082f9bc-0d23-11eb-8000-02a4a06c46e9,2540eb28-1e31-11eb-9726-028b985194d3,2
4,00a95534-d84c-11ea-9add-02a4a06c46e9,0d559245-d315-11ea-8e78-0270e3994d0f,2
5,00b72d2a-1da8-11eb-9726-028b985194d3,3f2e381a-0dad-11eb-8000-02a4a06c46e9,5
6,00b72d2a-1da8-11eb-9726-028b985194d3,6604ab3c-13c2-11eb-9726-028b985194d3,7
7,00b72d2a-1da8-11eb-9726-028b985194d3,74f0c570-c326-11ea-aea4-02e044c0bbfb,4
8,00b72d2a-1da8-11eb-9726-028b985194d3,9324c5a0-1dfc-11eb-9726-028b985194d3,17
9,00d4db1a-d465-11ea-9add-02a4a06c46e9,155aceeb-13f9-11eb-9726-028b985194d3,5


In [40]:
# --- 3
nan = float('NaN')
users = ["Lisa", "Gene", "Michael", "Claudia", "Mick", "Jack", "Toby"]
videos = ["Lady", "Snakes", "Just", "Superman", "You", "TNL"]

data = pd.DataFrame (np.array([
    # Lady Snakes Just Superman You TNL
    [ 2.5, 3.5, 3.0, 3.5, 2.5, 3.0 ], # Lisa
    [ 3.0, 3.5, 1.5, 5.0, 3.5, 3.0 ], # Gene
    [ 2.5, 3.0, nan, 3.5, nan, 4.0 ], # Michael
    [ nan, 3.5, 3.0, 4.0, 2.5, 4.5 ], # Claudia
    [ 3.0, 4.0, 2.0, 3.0, 2.0, 3.0 ], # Mick
    [ 3.0, 4.0, nan, 5.0, 3.5, 3.0 ], # Jack
    [ nan, 4.5, nan, 4.0, 1.0, nan ]  # Toby
]),
                     index = users,
                     columns = videos)

data = pd.melt (data, value_vars=videos, var_name='video_slash_id', value_name='video_slash_progress')
data['user_slash_id'] = np.repeat([users], len (videos), axis=0).flatten ()
data = data.dropna()
data

Unnamed: 0,video_slash_id,video_slash_progress,user_slash_id
0,Lady,2.5,Lisa
1,Lady,3.0,Gene
2,Lady,2.5,Michael
4,Lady,3.0,Mick
5,Lady,3.0,Jack
7,Snakes,3.5,Lisa
8,Snakes,3.5,Gene
9,Snakes,3.0,Michael
10,Snakes,3.5,Claudia
11,Snakes,4.0,Mick


In [52]:
# --- 4
s = pd.pivot_table(data, values = 'video_slash_progress', index = ['user_slash_id'], columns = ['video_slash_id'])
videos = s.columns
users = s.index
# print (s)

s = np.array (s)
print (s.shape)
s

(2237, 3362)


array([[nan, nan, nan, ..., nan, nan, nan],
       [nan, nan, nan, ..., nan, nan, nan],
       [nan, nan, nan, ..., nan, nan, nan],
       ...,
       [nan, nan, nan, ..., nan, nan, nan],
       [nan, nan, nan, ..., nan, nan, nan],
       [nan, nan, nan, ..., nan, nan, nan]])

In [85]:
# --- 5
# TODO : tf.SparseTensor
scores = tf.constant(s, dtype=tf.float32)

# scores = tf.constant([
#     # Lady Snakes Just Superman You TNL
#     [ 2.5, 3.5, 3.0, 3.5, 2.5, 3.0 ], # Lisa
#     [ 3.0, 3.5, 1.5, 5.0, 3.5, 3.0 ], # Gene
#     [ 2.5, 3.0, nan, 3.5, nan, 4.0 ], # Michael
#     [ nan, 3.5, 3.0, 4.0, 2.5, 4.5 ], # Claudia
#     [ 3.0, 4.0, 2.0, 3.0, 2.0, 3.0 ], # Mick
#     [ 3.0, 4.0, nan, 5.0, 3.5, 3.0 ], # Jack
#     [ nan, 4.5, nan, 4.0, 1.0, nan ]  # Toby
# ])

scores

<tf.Tensor: shape=(7, 6), dtype=float32, numpy=
array([[2.5, 3.5, 3. , 3.5, 2.5, 3. ],
       [3. , 3.5, 1.5, 5. , 3.5, 3. ],
       [2.5, 3. , nan, 3.5, nan, 4. ],
       [nan, 3.5, 3. , 4. , 2.5, 4.5],
       [3. , 4. , 2. , 3. , 2. , 3. ],
       [3. , 4. , nan, 5. , 3.5, 3. ],
       [nan, 4.5, nan, 4. , 1. , nan]], dtype=float32)>

In [87]:
@tf.function (input_signature=(tf.TensorSpec(shape=(None,), dtype=tf.float32),
                               tf.TensorSpec(shape=(None,), dtype=tf.float32)))  
def cor_na_omit (x, y):
    ind1 = tf.math.logical_not (tf.math.is_nan(x))
    ind2 = tf.math.logical_not (tf.math.is_nan(y))
    indices = tf.reduce_all(tf.stack ([ind1, ind2]), 0)
    return tfp.stats.correlation (x [indices], y [indices], sample_axis = 0, event_axis = None)

class Recommender(tf.Module):
    
  def __init__(self):
    super(Recommender, self).__init__()

  @tf.function (input_signature=(tf.TensorSpec(shape=[None,None], dtype=tf.float32),
                                 tf.TensorSpec(shape=(), dtype=tf.int32)))  
  def __call__(self, scores, user_index):
    tf.print('Executing with tensorflow v'+ tf.__version__)    
    user_row = tf.reshape(tf.gather(scores, user_index), [-1])
    c_hat = tf.map_fn(fn=lambda row: cor_na_omit (user_row, row), elems = scores)       
    # keep only positively correlated users
    users_indices = tf.where(c_hat > 0)[:,-1]
    # remove user row
    not_user_index = tf.where (users_indices != tf.cast(user_index, dtype=tf.int64))[:,-1]    
    users_indices = tf.gather (users_indices, not_user_index, axis=0)    
    # only videos user has not seen
    videos_indices = tf.where(tf.math.is_nan(user_row))[:,-1]
    c_hat = tf.gather (c_hat, users_indices, axis=0)    
    s = tf.gather (tf.gather (scores, users_indices, axis=0), videos_indices, axis=1)
    # fill missing values with 0's
    s = tf.where(tf.math.is_nan(s), tf.zeros_like(s), s)
    c_hat = tf.reshape (c_hat, [1, tf.shape (c_hat) [0]])
    r = tf.matmul (c_hat, s)
    r = r[-1]
    # remove 0 and NAN scored recomendations
    non_zero_indices = tf.where(r > 0)[:,-1] 
    r = tf.gather (r, non_zero_indices, axis=0)    
    total = tf.math.reduce_sum (c_hat)
    r = r/total
    order = tf.argsort(r, direction='DESCENDING', axis=0)
    videos_indices = tf.cast(videos_indices, dtype=tf.float32)
    recommendations = tf.stack([tf.gather (videos_indices, order),
                                tf.gather (r, order)],
                               axis=1)    
    return recommendations

# --- run model
model = Recommender ()
print (model (scores, tf.constant(6)))

# --- save model
# saved_model_cli show --dir ./saved_model --all
PATH = "./saved_model"
tf.saved_model.save(model, PATH)

Executing with tensorflow v2.3.0




tf.Tensor(
[[5.        3.3477895]
 [0.        2.1757958]
 [2.        2.095589 ]], shape=(3, 2), dtype=float32)
INFO:tensorflow:Assets written to: ./saved_model/assets


In [84]:
# --- load model
loaded = tf.keras.models.load_model(PATH)
loaded (s, tf.constant(100))

Executing with tensorflow v2.3.0




<tf.Tensor: shape=(554, 2), dtype=float32, numpy=
array([[4.4500000e+02, 6.1678324e+00],
       [3.6000000e+02, 5.8321676e+00],
       [9.5000000e+01, 5.1328673e+00],
       ...,
       [8.1000000e+01, 1.3986013e-02],
       [9.1000000e+01, 1.3986013e-02],
       [3.5300000e+02, 1.3986013e-02]], dtype=float32)>

In [159]:
print (videos [4])
print (videos [1])
print (videos [0])

TNL
Lady
Just
