In [1]:
import pandas as pd
import requests
import os
from dotenv import load_dotenv 
from nltk.sentiment import SentimentIntensityAnalyzer
import nltk

A partir de la API publica de yelp, puedo realizar ingesta de datos referente a los locales y sus cacacteristicas junto a reviews de estos.

## Business Yelp

Ingesta de de los locales que son restaurantes y pertencen a los estados trabajados.


In [2]:

load_dotenv('../extras/.env') # Cargo ela archivo donde esta la variable de entorno.
api_key_yelp =  os.getenv("API_KEY_YELP") # Cargo la variable de entorno

In [None]:
def state_normalize(state):
    if state == 'CA':
        return 'California'
    elif state == 'FL':
        return 'Florida'
    elif state == 'NJ':
        return 'New Jersey'
    elif state == 'IL':
        return 'Illinois'

In [3]:
def business(state):
    url = f'https://api.yelp.com/v3/businesses/search'

    params = {
        'location': state,
        'categories':','.join(['restaurant','Restaurant','restaurants','Restaurants']),
        'limit':50
    }

    headers = {
        'Authorization': f'Bearer {api_key_yelp}',
        'accept': 'application/json'
    }

    response = requests.get(url, headers=headers,params=params)

    if response.status_code == 200:
        data = response.json()
        # Convierte el JSON en un DataFrame de pandas
        #df = pd.json_normalize(data)
        businesses = pd.json_normalize(data['businesses'])
        return businesses
    else:
        print(f'Error en la solicitud. Código de estado: {response.status_code}')
        print(response.text)

In [4]:
yelp_bussines = business('TX')
yelp_bussines['categories'] = yelp_bussines['categories'].apply(lambda x: [item['title'] for item in x] if isinstance(x, list) else [])


In [None]:
url = f'https://api.yelp.com/v3/businesses/search'

yelp_bussines = pd.DataFrame()
for state in ['CA','FL','NJ','IL']:
    businesses = business(state)
    yelp_bussines = pd.concat([businesses,yelp_bussines])

In [None]:
yelp_bussines.shape[0]


Como puede verse cargue datos de locales para los estados a usar, pero solo se ingestar 200 locales, y se consumieron 4 consultas de las 500

Voy a realizar un etl para dejar los datos, igual a los del Data Wherehouse

In [None]:
yelp_bussines.columns

In [None]:

yelp_bussines['categories'] = yelp_bussines['categories'].apply(lambda x: [item['title'] for item in x] if isinstance(x, list) else [])


yelp_bussines.rename(columns={
    'id':'business_id',
    'name':'name',
    'coordinates.latitude':'latitude',
    'coordinates.longitude':'longitude',
    'categories':'categories',
    'rating':'stars',
    'location.state':'state',
    
},inplace=True)
yelp_bussines['state'] = yelp_bussines['state'].apply(state_normalize)
columnas = ['business_id', 'name', 'latitude','longitude','categories','stars','state']
yelp_bussines = yelp_bussines[columnas]
# funcion que elimina los locales que ya existen

## Reviews Yelp


In [None]:
def ingest_reviews_yelp(business_id):
    url =f'https://api.yelp.com/v3/businesses/{business_id}/reviews?limit=50&sort_by=yelp_sort"'

    headers = {
        'Authorization': f'Bearer {api_key_yelp}',
        'accept': 'application/json'
    }

    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        data = response.json()
        reviews_list = data.get('reviews', [])
        return  pd.json_normalize(reviews_list)

    else:
        print(f'Error en la solicitud. Código de estado: {response.status_code}')


In [None]:
#funcion que retorna una lista con los id que no existan ya en el DW
business_id = 'eEOYSgkmpB90uNA7lDOMRA'
reviews_yelp = ingest_reviews_yelp(business_id)
reviews_yelp['business_id'] = business_id
reviews_yelp.shape[0]

Puede verse que cada review se saca a partir de un business id por ende, con 500 consultas que es el limite diario se obtendrian reviews solo para 500 locales.

