<a href="https://colab.research.google.com/github/Ujjwal1khadka/MLOps-Specialization-Notes/blob/main/Reminders_Recommendation_System_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count
from pyspark.sql.functions import col
from pyspark.sql.functions import regexp_extract

from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline


from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Single-threaded local Spark session used by every cell below.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .getOrCreate()
)

In [None]:
# Load the raw incident log; no schema is supplied, so every column is read as a string.
df = spark.read.option("header", True).csv("/content/incident_event_log.csv")

In [None]:
df.show(n=5)  # preview the first five raw rows

+----------+--------------+------+------------------+------------+-------------+--------+-----------+--------------+---------------+--------------+---------------+--------------+---------------+------------+------------+-----------+---------------+-----------+-------+----------+----------+------------+----------------+-----------+---------+-----------------------+-------------+----------+---+------+---------+-----------+---------------+---------------+--------------+
|    number|incident_state|active|reassignment_count|reopen_count|sys_mod_count|made_sla|  caller_id|     opened_by|      opened_at|sys_created_by| sys_created_at|sys_updated_by| sys_updated_at|contact_type|    location|   category|    subcategory|  u_symptom|cmdb_ci|    impact|   urgency|    priority|assignment_group|assigned_to|knowledge|u_priority_confirmation|       notify|problem_id|rfc|vendor|caused_by|closed_code|    resolved_by|    resolved_at|     closed_at|
+----------+--------------+------+------------------+---

In [None]:
# (rows, columns) of the raw frame
n_rows, n_cols = df.count(), len(df.columns)
print((n_rows, n_cols))

(141712, 36)


In [None]:
# Inspect column types: the CSV was read without a schema, so every column is a string.
df.printSchema()


root
 |-- number: string (nullable = true)
 |-- incident_state: string (nullable = true)
 |-- active: string (nullable = true)
 |-- reassignment_count: string (nullable = true)
 |-- reopen_count: string (nullable = true)
 |-- sys_mod_count: string (nullable = true)
 |-- made_sla: string (nullable = true)
 |-- caller_id: string (nullable = true)
 |-- opened_by: string (nullable = true)
 |-- opened_at: string (nullable = true)
 |-- sys_created_by: string (nullable = true)
 |-- sys_created_at: string (nullable = true)
 |-- sys_updated_by: string (nullable = true)
 |-- sys_updated_at: string (nullable = true)
 |-- contact_type: string (nullable = true)
 |-- location: string (nullable = true)
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- u_symptom: string (nullable = true)
 |-- cmdb_ci: string (nullable = true)
 |-- impact: string (nullable = true)
 |-- urgency: string (nullable = true)
 |-- priority: string (nullable = true)
 |-- assignment_group: 

Data Pre-Processing

In [None]:
# Reduce each labelled ID/level column to the first run of digits it contains.
# NOTE: regexp_extract returns "" (not null) when the pattern does not match,
# and null only when the input value itself is null.
id_columns = [
    "caller_id", "opened_by", "sys_created_by", "sys_updated_by",
    "location", "category", "subcategory", "u_symptom",
    "assignment_group", "assigned_to", "problem_id", "closed_code",
    "resolved_by", "urgency", "impact", "priority",
]
for column_name in id_columns:
    # Each column is extracted from itself. The previous cell rebuilt
    # "opened_by" from "caller_id", overwriting it with the caller's number.
    df = df.withColumn(column_name, regexp_extract(column_name, r'\d+', 0))

df.show(5)


+----------+--------------+------+------------------+------------+-------------+--------+---------+---------+---------------+--------------+---------------+--------------+---------------+------------+--------+--------+-----------+---------+-------+------+-------+--------+----------------+-----------+---------+-----------------------+-------------+----------+---+------+---------+-----------+-----------+---------------+--------------+
|    number|incident_state|active|reassignment_count|reopen_count|sys_mod_count|made_sla|caller_id|opened_by|      opened_at|sys_created_by| sys_created_at|sys_updated_by| sys_updated_at|contact_type|location|category|subcategory|u_symptom|cmdb_ci|impact|urgency|priority|assignment_group|assigned_to|knowledge|u_priority_confirmation|       notify|problem_id|rfc|vendor|caused_by|closed_code|resolved_by|    resolved_at|     closed_at|
+----------+--------------+------+------------------+------------+-------------+--------+---------+---------+---------------+-

