**Analisis Amazon Fine Food Reviews dengan PySpark dan Random Forest**

In [1]:
# install
!pip install findspark
!pip install pyspark
import findspark
findspark.init()

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840629 sha256=f7b84e92a3e74f89550528d16eb64f189e2da7dac24c986c5acec4a3c3bc8802
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


In [2]:
# Import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [3]:
# Start Spark session
spark = SparkSession.builder.appName("Amazon Fine Food Reviews Classification").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/19 17:52:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Load dataset
df = spark.read.csv('/kaggle/input/amazon-fine-food-reviews/Reviews.csv', header=True, inferSchema=True)

# Tampilkan beberapa baris pertama dataset
df.show(5)

                                                                                

+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
| Id| ProductId|        UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|             Summary|                Text|
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|          delmartian|                   1|                     1|    5|1303862400|Good Quality Dog ...|I have bought sev...|
|  2|B00813GRG4|A1D87F6ZCVE5NK|              dll pa|                   0|                     0|    1|1346976000|   Not as Advertised|"Product arrived ...|
|  3|B000LQOCH0| ABXLMWJIXXAIN|"Natalia Corres "...|                   1|                     1|    4|1219017600|"""Delight"" says...|"This is a confec...|
|  4|B000UA0QIQ|A395BORC6FGVXV|                Karl|            

In [5]:
# Hapus nilai null dari kolom 'Text' dan 'Score'
df = df.na.drop(subset=["Text", "Score"])

# Tokenisasi teks dan hapus stop words
tokenizer = Tokenizer(inputCol="Text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

In [6]:
# Mengubah teks menjadi fitur TF-IDF
hashingTF = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=10000)
idf = IDF(inputCol="raw_features", outputCol="features")

In [7]:
# Konversi kolom 'Score' menjadi label dan tangani label yang tidak terlihat
label_indexer = StringIndexer(inputCol="Score", outputCol="label", handleInvalid="keep")

In [8]:
# Definisikan Random Forest Classifier
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=50)

In [9]:
# Membuat pipeline untuk menggabungkan praproses dan model
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, label_indexer, rf])

In [10]:
# Membagi data menjadi data pelatihan dan pengujian
train_data, test_data = df.randomSplit([0.8, 0.2], seed=1234)

In [11]:
# Melatih model menggunakan data pelatihan
model = pipeline.fit(train_data)

24/10/19 17:54:22 WARN MemoryStore: Not enough space to cache rdd_51_0 in memory! (computed 24.2 MiB so far)
24/10/19 17:54:22 WARN BlockManager: Persisting block rdd_51_0 to disk instead.
24/10/19 17:54:22 WARN MemoryStore: Not enough space to cache rdd_51_1 in memory! (computed 24.2 MiB so far)
24/10/19 17:54:22 WARN BlockManager: Persisting block rdd_51_1 to disk instead.
24/10/19 17:54:22 WARN MemoryStore: Not enough space to cache rdd_51_2 in memory! (computed 10.5 MiB so far)
24/10/19 17:54:22 WARN BlockManager: Persisting block rdd_51_2 to disk instead.
24/10/19 17:54:22 WARN MemoryStore: Not enough space to cache rdd_51_3 in memory! (computed 24.2 MiB so far)
24/10/19 17:54:22 WARN BlockManager: Persisting block rdd_51_3 to disk instead.
24/10/19 17:55:32 WARN MemoryStore: Not enough space to cache rdd_51_3 in memory! (computed 130.0 MiB so far)
24/10/19 17:55:35 WARN MemoryStore: Not enough space to cache rdd_51_1 in memory! (computed 195.3 MiB so far)
24/10/19 17:55:36 WARN M

In [12]:
# Membuat prediksi pada data pengujian
predictions = model.transform(test_data)

In [13]:
# Evaluasi model dengan metrik akurasi
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Akurasi Model = {accuracy:.2f}")

24/10/19 17:57:45 WARN DAGScheduler: Broadcasting large task binary with size 1628.6 KiB

Akurasi Model = 0.64


                                                                                