# ETL

## Environment setup

To keep constant track of the files used in this Notebook, I will use my own Google Drive to store the datasets and models.

In [1]:
# Google Drive
# Mount the user's Drive at /content/drive; the datasets and models used by
# this notebook are stored there. Requires the Google Colab runtime.

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In addition, to enable other people to run this Notebook, I defined a dictionary of paths to maintain consistency throughout the Notebook.

In [None]:
# Paths
# Every dataset location is derived from BASE_PATH, so pointing the notebook
# at a different Drive folder only requires changing this one constant.

BASE_PATH = "/content/drive/MyDrive/TEC/AI/ProyectoBenji"

# One entry per data stage, looked up by key in the cells below.
PATHS = dict(
  RAW_DATA_PATH=f"{BASE_PATH}/data/raw",
  PROCESSED_DATA_PATH=f"{BASE_PATH}/data/processed",
  CLEAN_DATA_PATH=f"{BASE_PATH}/data/clean",
)

In [3]:
# Data manipulation
import pandas as pd
import numpy as np

# Natural Language Toolkit
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)
nltk.download('punkt_tab', quiet=True)

# Kaggle
import kagglehub

# Utilities
import os
import json
from datetime import datetime, timedelta
from pathlib import Path

# PyArrow for efficient Parquet writing
import pyarrow as pa
import pyarrow.parquet as pq

## Data collection & exploration

The Yelp Academic Dataset comprises 6,990,280 reviews distributed across multiple JSON files, totaling 8.65 GB. For this analysis, I focus exclusively on the yelp_academic_dataset_review.json file (5.09 GB), which contains the core review data including text content, star ratings, timestamps, and user engagement metrics.

This file provides all necessary features for sentiment analysis, eliminating the need for additional business or user metadata files.

In [4]:
#@title YelpDataLoader

