# ETL

---

**Author:** Diego Antonio Garc√≠a Padilla

**Date:** Oct 29, 2025

## Enviroment setup

In [None]:
#@title Setup & Environment Verification

import warnings
warnings.filterwarnings('ignore')

import os
import sys

print("=== ENVIRONMENT CHECK ===")
print(f"Python: {sys.version.split()[0]}")
print(f"JAVA_HOME: {os.environ.get('JAVA_HOME')}")
print(f"SPARK_HOME: {os.environ.get('SPARK_HOME')}")
print(f"Driver Memory: {os.environ.get('SPARK_DRIVER_MEMORY')}")
print(f"Executor Memory: {os.environ.get('SPARK_EXECUTOR_MEMORY')}")
print("=" * 50)

=== ENVIRONMENT CHECK ===
Python: 3.10.12
JAVA_HOME: /usr/lib/jvm/java-8-openjdk-arm64/jre
SPARK_HOME: /opt/spark
Driver Memory: 8g
Executor Memory: 4g


In [2]:
#@title Import Libraries

# PySpark
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer

# SciKit Learn
from sklearn.model_selection import train_test_split

# Data manipulation
import pandas as pd
import numpy as np

# Financial data
import yfinance as yf

# Hugging Face
from huggingface_hub import hf_hub_download

# Kaggle
import kagglehub

# Utilities
from datetime import datetime, timedelta
import json
import requests
import logging
from tqdm import tqdm
import time
import subprocess
from pathlib import Path

In [3]:
#@title Start Spark session

print("=== PRE-FLIGHT CHECK ===")

# Verify Java is available
try:
    java_version = subprocess.check_output(['java', '-version'], stderr=subprocess.STDOUT)
    print("Java: ‚úÖ Available")
except Exception as e:
    print(f"Java: ‚ùå Not available - {e}")

print("=" * 50)

# üî• STOP any existing Spark sessions first
try:
    SparkContext.getOrCreate().stop()
    print("üßπ Cleaned up existing Spark session")
except:
    print("üÜï No existing session to clean")

print("=" * 50)

# Create fresh Spark session
spark = SparkSession.builder \
    .appName("Yelp_Sentiment_Analysis") \
    .master("local[8]") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.maxResultSize", "4g") \
    .config("spark.sql.shuffle.partitions", "100") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryoserializer.buffer.max", "1024m") \
    .config("spark.local.dir", "/tmp/spark") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

print(f"\n‚úÖ Spark {spark.version} initialized successfully")
print(f"   Master: {spark.sparkContext.master}")
print(f"   App Name: {spark.sparkContext.appName}")
print(f"   Driver Memory: 8GB")
print(f"   Spark UI: http://localhost:4040")

=== PRE-FLIGHT CHECK ===
Java: ‚úÖ Available


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/05 16:17:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


üßπ Cleaned up existing Spark session

‚úÖ Spark 3.5.6 initialized successfully
   Master: local[8]
   App Name: Yelp_Sentiment_Analysis
   Driver Memory: 8GB
   Spark UI: http://localhost:4040


25/11/05 16:17:05 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


## Download data

### Amazon Reviews

In [4]:
#@title Download Yelp reviews dataset

yelp_path = kagglehub.dataset_download("yelp-dataset/yelp-dataset")

print("Path to dataset files:", yelp_path)

def explore_dataset(path):
    print(f"üìÅ Content of: {path}\n")
    total_size = 0
    
    for root, dirs, files in os.walk(path):
        for file in files:
            file_path = os.path.join(root, file)
            size = os.path.getsize(file_path)
            total_size += size
            size_mb = size / (1024 * 1024)
            print(f"  üìÑ {file}")
            print(f"     Size: {size_mb:.2f} MB")
    
    total_gb = total_size / (1024 * 1024 * 1024)
    print(f"\nüíæ Total size: {total_gb:.2f} GB ({total_size / (1024 * 1024):.2f} MB)")
    
    return total_size

explore_dataset(yelp_path)

