# Extracción de datos Datasets Google Maps y YELP


## importaciones


In [7]:
import os
import pandas as pd
import numpy as np
import json
import datetime
import geojson
from shapely.geometry import Point, Polygon, shape
from shapely.errors import TopologicalError
from fuzzywuzzy import process, fuzz

### Definiciones y carpetas


Creación de diccionario de Estados


In [8]:
state_dictionary = {
    "AL": "Alabama",
    "AK": "Alaska",
    "AZ": "Arizona",
    "AR": "Arkansas",
    "CA": "California",
    "CO": "Colorado",
    "CT": "Connecticut",
    "DE": "Delaware",
    "FL": "Florida",
    "GA": "Georgia",
    "HI": "Hawaii",
    "ID": "Idaho",
    "IL": "Illinois",
    "IN": "Indiana",
    "IA": "Iowa",
    "KS": "Kansas",
    "KY": "Kentucky",
    "LA": "Louisiana",
    "ME": "Maine",
    "MD": "Maryland",
    "MA": "Massachusetts",
    "MI": "Michigan",
    "MN": "Minnesota",
    "MS": "Mississippi",
    "MO": "Missouri",
    "MT": "Montana",
    "NE": "Nebraska",
    "NV": "Nevada",
    "NH": "New Hampshire",
    "NJ": "New Jersey",
    "NM": "New Mexico",
    "NY": "New York",
    "NC": "North Carolina",
    "ND": "North Dakota",
    "OH": "Ohio",
    "OK": "Oklahoma",
    "OR": "Oregon",
    "PA": "Pennsylvania",
    "RI": "Rhode Island",
    "SC": "South Carolina",
    "SD": "South Dakota",
    "TN": "Tennessee",
    "TX": "Texas",
    "UT": "Utah",
    "VT": "Vermont",
    "VA": "Virginia",
    "WA": "Washington",
    "WV": "West Virginia",
    "WI": "Wisconsin",
    "WY": "Wyoming",
}

diccionario de geolocacización


In [9]:
with open("Geojson/us-states.json", "r") as f: 
    data_geo_1 = geojson.load(f)
with open("Geojson/geojson-counties-fips.json", "r") as f:
    data_geo_2 = geojson.load(f)


def get_state(lat, lon):
    def search_states(data_geo, state_dict):
        for feature in data_geo["features"]:
            try:
                geometry_type = feature["geometry"]["type"]
                if geometry_type == "Polygon":
                    coordinates = feature["geometry"]["coordinates"][0]
                elif geometry_type == "MultiPolygon":
                    coordinates = [
                        sub_coords[0]
                        for sub_coords in feature["geometry"]["coordinates"]]
                else: continue
                polygon = Polygon(coordinates)
                if polygon.contains(Point(lon, lat)):
                    state_abbr = feature["properties"]["STATE"]
                    return state_dict[state_abbr]
            except (KeyError, IndexError): pass
        return None

    state = search_states(data_geo_1, state_dictionary_inv)
    if state: return state
    state = search_states(data_geo_2, state_index_inv)
    return state


def get_state_ab(address):
    try:
        state = address.split(", ")[-1].split(" ")[0]
        if state in list(state_dictionary.keys()): return state
        else: return np.nan
    except: return np.nan


state_index = pd.read_parquet("Datalake/estados.parquet")["State"].to_dict()
state_index_inv = {v: k for k, v in state_index.items()}
state_dictionary_inv = {v: k for k, v in state_dictionary.items()}

Creación de directorios si no existen


In [10]:
os.mkdir("Datalake") if not os.path.exists("Datalake") else None
os.chdir("Datalake")
for subdir in ["Google", "Yelp"]:
    os.mkdir(subdir) if not os.path.exists(subdir) else None
os.chdir("../")

# Extracción de los datasets de Google Maps


## Metadata de Sitios


Se recorren los archivos línea por línea, durante el proceso se almacenan sólo las filas que incluyan <code>Restaurant</code> en la columna de categoría, así se optimiza el tamaño del dataset final.


In [11]:
lineas_json = []

