This is reference code showing how collaborative filtering could be used to predict fund managers' interest in funds. In this model, `mgrId` identifies the fund manager and `acctId` identifies the fund. We repurpose the MovieLens sample data to show the training syntax. In a production model, a fund manager's rating of a fund would have to be derived with a heuristic that represents interest based on existing holdings and recent activity.
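
As a rough illustration of what such a heuristic might look like, the sketch below blends a manager's current holding in a fund with recent buying activity into a single score. Everything in it is an assumption made for illustration: the function name `interest_score`, the inputs `holding_weight` and `recent_trades`, the equal weighting, the cap of 10 trades, and the 0 to 5 output scale. A real heuristic would need to be designed and validated against actual holdings and transaction data.

```python
# Hypothetical heuristic: all names, weights, and caps are illustrative assumptions.
def interest_score(holding_weight, recent_trades, max_trades=10):
    """Map a manager's exposure to a fund onto a 0-5 interest score.

    holding_weight: fraction of the manager's portfolio held in the fund (0.0-1.0)
    recent_trades:  number of recent buy transactions in the fund
    """
    # Clamp both inputs to [0, 1] so each signal contributes a bounded amount.
    holding_part = min(max(holding_weight, 0.0), 1.0)
    activity_part = min(max(recent_trades, 0), max_trades) / max_trades
    # Weight the two signals equally and scale the result to the 0-5 range.
    return round(5.0 * (0.5 * holding_part + 0.5 * activity_part), 2)


# (mgrId, acctId, holding_weight, recent_trades): made-up rows for illustration
raw = [(0, 2, 0.10, 3), (0, 3, 0.00, 0), (1, 2, 0.40, 10)]
ratings_rows = [(m, a, interest_score(w, t)) for m, a, w, t in raw]
```

Rows shaped like `ratings_rows` (`mgrId`, `acctId`, `rating`) could then take the place of the movie ratings loaded in the next cell.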

---


First, we'll create our ratings data:

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

# Read the raw ratings file; each line has the form user::item::rating::timestamp
lines = spark.read.text("/FileStore/shared_uploads/brad.barker@databricks.com/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
# Rename the MovieLens user and movie ids to mgrId and acctId
ratingsRDD = parts.map(lambda p: Row(mgrId=int(p[0]), acctId=int(p[1]),
                                     rating=float(p[2]), timestamp=str(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
# Hold out 20% of the ratings for evaluation
(training, test) = ratings.randomSplit([0.8, 0.2])
display(ratings)

mgrId,acctId,rating,timestamp
0,2,3.0,1424380312
0,3,1.0,1424380312
0,5,2.0,1424380312
0,9,4.0,1424380312
0,11,1.0,1424380312
0,12,2.0,1424380312
0,15,1.0,1424380312
0,17,1.0,1424380312
0,19,1.0,1424380312
0,21,1.0,1424380312


In [0]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="mgrId", itemCol="acctId", ratingCol="rating", coldStartStrategy="drop")
model = als.fit(training)

In [0]:
# The model is fit. Under the covers, this creates two latent feature matrices that, when multiplied, return the predicted rating for every combination of mgrId and acctId.
predictions = model.transform(test)
display(predictions.select('mgrId','acctId','prediction'))

mgrId,acctId,prediction
28,0,-2.216629
28,20,-0.68427944
28,27,-2.4498608
28,38,-1.1607066
28,52,1.1881276
28,56,-0.34887445
28,62,0.68853253
28,65,2.5819151
28,78,-0.5394548
28,82,1.9684495
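
To make the "two latent feature matrices" idea concrete, here is a minimal sketch in plain Python of how a single prediction is formed: it is the dot product of one manager vector and one fund vector. The factor values below are invented for illustration and are not the factors learned above; in Spark, the learned factors are exposed on the fitted model as `model.userFactors` and `model.itemFactors`.

```python
# Illustrative values only: these are not factors learned by the model above.
mgr_factors = {28: [0.5, -1.0, 2.0]}      # one latent vector per mgrId
acct_factors = {65: [1.0, 0.5, 1.0],      # one latent vector per acctId
                27: [-1.0, 1.0, -0.5]}


def predict(mgr_id, acct_id):
    """Predicted rating = dot product of the manager and fund latent vectors."""
    u = mgr_factors[mgr_id]
    v = acct_factors[acct_id]
    return sum(ui * vi for ui, vi in zip(u, v))


print(predict(28, 65))
print(predict(28, 27))
```

Nothing constrains a dot product to stay within the rating scale, which is why negative predictions can appear in the output above.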


In [0]:
# We also want to evaluate the performance of the predictions on the test data:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.9650014273064944
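
For reference, the metric reported here can be sketched in a few lines of plain Python: square each difference between the actual rating and the prediction, average the squares, and take the square root. The pairs below are made up for illustration and are not rows from the test set; the function name `rmse` is likewise only illustrative.

```python
import math


def rmse(pairs):
    """Root-mean-square error over (actual rating, predicted rating) pairs."""
    squared_errors = [(actual - predicted) ** 2 for actual, predicted in pairs]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up (rating, prediction) pairs, not rows from the test set above
sample = [(3.0, 1.0), (1.0, 1.0), (4.0, 2.0), (2.0, 4.0)]
print(rmse(sample))
```

Because the errors are squared before averaging, a few large misses weigh heavily on the result. An RMSE close to 2 on a rating scale only a few points wide suggests the demo settings (`maxIter=5`, `regParam=0.01`) would likely need tuning before any real use.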


In [0]:
# To find which fund managers would be most interested in specific funds, we first create a DataFrame with the subset of funds we are selling, say acctId 5 and 6:
import pandas as pd

our_funds_id = spark.createDataFrame(pd.DataFrame(data={'acctId': [5, 6]}))
display(our_funds_id)

acctId
5
6


In [0]:
# We can now pick out the 10 fund managers predicted to be most interested in each of our funds:
display(model.recommendForItemSubset(our_funds_id, 10))

acctId,recommendations
5,"List(List(16, 2.797384), List(15, 2.3128467), List(22, 2.207551), List(24, 2.1676114), List(26, 1.9074308), List(11, 1.8944778), List(0, 1.8361195), List(28, 1.7344811), List(19, 1.5703579), List(3, 1.4731323))"
6,"List(List(26, 2.7549696), List(24, 2.3356304), List(16, 2.3319616), List(22, 2.2970455), List(19, 1.9992425), List(11, 1.9093401), List(15, 1.8146858), List(3, 1.7753186), List(0, 1.7735116), List(12, 1.7725106))"


**NOTE**: Depending on the data used to represent current interest, it's possible we'll want to train with implicit feedback (`implicitPrefs=True` on `ALS`) instead of the default explicit setting. The demo uses the explicit setting because the movie ratings in the demo data were given explicitly by users.