In [49]:
# Import necessary modules
import os

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from tensorflow.keras.preprocessing.text import Tokenizer as KerasTokenizer
#import tensorflow as tf
#from tensorflow.keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.models import Sequential
from keras.layers import Embedding, Conv1D, GlobalMaxPooling1D, Dense


In [50]:
# Start a Spark session for loading and splitting the tweets, or reuse one
# that is already running in this kernel.
appName = "Sentiment Analysis with CNN in Spark"
spark = (
    SparkSession.builder
    .appName(appName)
    .getOrCreate()
)


In [51]:
# Read the tweets CSV into a DataFrame. Spark infers the column types and takes
# the column names from the header row.
# The location comes from the TWEETS_CSV environment variable when it is set,
# so the notebook can run on machines other than the author's. The original
# absolute path is kept as the default.
TWEETS_CSV = os.environ.get(
    "TWEETS_CSV",
    r"C:\Users\DELL\Downloads\sparkproject\sparkproject\project\dataset\tweets.csv",
)
tweets_csv = spark.read.csv(TWEETS_CSV, inferSchema=True, header=True)

In [52]:
# Keep only the tweet text and its sentiment. "Sentiment" is cast to an integer
# and renamed to "label".
# Rows are dropped when the text is missing or the label could not be cast to
# an integer (the cast yields null). The Keras tokenizer fails on None, and the
# model cannot be trained on a null target.
data = (
    tweets_csv
    .select("SentimentText", col("Sentiment").cast("Int").alias("label"))
    .dropna(subset=["SentimentText", "label"])
)


In [54]:
# Divide data: 70% for training, 30% for testing.
# The fixed seed makes the split, and every metric computed from it,
# reproducible across kernel restarts. Without a seed, PySpark picks a new
# random seed on every run.
SPLIT_SEED = 42
trainingData, testingData = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)


In [55]:
# Bring the training tweets to the driver as a plain Python list of strings,
# then build the Keras word index from them. Only the training split is used,
# so test-only words never enter the vocabulary.
texts = [row["SentimentText"] for row in trainingData.select("SentimentText").collect()]
keras_tokenizer = KerasTokenizer()
keras_tokenizer.fit_on_texts(texts)

In [56]:
# The embedding table needs one row per known word, plus row 0. Keras word
# indices start at 1, so index 0 is never a real word.
vocab_size = len(keras_tokenizer.word_index) + 1

# Turn each tweet into its list of word indices. Unless the tokenizer was
# given an oov_token, words it never saw during fitting are simply dropped.
test_texts = [row["SentimentText"] for row in testingData.select("SentimentText").collect()]
train_sequences = keras_tokenizer.texts_to_sequences(texts)
test_sequences = keras_tokenizer.texts_to_sequences(test_texts)

In [57]:
# Bring every sequence to the length of the longest training tweet so the
# model receives fixed-width integer matrices. By Keras default, padding is
# added at the front, and longer test tweets lose tokens from the front.
maxlen = max(map(len, train_sequences))
train_data, test_data = (
    pad_sequences(seqs, maxlen=maxlen) for seqs in (train_sequences, test_sequences)
)


In [58]:
# CNN hyperparameters:
#   embedding_dim - size of each learned word vector
#   filters       - number of convolution filters
#   kernel_size   - number of consecutive tokens each filter spans
embedding_dim, filters, kernel_size = 50, 250, 3

In [64]:
# 1-D CNN text classifier, in order:
#   1. word embeddings
#   2. ReLU convolution over windows of kernel_size tokens
#   3. max over the sequence
#   4. a single sigmoid unit giving the probability of the positive class
model = Sequential([
    Embedding(input_dim=vocab_size, output_dim=embedding_dim),
    Conv1D(filters, kernel_size, activation='relu'),
    GlobalMaxPooling1D(),
    Dense(1, activation='sigmoid'),
])
model.summary()

# Binary cross-entropy pairs with the single sigmoid output.
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])


In [None]:
# Train for 5 epochs in mini-batches of 64. Keras holds back the LAST 20% of
# the rows for validation; it takes them from the end of the arrays, before
# any shuffling.
# NOTE(review): the labels below come from a separate Spark collect than the
# texts that were tokenized into train_data. This assumes both collects return
# rows in the same order; consider collecting text and label together.
train_labels = np.array(trainingData.select("label").collect())
history = model.fit(
    train_data,
    train_labels,
    epochs=5,
    batch_size=64,
    validation_split=0.2,
)

