In [3]:
import sys
from pymongo import MongoClient

In [69]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import count, countDistinct, lit

# Recommender

In [5]:
# Connect to the local MongoDB instance and grab the restaurants collection
# for the ad-hoc exploration done in the following cells.
from pymongo import MongoClient
client = MongoClient('localhost', 27017)  # URI form: MongoClient("mongodb://localhost:27017")
collection = client.mealMaster.restaurants
print(f'There are {collection.count_documents({})} documents')
# Unfiltered query over the whole collection; later cells index into this cursor.
cursor = collection.find()

There are 30 documents


In [7]:
# Top-level keys of the first document returned by the query.
cursor[0].keys()

dict_keys(['_id', 'data', 'meta'])

In [11]:
# City metadata stored under meta.city in the first document.
cursor[0]['meta']['city']

{'slugsByLocale': {'it-IT': 'milano',
  'fr-FR': 'milan',
  'es-ES': 'milan',
  'ca-ES': 'mila',
  'fr-CH': 'milan',
  'de-CH': 'mailand',
  'en-US': 'milan',
  'nl-BE': 'milaan',
  'fr-BE': 'milan',
  'sv-SE': 'milano',
  'nl-NL': 'milaan',
  'pt-PT': 'milan',
  'en-AU': 'milan',
  'de-DE': 'mailand',
  'de-AT': 'mailand',
  'en-GB': 'milan'},
 'id': 348156,
 'name': 'Milan',
 'countryId': 111,
 'slug': 'milan',
 'hasHomepage': True,
 'latitude': 45.4642035,
 'longitude': 9.189981999999986,
 'isSeoIndexed': True,
 'photo': 'https://c.tfstatic.com/homepage_city/348156.jpg'}

In [8]:
# First entry of the first document's `data` field (one restaurant record).
cursor[0]['data'][0]

