In [1]:
# Replace with your values
#
# NOTE: Set the access to this notebook appropriately to protect the security of your keys.
# Or you can delete this cell after you run the mount command below once successfully.
ACCESS_KEY = "Your-access-key"
SECRET_KEY = "Your-secret-key"
# The secret is embedded in the s3a URI below, so every "/" in it is percent-encoded.
ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
AWS_BUCKET_NAME = "ads-proj5-data/"
MOUNT_NAME = "data"
MOUNT_POINT = "/mnt/%s" % MOUNT_NAME

# dbutils.fs.mount raises when the mount point is already in use, which made this
# cell fail on every re-run of the notebook. Only mount when the mount point is
# not already listed.
if not any(m.mountPoint == MOUNT_POINT for m in dbutils.fs.mounts()):
    dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), MOUNT_POINT)


In [2]:
# Show what is visible under the mount point configured in the first cell.
display(dbutils.fs.ls("/mnt/{}".format(MOUNT_NAME)))

In [3]:
# Review records read from reviews_Kindle_Store.json, with exact duplicate rows removed.
review_df = spark.read.json("/mnt/data/reviews_Kindle_Store.json").dropDuplicates()

# Reader settings shared by the three CSV inputs: the first line is a header
# and column types are inferred from the data.
csv_options = dict(format='com.databricks.spark.csv',
                   header='true',
                   inferSchema='true')

# Per-item predictions (duplicates removed) and the two lookup tables.
ItemTopics = spark.read.load("/mnt/data/predictions.csv", **csv_options).dropDuplicates()
user_map = spark.read.load("/mnt/data/user_map.csv", **csv_options)
item_map = spark.read.load("/mnt/data/item_map.csv", **csv_options)

In [4]:
def _key_with_topic_list(row):
    """Split a predictions row into (column 0, [columns 1 through 50])."""
    topic_values = [row[position] for position in range(1, 51)]
    return (row[0], topic_values)

# Pack the 50 value columns of every row into a single list next to its key.
ItemTopicsRDD = ItemTopics.rdd.map(_key_with_topic_list)

In [5]:
# Turn the (key, list) pairs back into a DataFrame with columns 'asin' and 'topic'.
ItemTopics = spark.createDataFrame(ItemTopicsRDD, schema=['asin', 'topic'])

In [6]:
from pyspark.sql.functions import array, avg, col

In [7]:
# One row per asin: element k of the new 'topic' array is the mean of element k
# over all rows that share that asin.
topic_means = [avg(col("topic")[position]) for position in range(50)]
ItemTopics = ItemTopics.groupby("asin").agg(array(*topic_means).alias("topic"))

In [8]:
# Keep only the review fields needed downstream; 'overall' is exposed as 'rating'.
ratings = review_df.select(
    review_df.asin,
    review_df.overall.alias("rating"),
    review_df.reviewerID,
)

# Attach the per-item topic vector, then the item and user lookup tables.
df = (ratings
      .join(ItemTopics, "asin")
      .join(item_map, "asin")
      .join(user_map, "reviewerID"))

In [9]:
# Narrow the joined table to the four columns the training step reads.
df = df.select("user", "item", "rating", "topic")

In [10]:
# Mark df for caching so the later cells that read it can reuse it instead of
# recomputing the joins above.
df.cache()

In [11]:
# Number of partitions of the RDD backing df.
df.rdd.getNumPartitions()

In [12]:
# Largest 'user' and 'item' value present in df, collected to the driver as a
# one-row list.
row1 = df.agg({"user": "max", "item":"max"}).collect()
# Function-call form: prints the same text on Python 2 and, unlike the bare
# `print row1` statement, also parses on Python 3. It matches the print(...)
# calls used elsewhere in this notebook.
print(row1)

In [13]:
from pyspark.sql.functions import collect_list,first
from time import time
import numpy as np
from numpy.random import rand
from numpy import matrix

