<i>Copyright (c) Microsoft Corporation. All rights reserved.</i>

<i>Licensed under the MIT License.</i>

# Train SAR Recommendation Model on MovieLens
## Using Azure Machine Learning service (Python, CPU)

In [140]:
import json
import os
import shutil
import sys
from tempfile import TemporaryDirectory

from ipywidgets import interact
import pandas as pd
import requests

import azureml
from azureml.core import Experiment, Run, Workspace
from azureml.core.compute import AksCompute, AmlCompute, ComputeTarget
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.image import Image
from azureml.core.image.container import ContainerImage
from azureml.core.model import Model
from azureml.core.webservice import AksWebservice, LocalWebservice, Webservice
from azureml.exceptions import ComputeTargetException, WebserviceException
from azureml.train.estimator import Estimator
from azureml.widgets import RunDetails
print("azureml.core version: {}".format(azureml.core.VERSION))

sys.path.append('../..')
import reco_utils
from reco_utils.dataset.movielens import load_pandas_df
print("reco_utils version: {}".format(reco_utils.VERSION))

azureml.core version: 1.0.39
reco_utils version: 2019.05


In [3]:
# Connect to the Azure ML workspace described by the config stored in the given directory.
# NOTE(review): this path is user-specific and hard-coded — adjust it for your machine.
ws = Workspace.from_config(path='~/Downloads/aml_config')

If you run your code in unattended mode, i.e., where you can't give a user input, then we recommend to use ServicePrincipalAuthentication or MsiAuthentication.
Please refer to aka.ms/aml-notebook-auth for different authentication mechanisms in azureml-sdk.


In [134]:
# General variables
# Column names used for the MovieLens frames, both in this notebook and in the generated scripts.
COL_USER = 'UserID'
COL_ITEM = 'ItemID'
COL_RATING = 'Rating'
COL_TIMESTAMP = 'Timestamp'
COL_TITLE = 'Title'
COL_GENRE = 'Genre'
COL_YEAR = 'Year'

HEADER = (COL_USER, COL_ITEM, COL_RATING, COL_TIMESTAMP)

# Number of recommendations to produce / evaluate.
TOP_K = 10
DATA_SIZE = '100k'

# AML Experiment config
EXPERIMENT_NAME = 'movielens-sar'
# Pip packages for the remote training run and the scoring image. 'scikit-learn' is the real
# PyPI name; the 'sklearn' alias is a deprecated placeholder package that no longer installs.
PIP_PACKAGES = ['azureml-sdk', 'pandas', 'scikit-learn', 'tqdm']

# AML Compute config
CLUSTER_NAME = 'recocluster'
VM_SIZE = 'STANDARD_D2_V2'
MIN_NODES = 0
MAX_NODES = 1

# AML Image config
IMAGE_NAME = 'sar'

# AML Model config
MODEL_NAME = 'movielens_sar.model'
# Path (relative to the remote run's working directory) where the training script saves the model.
MODEL_PATH = 'outputs/{}'.format(MODEL_NAME)

# AKS config
AKS_NAME = 'akscompute'
AKS_SERVICE = 'aksreco'

# Absolute notebook directory, captured before any later cell changes the cwd.
CURRENT_DIR = os.path.abspath('.')

In [105]:
# Staging directory holding everything AML needs: the reco_utils package, the generated
# training/entry scripts and the conda spec.
TEMP_DIR = TemporaryDirectory()
def make_temp(name):
    """Return the path of `name` inside the staging directory TEMP_DIR."""
    return os.path.join(TEMP_DIR.name, name)

# copy reco_utils dependency to temp dir.
# Resolve the source from CURRENT_DIR (captured before any chdir) rather than the cwd: this
# cell chdirs into TEMP_DIR below, so a cwd-relative '../../reco_utils' would point at the
# wrong place if the cell is re-run.
shutil.copytree(os.path.join(CURRENT_DIR, '..', '..', 'reco_utils'), make_temp('reco_utils'))

# it's necessary to move to this directory for the image to be built properly
os.chdir(TEMP_DIR.name)

TRAIN_SCRIPT = make_temp('train.py')
ENTRY_SCRIPT = make_temp('entry.py')
CONDA_FILE = make_temp('conda.yml')

In [150]:
# Reuse the AML training cluster if it already exists, otherwise provision it.
try:
    compute_target = ComputeTarget(workspace=ws, name=CLUSTER_NAME)
    print("Found compute target")
