In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split

In [2]:
train_raw = pd.read_csv('train.dat', header=None,
            names=["uid", "iid", "rating", "ts"], sep='\t', engine='python')
train_raw

Unnamed: 0,uid,iid,rating,ts
0,905,470,1,889325071
1,697,1518,5,879835275
2,855,1687,5,875638677
3,950,1447,5,877420720
4,806,1170,4,879889337
...,...,...,...,...
85719,205,1136,1,884142487
85720,708,1497,4,881473612
85721,167,1036,3,875492395
85722,508,1528,3,880337585


In [3]:
test_raw = pd.read_csv('test.dat', header=None,
            names=["uid", "iid"], sep='\t', engine='python')
test_raw

Unnamed: 0,uid,iid
0,158,951
1,521,1202
2,98,1556
3,292,1583
4,68,1064
...,...,...
2149,537,1414
2150,618,1448
2151,154,1519
2152,154,1429


In [6]:
train, valid = train_test_split(train_raw, test_size=0.2)

In [7]:
train.to_csv("train_set.csv")
valid.to_csv("valid_set.csv")

# Feature Vectors

In [59]:
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import *
from pyspark.ml.feature import MinHashLSH

In [8]:
train_set = pd.read_csv('train_set.csv')

In [21]:
max_iid = train_set.iid.max()
u2irmap = {}

In [17]:
for index, r in train_set.iterrows():
    if r.uid not in u2irmap:
        u2irmap[r.uid] = {}
    u2irmap[r.uid][r.iid] = r.rating

In [28]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/12 11:52:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [89]:
spUsers = [(int(i), Vectors.sparse(max_iid+1, u2irmap[i])) for i in u2irmap] # need to convert numpy.int to int
dfUsers = spark.createDataFrame(spUsers, ["uid", "features"])

In [83]:
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfUsers)
model.transform(dfUsers).head()

# Approximate Nearest Neighbor

In [90]:
valid_set = pd.read_csv('valid_set.csv')

In [92]:
vu2irmap = {}
for index, r in valid_set.iterrows():
    if r.uid not in vu2irmap:
        vu2irmap[r.uid] = {}
    vu2irmap[r.uid][r.iid] = r.rating

In [95]:
vspUsers = [(int(i), Vectors.sparse(max_iid+1, vu2irmap[i])) for i in vu2irmap] # need to convert numpy.int to int
vdfUsers = spark.createDataFrame(vspUsers, ["uid", "features"])
model.approxNearestNeighbors(dfUsers, vdfUsers, 5).collect()
approxNearestNeighbors

Py4JError: An error occurred while calling o199.approxNearestNeighbors. Trace:
py4j.Py4JException: Method approxNearestNeighbors([class org.apache.spark.sql.Dataset, class org.apache.spark.sql.Dataset, class java.lang.Integer, class java.lang.String]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1589)



In [84]:
>>> data2 = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
...          (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
...          (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
>>> df2 = spark.createDataFrame(data2, ["id", "features"])
>>> key = Vectors.sparse(6, [1, 2], [1.0, 1.0])
>>> model.approxNearestNeighbors(df2, key, 1).collect()

Row(uid=908, features=SparseVector(1700, {485: 2.0, 486: 2.0, 487: 2.0, 488: 3.0, 489: 3.0, 492: 4.0, 506: 4.0, 605: 2.0, 610: 3.0, 626: 3.0, 627: 5.0, 648: 2.0, 653: 1.0, 654: 3.0, 658: 3.0, 659: 3.0, 660: 3.0, 663: 2.0, 667: 2.0, 677: 2.0, 684: 2.0, 686: 3.0, 688: 4.0, 689: 3.0, 702: 2.0, 714: 2.0, 716: 2.0, 720: 3.0, 723: 2.0, 739: 4.0, 751: 3.0, 753: 4.0, 766: 2.0, 769: 1.0, 770: 2.0, 772: 3.0, 775: 3.0, 855: 3.0, 859: 3.0, 866: 1.0, 874: 2.0, 875: 4.0, 877: 4.0, 880: 1.0, 898: 2.0, 900: 3.0, 911: 5.0, 915: 3.0, 919: 3.0, 920: 3.0, 929: 1.0, 939: 2.0, 942: 1.0, 944: 3.0, 945: 3.0, 952: 3.0, 957: 2.0, 958: 3.0, 963: 4.0, 965: 3.0, 968: 3.0, 971: 4.0, 972: 3.0, 973: 4.0, 975: 3.0, 978: 3.0, 980: 3.0, 983: 3.0, 985: 4.0, 988: 3.0, 993: 4.0, 996: 3.0, 998: 3.0, 1008: 4.0, 1016: 3.0, 1021: 4.0, 1022: 2.0, 1026: 4.0, 1027: 4.0, 1028: 3.0, 1037: 4.0, 1040: 4.0, 1042: 3.0, 1049: 4.0, 1058: 3.0, 1060: 5.0, 1064: 3.0, 1069: 4.0, 1072: 4.0, 1073: 3.0, 1080: 3.0, 1081: 4.0, 1104: 2.0, 1105: 3.

In [85]:
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfUsers).show()

The hashed dataset where hashed values are stored in the column 'hashes':
+---+--------------------+--------------------+
|uid|            features|              hashes|
+---+--------------------+--------------------+
|908|(1700,[485,486,48...|[[3246246.0], [1....|
|646|(1700,[189,234,45...|[[4414153.0], [62...|
|697|(1700,[189,190,19...|[[1359174.0], [10...|
|752|(1700,[846,894,10...|[[5.2396333E7], [...|
|947|(1700,[776,952,11...|[[6.8390393E7], [...|
|849|(1700,[401,403,41...|[[7469132.0], [47...|
|827|(1700,[435,716,76...|[[1.2411183E7], [...|
|357|(1700,[561,599,62...|[[7.8274495E7], [...|
|688|(1700,[184,249,40...|[[7469132.0], [99...|
|863|(1700,[583,672,80...|[[1.16372573E8], ...|
|705|(1700,[197,227,24...|[[7469132.0], [1....|
|858|(1700,[805,1237,1...|[[7.1445372E7], [...|
|674|(1700,[469,731,75...|[[9.7323534E7], [...|
|855|(1700,[408,409,41...|[[3246246.0], [73...|
|999|(1700,[1428,1429,...|[[7469132.0], [62...|
|609|(1700,[537,599,73...|[[4.9341354E7], [...|
|859|(1700,[41