In [14]:
# Initialization: point this kernel at the local Spark install. This has to run
# before the first `import pyspark`, because the archives are added to sys.path here.
import os
import sys

_spark_home = "/home/talentum/spark"
_py_lib = _spark_home + "/python/lib"

# NOTE(review): the worker interpreter (PYSPARK_PYTHON) and the driver interpreter
# (PYSPARK_DRIVER_PYTHON) are set to different paths — confirm /usr/bin/python3 is
# the same minor version as /usr/bin/python3.6 on this machine.
os.environ.update({
    "SPARK_HOME": _spark_home,
    "PYLIB": _py_lib,  # helper variable holding the python/lib directory
    "PYSPARK_PYTHON": "/usr/bin/python3.6",
    "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3",
})

# Put the bundled archives at the front of the import path; pyspark.zip ends up
# ahead of the py4j archive.
sys.path[:0] = [_py_lib + "/pyspark.zip", _py_lib + "/py4j-0.10.7-src.zip"]

# Extra Maven packages (XML and Avro data sources) passed to the PySpark shell launcher.
_extra_packages = ",".join([
    "com.databricks:spark-xml_2.11:0.6.0",
    "org.apache.spark:spark-avro_2.11:2.4.3",
])
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages " + _extra_packages + " pyspark-shell"

In [15]:
# Entry point: build (or reuse) the Hive-enabled SparkSession for this notebook
# and keep a handle on its SparkContext for the RDD-based APIs used further down.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Final Project")
    .enableHiveSupport()
    .getOrCreate()
)
sc = spark.sparkContext

# Loading the Raw Datasets

In [26]:
# Raw inputs, read from local Parquet files.
# `df` is the review table that the ID-mapping step below works on.
df = spark.read.parquet("file:///home/talentum/FinalTable/Parq/FReviewTable.parquet")
# NOTE(review): df_only_restors is not referenced again in the cells visible here.
df_only_restors = spark.read.parquet("file:///home/talentum/FinalTable/Parq/FBusinessPar.parquet")

# Alphanumeric To Numeric Conversion

In [27]:
# Alphanumeric -> numeric ID conversion

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

# Collect the distinct string IDs onto the driver, one Python list per ID column.
# NOTE(review): distinct() gives no ordering guarantee, so the integer assigned to
# a given ID may differ between runs — confirm that any lookup table used with
# these IDs was built from the same mapping.
unique_users = df.select("user_id").distinct().rdd.map(lambda row: row[0]).collect()
unique_items = df.select("business_id").distinct().rdd.map(lambda row: row[0]).collect()

# Number each ID by its position in the collected list.
user_mapping = dict(zip(unique_users, range(len(unique_users))))
item_mapping = dict(zip(unique_items, range(len(unique_items))))


def map_user(user):
    """Return the integer assigned to `user`, or -1 when it is not in user_mapping."""
    return user_mapping[user] if user in user_mapping else -1


def map_item(item):
    """Return the integer assigned to `item`, or -1 when it is not in item_mapping."""
    return item_mapping[item] if item in item_mapping else -1


# Wrap the lookups as Spark UDFs that return an integer column.
udf_map_user = udf(map_user, returnType=IntegerType())
udf_map_item = udf(map_item, returnType=IntegerType())

# Add the two numeric ID columns next to the original string IDs.
df_numeric = (
    df
    .withColumn("userId_numeric", udf_map_user(col("user_id")))
    .withColumn("itemId_numeric", udf_map_item(col("business_id")))
)




In [28]:
# Sanity check: print a heading, then display the number of rows in df_numeric.
# count() is a Spark action, so running this cell triggers a job.
print("DataFrame with Numeric Values and Other Columns:")
df_numeric.count()

DataFrame with Numeric Values and Other Columns:


800000

In [29]:
# Schema check: print a heading, then display the column names of df_numeric.
# The two columns added by the ID-mapping step (userId_numeric, itemId_numeric)
# are expected at the end of the list.
print("DataFrame with Numeric Values and Other Columns:")
df_numeric.columns

DataFrame with Numeric Values and Other Columns:


['business_id',
 'review_id',
 'user_id',
 'stars',
 'useful',
 'text',
 'date',
 'name',
 'address',
 'city',
 'state',
 'latitude',
 'longitude',
 'review_count',
 'is_open',
 'categories',
 'yelping_since',
 'average_stars',
 'userId_numeric',
 'itemId_numeric']