except ComputeTargetException:
    # Only compute-target lookup failures fall through to creation; other errors
    # (auth, network, KeyboardInterrupt) are no longer silently swallowed.
    print("Creating compute target")
    # Specify the configuration for the new cluster
    compute_config = AmlCompute.provisioning_configuration(
        vm_size=VM_SIZE,
        min_nodes=MIN_NODES,
        max_nodes=MAX_NODES
    )

    # Create the cluster with the specified name and configuration
    compute_target = ComputeTarget.create(ws, CLUSTER_NAME, compute_config)

    # Wait for the cluster to complete, show the output log
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

Found compute target


In [None]:
# Source of the remote training script. The {PLACEHOLDERS} are filled from the notebook's
# config constants by .format() below, so any literal brace in the script must be doubled.
train_file = """

import logging
import os
from time import time

import joblib
from azureml.core import Run

from reco_utils.dataset import movielens
from reco_utils.dataset.python_splitters import python_stratified_split
from reco_utils.evaluation.python_evaluation import map_at_k, ndcg_at_k, precision_at_k, recall_at_k
from reco_utils.recommender.sar import SAR


# get hold of the current run
run = Run.get_context()
run.log('data-size', '{DATA_SIZE}')

header = dict(col_user='{COL_USER}', 
              col_item='{COL_ITEM}', 
              col_rating='{COL_RATING}', 
              col_timestamp='{COL_TIMESTAMP}')

data = movielens.load_pandas_df(
    size='{DATA_SIZE}',
    header=['{COL_USER}', '{COL_ITEM}', '{COL_RATING}', '{COL_TIMESTAMP}'],
    title_col='{COL_TITLE}'
)

train, test = python_stratified_split(data, col_user='{COL_USER}', col_item='{COL_ITEM}')
logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)-8s %(message)s')

model = SAR(**header)

# train the SAR model
start_time = time()

model.fit(train)

train_time = time() - start_time
run.log('Training time', train_time)

start_time = time()

# produce exactly as many recommendations per user as the metrics below evaluate
predict = model.recommend_k_items(test, top_k={TOP_K}, remove_seen=True)

# evaluate
kwargs = dict(col_user='{COL_USER}', 
              col_item='{COL_ITEM}', 
              col_rating='{COL_RATING}', 
              col_prediction='prediction', 
              k={TOP_K})

eval_map = map_at_k(test, predict, **kwargs)
eval_ndcg = ndcg_at_k(test, predict, **kwargs)
eval_precision = precision_at_k(test, predict, **kwargs)
eval_recall = recall_at_k(test, predict, **kwargs)

test_time = time() - start_time
run.log('Prediction time', test_time)

run.log('map', eval_map)
run.log('ndcg', eval_ndcg)
run.log('precision', eval_precision)
run.log('recall', eval_recall)

# Save the model (joblib.dump does not create missing parent directories)
model_dir = os.path.dirname('{MODEL_PATH}')
if model_dir:
    os.makedirs(model_dir, exist_ok=True)
joblib.dump(value=model, filename='{MODEL_PATH}')

""".format(DATA_SIZE=DATA_SIZE,
           COL_USER=COL_USER,
           COL_ITEM=COL_ITEM,
           COL_RATING=COL_RATING,
           COL_TIMESTAMP=COL_TIMESTAMP,
           COL_TITLE=COL_TITLE,
           TOP_K=TOP_K,
           MODEL_PATH=MODEL_PATH)

# TRAIN_SCRIPT is the staging-dir path defined when the temp dir was created.
with open(TRAIN_SCRIPT, 'w') as f:
    f.write(train_file)

In [None]:
# Run the generated training script (staged in TEMP_DIR) on the AML cluster,
# installing PIP_PACKAGES in the remote environment.
est = Estimator(source_directory=TEMP_DIR.name,
                compute_target=compute_target,
                entry_script=os.path.basename(TRAIN_SCRIPT),
                pip_packages=PIP_PACKAGES)

# create experiment
exp = Experiment(workspace=ws, name=EXPERIMENT_NAME)
run = exp.submit(config=est)

In [None]:
# Display the submitted run object (last expression of the cell)
run

In [None]:
# Show the interactive run-details widget for the submitted training run
RunDetails(run).show()

In [None]:
# Get metrics logged by the training script via run.log (timings, map, ndcg, precision, recall)
metrics = run.get_metrics()
print(metrics)

In [None]:
# Register the model file the training run saved at MODEL_PATH, then print its identity
# (name, id, version) as a tab-separated line.
model = run.register_model(model_name=MODEL_NAME, model_path=MODEL_PATH)
print('\t'.join(str(part) for part in (model.name, model.id, model.version)))