Path to dataset files: /root/.cache/kagglehub/datasets/yelp-dataset/yelp-dataset/versions/4
üìÅ Content of: /root/.cache/kagglehub/datasets/yelp-dataset/yelp-dataset/versions/4

  üìÑ Dataset_User_Agreement.pdf
     Size: 0.08 MB
  üìÑ yelp_academic_dataset_business.json
     Size: 113.36 MB
  üìÑ yelp_academic_dataset_checkin.json
     Size: 273.67 MB
  üìÑ yelp_academic_dataset_review.json
     Size: 5094.40 MB
  üìÑ yelp_academic_dataset_tip.json
     Size: 172.24 MB
  üìÑ yelp_academic_dataset_user.json
     Size: 3207.52 MB

üíæ Total size: 8.65 GB (8861.26 MB)


9291705417

## Data Explotation

In [5]:
#@title Load Yelp Reviews as Spark dataset

# Parquet path
parquet_path = "../data/raw/yelp_reviews_raw.parquet"

if os.path.exists(parquet_path):
    print(f"‚úÖ Already exists: {parquet_path}")
    df_reviews = spark.read.parquet(parquet_path)
    print(f"üîÑ Loaded \n")
else:
    # Load the dataset
    reviews_file = os.path.join(yelp_path, "yelp_academic_dataset_review.json")

    # Read JSON file with Spark
    df_reviews = spark.read.json(reviews_file)

    # Show schema to understand structure
    print("üìã Schema of Yelp Reviews:")
    df_reviews.printSchema()

    # Basic statistics
    print(f"\nüìä Total reviews: {df_reviews.count():,}")

    # Show sample data
    print("\nüîç Sample reviews:")
    df_reviews.show(5, truncate=50)

    # Check stars distribution
    print("\n‚≠ê Stars distribution:")
    df_reviews.groupBy('stars').count().orderBy('stars').show()

    # Check text lengths

    print("\nüìù Text statistics:")
    df_reviews.select(
        F.avg(F.length(F.col('text'))).alias('avg_length'),
        F.min(F.length(F.col('text'))).alias('min_length'),
        F.max(F.length(F.col('text'))).alias('max_length')
    ).show()

    df_reviews.write.parquet(parquet_path, mode="overwrite")
    print(f"\nüíæ Parquet saved: {parquet_path}")

‚úÖ Already exists: ../data/raw/yelp_reviews_raw.parquet
üîÑ Loaded 



In [6]:
#@title Sample dataset

# Parquet path
parquet_path = "../data/filtered/yelp_reviews_sentiment.parquet"

if os.path.exists(parquet_path):
    print(f"‚úÖ Already exists: {parquet_path}")
    df_sentiment = spark.read.parquet(parquet_path)
    print(f"üîÑ Loaded")
else:
    #  Sample directly from the original dataframe with stars
    # 10% sample = ~700K reviews (still >1GB when processed with text)
    df_sample = df_reviews.sample(fraction=0.20, seed=42)

    # Create sentiment column
    df_sentiment = df_sample.select(
        F.col('review_id'),
        F.col('text'),
        F.col('stars'),
        F.col('useful'),
        F.col('date')
    ).withColumn('sentiment',
        F.when(F.col('stars').isin([1.0, 2.0]), 'negative')
        .when(F.col('stars') == 3.0, 'neutral')
        .when(F.col('stars').isin([4.0, 5.0]), 'positive')
    )

    # Single count operation
    total_reviews = df_sentiment.count()
    print(f"\n‚úÖ Sample dataset created: {total_reviews:,} reviews")

    # Get distribution (single pass)
    print("\nüéØ Sentiment distribution:")
    sentiment_counts = df_sentiment.groupBy('sentiment').count().collect()
    for row in sentiment_counts:
        percentage = (row['count'] / total_reviews) * 100
        print(f"   {row['sentiment']}: {row['count']:,} ({percentage:.1f}%)")

    # Show one sample per sentiment (lightweight)
    print("\nüìå Sample reviews:")
    for sent in ['negative', 'neutral', 'positive']:
        sample = df_sentiment.filter(F.col('sentiment') == sent).select('text', 'stars').first()
        if sample:
            print(f"\n{sent.upper()} ({sample['stars']} stars):")
            print(f"   {sample['text'][:150]}...")

    df_sentiment.write.parquet(parquet_path, mode="overwrite")
    print(f"\nüíæ Parquet saved: {parquet_path}")

