<a href="https://colab.research.google.com/github/afislonge/BizRecProject/blob/dev/Loading_and_runnnin_second_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Make Google Drive available to this Colab runtime; the saved models and the
# processed CSV files used by the cells below live under /content/drive/MyDrive.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


Model 1: location-based restaurant recommender


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
from pyspark.sql.functions import col, expr, radians, lit, row_number, sqrt, sin, cos, asin,power
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler

class RestaurantRecommenderPredictor:
    """
    Recommend nearby restaurants from a preprocessed Spark DataFrame.

    Workflow: ``load_data`` reads and preprocesses the CSV,
    ``load_saved_model`` loads the fitted ``PipelineModel`` and records its
    assembler/scaler stages, and ``recommend_restaurants`` filters, ranks and
    returns the best matches.
    """

    # Mean Earth radius; haversine distances below are therefore in kilometres.
    EARTH_RADIUS_KM = 6371

    def __init__(self, spark_session):
        """
        Initialize the Restaurant Recommender Predictor.

        Args:
            spark_session: Active ``SparkSession`` used to read the data.
        """
        self.spark = spark_session
        self.df = None                # preprocessed restaurants; set by load_data()
        self.kmeans_model = None      # whole loaded PipelineModel; set by load_saved_model()
        self.cuisine_indexer = None   # StringIndexer for cuisine_type; set by load_data()
        self.vector_assembler = None  # VectorAssembler stage of the pipeline, if present
        self.scaler = None            # fitted scaler stage of the pipeline, if present

    def load_data(self, data_path):
        """
        Load and preprocess restaurant data.

        Reads the CSV (header row, inferred schema), drops every row that has
        a null in any column, derives boolean ``has_parking`` / ``has_wifi``
        columns from the ``parking`` / ``WiFi`` columns (exactly ``'Yes'`` ->
        true, anything else -> false) and adds a ``cuisine_type_encoded``
        index column.

        Args:
            data_path: Path of the CSV file to read.

        Returns:
            The preprocessed DataFrame (also stored on ``self.df``).
        """
        self.df = self.spark.read.csv(data_path, header=True, inferSchema=True)

        # na.drop() with no arguments removes rows containing ANY null value.
        self.df = self.df.na.drop()

        # Convert the Yes/No text columns into booleans.
        self.df = (self.df.withColumn("has_parking",
                    expr("CASE WHEN parking = 'Yes' THEN true ELSE false END"))
                   .withColumn("has_wifi",
                    expr("CASE WHEN WiFi = 'Yes' THEN true ELSE false END")))

        # Encode the cuisine label as a numeric index.
        self.cuisine_indexer = StringIndexer(
            inputCol="cuisine_type",
            outputCol="cuisine_type_encoded"
        )
        self.df = self.cuisine_indexer.fit(self.df).transform(self.df)

        return self.df

    def load_saved_model(self, model_path):
        """
        Load the saved PySpark PipelineModel and remember useful stages.

        Args:
            model_path: Directory written by ``PipelineModel.save``.

        Returns:
            The loaded ``PipelineModel``, or ``None`` if loading failed (the
            error is printed).
        """
        # A *fitted* pipeline stores models, not estimators: its scaler stage
        # is a StandardScalerModel, so checking only for the StandardScaler
        # estimator class would never match.
        from pyspark.ml.feature import StandardScalerModel

        try:
            self.kmeans_model = PipelineModel.load(model_path)
        except Exception as e:
            print(f"Error loading model: {e}")
            return None

        for stage in self.kmeans_model.stages:
            if isinstance(stage, VectorAssembler):
                self.vector_assembler = stage
            elif isinstance(stage, (StandardScaler, StandardScalerModel)):
                self.scaler = stage

        return self.kmeans_model

    def haversine_distance(self, lat1, lon1, lat2, lon2):
        """
        Build a Column with the great-circle distance in kilometres.

        Args:
            lat1, lon1: Reference point in degrees (Python numbers, wrapped
                with ``lit``).
            lat2, lon2: Columns holding the other point in degrees.

        Returns:
            A Spark ``Column`` expression (haversine formula).
        """
        lat1_rad = radians(lit(lat1))
        lon1_rad = radians(lit(lon1))
        lat2_rad = radians(lat2)
        lon2_rad = radians(lon2)

        return (
            self.EARTH_RADIUS_KM * 2 * asin(
                sqrt(
                    sin((lat2_rad - lat1_rad) / 2) ** 2 +
                    cos(lat1_rad) * cos(lat2_rad) *
                    sin((lon2_rad - lon1_rad) / 2) ** 2
                )
            )
        )

    def recommend_restaurants(
        self,
        user_location,
        min_rating,
        need_parking,
        need_wifi,
        cuisine_type,
        max_distance=60,
        top_n=5
    ):
        """
        Recommend restaurants based on user preferences.

        Args:
            user_location: ``(latitude, longitude)`` of the user in degrees.
            min_rating: Minimum ``rating`` (inclusive).
            need_parking: If true, keep only rows with ``has_parking``.
            need_wifi: If true, keep only rows with ``has_wifi``.
            cuisine_type: Exact (case-sensitive) ``cuisine_type`` to match.
            max_distance: Maximum distance in kilometres (inclusive).
            top_n: Maximum number of restaurants to return.

        Returns:
            A DataFrame with business_id, bus_name, address, rating, distance
            and cuisine_type. It has at most one row per ``bus_name`` and is
            sorted by rating (desc), then distance (asc), then business_id.

        Raises:
            RuntimeError: if ``load_data`` has not been called.
        """
        if self.df is None:
            raise RuntimeError("No data loaded; call load_data() before recommend_restaurants().")

        user_lat, user_lon = user_location

        filtered_df = self.df.filter(
            (col("rating") >= min_rating) &
            (col("cuisine_type") == cuisine_type)
        )
        if need_parking:
            filtered_df = filtered_df.filter(col("has_parking"))
        if need_wifi:
            filtered_df = filtered_df.filter(col("has_wifi"))

        nearby_restaurants = (
            filtered_df
            .withColumn(
                "distance",
                self.haversine_distance(user_lat, user_lon, col("latitude"), col("longitude"))
            )
            .filter(col("distance") <= max_distance)
        )

        # Rating alone leaves ties (many places share 5.0). The extra keys
        # make the chosen row per name, and the final top-N, deterministic and
        # nearest-first. Otherwise re-running this lazy plan could return
        # different rows.
        tie_broken_order = [col("rating").desc(), col("distance").asc(), col("business_id").asc()]

        # Keep the best row per restaurant name (collapses duplicate listings).
        window_spec = Window.partitionBy("bus_name").orderBy(*tie_broken_order)

        recommended_restaurants = (
            nearby_restaurants
            .withColumn("rank", row_number().over(window_spec))
            .filter(col("rank") == 1)
            .orderBy(*tie_broken_order)
            .select("business_id", "bus_name", "address", "rating", "distance", "cuisine_type")
            .limit(top_n)
        )

        return recommended_restaurants

