In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 3.3 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=44115d50515bd3b9bbeabf98c54e2115cdaf91a49d978a813d678f0ffebaadc7
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
import heapq
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.linalg import DenseVector
from pyspark.ml.stat import Correlation

# Reuse the active Spark session if one exists; otherwise start one for this notebook.
spark = (
    SparkSession.builder
    .appName("RecommenderSystem")
    .getOrCreate()
)


In [None]:
# Location of the customer CSV. The default is the original hard-coded path
# (presumably a Colab upload under /content); set BDA_DATA_PATH to run elsewhere.
DATA_PATH = os.environ.get('BDA_DATA_PATH', '/content/BDA-Project.csv')

# Read with a header row and let Spark infer column types from the data.
df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)

In [None]:
# Keep only customers whose churn label is 'No', then drop the churn-related
# columns so they do not leak into the customer profile.
churn_columns = ['Churn Label', 'Churn Category', 'Churn Reason']
df_filtered = (
    df
    .filter(df['Churn Label'] == 'No')
    .drop(*churn_columns)
)


In [None]:
# Identifier plus the demographic, account and service columns that make up
# each customer's profile for the similarity model.
user_feature_columns = [
    'Customer ID',
    'Age', 'Gender', 'Senior Citizen', 'Married', 'Dependents', 'Number of Dependents',
    'Tenure in Months',
    'Phone Service', 'Multiple Lines', 'Internet Service', 'Internet Type',
    'Online Security', 'Online Backup', 'Device Protection Plan', 'Premium Tech Support',
    'Streaming TV', 'Streaming Movies', 'Streaming Music', 'Unlimited Data',
    'Contract',
]
user_features = df_filtered.select(*user_feature_columns)

In [None]:
# Build the numeric feature vector per customer:
#   categorical columns -> StringIndexer (<col>_index) -> OneHotEncoder (<col>_index_vec),
#   then numeric + one-hot columns -> VectorAssembler ("features") -> StandardScaler ("scaled_features").
categorical_columns = ['Gender', 'Senior Citizen', 'Married', 'Dependents', 'Phone Service', 'Multiple Lines',
                       'Internet Service', 'Internet Type', 'Online Security', 'Online Backup',
                       'Device Protection Plan', 'Premium Tech Support', 'Streaming TV', 'Streaming Movies',
                       'Streaming Music', 'Unlimited Data', 'Contract']
numeric_columns = ['Age', 'Number of Dependents', 'Tenure in Months']
index_columns = [column + "_index" for column in categorical_columns]
vector_columns = [column + "_vec" for column in index_columns]

# This cell rebinds `user_features`, and the Spark ML stages refuse to write an output
# column that already exists, so a plain re-run would fail. Dropping the derived columns
# first (drop() ignores names that are absent) makes the cell safe to re-execute.
user_features = user_features.drop(*index_columns, *vector_columns, "features", "scaled_features")

# Every indexer is fitted on the same base frame before any transform is applied.
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index").fit(user_features)
    for column in categorical_columns
]
for indexer in indexers:
    user_features = indexer.transform(user_features)

encoder = OneHotEncoder(inputCols=index_columns, outputCols=vector_columns)
user_features = encoder.fit(user_features).transform(user_features)

assembler = VectorAssembler(inputCols=numeric_columns + vector_columns, outputCol="features")
user_features = assembler.transform(user_features)

# Library defaults are used (per the Spark docs: withStd=True, withMean=False).
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(user_features)
user_features = scaler_model.transform(user_features)


In [None]:
def cosine_similarity(vec1, vec2):
    """Return the cosine similarity of two vectors.

    Args:
        vec1, vec2: Objects exposing ``dot(other)`` and ``norm(p)`` (for example
            ``pyspark.ml.linalg.DenseVector``).

    Returns:
        ``dot(vec1, vec2) / (|vec1| * |vec2|)`` as a float. If either vector has
        zero length the similarity is undefined and ``0.0`` is returned, rather than
        dividing by zero. Dividing by zero would raise or produce NaN/inf depending on
        the scalar type, and NaN breaks any ranking sort.
    """
    norm_product = vec1.norm(2) * vec2.norm(2)
    if norm_product == 0:
        return 0.0
    return float(vec1.dot(vec2) / norm_product)

# Pair every customer ID with a dense copy of its scaled feature vector, collect the
# pairs into a driver-side dict, and broadcast that dict for the similarity lookups.
user_features_rdd = (
    user_features
    .select('Customer ID', 'scaled_features')
    .rdd
    .map(lambda record: (record['Customer ID'],
                         DenseVector(record['scaled_features'].toArray())))
)
user_features_dict = user_features_rdd.collectAsMap()
broadcast_user_features = spark.sparkContext.broadcast(user_features_dict)


