In [None]:
import os
import requests
import csv
import time
import json
from google.cloud import storage

# Configurar la autenticación con la clave de servicio JSON
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "proyectofinalgogleyelp-41e96ec7a40a.json"

# Configuración de APIs (REEMPLAZAR CON TUS KEYS)
GOOGLE_API_KEY = "AIzaSyBVFqawIp7wy2kTnY7ZVt9veOS7-yKfohU"
YELP_API_KEY = "81MeibfW8d_OHhY9oRupbSWAy3cBuNrvOZJPNC3hQWkPa-ZFMIdDKeN3_pu6G8cVOvbR-1r8hIPxWqQV_u56wDQPPHr4kDve44nvtlxE7e3RMrq2M72jXM9ZXwTPZ3Yx"

# Configuración de Google Cloud Storage
BUCKET_NAME = "dataset-pf-gyelp"
FOLDER_PATH = "Yelp/airFlow/raw/"

# Inicializar cliente de Google Cloud Storage
storage_client = storage.Client()

def clean_bucket(bucket_name, folder_path):
    """Elimina archivos en el bucket antes de subir nuevos datos."""
    try:
        bucket = storage_client.bucket(bucket_name)
        blobs = list(bucket.list_blobs(prefix=folder_path))
        for blob in blobs:
            blob.delete()
        print(f"Carpeta {folder_path} en el bucket {bucket_name} limpiada.")
    except Exception as e:
        print(f"Error al limpiar el bucket: {str(e)}")

def fetch_google_restaurants(location="California", pages=3):
    """Obtiene datos de restaurantes desde Google Places API."""
    base_url = "https://maps.googleapis.com/maps/api/place/textsearch/json"
    restaurants = []
    next_page_token = None
    
    for _ in range(pages):
        params = {
            "query": f"restaurants in {location}",
            "key": GOOGLE_API_KEY,
            "type": "restaurant",
            "language": "es",
            "pagetoken": next_page_token or ""
        }
        response = requests.get(base_url, params=params)
        data = response.json()

        if "results" in data:
            for result in data["results"]:
                restaurants.append({
                    "Business_ID": result.get("place_id"),
                    "Name": result.get("name"),
                    "Address": result.get("formatted_address"),
                    "Rating": result.get("rating"),
                    "Latitude": result.get("geometry", {}).get("location", {}).get("lat"),
                    "Longitude": result.get("geometry", {}).get("location", {}).get("lng"),
                })
        
        next_page_token = data.get("next_page_token")
        if not next_page_token:
            break
        time.sleep(2)  # Evitar restricciones de API
    
    return restaurants

def fetch_yelp_restaurants(location="California"):
    """Obtiene datos de restaurantes desde Yelp API."""
    url = "https://api.yelp.com/v3/businesses/search"
    headers = {"Authorization": f"Bearer {YELP_API_KEY}"}
    params = {"location": location, "term": "restaurants", "limit": 50}
    
    response = requests.get(url, headers=headers, params=params)
    data = response.json()
    
    return [{
        "Business_ID": business.get("id"),
        "Name": business.get("name"),
        "Address": ", ".join(business.get("location", {}).get("display_address", [])),
        "Rating": business.get("rating"),
        "Latitude": business.get("coordinates", {}).get("latitude"),
        "Longitude": business.get("coordinates", {}).get("longitude"),
    } for business in data.get("businesses", [])]

def fetch_yelp_reviews(business_id):
    """Obtiene reseñas de Yelp para un negocio."""
    url = f"https://api.yelp.com/v3/businesses/{business_id}/reviews"
    headers = {"Authorization": f"Bearer {YELP_API_KEY}"}
    
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        data = response.json()
        return [{
            "Review_ID": review.get("id"),
            "Business_ID": business_id,
            "User_ID": review.get("user", {}).get("id"),
            "Rating": review.get("rating"),
            "Text": review.get("text"),
            "Date": review.get("time_created"),
        } for review in data.get("reviews", [])]
    else:
        print(f"Error al obtener reseñas para {business_id}: {response.status_code}")
        return []

def save_to_csv(data, filename):
    """Guarda los datos en CSV."""
    if not data:
        print(f"No hay datos para guardar en {filename}")
        return
    
    keys = data[0].keys()
    with open(filename, 'w', newline='', encoding='utf-8') as output_file:
        dict_writer = csv.DictWriter(output_file, fieldnames=keys)
        dict_writer.writeheader()
        dict_writer.writerows(data)
    print(f"Archivo guardado: {filename}")

def upload_to_bucket(source_file, bucket_name, folder_path):
    """Sube archivos a Google Cloud Storage."""
    try:
        bucket = storage_client.bucket(bucket_name)
        destination_blob = f"{folder_path}{os.path.basename(source_file)}"
        blob = bucket.blob(destination_blob)
        blob.upload_from_filename(source_file)
        print(f"Archivo {source_file} subido a gs://{bucket_name}/{destination_blob}")
    except Exception as e:
        print(f"Error al subir {source_file}: {str(e)}")