# Extracting only the required columns

In [30]:
# Keep only the three fields used to build ratings: user, item and stars.
# NOTE(review): this rebinds both `df_numeric` and `df`. After this cell `df` no
# longer has the `user_id` / `business_id` columns, so the ID-mapping cell cannot
# be re-run without reloading the raw data first.
df_numeric = df_numeric.select(["userId_numeric", "itemId_numeric", "stars"])
df = df_numeric.coalesce(numPartitions=1)

# Separate user table, read from its own Parquet file.
df_only_users = spark.read.parquet("file:///home/talentum/FinalTable/Parq/fuser.parquet")

# Saving the output for Model training

In [22]:
# Persist the two tables (run this cell with Shift+Enter).
# Existing output at either location is overwritten.
# NOTE(review): `userId_numeric` in df_only_users comes from fuser.parquet, not
# from the mapping computed in this notebook — confirm both use the same IDs.
df_u = df_only_users.select(["userId_numeric", "name"]).coalesce(1)

df_u.write.parquet("file:///home/talentum/Project/ReviewDf_name/", mode="overwrite")
df.write.parquet("file:///home/talentum/Project/ReviewDf/", mode="overwrite")

In [31]:
# Spot-check the result: the first 20 rows when ordered by ascending userId_numeric.
df.orderBy("userId_numeric").take(20)

[Row(userId_numeric=0, itemId_numeric=7452, stars=1.0),
 Row(userId_numeric=1, itemId_numeric=696, stars=5.0),
 Row(userId_numeric=2, itemId_numeric=51395, stars=4.0),
 Row(userId_numeric=3, itemId_numeric=11736, stars=5.0),
 Row(userId_numeric=4, itemId_numeric=40457, stars=1.0),
 Row(userId_numeric=5, itemId_numeric=24933, stars=5.0),
 Row(userId_numeric=6, itemId_numeric=7126, stars=1.0),
 Row(userId_numeric=6, itemId_numeric=53821, stars=5.0),
 Row(userId_numeric=6, itemId_numeric=43764, stars=5.0),
 Row(userId_numeric=6, itemId_numeric=14466, stars=5.0),
 Row(userId_numeric=6, itemId_numeric=16892, stars=1.0),
 Row(userId_numeric=6, itemId_numeric=40458, stars=5.0),
 Row(userId_numeric=6, itemId_numeric=61551, stars=5.0),
 Row(userId_numeric=6, itemId_numeric=16892, stars=1.0),
 Row(userId_numeric=7, itemId_numeric=40459, stars=1.0),
 Row(userId_numeric=8, itemId_numeric=53489, stars=5.0),
 Row(userId_numeric=9, itemId_numeric=58241, stars=5.0),
 Row(userId_numeric=10, itemId_nume

# Training Model on saved Users

In [32]:
from pyspark.mllib.recommendation import ALS, Rating
from pyspark.sql.functions import col

# Convert each (user, item, stars) row into the Rating record that the
# RDD-based ALS API takes as input.
ratings = df.rdd.map(lambda row: Rating(row["userId_numeric"], row["itemId_numeric"], row["stars"]))

# NOTE: no train/test split is made. The model is fitted on every rating, so
# this cell gives no held-out error estimate.

# ALS hyper-parameters
rank = 10            # number of latent factors
numIterations = 10   # number of ALS iterations
block_size = -1      # -1 leaves the number of blocks to ALS's own default
als_seed = 42        # fixed seed so the random factor initialisation is repeatable

# Train the matrix-factorisation model. Without an explicit seed, every run
# starts from a different random initialisation and yields a different model.
model = ALS.train(
    ratings,
    rank,
    iterations=numIterations,
    blocks=block_size,
    seed=als_seed,
)


# Saving the Model

In [None]:
# Save the trained ALS model.
import os
import shutil
from urllib.parse import urlparse

path ='file:///home/talentum/Project/Model'

# model.save() fails when the target directory already exists, which would make
# this cell break on every re-run. Clear a previous local copy first so the
# model is overwritten, as the Parquet outputs of this notebook are.
# Only file:// targets are removed; any other scheme is left untouched.
# NOTE(review): this assumes the driver writes to this machine's local disk — confirm.
_model_target = urlparse(path)
if _model_target.scheme == "file" and os.path.isdir(_model_target.path):
    shutil.rmtree(_model_target.path)

model.save(sc, path)

