The ETL process over the full dataset is carried out with **pyspark** and **polars**. The data is read from its source on Google Drive and then loaded into a data lakehouse on GCP.

Install pyspark and polars, plus gcsfs (access to `gs://` paths from pandas) and fastparquet (a Parquet engine).

In [None]:
!pip install pyspark polars gcsfs fastparquet

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
  Preparing metadata (setup.py) ... done
Collecting fastparquet
  Downloading fastparquet-2023.10.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.7 MB)
Collecting cramjam>=2.3 (from fastparquet)
  Downloading cramjam-2.7.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.6 MB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=4c850028132519b9926076204cb1921e6b3965be534e76

Import the required libraries

In [None]:
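import os
import json
import pandas as pd
import polars as pl
from datetime import date, timedelta, datetime
import time
import re

import pyspark.pandas as ps
from pyspark.sql import SparkSession, SQLContext
from pyspark.context import SparkContext
# The wildcard imports provide col(), sum(), etc. and the Spark type classes.
# Note: they shadow Python built-ins such as sum(), max() and round();
# the null-count cell further down relies on pyspark's sum().
from pyspark.sql.functions import *
from pyspark.sql.types import *

from google.cloud import storage
import pyarrow.parquet as pq
from google.colab import auth
from google.colab import drive

# Mount Google Drive (data source) and authenticate the Colab user for GCP access
drive.mount('/content/drive')
auth.authenticate_user()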



Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Start a Spark session

In [None]:
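# Arrow speeds up toPandas(); Spark 3.x uses the ".pyspark." key (the old "spark.sql.execution.arrow.enabled" is deprecated)
spark = SparkSession.builder.appName('ETL_maps').config("spark.sql.execution.arrow.pyspark.enabled", "true").getOrCreate()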

In [None]:
spark

# ETL metadata-sitios

Business information, including location, attributes and categories.

In [None]:
sitio1 = spark.read.json('/content/drive/MyDrive/Google Maps/metadata-sitios/1.json')

Display the contents of the `sitio1` DataFrame

In [None]:
# Show the first 7 rows of sitio1
sitio1.show(7)

# Summary statistics of the DataFrame
sitio1.describe().show()

+--------------------+--------------------+----------+-------------------+-----------+--------------------+--------------------+----------+-------------------+--------------------+--------------+-----+--------------------+--------------------+--------------------+
|                MISC|             address|avg_rating|           category|description|             gmap_id|               hours|  latitude|          longitude|                name|num_of_reviews|price|    relative_results|               state|                 url|
+--------------------+--------------------+----------+-------------------+-----------+--------------------+--------------------+----------+-------------------+--------------------+--------------+-----+--------------------+--------------------+--------------------+
|{[Wheelchair acce...|Porter Pharmacy, ...|       4.9|         [Pharmacy]|       NULL|0x88f16e41928ff68...|[[Friday, 8AM–6PM...|   32.3883|           -83.3571|     Porter Pharmacy|            16| NULL|[0x8

In [None]:
sitio2 = spark.read.json('/content/drive/MyDrive/Google Maps/metadata-sitios/2.json')

In [None]:
# Show the first 7 rows of sitio2
sitio2.show(7)

# Summary statistics of the DataFrame
sitio2.describe().show()

+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+--------------------+--------------+-----+--------------------+--------------------+--------------------+
|                MISC|             address|avg_rating|            category|         description|             gmap_id|               hours|          latitude|         longitude|                name|num_of_reviews|price|    relative_results|               state|                 url|
+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+--------------------+--------------+-----+--------------------+--------------------+--------------------+
|{[Wheelchair acce...|Porter Pharmacy, ...|       4.9|          [Pharmacy]|                NULL|0x88f16e41928ff68...|[[Friday, 8AM–6PM...|           32.38

Load the remaining **sitio** files

In [None]:
sitio3 = spark.read.json('/content/drive/MyDrive/Google Maps/metadata-sitios/3.json')
sitio4 = spark.read.json('/content/drive/MyDrive/Google Maps/metadata-sitios/4.json')
sitio5 = spark.read.json('/content/drive/MyDrive/Google Maps/metadata-sitios/5.json')
sitio6 = spark.read.json('/content/drive/MyDrive/Google Maps/metadata-sitios/6.json')
sitio7 = spark.read.json('/content/drive/MyDrive/Google Maps/metadata-sitios/7.json')
sitio8 = spark.read.json('/content/drive/MyDrive/Google Maps/metadata-sitios/8.json')
sitio9 = spark.read.json('/content/drive/MyDrive/Google Maps/metadata-sitios/9.json')
sitio10 = spark.read.json('/content/drive/MyDrive/Google Maps/metadata-sitios/10.json')
sitio11 = spark.read.json('/content/drive/MyDrive/Google Maps/metadata-sitios/11.json')
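The eleven reads above differ only in the file number. A minimal sketch that builds the paths in a loop instead, assuming the files are always named `1.json` to `11.json` in the same Drive folder. `sitio_paths` is a hypothetical helper, and the Spark call is left as a comment so the snippet runs without a SparkSession.

```python
# Assumed base directory: the Drive folder used throughout this notebook
BASE_DIR = '/content/drive/MyDrive/Google Maps/metadata-sitios'


def sitio_paths(base_dir, n_files):
    """Hypothetical helper: return base_dir/1.json ... base_dir/<n_files>.json in numeric order."""
    return [f"{base_dir}/{i}.json" for i in range(1, n_files + 1)]


paths = sitio_paths(BASE_DIR, 11)

# With an active SparkSession the files could then be read one DataFrame per file:
#   sitio_dfs = [spark.read.json(p) for p in paths]
# Keeping them separate still allows grouping them by the type of MISC afterwards.
```

Reading each file separately (rather than one glob read) preserves the per-file schemas that the next step inspects.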

Observamos la estructura de cada esquema

In [None]:
sitios = [sitio1, sitio2, sitio3, sitio4, sitio5, sitio6, sitio7, sitio8, sitio9, sitio10, sitio11]

for i, df in enumerate(sitios):
    print(f"Schema de sitio{i + 1}:")
    df.printSchema()

Schema de sitio1:
root
 |-- MISC: struct (nullable = true)
 |    |-- Accessibility: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Activities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Amenities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Atmosphere: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Crowd: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Dining options: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- From the business: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Getting here: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Health & safety: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Highlights: array (nullab

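According to the schemas, the **MISC** column of the DataFrames for sitios 1, 2, 3, 6 and 9 has a different type from the same column in DataFrames 4, 5, 7, 8, 10 and 11: its struct contains a different number of sub-fields. Because `union` requires matching column types, we union the DataFrames in two groups according to the type of their **MISC** column.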

In [None]:
# sitioA holds the DFs whose MISC struct has 16 fields
sitioA = sitio1.union(sitio2).union(sitio3).union(sitio6).union(sitio9)

In [None]:
# sitioB holds the DFs whose MISC struct has 17 fields
sitioB = sitio4.union(sitio5).union(sitio7).union(sitio8).union(sitio10).union(sitio11)
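The split into `sitioA` and `sitioB` was made by reading the printed schemas. A minimal sketch of deriving the same kind of grouping programmatically, assuming the MISC sub-field names of each DataFrame are first collected into a dict (in PySpark, `df.schema['MISC'].dataType.names` returns them). `group_by_misc_fields` and the toy field lists are hypothetical. The key keeps field order because Spark compares struct types field by field in order; comparing only names is a simplification, and comparing the full `df.schema['MISC'].dataType` would also catch differences in sub-field types.

```python
from collections import defaultdict


def group_by_misc_fields(misc_fields):
    """Group DataFrame names whose MISC struct has the same ordered list of sub-fields.

    misc_fields maps a DataFrame name to its list of MISC sub-field names.
    Returns the groups in first-seen order.
    """
    groups = defaultdict(list)
    for df_name, fields in misc_fields.items():
        # Same ordered field names -> same group
        groups[tuple(fields)].append(df_name)
    return list(groups.values())


# Hypothetical, shortened field lists (not the real 16/17 MISC fields)
example = {
    'sitio1': ['Accessibility', 'Activities'],
    'sitio2': ['Accessibility', 'Activities'],
    'sitio4': ['Accessibility', 'Activities', 'Amenities'],
}
groups = group_by_misc_fields(example)
# groups == [['sitio1', 'sitio2'], ['sitio4']]
```

Each resulting group could then be combined with `union`, as done manually above.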

In [None]:
'''
def desanidar_columna(df, columna):
  # Get the schema (StructType) of the column to be flattened
  esquema = df.schema[columna].dataType

  # Promote each sub-field of the struct to a top-level column with the same name
  for nombre_campo in esquema.names:
    df = df.withColumn(nombre_campo, col(f"{columna}.{nombre_campo}"))

  # Drop the original nested column
  df = df.drop(columna)

  return df

'''

In [None]:
# sitioA = desanidar_columna(sitioA, 'MISC')
# sitioB = desanidar_columna(sitioB, 'MISC')
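`desanidar_columna` promotes every sub-field of a struct column to a top-level column and then drops the struct. The same idea, sketched on a single record represented as a plain Python dict (a hypothetical stand-in for a Spark row), assuming the list of sub-field names is known in advance, as it would be from the schema. `flatten_struct` and the sample record are hypothetical.

```python
def flatten_struct(record, column, field_names):
    """Return a copy of `record` with the struct in `column` spread into top-level keys.

    `record` is a dict; `field_names` lists the struct's sub-fields (in Spark they
    would come from df.schema[column].dataType.names).
    """
    nested = record.get(column)
    # Copy every other column unchanged
    flat = {key: value for key, value in record.items() if key != column}
    for name in field_names:
        # A null struct yields null for every sub-field, as col("MISC.x") does in Spark
        flat[name] = None if nested is None else nested.get(name)
    return flat


# Hypothetical record; the sub-field names mimic two of the MISC keys
row = {'name': 'A', 'MISC': {'Accessibility': ['ramp'], 'Amenities': None}}
flat_row = flatten_struct(row, 'MISC', ['Accessibility', 'Amenities'])
# flat_row == {'name': 'A', 'Accessibility': ['ramp'], 'Amenities': None}
```

As with `withColumn`, a sub-field whose name matches an existing column would overwrite it.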

Show the first rows of `sitioA` and `sitioB`

In [None]:
sitioA.show(5)

sitioB.show(5)

+--------------------+--------------------+----------+-------------------+-----------+--------------------+--------------------+----------+------------+----------------+--------------+-----+--------------------+-----------------+--------------------+
|                MISC|             address|avg_rating|           category|description|             gmap_id|               hours|  latitude|   longitude|            name|num_of_reviews|price|    relative_results|            state|                 url|
+--------------------+--------------------+----------+-------------------+-----------+--------------------+--------------------+----------+------------+----------------+--------------+-----+--------------------+-----------------+--------------------+
|{[Wheelchair acce...|Porter Pharmacy, ...|       4.9|         [Pharmacy]|       NULL|0x88f16e41928ff68...|[[Friday, 8AM–6PM...|   32.3883|    -83.3571| Porter Pharmacy|            16| NULL|[0x88f16e41929435...|Open ⋅ Closes 6PM|https://www.googl.

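Drop the **MISC** column from both DataFrames; since its type differs between `sitioA` and `sitioB`, keeping it would prevent combining them.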

In [None]:
sitioA = sitioA.drop('MISC')
sitioB = sitioB.drop('MISC')

Union the two DataFrames

In [None]:
# unionByName matches columns by name rather than by position
sitios = sitioA.unionByName(sitioB)

sitios.show(5)
sitios.count()

+--------------------+----------+-------------------+-----------+--------------------+--------------------+----------+------------+----------------+--------------+-----+--------------------+-----------------+--------------------+
|             address|avg_rating|           category|description|             gmap_id|               hours|  latitude|   longitude|            name|num_of_reviews|price|    relative_results|            state|                 url|
+--------------------+----------+-------------------+-----------+--------------------+--------------------+----------+------------+----------------+--------------+-----+--------------------+-----------------+--------------------+
|Porter Pharmacy, ...|       4.9|         [Pharmacy]|       NULL|0x88f16e41928ff68...|[[Friday, 8AM–6PM...|   32.3883|    -83.3571| Porter Pharmacy|            16| NULL|[0x88f16e41929435...|Open ⋅ Closes 6PM|https://www.googl...|
|City Textile, 300...|       4.5| [Textile exporter]|       NULL|0x80c2c98c0e3c1

3025011
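Spark's `union` matches columns by position, while `unionByName` matches them by name; a positional union of inputs whose columns appear in a different order can silently misalign values when their types happen to be compatible. A minimal pure-Python sketch of by-name semantics, with rows as tuples and a hypothetical `union_by_name` helper:

```python
def union_by_name(cols_a, rows_a, cols_b, rows_b):
    """Append rows_b to rows_a, matching columns by name rather than by position.

    Rows are tuples ordered like their column list; the result uses cols_a's order.
    """
    if sorted(cols_a) != sorted(cols_b):
        raise ValueError("both inputs must have the same column names")
    # For each column of A, find its position in B
    positions = [cols_b.index(c) for c in cols_a]
    reordered_b = [tuple(row[i] for i in positions) for row in rows_b]
    return list(cols_a), list(rows_a) + reordered_b


# Hypothetical inputs whose columns are in a different order
cols, rows = union_by_name(
    ['name', 'avg_rating'], [('A', 4.9)],
    ['avg_rating', 'name'], [(4.5, 'B')],
)
# cols == ['name', 'avg_rating'], rows == [('A', 4.9), ('B', 4.5)]
```

Like `unionByName` without `allowMissingColumns`, the sketch rejects inputs whose column names differ.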

In [None]:
# Count the null values in each column
sitios.select([sum(col(columna).isNull().cast("int")).alias(columna) for columna in sitios.columns]).show()

+-------+----------+--------+-----------+-------+------+--------+---------+----+--------------+-------+----------------+------+---+
|address|avg_rating|category|description|gmap_id| hours|latitude|longitude|name|num_of_reviews|  price|relative_results| state|url|
+-------+----------+--------+-----------+-------+------+--------+---------+----+--------------+-------+----------------+------+---+
|  80511|         0|   17419|    2770722|      0|787405|       0|        0|  37|             0|2749808|          295058|746455|  0|
+-------+----------+--------+-----------+-------+------+--------+---------+----+--------------+-------+----------------+------+---+
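The cell above counts, for each column, how many rows are null by summing `isNull()` cast to int. A plain-Python sketch of the same count over rows stored as dicts (hypothetical input and helper `null_counts`). It counts with an explicit loop rather than `sum()`, because after `from pyspark.sql.functions import *` the name `sum` refers to Spark's aggregate function.

```python
def null_counts(rows, columns):
    """Count, per column, how many rows hold None (mirrors sum(col(c).isNull().cast("int")))."""
    counts = {column: 0 for column in columns}
    for row in rows:
        for column in columns:
            # A missing key counts as null, like a field absent from a JSON record
            if row.get(column) is None:
                counts[column] += 1
    return counts


# Hypothetical rows, not taken from the dataset
rows = [
    {'address': 'x', 'price': None},
    {'address': None, 'price': None},
]
counts = null_counts(rows, ['address', 'price'])
# counts == {'address': 1, 'price': 2}
```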



Drop other columns that are not needed; `description` and `price`, for instance, are null in more than 90% of the rows.

In [None]:
sitios = sitios.drop('description', 'hours', 'num_of_reviews', 'price', 'relative_results', 'state', 'url')
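Relating the null counts to the 3,025,011 rows reported before cleaning shows how sparse some of the dropped columns are. The numbers below are copied from the outputs above; the snippet only does the arithmetic.

```python
# Values copied from the notebook outputs above (sitios.count() and the null table)
TOTAL_ROWS = 3025011
NULL_COUNTS = {
    'description': 2770722,
    'price': 2749808,
    'hours': 787405,
    'state': 746455,
    'relative_results': 295058,
}

# Fraction of rows in which each column is null
null_fraction = {column: n / TOTAL_ROWS for column, n in NULL_COUNTS.items()}

# Print from most to least sparse
for column, fraction in sorted(null_fraction.items(), key=lambda item: item[1], reverse=True):
    print(f"{column:<17} {fraction:.1%}")
```

`description` and `price` come out above 90% null, whereas `hours`, `state` and `relative_results` are null in well under a third of the rows, so sparsity alone does not explain dropping those.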

In [None]:
# Drop every row that contains a null value
sitios = sitios.dropna()

sitios.show(5)
sitios.count()

+--------------------+----------+-------------------+--------------------+----------+------------+----------------+
|             address|avg_rating|           category|             gmap_id|  latitude|   longitude|            name|
+--------------------+----------+-------------------+--------------------+----------+------------+----------------+
|Porter Pharmacy, ...|       4.9|         [Pharmacy]|0x88f16e41928ff68...|   32.3883|    -83.3571| Porter Pharmacy|
|City Textile, 300...|       4.5| [Textile exporter]|0x80c2c98c0e3c16f...|34.0188913|-118.2152898|    City Textile|
|San Soo Dang, 761...|       4.4|[Korean restaurant]|0x80c2c778e3b73d3...|34.0580917|-118.2921295|    San Soo Dang|
|Nova Fabrics, 220...|       3.3|     [Fabric store]|0x80c2c89923b27a4...|34.0236689|-118.2329297|    Nova Fabrics|
|Nobel Textile Co,...|       4.3|     [Fabric store]|0x80c2c632f933b07...|34.0366942|-118.2494208|Nobel Textile Co|
+--------------------+----------+-------------------+-------------------

2927086
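`dropna()` with its defaults (`how='any'`, no `subset`) removes every row in which at least one column is missing; here that took the count from 3,025,011 to 2,927,086 rows. A plain-Python sketch of the same rule over dict rows (hypothetical helpers `is_missing` and `drop_rows_with_nulls`), which also treats float NaN as missing, as Spark's `na.drop` does.

```python
import math


def is_missing(value):
    """True for None and for float NaN."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def drop_rows_with_nulls(rows, columns):
    """Keep rows in which no listed column is missing, like dropna(how='any')."""
    return [row for row in rows if not any(is_missing(row.get(c)) for c in columns)]


# Hypothetical rows
rows = [
    {'name': 'A', 'avg_rating': 4.9},
    {'name': None, 'avg_rating': 4.5},
    {'name': 'C', 'avg_rating': float('nan')},
]
kept = drop_rows_with_nulls(rows, ['name', 'avg_rating'])
# kept == [{'name': 'A', 'avg_rating': 4.9}]
```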

Remove duplicate rows

In [None]:
sitios = sitios.dropDuplicates()

In [None]:
sitios.count()

2901730
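`dropDuplicates()` without arguments compares entire rows; here it removed 25,356 rows (from 2,927,086 to 2,901,730). A plain-Python sketch of full-row deduplication over dict rows (hypothetical helper `drop_duplicate_rows`). List values such as the `category` array are converted to tuples so the row can be used as a set key. The sketch keeps the first copy; Spark does not document which copy it keeps, which makes no difference when whole rows are identical.

```python
def drop_duplicate_rows(rows, columns):
    """Return rows with exact duplicates (over `columns`) removed, keeping the first copy."""
    seen = set()
    unique = []
    for row in rows:
        # Lists (e.g. the category array) are unhashable, so use tuples in the key
        key = tuple(
            tuple(value) if isinstance(value, list) else value
            for value in (row.get(c) for c in columns)
        )
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique


# Hypothetical rows
rows = [
    {'name': 'A', 'category': ['Pharmacy']},
    {'name': 'A', 'category': ['Pharmacy']},
    {'name': 'B', 'category': ['Fabric store']},
]
unique_rows = drop_duplicate_rows(rows, ['name', 'category'])
```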

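Save the `sitios` DataFrame: convert it to pandas (which collects all rows into the driver's memory) and write it as Parquet to the `yelp-and-maps-data-processed` Cloud Storage bucket; pandas relies on `gcsfs` to write to `gs://` paths.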

In [None]:
sitios = sitios.toPandas()

In [None]:
sitios.to_parquet('gs://yelp-and-maps-data-processed/Maps/metadata_sitios_clean.parquet')

Stop the Spark session

In [None]:
spark.stop()