In [69]:
!pip install pyspark==3.4.1
!pip install spark-nlp==5.2.3



In [70]:
# sentiment_llm_pyspark.py

from pyspark.sql import SparkSession
import pandas as pd
from transformers import pipeline
from sklearn.metrics import classification_report, accuracy_score
import warnings

# Ignore warnings from Hugging Face
warnings.filterwarnings("ignore")

# 1. Create sample data
data = {
    "sentence": [
        "I love this product!", "This is the worst service ever.", "Absolutely fantastic experience.",
        "I'm not happy with the results.", "The movie was okay, not great.", "What a wonderful surprise!",
        "I would not recommend this.", "Such a delightful day.", "Terrible customer support.",
        "This phone is amazing!", "Very disappointing performance.", "I'm so excited about this!",
        "Could be better.", "Totally satisfied with my purchase.", "I hate how this works.",
        "It exceeded my expectations!", "Nothing special about it.", "I'm impressed by the quality.",
        "Worst purchase I've made.", "A pretty decent option."
    ],
    "label": [
        "positive", "negative", "positive", "negative", "neutral", "positive", "negative",
        "positive", "negative", "positive", "negative", "positive", "neutral", "positive",
        "negative", "positive", "neutral", "positive", "negative", "neutral"
    ]
}

# 2. Start Spark session
spark = SparkSession.builder.appName("LLM Sentiment Evaluation").getOrCreate()

# 3. Convert data to Spark DataFrame
df_pd = pd.DataFrame(data)
df_spark = spark.createDataFrame(df_pd)

# 4. Convert Spark → Pandas for inference
df = df_spark.toPandas()

# 5. Load Hugging Face sentiment analysis model
classifier = pipeline("sentiment-analysis")  # Defaults to distilbert-base-uncased-finetuned-sst-2-english

# 6. Run predictions
def map_prediction(pred):
    label = pred['label'].lower()
    if label == 'positive':
        return 'positive'
    elif label == 'negative':
        return 'negative'
    else:
        return 'neutral'

df['predicted'] = df['sentence'].apply(lambda x: map_prediction(classifier(x)[0]))

# 7. Evaluate results
print("\nClassification Report:")
print(classification_report(df['label'], df['predicted'], digits=3))

print("\nAccuracy Score:", accuracy_score(df['label'], df['predicted']))

# 8. Convert back to Spark for further processing if needed
df_result_spark = spark.createDataFrame(df)
df_result_spark.show(truncate=False)

# 9. Stop Spark
spark.stop()