def interactive_recommendation(predictor):
    """
    Prompt for preferences on stdin and print the recommended restaurants.

    Args:
        predictor: A ``RestaurantRecommenderPredictor`` whose data is loaded.

    Raises:
        ValueError: if a latitude, longitude or rating answer is not a number.
    """
    print("Restaurant Recommender System")

    user_lat = float(input("Enter your latitude: "))
    user_lon = float(input("Enter your longitude: "))
    min_rating = float(input("Enter minimum rating (0-5): "))
    # Accept "yes"/"y" in any case and ignore stray whitespace, so an answer
    # like "Yes " still enables the filter.
    need_parking = input("Need parking? (yes/no): ").strip().lower() in ("yes", "y")
    need_wifi = input("Need WiFi? (yes/no): ").strip().lower() in ("yes", "y")
    # The cuisine filter is an exact match; surrounding spaces would
    # otherwise match nothing.
    cuisine_type = input("Enter cuisine type: ").strip()

    recommendations = predictor.recommend_restaurants(
        user_location=(user_lat, user_lon),
        min_rating=min_rating,
        need_parking=need_parking,
        need_wifi=need_wifi,
        cuisine_type=cuisine_type
    )

    recommendations.show()

def main():
    """
    Entry point: load data and the fitted pipeline, then run the prompt loop.

    Any exception is reported as a one-line message; the Spark session is
    always stopped on the way out.
    """
    spark = (
        SparkSession.builder
        .appName("RestaurantRecommenderPredictor")
        .getOrCreate()
    )

    # Google Drive locations of the preprocessed dataset and the saved pipeline.
    data_path = '/content/drive/MyDrive/ProcessedCSV/Recommender_System_Newdata.csv'
    model_path = '/content/drive/MyDrive/Models/restaurant_recommender_Final_model'

    try:
        predictor = RestaurantRecommenderPredictor(spark)
        predictor.load_data(data_path)

        # load_saved_model returns None when the pipeline could not be read.
        if not predictor.load_saved_model(model_path):
            print("Failed to load the model.")
        else:
            interactive_recommendation(predictor)
    except Exception as err:
        print(f"An error occurred: {err}")
    finally:
        spark.stop()

