# Data review for ETL and E/R
The purpose of this notebook is to review the current data in order to define an E/R schema and an ETL pipeline.
We will work with Spark (through PySpark).

In [1]:
import findspark
import pickle
import pandas as pd
import os
os.environ["SPARK_HOME"] = r"F:\DataScience\spark\spark-3.5.3-bin-hadoop3\spark-3.5.3-bin-hadoop3"

findspark.init()


from pyspark.sql import SparkSession
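spark = SparkSession.builder.master("local[4]").config("spark.executor.memory", "8g").appName("PySpark").getOrCreate()
# Note: in local mode the executor runs inside the driver JVM, so the available memory is governed by
# spark.driver.memory (which must be set before the JVM starts), not by spark.executor.memory.
# To close the session, use spark.stop()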

In [2]:
print(spark.version)

3.5.3


In [3]:
import re
from pyspark.sql.functions import explode
from pyspark.sql.functions import col
from pyspark.sql.functions import regexp_extract
# Note: this import shadows Python's built-in sum() for the rest of the notebook.
from pyspark.sql.functions import sum, when


# Google data
There are essentially two datasets here.

1. metadata-sitios: Metadata for all sites. Includes number of reviews, description, location, category, average rating, etc.
2. review-estados: All reviews, organized by state. Relevant information: user, text, rating, gmap_id (business id).

## Site metadata (metadata-sitios)

In [7]:
ruta_lectura = r"F:\DataScience\PF - DataNova\datasets\Google Maps\metadata-sitios\1.json"

In [8]:
df = spark.read.json(ruta_lectura)
# Consider persist() vs. cache(); cache() uses the default storage level.
df_cached = df.cache()

In [None]:
# Attempt to load all the files.
#ruta_lectura_full = r"F:\DataScience\PF - DataNova\datasets\Google Maps\metadata-sitios\*.json"
#df_full = spark.read.json(ruta_lectura_full)
# This approach is not efficient: loading all the files takes more than 20 minutes.

In [8]:
# Inspect the DataFrame.
df_cached.show()

+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+------------------+-------------------+--------------------+--------------+-----+--------------------+--------------------+--------------------+
|                MISC|             address|avg_rating|            category|         description|             gmap_id|               hours|          latitude|          longitude|                name|num_of_reviews|price|    relative_results|               state|                 url|
+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+------------------+-------------------+--------------------+--------------+-----+--------------------+--------------------+--------------------+
|{[Wheelchair acce...|Porter Pharmacy, ...|       4.9|          [Pharmacy]|                NULL|0x88f16e41928ff68...|[[Friday, 8AM–6PM...|           32

In [None]:
# Row count.
df_cached.count()

275001

In [61]:
# Table schema: 15 columns in total.
df_cached.printSchema()

root
 |-- MISC: struct (nullable = true)
 |    |-- Accessibility: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Activities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Amenities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Atmosphere: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Crowd: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Dining options: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- From the business: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Getting here: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Health & safety: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Highlights: array (nullable = true)
 |    |

In [62]:
df_cached.describe().show()

+-------+--------------------+------------------+--------------------+--------------------+-------------------+-------------------+--------------------+------------------+-----+------------------+--------------------+
|summary|             address|        avg_rating|         description|             gmap_id|           latitude|          longitude|                name|    num_of_reviews|price|             state|                 url|
+-------+--------------------+------------------+--------------------+--------------------+-------------------+-------------------+--------------------+------------------+-----+------------------+--------------------+
|  count|              264939|            275001|               13155|              275001|             275001|             275001|              274994|            275001|13450|            195523|              275001|
|   mean|                NULL| 4.307215610124819|                NULL|                NULL|  37.49011244811361| -92.274707529695
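The `count` row of `describe()` gives the number of non-null values per column. Turning those counts into coverage ratios makes it easier to judge which columns are too sparse to keep. A minimal sketch; the numbers are copied from the `describe()` output above, and `TOTAL_ROWS`, `non_null` and `coverage` are illustrative names only:

```python
# Non-null counts copied from the describe() output above (275001 rows in total).
TOTAL_ROWS = 275001
non_null = {
    "address": 264939,
    "description": 13155,
    "name": 274994,
    "price": 13450,
    "state": 195523,
}

# Fraction of rows that actually carry a value, per column.
coverage = {column: n / TOTAL_ROWS for column, n in non_null.items()}

# Print from sparsest to most complete.
for column, ratio in sorted(coverage.items(), key=lambda kv: kv[1]):
    print(f"{column:<12} {ratio:6.2%}")
```

With these counts, `description` (about 4.78%) and `price` (about 4.89%) both fall below 5% coverage.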

### First look
1. Columns that could be removed:
    - description: Provides no relevant data. Only 13155 of 275001 sites have a description, i.e., less than 5%.
    - state: Not relevant for our analysis. It only records whether the business was open or closed at the time the data was extracted.
    - url: The site's URL on Google Maps.
    - price: Too many null values (only 13450 of 275001 are non-null).
    - hours: Business opening hours. (Consider removing it unless it is needed for the analysis.)

2. We do not have city or state information. A library such as 'geopy' or 'geocoder' could derive it from latitude and longitude. Optional: extract it from the address column.
3. Key columns: gmap_id, avg_rating, category.
4. MISC: Has a defined schema. We should check which of its fields may be useful.
5. CATEGORY: Analyze the existing categories (e.g., with a word cloud).

### Column removal

In [30]:
df_cached = df_cached.drop(*["description", "hours", "price", "state", "url"])
df_cached.columns

['MISC',
 'address',
 'avg_rating',
 'category',
 'gmap_id',
 'latitude',
 'longitude',
 'name',
 'num_of_reviews',
 'relative_results']

### Creating the region (state) column

In [51]:
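# Because the geocoding services used through geopy are rate-limited, we use the address column to obtain the state.
# The pattern captures the two-letter code that precedes a 5-digit ZIP code, e.g. ", XX 12345" -> "XX".
# regexp_extract returns an empty string (not NULL) when there is no match.
state_regexp = r",\s([A-Z]{2})\s\d{5}"
df_cached = df_cached.withColumn("state", regexp_extract("address", state_regexp, 1))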
df_cached.show()

+--------------------+--------------------+----------+--------------------+--------------------+------------------+-------------------+--------------------+--------------+--------------------+-----+
|                MISC|             address|avg_rating|            category|             gmap_id|          latitude|          longitude|                name|num_of_reviews|    relative_results|state|
+--------------------+--------------------+----------+--------------------+--------------------+------------------+-------------------+--------------------+--------------+--------------------+-----+
|{[Wheelchair acce...|Porter Pharmacy, ...|       4.9|          [Pharmacy]|0x88f16e41928ff68...|           32.3883|           -83.3571|     Porter Pharmacy|            16|[0x88f16e41929435...|   GA|
|                NULL|City Textile, 300...|       4.5|  [Textile exporter]|0x80c2c98c0e3c16f...|        34.0188913|       -118.2152898|        City Textile|             6|[0x80c2c624136ea8...|   CA|
|{[Wh
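`regexp_extract` returns an empty string, not NULL, when the pattern does not match, and NULL only when the input itself is NULL. That difference matters later, when rows are dropped based on this column. A plain-Python stand-in that mirrors this behaviour (Spark uses find semantics, like `re.search`); the function name and sample addresses are hypothetical:

```python
import re

# Same pattern as in the cell above: ", XX 12345" -> captures "XX".
STATE_REGEXP = r",\s([A-Z]{2})\s\d{5}"


def extract_state(address):
    """Plain-Python stand-in for regexp_extract(address, STATE_REGEXP, 1).

    NULL input gives NULL (None); a non-matching string gives "" (not None).
    """
    if address is None:
        return None
    match = re.search(STATE_REGEXP, address)
    return match.group(1) if match else ""


# Hypothetical sample addresses, for illustration only.
samples = [
    "Some Shop, 123 Main St, Springfield, IL 62701",
    "Another Place, 9 Elm Ave, Austin, TX 78701-1234",
    "No zip code here, Springfield, IL",
    None,
]
for address in samples:
    print(repr(address), "->", repr(extract_state(address)))
```

ZIP+4 codes still match because the pattern only needs the first five digits.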

### MISC


In [65]:
# Expand the MISC struct into one column per sub-field and preview it.
expanded_misc_df = df_cached.select("MISC.*")
expanded_misc_df.show()

+--------------------+----------+--------------------+----------+------------------+---------------+--------------------+------------+--------------------+----------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+
|       Accessibility|Activities|           Amenities|Atmosphere|             Crowd| Dining options|   From the business|Getting here|     Health & safety|      Highlights|           Offerings|            Payments|            Planning|         Popular for|Recycling|     Service options|
+--------------------+----------+--------------------+----------+------------------+---------------+--------------------+------------+--------------------+----------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+
|[Wheelchair acces...|      NULL|                NULL|      NULL|              NULL|           NULL|                NULL|        NULL|[M

In [None]:
# Count nulls per column. There are many nulls; however, we keep the column for now.
# (sum and when here are pyspark.sql.functions, imported above.)
summary = expanded_misc_df.select([
    sum(when(col(c).isNull(), 1).otherwise(0)).alias(f"n_nulls_{c}") for c in expanded_misc_df.columns
])
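The aggregation above builds one `sum(when(col(c).isNull(), 1).otherwise(0))` expression per MISC sub-field and evaluates them all in a single pass. The plain-Python sketch below mirrors that logic on hypothetical records and also divides by the row count, which helps compare fields: for example, the 116316 nulls reported for Accessibility are about 42% of the 275001 rows. It deliberately uses `len()` rather than `sum()`, because `sum` is shadowed by the PySpark import in this notebook.

```python
# Hypothetical MISC-like records; a missing key and None both count as null.
records = [
    {"Accessibility": ["Wheelchair accessible entrance"], "Crowd": None},
    {"Accessibility": None, "Crowd": ["LGBTQ friendly"]},
    {"Accessibility": None, "Crowd": ["Family-friendly"]},
]
fields = ["Accessibility", "Crowd", "Payments"]

# One counter per field, mirroring sum(when(col(c).isNull(), 1).otherwise(0)).
null_counts = {
    f"n_nulls_{field}": len([r for r in records if r.get(field) is None])
    for field in fields
}
# Share of null rows per field.
null_share = {name: n / len(records) for name, n in null_counts.items()}

print(null_counts)
print(null_share)
```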

In [71]:
summary.show()

+---------------------+------------------+-----------------+------------------+-------------+----------------------+-------------------------+--------------------+-----------------------+------------------+-----------------+----------------+----------------+-------------------+-----------------+-----------------------+
|n_nulls_Accessibility|n_nulls_Activities|n_nulls_Amenities|n_nulls_Atmosphere|n_nulls_Crowd|n_nulls_Dining options|n_nulls_From the business|n_nulls_Getting here|n_nulls_Health & safety|n_nulls_Highlights|n_nulls_Offerings|n_nulls_Payments|n_nulls_Planning|n_nulls_Popular for|n_nulls_Recycling|n_nulls_Service options|
+---------------------+------------------+-----------------+------------------+-------------+----------------------+-------------------------+--------------------+-----------------------+------------------+-----------------+----------------+----------------+-------------------+-----------------+-----------------------+
|               116316|            27

### CATEGORY
This is an ARRAY column.
Each business has a list of categories.
Categories can be single words or phrases.

In [63]:
# Preview
category_df = df_cached.select("category")
category_df.show(truncate=False)

+-----------------------------------------------------------------------+
|category                                                               |
+-----------------------------------------------------------------------+
|[Pharmacy]                                                             |
|[Textile exporter]                                                     |
|[Korean restaurant]                                                    |
|[Fabric store]                                                         |
|[Fabric store]                                                         |
|[Fabric store]                                                         |
|[Restaurant]                                                           |
|[Nail salon, Waxing hair removal service]                              |
|[Bakery, Health food restaurant]                                       |
|[Greeting card shop, Service establishment]                            |
|[Dentist, Cosmetic dentist, Dental cl

In [None]:
# Explode the lists to see the individual category values.
category_df_exploded = category_df.select(explode("category").alias("category"))
category_df_exploded.count() # number of rows after the explode

530472

In [66]:
# Preview the categories
category_df_exploded.show(truncate=False)

+---------------------------+
|category                   |
+---------------------------+
|Pharmacy                   |
|Textile exporter           |
|Korean restaurant          |
|Fabric store               |
|Fabric store               |
|Fabric store               |
|Restaurant                 |
|Nail salon                 |
|Waxing hair removal service|
|Bakery                     |
|Health food restaurant     |
|Greeting card shop         |
|Service establishment      |
|Dentist                    |
|Cosmetic dentist           |
|Dental clinic              |
|Auto glass shop            |
|Window tinting service     |
|Beauty salon               |
|Ski rental service         |
+---------------------------+
only showing top 20 rows



In [71]:
# Group by category
category_df_exploded_count = category_df_exploded.groupBy("category").count()
# Number of distinct values
category_df_exploded_count.count()

3769
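The explode produced 530472 rows from 275001 sites, roughly 1.93 categories per site, which group into 3769 distinct categories. `explode` emits one row per array element and no rows at all for NULL or empty arrays (`explode_outer` would instead emit a single NULL row), and `groupBy(...).count()` is then an occurrence count. A plain-Python sketch of both steps, with hypothetical category arrays:

```python
from collections import Counter

# Hypothetical category arrays; None stands for a NULL array in Spark.
category_rows = [
    ["Pharmacy"],
    ["Nail salon", "Waxing hair removal service"],
    ["Fabric store"],
    ["Fabric store"],
    None,
    [],
]

# explode(): one output row per element; NULL and empty arrays yield no rows.
exploded = [category for cats in category_rows if cats for category in cats]

# groupBy("category").count() is equivalent to counting occurrences.
category_counts = Counter(exploded)

print(len(exploded))            # rows after explode
print(len(category_counts))     # distinct categories
print(category_counts.most_common(1))
```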

In [72]:
# Show the values in descending order (top 100).
category_df_exploded_count.orderBy("count", ascending=False).show(100, truncate=False)

+----------------------------+-----+
|category                    |count|
+----------------------------+-----+
|Service establishment       |10055|
|Beauty salon                |5612 |
|Auto repair shop            |5499 |
|Restaurant                  |5386 |
|Church                      |5349 |
|Convenience store           |5122 |
|Gas station                 |4786 |
|Doctor                      |4492 |
|ATM                         |4285 |
|Insurance agency            |4094 |
|Hair salon                  |3963 |
|Dentist                     |3806 |
|Nail salon                  |2930 |
|Bank                        |2853 |
|Auto insurance agency       |2835 |
|Park                        |2767 |
|Attorney                    |2754 |
|Real estate agency          |2695 |
|Home insurance agency       |2537 |
|Corporate office            |2517 |
|Non-profit organization     |2501 |
|Life insurance agency       |2438 |
|Barber shop                 |2379 |
|Massage therapist           |2362 |
|

In [77]:
# Tokenize to get the individual words.
from pyspark.sql.functions import split, col, lower

In [None]:
# Apply tokenization and lower()
category_df_exploded_tokenized = category_df_exploded.withColumn("WORDS", split(lower(col("category")), " "))
category_df_exploded_tokenized.show(truncate=False)

+---------------------------+--------------------------------+
|category                   |WORDS                           |
+---------------------------+--------------------------------+
|Pharmacy                   |[pharmacy]                      |
|Textile exporter           |[textile, exporter]             |
|Korean restaurant          |[korean, restaurant]            |
|Fabric store               |[fabric, store]                 |
|Fabric store               |[fabric, store]                 |
|Fabric store               |[fabric, store]                 |
|Restaurant                 |[restaurant]                    |
|Nail salon                 |[nail, salon]                   |
|Waxing hair removal service|[waxing, hair, removal, service]|
|Bakery                     |[bakery]                        |
|Health food restaurant     |[health, food, restaurant]      |
|Greeting card shop         |[greeting, card, shop]          |
|Service establishment      |[service, establishment]  
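Splitting on a single space keeps punctuation tokens such as "&" (from names like "Bar & grill") and can produce empty strings when there are repeated spaces; the top-word list below also contains the stop word "and". A plain-Python sketch of a regex-based tokenizer with a small stop-word list; the stop-word set and the `tokenize` helper are assumptions to be tuned against the real counts. Note that hyphenated words such as "non-profit" are split into two tokens by this pattern. In PySpark, `pyspark.ml.feature.RegexTokenizer` and `StopWordsRemover` address the same problem.

```python
import re
from collections import Counter

# Hypothetical stop-word list; extend it after inspecting the real word counts.
STOP_WORDS = {"and", "of", "the"}


def tokenize(category):
    """Lower-case a category and keep only alphabetic tokens that are not stop words.

    Unlike split(lower(col), " "), this drops punctuation such as "&"
    and never yields empty tokens when there are repeated spaces.
    """
    return [t for t in re.findall(r"[a-z]+", category.lower()) if t not in STOP_WORDS]


# Hypothetical category strings.
categories_sample = ["Bar & grill", "Fish & chips restaurant", "Bed and breakfast", "Restaurant"]
word_counts = Counter(token for c in categories_sample for token in tokenize(c))
print(word_counts.most_common(3))
```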

In [83]:
category_df_exploded_tokenized_exploded = category_df_exploded_tokenized.select(explode(col("WORDS")).alias("WORD"))
# Preview
category_df_exploded_tokenized_exploded.show(truncate=False)

+----------+
|WORD      |
+----------+
|pharmacy  |
|textile   |
|exporter  |
|korean    |
|restaurant|
|fabric    |
|store     |
|fabric    |
|store     |
|fabric    |
|store     |
|restaurant|
|nail      |
|salon     |
|waxing    |
|hair      |
|removal   |
|service   |
|bakery    |
|health    |
+----------+
only showing top 20 rows



In [84]:
# Count the total number of rows.
category_df_exploded_tokenized_exploded.count()

1159882

In [92]:
# Group and count
category_df_exploded_tokenized_exploded_count = category_df_exploded_tokenized_exploded.groupBy("WORD").count()

print("Total distinct words: ",category_df_exploded_tokenized_exploded_count.count()) 

print("Top: ")
category_df_exploded_tokenized_exploded_count.orderBy("count", ascending=False).show(200)

Total distinct words:  2533
Top: 
+------------------+-----+
|              WORD|count|
+------------------+-----+
|             store|67760|
|           service|66700|
|              shop|35682|
|            agency|29369|
|            repair|17459|
|         insurance|17189|
|        restaurant|16466|
|              auto|15231|
|          supplier|14137|
|        contractor|13340|
|             salon|13165|
|            center|12740|
|            church|12329|
|            rental|10254|
|     establishment|10084|
|           company| 9856|
|              home| 9122|
|            supply| 8709|
|          attorney| 8611|
|               car| 7936|
|            school| 7574|
|            estate| 7405|
|            beauty| 7353|
|            clinic| 7182|
|            office| 6893|
|              real| 6740|
|            dealer| 6673|
|      organization| 6522|
|           station| 6392|
|               and| 6311|
|           dentist| 5989|
|          clothing| 5755|
|              hair| 5731|


#### Business filtering
In this section we filter the businesses we are interested in, based on their categories.

In [78]:
# We still need to validate which services we want to filter on:
categories = ["resort", "bar", "hotel", "sport", "turism", "gym", "theater", "pub", "restaurant", "food"]
# Diana's filter: 'resort|bar|hotel|sport|turism|gym|theater|pubs|restaurant|food'
# Note: "turism" is probably meant as "tourism"; the regex below only matches whole words.

In [79]:
##### Inclusion filter ##### 
# Build the regular-expression pattern (case-insensitive, whole words only)
regex_pattern = r"(?i)\b(" + "|".join([re.escape(word) for word in categories]) + r")\b"
# Filter the exploded categories.
df_filtered = df_cached.select("gmap_id", explode("category").alias("category")).filter(col("category").rlike(regex_pattern))
# Remove duplicates: keeps one (arbitrary) matching category per gmap_id
df_filtered_uniques = df_filtered.dropDuplicates(["gmap_id"])
df_filtered_uniques.show(truncate=False)

print("Business count: ", df_filtered_uniques.count())


+-------------------------------------+-----------------------+
|gmap_id                              |category               |
+-------------------------------------+-----------------------+
|0x1412bcfc0336fce9:0x33da3c68ef2176ac|Food products supplier |
|0x145c2bc56025f085:0xc0703bf29e3f5869|Mexican restaurant     |
|0x146c6711a70d7991:0xe478335f760c8ccc|Bar & grill            |
|0x14e0372676eb0cb1:0x1fb6b46c7e6aadf4|Hookah bar             |
|0x14e198ad826a1173:0xf5e0b932f6da828b|Fish & chips restaurant|
|0x14e3db43e40a22fb:0xaaa14b01a99cb9a |Gym                    |
|0x14faf9b542b970ff:0x6197fddb303864af|Gym                    |
|0x1525d35eb74ccdf3:0xd40c297f66eefede|Breakfast restaurant   |
|0x152682cdb239941d:0xe982e9f65d30ce09|Bar                    |
|0x40569e72d3c238e5:0x177ebe23bd0003d3|Restaurant             |
|0x406607f9019940bd:0x8f897fe6fbeaec74|Pizza restaurant       |
|0x40660fd6818fdb1d:0xa7dc46c3beab855b|Gym                    |
|0x4070c364e137a8a7:0x6931d81237468391|P
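Because the pattern wraps the words in `\b` boundaries, it only matches whole words. That avoids false positives such as "bar" inside "Barber shop", but it also misses variants: "sport" does not match "Sports complex", "theater" does not match "theatre", and "turism" would not match "Tourist attraction". A plain-Python check with the same pattern construction (Python's `re` and the Java regex engine behind `rlike` treat `(?i)` and `\b` the same way for these ASCII strings); the sample categories are hypothetical:

```python
import re

# Same construction as the inclusion pattern above.
include_words = ["resort", "bar", "hotel", "sport", "turism", "gym", "theater", "pub", "restaurant", "food"]
pattern = r"(?i)\b(" + "|".join(re.escape(w) for w in include_words) + r")\b"

# Hypothetical category strings.
samples = ["Sports bar", "Sports complex", "Barber shop", "Tourist attraction",
           "Irish pub", "Movie theatre", "Fast food restaurant"]
for s in samples:
    print(f"{s:<22} {bool(re.search(pattern, s))}")
```

Whether to add plural or spelling variants to the list (or relax the trailing `\b`) is a trade-off between recall and false positives.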

In [80]:
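##### Exclusion filter ##### 
# Remove suppliers. Note: this only inspects the single category kept per gmap_id by dropDuplicates above,
# and contains() is case-sensitive.
df_further_filtered = df_filtered_uniques.filter(~df_filtered_uniques["category"].contains("supplier"))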
df_further_filtered.show()
df_further_filtered.count()

+--------------------+--------------------+
|             gmap_id|            category|
+--------------------+--------------------+
|0x145c2bc56025f08...|  Mexican restaurant|
|0x146c6711a70d799...|         Bar & grill|
|0x14e0372676eb0cb...|          Hookah bar|
|0x14e198ad826a117...|Fish & chips rest...|
|0x14e3db43e40a22f...|                 Gym|
|0x14faf9b542b970f...|                 Gym|
|0x1525d35eb74ccdf...|Breakfast restaurant|
|0x152682cdb239941...|                 Bar|
|0x40569e72d3c238e...|          Restaurant|
|0x406607f9019940b...|    Pizza restaurant|
|0x40660fd6818fdb1...|                 Gym|
|0x4070c364e137a8a...|    Pizza restaurant|
|0x407636ffaaab494...|          Restaurant|
|0x4108778147262b9...|Performing arts t...|
|0x410be7021527430...|                 Gym|
|0x438e3003f933c30...|               Hotel|
|0x4ca5fecdf839b93...|           Snack bar|
|0x4ca8ad3fffffffa...|          Restaurant|
|0x4ca8bcc4c9f3b7f...|Performing arts t...|
|0x4cad85e6d5f862d...|          

15586

In [81]:
df_selected_gmap_id = df_further_filtered.select("gmap_id")

In [82]:
# Filter the original dataset down to the gmap_ids found.
df_filtered = df_cached.join(df_selected_gmap_id, on="gmap_id", how="inner").dropDuplicates(["gmap_id"])
df_filtered.show()
df_filtered.count()

+--------------------+--------------------+--------------------+----------+--------------------+------------------+------------------+--------------------+--------------+--------------------+
|             gmap_id|                MISC|             address|avg_rating|            category|          latitude|         longitude|                name|num_of_reviews|    relative_results|
+--------------------+--------------------+--------------------+----------+--------------------+------------------+------------------+--------------------+--------------+--------------------+
|0x145c2bc56025f08...|{[Wheelchair acce...|La Posada Del Rey...|       4.8|[Mexican restaurant]|        29.5051316|       -98.4581639|   La Posada Del Rey|             8|[0x865cf6cdb7fee6...|
|0x14e198ad826a117...|{[Wheelchair acce...|Bellflower Fish &...|       4.2|[Fish & chips res...|         33.888707|      -118.1377589|Bellflower Fish &...|             5|                NULL|
|0x407636ffaaab494...|{NULL, NULL, NULL.

15586

In [83]:
# Release the cache.
df_cached.unpersist()

DataFrame[MISC: struct<Accessibility:array<string>,Activities:array<string>,Amenities:array<string>,Atmosphere:array<string>,Crowd:array<string>,Dining options:array<string>,From the business:array<string>,Getting here:array<string>,Health & safety:array<string>,Highlights:array<string>,Offerings:array<string>,Payments:array<string>,Planning:array<string>,Popular for:array<string>,Recycling:array<string>,Service options:array<string>>, address: string, avg_rating: double, category: array<string>, gmap_id: string, latitude: double, longitude: double, name: string, num_of_reviews: bigint, relative_results: array<string>]

# ETL function for metadata-sitios

In [4]:
# Global variables
CATEGORIES_INCLUDE = ["resort", "bar", "hotel", "sport", "turism", "gym", "theater", "pub", "restaurant", "food"]
CATEGORIES_EXCLUDE = ["supplier"]
REGEX_PATTERN_INCLUDE = r"(?i)\b(" + "|".join([re.escape(word) for word in CATEGORIES_INCLUDE]) + r")\b"
REGEX_PATTERN_EXCLUDE = r"(?i)\b(" + "|".join([re.escape(word) for word in CATEGORIES_EXCLUDE]) + r")\b"

# Function
def metadata_treatment(spark_df):
    # Drop columns
    spark_df = spark_df.drop(*["description", "hours", "price", "state", "url"])

    # Explode the categories and apply the inclusion and exclusion filters to extract the desired gmap_ids
    select_gmap_id = (
        spark_df.select("gmap_id", explode("category").alias("category"))
        .filter(col("category").rlike(REGEX_PATTERN_INCLUDE))
        .filter(~col("category").rlike(REGEX_PATTERN_EXCLUDE))
        .dropDuplicates(["gmap_id"])
        .select("gmap_id")
    )

    # Filter the original dataset using the obtained list
    spark_df_filtered = spark_df.join(select_gmap_id, on="gmap_id", how="inner").dropDuplicates(["gmap_id"])

    # Add the "state" column with the two-letter state code, e.g. California: CA.
    state_regexp = r",\s([A-Z]{2})\s\d{5}"
    spark_df_filtered = spark_df_filtered.withColumn("state", regexp_extract("address", state_regexp, 1))
    # regexp_extract yields NULL only when address is NULL; unmatched addresses get "" and are kept by dropna.
    # Add .filter(col("state") != "") if those rows should be dropped as well.
    spark_df_filtered = spark_df_filtered.dropna(subset="state")
    
    return spark_df_filtered


In [9]:
# Test
filtered_df = metadata_treatment(df_cached)

In [10]:
print(filtered_df.count())
filtered_df.show()

15530
+--------------------+--------------------+--------------------+----------+--------------------+------------------+------------------+--------------------+--------------+--------------------+-----+
|             gmap_id|                MISC|             address|avg_rating|            category|          latitude|         longitude|                name|num_of_reviews|    relative_results|state|
+--------------------+--------------------+--------------------+----------+--------------------+------------------+------------------+--------------------+--------------+--------------------+-----+
|0x145c2bc56025f08...|{[Wheelchair acce...|La Posada Del Rey...|       4.8|[Mexican restaurant]|        29.5051316|       -98.4581639|   La Posada Del Rey|             8|[0x865cf6cdb7fee6...|   TX|
|0x14e198ad826a117...|{[Wheelchair acce...|Bellflower Fish &...|       4.2|[Fish & chips res...|         33.888707|      -118.1377589|Bellflower Fish &...|             5|                NULL|   CA|
|0x4
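To test the filtering rules without a Spark session, `metadata_treatment` can be modelled in plain Python on a handful of records. This is a simplified sketch, not the pipeline itself: it assumes rows that share a `gmap_id` are identical (Spark's `dropDuplicates` keeps an arbitrary row, this keeps the first), and `treat`, `keep_business_level` and the fixture records are hypothetical. It makes one semantic point visible: a business is kept as long as any one of its categories passes both filters, so a site listed as both "Restaurant" and "Food products supplier" survives the exclusion. If suppliers should instead be removed at the business level, one option in PySpark 3.1+ is a higher-order function such as `pyspark.sql.functions.exists` over the `category` array, which would also avoid the explode + join.

```python
import re

# Same word lists and state pattern as the notebook's global variables.
CATEGORIES_INCLUDE = ["resort", "bar", "hotel", "sport", "turism", "gym", "theater", "pub", "restaurant", "food"]
CATEGORIES_EXCLUDE = ["supplier"]
INCLUDE = re.compile(r"(?i)\b(" + "|".join(re.escape(w) for w in CATEGORIES_INCLUDE) + r")\b")
EXCLUDE = re.compile(r"(?i)\b(" + "|".join(re.escape(w) for w in CATEGORIES_EXCLUDE) + r")\b")
STATE = re.compile(r",\s([A-Z]{2})\s\d{5}")
DROP_COLUMNS = {"description", "hours", "price", "state", "url"}


def keep_row_level(categories):
    # As in metadata_treatment: keep if ANY single category passes both filters.
    return any(INCLUDE.search(c) and not EXCLUDE.search(c) for c in categories or [])


def keep_business_level(categories):
    # Hypothetical stricter variant: drop the business if ANY category is excluded.
    cats = categories or []
    return any(INCLUDE.search(c) for c in cats) and not any(EXCLUDE.search(c) for c in cats)


def treat(records, keep=keep_row_level):
    """Simplified plain-Python model of metadata_treatment over a list of dicts."""
    seen, out = set(), []
    for rec in records:
        if rec["gmap_id"] in seen or not keep(rec.get("category")):
            continue
        seen.add(rec["gmap_id"])  # dropDuplicates: one row per gmap_id (here, the first)
        address = rec.get("address")
        if address is None:
            continue  # regexp_extract(NULL) is NULL, so dropna removes the row
        match = STATE.search(address)
        row = {k: v for k, v in rec.items() if k not in DROP_COLUMNS}
        row["state"] = match.group(1) if match else ""  # no match -> "" (kept by dropna)
        out.append(row)
    return out


# Hypothetical fixtures, for illustration only.
records = [
    {"gmap_id": "a", "category": ["Restaurant", "Food products supplier"],
     "address": "A, 1 Main St, Austin, TX 78701", "state": "Open now"},
    {"gmap_id": "b", "category": ["Food products supplier"], "address": "B, 2 Main St, Austin, TX 78701"},
    {"gmap_id": "c", "category": ["Gym"], "address": None},
    {"gmap_id": "d", "category": ["Hotel"], "address": "D, somewhere without a ZIP code"},
    {"gmap_id": "a", "category": ["Restaurant", "Food products supplier"],
     "address": "A, 1 Main St, Austin, TX 78701"},
]

row_level = treat(records)
business_level = treat(records, keep=keep_business_level)
print([(r["gmap_id"], r["state"]) for r in row_level])
print([(r["gmap_id"], r["state"]) for r in business_level])
```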

In [11]:
# Save the output schema (reused below to read the raw files without schema inference).
output_schema = filtered_df.schema

In [105]:
filtered_df.printSchema()

root
 |-- gmap_id: string (nullable = true)
 |-- MISC: struct (nullable = true)
 |    |-- Accessibility: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Activities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Amenities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Atmosphere: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Crowd: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Dining options: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- From the business: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Getting here: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- Health & safety: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- High

# Routine to union the metadata datasets
We define a routine that applies the ETL to each file in the folder.

In [12]:
# Source folder
folder_path = r"F:\DataScience\PF - DataNova\datasets\Google Maps\metadata-sitios"

### Option 1: Build a single dataset and append each file's data after filtering

In [13]:
count = 0

for file_name in os.listdir(folder_path):
    
    # Build the full path of each file.
    file_path = os.path.join(folder_path, file_name)

    # Load the file into a Spark DataFrame, reusing the saved schema (no schema inference).
    df = spark.read.schema(output_schema).json(file_path)

    # Apply the ETL steps.
    df_ETLed = metadata_treatment(df)

    if count == 0:
        union_df = df_ETLed.select("*")
    else:
        # union() matches columns by position; every df_ETLed has the same column order here.
        union_df = union_df.union(df_ETLed)

    count+=1
    print("Iteration: ", count)
    


Iteration:  1
Iteration:  2
Iteration:  3
Iteration:  4
Iteration:  5
Iteration:  6
Iteration:  7
Iteration:  8
Iteration:  9
Iteration:  10
Iteration:  11


In [14]:
union_df.count()

296047
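The loop relies on `os.listdir`, which returns entries in arbitrary order and includes every entry in the folder, so a stray non-JSON file would also be fed to `spark.read.json`. A small helper such as the hypothetical `list_json_files` below keeps only `.json` files in a stable order. Since `DataFrameReader.json` also accepts a list of paths, the result could be read in a single call, e.g. `spark.read.schema(output_schema).json(list_json_files(folder_path))`, followed by one `metadata_treatment` call; that would also deduplicate `gmap_id` across files, which the per-file loop does not. This is a sketch; whether it is faster here has not been measured.

```python
import os
import tempfile


def list_json_files(folder):
    """Return the full paths of the .json files in `folder`, sorted by name.

    Sub-folders and non-JSON files are skipped, and sorting makes the
    processing order reproducible (os.listdir order is arbitrary).
    """
    return sorted(
        os.path.join(folder, name)
        for name in os.listdir(folder)
        if name.lower().endswith(".json") and os.path.isfile(os.path.join(folder, name))
    )


# Quick self-check on a throwaway folder (hypothetical file names).
with tempfile.TemporaryDirectory() as tmp:
    for name in ["2.json", "1.json", "notes.txt"]:
        open(os.path.join(tmp, name), "w").close()
    os.mkdir(os.path.join(tmp, "archive.json"))  # a folder whose name ends in .json
    demo = [os.path.basename(p) for p in list_json_files(tmp)]
print(demo)
```

Note that sorting is lexicographic, so "10.json" sorts before "2.json"; that is still deterministic, which is what matters here.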

In [17]:
# Note: Spark writes a folder of part-*.json files at this path, not a single JSON file.
union_df.write.mode("overwrite").json(r"F:\DataScience\PF - DataNova\datasets\summarized\google\metadata-sitios-summ.json")

Py4JJavaError: An error occurred while calling o631.json.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:418)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at org.apache.spark.sql.DataFrameWriter.json(DataFrameWriter.scala:784)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:242)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:94)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:372)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 25 more
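The write fails because Spark on Windows needs Hadoop's `winutils.exe`, which it locates through `HADOOP_HOME` (see the WindowsProblems link in the error). A common fix is to place a `winutils.exe` build compatible with the bundled Hadoop 3 client in a `bin` folder and set `HADOOP_HOME` before the JVM starts, i.e. next to the `SPARK_HOME` assignment in the first cell, then restart the kernel. The helper below is a hypothetical sketch of that step; obtaining `winutils.exe` is outside its scope, and prepending `bin` to `PATH` is a commonly recommended extra so that `hadoop.dll`, if present, can also be found.

```python
import os


def configure_hadoop_home(hadoop_home):
    """Point HADOOP_HOME at a folder that contains bin/winutils.exe.

    Must run before the SparkSession (and its JVM) is created; an already
    running session will not pick up the change.
    """
    bin_dir = os.path.join(hadoop_home, "bin")
    if not os.path.isfile(os.path.join(bin_dir, "winutils.exe")):
        raise FileNotFoundError(f"winutils.exe not found in {bin_dir}")
    os.environ["HADOOP_HOME"] = hadoop_home
    # Prepend bin so that native Hadoop libraries placed there can be found.
    os.environ["PATH"] = bin_dir + os.pathsep + os.environ.get("PATH", "")
    return bin_dir
```

Usage, with a hypothetical location: call `configure_hadoop_home(r"C:\hadoop")` right after setting `SPARK_HOME` and before `findspark.init()`.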


# Ending the Spark session

In [53]:
spark.stop()