class YelpDataLoader:
  """Download, cache, explore and sample the Yelp review dataset.

  Every expensive step is cached as a Parquet file: when the target file
  already exists it is loaded instead of being recomputed.
  """

  def __init__(self):
    # Kaggle dataset handle passed to kagglehub.dataset_download.
    self.kaggle_id = "yelp-dataset/yelp-dataset"
    # Local directory of the downloaded dataset; stays None until a download
    # actually happens in this session.
    self.kaggle_path = None

  def _load_dataframe_if_exists(self, output_path):
    """Return the DataFrame stored at ``output_path``, or None if the file is missing."""
    if os.path.exists(output_path):
      print(f"\nAlready exists: {output_path}")
      df = pd.read_parquet(output_path)
      print(f"\nLoaded existing parquet.")
      return df
    else:
      return None

  def _write_parquet(self, df, output_path):
    """Write ``df`` to ``output_path`` (without the index), creating parent folders."""
    output_dir = Path(output_path).parent
    output_dir.mkdir(parents=True, exist_ok=True)
    df.to_parquet(output_path, index=False)
    print(f"\nParquet saved: {output_path}")

  def _download_dataset(self):
    """Download the Kaggle dataset, remember its local path and return it."""
    print("\nDownloading... \n")
    path = kagglehub.dataset_download(self.kaggle_id)
    print("\nPath to dataset files:", path)
    self.kaggle_path = path
    return path

  def load_yelp_dataset(self, output_path, chunk_size=100000):
    """Load the Yelp reviews as a DataFrame, converting the raw JSON on first use.

    If ``output_path`` exists it is read and returned. Otherwise the Kaggle
    dataset is downloaded and ``yelp_academic_dataset_review.json`` is streamed
    into a Parquet file ``chunk_size`` records at a time.

    Args:
      output_path: Destination (and cache location) of the Parquet file.
      chunk_size: Number of JSON lines read and written per chunk.

    Returns:
      The full reviews DataFrame read back from ``output_path``.

    Raises:
      ValueError: If the JSON file yields no records.
    """
    yelp_df = self._load_dataframe_if_exists(output_path)

    if yelp_df is not None:
      return yelp_df

    print(f"\nParquet not found: {output_path}")
    self.kaggle_path = self._download_dataset()

    reviews_file = os.path.join(self.kaggle_path, "yelp_academic_dataset_review.json")

    print(f"\nAttempting to read JSON from: {reviews_file}")
    print(f"Loading in chunks of {chunk_size:,} records...")

    output_dir = Path(output_path).parent
    output_dir.mkdir(parents=True, exist_ok=True)

    # Write to a temporary file and rename it only once every chunk has been
    # written. Otherwise an interrupted run leaves a truncated file at
    # output_path that the cache check above would later load as if complete.
    tmp_path = f"{output_path}.tmp"
    writer = None
    schema = None
    chunk_number = 0

    try:
      with pd.read_json(reviews_file, lines=True, chunksize=chunk_size) as reader:
        for chunk in reader:
          chunk_number += 1
          print(f"  Processing chunk {chunk_number}: {len(chunk):,} records")

          # Each chunk is appended as a new row group, so only one chunk is
          # held in memory at a time and nothing already written is re-read.
          # The first chunk fixes the schema; later chunks are converted to it.
          table = pa.Table.from_pandas(chunk, schema=schema, preserve_index=False)
          if writer is None:
            schema = table.schema
            writer = pq.ParquetWriter(tmp_path, schema)
          writer.write_table(table)

      if writer is None:
        raise ValueError(f"No records found in: {reviews_file}")

      writer.close()
      writer = None
      os.replace(tmp_path, output_path)
    except BaseException:
      # Never leave a partial file behind, whatever interrupted the run.
      if writer is not None:
        writer.close()
      if os.path.exists(tmp_path):
        os.remove(tmp_path)
      raise

    print(f"\nParquet saved: {output_path}")

    # Load the final result.
    return pd.read_parquet(output_path)

  def explore_kaggle_dataset(self):
    """Print every file of the downloaded Kaggle dataset with its size.

    Prints a notice and returns early when nothing was downloaded in this
    session (``kaggle_path`` is None).
    """
    if self.kaggle_path is None:
      print("\nKaggle dataset content not explored as it was loaded from existing parquet and not downloaded in this session.")
      return

    print(f"\nContent of: {self.kaggle_path}\n")
    total_size = 0

    for root, _dirs, files in os.walk(self.kaggle_path):
      for file in files:
        file_path = os.path.join(root, file)
        size = os.path.getsize(file_path)
        total_size += size
        size_mb = size / (1024 * 1024)
        print(f"  {file}")
        print(f"     Size: {size_mb:.2f} MB")

    total_gb = total_size / (1024 * 1024 * 1024)
    print(f"\nTotal size: {total_gb:.2f} GB ({total_size / (1024 * 1024):.2f} MB)")

  def explore_reviews_dataset(self, df):
    """Print dtypes, ``info()``, row count, five sample rows and the star distribution of ``df``."""
    print("\nInfo of Yelp Reviews:")
    print(df.dtypes)
    print("\n")
    # DataFrame.info() prints its own report and returns None; wrapping it in
    # print() only added a stray "None" line to the output.
    df.info()

    print(f"\nTotal reviews: {len(df):,}")

    print("\nSample reviews:")
    print(df.head(5).to_string())

    print("\nStars distribution:")
    stars_dist = df['stars'].value_counts().sort_index()
    print(stars_dist)

  def sample_dataset(self, df, output_path, fraction=0.2, seed=42):
    """Return a random sample of ``df``, cached at ``output_path``.

    Args:
      df: DataFrame to sample from.
      output_path: Parquet file used as cache; when it exists it is loaded and
        returned as-is, and ``fraction`` and ``seed`` are ignored.
      fraction: Share of rows to keep.
      seed: Random state passed to ``DataFrame.sample``.

    Returns:
      The sampled (or previously cached) DataFrame.
    """
    df_sample = self._load_dataframe_if_exists(output_path)

    if df_sample is not None:
      return df_sample

    print(f"\nParquet not found: {output_path}")

    print(f"\nSampling dataset using {fraction * 100}% of reviews...")
    df_sample = df.sample(frac=fraction, random_state=seed)
    sample_reviews = len(df_sample)
    print(f"\nSample dataset created: {sample_reviews:,} reviews")

    print(f"\nCreating parquet file...")
    self._write_parquet(df_sample, output_path)

    return df_sample