if __name__ == "__main__":
    main()

Restaurant Recommender System
Enter your latitude: 39.9493338
Enter your longitude: -75.1661758
Enter minimum rating (0-5): 3.5
Need parking? (yes/no): No
Need WiFi? (yes/no): No
Enter cuisine type: American
+--------------------+--------------------+--------------------+------+------------------+------------+
|         business_id|            bus_name|             address|rating|          distance|cuisine_type|
+--------------------+--------------------+--------------------+------+------------------+------------+
|TozkPEh-xhts3qfVe...|         Bad Brother|       726 N 24th St|   5.0|2.3317139167823364|    American|
|mVEqN8IPU-vCeKTXC...|      Kensington Pub|     2116 E Tioga St|   5.0|  7.47285437409349|    American|
|MwrBRpXMRNolVkaL3...|Bauman Family App...|116 Hoffmansville Rd|   5.0|55.545288742483265|    American|
|7SHh7-aWuAVlp3YKa...| Edwards-Freeman Nut|     441 E Hector St|   5.0|17.803916204265626|    American|
|9WY5jCB2IjnOizmgz...|             Freshii|     450 S Lenola Rd|

Model 2: review sentiment and topic analysis for the recommended businesses

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, lower, when
import pandas as pd
from pyspark.ml import PipelineModel


# Create (or reuse) the Spark session BEFORE loading the model.
# PipelineModel.load needs an active SparkContext, and the previous cell's
# main() ends with spark.stop().
spark = SparkSession.builder.master("local[*]").appName("Business Prediction").getOrCreate()

# Load the saved sentiment pipeline
loaded_model = PipelineModel.load("/content/drive/MyDrive/topic_senti_model")
print("Model loaded successfully!")

# Sample output of model 1, for experimentation only. Replace it with the
# DataFrame returned by RestaurantRecommenderPredictor.recommend_restaurants.
# NOTE(review): these business_ids differ from the IDs model 1 printed for
# "Bad Brother" (TozkPEh-...) and "Kensington Pub" (mVEqN8IPU-...), so the
# names below may not belong to the analysed IDs. Confirm before relying on
# the labels.
data = {
    "business_id": [
       "rQyJXOiZ39eRJ2l9OMpw-g",
       "jzsspHqP9kATWji8xUhNFw"
    ],
    "bus_name": [
        "Bad Brother",
        "Kensington Pub"
    ],
    "address": [
        "726 N 24th St",
        "2116 E Tioga St"
    ],
    "rating": [5.0, 5.0],
    "distance": [
        2.3317139167823364,
        7.47285437409349
    ],
    "cuisine_type": ["American", "American"]
}

# Convert the sample (pandas) into a Spark DataFrame.
first_model_predictions_df = pd.DataFrame(data)
Recomendation_df = spark.createDataFrame(first_model_predictions_df)

# Step 1: distinct business IDs recommended by the first model
# (DataFrame.collect avoids a detour through the RDD API).
business_ids_from_first_model = [
    row["business_id"]
    for row in Recomendation_df.select("business_id").distinct().collect()
]

# Step 2: review text for those businesses. Null reviews are dropped because
# there is nothing to classify, and a text pipeline may fail on them.
df = (
    spark.read.csv("/content/drive/MyDrive/ProcessedCSV/Recommender_System_Newdata.csv", header=True, inferSchema=True)
    .select("business_id", "rev_text")
    .filter(col("rev_text").isNotNull())
)
test_data_subset = df.filter(col("business_id").isin(business_ids_from_first_model))

# Step 3: apply the trained second model.
# Step 4: rename its prediction column for clarity.
subset_predictions = (
    loaded_model.transform(test_data_subset)
    .withColumnRenamed("prediction", "sentimentPrediction")
)