In [None]:
sid = SentimentIntensityAnalyzer()
nltk.download('vader_lexicon')
def puntajeNLP(x):
    if x > 1.5:
        return 2 # Positivo
    elif x >= 1:
        return 0 # Neutro
    else: 
        return 1 # Negativo
analisis = reviews_yelp['text'].apply(lambda x: sid.polarity_scores(x)["compound"])
valorEstrellas = reviews_yelp['rating'] / 5 
analisis += valorEstrellas
analisis = analisis.apply(lambda x: puntajeNLP(x))
reviews_yelp['text'] = analisis
reviews_yelp['sentiment'] = reviews_yelp['text'].astype('int')



reviews_yelp.rename(columns={
    'id':'review_id',
    'user.id':'user_id',
    'business_id':'business_id',
    'sentiment':'sentiment',
    'time_created':'date',
    'user.name':'name'
},inplace=True)

columns= ['review_id','user_id','business_id','sentiment','date','name']
reviews_yelp = reviews_yelp[columns]

In [None]:
reviews_yelp

In [None]:

reviews_yelp['date'] = pd.to_datetime(reviews_yelp['date'],unit='s')
reviews_yelp['date'].max()

In [None]:
import sys
sys.path.append('../../extras/')
import s3
bucket_nombre = 'projectgoogleyelp'
objeto_key = 'yelp/reviews_yelp.parquet.gz'
destino_archivo = '../../extras/asd.parquet.gz'
# Llama a la función para descargar el objeto desde S3
s3.download_file(bucket_nombre,objeto_key,destino_archivo)


In [None]:
yelp_bussines = pd.read_parquet(destino_archivo)

In [None]:
reviews_yelp

In [None]:
pd.to_datetime(yelp_bussines['date']).max() > reviews_yelp['date'].max()

In [None]:
reviews_yelp[((pd.to_datetime(yelp_bussines['date']).max())<reviews_yelp['date']) & (~reviews_yelp['review_id'].isin(yelp_bussines['review_id']))]


In [None]:
yelp_bussines['categories'] = yelp_bussines['categories'].apply(lambda x: [item['title'] for item in x] if isinstance(x, list) else [])


In [14]:
categories_data = []
for index, row in yelp_bussines.iterrows():
    local_id = row['id']
    categories = row['categories']
    for category in categories:
        categories_data.append({'business_id': local_id, 'categoria': category})

# Crear un nuevo DataFrame para la tabla de categorías
categorias_df = pd.DataFrame(categories_data)
#Consultar la base de datos  de categorias 

In [27]:
df = categorias_df.drop_duplicates(subset='business_id').drop(columns='categoria')
df['categoria'] = 'Restaurants'
df

Unnamed: 0,business_id,categoria
0,ZUtFIxtTKRRhZ2fuvl6_cg,Restaurants
2,237MSviehmCzD_wwKg4-zA,Restaurants
5,TqbzDpUI9_SREzH_AfejlQ,Restaurants
8,No7l5_GrsGBIu0OdvLslrw,Restaurants
9,MtrXTHsxOCw0iCtOlPhrGQ,Restaurants
11,yTnMkgBS19T90hxWOEOy4A,Restaurants
13,I1VlZBW80UL3E3NvahlqXg,Restaurants
16,vg2_-zFEzbKJb5MKyU9q4g,Restaurants
19,Eaani_7M6xAsasMnG4t5HQ,Restaurants
22,x4WealiPVG-tzwzhe7nvpw,Restaurants


In [None]:

categories_data = []
for index, row in yelp_bussines.iterrows():
    local_id = row['id']
    categories = row['categories']
    for category in categories:
        categories_data.append({'business_id': local_id, 'categoria': category})

# Crear un nuevo DataFrame para la tabla de categorías
categorias_df = pd.DataFrame(categories_data)
#Consultar la base de datos  de categorias 

In [9]:
df_1 = pd.read_parquet('../processed/yelp/bussiness_yelp.parquet.gz')

In [None]:
df_filtered = df_1[df_1['categories'].apply(lambda x: 'Mediterranean' in x)]
df_filtered

In [None]:
categorias_df


