In [38]:
import time

start_time = time.time()

In [39]:
!pip install --upgrade pyspark==3.4.1 spark-nlp==5.4.0




In [40]:
 #Spark NLP Initialization

try:
    import sparknlp
    spark = sparknlp.start()
    print("‚úÖ Spark NLP started successfully")

except ModuleNotFoundError as e:
    raise ModuleNotFoundError(
        "‚ùå Spark NLP not found.\n"
        "üëâ Run: pip install spark-nlp\n"
        "üëâ Restart runtime after install."
    ) from e


except RuntimeError as e:
    if "Spark Connect server and Spark master cannot be configured together" in str(e):
        raise RuntimeError(
            "‚ùå Spark Connect conflict detected.\n"
            "üëâ Do NOT call sparknlp.start() in Databricks.\n"
            "üëâ Use existing SparkSession instead."
        ) from e
    else:
        raise


except Exception as e:
    if "Java gateway process exited" in str(e):
        raise RuntimeError(
            "‚ùå Java/JVM failed to start.\n"
            "üëâ Ensure Java 8 or 11 is installed.\n"
            "üëâ Restart the runtime."
        ) from e


    elif "spark.jsl.settings.pretrained.cache_folder" in str(e):
        raise RuntimeError(
            "‚ùå Spark NLP cache configuration error.\n"
            "üëâ Do NOT set spark.jsl.settings.pretrained.cache_folder manually.\n"
            "üëâ Use environment variable SPARK_NLP_CACHE instead."
        ) from e


    elif "OutOfMemoryError" in str(e):
        raise RuntimeError(
            "‚ùå Out of Memory error.\n"
            "üëâ Reduce batch size or increase driver memory."
        ) from e

    else:
        raise RuntimeError(f"‚ùå Spark NLP init failed: {e}") from e


‚úÖ Spark NLP started successfully


In [41]:
from sparknlp.annotator import ClassifierDLModel
from pyspark.ml import Pipeline
from pyspark.sql.functions import col


In [42]:
# Imports

try:
    from sparknlp.base import DocumentAssembler
    from sparknlp.annotator import (
        Tokenizer,
        UniversalSentenceEncoder,
        SentimentDLModel
    )
    from pyspark.ml import Pipeline
    from pyspark.sql.functions import col, current_timestamp

    print("‚úÖ Imports successful")

except ModuleNotFoundError as e:
    if "sparknlp" in str(e):
        raise ModuleNotFoundError(
            "‚ùå Spark NLP not installed.\n"
            "üëâ Fix: pip install spark-nlp\n"
            "üëâ Restart runtime after installation."
        ) from e
    elif "pyspark" in str(e):
        raise ModuleNotFoundError(
            "‚ùå PySpark not installed.\n"
            "üëâ Fix: pip install pyspark\n"
            "üëâ Restart runtime."
        ) from e
    else:
        raise


except ImportError as e:
    if "UniversalSentenceEncoder" in str(e):
        raise ImportError(
            "‚ùå UniversalSentenceEncoder not available.\n"
            "üëâ Spark NLP version too old.\n"
            "üëâ Upgrade: pip install --upgrade spark-nlp"
        ) from e
    elif "SentimentDLModel" in str(e):
        raise ImportError(
            "‚ùå SentimentDLModel not found.\n"
            "üëâ Spark NLP ML annotators not available in this version.\n"
            "üëâ Upgrade Spark NLP."
        ) from e
    else:
        raise

except Exception as e:
    if "JavaPackage" in str(e):
        raise RuntimeError(
            "‚ùå Spark NLP JVM classes not loaded.\n"
            "üëâ Spark session not initialized correctly.\n"
            "üëâ Ensure sparknlp.start() ran successfully."
        ) from e
    else:
        raise RuntimeError(f"‚ùå Import failed: {e}") from e


‚úÖ Imports successful


In [43]:
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/content/sliver_layer__1_ (1).csv")
)

In [44]:
df.printSchema()