### Load dataset

In [5]:
yelp_data_loader = YelpDataLoader()

# Single quotes inside the f-string: reusing the outer double quotes is a
# SyntaxError before Python 3.12, and the other cells already use this form.
parquet_path = f"{PATHS['RAW_DATA_PATH']}/yelp_reviews_raw.parquet"

df_reviews = yelp_data_loader.load_yelp_dataset(parquet_path)


Already exists: /content/drive/MyDrive/TEC/AI/ProyectoBenji/data/raw/yelp_reviews_raw.parquet

Loaded existing parquet.


In [6]:
# Lists the downloaded Kaggle files with their sizes; prints a notice instead
# when the dataset was not downloaded in this session.
yelp_data_loader.explore_kaggle_dataset()


Kaggle dataset content not explored as it was loaded from existing parquet and not downloaded in this session.


In [7]:
# Explore dataset
# Prints dtypes, info(), the row count, five sample rows and the star distribution.

yelp_data_loader.explore_reviews_dataset(df_reviews)


Info of Yelp Reviews:
review_id              object
user_id                object
business_id            object
stars                   int64
useful                  int64
funny                   int64
cool                    int64
text                   object
date           datetime64[ns]
dtype: object


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3300000 entries, 0 to 3299999
Data columns (total 9 columns):
 #   Column       Dtype         
---  ------       -----         
 0   review_id    object        
 1   user_id      object        
 2   business_id  object        
 3   stars        int64         
 4   useful       int64         
 5   funny        int64         
 6   cool         int64         
 7   text         object        
 8   date         datetime64[ns]
dtypes: datetime64[ns](1), int64(4), object(4)
memory usage: 226.6+ MB
None

Total reviews: 3,300,000

Sample reviews:
                review_id                 user_id             business_id  stars  useful  funny  

### Sample dataset

In [8]:
# Sample dataset
# Uses sample_dataset's defaults (fraction=0.2, seed=42). If the parquet below
# already exists it is loaded as-is and no new sample is drawn.

# Parquet path
parquet_path = f"{PATHS['PROCESSED_DATA_PATH']}/yelp_reviews_sentiment.parquet"

df_sample = yelp_data_loader.sample_dataset(df_reviews, parquet_path)


Already exists: /content/drive/MyDrive/TEC/AI/ProyectoBenji/data/processed/yelp_reviews_sentiment.parquet

Loaded existing parquet.


## Data Cleaning

1. **Feature selection**

To reduce memory usage, I select only the features I need: `text` and `stars` (the sentiment label is derived from `stars` in step 3).

2. **Deduplication**

Remove duplicate reviews if any.

3. **Create sentiment column**

Since the dataset originally rates reviews in stars, I need to create another variable that groups them into fewer categories:

- **Negative:** Reviews with 1-2 stars
- **Neutral:** Reviews with 3 stars  
- **Positive:** Reviews with 4-5 stars

This grouping is standard practice in sentiment analysis and reflects how consumers interpret ratings in real-world scenarios.

4. **Dataset balancing**

Yelp reviews are naturally imbalanced (~67% positive, ~23% negative, ~10% neutral). A model trained on imbalanced data tends to predict the majority class to maximize accuracy without actually learning sentiment patterns.

Balanced sampling (equal examples per class) forces the model to learn discriminative features for each sentiment rather than exploiting class distribution.

5. **Text cleaning** Normalize and extract text features

In this step I added some features (text length and word count) for future analysis.

Regarding cleaning, I normalized sequences so that sequences such as “good” and “Good” would be treated as equal. Also, I removed punctuation marks that only add noise and make analysis more difficult.

