In [1]:
import time

# Reference timestamp for the whole notebook run; the final cells subtract it
# from `notebook_end_time` to report total processing time.
notebook_start_time = time.perf_counter()

### Uninstalling the preinstalled PySpark and Dataproc Spark Connect packages

In [2]:
!pip uninstall -y dataproc-spark-connect pyspark

Found existing installation: dataproc-spark-connect 1.0.1
Uninstalling dataproc-spark-connect-1.0.1:
  Successfully uninstalled dataproc-spark-connect-1.0.1
Found existing installation: pyspark 4.0.1
Uninstalling pyspark-4.0.1:
  Successfully uninstalled pyspark-4.0.1


### Installing the required libraries

In [3]:
!pip install -q pyspark==3.4.1 spark-nlp==5.2.1 transformers torch

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m55.1/55.1 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m547.3/547.3 kB[0m [31m36.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m19.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


### Importing the required modules

In [5]:
import sparknlp
from pyspark.sql.functions import *
from pyspark.sql.types import *
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import Tokenizer, ViveknSentimentModel
from pyspark.ml import Pipeline
from transformers import pipeline as hf_pipeline
import torch, pandas as pd

### Starting the Spark session (via Spark NLP)

In [6]:
# Create the Spark session through Spark NLP's helper; the bare `spark`
# expression renders the session summary in the notebook output.
spark = sparknlp.start()
spark

### Mounting Google Drive for data persistence

In [7]:
# Mount Google Drive so the Parquet inputs and outputs under
# /content/drive/MyDrive/Sentimental_analysis persist across Colab sessions.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Optional cleanup: uncomment to flush pending writes to Drive and unmount it
# once all outputs have been written.
# drive.flush_and_unmount()

### Spark NLP sentiment analyzer

In [8]:
# Spark NLP pipeline: `cleaned_text` -> `document` annotation -> `token`
# annotations -> pretrained Vivekn sentiment annotation in `sentiment`.
document = (
    DocumentAssembler()
    .setInputCol("cleaned_text")
    .setOutputCol("document")
)
tokenizer = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")
)
sentiment = (
    ViveknSentimentModel.pretrained("sentiment_vivekn", "en")
    .setInputCols(["document", "token"])
    .setOutputCol("sentiment")
)

pipeline = Pipeline(stages=[document, tokenizer, sentiment])

sentiment_vivekn download started this may take some time.
Approximate size to download 873.6 KB
[OK!]


### Hugging Face Emotion Detection Transformer

In [9]:
# Hugging Face emotion classifier. `top_k=1` keeps only the highest-scoring
# label per input; the model runs on CUDA device 0 when PyTorch sees a GPU,
# otherwise on CPU (device=-1).
emotion_pipe = hf_pipeline(
    task="text-classification",
    model="j-hartmann/emotion-english-distilroberta-base",
    device=0 if torch.cuda.is_available() else -1,
    top_k=1,
)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json: 0.00B [00:00, ?B/s]

pytorch_model.bin:   0%|          | 0.00/329M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/329M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/294 [00:00<?, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

Device set to use cuda:0


### Validating the GPU and Device Usage

In [10]:
# Confirms CUDA is visible to PyTorch; the emotion pipeline above selects
# device 0 only when this evaluates to True.
torch.cuda.is_available()

True

In [11]:
# Shows which device the emotion pipeline actually loaded onto
# (expected to be cuda:0 on a GPU runtime).
print(emotion_pipe.device)

cuda:0


### Set `dataset_is_batched` to either True or False

In [13]:
# True  -> read/write the batched layout (batch_0 .. batch_{BATCH_COUNT-1})
# False -> read/write the single, unbatched dataset
dataset_is_batched = False   # 🔁 switch here

### One-time pipeline fit

In [14]:
# Fit the Spark NLP pipeline once on a few rows (limit 5) of whichever source
# is selected; the resulting `sentiment_model` is reused for every dataset
# processed below.
fit_df = spark.read.parquet(
    "/content/drive/MyDrive/Sentimental_analysis/ml_source_batch"
    if dataset_is_batched
    else "/content/drive/MyDrive/Sentimental_analysis/ml_source_single"
).limit(5)

sentiment_model = pipeline.fit(fit_df)


### Configuration and processing function (the `dataset_is_batched` value set above — True or False depending on your source dataset — selects which of these paths are used)

In [15]:
# ---------------------------
# Config
# ---------------------------
BATCH_COUNT = 100  # number of batch_<i> directories processed in batched mode
BATCH_SIZE = 32    # inference batch size for the Hugging Face emotion pipeline

# All silver (input) and gold (output) locations live under one Drive folder;
# change DATA_ROOT to relocate them together.
DATA_ROOT = "/content/drive/MyDrive/Sentimental_analysis"

SILVER_BATCH_PATH = f"{DATA_ROOT}/ml_source_batch"
SILVER_SINGLE_PATH = f"{DATA_ROOT}/ml_source_single"

GOLD_BATCH_PATH = f"{DATA_ROOT}/ml_output_batch"
GOLD_SINGLE_PATH = f"{DATA_ROOT}/ml_output_single"


def process_dataframe(df):
    """
    Applies sentiment (Spark NLP) + emotion (HF) inference
    Returns Spark DataFrame

    Args:
        df: Spark DataFrame with at least `row_id`, `cleaned_text` and
            `created_date` columns. `row_id` is the join key between the two
            model outputs. NOTE(review): assumed unique and non-null — confirm
            upstream, since duplicates would multiply rows in the join.

    Returns:
        Spark DataFrame with columns row_id, cleaned_text, created_date,
        sentiment_label, emotion_label. `sentiment_label` is the result of the
        first sentiment annotation; `emotion_label` is the top emotion label,
        or null where `cleaned_text` is null.

    Relies on the notebook globals `sentiment_model`, `emotion_pipe`, `spark`
    and `BATCH_SIZE`. Every row's text is collected to the driver for GPU
    inference, so the input must fit in driver memory.
    """
    from pyspark.sql.functions import col
    from pyspark.sql.types import StringType, StructField, StructType

    # Spark NLP sentiment (lazy; evaluated when the caller writes the result).
    sent_df = (
        sentiment_model.transform(df)
        .withColumn("sentiment_label", col("sentiment")[0]["result"])
        .select("row_id", "cleaned_text", "created_date", "sentiment_label")
    )

    # Collect only the key and text straight from the input (not from
    # `sent_df`), so building the HF input never requires the Spark NLP stages.
    pdf = df.select("row_id", "cleaned_text").toPandas()

    # The HF pipeline cannot take None: run inference only on non-null texts
    # and leave the label null elsewhere. Skipping the call when nothing is
    # left also covers an empty input.
    has_text = pdf["cleaned_text"].notna()
    labels = pd.Series(None, index=pdf.index, dtype="object")
    if has_text.any():
        results = emotion_pipe(
            pdf.loc[has_text, "cleaned_text"].tolist(),
            batch_size=BATCH_SIZE,
            truncation=True,
        )
        # With top_k=1 each result is a one-element list of {label, score}.
        labels.loc[has_text] = [r[0]["label"] for r in results]
    pdf["emotion_label"] = labels

    # Explicit schema: inference fails on an empty frame or an all-null label
    # column, and reusing the input's row_id field keeps the join key type.
    emotion_schema = StructType([
        df.schema["row_id"],
        StructField("emotion_label", StringType(), True),
    ])
    emotion_sdf = spark.createDataFrame(
        pdf[["row_id", "emotion_label"]], schema=emotion_schema
    )

    # Join back to Spark DF
    final_df = sent_df.join(emotion_sdf, on="row_id", how="left")

    return final_df


In [16]:
def _write_gold(source_path, output_path):
    """Read one silver Parquet dataset, enrich it, and overwrite the gold output.

    Args:
        source_path: Parquet path to read; it must provide the columns that
            `process_dataframe` uses (row_id, cleaned_text, created_date).
        output_path: Parquet path to write; existing data there is overwritten.

    Returns:
        (final_df, elapsed_seconds): the enriched Spark DataFrame and the
        wall-clock seconds spent reading, processing and writing it.
    """
    started = time.perf_counter()

    final_df = process_dataframe(spark.read.parquet(source_path))

    # Pin the gold column set and order explicitly.
    final_df.select(
        "row_id",
        "cleaned_text",
        "created_date",
        "sentiment_label",
        "emotion_label"
    ).write.mode("overwrite").parquet(output_path)

    return final_df, time.perf_counter() - started


if dataset_is_batched:
    for i in range(BATCH_COUNT):
        print(f"Processing batch {i}")
        # Note: gold batch directories carry a ".parquet" suffix; silver ones do not.
        final_df, elapsed = _write_gold(
            f"{SILVER_BATCH_PATH}/batch_{i}",
            f"{GOLD_BATCH_PATH}/batch_{i}.parquet",
        )
        print(f"Batch {i} completed in {elapsed:.2f} seconds")
else:
    print("Processing single dataset")
    final_df, elapsed = _write_gold(SILVER_SINGLE_PATH, GOLD_SINGLE_PATH)
    print(f"Single dataset completed in {elapsed:.2f} seconds")


Processing single dataset
Single dataset completed in 503.25 seconds


In [17]:
# Timestamp taken after the processing cell; paired with `notebook_start_time`
# from the first cell.
notebook_end_time = time.perf_counter()

In [18]:
# Total wall-clock seconds from the first cell to `notebook_end_time`,
# including the package install and model download cells.
total_notebook_processing_time = notebook_end_time - notebook_start_time
print(total_notebook_processing_time)

958.50956643