# Keyword-based topic per review. Matching is on lower-cased text so "Food"
# also counts. The first matching keyword wins; anything else is "other".
review_text = lower(col("rev_text"))
subset_predictions = subset_predictions.withColumn(
    "topic",
    when(review_text.contains("food"), "food_quality")
    .when(review_text.contains("service"), "customer_service")
    .when(review_text.contains("clean"), "cleanliness")
    .otherwise("other")
).cache()  # reused by the evaluation and both percentage tables below

# Show predictions for the subset (remove comment to inspect output)
# subset_predictions.select("business_id", "rev_text", "sentimentPrediction").show(truncate=False)

# Step 5 (optional): accuracy needs ground-truth labels. Pointing labelCol and
# predictionCol at the same column would always report a perfect score, so
# evaluate only when a real label column exists.
# "sentimentIndex" is presumably the label column from training; confirm.
label_col = "sentimentIndex"
if 'evaluator' not in globals():
    print("Evaluator not defined")
elif label_col not in subset_predictions.columns:
    print(f"No '{label_col}' label column in the subset; skipping accuracy evaluation")
else:
    evaluator.setLabelCol(label_col)
    evaluator.setPredictionCol("sentimentPrediction")
    subset_accuracy = evaluator.evaluate(subset_predictions)
    print(f"Accuracy on Subset: {subset_accuracy:.2f}")

# Step 6: sentiment percentages per business (share of its non-null reviews)
subset_sentiment_count = subset_predictions.groupBy("business_id", "sentimentPrediction").agg(count("sentimentPrediction").alias("sentiment_count"))
subset_total_reviews = subset_predictions.groupBy("business_id").agg(count("rev_text").alias("total_reviews"))

subset_sentiment_percentage = subset_sentiment_count.join(subset_total_reviews, on="business_id")
subset_sentiment_percentage = subset_sentiment_percentage.withColumn(
    "sentiment_percentage",
    (col("sentiment_count") / col("total_reviews") * 100)
)

print("Sentiment Percentages for Selected Businesses:")
subset_sentiment_percentage.show(truncate=False)

# Step 7: topic percentages per business
subset_topic_count = subset_predictions.groupBy("business_id", "topic").agg(count("topic").alias("topic_count"))
subset_topic_percentage = subset_topic_count.join(subset_total_reviews, on="business_id")
subset_topic_percentage = subset_topic_percentage.withColumn(
    "topic_percentage",
    (col("topic_count") / col("total_reviews") * 100)
)

print("Topic Percentages for Selected Businesses:")
subset_topic_percentage.show(truncate=False)



Model loaded successfully!
Evaluator not defined
Sentiment Percentages for Selected Businesses:
+----------------------+-------------------+---------------+-------------+--------------------+
|business_id           |sentimentPrediction|sentiment_count|total_reviews|sentiment_percentage|
+----------------------+-------------------+---------------+-------------+--------------------+
|rQyJXOiZ39eRJ2l9OMpw-g|0.0                |79             |169          |46.74556213017752   |
|rQyJXOiZ39eRJ2l9OMpw-g|1.0                |90             |169          |53.25443786982249   |
|jzsspHqP9kATWji8xUhNFw|0.0                |94             |145          |64.82758620689654   |
|jzsspHqP9kATWji8xUhNFw|1.0                |51             |145          |35.172413793103445  |
+----------------------+-------------------+---------------+-------------+--------------------+

Topic Percentages for Selected Businesses:
+----------------------+----------------+-----------+-------------+------------------+
|busi

Integrated model: restaurant recommendations followed by sentiment and topic analysis of their reviews

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, radians, lit, row_number, sqrt, sin, cos, asin, power, count, when
from pyspark.sql.window import Window
from pyspark.ml import PipelineModel
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
import pandas as pd

