In [1]:
spark

In [2]:
sc

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnull
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [4]:
logs = spark.read.parquet('hdfs://devenv/user/ml_datasets/ec_web_logs_analysis/data/')

In [5]:
# Preprocessing and feature engineering
feature_prep = logs.select("product_category_id", "device_type", "connect_type", "age_group") \
                   .where(~isnull("age_group"))

In [6]:
feature_prep = StringIndexer(inputCol="age_group", outputCol="age_group_indexed") \
              .fit(feature_prep) \
              .transform(feature_prep)

In [7]:
# feature_prep.show(10)

+-------------------+-----------+------------+---------+-----------------+
|product_category_id|device_type|connect_type|age_group|age_group_indexed|
+-------------------+-----------+------------+---------+-----------------+
|                 60|          2|           1|    36-50|              1.0|
|                156|          1|           2|  over 50|              3.0|
|                 53|          2|           2|    36-50|              1.0|
|                113|          1|           2|    21-35|              0.0|
|                104|          2|           2|    21-35|              0.0|
|                  6|          2|           2| under 20|              2.0|
|                123|          2|           1|    21-35|              0.0|
|                 27|          1|           1| under 20|              2.0|
|                 64|          1|           2|  over 50|              3.0|
|                150|          1|           2|  over 50|              3.0|
+-------------------+----

In [8]:
final_data = VectorAssembler(inputCols=["product_category_id", "device_type", "connect_type"],
                             outputCol="features").transform(feature_prep)

In [9]:
final_data.show(10)

+-------------------+-----------+------------+---------+-----------------+---------------+
|product_category_id|device_type|connect_type|age_group|age_group_indexed|       features|
+-------------------+-----------+------------+---------+-----------------+---------------+
|                 60|          2|           1|    36-50|              1.0| [60.0,2.0,1.0]|
|                156|          1|           2|  over 50|              3.0|[156.0,1.0,2.0]|
|                 53|          2|           2|    36-50|              1.0| [53.0,2.0,2.0]|
|                113|          1|           2|    21-35|              0.0|[113.0,1.0,2.0]|
|                104|          2|           2|    21-35|              0.0|[104.0,2.0,2.0]|
|                  6|          2|           2| under 20|              2.0|  [6.0,2.0,2.0]|
|                123|          2|           1|    21-35|              0.0|[123.0,2.0,1.0]|
|                 27|          1|           1| under 20|              2.0| [27.0,1.0,1.0]|

In [10]:
# Split data into train and test sets
train_data, test_data = final_data.randomSplit([0.7, 0.3])

In [11]:
# Model training
classifier = DecisionTreeClassifier(featuresCol="features", labelCol="age_group_indexed", maxDepth=10)
model = classifier.fit(train_data)

In [12]:
# Model training 隨機森林
# classifier = RandomForestClassifier(featuresCol="features", labelCol="age_group_indexed", numTrees=100, maxDepth=10)
# model = classifier.fit(train_data)

In [13]:
# 神經網路
# from pyspark.ml.classification import MultilayerPerceptronClassifier
# classifier = MultilayerPerceptronClassifier(featuresCol="features", labelCol="age_group_indexed", layers=[3, 10, 10, 4])
# model = classifier.fit(train_data)

In [None]:
# 邏輯思回歸
# from pyspark.ml.classification import LogisticRegression 
# classifier = LogisticRegression(featuresCol="features", labelCol="age_group_indexed")
# model = classifier.fit(train_data)

In [14]:
# Transform the test data using the model to get predictions
predicted_test_data = model.transform(test_data)

In [20]:
predicted_test_data.drop('product_category_id','device_type').show(10)

+------------+---------+-----------------+-------------+--------------------+--------------------+----------+
|connect_type|age_group|age_group_indexed|     features|       rawPrediction|         probability|prediction|
+------------+---------+-----------------+-------------+--------------------+--------------------+----------+
|           1|    21-35|              0.0|[1.0,1.0,1.0]|[4310.0,7111.0,71...|[0.05064034778521...|       2.0|
|           1|    36-50|              1.0|[1.0,1.0,1.0]|[4310.0,7111.0,71...|[0.05064034778521...|       2.0|
|           1|    36-50|              1.0|[1.0,1.0,1.0]|[4310.0,7111.0,71...|[0.05064034778521...|       2.0|
|           1|    36-50|              1.0|[1.0,1.0,1.0]|[4310.0,7111.0,71...|[0.05064034778521...|       2.0|
|           1|  over 50|              3.0|[1.0,1.0,1.0]|[4310.0,7111.0,71...|[0.05064034778521...|       2.0|
|           1|  over 50|              3.0|[1.0,1.0,1.0]|[4310.0,7111.0,71...|[0.05064034778521...|       2.0|
|         

In [16]:
# Evaluate the model performance
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol='age_group_indexed',
                                                       predictionCol='prediction',
                                                       metricName='accuracy')
print("Accuracy: {}", evaluator_accuracy.evaluate(predicted_test_data))

Accuracy: {} 0.8244888597640891


In [17]:
confusion_matrix_info = predicted_test_data.select("age_group_indexed", "prediction")\
                                           .groupBy("age_group_indexed", "prediction")\
                                           .count()

In [18]:
confusion_matrix_info.orderBy("age_group_indexed", "prediction").show()

+-----------------+----------+-----+
|age_group_indexed|prediction|count|
+-----------------+----------+-----+
|              0.0|       0.0|39460|
|              0.0|       1.0| 3122|
|              0.0|       2.0| 2445|
|              0.0|       3.0| 2487|
|              1.0|       0.0| 1238|
|              1.0|       1.0|33485|
|              1.0|       2.0| 4542|
|              2.0|       0.0| 1350|
|              2.0|       1.0| 4895|
|              2.0|       2.0|32985|
|              3.0|       0.0| 3712|
|              3.0|       1.0| 1578|
|              3.0|       2.0| 1414|
|              3.0|       3.0|19887|
+-----------------+----------+-----+



In [19]:
confusion_matrix_info.orderBy("prediction", "age_group_indexed").show(10)

+-----------------+----------+-----+
|age_group_indexed|prediction|count|
+-----------------+----------+-----+
|              0.0|       0.0|39460|
|              1.0|       0.0| 1238|
|              2.0|       0.0| 1350|
|              3.0|       0.0| 3712|
|              0.0|       1.0| 3122|
|              1.0|       1.0|33485|
|              2.0|       1.0| 4895|
|              3.0|       1.0| 1578|
|              0.0|       2.0| 2445|
|              1.0|       2.0| 4542|
+-----------------+----------+-----+
only showing top 10 rows



In [None]:
# Save the model
model.save("file:///home/spark/Desktop/models/ec_web_logs_analysis/model_age_group_prediction/")