No model was supplied, defaulted to distilbert/distilbert-base-uncased-finetuned-sst-2-english and revision 714eb0f (https://huggingface.co/distilbert/distilbert-base-uncased-finetuned-sst-2-english).
Using a pipeline without specifying a model name and revision in production is not recommended.
Device set to use cpu



Classification Report:
              precision    recall  f1-score   support

    negative      0.700     1.000     0.824         7
     neutral      0.000     0.000     0.000         4
    positive      0.900     1.000     0.947         9

    accuracy                          0.800        20
   macro avg      0.533     0.667     0.590        20
weighted avg      0.650     0.800     0.715        20


Accuracy Score: 0.8
+-----------------------------------+--------+---------+
|sentence                           |label   |predicted|
+-----------------------------------+--------+---------+
|I love this product!               |positive|positive |
|This is the worst service ever.    |negative|negative |
|Absolutely fantastic experience.   |positive|positive |
|I'm not happy with the results.    |negative|negative |
|The movie was okay, not great.     |neutral |negative |
|What a wonderful surprise!         |positive|positive |
|I would not recommend this.        |negative|negative |
|Suc

In [71]:
from transformers import pipeline
generator = pipeline("text-generation", model="gpt2")

Device set to use cpu


In [72]:
prompt = "Artificial intelligence is transforming"

In [None]:
output = generator(prompt, max_length=50, num_return_sequences=1)
print("Step 2:Generated Text:\n", output[0]['generated_text'])

Truncation was not explicitly activated but `max_length` is provided a specific value, please use `truncation=True` to explicitly truncate examples to max length. Defaulting to 'longest_first' truncation strategy. If you encode pairs of sequences (GLUE-style) with the tokenizer you can select this strategy more precisely by providing a specific strategy to `truncation`.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.
Both `max_new_tokens` (=256) and `max_length`(=50) seem to have been set. `max_new_tokens` will take precedence. Please refer to the documentation for more information. (https://huggingface.co/docs/transformers/main/en/main_classes/text_generation)


In [None]:
from transformers import AutoTokenizer, AutoModel
import torch
tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModel.from_pretrained("gpt2")


In [None]:
# Tokenize the prompt
tokens = tokenizer(prompt, return_tensors="pt")
print("\nToken IDs:\n", tokens['input_ids'][0].tolist())

In [None]:
from transformers import AutoTokenizer, AutoModel
import torch
tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModel.from_pretrained("gpt2")

In [None]:
# Tokenize the prompt
tokens = tokenizer(prompt, return_tensors="pt")
print("\nToken IDs:\n", tokens['input_ids'][0].tolist())

In [None]:
with torch.no_grad():
    embeddings = model(**tokens).last_hidden_state
print("\nEmbeddings Shape:\n", embeddings.shape)


In [None]:
from IPython.display import Markdown
Markdown("""
### 🧠 Discussion Points

- **Tokenization**: Converts text into subword units and token IDs.
- **Embeddings**: Token IDs are mapped to dense vectors capturing meaning.
- **Transformer Layers**: Use attention to understand relationships between tokens.
- **Text Generation**: Predicts next words based on context and learned patterns.
""")


In [None]:
!pip install transformers torch scikit-learn


In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

In [None]:
model_name = "cardiffnlp/twitter-roberta-base-sentiment"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)


In [None]:

from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Load RoBERTa model fine-tuned for sentiment analysis
model_name = "cardiffnlp/twitter-roberta-base-sentiment"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)

# Create sentiment analysis pipeline
sentiment_pipeline = pipeline("sentiment-analysis", model=model, tokenizer=tokenizer)


In [None]:
test_sentences = [
    "I absolutely love this!",
    "This is so frustrating and annoying.",
    "What a beautiful day!",
    "I can't stand this anymore.",
    "Totally worth it!",
    "Worst experience ever.",
    "I'm really happy with the results.",
    "This is not what I expected.",
    "Amazing job!",
    "Terrible service."
]

# True labels based on manual annotation
true_labels = [
    "positive", "negative", "positive", "negative", "positive",
    "negative", "positive", "negative", "positive", "negative"]


In [None]:
label_mapping = {'label_0': 'negative', 'label_1': 'neutral', 'label_2': 'positive'}
predicted_labels = [label_mapping[sentiment_pipeline(text)[0]['label'].lower()] for text in test_sentences]
print(predicted_labels)

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Evaluate using multiclass metrics
accuracy = accuracy_score(true_labels, predicted_labels)
precision = precision_score(true_labels, predicted_labels, average='macro')
recall = recall_score(true_labels, predicted_labels, average='macro')
f1 = f1_score(true_labels, predicted_labels, average='macro')

print("Evaluation Metrics:")
print(f"Accuracy:  {accuracy:.2f}")
print(f"Precision: {precision:.2f}")
print(f"Recall:    {recall:.2f}")
print(f"F1 Score:  {f1:.2f}")

In [None]:
from pyspark.sql import SparkSession
import pandas as pd
from transformers import pipeline
from sklearn.metrics import classification_report, accuracy_score
import warnings
warnings.filterwarnings("ignore")

