In [1]:
from __future__ import print_function

import sys
import numpy as np

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext


from operator import add
from pyspark.ml.feature import RegexTokenizer, CountVectorizer
from pyspark.ml.feature import StopWordsRemover, VectorAssembler
from pyspark.ml.feature import IDF
from pyspark.ml import Pipeline, PipelineModel

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F


In [2]:
def cosineSimilarity(vector1, vector2):
    '''
    Function to compute cosine similarity between 2 vectors
    -------------------------------------------------------
    Input: 
        vector1: Each restaurant's feature vector 
        vector2: myRestaurant feature vector 
                 for which recommendations are to be made
    
    Returns: A similarity score as a Python float. The score lies in [-1, 1]
             in general and in [0, 1] when both vectors are non-negative.
             If either vector has zero magnitude the score is 0.0.
    '''
    #  sum(A . B) / (sqrt(sum(A**2)) * sqrt(sum(B**2)))
    numerator = np.dot(vector1, vector2)
    denominator = np.sqrt(np.dot(vector1, vector1)) * np.sqrt(np.dot(vector2, vector2))

    # A zero-magnitude vector has no direction, so the ratio would be 0/0 (NaN).
    # Report "no similarity" instead, which keeps downstream sorting well defined.
    if denominator == 0:
        return 0.0

    return float(numerator / denominator)


In [3]:
def keyWordsRecommendation(keyword, all_business_tfidf):
    '''
    Function to rank businesses by similarity to a free-text keyword
    ----------------------------------------------------------------
    Input:
        keyword: Text to search for
        all_business_tfidf: RDD (or DataFrame) whose records hold the
                            business id at position 0 and that business's
                            feature vector at position 1

    Returns: A DataFrame with columns 'business_id' and 'similarity_score'
             holding the 10 highest-scoring businesses

    Note: reads the names `sc` and `pipeline_model` from the enclosing
          scope; both must exist before this function is called.
    '''
    # Wrap the keyword in a one-row DataFrame shaped like the pipeline's input
    input_word_df = sc.parallelize([(0, keyword)]).toDF(['business_id', 'text'])

    # Run the keyword through the already-fitted pipeline so it is vectorised
    # the same way as the business text
    input_word_df = pipeline_model.transform(input_word_df)

    # Pull the keyword's 'tfidf_vec' vector back to the driver
    input_word_tfidf = input_word_df.select('tfidf_vec').collect()[0][0]

    # A DataFrame has no .map() (removed in Spark 2.0) but exposes its rows
    # through .rdd; an RDD has no .rdd attribute and is used as it is.
    business_vectors_rdd = getattr(all_business_tfidf, 'rdd', all_business_tfidf)

    # To obtain similarity
    similar_restaurants_rdd = business_vectors_rdd.map(
        lambda x: (x[0], cosineSimilarity(x[1], input_word_tfidf)))

    sim_rest_by_keyword = similar_restaurants_rdd.toDF(['business_id', 'similarity_score']) \
                                                 .orderBy('similarity_score', ascending=False).limit(10)

    return sim_rest_by_keyword

In [4]:
if __name__ == "__main__":
    # NOTE(review): the check demands four command-line arguments, while the
    # usage text lists two and still says "wordcount" - confirm which is intended.
    if len(sys.argv) != 5:
        print("Usage: wordcount <file> <output> ", file=sys.stderr)
        exit(-1)

    # Creating instance of Spark Sessions.
    # getOrCreate() reuses a SparkContext that is already running, so this block
    # can be executed again; a bare SparkContext() raises when one already exists.
    sc = SparkContext.getOrCreate()
    spark = SparkSession.builder.appName('FinalProject').getOrCreate()
    sqlContext = SQLContext(sc)

Usage: wordcount <file> <output> 
24/05/09 16:44:59 WARN Utils: Your hostname, Adarshs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.200.63.84 instead (on interface en0)
24/05/09 16:44:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/09 16:44:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName('YelpDataAnalysis').getOrCreate()

# Directory holding the Yelp JSON files; edit this one line to relocate the data
DATA_DIR = '/Users/adarshraoakula/Documents/Large_Yelp_datasets'

# Paths to my datasets
business_path = f'{DATA_DIR}/yelp_academic_dataset_business.json'
review_path = f'{DATA_DIR}/yelp_academic_dataset_review.json'
user_path = f'{DATA_DIR}/yelp_academic_dataset_user.json'

# Reading the datasets
business_df = spark.read.json(business_path)
reviews_df = spark.read.json(review_path)
users_df = spark.read.json(user_path)

# Counting the number of records in each DataFrame and printing them
business_count = business_df.count()
reviews_count = reviews_df.count()
users_count = users_df.count()

