In [1]:
import findspark
findspark.init()

In [2]:
import pandas as pd
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import * 
from pyspark.sql.types import *

In [3]:
sc = SparkContext()
spark = SparkSession(sc)

In [8]:
data = spark.read.csv('Products.csv', header = True, inferSchema = True)

In [9]:
data.printSchema()

root
 |-- index: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- price: string (nullable = true)
 |-- list_price: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- group: string (nullable = true)
 |-- url: string (nullable = true)
 |-- image: string (nullable = true)



In [10]:
data.show(5)

+--------------------+--------+--------------------+------------------+------+-----+----------+-----+-----+----+-----+
|               index| item_id|                name|       description|rating|price|list_price|brand|group| url|image|
+--------------------+--------+--------------------+------------------+------+-----+----------+-----+-----+----+-----+
|                   0|48102821|Tai nghe Bluetoot...|THÔNG TIN CHI TIẾT|  null| null|      null| null| null|null| null|
|  Dung lượng pin 300|    null|                null|              null|  null| null|      null| null| null|null| null|
|Thời gian pin - T...|    null|                null|              null|  null| null|      null| null| null|null| null|
|         Bluetooth 5|    null|                null|              null|  null| null|      null| null| null|null| null|
|     Thương hiệu OEM|    null|                null|              null|  null| null|      null| null| null|null| null|
+--------------------+--------+-----------------

In [12]:
data[['item_id', 'rating']].show(5)

+--------+------+
| item_id|rating|
+--------+------+
|48102821|  null|
|    null|  null|
|    null|  null|
|    null|  null|
|    null|  null|
+--------+------+
only showing top 5 rows



In [14]:
df = pd.read_csv('Products.csv')
df = df[['index', 'item_id', 'rating']]

In [16]:
df_schema = StructType([StructField('index', StringType(), True),
                       StructField('item_id', StringType(), True),
                       StructField('rating', DoubleType(), True)
                      ])
data_sub = spark.createDataFrame(df, schema = df_schema)

In [18]:
data_sub.show(5)

+-----+--------+------+
|index| item_id|rating|
+-----+--------+------+
|    0|48102821|   4.0|
|    1|52333193|   4.5|
|    2|  299461|   4.8|
|    3|57440329|   4.7|
|    4|38458616|   4.8|
+-----+--------+------+
only showing top 5 rows



In [20]:
data_sub.printSchema()

root
 |-- index: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- rating: double (nullable = true)



In [21]:
users = data_sub.select('index').distinct().count()
products = data_sub.select('item_id').distinct().count()
numerator = data_sub.count()

In [22]:
denominator = users * products
denominator

19096900

In [23]:
sparsity = 1 - (numerator * 1.0 / denominator)

print('Sparsity: {}'.format(sparsity))

Sparsity: 0.9997711670480549


In [25]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

indexer = StringIndexer(inputCol = 'product_id', outputCol = 'product_idx')
indexer_model = indexer.fit(data_sub)
data_indexed = indexer_model.transform(data_sub)

indexer1 = StringIndexer(inputCol = 'index', outputCol = 'customer_idx')
indexer1_model = indexer1.fit(data_indexed)
data_indexed = indexer1_model.transform(data_indexed)

Py4JJavaError: An error occurred while calling o101.fit.
: org.apache.spark.SparkException: Input column product_id does not exist.
	at org.apache.spark.ml.feature.StringIndexerBase.$anonfun$validateAndTransformSchema$2(StringIndexer.scala:128)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
	at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
	at org.apache.spark.ml.feature.StringIndexerBase.validateAndTransformSchema(StringIndexer.scala:123)
	at org.apache.spark.ml.feature.StringIndexerBase.validateAndTransformSchema$(StringIndexer.scala:115)
	at org.apache.spark.ml.feature.StringIndexer.validateAndTransformSchema(StringIndexer.scala:145)
	at org.apache.spark.ml.feature.StringIndexer.transformSchema(StringIndexer.scala:252)
	at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:71)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:237)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
data_indexed.show(5)

In [None]:
data_indexed.count()

In [None]:
train_data, test_data = data_indexed.randomSplit([0.8, 0.2], seed = 42)