# Ejecución principal
if __name__ == "__main__":
    try:
        print("Limpiando bucket...")
        clean_bucket(BUCKET_NAME, FOLDER_PATH)
        
        print("Obteniendo datos de Google y Yelp...")
        google_data = fetch_google_restaurants()
        yelp_data = fetch_yelp_restaurants()

        reviews_data = []
        for business in yelp_data:
            reviews_data.extend(fetch_yelp_reviews(business["Business_ID"]))
        
        print("Guardando datos en CSV...")
        save_to_csv(google_data + yelp_data, "business_cleaned.csv")
        save_to_csv(reviews_data, "reviews_cleaned.csv")
        
        print("Subiendo archivos a Cloud Storage...")
        upload_to_bucket("business_cleaned.csv", BUCKET_NAME, FOLDER_PATH)
        upload_to_bucket("reviews_cleaned.csv", BUCKET_NAME, FOLDER_PATH)
        
        print("Proceso completado con éxito.")
    except Exception as e:
        print(f"Error: {str(e)}")


Limpiando bucket...
Carpeta Yelp/airFlow/raw/ en el bucket dataset-pf-gyelp limpiada.
Obteniendo datos de Google y Yelp...
Error al obtener reseñas para 2g3Af4y33ZMBLJzulZvbzQ: 404
Error al obtener reseñas para UAVRQRI52qclZPyHE_kWpA: 404
Error al obtener reseñas para 6zzzmpb5FlF4yTi8VnWcIw: 404
Error al obtener reseñas para gwANdrsPZr3ZVAqN52vwqg: 404
Error al obtener reseñas para a7QtM-9nN1W_eN8IUni6Jw: 404
Error al obtener reseñas para Ee3db0MVJ_fW-ChXmXoTtQ: 404
Error al obtener reseñas para O7jIKYL3p_MNl6alOMeL3Q: 404
Error al obtener reseñas para _2TZqeKiJC_qWH9QmITmmw: 404
Error al obtener reseñas para 8rmEBlB0_qBSy0P3u-kWjw: 404
Error al obtener reseñas para ztO9uuR8yQIOFCveEBW2MA: 404
Error al obtener reseñas para HUL6rCJwpFOiSdk4p4wn0w: 404
Error al obtener reseñas para mv5ftJ5xlMDEp9ORBYX7Tw: 404
Error al obtener reseñas para Yf741qv9lkVQzG636pVIGg: 404
Error al obtener reseñas para l_D9msy8yEAb00dPZiqfIA: 404
Error al obtener reseñas para O7A7Z752I1YVA-f4rmofHA: 404
Error a



Archivo business_cleaned.csv subido a gs://dataset-pf-gyelp/Yelp/airFlow/raw/business_cleaned.csv
Archivo reviews_cleaned.csv subido a gs://dataset-pf-gyelp/Yelp/airFlow/raw/reviews_cleaned.csv
Proceso completado con éxito.


: 

In [1]:
from google.cloud import storage
import os
import requests
import csv
import pandas as pd

gcs_client = storage.Client()
BUCKET_NAME = "dataset-pf-gyelp"
RAW_FOLDER = "Yelp/airFlow/raw/"
PROCESSED_FOLDER = "Yelp/airFlow/processed/"

def clear_bucket_folder(folder):
    """Elimina todos los archivos en una carpeta del bucket."""
    bucket = gcs_client.bucket(BUCKET_NAME)
    blobs = list(bucket.list_blobs(prefix=folder))
    
    if not blobs:
        print(f"No hay archivos en {BUCKET_NAME}/{folder} para eliminar.")
        return
    
    for blob in blobs:
        blob.delete()
    print(f"Se eliminaron {len(blobs)} archivos en {BUCKET_NAME}/{folder}")

def fetch_google_restaurants():
    """Obtiene datos de restaurantes de Google Places API y los guarda en un CSV."""
    GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
    url = "https://maps.googleapis.com/maps/api/place/textsearch/json"
    params = {"query": "restaurants in California", "key": GOOGLE_API_KEY}
    response = requests.get(url, params=params)
    data = response.json()
    
    restaurants = [
        {"Business_ID": r.get("place_id"), "Name": r.get("name"), "Address": r.get("formatted_address"), "Rating": r.get("rating")} 
        for r in data.get("results", [])
    ]
    save_to_csv(restaurants, "google_restaurants.csv", RAW_FOLDER)

def fetch_yelp_restaurants():
    """Obtiene datos de restaurantes de Yelp API y los guarda en un CSV."""
    YELP_API_KEY = os.getenv("YELP_API_KEY")
    url = "https://api.yelp.com/v3/businesses/search"
    headers = {"Authorization": f"Bearer {YELP_API_KEY}"}
    params = {"location": "California", "term": "restaurants", "limit": 50}
    response = requests.get(url, headers=headers, params=params)
    data = response.json()
    
    restaurants = [
        {"Business_ID": b.get("id"), "Name": b.get("name"), "Address": ", ".join(b.get("location", {}).get("display_address", [])), "Rating": b.get("rating")} 
        for b in data.get("businesses", [])
    ]
    save_to_csv(restaurants, "yelp_restaurants.csv", RAW_FOLDER)

if __name__ == "__main__":
    # Limpiar los buckets antes de iniciar
    clear_bucket_folder(RAW_FOLDER)
    clear_bucket_folder(PROCESSED_FOLDER)

    # Ejecutar las funciones de recolección y procesamiento
    fetch_google_restaurants()
    fetch_yelp_restaurants()



DefaultCredentialsError: Your default credentials were not found. To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.