In [8]:
import os
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from google.cloud import storage
import hashlib

def generate_md5(value):
    return hashlib.md5(value.encode()).hexdigest()

def download_from_gcs(bucket_name, files, destination_folder):
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    os.makedirs(destination_folder, exist_ok=True)
    
    for file in files:
        blob = bucket.blob(file)
        destination_path = os.path.join(destination_folder, os.path.basename(file))
        blob.download_to_filename(destination_path)
        print(f"📥 Descargado: {file}")

def load_dataframes(destination_folder):
    dataframes = {}
    for file in os.listdir(destination_folder):
        if file.endswith('.csv'):
            df_name = file.replace('.csv', '').replace('_cleaned', '') + '_cleaned'
            dataframes[df_name] = pd.read_csv(os.path.join(destination_folder, file))
    return dataframes

def convert_types(dataframes):
    for name, df in dataframes.items():
        for col in df.select_dtypes(include=['object']).columns:
            df[col] = df[col].astype(str)
    return dataframes

def process_missing_values(dataframes):
    for name, df in dataframes.items():
        numeric_df = df.select_dtypes(include=[np.number])
        for col in numeric_df.columns:
            mean = df[col].mean()
            df[col] = df[col].fillna(mean)
    print("✅ Valores nulos tratados correctamente.")

def transform_data(dataframes):
    users_cleaned = dataframes['users_cleaned'][['user_id', 'name', 'review_count', 'yelping_since']].copy()
    users_cleaned.loc[:, 'yelping_since'] = pd.to_datetime(users_cleaned['yelping_since']).dt.date
    
    reviews_cleaned = dataframes['reviews_cleaned'][['review_id', 'business_id', 'user_id', 'stars', 'text', 'date']].copy()
    reviews_cleaned.rename(columns={'date': 'review_date'}, inplace=True)
    reviews_cleaned.loc[:, 'review_date'] = pd.to_datetime(reviews_cleaned['review_date']).dt.date
    reviews_cleaned.loc[:, 'stars'] = reviews_cleaned['stars'].astype(int)
    
    business_cleaned = dataframes['business_cleaned'][['business_id', 'name', 'address', 'city', 'categories', 'latitude', 'longitude', 'review_count']].copy()
    business_cleaned.rename(columns={'name': 'business_name'}, inplace=True)

    # Normalización de ciudades
    cities = business_cleaned[['city']].drop_duplicates().copy()
    cities['city_id'] = cities['city'].apply(generate_md5)
    business_cleaned = business_cleaned.merge(cities, on='city', how='left').drop(columns=['city'])
    cities = cities[['city_id', 'city']]

    # Normalización de categorías con limpieza de datos
    business_cleaned['categories'] = business_cleaned['categories'].str.split(',').str[0].str.strip().str.lower()
    categories = business_cleaned[['categories']].drop_duplicates().copy()
    categories['category_id'] = categories['categories'].apply(generate_md5)
    
    # Merge con business_cleaned y eliminación de la columna original
    business_cleaned = business_cleaned.merge(categories, on='categories', how='left').drop(columns=['categories'])
    
    # Renombramos después del merge
    categories.rename(columns={'categories': 'category'}, inplace=True)

    # **Fix: Add category_id to fact_reviews**
    reviews_cleaned = reviews_cleaned.merge(
        business_cleaned[['business_id', 'category_id']],
        on='business_id',
        how='left'
    )

    return {
        'dim_user': users_cleaned,
        'fact_reviews': reviews_cleaned,  # Now includes category_id
        'dim_business': business_cleaned,
        'dim_city': cities,
        'dim_category': categories
    }


def plot_and_export(dataframes, output_path, bucket):
    os.makedirs(output_path, exist_ok=True)
    dataframes = {k: v for k, v in dataframes.items() if v is not None}
    for name, df in dataframes.items():
        if not df.empty:
            csv_path = os.path.join(output_path, f"{name}.csv")
            df.to_csv(csv_path, index=False)
            blob = bucket.blob(f"ETL/{name}.csv")
            blob.upload_from_filename(csv_path)
            print(f"☁️ Archivo subido a GCS: ETL/{name}.csv")


In [9]:
# Configurar la autenticación con la clave de servicio JSON
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "proyectofinalgogleyelp-41e96ec7a40a.json"