In [14]:
def CTM_train(Full,I,J,K,LAMBDA,max_iter=10,n_partition=6):
    '''
    Fit user and item factor matrices by alternating regularized least squares.

    Every round first re-solves each user's factor row against the current item
    factors, then re-solves each item's factor row against the fresh user
    factors. The item solve additionally adds ``LAMBDA * r * topic`` to its
    right-hand side, where ``r`` is the number of ratings of that item.

    Parameters
    ----------
    Full : Spark DataFrame with columns ``user``, ``item``, ``rating``, ``topic``.
        ``user`` / ``item`` are used directly as row indices into the factor
        matrices, so they must be valid integer row indices for an ``I``-row /
        ``J``-row matrix. ``topic`` must hold ``K`` values per row.
    I, J : number of rows of the user / item factor matrix.
    K : number of latent factors (columns of both matrices).
    LAMBDA : regularization weight; each solve scales it by the number of
        ratings involved.
    max_iter : number of alternating rounds.
    n_partition : number of Spark partitions used for the grouped data.

    Returns
    -------
    (U, V) : ``numpy.matrix`` objects of shape ``(I, K)`` and ``(J, K)``.
        Row ``i`` of ``U`` belongs to user ``i`` and row ``j`` of ``V`` to item
        ``j``. Rows whose id never occurs in ``Full`` keep their random initial
        values.

    Relies on the notebook-level ``sc`` (SparkContext) and on ``collect_list``,
    ``first``, ``matrix``, ``rand``, ``time`` and ``np`` being imported.
    '''

    # define update functions
    def updateU(i,v_ind,R,V,LAMBDA):
        '''
        Solve the regularized normal equations for one user.

        i      : user id (unused by the solve itself).
        v_ind  : array of item ids rated by the user (row indices into V).
        R      : array of the matching ratings.
        V      : current item factor matrix.
        Returns the user's factors as a 1 x K row.
        '''
        r = v_ind.shape[0]
        K = V.shape[1]
        V_rated = V[v_ind,:]   # index once; the fancy-indexed copy is reused below

        A = V_rated.T.dot(V_rated) + LAMBDA*r*np.eye(K)
        b = V_rated.T.dot(R).T

        return (np.linalg.solve(A, b)).T

    def updateV(j,u_ind,R,U,LAMBDA,Th):
        '''
        Solve the regularized normal equations for one item.

        j      : item id (unused by the solve itself).
        u_ind  : array of user ids that rated the item (row indices into U).
        R      : array of the matching ratings.
        U      : current user factor matrix.
        Th     : the item's topic vector (K values), added to the right-hand
                 side scaled by LAMBDA * r.
        Returns the item's factors as a 1 x K row.
        '''
        r = u_ind.shape[0]
        K = U.shape[1]
        U_rated = U[u_ind,:]   # index once; the fancy-indexed copy is reused below

        A = U_rated.T.dot(U_rated) + LAMBDA*r*np.eye(K)
        b = U_rated.T.dot(R).T + LAMBDA*r*Th.reshape([K,1])

        return (np.linalg.solve(A, b)).T

    def solve_partition(rows, solve_row):
        '''
        Solve every grouped row of one partition.

        Yields at most one ``(ids, factors)`` pair: ``ids`` is the array of
        row[0] values and ``factors`` the matching solved rows stacked once,
        in the same order.
        '''
        ids = []
        factors = []
        for row in rows:
            ids.append(row[0])
            factors.append(solve_row(row))
        if ids:
            yield (np.array(ids), np.vstack(factors))

    def scatter_rows(blocks, previous):
        '''
        Return a copy of ``previous`` with the solved rows written at their ids.
        '''
        updated = previous.copy()
        for ids, factors in blocks:
            updated[ids,:] = factors
        return updated

    print('pre-compute block information...')
    Full = Full.repartition(n_partition)
    # No sort here: repartition() below does not preserve row order anyway, and
    # results are placed by id (see scatter_rows), not by arrival order.
    U_map = Full.groupBy("user").agg(collect_list("item").alias('items'),collect_list("rating").alias('ratings'))
    V_map = Full.groupBy("item").agg(collect_list("user").alias('users'),collect_list("rating").alias('ratings'), first('topic').alias('topic'))
    U_map = U_map.repartition(n_partition)
    V_map = V_map.repartition(n_partition)

    print('initialize parameters...')
    U = matrix(rand(I,K))
    V = matrix(rand(J,K))

    # Only V needs broadcasting up front: U is re-solved before it is ever read.
    Vs = sc.broadcast(V)

    print('update parameters...')
    for i in range(max_iter):

        st = time()

        # Solved rows come back tagged with their ids and are written to those
        # rows. Stacking them in arrival order would misalign row k with id k,
        # because the next half-step indexes the matrix by id.
        user_blocks = U_map.rdd.mapPartitions(
            lambda rows: solve_partition(
                rows,
                lambda r: updateU(r[0],np.array(r[1]),np.array(r[2]),Vs.value,LAMBDA))
        ).collect()
        U = scatter_rows(user_blocks, U)
        Vs.unpersist()   # this broadcast of V is stale now; release executor copies
        Us = sc.broadcast(U)

        item_blocks = V_map.rdd.mapPartitions(
            lambda rows: solve_partition(
                rows,
                lambda r: updateV(r[0],np.array(r[1]),np.array(r[2]),Us.value,LAMBDA,np.array(r[3])))
        ).collect()
        V = scatter_rows(item_blocks, V)
        Us.unpersist()   # U is re-solved before being read again
        Vs = sc.broadcast(V)

        ed = time()

        print('Finish iteration round: '+str(i)+', use time: '+str(round(ed-st,4))+'s.\n')

    # The last broadcast is not reachable by the caller; free it before returning.
    Vs.unpersist()

    return (U,V)