class RestaurantRecommenderPredictor:
    """
    Recommend nearby restaurants and summarise the sentiment and topics of
    their reviews.

    Workflow: ``load_data`` reads and preprocesses the CSV,
    ``load_saved_model`` loads the fitted ``PipelineModel``,
    ``recommend_restaurants`` returns the best matches, and
    ``analyze_sentiments_and_topics`` scores the reviews of those matches.
    """

    # Mean Earth radius; haversine distances below are therefore in kilometres.
    EARTH_RADIUS_KM = 6371

    def __init__(self, spark_session):
        """
        Initialize the Restaurant Recommender Predictor.

        Args:
            spark_session: Active ``SparkSession`` used to read the data.
        """
        self.spark = spark_session
        self.df = None                # preprocessed restaurants; set by load_data()
        self.kmeans_model = None      # whole loaded PipelineModel; set by load_saved_model()
        self.cuisine_indexer = None   # StringIndexer for cuisine_type; set by load_data()
        self.vector_assembler = None  # VectorAssembler stage of the pipeline, if present
        self.scaler = None            # fitted scaler stage of the pipeline, if present

    def load_data(self, data_path):
        """
        Load and preprocess restaurant data.

        Reads the CSV (header row, inferred schema), drops every row that has
        a null in any column, derives boolean ``has_parking`` / ``has_wifi``
        columns from the ``parking`` / ``WiFi`` columns (exactly ``'Yes'`` ->
        true, anything else -> false) and adds a ``cuisine_type_encoded``
        index column.

        Args:
            data_path: Path of the CSV file to read.

        Returns:
            The preprocessed DataFrame (also stored on ``self.df``).
        """
        self.df = self.spark.read.csv(data_path, header=True, inferSchema=True)

        # na.drop() with no arguments removes rows containing ANY null value.
        self.df = self.df.na.drop()

        # Convert the Yes/No text columns into booleans.
        self.df = (self.df.withColumn("has_parking",
                    expr("CASE WHEN parking = 'Yes' THEN true ELSE false END"))
                   .withColumn("has_wifi",
                    expr("CASE WHEN WiFi = 'Yes' THEN true ELSE false END")))

        # Encode the cuisine label as a numeric index.
        self.cuisine_indexer = StringIndexer(
            inputCol="cuisine_type",
            outputCol="cuisine_type_encoded"
        )
        self.df = self.cuisine_indexer.fit(self.df).transform(self.df)

        return self.df

    def load_saved_model(self, model_path):
        """
        Load the saved PySpark PipelineModel and remember useful stages.

        Args:
            model_path: Directory written by ``PipelineModel.save``.

        Returns:
            The loaded ``PipelineModel``, or ``None`` if loading failed (the
            error is printed).
        """
        # A *fitted* pipeline stores models, not estimators: its scaler stage
        # is a StandardScalerModel, so checking only for the StandardScaler
        # estimator class would never match.
        from pyspark.ml.feature import StandardScalerModel

        try:
            self.kmeans_model = PipelineModel.load(model_path)
        except Exception as e:
            print(f"Error loading model: {e}")
            return None

        for stage in self.kmeans_model.stages:
            if isinstance(stage, VectorAssembler):
                self.vector_assembler = stage
            elif isinstance(stage, (StandardScaler, StandardScalerModel)):
                self.scaler = stage

        return self.kmeans_model

    def haversine_distance(self, lat1, lon1, lat2, lon2):
        """
        Build a Column with the great-circle distance in kilometres.

        Args:
            lat1, lon1: Reference point in degrees (Python numbers, wrapped
                with ``lit``).
            lat2, lon2: Columns holding the other point in degrees.

        Returns:
            A Spark ``Column`` expression (haversine formula).
        """
        lat1_rad = radians(lit(lat1))
        lon1_rad = radians(lit(lon1))
        lat2_rad = radians(lat2)
        lon2_rad = radians(lon2)

        return (
            self.EARTH_RADIUS_KM * 2 * asin(
                sqrt(
                    sin((lat2_rad - lat1_rad) / 2) ** 2 +
                    cos(lat1_rad) * cos(lat2_rad) *
                    sin((lon2_rad - lon1_rad) / 2) ** 2
                )
            )
        )

    def recommend_restaurants(
        self,
        user_location,
        min_rating,
        need_parking,
        need_wifi,
        cuisine_type,
        max_distance=60,
        top_n=5
    ):
        """
        Recommend restaurants based on user preferences.

        Args:
            user_location: ``(latitude, longitude)`` of the user in degrees.
            min_rating: Minimum ``rating`` (inclusive).
            need_parking: If true, keep only rows with ``has_parking``.
            need_wifi: If true, keep only rows with ``has_wifi``.
            cuisine_type: Exact (case-sensitive) ``cuisine_type`` to match.
            max_distance: Maximum distance in kilometres (inclusive).
            top_n: Maximum number of restaurants to return.

        Returns:
            A DataFrame with business_id, bus_name, address, rating, distance
            and cuisine_type. It has at most one row per ``bus_name`` and is
            sorted by rating (desc), then distance (asc), then business_id.

        Raises:
            RuntimeError: if ``load_data`` has not been called.
        """
        if self.df is None:
            raise RuntimeError("No data loaded; call load_data() before recommend_restaurants().")

        user_lat, user_lon = user_location

        filtered_df = self.df.filter(
            (col("rating") >= min_rating) &
            (col("cuisine_type") == cuisine_type)
        )
        if need_parking:
            filtered_df = filtered_df.filter(col("has_parking"))
        if need_wifi:
            filtered_df = filtered_df.filter(col("has_wifi"))

        nearby_restaurants = (
            filtered_df
            .withColumn(
                "distance",
                self.haversine_distance(user_lat, user_lon, col("latitude"), col("longitude"))
            )
            .filter(col("distance") <= max_distance)
        )

        # Rating alone leaves ties (many places share 5.0). The extra keys
        # make the chosen row per name, and the final top-N, deterministic and
        # nearest-first. This matters because analyze_sentiments_and_topics
        # re-executes this lazy plan and must see the same businesses that
        # were shown.
        tie_broken_order = [col("rating").desc(), col("distance").asc(), col("business_id").asc()]

        # Keep the best row per restaurant name (collapses duplicate listings).
        window_spec = Window.partitionBy("bus_name").orderBy(*tie_broken_order)

        recommended_restaurants = (
            nearby_restaurants
            .withColumn("rank", row_number().over(window_spec))
            .filter(col("rank") == 1)
            .orderBy(*tie_broken_order)
            .select("business_id", "bus_name", "address", "rating", "distance", "cuisine_type")
            .limit(top_n)
        )

        return recommended_restaurants

    def analyze_sentiments_and_topics(self, recommendation_df, data_path, model_path):
        """
        Analyze sentiments and topics for recommended restaurants.

        Args:
            recommendation_df: DataFrame with a ``business_id`` column, e.g.
                the output of ``recommend_restaurants``.
            data_path: CSV containing ``business_id`` and ``rev_text`` columns.
            model_path: Directory of the saved sentiment ``PipelineModel``.
                Its ``prediction`` column is renamed ``sentimentPrediction``.

        Returns:
            ``(sentiment_percentage, topic_percentage)`` DataFrames.

            - ``sentiment_percentage`` has business_id, sentimentPrediction,
              sentiment_count, total_reviews and sentiment_percentage.
            - ``topic_percentage`` has business_id, topic, topic_count,
              total_reviews and topic_percentage.

            Percentages are 0-100 of the business's non-null reviews.
        """
        from pyspark.sql.functions import lower

        loaded_model = PipelineModel.load(model_path)

        # Drop reviews without text: there is nothing to classify, and a text
        # pipeline may fail on nulls.
        review_df = (
            self.spark.read.csv(data_path, header=True, inferSchema=True)
            .select("business_id", "rev_text")
            .filter(col("rev_text").isNotNull())
        )

        # Collect the recommended IDs on the driver to build an isin() filter
        # (DataFrame.collect avoids a detour through the RDD API).
        business_ids = [
            row["business_id"]
            for row in recommendation_df.select("business_id").distinct().collect()
        ]
        filtered_reviews = review_df.filter(col("business_id").isin(business_ids))

        # Keyword-based topic per review. Matching is on lower-cased text so
        # "Food" also counts. The first matching keyword wins; anything else
        # is "other".
        review_text = lower(col("rev_text"))
        predictions = (
            loaded_model.transform(filtered_reviews)
            .withColumnRenamed("prediction", "sentimentPrediction")
            .withColumn(
                "topic",
                when(review_text.contains("food"), "food_quality")
                .when(review_text.contains("service"), "customer_service")
                .when(review_text.contains("clean"), "cleanliness")
                .otherwise("other")
            )
            .cache()  # feeds three separate aggregations below
        )

        total_reviews = predictions.groupBy("business_id").agg(count("rev_text").alias("total_reviews"))

        sentiment_count = predictions.groupBy("business_id", "sentimentPrediction").agg(count("sentimentPrediction").alias("sentiment_count"))
        sentiment_percentage = sentiment_count.join(total_reviews, on="business_id")
        sentiment_percentage = sentiment_percentage.withColumn(
            "sentiment_percentage",
            (col("sentiment_count") / col("total_reviews") * 100)
        )

        topic_count = predictions.groupBy("business_id", "topic").agg(count("topic").alias("topic_count"))
        topic_percentage = topic_count.join(total_reviews, on="business_id")
        topic_percentage = topic_percentage.withColumn(
            "topic_percentage",
            (col("topic_count") / col("total_reviews") * 100)
        )

        return sentiment_percentage, topic_percentage