# Configuración
bucket_name = "dataset-pf-gyelp"
destination_folder = "./dataWorkingon"
output_path = "./output_data"

files = [
    "Yelp/processed/reviews_cleaned.csv",
    "Yelp/processed/user_cleaned.csv",
    "Yelp/processed/users_cleaned.csv",
    "Yelp/processed/tips_cleaned.csv",
    "Yelp/processed/review_cleaned.csv",
    "Yelp/processed/business_cleaned.parquet"
    "Yelp/processed/business_cleaned.csv",
]

# Inicializar cliente de almacenamiento
client = storage.Client()
bucket = client.bucket(bucket_name)





In [10]:
# Proceso ETL
download_from_gcs(bucket_name, files, destination_folder)

📥 Descargado: Yelp/processed/reviews_cleaned.csv
📥 Descargado: Yelp/processed/user_cleaned.csv
📥 Descargado: Yelp/processed/users_cleaned.csv
📥 Descargado: Yelp/processed/tips_cleaned.csv
📥 Descargado: Yelp/processed/review_cleaned.csv


NotFound: 404 GET https://storage.googleapis.com/download/storage/v1/b/dataset-pf-gyelp/o/Yelp%2Fprocessed%2Fbusiness_cleaned.parquetYelp%2Fprocessed%2Fbusiness_cleaned.csv?alt=media: No such object: dataset-pf-gyelp/Yelp/processed/business_cleaned.parquetYelp/processed/business_cleaned.csv: ('Request failed with status code', 404, 'Expected one of', <HTTPStatus.OK: 200>, <HTTPStatus.PARTIAL_CONTENT: 206>)

In [None]:
# Ruta donde se guardaron los archivos descargados
base_path = "./dataWorkingon"

def load_file(file_path):
    if os.path.exists(file_path):  
        try:
            if file_path.endswith('.csv'):
                print(f"📂 Cargando CSV: {file_path}")
                return pd.read_csv(file_path, encoding='utf-8')  # Prueba con 'latin1' si falla
            elif file_path.endswith('.parquet'):
                print(f"📂 Cargando Parquet: {file_path}")
                return pd.read_parquet(file_path)
            else:
                print(f"⚠️ Formato no soportado: {file_path}")
                return None
        except Exception as e:
            print(f"❌ Error al cargar {file_path}: {e}")
            return None  
    else:
        print(f"⚠️ Archivo no encontrado: {file_path}")
        return None

# Construir rutas de archivos con base_path
dataframes = {file: load_file(os.path.join(base_path, os.path.basename(file))) for file in files}

# ✅ Revisar qué archivos se cargaron correctamente
for file, df in dataframes.items():
    if df is not None:
        print(f"✔️ {file} cargado con {len(df)} filas")

📂 Cargando CSV: ./dataWorkingon\user_cleaned.csv


  return pd.read_csv(file_path, encoding='utf-8')  # Prueba con 'latin1' si falla


📂 Cargando CSV: ./dataWorkingon\reviews_cleaned.csv
📂 Cargando CSV: ./dataWorkingon\users_cleaned.csv


  return pd.read_csv(file_path, encoding='utf-8')  # Prueba con 'latin1' si falla


📂 Cargando CSV: ./dataWorkingon\tips_cleaned.csv
📂 Cargando CSV: ./dataWorkingon\review_cleaned.csv
📂 Cargando CSV: ./dataWorkingon\business_cleaned.csv
📂 Cargando Parquet: ./dataWorkingon\business_cleaned.parquet
✔️ Yelp/processed/user_cleaned.csv cargado con 1987897 filas
✔️ Yelp/processed/reviews_cleaned.csv cargado con 6990282 filas
✔️ Yelp/processed/users_cleaned.csv cargado con 2105597 filas
✔️ Yelp/processed/tips_cleaned.csv cargado con 908915 filas
✔️ Yelp/processed/review_cleaned.csv cargado con 4559049 filas
✔️ Yelp/processed/business_cleaned.csv cargado con 150346 filas
✔️ Yelp/processed/business_cleaned.parquet cargado con 150346 filas


In [None]:
dataframes = load_dataframes(destination_folder)


  dataframes[df_name] = pd.read_csv(os.path.join(destination_folder, file))
  dataframes[df_name] = pd.read_csv(os.path.join(destination_folder, file))


In [None]:
convert_types(dataframes)