In [None]:
# cmdb_ci is not used downstream; remove it.
columns_to_drop = ["cmdb_ci"]
df = df.drop(*columns_to_drop)

In [None]:
shape = (df.count(), len(df.columns))  # (rows, columns) after dropping cmdb_ci
print(shape)

(141712, 35)


In [None]:
# Keep only the columns the recommender works with.
feature_columns = ["caller_id", "category", "subcategory", "incident_state", "location", "assignment_group", "knowledge", "urgency", "impact"]
# Only true nulls are dropped here; values that regexp_extract emptied to ""
# survive until the later integer cast turns them into nulls.
df = df.select(*feature_columns).dropna(subset=["assignment_group"])
df.show(5)


+---------+--------+-----------+--------------+--------+----------------+---------+-------+------+
|caller_id|category|subcategory|incident_state|location|assignment_group|knowledge|urgency|impact|
+---------+--------+-----------+--------------+--------+----------------+---------+-------+------+
|     2403|      55|        170|           New|     143|              56|     true|      2|     2|
|     2403|      55|        170|      Resolved|     143|              56|     true|      2|     2|
|     2403|      55|        170|      Resolved|     143|              56|     true|      2|     2|
|     2403|      55|        170|        Closed|     143|              56|     true|      2|     2|
|     2403|      40|        215|           New|     165|              70|     true|      2|     2|
+---------+--------+-----------+--------------+--------+----------------+---------+-------+------+
only showing top 5 rows



In [None]:
# Drop rows whose key columns are missing. After the regexp_extract cleaning a
# missing ID is an empty string rather than null (regexp_extract returns ""
# when the pattern does not match), so both cases must be filtered out.
# A null-only check would let "" rows through.
required_columns = ["assignment_group", "location", "category", "subcategory",
                    "caller_id", "impact", "urgency", "knowledge"]
for column_name in required_columns:
    df = df.filter(col(column_name).isNotNull() & (col(column_name) != ""))

df.show(5)

+---------+--------+-----------+--------------+--------+----------------+---------+-------+------+
|caller_id|category|subcategory|incident_state|location|assignment_group|knowledge|urgency|impact|
+---------+--------+-----------+--------------+--------+----------------+---------+-------+------+
|     2403|      55|        170|           New|     143|              56|     true|      2|     2|
|     2403|      55|        170|      Resolved|     143|              56|     true|      2|     2|
|     2403|      55|        170|      Resolved|     143|              56|     true|      2|     2|
|     2403|      55|        170|        Closed|     143|              56|     true|      2|     2|
|     2403|      40|        215|           New|     165|              70|     true|      2|     2|
+---------+--------+-----------+--------------+--------+----------------+---------+-------+------+
only showing top 5 rows



In [None]:
# String -> integer. With spark.sql.ansi.enabled=false, unparseable values
# (e.g. "") become null; under ANSI mode this cast raises instead.
df = df.withColumn("caller_id", df["caller_id"].cast("integer"))

In [None]:
df = df.withColumn("location", df["location"].cast("integer"))  # string -> integer


In [None]:
df = df.withColumn("assignment_group", df["assignment_group"].cast("integer"))  # string -> integer


In [None]:
df = df.withColumn("impact", df["impact"].cast("integer"))  # string -> integer


In [None]:
# urgency is later used as the ALS rating column, so it must be numeric.
df = df.withColumn("urgency", df["urgency"].cast("integer"))


In [None]:
# category is later used as the ALS item column, so it must be an integer.
df = df.withColumn("category", df["category"].cast("integer"))


In [None]:
df = df.withColumn("subcategory", df["subcategory"].cast("integer"))  # string -> integer


In [None]:
# Confirm the casts: ID/level columns are now integer; incident_state and knowledge remain strings.
df.printSchema()

