227 changes: 131 additions & 96 deletions nb/03-17 als_simulation.ipynb
@@ -2,7 +2,7 @@
"cells": [
{
"cell_type": "code",
"execution_count": 44,
"execution_count": 94,
"id": "8b25a596",
"metadata": {},
"outputs": [
@@ -20,6 +20,7 @@
"from pyspark.sql import SparkSession\n",
"from pyspark.ml.evaluation import RegressionEvaluator\n",
"from pyspark.mllib.recommendation import ALS as newALS\n",
"from pyspark.ml.recommendation import ALS\n",
"from sklearn.metrics import mean_squared_error\n",
"from pyspark.sql.functions import *\n",
"from pyspark.sql.types import *\n",
@@ -50,29 +51,29 @@
},
{
"cell_type": "code",
"execution_count": 8,
"execution_count": 103,
"id": "b7688ace",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"/data/wooders/ralf-vldb//datasets/ml-latest-small\n",
"Downloading from aws: vldb\n"
"/data/wooders/ralf-vldb//datasets/ml-1m\n"
]
}
],
"source": [
"dataset_dir = use_dataset(\"ml-latest-small\", download=True)\n",
"dataset_dir = use_dataset(\"ml-1m\")\n",
"ratings_path = f\"{dataset_dir}/ratings.csv\"\n",
"tags_path = f\"{dataset_dir}/tags.csv\" \n",
"movies_path = f\"{dataset_dir}/movies.csv\""
"movies_path = f\"{dataset_dir}/movies.csv\"\n",
"path = dataset_dir"
]
},
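Note: this commit switches the notebook from ml-latest-small (~100K ratings) to ml-1m, the MovieLens 1M dataset (roughly 1,000,209 ratings from 6,040 users on ~3,700 movies), which is consistent with the larger counts printed in the cells below.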
{
"cell_type": "code",
"execution_count": 53,
"execution_count": 72,
"id": "0ab59e64",
"metadata": {},
"outputs": [],
@@ -83,7 +84,7 @@
},
{
"cell_type": "code",
"execution_count": 14,
"execution_count": 73,
"id": "ccdd8301",
"metadata": {},
"outputs": [],
@@ -92,7 +93,7 @@
"\n",
"def split_data(df): \n",
" start_ts = df['timestamp'].min()\n",
" med_ts = df['timestamp'].quantile(.5)\n",
" med_ts = df['timestamp'].quantile(.25)\n",
" end_ts = df['timestamp'].max()\n",
" train_df = df[df['timestamp'] <= med_ts]\n",
" stream_df = df[df['timestamp'] > med_ts]\n",
@@ -113,30 +114,30 @@
},
{
"cell_type": "code",
"execution_count": 9,
"execution_count": 75,
"id": "2bf51807",
"metadata": {},
"outputs": [],
"source": [
"tags = pd.read_csv(tags_path)\n",
"tags.columns = ['user_id', 'movie_id', 'tag', 'timestamp']\n",
"#tags = pd.read_csv(tags_path)\n",
"#tags.columns = ['user_id', 'movie_id', 'tag', 'timestamp']\n",
"ratings = pd.read_csv(ratings_path)\n",
"ratings.columns = ['user_id', 'movie_id', 'rating', 'timestamp']\n",
"movies = pd.read_csv(movies_path)\n",
"movies.columns = ['movie_id', 'title', 'genres']"
"#movies = pd.read_csv(movies_path)\n",
"#movies.columns = ['movie_id', 'title', 'genres']"
]
},
{
"cell_type": "code",
"execution_count": 15,
"execution_count": 76,
"id": "80579d50",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"5567 7720 50418\n"
"3346 3676 750156\n"
]
}
],
@@ -147,73 +148,19 @@
},
{
"cell_type": "code",
"execution_count": 6,
"id": "6ac525ca",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"ParserWarning: Falling back to the 'python' engine because the 'c' engine does not support regex separators (separators > 1 char and different from '\\s+' are interpreted as regex); you can avoid this warning by specifying engine='python'.\n",
"UserWarning: sort_values defaulting to pandas implementation.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"3551 3643 500104\n"
]
},
{
"data": {
"text/plain": [
"(956703932, 973018006.0, 1046454590)"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"split_data(path)"
]
},
{
"cell_type": "code",
"execution_count": 16,
"execution_count": 80,
"id": "4a38666a",
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"22/03/25 16:38:46 WARN Utils: Your hostname, flaminio resolves to a loopback address: 127.0.1.1; using 169.229.48.114 instead (on interface enp1s0f0)\n",
"22/03/25 16:38:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address\n",
"WARNING: An illegal reflective access operation has occurred\n",
"WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/eecs/wooders/miniconda3/lib/python3.9/site-packages/pyspark/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)\n",
"WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n",
"WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n",
"WARNING: All illegal access operations will be denied in a future release\n",
"Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n",
"Setting default log level to \"WARN\".\n",
"To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
"22/03/25 16:38:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
]
}
],
"outputs": [],
"source": [
"spark = SparkSession.builder.master('local').appName('als').getOrCreate()"
]
},
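Note: `master('local')` pins Spark to a single worker thread. Not part of this commit, but for a 1M-rating ALS fit a sketch like this would use every available core:

```python
# Assumption (not in this PR): local[*] sizes the local master to all cores.
spark = SparkSession.builder.master('local[*]').appName('als').getOrCreate()
```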
{
"cell_type": "code",
"execution_count": 18,
"execution_count": 100,
"id": "a3fbc1e7",
"metadata": {},
"outputs": [],
@@ -223,7 +170,7 @@
},
{
"cell_type": "code",
"execution_count": 19,
"execution_count": 101,
"id": "4f286b1b",
"metadata": {},
"outputs": [
@@ -234,26 +181,26 @@
"+-------+--------+------+\n",
"|user_id|movie_id|rating|\n",
"+-------+--------+------+\n",
"| 1| 1| 4.0|\n",
"| 1| 3| 4.0|\n",
"| 1| 6| 4.0|\n",
"| 1| 47| 5.0|\n",
"| 1| 50| 5.0|\n",
"| 1| 70| 3.0|\n",
"| 1| 101| 5.0|\n",
"| 1| 110| 4.0|\n",
"| 1| 151| 5.0|\n",
"| 1| 157| 5.0|\n",
"| 1| 163| 5.0|\n",
"| 1| 216| 5.0|\n",
"| 1| 223| 3.0|\n",
"| 1| 231| 5.0|\n",
"| 1| 235| 4.0|\n",
"| 1| 260| 5.0|\n",
"| 1| 296| 3.0|\n",
"| 1| 316| 3.0|\n",
"| 1| 333| 5.0|\n",
"| 1| 349| 4.0|\n",
"| 4268| 3064| 1|\n",
"| 4268| 955| 3|\n",
"| 4268| 260| 4|\n",
"| 4268| 356| 5|\n",
"| 4268| 1270| 4|\n",
"| 4268| 1625| 3|\n",
"| 4269| 3791| 1|\n",
"| 4269| 3793| 4|\n",
"| 4269| 1| 5|\n",
"| 4269| 2064| 4|\n",
"| 4269| 2065| 5|\n",
"| 4269| 3022| 5|\n",
"| 4269| 2080| 4|\n",
"| 4269| 2081| 3|\n",
"| 4269| 908| 4|\n",
"| 4269| 910| 4|\n",
"| 4269| 921| 4|\n",
"| 4269| 3061| 5|\n",
"| 4269| 3070| 5|\n",
"| 4269| 3072| 4|\n",
"+-------+--------+------+\n",
"only showing top 20 rows\n",
"\n"
@@ -275,15 +222,103 @@
},
{
"cell_type": "code",
"execution_count": 20,
"execution_count": 104,
"id": "d5a101ba",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"als = ALS(\n",
" userCol=\"user_id\", \n",
" itemCol=\"movie_id\",\n",
" ratingCol=\"rating\", \n",
" nonnegative = True, \n",
" implicitPrefs = False,\n",
" coldStartStrategy=\"drop\",\n",
" rank=50,\n",
" maxIter=10,\n",
" regParam=.1\n",
")\n",
"model=als.fit(spark_df)"
]
},
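Note: `coldStartStrategy="drop"` makes `transform` discard rows whose user or movie was unseen at fit time, so the RMSE below is computed over NaN-free predictions. If `rank=50`/`regParam=.1` ever need justification, a hedged sketch with Spark's stock tuning tools (reusing the `als` object above and the `evaluator` defined in the next cell):

```python
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Sketch only: 3-fold cross-validation over a small hyperparameter grid.
grid = (ParamGridBuilder()
        .addGrid(als.rank, [10, 50, 100])
        .addGrid(als.regParam, [0.01, 0.1])
        .build())
cv = CrossValidator(estimator=als, estimatorParamMaps=grid,
                    evaluator=evaluator, numFolds=3)
best_model = cv.fit(spark_df).bestModel
```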
{
"cell_type": "code",
"execution_count": 105,
"id": "d47294c0",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
},
{
"data": {
"text/plain": [
"0.7407670697276212"
]
},
"execution_count": 105,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"predictions = model.transform(spark_df)\n",
"evaluator = RegressionEvaluator().setMetricName(\"rmse\").setLabelCol(\"rating\").setPredictionCol(\"prediction\")\n",
"rmse = evaluator.evaluate(predictions)\n",
"rmse"
]
},
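Note: `model.transform(spark_df)` scores the same rows the model was fit on, so 0.7408 is a training RMSE rather than generalization error. A minimal held-out variant (a sketch, not part of this commit):

```python
# Sketch: fit on 80% of the data and report RMSE on the held-out 20%.
train, test = spark_df.randomSplit([0.8, 0.2], seed=42)
held_out_model = als.fit(train)
rmse_test = evaluator.evaluate(held_out_model.transform(test))  # cold-start rows dropped
```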
{
"cell_type": "code",
"execution_count": 88,
"id": "7aa65951",
"metadata": {},
"outputs": [
{
"ename": "AttributeError",
"evalue": "'MatrixFactorizationModel' object has no attribute 'transform'",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mAttributeError\u001b[0m Traceback (most recent call last)",
"Input \u001b[0;32mIn [88]\u001b[0m, in \u001b[0;36m<cell line: 2>\u001b[0;34m()\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[38;5;66;03m# Evaluate the model by computing the RMSE on the test data\u001b[39;00m\n\u001b[0;32m----> 2\u001b[0m predictions \u001b[38;5;241m=\u001b[39m \u001b[43mnew_model\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mtransform\u001b[49m(spark_df\u001b[38;5;241m.\u001b[39mrdd)\n\u001b[1;32m 3\u001b[0m evaluator \u001b[38;5;241m=\u001b[39m RegressionEvaluator(metricName\u001b[38;5;241m=\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mrmse\u001b[39m\u001b[38;5;124m\"\u001b[39m, labelCol\u001b[38;5;241m=\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mrating\u001b[39m\u001b[38;5;124m\"\u001b[39m,\n\u001b[1;32m 4\u001b[0m predictionCol\u001b[38;5;241m=\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mprediction\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n\u001b[1;32m 5\u001b[0m rmse \u001b[38;5;241m=\u001b[39m evaluator\u001b[38;5;241m.\u001b[39mevaluate(predictions)\n",
"\u001b[0;31mAttributeError\u001b[0m: 'MatrixFactorizationModel' object has no attribute 'transform'"
]
}
],
"source": [
"# Evaluate the model by computing the RMSE on the test data\n",
"predictions = new_model.transform(spark_df.rdd)\n",
"evaluator = RegressionEvaluator(metricName=\"rmse\", labelCol=\"rating\",\n",
" predictionCol=\"prediction\")\n",
"\n",
"rmse = evaluator.evaluate(predictions)"
]
},
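Note: this traceback is expected. `new_model` comes from the RDD-based `pyspark.mllib.recommendation.ALS` (imported as `newALS` above), and its `MatrixFactorizationModel` exposes `predict`/`predictAll`, not the DataFrame API's `transform`. A hedged sketch of the mllib-style evaluation (the `Rating` conversion and the training call are assumptions about how `new_model` was built):

```python
from pyspark.mllib.recommendation import Rating

# Sketch: train the RDD-based ALS and score (user, product) pairs via predictAll.
ratings_rdd = spark_df.rdd.map(lambda r: Rating(r.user_id, r.movie_id, r.rating))
new_model = newALS.train(ratings_rdd, rank=50, iterations=10, lambda_=0.1)
preds = (new_model
         .predictAll(ratings_rdd.map(lambda r: (r.user, r.product)))
         .map(lambda r: ((r.user, r.product), r.rating)))
truth = ratings_rdd.map(lambda r: ((r.user, r.product), r.rating))
mse = truth.join(preds).map(lambda kv: (kv[1][0] - kv[1][1]) ** 2).mean()
print(mse ** 0.5)  # RMSE
```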
{
"cell_type": "code",
"execution_count": 83,
"id": "4b6b9644",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"22/03/25 16:39:18 WARN BlockManager: Task 49 already completed, not releasing lock for rdd_318_0\n",
"22/03/26 18:49:39 WARN BlockManager: Task 99 already completed, not releasing lock for rdd_658_0\n",
"22/03/26 18:49:40 WARN BlockManager: Task 101 already completed, not releasing lock for rdd_659_0\n",
" \r"
]
}