for i in range(1, 12):
    path = f"Datasets/Google Maps/metadata-sitios/{i}.json"
    try:
        with open(path, "r") as file:
            for l in file:
                try:
                    linea_j = json.loads(l)
                    if "restaurant" in " ".join(linea_j["category"]).lower():
                        lineas_json.append(linea_j)
                except: pass
    except FileNotFoundError: pass

if lineas_json:
    df = pd.DataFrame(lineas_json)
    df_maps_restaurantes = df
else: print("No se encontraron datos para procesar.")

# eliminación de locales permanentemente cerrados
df_maps_restaurantes = df_maps_restaurantes[
    df_maps_restaurantes["state"] != "Permanently closed"]

### Obtención de información de Estados


En base al campo <code>Address</code> obtenemos el estado donde se encuentra el negocio. Nos servirá para luego seleccionar los estados con más restaurantes.


In [12]:
def get_state_ab(st):
    try:
        state = st.split(", ")[-1].split(" ")[0]
        if state in list(state_dictionary.keys()): return state
        else: return np.nan
    except: return np.nan


df["state_ab"] = df["address"].apply(get_state_ab)
top_5 = df["state_ab"].value_counts().head(5).index.to_list()
df["us_state"] = df["state_ab"].map(state_dictionary)

top_5_url = [
    f"Datasets/Google Maps/reviews-estados/review-{state_dictionary[i].replace(' ', '_')}/"
    for i in top_5]

cantidad_archivos = {}
for i in top_5_url: 
	for j in os.walk(i): cantidad_archivos[i] = len(j[2])

## Reviews Estados


Ya con los estados elegidos estamos en condiciones de ingestar los datos de las carpetas correspondientes dentro del directorio <code>reviews-estados</code>.
Es información masiva lo que genera un archivo de grandes dimensiones, sin embargo previamente filtramos por el parámetro de año <code>2017-2019</code> valiéndonos del campo <code>time</code>, que tiene es un <code>timestamp</code>, pero con 3 digitos más que el usado por <code>datetime</code> de Python. Le agregamos el campo <code>Estado</code> que es más descriptivo.


In [13]:
lineas_json_revs_google = []
for i in top_5_url:
    count = 0
    for c in range(1, cantidad_archivos[i] + 1):
        file_name = f"{i}{c}.json"
        if os.path.exists(file_name):
            with open(file_name, "r", encoding="utf-8") as f:
                for s in f:
                    try:
                        linea = json.loads(s)
                        linea["anio"] = datetime.datetime.fromtimestamp(
                            linea["time"] / 1000).year
                        linea["estado"] = i.split("-")[-1][:-1]
                        lineas_json_revs_google.append(linea)
                    except: pass
    count += 1

df_revs_google = pd.DataFrame(lineas_json_revs_google)
merge_site_reviews = pd.merge(df_revs_google, df, left_on="gmap_id", right_on="gmap_id")
df_maps_reviews = merge_site_reviews

# Extracción de los Dataset de YELP


## Business


Contiene los datos de las entidades negocios de Yelp, a un primer vistazo tiene las columnas duplicadas, por lo que hay que hacer un recorte, ya que la segunda mitad tiene datos vacíos en su inmensa mayoría.


In [14]:
df_business = pd.read_pickle(r"Datasets\Yelp\business.pkl")
df_business = df_business.iloc[:, :-14]
df_business = df_business[df_business.state.isin(top_5)]


def is_restaurant(st):
    try:
        test = "".join(st).lower()
        return "restaurant" in test
    except: return False


df_business = df_business[df_business["categories"].apply(is_restaurant)]
df_yelp_restaurantes = df_business

# eliminación de locales cerrados
df_yelp_restaurantes = df_yelp_restaurantes[df_yelp_restaurantes["is_open"] != 0]

### Review


In [15]:
lineas_json_review = []

with open(r"Datasets\Yelp\review.json", "r", encoding="utf-8") as f:
    count = 0
    for i in f:
        try:
            linea = json.loads(i)
            if linea["useful"] == 1: lineas_json_review.append(linea)
        except: pass

