In [1]:
!pip install prefect==2.12.1 transformers pyspark kagglehub openpyxl griffe==0.39.1 email-validator -q

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.1/56.1 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.8/154.8 kB[0m [31m7.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m41.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m11.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m80.9/80.9 kB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m50.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.6/3.6 MB[0m [31m74.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m98.2/98.2 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0

In [2]:
import os
import kagglehub
import pandas as pd
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import concat_ws, lower, regexp_replace
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from transformers import pipeline
from prefect import flow, task, get_run_logger

# -------------------------
# Constants / Paths
# -------------------------
DATA_LAKE_TRAIN = "/content/data_lake/train"
DATA_LAKE_TEST = "/content/data_lake/test"
DATA_LAKE_TRAIN_SENTIMENT = "/content/data_lake/train_with_sentiment_sample"
SENTIMENT_EXCEL = "/content/data_lake/train_with_sentiment_sample.xlsx"


In [3]:
from google.colab import files
files.upload()  # upload your kaggle.json

!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json


Saving kaggle.json to kaggle.json


In [4]:
# Spark session
def get_spark_session():
    return SparkSession.builder \
        .appName("AmazonReviewsPipeline") \
        .config("spark.sql.shuffle.partitions", "4") \
        .getOrCreate()

spark = get_spark_session()


In [5]:
@task(retries=3, retry_delay_seconds=30)
def download_dataset():
    logger = get_run_logger()
    logger.info("Downloading dataset from Kaggle...")
    path = os.path.expanduser("~/amazon_reviews_dataset")
    os.makedirs(path, exist_ok=True)

    dataset_path = kagglehub.dataset_download("kritanjalijain/amazon-reviews")
    logger.info(f"Dataset downloaded at {dataset_path}")
    return dataset_path


In [6]:
@task
def load_and_clean_data(dataset_path):
    logger = get_run_logger()
    logger.info("Loading and cleaning data...")

    train_file = f"{dataset_path}/train.csv"
    test_file = f"{dataset_path}/test.csv"

    columns = ['label', 'title', 'review']

    # Load train and test
    train_sdf = spark.read.csv(train_file, header=False, sep=",", quote='"', escape='\\',
                               multiLine=True, inferSchema=True).toDF(*columns)
    test_sdf = spark.read.csv(test_file, header=False, sep=",", quote='"', escape='\\',
                              multiLine=True, inferSchema=True).toDF(*columns)

    # Quick checks
    train_sdf.show(5, truncate=100)
    train_sdf.printSchema()
    train_sdf.groupBy('label').count().show()


    # Combine title + review and clean
    train_sdf = train_sdf.withColumn("full_review", concat_ws(" ", train_sdf.title, train_sdf.review))
    train_sdf = train_sdf.withColumn("full_review", lower(regexp_replace("full_review", "[^a-zA-Z ]", " ")))

    test_sdf = test_sdf.withColumn("full_review", concat_ws(" ", test_sdf.title, test_sdf.review))
    test_sdf = test_sdf.withColumn("full_review", lower(regexp_replace("full_review", "[^a-zA-Z ]", " ")))


    print("Train rows:", train_sdf.count())
    train_sdf.printSchema()

    # Clear Spark cache
    spark.catalog.clearCache()


    # Save cleaned data
    os.makedirs(DATA_LAKE_TRAIN, exist_ok=True)
    os.makedirs(DATA_LAKE_TEST, exist_ok=True)


    train_sdf.write.mode('overwrite').parquet(DATA_LAKE_TRAIN)
    test_sdf.write.mode('overwrite').parquet(DATA_LAKE_TEST)

    # Explicitly read back the data to ensure it's written and visible
    spark.read.parquet(DATA_LAKE_TRAIN).count()
    spark.read.parquet(DATA_LAKE_TEST).count()

    import time
    time.sleep(3)  # give Spark time to flush writes

    print("Cleaned Parquet files saved.")

    logger.info("Data cleaned and saved to parquet.")

    tokenizer = Tokenizer(inputCol="full_review", outputCol="words")
    remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

    train_sdf = tokenizer.transform(train_sdf)
    train_sdf = remover.transform(train_sdf)

    test_sdf = tokenizer.transform(test_sdf)
    test_sdf = remover.transform(test_sdf)

    train_sdf.write.mode('overwrite').parquet(DATA_LAKE_TRAIN)
    test_sdf.write.mode('overwrite').parquet(DATA_LAKE_TEST)

    logger.info("Feature engineering completed.")


    return DATA_LAKE_TRAIN, DATA_LAKE_TEST

In [8]:
@task
def run_sentiment_analysis(train_path, sample_size=3000):
    logger = get_run_logger()
    logger.info("Running sentiment analysis...")

    train_sdf = spark.read.parquet(train_path)

    # Hugging Face pipeline
    sentiment_model = pipeline(
        "sentiment-analysis",
        model="distilbert-base-uncased-finetuned-sst-2-english",
        device=0  # GPU
    )

    sample_df = train_sdf.limit(sample_size)
    reviews = [row.full_review for row in sample_df.collect()]

    batch_size = 32
    results = []
    for i in range(0, len(reviews), batch_size):
        batch = reviews[i:i+batch_size]
        results.extend(sentiment_model(batch))

    rows = [Row(full_review=r, hf_sentiment=res['label']) for r, res in zip(reviews, results)]
    train_sdf_sentiment = spark.createDataFrame(rows)

    # Save parquet
    train_sdf_sentiment.write.mode('overwrite').parquet(DATA_LAKE_TRAIN_SENTIMENT)

    # Save Excel
    pd_df = train_sdf_sentiment.toPandas()
    pd_df.to_excel(SENTIMENT_EXCEL, index=False)
    logger.info(f"✅ Sentiment Excel saved at {SENTIMENT_EXCEL}")

    return DATA_LAKE_TRAIN_SENTIMENT, SENTIMENT_EXCEL


In [9]:
@flow(name="Amazon Reviews Pipeline")
def amazon_reviews_pipeline():
    dataset_path = download_dataset()
    train_path, test_path = load_and_clean_data(dataset_path)
    sentiment_parquet, sentiment_excel = run_sentiment_analysis(train_path)
    return sentiment_parquet, sentiment_excel


In [10]:
sentiment_parquet, sentiment_excel = amazon_reviews_pipeline()
print(f"Pipeline completed! Parquet: {sentiment_parquet}, Excel: {sentiment_excel}")


  next(self.gen)
  next(self.gen)


Using Colab cache for faster access to the 'amazon-reviews' dataset.


+-----+------------------------------------------------------+----------------------------------------------------------------------------------------------------+
|label|                                                 title|                                                                                              review|
+-----+------------------------------------------------------+----------------------------------------------------------------------------------------------------+
|    2|                        Stuning even for the non-gamer|This sound track was beautiful! It paints the senery in your mind so well I would recomend it eve...|
|    2|                 The best soundtrack ever to anything.|I'm reading a lot of reviews saying that this is the best 'game soundtrack' and I figured that I'...|
|    2|                                              Amazing!|"This soundtrack is my favorite music of all time, hands down. The intense sadness of ""Prisoners...|
|    2|         

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/629 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/268M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Device set to use cpu


Pipeline completed! Parquet: /content/data_lake/train_with_sentiment_sample, Excel: /content/data_lake/train_with_sentiment_sample.xlsx