In [None]:
def get_similar_users(user_id, broadcast_user_features, top_n=5):
    """Return the IDs of the users most similar to ``user_id``.

    Args:
        user_id: Key of the target user in the broadcast feature mapping.
        broadcast_user_features: Broadcast variable whose ``.value`` maps user IDs
            to feature vectors accepted by ``cosine_similarity``.
        top_n: Maximum number of neighbours to return. ``None`` returns every other
            user ranked. Values <= 0 return an empty list.

    Returns:
        List of user IDs ordered from most to least similar. Ties keep the mapping's
        iteration order. The target user is never included.

    Raises:
        KeyError: If ``user_id`` is not present in the mapping.
    """
    features_by_user = broadcast_user_features.value
    target_vector = features_by_user[user_id]
    scored = (
        (other_id, cosine_similarity(target_vector, other_vector))
        for other_id, other_vector in features_by_user.items()
        if other_id != user_id
    )
    if top_n is None:
        # No limit: rank everyone (same as slicing a full sort with [:None]).
        ranked = sorted(scored, key=lambda pair: pair[1], reverse=True)
    else:
        # nlargest is equivalent to sorted(..., reverse=True)[:top_n] and stable on ties,
        # but avoids sorting the whole population just to keep a handful of users.
        ranked = heapq.nlargest(top_n, scored, key=lambda pair: pair[1])
    return [other_id for other_id, _similarity in ranked]


In [None]:
def recommend_services(user_id, df_filtered, broadcast_user_features, top_n=5, service_columns=None):
    """Recommend the service bundle most common among ``user_id``'s nearest neighbours.

    Args:
        user_id: Customer ID to recommend for (must be a key of the broadcast mapping).
        df_filtered: Spark DataFrame with a ``'Customer ID'`` column and the service columns.
        broadcast_user_features: Broadcast ``{customer_id: feature_vector}`` mapping.
        top_n: Number of similar users to consult.
        service_columns: Columns that define a bundle. Defaults to the internet,
            add-on, streaming, data and contract columns.

    Returns:
        Dict of the service columns plus ``'count'`` (how many neighbours share that
        exact bundle), or ``{}`` when no neighbour rows are found.
    """
    if service_columns is None:
        service_columns = ['Internet Type', 'Online Security', 'Online Backup', 'Device Protection Plan',
                           'Premium Tech Support', 'Streaming TV', 'Streaming Movies',
                           'Streaming Music', 'Unlimited Data', 'Contract']

    similar_users_ids = get_similar_users(user_id, broadcast_user_features, top_n)
    if not similar_users_ids:
        return {}
    similar_users_data = df_filtered.filter(df_filtered['Customer ID'].isin(similar_users_ids))

    # With only a few neighbours many bundles occur exactly once, so ordering by count
    # alone leaves the winner arbitrary. Sorting on the service columns as a secondary
    # key makes the tie-break deterministic.
    most_common = (
        similar_users_data
        .groupBy(*service_columns)
        .count()
        .orderBy(col('count').desc(), *service_columns)
        .first()
    )
    # first() returns None when the filter matched no rows.
    return most_common.asDict() if most_common is not None else {}


In [None]:
# Example: services favoured by the customers most similar to 1251-STYSZ.
new_user_id = '1251-STYSZ'
recommended_services = recommend_services(
    user_id=new_user_id,
    df_filtered=df_filtered,
    broadcast_user_features=broadcast_user_features,
)
print(recommended_services)

{'Internet Type': 'Fiber Optic', 'Online Security': 'Yes', 'Online Backup': 'No', 'Device Protection Plan': 'No', 'Premium Tech Support': 'No', 'Streaming TV': 'No', 'Streaming Movies': 'No', 'Streaming Music': 'No', 'Unlimited Data': 'Yes', 'Contract': 'One Year', 'count': 1}


In [None]:
# Example: services favoured by the customers most similar to 4482-EWFMI.
new_user_id = '4482-EWFMI'
recommended_services = recommend_services(
    user_id=new_user_id,
    df_filtered=df_filtered,
    broadcast_user_features=broadcast_user_features,
)
print(recommended_services)

{'Internet Type': 'Fiber Optic', 'Online Security': 'No', 'Online Backup': 'No', 'Device Protection Plan': 'No', 'Premium Tech Support': 'No', 'Streaming TV': 'No', 'Streaming Movies': 'No', 'Streaming Music': 'No', 'Unlimited Data': 'Yes', 'Contract': 'Month-to-Month', 'count': 5}
