In [1]:
# Initialization
import os
import re
import shutil
import sys

# Tell PySpark where the local Spark installation lives and which Python
# interpreters to use. Swap both interpreter paths for /usr/bin/python2.7
# to run on Python 2 instead.
os.environ.update({
    "SPARK_HOME": "/home/talentum/spark",
    "PYSPARK_PYTHON": "/usr/bin/python3.6",
    "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3",
})
# Directory holding the Python archives bundled with Spark.
os.environ["PYLIB"] = f'{os.environ["SPARK_HOME"]}/python/lib'

# Prepend both bundled archives in one step: pyspark.zip ends up first on
# sys.path, immediately followed by the py4j archive.
sys.path[:0] = [
    os.environ["PYLIB"] + "/pyspark.zip",
    os.environ["PYLIB"] + "/py4j-0.10.7-src.zip",
]

# NOTE: List every extra Spark package you need here.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages " + ",".join([
    "com.databricks:spark-xml_2.11:0.6.0",
    "org.apache.spark:spark-avro_2.11:2.4.3",
]) + " pyspark-shell"
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'

In [2]:
#Entrypoint 2.x
from pyspark.sql import SparkSession
# Build (or reuse) a Hive-enabled SparkSession named "Final Project".
spark = (
    SparkSession.builder
    .appName("Final Project")
    .enableHiveSupport()
    .getOrCreate()
)

# On yarn:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()
# specify .master("yarn")

# Handle to the underlying SparkContext, used by the RDD-based MLlib calls later on.
sc = spark.sparkContext

In [3]:
# Load the review table from its local parquet file.
df = spark.read.parquet("file:///home/talentum/FinalTable/Parq/FReviewTable.parquet")

In [4]:
# Load the business table from its local parquet file.
df_only_restors = spark.read.parquet('file:///home/talentum/FinalTable/Parq/FBusinessPar.parquet')

In [5]:
# Alphanumeric to numeric

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

# Pull the distinct alphanumeric ids to the driver, one plain value per row.
unique_users = df.select("user_id").distinct().rdd.map(lambda row: row[0]).collect()
unique_items = df.select("business_id").distinct().rdd.map(lambda row: row[0]).collect()

# Assign each id its position in the collected list as its numeric index.
# NOTE(review): the indices therefore depend on the order collect() returns —
# confirm this matches the numeric ids stored in the other parquet tables.
user_mapping = dict(zip(unique_users, range(len(unique_users))))
item_mapping = dict(zip(unique_items, range(len(unique_items))))

# Define UDFs to map alphanumeric values to numeric
def map_user(user):
    """Return the numeric index stored for ``user`` in ``user_mapping``, or -1 if it has none."""
    if user in user_mapping:
        return user_mapping[user]
    return -1

def map_item(item):
    """Return the numeric index stored for ``item`` in ``item_mapping``, or -1 if it has none."""
    if item in item_mapping:
        return item_mapping[item]
    return -1

# Wrap the lookup helpers as Spark UDFs that return integers.
udf_map_user = udf(map_user, returnType=IntegerType())
udf_map_item = udf(map_item, returnType=IntegerType())

# Append the numeric id columns next to the original alphanumeric ones.
df_numeric = (
    df
    .withColumn("userId_numeric", udf_map_user(col("user_id")))
    .withColumn("itemId_numeric", udf_map_item(col("business_id")))
)




In [6]:
# Print a heading, then evaluate the row count of the converted DataFrame;
# the count is what the cell displays as its output.
print("DataFrame with Numeric Values and Other Columns:")
df_numeric.count()

DataFrame with Numeric Values and Other Columns:


800000

In [7]:
# Print a heading, then list the column names of the converted DataFrame
# (the two numeric id columns appear at the end of the list).
print("DataFrame with Numeric Values and Other Columns:")
df_numeric.columns

DataFrame with Numeric Values and Other Columns:


['business_id',
 'review_id',
 'user_id',
 'stars',
 'useful',
 'text',
 'date',
 'name',
 'address',
 'city',
 'state',
 'latitude',
 'longitude',
 'review_count',
 'is_open',
 'categories',
 'yelping_since',
 'average_stars',
 'userId_numeric',
 'itemId_numeric']