df_reviews = pd.DataFrame(lineas_json_review)
df_reviews["funny"] = df_reviews["funny"].astype("int8")
df_reviews["stars"] = df_reviews["stars"].astype("int8")
df_reviews["cool"] = df_reviews["cool"].astype("int8")
df_reviews.drop("useful", axis=1, inplace=True, errors="ignore")

df_yelp_reviews = df_reviews

# Limpieza de columnas


Se mantendrán sólo las columnas relacionadas al Modelo Entidad Relación, al mismo tiempo, serán renombradas para tener un formato unificado, que permita la unión en un sólo dataframe según las tablas del MER.

- Se crea df_maps_reviews.review_id tomando los primeros 10 caracteres de gmap_id y user_id


In [16]:
df_maps_restaurantes = df_maps_restaurantes[[
        "name",
        "gmap_id",
        "category",
        "num_of_reviews",
        "state",
        "latitude",
        "longitude",
        "MISC",
        "avg_rating",]]
df_maps_restaurantes = df_maps_restaurantes.rename(
    columns={
        "name": "nombre",
        "gmap_id": "id_restaurante",
        "category": "categorias",
        "num_of_reviews": "cantidad_resenas",
        "state": "estado",
        "latitude": "latitud",
        "longitude": "longitud",
        "MISC": "atributos",
        "avg_rating": "calificacion",})

df_yelp_restaurantes = df_yelp_restaurantes[[
        "name",
        "business_id",
        "categories",
        "review_count",
        "state",
        "latitude",
        "longitude",
        "attributes",
        "stars",]]
df_yelp_restaurantes = df_yelp_restaurantes.rename(
    columns={
        "name": "nombre",
        "business_id": "id_restaurante",
        "categories": "categorias",
        "review_count": "cantidad_resenas",
        "state": "estado",
        "latitude": "latitud",
        "longitude": "longitud",
        "attributes": "atributos",
        "stars": "calificacion",
    }
)

df_maps_reviews["review_id"] = (
    df_maps_reviews["gmap_id"].str[:10] + df_maps_reviews["user_id"].str[:10])
df_maps_reviews["sentiment_score"] = 100
df_maps_reviews = df_maps_reviews[[
        "user_id",
        "gmap_id",
        "review_id",
        "rating",
        "anio",
        "text",
        "sentiment_score",]]
df_maps_reviews = df_maps_reviews.rename(
    columns={
        "user_id": "id_usuario",
        "gmap_id": "id_restaurante",
        "review_id": "id_resena",
        "rating": "calificacion",
        "anio": "anio",
        "text": "texto",
        "sentiment_score": "puntaje_de_sentimiento",})

df_yelp_reviews["anio"] = df_yelp_reviews["date"].str[:4]
df_yelp_reviews["sentiment_score"] = 100
df_yelp_reviews = df_yelp_reviews[[
        "user_id",
        "business_id",
        "review_id",
        "stars",
        "anio",
        "text",
        "sentiment_score",]]
df_yelp_reviews = df_yelp_reviews.rename(
    columns={
        "user_id": "id_usuario",
        "business_id": "id_restaurante",
        "review_id": "id_resena",
        "stars": "calificacion",
        "anio": "anio",
        "text": "texto",
        "sentiment_score": "puntaje_de_sentimiento",})

# Unión de Dataframes y limpieza de duplicados


Se unirán los Dataframes de restaurantes, y los dataframes de reviews, además de esto, se hará una revisión de los posibles registros duplicados y se eliminarán, esto apoyandose en los datos de longitud y latitud


In [17]:
df_restaurantes = pd.concat([df_yelp_restaurantes, df_maps_restaurantes])
df_restaurantes = df_restaurantes.drop_duplicates(subset=["id_restaurante"])


# Corrección de columna categorias
def convert_to_string(value):
    if isinstance(value, np.ndarray): return ", ".join(map(str, value))
    return str(value)

df_restaurantes["categorias"] = df_restaurantes["categorias"].apply(convert_to_string)