1-COMO DESAGREGAR LAS CATEGORIAS NUEVAS INGESTADAS<br>
2- DESAGREGAR LAS LOS USUARIOS INGESTADOS

In [16]:
import pandas as pd
import pymysql as mysql

# Configura la conexión a la base de datos
conexion = mysql.connect(host = 'servidorgrupo.cpfbmucjyznh.us-east-2.rds.amazonaws.com',
                         user = 'admin',
                         password = '1533542415',
                         database='QUANTYLE_ANALITICS')


In [17]:
# Crea un cursor para ejecutar consultas SQL
cursor = conexion.cursor()

# Ejecuta una consulta para obtener el nombre de las tablas
cursor.execute("SHOW TABLES")

# Obtiene los resultados de la consulta
tablas = cursor.fetchall()

In [18]:
tablas

(('categories',),
 ('categories_google',),
 ('categories_yelp',),
 ('google',),
 ('reviews_google',),
 ('reviews_yelp',),
 ('state',),
 ('user_yelp',),
 ('yelp',))

In [22]:
tables  = ['categories','categories_yelp','reviews_yelp','user_yelp','yelp']

In [20]:
def get_table(table_name):
    #iniciarmysql
    cursor = conexion.cursor()
    consulta = f"SELECT * FROM {table_name}"
    cursor.execute(consulta)
    # Obtiene los resultados de la consulta
    resultados = cursor.fetchall()
    # Obtiene los nombres de las columnas
    columnas = [columna[0] for columna in cursor.description]
    # Crea un DataFrame de Pandas con los resultados y los nombres de las columnas
    df = pd.DataFrame(resultados, columns=columnas)
    cursor.close()
    #cerrarmysql
    return df

In [23]:
for tabla in tables:
    print(get_table(tabla))

Empty DataFrame
Columns: [categories_id, name]
Index: []
Empty DataFrame
Columns: [id, business_id, categories_id]
Index: []
Empty DataFrame
Columns: [review_id, user_id, bussiness_id, sentiment, date]
Index: []
Empty DataFrame
Columns: [user_id, name, creation, review_count, useful, fans, stars]
Index: []
Empty DataFrame
Columns: [bussiness_id, name, latitude, longitude, stars, state_id]
Index: []


In [12]:
df_new_data = pd.read_parquet('../processed/yelp/bussiness_yelp.parquet.gz')
categories_data = []
for index, row in df_new_data.iterrows():
    local_id = row['business_id']
    categories = row['categories']
    for category in categories:
        categories_data.append({'business_id': local_id, 'categoria': category})

# Crear un nuevo DataFrame para la tabla de categorías
categorias_df = pd.DataFrame(categories_data)
#Consultar la base de datos  de categorias 

In [2]:
import pandas as pd
df_new_data = pd.read_parquet('../processed/yelp/reviews_yelp.parquet.gz')


In [5]:
print(df_new_data.columns)

Index(['review_id', 'user_id', 'business_id', 'sentiment', 'date'], dtype='object')


In [3]:
df_new_data.groupby('user_id').agg({
    'review_id':'count',
    
    
})

0         2016-07-25 07:31:06
1         2017-06-28 01:04:59
2         2017-01-14 23:31:35
3         2016-07-14 23:09:38
4         2014-11-30 07:35:54
                  ...        
1138330   2021-08-19 02:30:16
1138331   2021-09-12 05:20:37
1138332   2021-04-25 13:44:37
1138333   2021-10-06 01:52:41
1138334   2021-04-02 13:54:12
Name: date, Length: 1138335, dtype: datetime64[ns]

In [65]:
categories_acualizada = get_table('categories')
categorias_yelp_new =  pd.merge(categories_acualizada,categorias_new_data,left_on='name',right_on='categoria',how='inner')
categorias_yelp_new = categorias_yelp_new[['business_id','categories_id']]
cursor = conexion.cursor()
consulta = "INSERT INTO categories_yelp  VALUES(NULL,%s,%s)"
cursor.executemany(consulta, categorias_yelp_new.to_list())
conexion.commit()

