# Simple Example

This is a simple example of using the library to learn node embeddings from the MovieLens (small) data set. It is a toy example whose purpose is to demonstrate how to use the library.

Note that this requires PySpark to work (which this notebook's environment already has).

## Imports

In [1]:
from sklearn import cluster
from graph_embedder import Node2Vec
from graph_sampler import GraphSampler
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as ptypes
from pyspark.sql import DataFrame
from pyspark.sql import Window

## Setup Spark

In [2]:
# Start (or reuse) a Spark session running on a local master, and keep a
# handle to its SparkContext
spark_builder = SparkSession.builder.master('local')
spark = spark_builder.getOrCreate()
sc = spark.sparkContext

## Setup MovieLens (Small) data in Spark

The data from `ratings.csv` and `movies.csv` are from the MovieLens small sample data set that can be obtained from https://files.grouplens.org/datasets/movielens/ml-latest-small.zip

More information is at https://grouplens.org/datasets/movielens/

In [3]:
# Read the two MovieLens CSV files (expected in the working directory) into pandas
ratings, movies = (
    pd.read_csv(csv_name) for csv_name in ('ratings.csv', 'movies.csv')
)

In [4]:
# Mirror the pandas movie table in Spark
movies_df = spark.createDataFrame(movies)

# Tag each rating with a 'user_'-prefixed user id and keep only ratings of 5 or more.
# NOTE(review): the prefix presumably keeps user ids distinct from movie ids,
# since both are later placed in the same `node` column.
user_id_ext = F.concat(F.lit('user_'), F.col('userId'))
is_top_rating = F.col('rating') >= 5
ratings_df = (
    spark.createDataFrame(ratings)
    .withColumn('userIdExt', user_id_ext)
    .filter(is_top_rating)
)

print(f"Number of ratings = {ratings_df.count()}")

Number of ratings = 13211


In [5]:
# Split the ratings: roughly 30% are sampled for training, the rest are held
# out for testing. The seed is fixed so that the split -- and every result
# derived from it -- is the same after a kernel restart.
ratings_df_sampled = ratings_df.sample(fraction=0.3, seed=42).cache()

# Held-out ratings = all ratings that were not drawn into the training sample
ratings_df_others = (
    ratings_df
    .join(
        ratings_df_sampled,
        on=['userId', 'movieId', 'rating'],
        how='left_anti'
    )
).cache()

print(f"Number of sampled ratings = {ratings_df_sampled.count()}")
print(f"Number of ratings for testing = {ratings_df_others.count()}")

Number of sampled ratings = 3977
Number of ratings for testing = 9234


## Create a graph from the data

In [6]:
# Generate the nodes: one node per distinct user and one per distinct movie
# found in the training sample, all in a single string-typed `node` column.
user_nodes = (
    ratings_df_sampled
    .select(F.col('userIdExt').alias('node'))
    .dropDuplicates()
)

# movieId is numeric while userIdExt is a string. Cast explicitly so the union
# does not depend on Spark's implicit type-coercion rules (which differ
# between ANSI and non-ANSI mode); later lookups compare `node` to str(movieId).
movie_nodes = (
    ratings_df_sampled
    .select(F.col('movieId').cast('string').alias('node'))
    .dropDuplicates()
)

nodes = user_nodes.union(movie_nodes).cache()

print(f"Number of nodes = {nodes.count()}")

Number of nodes = 2037


In [7]:
# Generate the edges: one user -> movie edge per training rating, weighted by
# the rating's share of that user's total rating mass.

# The per-user totals are computed from the training sample only. Using the
# full `ratings_df` here would leak held-out ratings into the training graph
# and the weights of a user's edges would no longer sum to 1.
user_rating_totals = (
    ratings_df_sampled
    .groupBy('userIdExt')
    .agg(F.sum('rating').alias('total_ratings'))
)

edges = (
    ratings_df_sampled
    .select('userIdExt', 'movieId', 'rating')
    .join(user_rating_totals, on='userIdExt', how='inner')
    .withColumn('weight', F.col('rating') / F.col('total_ratings'))
    .withColumnRenamed('userIdExt', 'start')
    .withColumnRenamed('movieId', 'end')
)

print(f"Number of edges = {edges.count()}")

Number of edges = 3977


## Learn an embedding

In [8]:
# Configure the node2vec embedder
node2vec = Node2Vec(
    # Number of dimensions of the resultant feature vector
    num_dim=64,
    # Number of times to sample the neighbourhood for each node
    num_samples_per_node=5,
    # Length of the random walks
    path_len=11,
    # The smaller the return parameter relative to in-out, the more it learns structures
    return_param=1.0,
    # The smaller the in-out parameter relative to return, the more it learns communities
    inout_param=1.0,
)

In [9]:
# Fit the embedder on the graph built above (nodes + weighted edges).
# verbose=True prints a progress line per iteration and per sampling step.
node2vec.fit(nodes, edges, verbose=True)

Iteration #1 / 5
Sampling step 1 / 11
Sampling step 2 / 11
Sampling step 3 / 11
Sampling step 4 / 11
Sampling step 5 / 11
Sampling step 6 / 11
Sampling step 7 / 11
Sampling step 8 / 11
Sampling step 9 / 11
Sampling step 10 / 11
Sampling step 11 / 11
Iteration #2 / 5
Sampling step 1 / 11
Sampling step 2 / 11
Sampling step 3 / 11
Sampling step 4 / 11
Sampling step 5 / 11
Sampling step 6 / 11
Sampling step 7 / 11
Sampling step 8 / 11
Sampling step 9 / 11
Sampling step 10 / 11
Sampling step 11 / 11
Iteration #3 / 5
Sampling step 1 / 11
Sampling step 2 / 11
Sampling step 3 / 11
Sampling step 4 / 11
Sampling step 5 / 11
Sampling step 6 / 11
Sampling step 7 / 11
Sampling step 8 / 11
Sampling step 9 / 11
Sampling step 10 / 11
Sampling step 11 / 11
Iteration #4 / 5
Sampling step 1 / 11
Sampling step 2 / 11
Sampling step 3 / 11
Sampling step 4 / 11
Sampling step 5 / 11
Sampling step 6 / 11
Sampling step 7 / 11
Sampling step 8 / 11
Sampling step 9 / 11
Sampling step 10 / 11
Sampling step 11 / 11


<graph_embedder.Node2Vec at 0x7fc10b2536a0>

In [11]:
# Look up the learned embedding for every node of the graph
embedding_df = node2vec.transform(nodes)
num_embeddings = embedding_df.count()
print(f"Number of embeddings = {num_embeddings}")

Number of embeddings = 2037


## Recommendation

Try recommending some movies

In [12]:
def get_user_ratings(user, ratings_df=ratings_df, movies_df=movies_df):
    """Return the ratings of one user, with movie titles, as a pandas DataFrame.

    Args:
        user: Value compared against the `userId` column of `ratings_df`.
        ratings_df: Spark DataFrame of ratings with `userId` and `movieId` columns.
        movies_df: Spark DataFrame with `movieId` and `title` columns.

    Returns:
        A pandas DataFrame holding the user's rows of `ratings_df` plus a
        `title` column (left join, so ratings without a matching movie are kept).
    """
    movie_titles = movies_df.select('movieId', 'title')
    user_ratings = ratings_df.filter(F.col('userId') == user)
    return user_ratings.join(movie_titles, on='movieId', how='left').toPandas()

In [13]:
def get_similar_movies(movieId, n=5, embedding_df=embedding_df, movies_df=movies_df):
    """Rank movies by cosine similarity of their embedding to a given movie's embedding.

    Args:
        movieId: Id of the movie to search for; compared as `str(movieId)`
            against the `node` column of `embedding_df`.
        n: Keep only the `n` best-ranked rows. A falsy value (None or 0)
            disables the limit and returns every candidate.
        embedding_df: Spark DataFrame with `node` and `feature` columns.
        movies_df: Spark DataFrame with `movieId` and `title` columns.

    Returns:
        A pandas DataFrame sorted by `score` in descending order, with the
        candidate `movieId`, the searched `movieId_search`, `score`, `rank`
        and `title`. The searched movie itself is one of the candidates.
        The result is empty when the searched movie has no (non-null) embedding.
    """
    @F.udf(ptypes.FloatType())
    def similarity(x, y):
        # Cosine similarity; a zero vector is left un-normalised and scores 0
        import numpy as np
        x = np.array(x)
        y = np.array(y)
        z = np.linalg.norm(x)
        if z > 0:
            x /= z
        z = np.linalg.norm(y)
        if z > 0:
            y /= z
        z = x.dot(y)
        return float(z)

    # Candidates: nodes that have an embedding and correspond to a known movie
    df_movies = (
        embedding_df
        .filter(F.col('feature').isNotNull())
        .join(
            movies_df
            .withColumnRenamed('movieId', 'node'),
            on='node',
            how='left_semi'
        )
    )

    df_movies = (
        df_movies
        .withColumnRenamed('node', 'movieId')
        .withColumnRenamed('feature', 'movieFeature')
    )

    # The searched movie. A null feature is filtered out here because the
    # similarity UDF cannot handle a missing vector.
    df_node = (
        embedding_df
        .filter(F.col('node') == str(movieId))
        .filter(F.col('feature').isNotNull())
        .withColumnRenamed('node', 'movieId_search')
        .withColumnRenamed('feature', 'searchFeature')
    ).cache()

    # Partitioning by the searched id avoids an unpartitioned global window
    # (which would pull every row into a single partition) and mirrors
    # get_user_recommendations.
    rank_window = Window.partitionBy('movieId_search').orderBy(F.col('score').desc())

    df = (
        df_node
        .crossJoin(df_movies)
        .withColumn('score', similarity('searchFeature', 'movieFeature'))
        .withColumn('rank', F.row_number().over(rank_window))
        .drop('searchFeature')
        .drop('movieFeature')
        .join(
            movies_df
            .select('movieId', 'title'),
            on='movieId',
            how='left'
        )
    )

    if n:
        df = (
            df
            .filter(F.col('rank') <= n)
        )

    try:
        result = df.toPandas()
    finally:
        # Release the cached frame so repeated calls do not accumulate caches
        df_node.unpersist()

    return result.sort_values('score', ascending=False)

In [14]:
def get_user_recommendations(user, n=5, ratings_df=ratings_df_sampled, embedding_df=embedding_df, movies_df=movies_df):
    """Recommend movies to a user by cosine similarity between user and movie embeddings.

    Args:
        user: User id; the user's node is looked up as `'user_' + str(user)`
            in the `node` column of `embedding_df`.
        n: Keep only the `n` best-ranked rows. A falsy value (None or 0)
            disables the limit and returns every candidate.
        ratings_df: Spark DataFrame of ratings with `userId` and `movieId`
            columns. Movies this user rated in it are excluded from the
            candidates. Pass None to keep every movie as a candidate.
        embedding_df: Spark DataFrame with `node` and `feature` columns.
        movies_df: Spark DataFrame with `movieId` and `title` columns.

    Returns:
        A pandas DataFrame sorted by `score` in descending order, with
        `movieId`, `userIdExt`, `score`, `rank` and `title`. The result is
        empty when the user has no (non-null) embedding.
    """
    @F.udf(ptypes.FloatType())
    def similarity(x, y):
        # Cosine similarity; a zero vector is left un-normalised and scores 0
        import numpy as np
        x = np.array(x)
        y = np.array(y)
        z = np.linalg.norm(x)
        if z > 0:
            x /= z
        z = np.linalg.norm(y)
        if z > 0:
            y /= z
        z = x.dot(y)
        return float(z)

    # Frames cached by this call; they are released once the result is collected
    cached_frames = []

    # Candidates: nodes that have an embedding and correspond to a known movie
    df_movies = (
        embedding_df
        .filter(F.col('feature').isNotNull())
        .join(
            movies_df
            .withColumnRenamed('movieId', 'node'),
            on='node',
            how='left_semi'
        )
    )

    # Explicit None check: the intent is "was a ratings frame supplied?",
    # not a truthiness test of the DataFrame object itself.
    if ratings_df is not None:
        # Drop movies the user has already rated
        df_movies = (
            df_movies
            .join(
                ratings_df
                .filter(F.col('userId') == user)
                .withColumnRenamed('movieId', 'node'),
                on='node',
                how='left_anti'
            )
        ).cache()
        cached_frames.append(df_movies)

    df_movies = (
        df_movies
        .withColumnRenamed('node', 'movieId')
        .withColumnRenamed('feature', 'movieFeature')
    )

    # The user's own embedding. A null feature is filtered out here because
    # the similarity UDF cannot handle a missing vector.
    df_user = (
        embedding_df
        .filter(F.col('node') == 'user_' + str(user))
        .filter(F.col('feature').isNotNull())
        .withColumnRenamed('node', 'userIdExt')
        .withColumnRenamed('feature', 'userFeature')
    ).cache()
    cached_frames.append(df_user)

    rank_window = Window.partitionBy('userIdExt').orderBy(F.col('score').desc())

    df = (
        df_user
        .crossJoin(df_movies)
        .withColumn('score', similarity('userFeature', 'movieFeature'))
        .withColumn('rank', F.row_number().over(rank_window))
        .drop('userFeature')
        .drop('movieFeature')
        .join(
            movies_df
            .select('movieId', 'title'),
            on='movieId',
            how='left'
        )
    )

    if n:
        df = (
            df
            .filter(F.col('rank') <= n)
        )

    try:
        result = df.toPandas()
    finally:
        # Release the cached frames so repeated calls do not accumulate caches
        for frame in cached_frames:
            frame.unpersist()

    return result.sort_values('score', ascending=False)

First user

In [15]:
# Ratings given by user 25 that are part of the training (sampled) data
display(get_user_ratings(25, ratings_df=ratings_df_sampled))

Unnamed: 0,movieId,userId,rating,timestamp,userIdExt,title
0,91529,25,5.0,1535470498,user_25,"Dark Knight Rises, The (2012)"
1,260,25,5.0,1535470429,user_25,Star Wars: Episode IV - A New Hope (1977)
2,122912,25,5.0,1535470461,user_25,Avengers: Infinity War - Part I (2018)
3,4993,25,5.0,1535470421,user_25,"Lord of the Rings: The Fellowship of the Ring,..."
4,79132,25,5.0,1535470428,user_25,Inception (2010)
5,116797,25,5.0,1535470507,user_25,The Imitation Game (2014)
6,187593,25,5.0,1535470534,user_25,Deadpool 2 (2018)


In [16]:
# Ratings given by user 25 that are NOT part of the training data (held out for testing)
display(get_user_ratings(25, ratings_df=ratings_df_others))

Unnamed: 0,movieId,userId,rating,timestamp,userIdExt,title
0,60069,25,5.0,1535470523,user_25,WALL·E (2008)
1,68157,25,5.0,1535470515,user_25,Inglourious Basterds (2009)
2,7153,25,5.0,1535470418,user_25,"Lord of the Rings: The Return of the King, The..."
3,68954,25,5.0,1535470493,user_25,Up (2009)
4,180095,25,5.0,1535470476,user_25,Wonder (2017)
5,2571,25,5.0,1535470427,user_25,"Matrix, The (1999)"
6,177593,25,5.0,1535470532,user_25,"Three Billboards Outside Ebbing, Missouri (2017)"
7,1198,25,5.0,1535470495,user_25,Raiders of the Lost Ark (Indiana Jones and the...
8,3578,25,5.0,1535470497,user_25,Gladiator (2000)
9,2028,25,5.0,1535470505,user_25,Saving Private Ryan (1998)


In [17]:
# Up to 20 recommended movies for user 25; passing the training ratings
# excludes movies this user already rated in the training data
display(
    get_user_recommendations(25, n=20, ratings_df=ratings_df_sampled, embedding_df=embedding_df)
)

Unnamed: 0,movieId,userIdExt,score,rank,title
14,59784,user_25,0.692618,1,Kung Fu Panda (2008)
17,6650,user_25,0.691941,2,Kind Hearts and Coronets (1949)
12,62336,user_25,0.679376,3,FLCL (2000)
9,7143,user_25,0.673626,4,"Last Samurai, The (2003)"
11,5971,user_25,0.666631,5,My Neighbor Totoro (Tonari no Totoro) (1988)
0,222,user_25,0.650476,6,Circle of Friends (1995)
3,57504,user_25,0.644588,7,"Girl Who Leapt Through Time, The (Toki o kaker..."
6,103772,user_25,0.62474,8,"Wolverine, The (2013)"
8,2924,user_25,0.624474,9,Drunken Master (Jui kuen) (1978)
4,31696,user_25,0.592547,10,Constantine (2005)


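User 25 is recommended the movie `The Dark Knight (2008)`, which also appears in the testing set. They are also recommended `X-Men` and `Matrix` movies, which are similar to what they seem to like. Note that the training sample is drawn at random, so the recommendations change between runs and the output shown above may not match this description.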

Find similar movies

In [18]:
# Find the movie id.
# The matches get their own name so that the pandas `movies` frame loaded at
# the top of the notebook is not overwritten (re-running the earlier
# `spark.createDataFrame(movies)` cell would otherwise receive a list of Rows).
matching_movies = (
    movies_df
    .filter(F.col('title').contains('Love Actually'))
).collect()

# Fail with a clear message instead of an IndexError when nothing matches
if not matching_movies:
    raise ValueError("No movie title contains 'Love Actually'")

movieTitle = matching_movies[0]['title']
movieId = matching_movies[0]['movieId']

print(f"ID: {movieId} Title: {movieTitle}")

display(
    get_similar_movies(movieId, n=5, embedding_df=embedding_df)
)

ID: 6942 Title: Love Actually (2003)


Unnamed: 0,movieId,movieId_search,score,rank,title
0,6942,6942,1.0,1,Love Actually (2003)
3,66371,6942,0.898201,2,Departures (Okuribito) (2008)
4,5646,6942,0.876703,3,Valmont (1989)
2,31658,6942,0.869262,4,Howl's Moving Castle (Hauru no ugoku shiro) (2...
1,62336,6942,0.86273,5,FLCL (2000)


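Ideally, these movies are about love and relationships, like the movie that was searched for. Note that the first row is the searched movie itself (score 1.0), and that results from this small random sample can be noisy.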