In [15]:
# Train with 50 factors for 5 rounds on 200 partitions; I and J are the row
# counts of the user and item factor matrices.
U, V = CTM_train(
    Full=df,
    I=1406710,
    J=430520,
    K=50,
    LAMBDA=0.01,
    max_iter=5,
    n_partition=200,
)

In [16]:
# Print the leading rows of df as a text table, using DataFrame.show's default settings.
df.show()

In [17]:
# Row numbers 0..n-1 for each factor matrix ...
indexU = np.arange(U.shape[0])
indexV = np.arange(V.shape[0])

# ... prepended as the first column, so every exported row carries its position.
UM = np.column_stack((indexU, U))
VM = np.column_stack((indexV, V))

In [18]:
# Distribute the rows of the driver-side UM / VM matrices as Spark RDDs.
UserMatrix = sc.parallelize(UM)
ItemMatrix = sc.parallelize(VM)

In [19]:
def _first_of_tolist(r):
    """Return element 0 of ``r.tolist()``.

    Assumes each RDD element is a 1 x N matrix row, in which case this is the
    row as a flat Python list -- TODO confirm against the type of UM / VM.
    """
    nested = r.tolist()
    return nested[0]

UsM = UserMatrix.map(_first_of_tolist)
VsM = ItemMatrix.map(_first_of_tolist)

In [20]:
def toCSVLine(data):
    """Join the ``str()`` form of every element of ``data`` with commas.

    No quoting or escaping is applied; an empty iterable gives an empty string.
    """
    fields = map(str, data)
    return ','.join(fields)

# Render every user row as one comma-separated line and write the lines under
# /mnt/data/UserTopics.csv.
# NOTE(review): saveAsTextFile normally produces a directory of part files and
# fails when the target already exists -- confirm before re-running this cell.
Utxt = UsM.map(toCSVLine)
Utxt.saveAsTextFile('/mnt/data/UserTopics.csv')

In [21]:
# Render every item row as one comma-separated line and write the lines under
# /mnt/data/ItemTopics.csv.
# NOTE(review): saveAsTextFile normally produces a directory of part files and
# fails when the target already exists -- confirm before re-running this cell.
Vtxt = VsM.map(toCSVLine)
Vtxt.saveAsTextFile('/mnt/data/ItemTopics.csv')