{'business_cleaned':                    business_id                      name  \
 0       Pns2l4eNsfO8kk83dixA6A  Abby Rappoport, LAC, CMQ   
 1       mpf3x-BjTdTEA3yCZrAYPw             The UPS Store   
 2       tUFrWirKiKi_TAnsVWINQQ                    Target   
 3       MTSW4McQd7CbVtyjqoe9mw        St Honore Pastries   
 4       mWMc6_wTdE0EUBKIGXDVfA  Perkiomen Valley Brewery   
 ...                        ...                       ...   
 150341  IUQopTMmYQG-qRtBk-8QnA              Binh's Nails   
 150342  c8GjPIOTGVmIemT7j5_SyQ      Wild Birds Unlimited   
 150343  _QAMST-NrQobXduilWEqSw         Claire's Boutique   
 150344  mtGm22y5c2UHNXDFAjaPNw  Cyclery & Fitness Center   
 150345  jV_XOycEzSlTx-65W906pg                   Sic Ink   
 
                                 address           city    state postal_code  \
 0                1616 Chapala St, Ste 2  Santa Barbara  Unknown       93101   
 1       87 Grasso Plaza Shopping Center         Affton  Unknown       63123   
 2    

In [None]:
process_missing_values(dataframes)


✅ Valores nulos tratados correctamente.


In [None]:
def check_columns(dataframes):
    for name, df in dataframes.items():
        print(f"\n🔍 {name} - Columnas y tipos de datos:")
        print(df.dtypes)
        print("-" * 50)

In [None]:
check_columns(dataframes)



🔍 business_cleaned - Columnas y tipos de datos:
business_id      object
name             object
address          object
city             object
state            object
postal_code      object
latitude        float64
longitude       float64
stars           float64
review_count      int64
is_open            bool
attributes       object
categories       object
hours            object
dtype: object
--------------------------------------------------

🔍 reviews_cleaned - Columnas y tipos de datos:
review_id       object
user_id         object
business_id     object
stars          float64
useful         float64
funny          float64
cool           float64
text            object
date            object
dtype: object
--------------------------------------------------

🔍 review_cleaned - Columnas y tipos de datos:
review_id       object
user_id         object
business_id     object
stars          float64
useful         float64
funny          float64
cool           float64
text            object

In [None]:
transformed_dataframes = transform_data(dataframes)

In [None]:
print(transformed_dataframes.keys())

dict_keys(['dim_user', 'fact_reviews', 'dim_business', 'dim_city', 'dim_category'])


In [None]:
print(transformed_dataframes['fact_reviews'].head())


                review_id             business_id                 user_id  \
0  KU_O5udG6zpxOg-VcAEodg  XQfwVwDr-v0ZS3_CbbE5Xw  mh_-eMZ6K5RLWhZyISBhwA   
1  BiTunyQ73aT9WBnpR9DZGw  7ATYjTIgM3jUlt4UM3IypQ  OyoGAe7OKpv6SyGZT5g77Q   
2  saUsX_uimxRlCVr67Z4Jig  YjUWPpI6HXG530lwP-fb2A  8g_iMtfSiwikVnbP2etR0A   
3  AqPFMleE6RsU23_auESxiA  kxX2SOes4o-D3ZQBkiMRfA  _7bHUi9Uuf5__HHc_Q8guQ   
4  Sx8TMOWLNuJBWer-0pcmoA  e4Vwtrqf-wpJfwesgvdgxQ  bcjbaE6dDog4jkNY91ncLQ   

   stars                                               text review_date  \
0    3.0  If you decide to eat here, just be aware it is...  2018-07-07   
1    5.0  I've taken a lot of spin classes over the year...  2012-01-03   
2    3.0  Family diner. Had the buffet. Eclectic assortm...  2014-02-05   
3    5.0  Wow!  Yummy, different,  delicious.   Our favo...  2015-01-04   
4    4.0  Cute interior and owner (?) gave us tour of up...  2017-01-14   

                        category_id  
0  49cfbe257fc2454107e5b4daf9f83e13  
1  0e6cd57

In [None]:
check_columns(dataframes)



🔍 business_cleaned - Columnas y tipos de datos:
business_id      object
name             object
address          object
city             object
state            object
postal_code      object
latitude        float64
longitude       float64
stars           float64
review_count      int64
is_open            bool
attributes       object
categories       object
hours            object
dtype: object
--------------------------------------------------

🔍 reviews_cleaned - Columnas y tipos de datos:
review_id       object
user_id         object
business_id     object
stars          float64
useful         float64
funny          float64
cool           float64
text            object
date            object
dtype: object
--------------------------------------------------

🔍 review_cleaned - Columnas y tipos de datos:
review_id       object
user_id         object
business_id     object
stars          float64
useful         float64
funny          float64
cool           float64
text            object

In [None]:
# Exportar
client = storage.Client()
bucket = client.bucket(bucket_name)
plot_and_export(transformed_dataframes, output_path, bucket)

☁️ Archivo subido a GCS: ETL/dim_user.csv
☁️ Archivo subido a GCS: ETL/fact_reviews.csv
☁️ Archivo subido a GCS: ETL/dim_business.csv
☁️ Archivo subido a GCS: ETL/dim_city.csv
☁️ Archivo subido a GCS: ETL/dim_category.csv


In [None]:
from google.cloud import bigquery


# Configurar credenciales
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "proyectofinalgogleyelp-41e96ec7a40a.json"

# Definir Proyecto y Dataset en BigQuery
PROJECT_ID = "proyectofinalgogleyelp"
DATASET_ID = "proyecto_dw"

# Inicializar Cliente de BigQuery
client = bigquery.Client(project=PROJECT_ID)

# Definir Esquemas de las Tablas
SCHEMAS = {
    "dim_category": [
        bigquery.SchemaField("category_id", "STRING"),
        bigquery.SchemaField("category", "STRING"),
    ],
    "dim_city": [
        bigquery.SchemaField("city_id", "STRING"),
        bigquery.SchemaField("city", "STRING"),
    ],
    "dim_business": [
        bigquery.SchemaField("business_id", "STRING"),
        bigquery.SchemaField("business_name", "STRING"),
        bigquery.SchemaField("address", "STRING"),
        bigquery.SchemaField("city_id", "STRING"),
        bigquery.SchemaField("category_id", "STRING"),
        bigquery.SchemaField("latitude", "FLOAT64"),
        bigquery.SchemaField("longitude", "FLOAT64"),
        bigquery.SchemaField("review_count", "INT64"),
    ],
    "fact_reviews": [
        bigquery.SchemaField("review_id", "STRING"),
        bigquery.SchemaField("business_id", "STRING"),
        bigquery.SchemaField("user_id", "STRING"),
        bigquery.SchemaField("category_id", "STRING"),
        bigquery.SchemaField("review_date", "DATE"),
        bigquery.SchemaField("stars", "INT64"),
        bigquery.SchemaField("text", "STRING"),
    ],
    "dim_user": [
        bigquery.SchemaField("user_id", "STRING"),
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("review_count", "INT64"),
        bigquery.SchemaField("yelping_since", "DATE"),
    ]
}

# Función para subir DataFrames a BigQuery
def upload_to_bigquery(dataframes):
    for table_name, df in dataframes.items():
        table_id = f"{PROJECT_ID}.{DATASET_ID}.{table_name}"
        print(f"Subiendo {table_name} a {table_id}...")

        # Convertir fechas al formato DATE
        for field in SCHEMAS[table_name]:
            if field.field_type == "DATE" and field.name in df.columns:
                df[field.name] = pd.to_datetime(df[field.name]).dt.date  # Convertir a formato DATE

        # Configuración de carga
        job_config = bigquery.LoadJobConfig(
            schema=SCHEMAS[table_name],
            write_disposition="WRITE_TRUNCATE",
        )

        # Cargar DataFrame en BigQuery
        job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
        job.result()  # Esperar a que termine

        print(f"✅ {table_name} subida con éxito.")

# Suponiendo que `transformed_dataframes` ya está creado
upload_to_bigquery(transformed_dataframes)


NameError: name 'transformed_dataframes' is not defined

In [None]:
# Verificar que los DataFrames tienen las columnas correctas antes de subir
for table_name, schema_fields in SCHEMAS.items():
    if table_name in dataframes:
        expected_columns = {field.name for field in schema_fields}
        actual_columns = set(dataframes[table_name].columns)
        missing_columns = expected_columns - actual_columns
        if missing_columns:
            raise ValueError(f"La tabla {table_name} tiene columnas faltantes: {missing_columns}")