6. **Tokenization and stop words removal**

Split the text into individual words and removed irrelevant words (stop words).

This is important because ML models can't process text directly; they need numerical inputs. Tokenization is the first step in converting text into numbers.

In [9]:
#@title YelpCleaningPipeline

class YelpCleaningPipeline:
  """Turn sampled Yelp reviews into a balanced, cleaned and tokenized dataset.

  ``run_pipeline`` chains the individual steps and caches the result as a
  Parquet file; each step returns a new DataFrame and leaves its input
  untouched.
  """

  # Star rating -> sentiment label. Ratings outside 1-5 get no label.
  _SENTIMENT_BY_STARS = {
    1: 'negative',
    2: 'negative',
    3: 'neutral',
    4: 'positive',
    5: 'positive',
  }
  # Order in which the classes are concatenated by balance_dataset.
  _SENTIMENT_LABELS = ('negative', 'neutral', 'positive')

  def __init__(self):
    # English stop words from NLTK, kept as a set for fast membership tests.
    self.stop_words = set(stopwords.words('english'))

  def _load_dataframe_if_exists(self, output_path):
    """Return the DataFrame stored at ``output_path``, or None if the file is missing."""
    if os.path.exists(output_path):
      print(f"\nAlready exists: {output_path}")
      df = pd.read_parquet(output_path)
      print(f"\nLoaded existing parquet.")
      return df
    else:
      return None

  def _write_parquet(self, df, output_path):
    """Write ``df`` to ``output_path`` (without the index), creating parent folders."""
    output_dir = Path(output_path).parent
    output_dir.mkdir(parents=True, exist_ok=True)
    df.to_parquet(output_path, index=False)
    print(f"\nParquet saved: {output_path}")

  def select_features(self, df, features):
    """Return a copy of ``df`` restricted to the columns listed in ``features``."""
    print(f"\nSelecting {features} features...")
    return df[features].copy()

  def drop_duplicates(self, df):
    """Return ``df`` without fully duplicated rows."""
    print(f"\nDropping duplicates...")
    return df.drop_duplicates()

  def create_sentiment_column(self, df):
    """Derive a ``sentiment`` label from ``stars`` and print its distribution.

    1-2 stars map to 'negative', 3 to 'neutral' and 4-5 to 'positive'. Any
    other rating gets a missing value, which the printed distribution ignores.

    Args:
      df: DataFrame with at least the ``text`` and ``stars`` columns.

    Returns:
      A DataFrame with only the ``text`` and ``sentiment`` columns.
    """
    print("\nCreating sentiment feature...")

    df_sentiment = df.copy()
    # Vectorized lookup instead of a Python-level function call per row.
    df_sentiment['sentiment'] = df_sentiment['stars'].map(self._SENTIMENT_BY_STARS)

    print("\nSentiment distribution:")
    total_reviews = len(df_sentiment)
    sentiment_counts = df_sentiment['sentiment'].value_counts()
    for sentiment, count in sentiment_counts.items():
      percentage = (count / total_reviews) * 100
      print(f"   {sentiment}: {count:,} ({percentage:.1f}%)")

    return df_sentiment[['text', 'sentiment']]

  def balance_dataset(self, df, seed=None):
    """Undersample every sentiment class to the size of the smallest one.

    The target size is the smallest count among the classes present in ``df``;
    a class that is entirely absent contributes no rows.

    Args:
      df: DataFrame with a ``sentiment`` column.
      seed: When None (default) the first rows of each class are kept, in
        their current order. When given, rows are drawn at random with this
        random state, which avoids order bias if ``df`` is not shuffled.

    Returns:
      A new DataFrame with the negative, neutral and positive rows
      concatenated in that order and a fresh index. Empty if ``df`` has no
      labelled rows.
    """
    sentiment_counts = df['sentiment'].value_counts()

    print(f"\nBalancing dataset: {len(df):,} reviews...")

    if sentiment_counts.empty:
      # No labelled rows: the minimum count is undefined, so return an empty
      # frame instead of failing on it.
      df_balanced = df.iloc[0:0].reset_index(drop=True)
    else:
      min_count = int(sentiment_counts.min())
      parts = []
      for label in self._SENTIMENT_LABELS:
        subset = df[df['sentiment'] == label]
        if seed is None:
          parts.append(subset.head(min_count))
        else:
          # min() guards a class that is absent and therefore has fewer than
          # min_count (zero) rows.
          parts.append(subset.sample(n=min(min_count, len(subset)), random_state=seed))
      df_balanced = pd.concat(parts, ignore_index=True)

    print(f"\nBalanced dataset: {len(df_balanced):,} reviews")

    return df_balanced

  def clean_text(self, df):
    """Add ``text_length``, ``word_count`` and a normalized ``text_clean`` column.

    ``text_clean`` is the lower-cased text with every character that is not a
    letter, digit or whitespace removed.
    """
    print("\nCleaning text...")

    df_clean = df.copy()

    df_clean['text_length'] = df_clean['text'].str.len()
    df_clean['word_count'] = df_clean['text'].str.split().str.len()
    # NOTE(review): punctuation is deleted, not replaced by a space, so
    # "don't" becomes "dont" and "great.But" becomes "greatbut". Such tokens
    # may no longer match the stop-word list; confirm this is intended.
    df_clean['text_clean'] = df_clean['text'].str.lower().str.replace(r'[^a-zA-Z0-9\s]', '', regex=True)

    print("\nText cleaned (sample rows):")
    print(df_clean[['text', 'text_clean', 'sentiment']].head(3).to_string())

    return df_clean

  def tokenize_text(self, df):
    """Add ``tokens`` (NLTK word tokens) and ``tokens_filtered`` (stop words removed).

    Rows whose ``text_clean`` is not a string get an empty token list.
    """
    print("\nTokenizing and removing stop words...")

    df_final = df.copy()

    # Tokenize
    df_final['tokens'] = df_final['text_clean'].apply(lambda x: word_tokenize(x) if isinstance(x, str) else [])

    # Remove stop words
    stop_words = self.stop_words
    df_final['tokens_filtered'] = df_final['tokens'].apply(
      lambda tokens: [word for word in tokens if word not in stop_words and len(word) > 0]
    )

    print("\nSample tokenized and filtered text:")
    print(df_final[['text_clean', 'tokens_filtered', 'sentiment']].head(5).to_string())

    return df_final

  def run_pipeline(self, df, output_path):
    """Run every cleaning step on ``df`` and cache the result at ``output_path``.

    When ``output_path`` already exists, the cached DataFrame is returned and
    ``df`` is not processed.

    Returns:
      The cleaned and tokenized DataFrame.
    """
    df_input = self._load_dataframe_if_exists(output_path)

    if df_input is not None:
      return df_input

    # Select relevant features
    df_selected = self.select_features(df, ['text', 'stars'])
    df_duplicates = self.drop_duplicates(df_selected)
    df_sentiment = self.create_sentiment_column(df_duplicates)
    df_balanced = self.balance_dataset(df_sentiment)
    df_clean = self.clean_text(df_balanced)
    df_tokenized = self.tokenize_text(df_clean)

    self._write_parquet(df_tokenized, output_path)

    return df_tokenized

In [10]:
# Cleaning and feature engineering
# run_pipeline returns the cached parquet when it already exists; otherwise it
# runs every cleaning step on df_sample and writes the result to that path.

pipeline = YelpCleaningPipeline()

parquet_path = f"{PATHS['CLEAN_DATA_PATH']}/yelp_reviews_cleaned.parquet"

df_cleaned = pipeline.run_pipeline(df_sample, parquet_path)


Already exists: /content/drive/MyDrive/TEC/AI/ProyectoBenji/data/clean/yelp_reviews_cleaned.parquet

Loaded existing parquet.
