In [731]:
#load data 

import sys
import pyspark
from pyspark.sql import Row
from pyspark.sql import SparkSession
# Create the Spark session for this notebook, or reuse the one already running.
spark = SparkSession.builder.appName("la3").getOrCreate()

# Handle on the active SparkContext.
sc = pyspark.SparkContext.getOrCreate()

In [732]:
import pandas as pd


# Load the five source tables in one pass; the files are read in the order
# listed, each with low_memory=False.
credits, keywords, links, metadata_raw, ratings = (
    pd.read_csv(csv_path, low_memory=False)
    for csv_path in (
        './imdb/credits.csv',
        './imdb/keywords.csv',
        './imdb/links.csv',
        './imdb/movies_metadata.csv',
        './imdb/ratings.csv',
    )
)

In [733]:
# refine data
# Keep only rows whose `id` is purely numeric, then cast `id` to int.
# The cast goes through .assign() on the filtered frame instead of attribute
# assignment on a slice, which avoids pandas' SettingWithCopyWarning.
has_numeric_id = metadata_raw['id'].apply(lambda value: str(value).isdigit())
metadata_raw = (
    metadata_raw
    .loc[has_numeric_id]
    .assign(id=lambda frame: frame['id'].astype(int))
)
metadata = metadata_raw

In [734]:

# Join the cleaned metadata with the credits, keywords and links tables.
# The chain starts from `metadata_raw` (which `metadata` is bound to right
# before this cell) rather than from `metadata` itself, so re-running the cell
# does not merge the same tables a second time.
# NOTE(review): if credits/keywords contain duplicate ids, the inner joins will
# duplicate movie rows -- confirm whether de-duplication is wanted.
metadata = (
    metadata_raw
    .merge(credits, on='id', how='inner')
    .merge(keywords, on='id', how='inner')
    # left join: movies without a matching `tmdbId` in links are kept
    .merge(links, left_on='id', right_on='tmdbId', how='left')
)


In [736]:
# One entry per distinct user (first occurrence of each userId is kept).
user_ids = ratings['userId'].drop_duplicates()

In [783]:

# Pick one user at random. The scalar is extracted with .iloc[0] before int():
# calling int() directly on a one-element Series is deprecated in recent pandas.
target_user = int(user_ids.sample(1).iloc[0])

# take the rating info
user_ratings = ratings.loc[ratings['userId'] == target_user]

# Random 80/20 split of this user's ratings; the test set is every rated movie
# that was not drawn into the training sample.
train = user_ratings.sample(int(len(user_ratings) * 0.8))
test = user_ratings[~user_ratings.movieId.isin(train.movieId)]

# Right joins keep one row per rating even when the movie has no metadata.
train_items = metadata.merge(train, on='movieId', how='right')
test_items = metadata.merge(test, on='movieId', how='right')

# Sanity check: the joins must not have added or dropped rating rows.
len(test_items) == len(test) and len(train_items) == len(train)


True

In [809]:
#user profile
#extract features
import json,re, itertools
import ast
import numpy as np
from pandas.io.json import json_normalize
def ext_feature(column,key):
    """Collect the distinct `key` values found in a column of stringified records.

    Each cell of `column` is expected to hold the string form of a Python
    literal: a list of dicts (or a single dict), e.g.
    "[{'id': 18, 'name': 'Drama'}]".

    Cells are parsed one at a time. A cell that is missing (NaN/None), empty
    ('[]') or not a valid literal is skipped, so a single bad row no longer
    discards the features of the whole column.

    Parameters
    ----------
    column : iterable (typically a pandas Series)
        The raw cells to parse.
    key : str
        Name of the field to read from every record.

    Returns
    -------
    list
        Distinct non-null values of `key`, in order of first appearance.
        Values are assumed to be hashable (ids / short codes).
    """
    seen = set()
    values = []
    for raw in column:
        try:
            records = ast.literal_eval(raw)
        except (ValueError, SyntaxError, TypeError):
            # NaN, None or a malformed literal: nothing to extract from this cell
            continue
        if isinstance(records, dict):
            records = [records]
        if not isinstance(records, (list, tuple)):
            continue
        for record in records:
            if not isinstance(record, dict):
                continue
            value = record.get(key)
            # skip missing values (None, or NaN which is the only value != itself)
            if value is None or value != value:
                continue
            if value not in seen:
                seen.add(value)
                values.append(value)
    return values