In [None]:
def plot_training_curve(history, metric, title, ylabel):
    """Plot the training and validation curves of one Keras metric.

    Args:
        history: the ``History`` object returned by ``model.fit``. Its
            ``history`` dict must contain both ``metric`` and
            ``"val_" + metric``.
        metric: metric key, e.g. ``"loss"`` or ``"accuracy"``.
        title: figure title.
        ylabel: y-axis label. It is also used in the legend entries
            ("Training <ylabel>" / "Validation <ylabel>").

    Returns:
        The matplotlib ``Axes`` the curves were drawn on.
    """
    fig, ax = plt.subplots()
    ax.plot(history.history[metric], label=f'Training {ylabel}')
    ax.plot(history.history[f'val_{metric}'], label=f'Validation {ylabel}')
    ax.set(title=title, xlabel='Epochs', ylabel=ylabel)
    ax.legend()
    return ax


# Plot loss
plot_training_curve(history, 'loss', 'Training and Validation Loss', 'Loss')
plt.show()

# Plot accuracy
plot_training_curve(history, 'accuracy', 'Training and Validation Accuracy', 'Accuracy')
plt.show()

In [None]:
# Score the trained network on the held-out test split. Keras returns the
# loss followed by the compiled metrics (here: accuracy).
test_labels = np.array(testingData.select("label").collect())
loss, accuracy = model.evaluate(test_data, test_labels)

print(f"Test Loss: {loss}, Test Accuracy: {accuracy}")

In [None]:
# Confusion-matrix heatmap for the test split.
# Predictions and true labels are computed here, rather than taken from a
# later cell, so this cell works on a fresh kernel.
predictions_proba = model.predict(test_data)
# Threshold the sigmoid output at 0.5 to get hard 0/1 predictions (1 = Positive).
predictions = (predictions_proba > 0.5).astype(int).ravel()
true_labels = np.array([row.label for row in testingData.select("label").collect()])

# labels=[0, 1] keeps the matrix 2x2, in Negative/Positive order, even if one
# class never occurs. This keeps the tick labels below correct.
cm = confusion_matrix(true_labels, predictions, labels=[0, 1])

# Plot confusion matrix
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(cm, annot=True, cmap='Blues', fmt='d',
            xticklabels=['Negative', 'Positive'], yticklabels=['Negative', 'Positive'],
            ax=ax)
ax.set(title='Confusion Matrix', xlabel='Predicted Labels', ylabel='True Labels')
plt.show()


In [None]:
# Evaluate the model on test data. Thresholding the sigmoid output at 0.5
# gives hard 0/1 predictions, flattened to 1-D.
predictions_proba = model.predict(test_data)
predictions = (predictions_proba > 0.5).astype(int).ravel()

# Get the true labels as a 1-D integer array, matching `predictions`.
true_labels = np.array([row.label for row in testingData.select("label").collect()])

# Generate confusion matrix. labels=[0, 1] fixes the row/column order
# (Negative, Positive) and keeps the matrix 2x2 even if a class is absent.
cm = confusion_matrix(true_labels, predictions, labels=[0, 1])
print("Confusion Matrix:")
print(cm)

# Generate classification report, using the same class names as the heatmap.
report = classification_report(true_labels, predictions, labels=[0, 1],
                               target_names=['Negative', 'Positive'])
print("Classification Report:")
print(report)




In [None]:
# Classify a few hand-written example sentences with the trained model.
input_text = ["This is a great movie!",
              "I didn't like this film at all."]

# Convert the text to word-index sequences with the tokenizer that was fitted
# on the training split.
input_sequences = keras_tokenizer.texts_to_sequences(input_text)

# Pad to the TRAINING `maxlen`, not to the longest example here, so that:
#   - the inputs are shaped like the data the model was trained on;
#   - a very short example can never be narrower than the convolution window;
#   - the global `maxlen` is not silently replaced.
padded_input_sequences = pad_sequences(input_sequences, maxlen=maxlen)

# Use separate names so this demo does not overwrite the test-set `predictions`.
input_proba = model.predict(padded_input_sequences)[:, 0]

# Print the predictions. Probability > 0.5 means Positive.
for text, proba in zip(input_text, input_proba):
    sentiment = 'Positive' if proba > 0.5 else 'Negative'
    print(f"Input Text: {text}")
    print(f"Predicted Sentiment: {sentiment} (Probability: {proba:.4f})")
    print()


In [None]:
# Shut down the Spark session to release its resources; no later cell uses it.
spark.stop()