print(f'Business Count: {business_count}')
print(f'Reviews Count: {reviews_count}')
print(f'Users Count: {users_count}')

# Displaying the first 5 rows of each DataFrame
business_df.show(5)
reviews_df.show(5)
users_df.show(5)


24/05/09 16:45:00 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/05/09 16:45:04 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/05/09 16:45:12 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

Business Count: 150346
Reviews Count: 6990280
Users Count: 1987897
+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+----------+------------+--------------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|         city|               hours|is_open|  latitude|   longitude|                name|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+----------+------------+--------------------+-----------+------------+-----+-----+
|1616 Chapala St, ...|{NULL, NULL, NULL...|Pns2l4eNsfO8kk83d...|Doctors, Traditio...|Santa Barbara|                NULL|      0|34.4266787|-119.7111968|Abby Rappoport, L...|      93101|           7|  5.0|   CA|
|87 Grasso Plaza S...|{NULL, NULL, NULL...|mpf3x-BjTdTEA3yCZ...|Shipping Centers,...|    

In [6]:
import sys

# Report which Python interpreter is running this notebook
interpreter_path = sys.executable
print("Python executable path:", interpreter_path)

Python executable path: /Users/adarshraoakula/anaconda3/bin/python


In [7]:
import os
import findspark

# Initialise findspark first, then ask it which Spark home it resolved,
# to see where the Spark installation used by this notebook lives
findspark.init()
spark_home = findspark.find()

print("SPARK_HOME:", spark_home)

SPARK_HOME: /Users/adarshraoakula/anaconda3/lib/python3.11/site-packages/pyspark


In [8]:
# Shell escapes: print the installed Java version and the JAVA_HOME environment variable
!java -version
!echo $JAVA_HOME


java version "21.0.2" 2024-01-16 LTS
Java(TM) SE Runtime Environment (build 21.0.2+13-LTS-58)
Java HotSpot(TM) 64-Bit Server VM (build 21.0.2+13-LTS-58, mixed mode, sharing)



In [9]:
from pyspark.sql import SparkSession

# Smoke test: obtain a local session, push one tiny DataFrame through it,
# then shut the session down again.
spark = (
    SparkSession.builder
    .appName("Simple Test")
    .config("spark.master", "local[*]")
    .getOrCreate()
)

# A single (name, age) record is enough to prove the session works
sample_rows = [("Adarsh", 23)]
sample_columns = ["name", "age"]
df = spark.createDataFrame(sample_rows, sample_columns)

# Print the DataFrame
df.show()

# Release the session
spark.stop()


24/05/09 16:45:20 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+------+---+
|  name|age|
+------+---+
|Adarsh| 23|
+------+---+



In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, concat_ws
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml import Pipeline, PipelineModel

# Create or get a local SparkSession (no memory settings are applied here)
spark = SparkSession.builder \
    .appName("Restaurant Recommendations") \
    .master("local[*]") \
    .getOrCreate()


try:
    # Loading the datasets, which are JSON files
    reviews_df = spark.read.json("/Users/adarshraoakula/Documents/Large_Yelp_datasets/yelp_academic_dataset_review.json")
    business_df = spark.read.json("/Users/adarshraoakula/Documents/Large_Yelp_datasets/yelp_academic_dataset_business.json")

    # Create review text dataframe from reviews data
    reviews_text = reviews_df.select('business_id', 'text')

    # Repartition by business_id into 200 partitions ahead of the grouping below
    reviews_text = reviews_text.repartition(200, "business_id")

    # Group all reviews per business and concatenate into a single string
    reviews_by_business = reviews_text.groupBy('business_id').agg(concat_ws(" ", collect_list('text')).alias('text'))

    # Text processing pipeline.
    # gaps=False makes the pattern describe the tokens themselves. With the
    # default gaps=True the pattern is treated as the delimiter, so '\w+' would
    # throw the words away and keep only the whitespace/punctuation between them.
    tokenizer = RegexTokenizer(pattern=r'\w+', gaps=False, inputCol='text', outputCol='tokens')
    stopWordsRemover = StopWordsRemover(inputCol='tokens', outputCol='nostopwords')
    countVectorizer = CountVectorizer(inputCol='nostopwords', outputCol='rawFeatures', vocabSize=1000)
    tfiDF = IDF(inputCol='rawFeatures', outputCol='tfidf_vec')

    pipeline = Pipeline(stages=[tokenizer, stopWordsRemover, countVectorizer, tfiDF])

    # Fit the model
    pipeline_model = pipeline.fit(reviews_by_business)

    # Save the pipeline model
    pipeline_model.write().overwrite().save("pipeline_model")

    # Load the pipeline model back from disk
    pipeline_model = PipelineModel.load("pipeline_model")

    # Convert the review data into feature vectors
    transformed_reviews_by_business = pipeline_model.transform(reviews_by_business)

    # Example recommendation logic (to be implemented)
    print("Transformations complete. Example recommendation logic pending implementation.")

except Exception as e:
    print(f"An error occurred: {e}")
finally:
    # Stop SparkSession to ensure it closes properly even if an error occurs.
    # NOTE(review): nothing above forces transformed_reviews_by_business to be
    # computed before the session stops - confirm later cells do not rely on it.
    spark.stop()


                                                                                

Transformations complete. Example recommendation logic pending implementation.


In [11]:
from pyspark.sql import SparkSession

# Several cells in this notebook create their own session, and overlapping
# sessions can produce output errors, so shut down any leftover one first.
if 'spark' in locals():
    spark.stop()

# Start again with explicit master and memory settings
spark = (
    SparkSession.builder
    .appName("Restaurant Recommendations Debugging")
    .config("spark.master", "local[*]")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

In [12]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.sql.functions import col

# Obtain a local Spark session for this cell
spark = (
    SparkSession.builder
    .appName("Text Analysis with Null Handling")
    .config("spark.master", "local[*]")
    .getOrCreate()
)

# Read the business records
business_df = spark.read.json("/Users/adarshraoakula/Documents/Large_Yelp_datasets/yelp_academic_dataset_business.json")

# Null 'categories' values become empty strings so the column can be tokenized.
# The alternative is to drop those rows instead:
# business_df = business_df.filter(col("categories").isNotNull())
business_df = business_df.na.fill({"categories": ""})

# Stage 1: split the category text into words
tokenizer = Tokenizer(inputCol="categories", outputCol="words")
wordsData = tokenizer.transform(business_df)

# Stage 2: hashed term frequencies
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures")
featurizedData = hashingTF.transform(wordsData)

# Stage 3: fit inverse document frequency and apply it to the term frequencies
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
weighted = idfModel.transform(featurizedData)
all_business_tfidf = weighted.select("business_id", "features")

# Preview the resulting vectors
all_business_tfidf.show()

# Release the session's resources
spark.stop()


24/05/09 16:48:45 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/05/09 16:48:47 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB


+--------------------+--------------------+
|         business_id|            features|
+--------------------+--------------------+
|Pns2l4eNsfO8kk83d...|(262144,[51144,54...|
|mpf3x-BjTdTEA3yCZ...|(262144,[4562,619...|
|tUFrWirKiKi_TAnsV...|(262144,[9282,178...|
|MTSW4McQd7CbVtyjq...|(262144,[36957,51...|
|mWMc6_wTdE0EUBKIG...|(262144,[28225,12...|
|CF33F8-E6oudUQ46H...|(262144,[32727,36...|
|n_0UpQx1hsNbnPUSl...|(262144,[3326,928...|
|qkRM_2X51Yqxk3btl...|(262144,[7808,158...|
|k0hlBqXX-Bt0vf1op...|(262144,[128724,1...|
|bBDDEgkFA1Otx9Lfe...|(262144,[32727,36...|
|UJsufbvfyfONHeWdv...|(262144,[9282,741...|
|eEOYSgkmpB90uNA7l...|(262144,[36957,95...|
|il_Ro8jwPlHresjw9...|(262144,[51144,13...|
|jaxMSoInw8Poo3XeM...|(262144,[51144,61...|
|0bPLkL0QhhPO5kt1_...|(262144,[36957,64...|
|MUTTqe8uqyMdBl186...|(262144,[182451,2...|
|rBmpy_Y1UbBx8ggHl...|(262144,[34146,51...|
|M0XSSHqrASOnhgbWD...|(262144,[9282,511...|
|8wGISYjYkE2tSqn3c...|(262144,[38399,51...|
|ROeacJQwBeh05Rqg7...|(262144,[6

In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, when
from pyspark.sql.types import FloatType

def calculate_similarity(df, keyword):
    """Add a 'similarity_score' column driven by keyword presence in 'categories'.

    Rows whose 'categories' value contains `keyword` score 0.9; every other
    row scores 0.1. Returns the DataFrame with the extra column.
    """
    has_keyword = col("categories").contains(keyword)
    score = when(has_keyword, 0.9).otherwise(0.1)
    return df.withColumn("similarity_score", score)

def main(key_word='Chinese'):
    """Show up to 10 businesses whose categories mention `key_word`.

    Args:
        key_word: Category substring to search for. Defaults to 'Chinese',
            the value that was previously hard-coded.

    Loads the business dataset, keeps rows whose 'categories' contain
    `key_word`, scores them with calculate_similarity and prints the result.
    Any exception is reported with print; the Spark session is always stopped.
    """
    spark = SparkSession.builder \
        .appName("Yelp Restaurant Recommendations") \
        .config("spark.master", "local[*]") \
        .getOrCreate()

    try:
        # Loading the dataset from JSON
        business_df = spark.read.json("/Users/adarshraoakula/Documents/Large_Yelp_datasets/yelp_academic_dataset_business.json")

        # Handle null values in 'categories' and keep only businesses that mention the keyword
        business_df = business_df.na.fill({"categories": ""})
        business_df = business_df.filter(col("categories").contains(key_word))

        # Calculate similarity based on the presence of the keyword in categories.
        # NOTE(review): every row that survives the filter above contains the
        # keyword, so all scores here are 0.9 and the ordering below cannot
        # separate them - confirm whether the pre-filter is intended.
        similar_businesses = calculate_similarity(business_df, key_word)

        # Select necessary fields for output
        final_df = similar_businesses.select(
            "business_id",
            "similarity_score",
            "name",
            "categories",
            "stars",
            "review_count"
        ).orderBy("similarity_score", ascending=False).limit(10)

        # Show results
        print(f"\nRestaurants similar to keyword - {key_word}")
        final_df.show(truncate=False)

    except Exception as e:
        print(f"An error occurred: {e}")

    finally:
        # Stop the Spark session
        spark.stop()

# Run the keyword recommendation demo when this code executes as the main module
if __name__ == "__main__":
    main()



Restaurants similar to keyword - Chinese
+----------------------+----------------+------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+-----+------------+
|business_id           |similarity_score|name                                            |categories                                                                                                                  |stars|review_count|
+----------------------+----------------+------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+-----+------------+
|f7FPMRaMbGiX-CXPaLFCJw|0.9             |Taste Restaurant At Lucky Lou Seafood and Dimsum|Seafood, Restaurants, Dim Sum, Chinese, Cantonese                                                                           |4.0  |212         |
|Pns2l4eNsfO8kk83d

In [14]:
# %pip (rather than !pip) installs into the environment of the running kernel
%pip install folium


[33mDEPRECATION: Loading egg at /Users/adarshraoakula/anaconda3/lib/python3.11/site-packages/pyBWMD-0.0.1-py3.11.egg is deprecated. pip 24.3 will enforce this behaviour change. A possible replacement is to use pip for package installation.. Discussion can be found at https://github.com/pypa/pip/issues/12330[0m[33m


In [17]:
import folium

def create_map(df, zoom_start=10):
    """Build a folium map with one marker per row of a Spark DataFrame.

    Args:
        df: Spark DataFrame with 'latitude', 'longitude', 'name' and 'stars'
            columns.
        zoom_start: Initial zoom level passed to folium.Map.

    Returns:
        The folium.Map, centred on the first row's coordinates.

    Raises:
        ValueError: If `df` has no rows, since there is nothing to centre on.
    """
    # Convert Spark DataFrame to Pandas DataFrame for visualization in Jupyter
    pd_df = df.toPandas()

    if pd_df.empty:
        raise ValueError("Cannot create a map from an empty DataFrame")

    # Center the map around the first entry's location (positional lookup, so
    # it does not depend on the index labels)
    map_center = [pd_df['latitude'].iloc[0], pd_df['longitude'].iloc[0]]
    restaurant_map = folium.Map(location=map_center, zoom_start=zoom_start)

    # Add markers to the map
    for _, row in pd_df.iterrows():
        tooltip = f"{row['name']} - Rating: {row['stars']}"
        folium.Marker([row['latitude'], row['longitude']], popup=row['name'], tooltip=tooltip).add_to(restaurant_map)

    return restaurant_map

# Example usage: map every business whose categories mention 'Chinese'
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Restaurant Visualization")
    .config("spark.master", "local[*]")
    .getOrCreate()
)

try:
    business_df = spark.read.json("/Users/adarshraoakula/Documents/Large_Yelp_datasets/yelp_academic_dataset_business.json")

    # Keep only matching businesses that have both coordinates
    is_chinese = business_df['categories'].contains('Chinese')
    has_latitude = business_df['latitude'].isNotNull()
    has_longitude = business_df['longitude'].isNotNull()
    filtered_df = business_df.filter(is_chinese & has_latitude & has_longitude)

    # Build the map and render it in the notebook
    folium_map = create_map(filtered_df)
    display(folium_map)

except Exception as e:
    print(f"An error occurred: {e}")

finally:
    spark.stop()