In [152]:
# Tear down the AML training cluster; the deployment cells below do not use it.
# NOTE(review): re-running the training cells requires re-running the lookup/create cell first.
compute_target.delete()

# Deploy SAR Recommendation Webservice
## Using Azure Machine Learning service (Local, AKS)

In [106]:
# Source of the scoring script packaged into the container image: init() loads the
# registered model into a module-level global; run() turns a JSON payload of seed items
# into the top-k similar items.
entry_file = """

import json
from io import StringIO

import joblib
import pandas as pd
from azureml.core.model import Model

TOP_K = {TOP_K}


def init():
    global model
    model_path = Model.get_model_path('{MODEL_NAME}')
    model = joblib.load(model_path)

    
def run(data):
    try:
        # wrap the raw JSON text: newer pandas deprecates passing literal JSON to read_json
        df = pd.read_json(StringIO(data))
        result = model.get_item_based_topk(items=df, top_k=TOP_K, sort_top_k=True)
        return result.to_dict()
    except Exception as e:
        # errors are returned as the response body (a string), not raised
        return str(e)
        
""".format(TOP_K=TOP_K, MODEL_NAME=MODEL_NAME)

with open(ENTRY_SCRIPT, 'w') as f:
    f.write(entry_file)

# Conda environment for the image, built from the same pip packages as training.
with open(CONDA_FILE, "w") as f:
    f.write(CondaDependencies.create(pip_packages=PIP_PACKAGES).serialize_to_string())

In [None]:
# Fetch the registered model by name so deployment works even when the training cells were skipped.
# NOTE(review): no version is given — presumably resolves to the latest registered version; confirm.
model = Model(workspace=ws, name=MODEL_NAME)

In [135]:
# Container image spec. The script/conda paths are basenames and reco_utils is a relative
# dependency, so they resolve against the cwd (the staging TEMP_DIR).
image_config = ContainerImage.image_configuration(runtime="python",
                                                  execution_script=os.path.basename(ENTRY_SCRIPT),
                                                  conda_file=os.path.basename(CONDA_FILE),
                                                  dependencies=['reco_utils'])

try:
    image = Image(workspace=ws, name=IMAGE_NAME)
    print("Found Image")
except WebserviceException:
    # A missing image surfaces as WebserviceException in azureml (confirm for the installed
    # SDK); any other error now propagates instead of triggering a rebuild.
    print("Creating Container Image")

    # create the image
    image = Image.create(workspace=ws, 
                         name=IMAGE_NAME, 
                         models=[model], 
                         image_config=image_config)

    # wait for image creation to finish
    image.wait_for_creation(show_output=True)

Creating Container Image
Creating image
Running.............................
Succeeded
Image creation operation finished for image sar:1, operation "Succeeded"


In [101]:
# Reuse the AKS inference cluster if it already exists, otherwise provision it.
try:
    aks_target = ComputeTarget(workspace=ws, name=AKS_NAME)
    print("Found AKS compute target")
except ComputeTargetException:
    # Only compute-target lookup failures fall through to creation.
    print("Creating AKS compute target")

    # Use the default configuration for now
    prov_config = AksCompute.provisioning_configuration()

    # Create the cluster
    aks_target = ComputeTarget.create(workspace=ws,
                                      name=AKS_NAME,
                                      provisioning_configuration=prov_config)

    # Wait for the create process to complete
    aks_target.wait_for_completion(show_output=True)

Found AKS compute target


In [146]:
# Toggle between a local webservice (port 8889) and the AKS deployment.
local = False

if local:
    # Test locally
    deployment_config = LocalWebservice.deploy_configuration(port=8889)
    service = Webservice.deploy_local_from_model(workspace=ws,
                                                 name='localservice',
                                                 models=[model],
                                                 image_config=image_config,
                                                 deployment_config=deployment_config)
else:
    # Deploy to AKS, reusing an existing service with the same name if present
    try:
        service = AksWebservice(workspace=ws, name=AKS_SERVICE)
        print('Found AKS service')
    except WebserviceException:
        # Only a failed service lookup triggers a new deployment.
        print('Creating AKS service')
        deployment_config = AksWebservice.deploy_configuration()
        service = Webservice.deploy_from_image(workspace=ws,
                                               name=AKS_SERVICE,
                                               image=image,
                                               deployment_config=deployment_config,
                                               deployment_target=aks_target)

service.wait_for_deployment(show_output=True)
print(service.state)

Creating AKS service
Creating service
Running......
SucceededAKS service creation operation finished, operation "Succeeded"
Healthy


# Test SAR Recommendations