df_reviews = pd.concat([df_yelp_reviews, df_maps_reviews])
df_reviews = df_reviews.drop_duplicates(subset=["id_resena"])
# corrección de columna anio
df_reviews["anio"] = df_reviews["anio"].astype(int)

In [18]:
def find_similar_duplicates(df, name_col, lat_col, lon_col, threshold=90):
    duplicates = []
    seen = {}

    for index, row in df.iterrows():
        lat_lon = (float(row[lat_col]), float(row[lon_col]))
        if lat_lon in seen:
            # Chequear similitud de nombre
            result = process.extractOne(row[name_col], seen[lat_lon], scorer=fuzz.ratio)
            if result is not None:
                match, score = result[0], result[1]
                if score >= threshold: duplicates.append(index)
                else: seen[lat_lon].append(row[name_col])
        else: seen[lat_lon] = [row[name_col]]
    return duplicates


duplicate_indices = find_similar_duplicates(
    df_restaurantes, "nombre", "latitud", "longitud")
df_restaurantes = df_restaurantes.drop(index=duplicate_indices)

Además de esto, se hace un conteo de las "categorías" en las que está clasificado Subway, y se descartan todos los resturantes que no entren dentro del 20% que más se repite, esto aplicando la ley de pareto.
Además se cruzan los registros, junto a los de reviews, para eliminar los reviews no relacionados a restaurantes válidos.


In [19]:
subway_restaurants = df_restaurantes[
    df_restaurantes["nombre"].str.contains("subway", case=False, na=False)]
categorias_uniques = subway_restaurants["categorias"].unique()
categorias_separadas = [
    categorias.strip("][").split(", ") for categorias in categorias_uniques]
conteo_categorias = {}
for categorias_lista in categorias_separadas:
    for categoria in categorias_lista:
        categoria_sin_comillas = categoria.strip("'")
        if categoria_sin_comillas in conteo_categorias: 
            conteo_categorias[categoria_sin_comillas] += 1
        else: conteo_categorias[categoria_sin_comillas] = 1

total_categorias = len(conteo_categorias)
categorias_ordenadas = sorted(conteo_categorias.items(), key=lambda x: x[1])
umbral = int(total_categorias * 0.8)
categorias_filtradas = [
    categoria for categoria, conteo in categorias_ordenadas if conteo >= umbral]
df_restaurantes = df_restaurantes[
    df_restaurantes["categorias"].apply(
        lambda x: any(cat in x for cat in categorias_filtradas))]
df_merged = pd.merge(
    df_reviews, df_restaurantes[["id_restaurante"]], on="id_restaurante", how="inner")
df_reviews = df_merged.drop_duplicates().reset_index(drop=True)

# Creación de Dataframe Estados


Por necesidades del proyecto, se creará un dataframe con la información de los estados, para ser consultado, cargado y consumido.

El dataset elegido para conseguir los datos de densidad de población contiene la población del último censo, del año 2020, debe actualizarse con el nuevo censo.

En caso de faltar el archivo, se volverá a crear, sino, se leerá y actualizará los datos de población.


In [20]:
if not os.path.exists("Datalake/estados.parquet"):
    df_estados = pd.DataFrame({
            "State Abbreviation": list(state_dictionary.keys()),
            "State": [state_dictionary[abrv] for abrv in list(state_dictionary.keys())],
            "Population": [0] * len(list(state_dictionary.keys())),})
    df_poblacion = pd.read_csv("Datasets/us_pop_by_state.csv")

    for index, row in df_poblacion.iterrows():
        state_code = row["state_code"]
        population = row["2020_census"]
        if state_code in df_estados["State Abbreviation"].values:
            df_estados.loc[
                df_estados["State Abbreviation"] == state_code, "Population"
            ] = population

    df_estados["id_estado"] = range(1, len(df_estados) + 1)
    df_estados = df_estados[["id_estado", "State", "State Abbreviation", "Population"]]