# 1 - JSON-like columns, each mapped to the record field that identifies an entry
json_columns = {
    'keywords': 'id',
    'crew': 'id',
    'cast': 'id',
    'genres': 'id',
    'production_countries': 'iso_3166_1',
    # 'production_companies': 'id',
    'spoken_languages': 'iso_639_1',
}

# Feature vocabulary of the target user: per column, the distinct identifiers
# that occur in the movies of the training set.
u_features = {}
for column, key in json_columns.items():
    u_features[column] = ext_feature(train_items[column], key)

#year------metadata=metadata.withColumn("year", year(col('release_date')))
#budget
#actors
#director
#...

In [848]:
# USER PROFILE
# Mean rating the target user gave across the training movies.
avg = train_items['rating'].mean()
def setMovieVector(item):
    """Build the rating vector of one rated movie (based on the user's ratings).

    For every column in `json_columns`, each feature of the user's vocabulary
    (`u_features[column]`) is set to the movie's rating when the movie has that
    feature, and to 0.0 otherwise.

    Parameters
    ----------
    item : pandas.Series
        One row of the rated-movies frame; must provide the columns listed in
        `json_columns` plus 'rating'.

    Returns
    -------
    dict
        {column: {feature: value}} for every column, plus 'total_attributes':
        the number of vocabulary features found on this movie (at least 1, so
        callers can divide by it).
    """
    vector = {}
    matched = 0
    for column in json_columns.keys():
        f_vector = dict((uf, 0.0) for uf in u_features[column])
        i_feature = item.loc[column]
        try:
            features = json_normalize(ast.literal_eval(i_feature))[json_columns[column]]
        except Exception:
            # Best effort: an empty / missing / unparsable cell means "no features".
            # `Exception` (not a bare except) lets Ctrl-C still interrupt the run.
            print(item)
            features = []

        # Materialise once instead of rebuilding the list for every user feature.
        item_features = list(features)
        for u_f in u_features[column]:
            if u_f in item_features:
                f_vector[u_f] = item['rating']  # mean-centering (-avg) is left disabled
                matched += 1
        vector[column] = f_vector
    vector['total_attributes'] = matched if matched else 1
    return vector

# Score the user on the training set: one rating vector per rated movie (row-wise apply).
train_vectores = train_items.apply(setMovieVector, axis='columns')


import math
def sumVectors(vectors):
    """Aggregate per-movie rating vectors into a single user-profile vector.

    For every feature `f` of every column in `json_columns`, the profile score
    is the sum over all movies of value / sqrt(movie['total_attributes']),
    divided by the number of movies in which the feature value is non-zero
    (left undivided when that count is zero).

    Parameters
    ----------
    vectors : iterable of dict (list or pandas Series)
        Vectors shaped like the output of `setMovieVector`.

    Returns
    -------
    dict
        {column: {feature: score}} for each column, every other key of the
        first vector mapped to an empty dict, plus 'rating' (the module-level
        `avg`) and 'confidence' (always 100). Empty input yields all-zero
        feature scores instead of raising.
    """
    # Iterate positionally so the result does not depend on the index labels
    # when `vectors` is a pandas Series.
    rows = list(vectors)

    # Placeholder entry for every key of the first vector (e.g. 'total_attributes').
    sumScores = dict((g, {}) for g in rows[0]) if rows else {}

    for column in json_columns.keys():
        column_scores = {}
        for f in u_features[column]:
            weighted_sum = 0.0
            total = 0  # number of movies that actually carry this feature
            for row in rows:
                value = row[column][f]
                weighted_sum += value / math.sqrt(row['total_attributes'])
                if value != 0:
                    total += 1
            if total > 0:
                weighted_sum /= total
            column_scores[f] = weighted_sum
        sumScores[column] = column_scores

    sumScores['rating'] = avg
    sumScores['confidence'] = 100
    return sumScores

# Collapse the per-movie training vectors into the user's profile.
user_vector = sumVectors(train_vectores)