{'name': 'Arcano Restaurant & Music',
 'slug': 'arcano-restaurant-music',
 'geo': {'longitude': '9.1645764', 'latitude': '45.4647843'},
 'servesCuisine': 'Meat Cuisine',
 'containedInPlace': {},
 'priceRange': 38,
 'currenciesAccepted': 'EUR',
 'address': {'street': 'Viale S. Michele del Carso, 9',
  'postalCode': '20123',
  'locality': 'Milano',
  'country': 'Italy'},
 'aggregateRatings': {'thefork': {'ratingValue': 10, 'reviewCount': 3},
  'tripadvisor': {'ratingValue': None, 'reviewCount': None}},
 'mainPhoto': {'source': 'https://res.cloudinary.com/tf-lab/image/upload/restaurant/1e514fe3-adcc-4fb3-a602-7c2338b20c04/4dbb066e-458d-4473-897c-454deb3ec0ed.jpg',
  '612x344': 'https://res.cloudinary.com/tf-lab/image/upload/f_auto,q_auto,w_612,h_344/restaurant/1e514fe3-adcc-4fb3-a602-7c2338b20c04/4dbb066e-458d-4473-897c-454deb3ec0ed.jpg',
  '480x270': 'https://res.cloudinary.com/tf-lab/image/upload/f_auto,q_auto,w_480,h_270/restaurant/1e514fe3-adcc-4fb3-a602-7c2338b20c04/4dbb066e-458d-447

In [124]:
class Recommender:
    """Restaurant search over a Spark DataFrame loaded from MongoDB.

    Attributes:
        spark: The SparkSession used to build DataFrames.
        df: Restaurants (id, name, servesCuisine, price, city, city_id,
            rate_all) inner-joined with the selected group's ``rateGrp``.
        df_group: The selected group's ratings (id, group_id, rateGrp).
    """

    def __init__(self, group_id) -> None:
        """Load all restaurants, then attach the ratings of ``group_id``.

        Args:
            group_id: Identifier of the group whose ratings are joined
                onto the restaurant DataFrame.
        """
        self.spark = SparkSession.builder.getOrCreate()
        self.df = self._load_restaurants()
        self._load_group(group_id)

    def _load_restaurants(self):
        """Read every restaurant from ``mealMaster.restaurants`` into Spark.

        Each MongoDB document carries the city under ``meta.city`` and a list
        of restaurant records under ``data``; one output row is produced per
        restaurant record.

        Returns:
            A Spark DataFrame with columns id, name, servesCuisine, price,
            city, city_id, rate_all.

        Raises:
            KeyError: If a document or restaurant record lacks a required
                field (``meta.city``, ``data``, ``id``, ``name``,
                ``priceRange``).
        """
        columns = ['id', 'name', 'servesCuisine', 'price', 'city', 'city_id', 'rate_all']
        vals = []

        client = MongoClient('localhost', 27017)
        try:
            collection = client.mealMaster.restaurants
            print(f'There are {collection.count_documents({})} documents')

            for doc in collection.find():
                city = doc['meta']['city']
                for r in doc['data']:
                    # Ratings are optional at every nesting level; a missing
                    # or empty rating is stored as None.
                    ratings = r.get('aggregateRatings') or {}
                    rating = (ratings.get('thefork') or {}).get('ratingValue')
                    rate_all = float(rating) if rating else None
                    vals.append((
                        r['id'],
                        r['name'],
                        r.get('servesCuisine'),
                        r['priceRange'],
                        city['name'],
                        city['id'],
                        rate_all,
                    ))
        finally:
            # Release the connection even when a malformed document aborts
            # the load part-way through.
            client.close()

        return self.spark.createDataFrame(vals, columns)

    def _load_group(self, group_id):
        """Attach the ratings of ``group_id`` to ``self.df``.

        TODO: to be revised. Group ratings are currently a placeholder:
        every restaurant is assigned to group 1 with a rating of 10.0, so
        any other ``group_id`` leaves ``self.df`` empty after the inner join.

        Args:
            group_id: Group whose ratings should be kept.
        """
        placeholder = (
            self.df.select('id')
            .withColumn("group_id", lit(1))
            .withColumn("rateGrp", lit(10.0))
        )

        # Keep the group's ratings available on their own for inspection.
        self.df_group = placeholder.filter(placeholder.group_id == group_id)

        # Inner join: only restaurants rated by the group remain in self.df.
        group_rates = self.df_group.select("id", "rateGrp")
        self.df = self.df.join(group_rates, on="id", how="inner")
        self.df.cache()

    def search(self, city, servesCuisine=None):
        """Return the restaurants located in ``city``.

        Args:
            city: City name to match exactly against the ``city`` column.
            servesCuisine: Optional cuisine to match exactly; when falsy
                (None or empty string) no cuisine filter is applied.

        Returns:
            The filtered Spark DataFrame.
        """
        matches = self.df.filter(self.df.city == city)
        if servesCuisine:
            matches = matches.filter(matches.servesCuisine == servesCuisine)
        return matches




In [125]:
# Build the recommender for group 1; construction loads the restaurants from MongoDB.
r = Recommender(1)

There are 30 documents


In [126]:
# Number of rows in the recommender's restaurant DataFrame.
r.df.count()

300

In [121]:
# Restaurants in Milan whose cuisine is exactly "Japanese".
out = r.search("Milan", "Japanese")

UnboundLocalError: local variable 'filtered_restaurants' referenced before assignment

In [118]:
# Number of restaurants matched by the search.
out.count()

28

In [107]:
# Preview the matched restaurants as a text table.
out.show()

+------+--------------------+-------------+-----+-----+-------+--------+--------+-------+
|    id|                name|servesCuisine|price| city|city_id|rate_all|group_id|rateGrp|
+------+--------------------+-------------+-----+-----+-------+--------+--------+-------+
|585631|MiSushi3 - Corso ...|     Japanese|   28|Milan| 348156|     9.2|       1|   10.0|
|209757|Yang Sushi and Fu...|     Japanese|   35|Milan| 348156|     9.0|       1|   10.0|
|524573| Muud Creative Sushi|     Japanese|   35|Milan| 348156|     9.1|       1|   10.0|
|659825|Light Sushi Resta...|     Japanese|   25|Milan| 348156|     8.9|       1|   10.0|
|626545|           Umi Sushi|     Japanese|   28|Milan| 348156|     8.5|       1|   10.0|
|289393|Akitaya - Alla carta|     Japanese|   25|Milan| 348156|     8.8|       1|   10.0|
|690283|Bentobus·Città studi|     Japanese|   28|Milan| 348156|     8.6|       1|   10.0|
|737385|         Inada ramen|     Japanese|   14|Milan| 348156|     9.0|       1|   10.0|
| 58450|  

In [63]:
# Summary statistics of the group ratings; requires Recommender to expose a `df_group` attribute.
r.df_group.describe().show()

+-------+-----------------+--------+-------+
|summary|               id|group_id|rateGrp|
+-------+-----------------+--------+-------+
|  count|              300|     300|    300|
|   mean|        547584.91|     1.0|   10.0|
| stddev|220925.1678633137|     0.0|    0.0|
|    min|           200205|       1|   10.0|
|    max|            78143|       1|   10.0|
+-------+-----------------+--------+-------+



In [67]:
# Column types of the group ratings; requires Recommender to expose a `df_group` attribute.
r.df_group.printSchema()

root
 |-- id: string (nullable = true)
 |-- group_id: integer (nullable = false)
 |-- rateGrp: double (nullable = false)



In [68]:
# Column types Spark inferred for the restaurant DataFrame.
r.df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- servesCuisine: string (nullable = true)
 |-- price: long (nullable = true)
 |-- city: string (nullable = true)
 |-- city_id: long (nullable = true)
 |-- rate_all: double (nullable = true)



In [71]:
# Summary statistics for price, overall rating and city.
r.df.select("price", "rate_all", "city").describe().show()

+-------+------------------+-------------------+-----+
|summary|             price|           rate_all| city|
+-------+------------------+-------------------+-----+
|  count|               300|                294|  300|
|   mean|29.496666666666666|  8.815306122448982| null|
| stddev|12.347937510292255|0.40643762645019665| null|
|    min|                12|                7.3|Milan|
|    max|               100|               10.0|Milan|
+-------+------------------+-------------------+-----+



In [42]:
# Number of restaurants per cuisine.
r.df.groupby('servesCuisine').count().show()

+--------------+-----+
| servesCuisine|count|
+--------------+-----+
| International|    6|
|        Fusion|   11|
|        Indian|   12|
|       Chinese|   12|
| Vegan cuisine|    4|
|      Japanese|   28|
|  Meat Cuisine|   33|
|      Milanese|   11|
|      Pizzeria|   28|
|Middle Eastern|    3|
|       Seafood|   29|
|         Sardo|    1|
|         Asian|    5|
| Mediterranean|   28|
|      American|    9|
|       Mexican|    8|
|     Siciliano|    8|
|      Hawaiian|    4|
|       Italian|   26|
|        French|    1|
+--------------+-----+
only showing top 20 rows



In [40]:
# Number of distinct servesCuisine values (a null cuisine counts as its own value).
r.df.select('servesCuisine').distinct().count()

40

In [70]:
# Preview the restaurant DataFrame as a text table.
r.df.show()

+------+--------------------+--------------+-----+-----+-------+--------+
|    id|                name| servesCuisine|price| city|city_id|rate_all|
+------+--------------------+--------------+-----+-----+-------+--------+
|756321|Arcano Restaurant...|  Meat Cuisine|   38|Milan| 348156|    10.0|
|585631|MiSushi3 - Corso ...|      Japanese|   28|Milan| 348156|     9.2|
|548309|Panini di Mare Bi...|       Seafood|   20|Milan| 348156|     9.3|
|714585|  San Carlo al Corso|      Milanese|   30|Milan| 348156|     8.8|
|750529|           Broadwine| International|   35|Milan| 348156|     9.2|
|697067|               Slice|      Pizzeria|   15|Milan| 348156|     9.2|
|628683|       Costa Sigieri|         Sardo|   39|Milan| 348156|     9.1|
|444585| Little Lamb Bernina|         Asian|   25|Milan| 348156|     9.2|
| 54920|        Haruka Sushi|Middle Eastern|   35|Milan| 348156|     9.2|
| 61583|               Dawat|        Indian|   21|Milan| 348156|     8.8|
|688117|                  Fx|      Piz

In [97]:
# Repartition the restaurants by city and cuisine.
# Note: this rebinds `out`, which earlier held the search result.
out = r.df.repartition("city", "servesCuisine")

In [98]:
# Preview the repartitioned DataFrame; row order can differ from r.df.show().
out.show()

+------+--------------------+--------------+-----+-----+-------+--------+
|    id|                name| servesCuisine|price| city|city_id|rate_all|
+------+--------------------+--------------+-----+-----+-------+--------+
|734817|             Salmone|        Fusion|   20|Milan| 348156|     9.0|
|444585| Little Lamb Bernina|         Asian|   25|Milan| 348156|     9.2|
|603737|               Fanwu|       Chinese|   25|Milan| 348156|     9.0|
|713379|JD - Sapore della...|       Chinese|   20|Milan| 348156|     8.9|
|756321|Arcano Restaurant...|  Meat Cuisine|   38|Milan| 348156|    10.0|
|488005|La Filetteria Ita...|  Meat Cuisine|   40|Milan| 348156|     9.1|
|710297|Arcano Restaurant...|  Meat Cuisine|   38|Milan| 348156|     8.3|
|645925|Osteria Filetteri...|  Meat Cuisine|   40|Milan| 348156|     9.0|
|628683|       Costa Sigieri|         Sardo|   39|Milan| 348156|     9.1|
|697067|               Slice|      Pizzeria|   15|Milan| 348156|     9.2|
|688117|                  Fx|      Piz