root
 |-- caller_id: integer (nullable = true)
 |-- category: integer (nullable = true)
 |-- subcategory: integer (nullable = true)
 |-- incident_state: string (nullable = true)
 |-- location: integer (nullable = true)
 |-- assignment_group: integer (nullable = true)
 |-- knowledge: string (nullable = true)
 |-- urgency: integer (nullable = true)
 |-- impact: integer (nullable = true)



In [None]:
df.show(n=5)  # preview after the integer casts

+---------+--------+-----------+--------------+--------+----------------+---------+-------+------+
|caller_id|category|subcategory|incident_state|location|assignment_group|knowledge|urgency|impact|
+---------+--------+-----------+--------------+--------+----------------+---------+-------+------+
|     2403|      55|        170|           New|     143|              56|     true|      2|     2|
|     2403|      55|        170|      Resolved|     143|              56|     true|      2|     2|
|     2403|      55|        170|      Resolved|     143|              56|     true|      2|     2|
|     2403|      55|        170|        Closed|     143|              56|     true|      2|     2|
|     2403|      40|        215|           New|     165|              70|     true|      2|     2|
+---------+--------+-----------+--------------+--------+----------------+---------+-------+------+
only showing top 5 rows



In [None]:
# Remove rows whose caller_id became null in the integer cast.
df = df.dropna(subset=["caller_id"])


In [None]:
# Remove rows whose category became null in the integer cast.
df = df.dropna(subset=["category"])


In [None]:
# Remove rows whose subcategory became null in the integer cast.
df = df.dropna(subset=["subcategory"])


In [None]:
# Remove rows whose assignment_group became null in the integer cast.
df = df.dropna(subset=["assignment_group"])


In [None]:
# Sanity check after cleaning: show any rows that still contain a null in one
# of the model columns, then report per-column null counts.
columns_to_check = ["caller_id", "category", "subcategory", "incident_state", "location", "assignment_group", "knowledge", "urgency", "impact"]

# Build the "any column is null" predicate from columns_to_check itself so the
# row filter and the per-column report cover exactly the same columns.
# The previous hand-written OR omitted incident_state.
any_null = col(columns_to_check[0]).isNull()
for column in columns_to_check[1:]:
    any_null = any_null | col(column).isNull()

missing_rows = df.filter(any_null)

num_missing_values = missing_rows.count()
print("Rows with at least one missing value:", num_missing_values)

missing_rows.show()

for column in columns_to_check:
    # Counted on df directly so each figure is independent of the combined filter.
    num_missing_values_column = df.filter(col(column).isNull()).count()
    print("Number of missing values in column '{}': {}".format(column, num_missing_values_column))

+---------+--------+-----------+--------------+--------+----------------+---------+-------+------+
|caller_id|category|subcategory|incident_state|location|assignment_group|knowledge|urgency|impact|
+---------+--------+-----------+--------------+--------+----------------+---------+-------+------+
+---------+--------+-----------+--------------+--------+----------------+---------+-------+------+

Number of missing values in column 'caller_id': 0
Number of missing values in column 'category': 0
Number of missing values in column 'subcategory': 0
Number of missing values in column 'incident_state': 0
Number of missing values in column 'location': 0
Number of missing values in column 'assignment_group': 0
Number of missing values in column 'knowledge': 0
Number of missing values in column 'urgency': 0
Number of missing values in column 'impact': 0


In [None]:
num_rows = df.count()  # rows remaining after all cleaning
print((num_rows, len(df.columns)))

(127430, 9)


Training the model

In [None]:
# 80/20 train/test split. A fixed seed makes the split (and therefore the
# RMSE/MSE reported below) reproducible on Restart & Run All.
SPLIT_SEED = 42
train_data, test_data = df.randomSplit([0.8, 0.2], seed=SPLIT_SEED)


Category *Recommendations* per user, using urgency as the rating

In [None]:
# Explicit-feedback ALS: each (caller_id, category) pair is "rated" by the
# incident's urgency. Rows for users/items unseen in training are dropped at
# prediction time (coldStartStrategy="drop"). A fixed seed makes the factor
# initialisation, and therefore the recommendations, reproducible.
als = ALS(maxIter=5,
          regParam=0.01,
          userCol="caller_id",
          itemCol="category",
          ratingCol="urgency",
          coldStartStrategy="drop",
          nonnegative=True,
          implicitPrefs=False,
          seed=42)