adult                                                                False
belongs_to_collection    {'id': 151, 'name': 'Star Trek: The Original S...
budget                                                            18000000
genres                   [{'id': 878, 'name': 'Science Fiction'}, {'id'...
homepage                                                               NaN
id                                                                     157
imdb_id                                                          tt0088170
original_language                                                       en
original_title                         Star Trek III: The Search for Spock
overview                 Admiral Kirk and his bridge crew risk their ca...
popularity                                                        6.197298
poster_path                               /b9ZaPiD6AaZR7CgQP5P4Kg893QL.jpg
production_companies             [{'name': 'Paramount Pictures', 'id': 4}]
production_countries     

In [849]:
# Number of training movies that were vectorised.
train_vectores.shape[0]

14

In [850]:
#SIMILARITY SPACE
def getMovieVector(item):
    """Project one movie onto the user profile (based on the user's profile).

    For every column in `json_columns`, each feature of the user's vocabulary
    (`u_features[column]`) receives the profile score `user_vector[column][f]`
    when the movie has that feature, and 0.0 otherwise.

    Parameters
    ----------
    item : pandas.Series
        One movie row; must provide the columns listed in `json_columns` plus
        'rating'.

    Returns
    -------
    dict
        {column: {feature: score}} for every column, plus 'rating' (copied from
        the row) and 'confidence': the percentage of the user's features that
        the movie matches (0.0 when the user has no features at all).
    """
    vector = {}
    matched = 0.0        # features of the item that are in the user's vocabulary
    vocabulary = 0.0     # total number of features in the user's vocabulary
    for column in json_columns.keys():
        known = u_features[column]
        f_vector = dict((uf, 0.0) for uf in known)
        i_feature = item.loc[column]
        try:
            features = json_normalize(ast.literal_eval(i_feature))[json_columns[column]]
        except Exception:
            # Best effort: an empty / missing / unparsable cell means "no features".
            # `Exception` (not a bare except) lets Ctrl-C still interrupt the run.
            print(i_feature)
            print("getMovieVector\r\n")
            features = []

        # Materialise once instead of rebuilding the list for every user feature.
        item_features = list(features)
        vocabulary += len(known)
        for u_f in known:
            if u_f in item_features:
                f_vector[u_f] = user_vector[column][u_f]
                matched += 1
        vector[column] = f_vector
    vector['rating'] = item['rating']
    # Guard the division: an empty vocabulary used to raise ZeroDivisionError.
    vector['confidence'] = matched / vocabulary * 100 if vocabulary else 0.0
    return vector
# Project every held-out movie onto the user profile (row-wise apply).
testMovieVectors = test_items.apply(getMovieVector, axis='columns')


In [846]:
# Confidence of the test movie labelled 3.
testMovieVectors.loc[3]['confidence']

0.955794504181601

In [851]:
def getRate(item):
    """Summarise a feature vector as [rating, score, confidence].

    The score is the sum of the squared feature values over every column named
    in `u_features`. Rating and score are rounded to two decimals; confidence
    is passed through unchanged.
    """
    squared_norm = 0
    for column in u_features:
        scores = item[column]
        for feature in scores:
            weight = scores[feature]
            squared_norm += weight * weight
    rating = round(item['rating'], 2)
    return [rating, round(squared_norm, 2), item['confidence']]
# Show, for every test movie, its [rating, score, confidence] followed by the
# user-profile summary. The lists are concatenated row by row on purpose:
# `Series + list` makes pandas treat the list as an array to align with the
# rows, which only yields this result by accident of pandas version / lengths.
user_rate = getRate(user_vector)
testMovieVectors.apply(lambda movie_vector: getRate(movie_vector) + user_rate)

0    [3.0, 2.09, 0.8363201911589008, 4.07, 226.8, 100]
1    [5.0, 1.51, 0.5973715651135006, 4.07, 226.8, 100]
2     [5.0, 2.79, 0.955794504181601, 4.07, 226.8, 100]
3     [2.5, 2.46, 0.955794504181601, 4.07, 226.8, 100]
dtype: object

In [852]:
# Same scoring applied to the training movies, as a sanity check on the profile.
(
    train_items
    .apply(getMovieVector, axis='columns')
    .apply(getRate)
)


[]
setMovieVector



0      [3.5, 12.42, 5.256869772998805]
1      [5.0, 25.22, 10.51373954599761]
2     [5.0, 25.61, 18.040621266427717]
3     [3.5, 12.49, 5.4958183990442055]
4      [3.5, 11.59, 4.301075268817205]
5       [0.5, 1.85, 10.75268817204301]
6      [4.0, 15.56, 5.734767025089606]
7      [5.0, 21.68, 4.540023894862605]
8     [4.0, 17.94, 12.425328554360812]
9      [4.5, 19.36, 5.137395459976105]
10     [4.0, 15.77, 6.451612903225806]
11     [4.5, 18.99, 4.301075268817205]
12     [5.0, 23.89, 5.734767025089606]
13      [5.0, 25.13, 9.55794504181601]
dtype: object