‚úÖ Already exists: ../data/filtered/yelp_reviews_sentiment.parquet
üîÑ Loaded


## Data Cleaning

In [7]:
#@title Select relevant features and drop duplicates

df_filtered = df_sentiment.select("text", "sentiment") \
                .dropDuplicates()

df_filtered.show(10)

                                                                                

+--------------------+---------+
|                text|sentiment|
+--------------------+---------+
|Not sure who dump...| negative|
|I unfortunately h...| negative|
|I selected Extrem...|  neutral|
|Here is pretty Zo...| positive|
|Shaun and the sta...| positive|
|Disclaimer. This ...| positive|
|After hearing abo...| positive|
|Called Blaisdell'...| positive|
|Terrible\nDirty\n...| negative|
|Ya'll tripping fo...| positive|
+--------------------+---------+
only showing top 10 rows



In [8]:
#@title Clean text

# Add text length
df_clean = df_filtered.withColumn('text_length', F.length(F.col('text')))

# Add word count
df_clean = df_clean.withColumn('word_count', 
    F.size(F.split(F.col('text'), ' ')))

# Clean text: lowercase, remove special characters
df_clean = df_clean.withColumn('text_clean',
    F.lower(F.regexp_replace(F.col('text'), '[^a-zA-Z0-9\\s]', ''))
)

print("\nüßπ Text cleaned:")
df_clean.select('text', 'text_clean', 'sentiment').show(3, truncate=80)


üßπ Text cleaned:




+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+---------+
|                                                                            text|                                                                      text_clean|sentiment|
+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+---------+
|Not sure who dumps out hot coffee 45 minutes prior to closing their doors. Al...|not sure who dumps out hot coffee 45 minutes prior to closing their doors als...| negative|
|I unfortunately have had this happen twice now but last week I ordered a part...|i unfortunately have had this happen twice now but last week i ordered a part...| negative|
|I selected Extreme Maids to do a move in/deep cleaning. Unfortunately, the mo...|i selected extreme maids to do a move indeep cle

                                                                                

In [9]:
#@title Tokenize text

# Parquet path
parquet_path = "../data/clean/yelp_reviews_tokenized.parquet"

# Tokenize text
tokenizer = Tokenizer(inputCol="text_clean", outputCol="tokens")
df_tokenized = tokenizer.transform(df_clean)

# Remove stop words
remover = StopWordsRemover(inputCol="tokens", outputCol="tokens_filtered")
df_tokenized = remover.transform(df_tokenized)

print("\nüìù Sample tokenized text:")
df_tokenized.select('text_clean', 'tokens_filtered').show(5, truncate=80)

if not os.path.exists(parquet_path):
    df_tokenized.write.parquet(parquet_path, mode="overwrite")
    print(f"\nüíæ Parquet saved: {parquet_path}")


üìù Sample tokenized text:


                                                                                

+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+
|                                                                      text_clean|                                                                 tokens_filtered|
+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+
|not sure who dumps out hot coffee 45 minutes prior to closing their doors als...|[sure, dumps, hot, coffee, 45, minutes, prior, closing, doors, also, greeted,...|
|i unfortunately have had this happen twice now but last week i ordered a part...|[unfortunately, happen, twice, last, week, ordered, part, online, store, show...|
|i selected extreme maids to do a move indeep cleaning unfortunately the mover...|[selected, extreme, maids, move, indeep, cleaning, unfortunately, movers, arr...|
|here is pretty 




üíæ Parquet saved: ../data/clean/yelp_reviews_tokenized.parquet


                                                                                