model = als.fit(train_data)

# Top 5 categories for every caller.
userRecs = model.recommendForAllUsers(5)

userRecs.show(10, truncate=False)

# caller_id: the user for whom recommendations are produced.
# recommendations: the top recommended categories with their predicted ratings for that user.
#
# In the recorded output below (produced before a seed was fixed, so exact values
# may differ on re-run), the top recommendation for caller_id 2 is
# category 16 with a predicted rating of 15.535255.
#
# The higher the predicted rating, the more strongly the category is recommended to the user.
# NOTE(review): urgency values in the data are small integers (e.g. 2), yet predicted
# ratings exceed 15 here. ALS predictions are unbounded dot products, so treat them
# as ranking scores rather than urgency levels.


+---------+-----------------------------------------------------------------------------------+
|caller_id|recommendations                                                                    |
+---------+-----------------------------------------------------------------------------------+
|2        |[{16, 15.535255}, {52, 13.432845}, {59, 11.002541}, {5, 10.120635}, {21, 8.300005}]|
|4        |[{59, 9.48207}, {52, 9.210654}, {16, 7.8099813}, {5, 7.572008}, {21, 5.5738273}]   |
|5        |[{16, 10.794547}, {59, 8.608426}, {5, 8.057395}, {52, 7.5779934}, {21, 5.0074863}] |
|6        |[{59, 10.17369}, {52, 8.40568}, {5, 7.4064507}, {21, 4.994644}, {16, 4.8975425}]   |
|7        |[{59, 9.131418}, {52, 8.305286}, {5, 6.3837423}, {16, 5.0113325}, {21, 4.9521446}] |
|8        |[{16, 9.660321}, {59, 9.524336}, {5, 8.760149}, {52, 8.54721}, {63, 5.5260777}]    |
|9        |[{59, 10.17369}, {52, 8.40568}, {5, 7.4064507}, {21, 4.994644}, {16, 4.8975425}]   |
|10       |[{16, 10.356836}, {52, 8.9552

In [None]:

# Top 5 callers for every category.
itemRecs = model.recommendForAllItems(numUsers=5)
itemRecs.show(truncate=False)

# Example from the recorded output below: for category 20, the top user is
# caller 3600 with a predicted rating of 2.9177592.
#
# The higher the predicted rating, the more strongly the category is recommended to that user.

+--------+-----------------------------------------------------------------------------------------------+
|category|recommendations                                                                                |
+--------+-----------------------------------------------------------------------------------------------+
|20      |[{3600, 2.9177592}, {1189, 2.9177592}, {3027, 2.8641233}, {5209, 2.8195026}, {3177, 2.7929013}]|
|40      |[{4267, 2.7813814}, {4161, 2.7813814}, {3622, 2.7813814}, {1015, 2.7813814}, {2, 2.7813814}]   |
|10      |[{3177, 9.113786}, {3061, 8.881903}, {3600, 8.799541}, {1189, 8.799541}, {2919, 8.745092}]     |
|50      |[{5598, 7.4100776}, {5495, 7.4100776}, {4843, 7.4100776}, {4625, 7.4100776}, {4587, 7.4100776}]|
|30      |[{4267, 2.4974804}, {4161, 2.4974804}, {3622, 2.4974804}, {1015, 2.4974804}, {2, 2.4974804}]   |
|31      |[{1313, 7.6703005}, {5403, 7.610588}, {1193, 7.0833397}, {853, 6.9105415}, {3746, 6.910368}]   |
|41      |[{4866, 2.7422242}, {5370, 

In [None]:

# Score the held-out split with the fitted model.
predictions = model.transform(test_data)

# RMSE of predicted vs. actual urgency.
evaluator = RegressionEvaluator(labelCol="urgency", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) =", rmse)

Root Mean Squared Error (RMSE) = 0.19928682949698487


In [None]:

# Same test predictions, scored with mean squared error.
evaluator = RegressionEvaluator(labelCol="urgency", predictionCol="prediction").setMetricName("mse")
mse = evaluator.evaluate(predictions)
print("Mean Squared Error (MSE):", mse)

Mean Squared Error (MSE): 0.03971524041096032