def main():
    """
    Entry point for the integrated flow.

    Prompts for preferences and shows the recommended restaurants, then shows
    the sentiment and topic percentages of their reviews. Errors are printed
    with a traceback; the Spark session is always stopped.
    """
    import traceback

    spark = SparkSession.builder \
        .appName("Restaurant Recommender and Sentiment Analyzer") \
        .getOrCreate()

    model_path = '/content/drive/MyDrive/Models/restaurant_recommender_Final_model'
    review_model_path = '/content/drive/MyDrive/topic_senti_model'
    data_path = '/content/drive/MyDrive/ProcessedCSV/Recommender_System_Newdata.csv'

    try:
        predictor = RestaurantRecommenderPredictor(spark)
        predictor.load_data(data_path)
        loaded_model = predictor.load_saved_model(model_path)

        if loaded_model:
            print("Restaurant Recommender System")

            user_lat = float(input("Enter your latitude: "))
            user_lon = float(input("Enter your longitude: "))
            min_rating = float(input("Enter minimum rating (0-5): "))
            # Accept "yes"/"y" in any case and ignore stray whitespace.
            need_parking = input("Need parking? (yes/no): ").strip().lower() in ("yes", "y")
            need_wifi = input("Need WiFi? (yes/no): ").strip().lower() in ("yes", "y")
            # The cuisine filter is an exact match; trim surrounding spaces.
            cuisine_type = input("Enter cuisine type: ").strip()

            recommendations = predictor.recommend_restaurants(
                user_location=(user_lat, user_lon),
                min_rating=min_rating,
                need_parking=need_parking,
                need_wifi=need_wifi,
                cuisine_type=cuisine_type
            )
            # Materialise the result once. The returned DataFrame is a lazy
            # plan ending in orderBy(...).limit(...), and show() plus the ID
            # collection in the analysis would each re-run it. With tied
            # ratings, the two runs could disagree on which businesses are
            # included. localCheckpoint() is eager by default and freezes a
            # single result for both steps.
            recommendations = recommendations.localCheckpoint()

            print("Recommended Restaurants:")
            recommendations.show()

            sentiment_percentage, topic_percentage = predictor.analyze_sentiments_and_topics(
                recommendations, data_path, review_model_path
            )

            print("Sentiment Percentages:")
            sentiment_percentage.show()

            print("Topic Percentages:")
            topic_percentage.show()
        else:
            print("Failed to load the model.")

    except Exception as e:
        print(f"An error occurred: {e}")
        traceback.print_exc()
    finally:
        spark.stop()

if __name__ == "__main__":
    main()


Restaurant Recommender System
Enter your latitude: 39.9493338
Enter your longitude: -75.1661758
Enter minimum rating (0-5): 3.5
Need parking? (yes/no): No
Need WiFi? (yes/no): No
Enter cuisine type: American
Recommended Restaurants:
+--------------------+--------------------+--------------------+------+------------------+------------+
|         business_id|            bus_name|             address|rating|          distance|cuisine_type|
+--------------------+--------------------+--------------------+------+------------------+------------+
|TozkPEh-xhts3qfVe...|         Bad Brother|       726 N 24th St|   5.0|2.3317139167823364|    American|
|mVEqN8IPU-vCeKTXC...|      Kensington Pub|     2116 E Tioga St|   5.0|  7.47285437409349|    American|
|MwrBRpXMRNolVkaL3...|Bauman Family App...|116 Hoffmansville Rd|   5.0|55.545288742483265|    American|
|7SHh7-aWuAVlp3YKa...| Edwards-Freeman Nut|     441 E Hector St|   5.0|17.803916204265626|    American|
|9WY5jCB2IjnOizmgz...|             Fres