else:
    df_estados = pd.read_parquet("Datalake/estados.parquet")
    df_poblacion = pd.read_csv("Datasets/us_pop_by_state.csv")
    for index, row in df_poblacion.iterrows():
        state_code = row["state_code"]
        population = row["2020_census"]
        if state_code in df_estados["State Abbreviation"].values:
            df_estados.loc[
                df_estados["State Abbreviation"] == state_code, "Population"
            ] = population

# Corrección de Estados según Geolocalización


In [21]:
state_dictionary_inv = {v: k for k, v in state_dictionary.items()}


def get_state_from_geojson(lat, lon, data_geo):
    for feature in data_geo["features"]:
        polygon = shape(feature["geometry"])
        # Verificar las diferentes estructuras de propiedades
        if "NAME" in feature["properties"]:
            state_name = feature["properties"]["NAME"]
        elif "name" in feature["properties"]:
            state_name = feature["properties"]["name"]
        else: continue
        try: 
            if polygon.contains(Point(lon, lat)):
                return state_dictionary_inv.get(state_name, None)
        except TopologicalError as e:
            print(f"TopologicalError: {e} at ({lat}, {lon})")
            return None
    return None


def get_state_1(lat, lon): return get_state_from_geojson(lat, lon, data_geo_1)
def get_state_2(lat, lon): return get_state_from_geojson(lat, lon, data_geo_2)


def safe_get_state(row):
    try: return get_state_1(row["latitud"], row["longitud"]) or get_state_2(
            row["latitud"], row["longitud"])
    except TopologicalError as e:
        print(f"Skipping row due to TopologicalError: {e}")
        return None


df_restaurantes.loc[:, "state_ab"] = df_restaurantes.apply(safe_get_state, axis=1)
df_restaurantes.drop(columns=["estado"], inplace=True)


se mapearán los estados corregidos en df_restaurantes.state_ab con su contraparte en df_estados.id_estados


In [22]:
df_restaurantes["id_estado"] = df_restaurantes["state_ab"].map(
    df_estados.set_index("State Abbreviation")["id_estado"])
df_restaurantes.dropna(subset=["id_estado"], inplace=True)
df_restaurantes = df_restaurantes.iloc[:, [1, 9, 0, 2, 3, 4, 5, 6, 7, 8]].copy()
df_restaurantes.drop(columns=["state_ab"], inplace=True)

# Extracción de Dataframes finales


Conversión de Dtypes para ahorrar memoria y almacenamiento


In [23]:
df_restaurantes.loc[:, "cantidad_resenas"] = df_restaurantes["cantidad_resenas"].astype(
    "uint32")
df_restaurantes.loc[:, "calificacion"] = df_restaurantes["calificacion"].astype(
    "float16")
df_restaurantes.loc[:, "id_estado"] = df_restaurantes["id_estado"].astype("uint8")
df_reviews.loc[:, "calificacion"] = df_reviews["calificacion"].astype("float16")
df_reviews.loc[:, "anio"] = df_reviews["anio"].astype("float16")
df_reviews.loc[:, "puntaje_de_sentimiento"] = df_reviews[
    "puntaje_de_sentimiento"].astype("float16")
df_estados.loc[:, "id_estado"] = df_estados["id_estado"].astype("uint8")
df_estados.loc[:, "Population"] = df_estados["Population"].astype("uint32")

In [24]:
try:
    df_restaurantes_existente = pd.read_parquet(r"Datalake\restaurantes.parquet")
    df_reviews_existente = pd.read_parquet(r"Datalake\reviews.parquet")
except FileNotFoundError:
    df_restaurantes_existente = pd.DataFrame()
    df_reviews_existente = pd.DataFrame()

df_restaurantes_concat = pd.concat([df_restaurantes_existente, df_restaurantes])
df_reviews_concat = pd.concat([df_reviews_existente, df_reviews])

df_restaurantes_concat["atributos"] = df_restaurantes_concat["atributos"].apply(str)
df_restaurantes_concat.to_parquet(r"Datalake\restaurantes.parquet")
df_reviews_concat.to_parquet(r"Datalake\reviews.parquet")
df_estados.to_parquet(r"Datalake\estados.parquet")