In [12]:
# Movie catalogue for the browser: one row per item with title, genre and year,
# dropping rows with missing metadata and duplicate items.
df = (
    load_pandas_df(size=DATA_SIZE, header=HEADER, title_col=COL_TITLE, genres_col=COL_GENRE, year_col=COL_YEAR)
    [[COL_ITEM, COL_TITLE, COL_GENRE, COL_YEAR]]
    .dropna()
    .drop_duplicates()
)

4.93MB [00:02, 2.42MB/s]                                                                                               


In [15]:
# Dropdown options: every individual genre (genre strings are '|'-separated) sorted
# alphabetically, and every year newest-first, each preceded by the 'All' sentinel.
genres = ['All'] + sorted({
    single_genre
    for combined in df[COL_GENRE].unique()
    for single_genre in combined.split('|')
})

years = ['All'] + sorted(df[COL_YEAR].unique(), reverse=True)

def view(title, genre, year):
    """Filter the movie catalogue `df` for the interactive browser.

    Args:
        title: Text that must appear in the title (literal, case-sensitive substring).
        genre: Genre that must appear in the genre string, or 'All' for no genre filter.
        year: Year value to keep, or 'All' for no year filter.

    Returns:
        Matching rows indexed by item id, sorted by title.
    """
    # regex=False: titles such as "Toy Story (1995)" contain regex metacharacters, so
    # typing "(" or "[" must be matched literally rather than compiled (which raised re.error).
    matches = df[df[COL_TITLE].str.contains(title, regex=False)].set_index(COL_ITEM)
    if genre != 'All':
        matches = matches[matches[COL_GENRE].str.contains(genre, regex=False)]
    if year != 'All':
        matches = matches[matches[COL_YEAR] == year]
    return matches.sort_values(COL_TITLE)

# Wire `view` to a text box (title) and two dropdowns (genre, year);
# the trailing semicolon suppresses the returned object's repr.
interact(view, title='', genre=genres, year=years);

interactive(children=(Text(value='', description='title'), Dropdown(description='genre', options=('All', 'Acti…

In [68]:
# Request payload: seed item id(s) (keyed by the item column) to get similar-item recommendations for.
# Pick ids with the browser above; 1021 is just an example id.
items = {COL_ITEM: [1021]}

In [147]:
# AKS endpoints need the scoring URI plus a bearer token; the local service
# (deployed on port 8889) needs no authentication.
if service.compute_type != 'AKS':
    url = 'http://localhost:8889/score'
    headers = None
else:
    url = service.scoring_uri
    # authenticate with the first key returned by the service
    headers = {'Authorization': 'Bearer {}'.format(service.get_keys()[0])}

print('Service URI:', url)

Service URI: http://138.91.113.50:80/api/v1/service/aksreco/score


In [162]:
# Send a request to the service and join the recommended item ids with their metadata.
response = requests.post(url=url, json=items, headers=headers)
if response.status_code != 200:
    print(response.content, response.status_code)
    # Reset so a stale DataFrame from an earlier successful call is not displayed below.
    result = None
else:
    payload = response.json()
    if isinstance(payload, str):
        # The scoring script returns exception text as a plain string with HTTP 200.
        print(payload)
        result = None
    else:
        result = pd.DataFrame(payload).join(df.set_index(COL_ITEM), on=COL_ITEM)
result

Unnamed: 0,UserID,ItemID,prediction,Title,Genre,Year
0,0,606,0.21875,All About Eve (1950),Drama,1950
1,0,811,0.194444,Thirty-Two Short Films About Glenn Gould (1993),Documentary,1993
2,0,519,0.179487,"Treasure of the Sierra Madre, The (1948)",Adventure,1948
3,0,604,0.178082,It Happened One Night (1934),Comedy,1934
4,0,212,0.176471,"Unbearable Lightness of Being, The (1988)",Drama,1988
5,0,489,0.172414,Notorious (1946),Film-Noir|Romance|Thriller,1946
6,0,745,0.166667,"Ruling Class, The (1972)",Comedy,1972
7,0,170,0.165138,Cinema Paradiso (1988),Comedy|Drama|Romance,1988
8,0,524,0.163636,"Great Dictator, The (1940)",Comedy,1940
9,0,490,0.163636,To Catch a Thief (1955),Comedy|Romance|Thriller,1955


# Cleanup

In [97]:
# clean up temporary directory
# Leave TEMP_DIR first: the cwd was changed into it for the image build, and it should not
# be removed while it is still the working directory.
os.chdir(CURRENT_DIR)
TEMP_DIR.cleanup()