<a href="https://colab.research.google.com/github/Nirmal0009/PySpark---Recommendation-System/blob/main/Insurance_Recommendation_System_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Core Recommendation Engine Using Cosine Similarity in PySpark (Colab Notebook)**

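In this project, we build a recommendation engine that uses cosine similarity to suggest insurance policies to customers. Each customer is represented by their income level summed per policy they hold, and a customer is recommended the policies held by other customers with similar profiles. PySpark handles the data loading and the pairwise similarity computation so that the approach can be applied to larger datasets.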
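The score used throughout is the standard cosine similarity. For two customer vectors $\mathbf{a}$ and $\mathbf{b}$ with one entry per policy ($n$ policies):

$$
\cos(\mathbf{a}, \mathbf{b}) = \frac{\mathbf{a} \cdot \mathbf{b}}{\lVert \mathbf{a} \rVert \, \lVert \mathbf{b} \rVert} = \frac{\sum_{i=1}^{n} a_i b_i}{\sqrt{\sum_{i=1}^{n} a_i^2}\,\sqrt{\sum_{i=1}^{n} b_i^2}}
$$

Because the entries here are non-negative income sums, the score ranges from 0 (the two customers share no policy) to 1 (the vectors point in the same direction, for example two customers who each hold exactly one and the same policy).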

Here’s a summary of the libraries used and what each one does:

*   **pyspark.sql.SparkSession**: Provides the entry point to Spark and creates DataFrame objects.
*   **pyspark.sql.functions.col**: References DataFrame columns in expressions such as filters and selections.
*   **pyspark.ml.feature.VectorAssembler**: Combines the per-policy columns into a single vector column, the format the similarity step works on.
*   **pyspark.ml.linalg.Vectors**: Provides the dense and sparse vector types that VectorAssembler produces (imported, though not called directly).
*   **numpy**: Computes the dot product and vector norms inside the cosine similarity function.
*   **pyspark.sql.functions.collect_list**: Aggregates values into a list, useful for collecting results from grouped data (imported, though not used in the cells below).
*   **pyspark.sql.functions.udf**: Wraps a Python function as a user-defined function (UDF) so it can be applied to DataFrame columns.
*   **pyspark.sql.types.DoubleType**: Declares the double-precision floating-point return type of the similarity UDF.

Together, these components compute pairwise cosine similarity scores between customers and recommend the policies held by the most similar ones, with Spark distributing the computation.

In [3]:
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 4.5 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=b218b48d3206bb26252925d3e7ecdb79f26d53a750db3c75f56a116b69fcce0b
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [4]:
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
import numpy as np
from pyspark.sql.functions import collect_list

In [2]:
# Initialize a Spark session (getOrCreate reuses an existing one if present)

spark = SparkSession.builder.appName('Dataframe').getOrCreate()

In [3]:
# Use PySpark to read the CSV file from Google Drive (first row as header, column types inferred)
file_path = '/content/drive/MyDrive/Dataset.csv'  # Update this with the actual file path
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Show the first few rows of the dataframe
df.show(5)

+-----------+---+------+--------------+------------+------------+-----------------+----------------------+--------+---------------+----------------+-----------------+-------------------+-------------+----------------------------------+------------------------+---------------+--------------+----------+-----------+--------------------+-------------------------------+----------------------+------------------+------------+-----------------------+------------+----------------+-----------+------------------+
|Customer ID|Age|Gender|Marital Status|  Occupation|Income Level|  Education Level|Geographic Information|Location|Behavioral Data|Purchase History|Policy Start Date|Policy Renewal Date|Claim History|Interactions with Customer Service|Insurance Products Owned|Coverage Amount|Premium Amount|Deductible|Policy Type|Customer Preferences|Preferred Communication Channel|Preferred Contact Time|Preferred Language|Risk Profile|Previous Claims History|Credit Score|  Driving Record|Life Events|Segm

In [4]:
# Keep only the customer ID, income, and policy columns
columns = ['Customer ID', 'Income Level', 'Behavioral Data']
df = df.select(columns)

df.show()

+-----------+------------+---------------+
|Customer ID|Income Level|Behavioral Data|
+-----------+------------+---------------+
|      84966|       70541|        policy5|
|      95568|       54168|        policy5|
|      10544|       73899|        policy5|
|      77033|       63381|        policy5|
|      88160|       38794|        policy1|
|      60937|       87188|        policy5|
|      37676|       94891|        policy3|
|      54100|       61003|        policy1|
|      30476|      116249|        policy1|
|      39071|       49083|        policy3|
|      38477|       62099|        policy2|
|      38329|      110698|        policy4|
|       6580|       90301|        policy5|
|      67128|       24648|        policy3|
|      34814|      105009|        policy3|
|      86048|       29783|        policy1|
|      24881|      114236|        policy3|
|      24076|      107359|        policy2|
|      38654|       49224|        policy5|
|      47840|      117264|        policy4|
+----------

In [5]:
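# One row per customer and one column per policy; each cell holds the summed
# Income Level for that customer and policy, or 0 if the policy is not held
pivot_df = df.groupBy("Customer ID").pivot("Behavioral Data").sum("Income Level").fillna(0)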
pivot_df.show()

+-----------+-------+-------+-------+-------+-------+
|Customer ID|policy1|policy2|policy3|policy4|policy5|
+-----------+-------+-------+-------+-------+-------+
|      43935|      0| 266631|      0|      0|      0|
|       3794| 133591|      0|      0|      0| 139525|
|      66176|      0| 182902|      0|      0|      0|
|      11033|      0|      0|  35779| 106236|      0|
|       9427|      0| 111693|      0|      0|      0|
|      75122| 142537|  26899|      0|  84393|      0|
|      87462|      0|      0|      0|      0| 122320|
|      65478|  41347|      0|      0|      0| 118079|
|      89874|      0|  90825|      0| 100429|      0|
|      71510| 127793|      0|      0|      0|      0|
|       1580|      0| 125419| 105686|      0|      0|
|      73683|      0|  52204|      0|      0|      0|
|      89878|  37081|      0|      0|      0|      0|
|      96853|      0| 113491|      0|      0| 118528|
|      71527|      0|      0| 102112|      0|      0|
|       3918|      0| 112358
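The pivot turns long-format rows (one row per customer and policy) into one vector per customer. The plain-Python sketch below mimics `groupBy(...).pivot(...).sum(...).fillna(0)` to show what ends up in each cell; the function name `pivot_sum` and the customer IDs `A` and `B` are hypothetical, not taken from the dataset.

```python
from collections import defaultdict


def pivot_sum(rows, policies):
    """Mimic groupBy(customer).pivot(policy).sum(income).fillna(0) in plain Python.

    rows: iterable of (customer_id, income, policy) tuples.
    policies: ordered list of policy names that become the columns.
    """
    # Every customer starts with 0 in every policy column (the fillna(0) step)
    totals = defaultdict(lambda: {p: 0 for p in policies})
    for customer_id, income, policy in rows:
        # Repeated (customer, policy) rows add up, like .sum("Income Level")
        totals[customer_id][policy] += income
    return {cid: [cells[p] for p in policies] for cid, cells in totals.items()}


# Hypothetical rows: customer "A" holds policy2 twice and policy5 once
rows = [
    ("A", 50_000, "policy2"),
    ("A", 60_000, "policy2"),
    ("A", 40_000, "policy5"),
    ("B", 70_000, "policy1"),
]
policies = ["policy1", "policy2", "policy3", "policy4", "policy5"]
print(pivot_sum(rows, policies))
# {'A': [0, 110000, 0, 0, 40000], 'B': [70000, 0, 0, 0, 0]}
```

In PySpark, `pivot` also accepts an explicit list of values, for example `pivot("Behavioral Data", ["policy1", "policy2", "policy3", "policy4", "policy5"])`, which spares Spark the extra pass it otherwise makes to find the distinct policy names.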

In [6]:
# Combine the policy columns into one feature vector per customer, the input format used for cosine similarity
assembler = VectorAssembler(inputCols=pivot_df.columns[1:], outputCol="features")
pivot_df = assembler.transform(pivot_df)
pivot_df.select("features").show(truncate=False)

+----------------------------------+
|features                          |
+----------------------------------+
|(5,[1],[266631.0])                |
|(5,[0,4],[133591.0,139525.0])     |
|(5,[1],[182902.0])                |
|(5,[2,3],[35779.0,106236.0])      |
|(5,[1],[111693.0])                |
|[142537.0,26899.0,0.0,84393.0,0.0]|
|(5,[4],[122320.0])                |
|(5,[0,4],[41347.0,118079.0])      |
|(5,[1,3],[90825.0,100429.0])      |
|(5,[0],[127793.0])                |
|(5,[1,2],[125419.0,105686.0])     |
|(5,[1],[52204.0])                 |
|(5,[0],[37081.0])                 |
|(5,[1,4],[113491.0,118528.0])     |
|(5,[2],[102112.0])                |
|(5,[1,2],[112358.0,116143.0])     |
|(5,[0,1],[43813.0,39857.0])       |
|(5,[2],[35122.0])                 |
|(5,[1],[144474.0])                |
|(5,[0],[24489.0])                 |
+----------------------------------+
only showing top 20 rows
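In the output above, `(5,[0,4],[133591.0,139525.0])` is Spark's sparse notation: the vector length, the indices of the non-zero entries, and their values. VectorAssembler stores a vector in sparse form when that is more compact, which is why the row with three non-zero policies prints in dense form instead. A minimal plain-Python decoder (the helper name `sparse_to_dense` is hypothetical):

```python
def sparse_to_dense(size, indices, values):
    """Expand Spark's sparse notation (size, [indices], [values]) into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        # Only the listed positions are non-zero; everything else stays 0.0
        dense[i] = v
    return dense


# Row for customer 3794 from the output above: (5,[0,4],[133591.0,139525.0])
print(sparse_to_dense(5, [0, 4], [133591.0, 139525.0]))
# [133591.0, 0.0, 0.0, 0.0, 139525.0]
```

In PySpark itself, `Vectors.sparse(5, [0, 4], [133591.0, 139525.0]).toArray()` returns the same values as a NumPy array, which is what the similarity UDF relies on.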



In [7]:
# Imports for wrapping the cosine similarity function in a UDF, so it can run inside Spark DataFrame operations
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

In [8]:
# Compute cosine similarity - measures how similar two clients' feature vectors are
def cosine_similarity(vec1, vec2):
    dot_product = np.dot(vec1, vec2)
    norm_a = np.linalg.norm(vec1)
    norm_b = np.linalg.norm(vec2)
    # Avoid dividing by zero if a client's vector is all zeros
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot_product / (norm_a * norm_b)


# Wrap the function in a UDF (user-defined function) so Spark can apply it to each row;
# the Vector objects in "features" are converted to NumPy arrays with toArray() first
cosine_similarity_udf = udf(lambda x, y: float(cosine_similarity(x.toArray(), y.toArray())), DoubleType())
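To check the function by hand, the sketch below reimplements it in plain Python (the helper name `cosine` is hypothetical) and applies it to rows copied from the pivot output. The first two scores match the `similarity` values reported further down for 43935 vs 66176 (1.0) and 43935 vs 3794 (0.0).

```python
import math


def cosine(a, b):
    """Plain-Python cosine similarity; returns 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


# Rows copied from the pivot output (columns policy1..policy5)
c43935 = [0, 266631, 0, 0, 0]
c66176 = [0, 182902, 0, 0, 0]
c3794 = [133591, 0, 0, 0, 139525]
c96853 = [0, 113491, 0, 0, 118528]

print(cosine(c43935, c66176))            # same single policy: 1.0
print(cosine(c43935, c3794))             # no policy in common: 0.0
print(round(cosine(c43935, c96853), 3))  # shares policy2 only: 0.692
```

Customer 96853 clears the 0.5 threshold used later in `recommend_policies`, so its policy5 is a candidate recommendation for 43935.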




In [9]:
# Cross join pivot_df with itself so every client is paired with every client (including itself)
cross_joined_df = pivot_df.alias("df1").crossJoin(pivot_df.alias("df2"))

cross_joined_df.show(5)

+-----------+-------+-------+-------+-------+-------+------------------+-----------+-------+-------+-------+-------+-------+--------------------+
|Customer ID|policy1|policy2|policy3|policy4|policy5|          features|Customer ID|policy1|policy2|policy3|policy4|policy5|            features|
+-----------+-------+-------+-------+-------+-------+------------------+-----------+-------+-------+-------+-------+-------+--------------------+
|      43935|      0| 266631|      0|      0|      0|(5,[1],[266631.0])|      43935|      0| 266631|      0|      0|      0|  (5,[1],[266631.0])|
|      43935|      0| 266631|      0|      0|      0|(5,[1],[266631.0])|       3794| 133591|      0|      0|      0| 139525|(5,[0,4],[133591....|
|      43935|      0| 266631|      0|      0|      0|(5,[1],[266631.0])|      66176|      0| 182902|      0|      0|      0|  (5,[1],[182902.0])|
|      43935|      0| 266631|      0|      0|      0|(5,[1],[266631.0])|      11033|      0|      0|  35779| 106236|      0|
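A cross join multiplies row counts: with n customers it produces n² pairs, and the self-pair filter in the next cell keeps n(n - 1) of them, each of which runs the Python UDF. The counting sketch below uses hypothetical IDs `c1` to `c4`, with `itertools.product` standing in for `crossJoin`.

```python
from itertools import product

# Hypothetical customer IDs standing in for the rows of pivot_df
customers = ["c1", "c2", "c3", "c4"]

# crossJoin pairs every row with every row, including itself: n * n pairs
all_pairs = list(product(customers, repeat=2))

# Dropping pairs where both IDs match leaves n * (n - 1) pairs
distinct_pairs = [(a, b) for a, b in all_pairs if a != b]

print(len(all_pairs), len(distinct_pairs))  # 16 12
```

Because `recommend_policies` only reads rows where `df1` is the requested client, one possible optimization (not used in this notebook) is to filter `df1` to that client before joining, which reduces the work to n - 1 pairs per query.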

In [10]:
# Add a similarity column using cosine_similarity_udf, then drop the pairs
# where a client is compared with itself
similarity_df = cross_joined_df.withColumn(
    "similarity",
    cosine_similarity_udf(col("df1.features"), col("df2.features")),
).filter(col("df1.Customer ID") != col("df2.Customer ID"))

similarity_df.show()

+-----------+-------+-------+-------+-------+-------+------------------+-----------+-------+-------+-------+-------+-------+--------------------+------------------+
|Customer ID|policy1|policy2|policy3|policy4|policy5|          features|Customer ID|policy1|policy2|policy3|policy4|policy5|            features|        similarity|
+-----------+-------+-------+-------+-------+-------+------------------+-----------+-------+-------+-------+-------+-------+--------------------+------------------+
|      43935|      0| 266631|      0|      0|      0|(5,[1],[266631.0])|       3794| 133591|      0|      0|      0| 139525|(5,[0,4],[133591....|               0.0|
|      43935|      0| 266631|      0|      0|      0|(5,[1],[266631.0])|      66176|      0| 182902|      0|      0|      0|  (5,[1],[182902.0])|               1.0|
|      43935|      0| 266631|      0|      0|      0|(5,[1],[266631.0])|      11033|      0|      0|  35779| 106236|      0|(5,[2,3],[35779.0...|               0.0|
|      439

In [49]:
# Function to recommend policies
def recommend_policies(client_id, threshold=0.5):
    # IDs of the clients whose similarity to client_id exceeds the threshold (0.5 by default; adjustable)
    similar_clients = similarity_df.filter((col("df1.Customer ID") == client_id) & (col("similarity") > threshold))\
                               .select(col("df2.Customer ID")).rdd.flatMap(lambda x: x).collect()

    # Rows (customer, income, policy) belonging to the similar clients
    similar_clients_policies = df.filter(col("Customer ID").isin(similar_clients))

    # Policies the input client already holds, to be excluded
    existing_policies = df.filter(col("Customer ID") == client_id).select("Behavioral Data").rdd.flatMap(lambda x: x).collect()

    # Recommend the policies held by similar clients that the input client does not have yet
    recommendations = similar_clients_policies.filter(~col("Behavioral Data").isin(existing_policies)).select("Behavioral Data").distinct()

    # Return the recommended policies as a Python list
    return recommendations.rdd.flatMap(lambda x: x).collect()

In [50]:
recommend_policies(43935)

['policy1', 'policy5', 'policy4', 'policy3']
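The recommendation logic can be traced on a small scale with the plain-Python mirror of `recommend_policies` below. The holdings are the non-zero policy columns of four customers from the pivot output, and the scores are cosine values computed from those rows (1.0, about 0.692, and 0.0). The helper name `recommend` and the dict-based inputs are assumptions made for illustration.

```python
def recommend(client_id, holdings, similarity, threshold=0.5):
    """Plain-Python mirror of recommend_policies (hypothetical helper).

    holdings: dict customer_id -> set of policies held.
    similarity: dict (id_a, id_b) -> cosine score.
    """
    # Clients whose score against client_id exceeds the threshold
    similar = [
        b for (a, b), s in similarity.items()
        if a == client_id and b != client_id and s > threshold
    ]
    # Policies the client already holds are excluded
    existing = holdings.get(client_id, set())
    recs = set()
    for other in similar:
        recs |= holdings[other] - existing
    # Sorted for a deterministic order
    return sorted(recs)


holdings = {
    "43935": {"policy2"},
    "66176": {"policy2"},
    "96853": {"policy2", "policy5"},
    "3794": {"policy1", "policy5"},
}
similarity = {("43935", "66176"): 1.0, ("43935", "96853"): 0.692, ("43935", "3794"): 0.0}
print(recommend("43935", holdings, similarity))  # ['policy5']
```

With only these four customers the sketch recommends just `policy5`; the full run above returns more policies because many more clients clear the threshold. The sketch sorts its result, whereas Spark's `distinct()` makes no ordering guarantee.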

## THANK YOU