In [8]:
# Keep only the three columns the ALS cell reads: user index, item index, stars.
df_numeric = df_numeric.select(["userId_numeric", "itemId_numeric", "stars"])
# Rebind `df` to a single-partition view of that projection.
# NOTE(review): this replaces the raw review table loaded earlier, so the
# mapping cell cannot be re-run after this one without reloading `df`.
df = df_numeric.coalesce(1)
# Load the users table from its local parquet file.
df_only_users = spark.read.parquet('file:///home/talentum/FinalTable/Parq/fuser.parquet')

In [9]:
#click here Shift enter 
# Single-partition (user index, name) lookup table.
df_u = df_only_users.select('userId_numeric', 'name').coalesce(1)

# Persist both tables as parquet, replacing any previous copy.
df_u.write.parquet('file:///home/talentum/Project/ReviewDf_name/', mode="overwrite")
df.write.parquet("file:///home/talentum/Project/ReviewDf/", mode="overwrite")

In [170]:
#!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
#df=spark.read.format('parquet').load("file:///home/talentum/Project/ReviewDf")


In [78]:
# Peek at the first 20 rating rows, ordered by numeric user id.
df.orderBy('userId_numeric').take(20)

[Row(userId_numeric=0, itemId_numeric=7452, stars=1.0),
 Row(userId_numeric=1, itemId_numeric=696, stars=5.0),
 Row(userId_numeric=2, itemId_numeric=51395, stars=4.0),
 Row(userId_numeric=3, itemId_numeric=11736, stars=5.0),
 Row(userId_numeric=4, itemId_numeric=40457, stars=1.0),
 Row(userId_numeric=5, itemId_numeric=24933, stars=5.0),
 Row(userId_numeric=6, itemId_numeric=7126, stars=1.0),
 Row(userId_numeric=6, itemId_numeric=53821, stars=5.0),
 Row(userId_numeric=6, itemId_numeric=43764, stars=5.0),
 Row(userId_numeric=6, itemId_numeric=14466, stars=5.0),
 Row(userId_numeric=6, itemId_numeric=16892, stars=1.0),
 Row(userId_numeric=6, itemId_numeric=40458, stars=5.0),
 Row(userId_numeric=6, itemId_numeric=61551, stars=5.0),
 Row(userId_numeric=6, itemId_numeric=16892, stars=1.0),
 Row(userId_numeric=7, itemId_numeric=40459, stars=1.0),
 Row(userId_numeric=8, itemId_numeric=53489, stars=5.0),
 Row(userId_numeric=9, itemId_numeric=58241, stars=5.0),
 Row(userId_numeric=10, itemId_nume

In [151]:
from pyspark.mllib.recommendation import ALS, Rating
from pyspark.sql.functions import col

# Prepare the data for ALS: one Rating(user, product, rating) per review row.
ratings = df.rdd.map(lambda row: Rating(row["userId_numeric"], row["itemId_numeric"], row["stars"]))

# Split data into training and test sets
#training_rdd, test_rdd = ratings.randomSplit([0.8, 0.2])

# Train ALS model
rank = 10            # number of latent factors
numIterations = 10   # number of ALS iterations
block_size=-1        # -1 lets MLlib auto-configure the number of blocks used to parallelize the computation
seed = 42            # fixed seed so the random initialisation is repeatable between runs
model = ALS.train(ratings, rank, numIterations, blocks=block_size, seed=seed)


In [160]:
# Where the trained model is persisted (local filesystem URI); the load cell reuses `path`.
path ='file:///home/talentum/Project/Model'

#path='dbfs:/mnt/ADLS/joinop/Model_ALS'# this is cloud stored model path 
#-------------------------------------------------------------------------

# model.save() refuses to write into an existing directory, so re-running this
# cell would fail. Mirror the mode("overwrite") behaviour of the parquet writes
# by clearing a previous local copy first. Only file:// targets are touched;
# any other scheme is left for the storage layer to handle.
_local_scheme = "file://"
if path.startswith(_local_scheme):
    _model_dir = path[len(_local_scheme):]
    if os.path.isdir(_model_dir):
        shutil.rmtree(_model_dir)

# Save the ALS model
model.save(sc,path)



In [62]:
#-------------------------------------------------------------------------
from pyspark.sql.functions import col
from pyspark.mllib.recommendation import MatrixFactorizationModel
# Reload the persisted ALS model. Relies on `sc` (session cell) and `path`
# (model-save cell) already being defined in the kernel.
model = MatrixFactorizationModel.load(sc, path)

In [63]:
# Function to get the top-N recommendations (default 50); returns a DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import Row

def get_top_n_recommendations(user_id, n=50):
    """Return the model's top ``n`` product recommendations for one user as a DataFrame.

    Uses the notebook-level ``model`` (for ``recommendProducts``) and ``spark``
    (to build the DataFrame), so both must already exist in the kernel.

    Args:
        user_id: Numeric user index to recommend for.
        n: Maximum number of recommendations to request (default 50).

    Returns:
        A Spark DataFrame with columns ``itemId_numeric`` (long), ``rating``
        (double) and ``user_id`` (long), one row per recommendation. The frame
        is empty, but still has this schema, when the model returns nothing.
    """
    # Local import: only IntegerType is imported at notebook level.
    from pyspark.sql.types import DoubleType, LongType, StructField, StructType

    # Get recommendations from the model
    recommendations = model.recommendProducts(user_id, n)

    # An explicit schema keeps createDataFrame working on an empty list, where
    # schema inference raises, and pins the column order instead of leaving it
    # to how Row(**kwargs) orders its fields in the running Spark version.
    schema = StructType([
        StructField("itemId_numeric", LongType(), True),
        StructField("rating", DoubleType(), True),
        StructField("user_id", LongType(), True),
    ])

    # Plain tuples in schema order; the casts satisfy the schema's type check.
    rows = [(int(r.product), float(r.rating), int(user_id)) for r in recommendations]

    return spark.createDataFrame(rows, schema=schema)


In [75]:
# Bare expression: displays df_numeric as the cell output.
# NOTE(review): the recorded output below (800001) looks like a row count, not
# the repr of this expression — confirm the cell was not edited after its last run.
df_numeric

800001

In [73]:
# Query parameters. An empty string for a text filter means "do not filter on it".
user_id_numeric = 11
#u_city = "norristown"
u_state = 'PA'
u_category = ''
u_nr=5

# Get the top N recommendations for the user
recommendations_df = get_top_n_recommendations(user_id_numeric, n=50)

# Filter the main DataFrame with case-insensitive "contains" matches.
# - The text is regex-escaped so characters such as "(", "+" or "&" in the
#   input are matched literally instead of being read as regex syntax.
# - Empty filters are skipped: rlike() on a NULL cell evaluates to NULL, so
#   applying a match-everything pattern would still drop rows whose column is NULL.
filtered_df = df_only_restors
for filter_column, filter_text in (('state', u_state), ('categories', u_category)):
    if not filter_text:
        continue
    filtered_df = filtered_df.filter(
        col(filter_column).rlike(f"(?i).*{re.escape(filter_text)}.*")
    )

# Join the recommendations with the filtered DataFrame
result_df = filtered_df.join(recommendations_df, on='itemId_numeric', how='inner')

# Keep the u_nr best results by predicted rating
top_5_results = result_df.orderBy(col("rating").desc()).limit(u_nr)

# Show the selected recommendations ('stars' is the column from the business table)
top_5_results.select('itemId_numeric','name','address', 'city',
 'state','categories','stars').show()

+--------------+--------------------+--------------------+------------+-----+--------------------+-----+
|itemId_numeric|                name|             address|        city|state|          categories|stars|
+--------------+--------------------+--------------------+------------+-----+--------------------+-----+
|         40620|Mesa Fresh Mexica...|Marple Crossroads...| Springfield|   PA|Restaurants, Burg...|  3.5|
|         49848|Hybrid Planet Cha...|                    |   Havertown|   PA|Hotels & Travel, ...|  5.0|
|         28493|     Texas Roadhouse|1205 E Lancaster Ave| Downingtown|   PA|Chicken Wings, Bu...|  3.0|
|         49289|Rangoli: Vibrant ...| 10863 Bustleton Ave|Philadelphia|   PA|Pakistani, Specia...|  4.0|
|          9141|White Jasmin Thai...|      5 West Main St|    Lansdale|   PA|Thai, Soup, Noodl...|  4.5|
+--------------+--------------------+--------------------+------------+-----+--------------------+-----+

