In [1]:
%pip install elasticsearch
%pip install numpy

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [5]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from elasticsearch import Elasticsearch
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, StringType  # Import StringType here
from sklearn.metrics import classification_report

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("My App") \
    .master("spark://spark-master:7077") \
    .config("spark.executor.memory", "1g") \
    .config("spark.executor.cores", "1") \
    .getOrCreate()

# Load dataset
data = spark.read.format("csv").option("header", "true").load("./work/data.csv")

# Convert label column to numeric
indexer = StringIndexer(inputCol="label", outputCol="label_index")
data = indexer.fit(data).transform(data)

# Drop the original string label column
data = data.drop("label")

# Rename the new numeric label column to "label"
data = data.withColumnRenamed("label_index", "label")

# Tokenize the text
tokenizer = Tokenizer(inputCol="text", outputCol="words")
data = tokenizer.transform(data)

# Create TF-IDF features
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features")
pipeline = Pipeline(stages=[hashingTF, idf])
pipelineFit = pipeline.fit(data)
data = pipelineFit.transform(data)

# Split the dataset into training and testing sets
(trainingData, testData) = data.randomSplit([0.8, 0.2], seed=1234)

# Train logistic regression model
lr = LogisticRegression(maxIter=10, regParam=0.01, probabilityCol="probability")
lrModel = lr.fit(trainingData)

# Make predictions on the testing set
predictions = lrModel.transform(testData)

# Extract the probability for the positive class from the "probability" column
def get_positive_probability(probability):
    return probability[1].item()

get_positive_probability_udf = udf(get_positive_probability, DoubleType())

predictions = predictions.withColumn("positive_probability", get_positive_probability_udf(predictions["probability"]))

# Define a UDF to convert sentiment probability to label
def get_sentiment_label(probability):
    if probability >= 0.5:
        return "positive"
    else:
        return "negative"

sentiment_udf = udf(get_sentiment_label, StringType())

# Add a column for sentiment label based on the positive probability
predictions = predictions.withColumn("sentiment", sentiment_udf(predictions["positive_probability"]))

# Connect to Elasticsearch
es = Elasticsearch(['http://elasticsearch:9200'])

# Check if the connection is successful
if es.ping():
    print("Connected to Elasticsearch")
else:
    print("Could not connect to Elasticsearch")

# Convert Spark DataFrame to Pandas DataFrame
predictions_pandas = predictions.select("id", "sentiment").toPandas()

# Save results to Elasticsearch
for row in predictions_pandas.itertuples():
    es.index(index='sentiment_analysis_index', body={'sentiment': row.sentiment, 'id': row.id})


# Evaluate model
pred = [1 if sentiment == 'positive' else 0 for sentiment in predictions_pandas['sentiment']]
actual = [1 if label == 'pos' else 0 for label in testData.select('label').collect()]
print(classification_report(actual, pred, target_names=['negative', 'positive']))

# Stop SparkSession
spark.stop()

Connected to Elasticsearch


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


              precision    recall  f1-score   support

    negative       1.00      0.44      0.61        43
    positive       0.00      0.00      0.00         0

    accuracy                           0.44        43
   macro avg       0.50      0.22      0.31        43
weighted avg       1.00      0.44      0.61        43