In [None]:
data = {
    "sentence": [
        "I love this product!", "This is the worst service ever.", "Absolutely fantastic experience.",
        "I'm not happy with the results.", "The movie was okay, not great.", "What a wonderful surprise!",
        "I would not recommend this.", "Such a delightful day.", "Terrible customer support.",
        "This phone is amazing!", "Very disappointing performance.", "I'm so excited about this!",
        "Could be better.", "Totally satisfied with my purchase.", "I hate how this works.",
        "It exceeded my expectations!", "Nothing special about it.", "I'm impressed by the quality.",
        "Worst purchase I've made.", "A pretty decent option."
    ],
    "label": [
        "positive", "negative", "positive", "negative", "neutral", "positive", "negative",
        "positive", "negative", "positive", "negative", "positive", "neutral", "positive",
        "negative", "positive", "neutral", "positive", "negative", "neutral"
    ]
}

In [None]:
spark = SparkSession.builder.appName("LLM Sentiment Evaluation").getOrCreate()


In [None]:
df_pd = pd.DataFrame(data)
df_spark = spark.createDataFrame(df_pd)

In [None]:
df = df_spark.toPandas()

In [None]:
classifier = pipeline("sentiment-analysis")  # Defaults to distilbert-base-uncased-finetuned-sst-2-english


In [None]:
def map_prediction(pred):
    label = pred['label'].lower()
    if label == 'positive':
        return 'positive'
    elif label == 'negative':
        return 'negative'
    else:
        return 'neutral'

df['predicted'] = df['sentence'].apply(lambda x: map_prediction(classifier(x)[0]))


In [None]:
print("\nClassification Report:")
print(classification_report(df['label'], df['predicted'], digits=3))

print("\nAccuracy Score:", accuracy_score(df['label'], df['predicted']))


In [None]:
df_result_spark = spark.createDataFrame(df)
df_result_spark.show(truncate=False)


In [None]:
spark.stop()


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from transformers import pipeline


In [None]:
spark = SparkSession.builder.appName("SparkLLMSentiment").getOrCreate()

In [None]:
data = [
    ("I love this product!",),
    ("This is the worst service ever.",),
    ("Absolutely fantastic experience.",),
    ("I'm not happy with the results.",),
    ("The movie was okay, not great.",),
    ("What a wonderful surprise!",),
    ("I would not recommend this.",),
    ("Such a delightful day.",),
    ("Terrible customer support.",),
    ("This phone is amazing!",),
    ("Very disappointing performance.",),
    ("I'm so excited about this!",),
    ("Could be better.",),
    ("Totally satisfied with my purchase.",),
    ("I hate how this works.",),
    ("It exceeded my expectations!",),
    ("Nothing special about it.",),
    ("I'm impressed by the quality.",),
    ("Worst purchase I've made.",),
    ("A pretty decent option.",)
]

columns = ["sentence"]
df = spark.createDataFrame(data, columns)

In [None]:
def load_model():
    return pipeline("sentiment-analysis")

def predict_sentiment(text):
    global clf
    if "clf" not in globals():
        clf = load_model()
    result = clf(text)[0]['label'].lower()
    return result

In [67]:
sentiment_udf = udf(predict_sentiment, StringType())
df_with_predictions = df.withColumn("predicted_sentiment", sentiment_udf("sentence"))
df_with_predictions.show(truncate=False)


+-----------------------------------+-------------------+
|sentence                           |predicted_sentiment|
+-----------------------------------+-------------------+
|I love this product!               |positive           |
|This is the worst service ever.    |negative           |
|Absolutely fantastic experience.   |positive           |
|I'm not happy with the results.    |negative           |
|The movie was okay, not great.     |negative           |
|What a wonderful surprise!         |positive           |
|I would not recommend this.        |negative           |
|Such a delightful day.             |positive           |
|Terrible customer support.         |negative           |
|This phone is amazing!             |positive           |
|Very disappointing performance.    |negative           |
|I'm so excited about this!         |positive           |
|Could be better.                   |negative           |
|Totally satisfied with my purchase.|positive           |
|I hate how th

In [68]:
spark.stop()