Unnamed: 0,business_id,categoria
0,eEOYSgkmpB90uNA7lDOMRA,Vietnamese
1,eEOYSgkmpB90uNA7lDOMRA,Food
2,eEOYSgkmpB90uNA7lDOMRA,Restaurants
3,eEOYSgkmpB90uNA7lDOMRA,Food Trucks
4,0bPLkL0QhhPO5kt1_EXmNQ,Food
...,...,...
65142,UBQAksw81m0sMrAd8g-ECg,Restaurants
65143,UBQAksw81m0sMrAd8g-ECg,Mexican
65144,FBTKjIHyMk8V4frov04ClQ,Restaurants
65145,FBTKjIHyMk8V4frov04ClQ,Pizza


In [None]:
def yelp_ER():
    
    extract_api = extract_businesses() # Extraigo los datos de la API referentes a los estados seleccionados y restaurantes
    yelp_new_data = transform_business(extract_api) # Realizo las trasnformaciones necesarias para que los datos esten limpios
    yelp_origen = get_table('yelp') # Cargo de la base de datos la tabla de yelp en un dataframe
    
    yelp_new_data = yelp_new_data[~(yelp_new_data['business_id'].isin(yelp_origen['bussiness_id']))] #De los restaurantes extraidos tomo solo los que su id NO esta en la DB
    
    conexion = mysql_get_connection() # Genero una conexion a mysql
    cursor = conexion.cursor() 
    
    consulta = "INSERT INTO yelp  VALUES(%s,%s,%s,%s,%s,%s)" 
    cursor.executemany(consulta, yelp_new_data.drop(columns='categories').values.tolist()) # Inserto los nuevos locales, sin insertar las categorias
    
    conexion.commit()
    conexion.close()
    
    categories_origen = get_table('categories') # Cargo la tabla de categorias de la base de datos.    
    
    categorias_new_data = get_categoires(yelp_new_data) # Funcion que recibe el DF con las categorias como listas, y devuelve otro con bunisess_id y el nombre de cada categoria.
    
    categorias_new = categorias_new_data[~(categorias_new_data['categoria'].isin(categories_origen['name']))] # Selecciono las categorias que no estan en la DB
    categories = categorias_new.drop_duplicates(subset='categoria')['categoria'].values.tolist() # Elimino las categorias duplicadas y las convierto en lista de listas.
    categorias_para_agregar = [(int(indice), valor) for indice, valor in enumerate(categories, start=(categories_origen['categories_id'].max()+1))] # En funcion del id maximo que hay en DB genero lista de tuplas con id y id de categoria
    
    conexion = mysql_get_connection() 
    cursor = conexion.cursor()
    
    # Ingesto las nuevas categorias.
    consulta = "INSERT INTO categories  VALUES(%s,%s)"
    cursor.executemany(consulta, categorias_para_agregar)
    
    conexion.commit()
    conexion.close()
    
    
    categories_acualizada = get_table('categories') # Cargo la tabla de categorias actualizada.
    
    #Hago un join entre la tabla business_id,categoria creada anteriormente con las categorias de la BD, y me quedo solo con business_id y categoria id
    categorias_yelp_new =  pd.merge(categories_acualizada,categorias_new_data,left_on='name',right_on='categoria',how='inner')
    
    conexion = mysql_get_connection()
    
    # Como business id ya es unico simplemente agrego las filas a la tabla cateogires_yelp
    categorias_yelp_new = categorias_yelp_new[['business_id','categories_id']]
    cursor = conexion.cursor()
    consulta = "INSERT INTO categories_yelp  VALUES(NULL,%s,%s)"
    cursor.executemany(consulta, categorias_yelp_new.values.tolist())
    conexion.commit()
    conexion.close()
    

[bussiness_id, name, latitude, longitude, stars, state_id]

In [None]:
def get_categories(df):
    #Convierto las categorias de cada fila que estan en listas, a una tabla de id_,categoria.
    categories_data = []
    for index, row in df.iterrows():
        local_id = row['business_id']
        categories = row['categories']
        for category in categories:
            categories_data.append({'business_id': local_id, 'categoria': category})

    # Crear un nuevo DataFrame para la tabla de categorías
    categorias_new_data = pd.DataFrame(categories_data)