# PySpark: Reto Restaurantes Tacos&Burritos

Autor: Gerardo Andrés Arias Remolina

github: gerardoarias



A continuación disponemos de una base de datos proveniente de [Kaggle](https://www.kaggle.com/datafiniti/restaurants-burritos-and-tacos). En ella se encuentran cerca de 20000 observaciones relacionadas con restaurantes o pequeños negocios que ofrecen dentro de sus menus tacos o burritos (aunque no es lo que exclusivamente ofrecen). El objetivo de este reto será usar PySpark para limpiar la base de datos y con las herramientas con las que dispone Spark poder generar un ranking de los tipos de cocina más populares dentro de esta muestra.

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 42 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 51.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=5c2345772069e754ecbebc44cff77a388d5413ff45de04ef4800dd524140db17
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [2]:
# Con esta función descomprimimos un archivo .zip donde se encuentra el dataset en el entorno de Google Colab
import zipfile

ruta_zip = "/content/drive/MyDrive/BD_GoogleColab/BD_Restaurantes_TacosBurritos.zip"
ruta_extraccion = "/content"
password = None
archivo_zip = zipfile.ZipFile(ruta_zip, "r")
try:
    print(archivo_zip.namelist())
    archivo_zip.extractall(pwd=password, path=ruta_extraccion)
except:
    pass
archivo_zip.close()

['just tacos and burritos.csv']


Primero configuramos el contexto de  Spark (SparkContext). Ello es clave porque este contexto es el motor interno que nos permite conectarnos con los clusters. Además, resulta clave configurar el motor con diferentes fuentes de datos. Por eso cargamos SQLContext, facilitando así las funcionalidades de SQL.

Igualmente cargamos el dataset de interés indicándole que está en formato de texto plano csv, que tiene encabezado, y le indicamos la ubicación. Además con inferSchema = true, le indicamos a Spark que infiera el tipo de datos para cada una de las variables.

In [3]:
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import StringType
from pyspark import SQLContext

conf = SparkConf().setMaster('local').setAppName('Mi programa')
sc = SparkContext(conf = conf) 
sqlContext = SQLContext(sc)

dfspark = sqlContext.read.format('csv').option('header','true').option('inferSchema','true').load('/content/just tacos and burritos.csv')




In [4]:
dfspark.show(5)

+--------------------+--------------------+--------------------+-----------+-------+--------------------+-------------------+-------------------+--------------------+----------+------------+--------------------+---------------+---------------+--------------+--------------+--------------------+--------------------+-----------------+--------------------+----------+------------------+-------------+-------------+------------+--------------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----

Ahora filtramos el dataset para que no hallan observaciones de nuestra variable de interés (cuisines) nulas.

In [5]:
dfspark = dfspark.filter('cuisines is not NULL')

Revisamos el tipo de variables que Spark ha inferido. A pesar de que Spark se aproxima muy bien al tipo de variables, es importante revisarlas porque pudo haber catalogado algunas de ellas como objetos e impedir con ello la ejecución correcto de nuestro modelo. 

Adicionalmente, podemos identificar columnas adicionales que por error se hayan considerado como variables y que no lo sean.

In [6]:
dfspark.printSchema()

root
 |-- id: string (nullable = true)
 |-- address: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- cuisines: string (nullable = true)
 |-- dateAdded: timestamp (nullable = true)
 |-- dateUpdated: timestamp (nullable = true)
 |-- keys: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- menuPageURL: string (nullable = true)
 |-- menus.amountMax: double (nullable = true)
 |-- menus.amountMin: double (nullable = true)
 |-- menus.category: string (nullable = true)
 |-- menus.currency: string (nullable = true)
 |-- menus.dateSeen: string (nullable = true)
 |-- menus.description: string (nullable = true)
 |-- menus.name: string (nullable = true)
 |-- name: string (nullable = true)
 |-- postalCode: string (nullable = true)
 |-- priceRangeCurrency: string (nullable = true)
 |-- priceRangeMin: integer (nullable = true)
 |-- priceRangeMax

Aislamos la columna de interés en donde se encuentran los tipos de cocina.

In [7]:
A = sc.parallelize(dfspark.select('cuisines').rdd.collect())

Hacemos este objeto persistente en memoria

In [8]:
A.persist()

ParallelCollectionRDD[20] at readRDDFromFile at PythonRDD.scala:274

Revisamos el detalle del contenido del objeto. Encontramos que los string de tipo de cocina incluyen espacios y que cada tipo de cocina se encuentra separado por comas. Por lo que posteriormente limpiaremos esto y separaremos por comas con ayuda de una función.

In [9]:
A.collect()

[Row(cuisines='Restaurant Delivery Service'),
 Row(cuisines='Restaurant Delivery Service'),
 Row(cuisines='Mexican'),
 Row(cuisines='Mexican'),
 Row(cuisines='Mexican'),
 Row(cuisines='Mexican'),
 Row(cuisines='Mexican'),
 Row(cuisines='Mexican'),
 Row(cuisines='Mexican'),
 Row(cuisines='Mexican'),
 Row(cuisines='Mexican'),
 Row(cuisines='Mexican'),
 Row(cuisines='Mexican'),
 Row(cuisines='Coffee Shops'),
 Row(cuisines='Coffee Shops'),
 Row(cuisines='Coffee Shops'),
 Row(cuisines='Mexican, Latin American'),
 Row(cuisines='Mexican, Latin American'),
 Row(cuisines='American,Seafood Restaurants'),
 Row(cuisines='American,Seafood Restaurants'),
 Row(cuisines='American,Seafood Restaurants'),
 Row(cuisines='American,Seafood Restaurants'),
 Row(cuisines='American,Seafood Restaurants'),
 Row(cuisines='Traditional American'),
 Row(cuisines='Traditional American'),
 Row(cuisines='Traditional American'),
 Row(cuisines='Traditional American'),
 Row(cuisines='Traditional American'),
 Row(cuisines='

In [10]:
def split_cuisines(x):
  x = x[0]                  # Nos aseguramos tomar el dato en particular
  x = x.replace(' ','')     # Reemplazamos los espacios, a no tenerlos 
  x = x.split(',')          # Dividimos por la coma
  return x

# Con esta función nos aseguramos que las observaciones con 2 o más tipos de cocina
# se separen. Eliminamos los espacios y dividimos por la coma

Con ayuda de flatMap aplicamos la función sobre cada uno de los elementos. A pesar de que flatmap es muy similar a map, es importante entender que en este caso particular necesitamos devolver varias elementos. Le pasamos a flatmap un RDD de N elementos y nos devuelve un nuevo RDD de M elementos, ello porque le solicitamos que transformara el RDD en donde en la columna cuisines se agregaban todos los tipos de cocina a uno en donde los separa de acuerdo con la función que definimos previamente.

In [11]:
flatmap  = A.flatMap(split_cuisines)

In [12]:
flatmap.collect() # Con collect nos muestra el resultado de la transformación

['RestaurantDeliveryService',
 'RestaurantDeliveryService',
 'Mexican',
 'Mexican',
 'Mexican',
 'Mexican',
 'Mexican',
 'Mexican',
 'Mexican',
 'Mexican',
 'Mexican',
 'Mexican',
 'Mexican',
 'CoffeeShops',
 'CoffeeShops',
 'CoffeeShops',
 'Mexican',
 'LatinAmerican',
 'Mexican',
 'LatinAmerican',
 'American',
 'SeafoodRestaurants',
 'American',
 'SeafoodRestaurants',
 'American',
 'SeafoodRestaurants',
 'American',
 'SeafoodRestaurants',
 'American',
 'SeafoodRestaurants',
 'TraditionalAmerican',
 'TraditionalAmerican',
 'TraditionalAmerican',
 'TraditionalAmerican',
 'TraditionalAmerican',
 'TraditionalAmerican',
 'TakeOutRestaurants',
 'Pizza',
 'Pizza',
 'TakeOutRestaurants',
 'TakeOutRestaurants',
 'Seafood',
 'SeafoodRestaurants',
 'Japanese',
 'Thai',
 'Hibachi',
 'ContemporaryAmerican',
 'WineBar',
 'AfternoonTea',
 'FastFoodRestaurants',
 'Pizza',
 'TakeOutRestaurants',
 'TakeOutRestaurants',
 'TakeOutRestaurants',
 'TakeOutRestaurants',
 'TakeOutRestaurants',
 'TakeOutRestau

Ahora aplicamos una nueva transformación en donde vamos a asignarle a cada columna un puntaje de 1. Después con ayuda de reduceByKey sumamos los puntajes de las observaciones que sean iguales. Por lo que tendremos entonces un objeto que por cada tipo de cocina nos indica el número de veces que se ha repetido.

Finalmente con ayuda de sortBy ordenamos de forma descendente y nos muestra un ranking de los tipos de cocina más populares.

In [13]:
mapfun = flatmap.map(lambda x: (x,1))

In [14]:
results = mapfun.reduceByKey(lambda x,y: x+y)

In [15]:
results.sortBy(lambda x: x[1], ascending=False).collect()

[('Mexican', 10291),
 ('TakeOutRestaurants', 5035),
 ('MexicanRestaurants', 4034),
 ('American', 3306),
 ('TraditionalAmerican', 3033),
 ('Restaurants', 2783),
 ('FastFoodRestaurants', 2482),
 ('LatinAmericanRestaurants', 1600),
 ('Southwestern', 1573),
 ('AmericanRestaurants', 1498),
 ('Seafood', 1387),
 ('ContemporaryAmerican', 1100),
 ('Pizza', 832),
 ('FastFood', 752),
 ('Bars', 750),
 ('Hamburgers', 595),
 ('Mexican/Southwestern', 480),
 ('CoffeeShops', 425),
 ('LatinAmerican', 383),
 ('Caterers', 376),
 ('ComfortFood', 370),
 ('Californian', 342),
 ('Italian', 328),
 ('ContemporaryMexican', 327),
 ('Spanish', 325),
 ('NewAmerican', 310),
 ('Steak', 302),
 ('Steakhouse', 299),
 ('Burgers', 280),
 ('FamilyStyleRestaurants', 264),
 ('Bar/Lounge/BottleService', 259),
 ('International', 244),
 ('Barbecue', 226),
 ('Mediterranean', 214),
 ('Tapas/SmallPlates', 198),
 ('GastroPub', 196),
 ('Asian', 180),
 ('Latin/Spanish', 175),
 ('Tex-Mex', 172),
 ('Southwest', 164),
 ('TraditionalMexi

#### Conclusión y Resultados

Con ayuda de PySpark encontramos que dentro de los restaurantes que ofrecen Tacos y Burritos, el tipo de cocina más común corresponde a 'Mexican' con 10291 restaurantes, seguido de 'TakeOutRestaurants' con 5034 restaurantes.