root
 |-- id: double (nullable = true)
 |-- text: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- username: string (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- language: string (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- like_count: integer (nullable = true)
 |-- reply_count: integer (nullable = true)
 |-- quote_count: integer (nullable = true)
 |-- impression_count: integer (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- mentions: string (nullable = true)
 |-- source: string (nullable = true)
 |-- is_retweet: boolean (nullable = true)
 |-- is_reply: integer (nullable = true)
 |-- in_reply_to_user_id: integer (nullable = true)
 |-- conversation_id: double (nullable = true)
 |-- user_followers_count: integer (nullable = true)
 |-- user_following_count: integer (nullable = true)
 |-- user_verified: boolean (nullable = true)
 |-- user_location: string (nullable = true)
 |-- possibly_sensitive: boolean (nulla

In [45]:
from pyspark.sql.functions import col


try:
    if "cleaned_text" not in df.columns and "clean_text" in df.columns:
        df = df.withColumnRenamed("clean_text", "cleaned_text")

except AttributeError:
    raise TypeError("df is not a Spark DataFrame")



REQUIRED_COL = "cleaned_text"


def validate_input_data(df):

    if REQUIRED_COL not in df.columns:
        raise ValueError(f"Missing column: {REQUIRED_COL}")


    if df.rdd.isEmpty():
        raise ValueError(" DataFrame is empty")


    null_count = df.filter(col(REQUIRED_COL).isNull()).count()
    if null_count > 0:
        print(f" {null_count} null rows dropped")

    return df.filter(col(REQUIRED_COL).isNotNull())



try:
    df = validate_input_data(df)

except ValueError as e:
    raise ValueError(e)

except Exception:
    raise RuntimeError(" Spark job failed during validation")



try:
    df.select("cleaned_text").show(5, truncate=False)
except Exception:
    raise RuntimeError("‚ùå Unable to show DataFrame (Spark issue)")


+-------------------------------------------------------------------------------------------------------------------------------------------------------+
|cleaned_text                                                                                                                                           |
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
|agent every development say quality throughout beautiful databreach                                                                                    |
|night respond red information last everything cve blakeerik                                                                                            |
|here grow gas enough analysis least by infosec cybersecurity mfa                                                                                       |
|product significant world talk term herself player half have decide environ

In [46]:
# ================================
# NLP Components
# ================================
try:
    document_assembler = DocumentAssembler() \
        .setInputCol("clean_text") \
        .setOutputCol("document")

    tokenizer = Tokenizer() \
        .setInputCols(["document"]) \
        .setOutputCol("token")

    embeddings = UniversalSentenceEncoder.pretrained(
        "tfhub_use", "en"
    ).setInputCols(["document"]) \
     .setOutputCol("embeddings")

    sentiment_model = SentimentDLModel.pretrained(
        "sentimentdl_use_twitter", "en"
    ).setInputCols(["embeddings"]) \
     .setOutputCol("sentiment")

    print("NLP components initialized")

except AttributeError:
    raise RuntimeError("Spark NLP not initialized or Spark session missing")

except ValueError:
    raise RuntimeError("Invalid input or output column configuration")

except Exception as e:
    if "Model not found" in str(e):
        raise RuntimeError("Pretrained model not available or download failed")
    elif "No such file or directory" in str(e):
        raise RuntimeError("Spark NLP cache or filesystem issue")
    elif "Java gateway process exited" in str(e):
        raise RuntimeError("JVM or Java version issue")
    else:
        raise RuntimeError("NLP component initialization failed")


tfhub_use download started this may take some time.
Approximate size to download 923.7 MB
[OK!]
sentimentdl_use_twitter download started this may take some time.
Approximate size to download 11.4 MB
[OK!]
NLP components initialized


In [47]:
df = df.withColumnRenamed("cleaned_text", "clean_text")


In [48]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
    document_assembler,
    tokenizer,
    embeddings,
    sentiment_model
])


In [49]:
df.printSchema()


root
 |-- id: double (nullable = true)
 |-- text: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- username: string (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- language: string (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- like_count: integer (nullable = true)
 |-- reply_count: integer (nullable = true)
 |-- quote_count: integer (nullable = true)
 |-- impression_count: integer (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- mentions: string (nullable = true)
 |-- source: string (nullable = true)
 |-- is_retweet: boolean (nullable = true)
 |-- is_reply: integer (nullable = true)
 |-- in_reply_to_user_id: integer (nullable = true)
 |-- conversation_id: double (nullable = true)
 |-- user_followers_count: integer (nullable = true)
 |-- user_following_count: integer (nullable = true)
 |-- user_verified: boolean (nullable = true)
 |-- user_location: string (nullable = true)
 |-- possibly_sensitive: boolean (nulla

In [50]:
document_assembler = DocumentAssembler() \
    .setInputCol("cleaned_text") \
    .setOutputCol("document")


In [51]:
nlp_cols = [
    "document",
    "token",
    "embeddings",
    "sentiment",
    "sentiment_label",
    "_prediction_timestamp"
]

base_df = df.drop(*[c for c in nlp_cols if c in df.columns])
base_df.columns


['id',
 'text',
 'created_at',
 'username',
 'user_id',
 'language',
 'retweet_count',
 'like_count',
 'reply_count',
 'quote_count',
 'impression_count',
 'hashtags',
 'mentions',
 'source',
 'is_retweet',
 'is_reply',
 'in_reply_to_user_id',
 'conversation_id',
 'user_followers_count',
 'user_following_count',
 'user_verified',
 'user_location',
 'possibly_sensitive',
 'ingestion_time',
 'clean_text',
 'is_reply_flag']

In [52]:
model = pipeline.fit(base_df)
prediction_df = model.transform(base_df)


In [53]:
# ================================
# Run Prediction
# ================================
try:
    model = pipeline.fit(base_df)
    prediction_df = model.transform(base_df)

    print("Sentiment prediction completed")

except ValueError:
    raise RuntimeError("Invalid pipeline configuration or input data")

except AttributeError:
    raise RuntimeError("Pipeline or DataFrame not initialized")

except Exception as e:
    if "Job aborted" in str(e):
        raise RuntimeError("Spark job failed during model execution")
    elif "OutOfMemoryError" in str(e):
        raise RuntimeError("Insufficient memory during prediction")
    elif "AnalysisException" in str(e):
        raise RuntimeError("Schema or column mismatch during prediction")
    else:
        raise RuntimeError("Prediction step failed")


Sentiment prediction completed


In [54]:
from pyspark.sql.functions import col, current_timestamp

# ================================
# Extract Sentiment Label (FIXED)
# ================================
try:
    df = (
        prediction_df
        .withColumn("sentiment_label", col("sentiment")[0]["result"])
        .withColumn("_prediction_timestamp", current_timestamp())
    )

    df.select(
        "clean_text",
        "sentiment_label"
    ).show(10, truncate=False)

    print("‚úÖ Sentiment label extracted successfully")

except AttributeError:
    raise RuntimeError("‚ùå Prediction DataFrame not available")

except IndexError:
    raise RuntimeError("‚ùå Sentiment result is empty or missing")

except Exception as e:
    if "UNRESOLVED_COLUMN" in str(e):
        raise RuntimeError("‚ùå Column name mismatch (check clean_text)")
    elif "Job aborted" in str(e):
        raise RuntimeError("‚ùå Spark job failed during sentiment extraction")
    else:
        raise RuntimeError(f"‚ùå Sentiment extraction failed: {e}")


+-------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+
|clean_text                                                                                                                                             |sentiment_label|
+-------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+
|agent every development say quality throughout beautiful databreach                                                                                    |positive       |
|night respond red information last everything cve blakeerik                                                                                            |positive       |
|here grow gas enough analysis least by infosec cybersecurity mfa                                                                                     

In [55]:
df.printSchema()

root
 |-- id: double (nullable = true)
 |-- text: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- username: string (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- language: string (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- like_count: integer (nullable = true)
 |-- reply_count: integer (nullable = true)
 |-- quote_count: integer (nullable = true)
 |-- impression_count: integer (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- mentions: string (nullable = true)
 |-- source: string (nullable = true)
 |-- is_retweet: boolean (nullable = true)
 |-- is_reply: integer (nullable = true)
 |-- in_reply_to_user_id: integer (nullable = true)
 |-- conversation_id: double (nullable = true)
 |-- user_followers_count: integer (nullable = true)
 |-- user_following_count: integer (nullable = true)
 |-- user_verified: boolean (nullable = true)
 |-- user_location: string (nullable = true)
 |-- possibly_sensitive: boolean (nulla

In [56]:
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import (
    Tokenizer,
    UniversalSentenceEncoder,
    ClassifierDLModel
)
from pyspark.ml import Pipeline
from pyspark.sql.functions import col


In [57]:
document_assembler = (
    DocumentAssembler()
    .setInputCol("clean_text")
    .setOutputCol("document")
)


In [58]:
tokenizer = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")
)


In [59]:
embeddings = (
    UniversalSentenceEncoder.pretrained(
        "tfhub_use",
        "en"
    )
    .setInputCols(["document"])
    .setOutputCol("sentence_embeddings")
)


tfhub_use download started this may take some time.
Approximate size to download 923.7 MB
[OK!]


In [60]:
emotion_model = (
    ClassifierDLModel.pretrained(
        "classifierdl_use_emotion",
        "en"
    )
    .setInputCols(["sentence_embeddings"])
    .setOutputCol("emotion")
)


classifierdl_use_emotion download started this may take some time.
Approximate size to download 21.3 MB
[OK!]


In [61]:
emotion_pipeline = Pipeline(stages=[
    document_assembler,
    tokenizer,
    embeddings,
    emotion_model
])


In [62]:
emotion_df = emotion_pipeline.fit(df).transform(df)


In [63]:
from pyspark.sql.functions import col

emotion_df = emotion_df.withColumn(
    "emotion_label",
    col("emotion")[0]["result"]
)


In [64]:
emotion_df = emotion_df.withColumn(
    "emotion_confidence",
    col("emotion")[0]["metadata"].getItem("confidence").cast("double")
)


In [65]:
emotion_df.printSchema()


root
 |-- id: double (nullable = true)
 |-- text: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- username: string (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- language: string (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- like_count: integer (nullable = true)
 |-- reply_count: integer (nullable = true)
 |-- quote_count: integer (nullable = true)
 |-- impression_count: integer (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- mentions: string (nullable = true)
 |-- source: string (nullable = true)
 |-- is_retweet: boolean (nullable = true)
 |-- is_reply: integer (nullable = true)
 |-- in_reply_to_user_id: integer (nullable = true)
 |-- conversation_id: double (nullable = true)
 |-- user_followers_count: integer (nullable = true)
 |-- user_following_count: integer (nullable = true)
 |-- user_verified: boolean (nullable = true)
 |-- user_location: string (nullable = true)
 |-- possibly_sensitive: boolean (nulla

In [66]:
emotion_df.select(
    "clean_text",
    "emotion_label",
    "emotion_confidence"
).show(truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+------------------+
|clean_text                                                                                                                                                     |emotion_label|emotion_confidence|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+------------------+
|agent every development say quality throughout beautiful databreach                                                                                            |joy          |null              |
|night respond red information last everything cve blakeerik                                                                                                    |fear         |null              |
|here grow gas enough ana

In [68]:
# ================================
# Prepare CSV Output (Sentiment + Emotion)
# ================================
output_df = emotion_df.select(
    "id",
    "text",
    "clean_text",
    "sentiment_label",
    "emotion_label",
    "emotion_confidence",
    "_prediction_timestamp"
)


In [69]:
# ================================
# Write ALL CSV files
# ================================
try:
    output_df \
        .write \
        .mode("overwrite") \
        .option("header", "true") \
        .csv("/content/final_ml_predictions")

    print("All CSV files written successfully")

except Exception as e:
    raise RuntimeError(f"CSV write failed: {e}")


All CSV files written successfully


In [70]:
output_path = "/content/final_ml_predictions"




In [71]:
import os
import shutil

final_csv_path = "/content/final_ml_predictions.csv"

# Find the part file
for file in os.listdir(output_path):
    if file.startswith("part-") and file.endswith(".csv"):
        shutil.move(
            os.path.join(output_path, file),
            final_csv_path
        )

print("Single CSV created:", final_csv_path)


Single CSV created: /content/final_ml_predictions.csv


In [72]:
from google.colab import files
files.download(final_csv_path)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [73]:
end_time = time.time()

print(f"Total runtime: {round(end_time - start_time, 2)} seconds")

Total runtime: 2065.11 seconds


In [74]:
#validation
df.count()

503456