<a href="https://colab.research.google.com/github/aleksandra1206/sentiment_analysis_nlp/blob/main/SentimentAnalysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark



In [None]:
from google.colab import drive
# Mount Google Drive and point data_path at the folder holding the CSV files.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)
data_path = f'{DRIVE_MOUNT_POINT}/MyDrive/Domaci2BD/'

In [None]:
from pyspark.sql import SparkSession

# Spark session shared by the whole notebook. The settings are applied in the
# order listed below.
# NOTE(review): if Spark runs in local mode (no master is set here, as on a
# pip-installed Colab setup), the executor.* settings presumably have no
# effect — confirm against the deployment actually used.
_spark_settings = {
    "spark.executor.instances": "4",
    "spark.executor.cores": "8",
    "spark.executor.memory": "18g",
    "spark.driver.memory": "8g",
}
_builder = SparkSession.builder.appName('TwitterSentimentAnalysis')
for _setting_key, _setting_value in _spark_settings.items():
    _builder = _builder.config(_setting_key, _setting_value)
spark = _builder.getOrCreate()

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Column layout of twitter_training.csv; the file is read without a header row,
# so the names and types come entirely from this schema.
_schema_columns = [
    ("id", IntegerType()),
    ("Title", StringType()),
    ("Sentiment", StringType()),
    ("Sentence", StringType()),
]
schema = StructType([StructField(name, dtype, True) for name, dtype in _schema_columns])

training_csv = data_path + "twitter_training.csv"
data = spark.read.csv(training_csv, schema=schema, header=False)

data.show(truncate=False)

+----+-----------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id  |Title      |Sentiment|Sentence                                                                                                                                                                                                                                                                                             |
+----+-----------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2401|Borderlands|Positive |im 

In [None]:
# How many rows have no tweet text at all?
print("Broj redova gde je Sentence null:")
data.where(data.Sentence.isNull()).count()

Broj redova gde je Sentence null:


686

In [None]:
# Drop rows whose Sentence is null, then confirm that none remain.
data = data.dropna(subset=["Sentence"])
print("Broj redova gde je Sentence null nakon filtriranja:")
data.where(data.Sentence.isNull()).count()

Broj redova gde je Sentence null nakon filtriranja:


0

In [None]:
# The cleaned dataset has 73,996 rows, which can slow model training, so a
# stratified sample is taken further below.
print("Filtrirani podaci bez null vrednosti, broj redova:")
filtered_row_count = data.count()
filtered_row_count

Filtrirani podaci bez null vrednosti, broj redova:


73996

In [None]:
from pyspark.sql.functions import col
# Number of rows per sentiment label and each label's share ("frakcija") of the whole dataset.
total_rows = data.count()
sentiment_counts = (
    data.groupBy("Sentiment")
        .count()
        .withColumn("frakcija", col("count") / total_rows)
)
print("Udeo i broj svakog sentimenta u celom dataset-u: ")
sentiment_counts.show()

Udeo i broj svakog sentimenta u celom dataset-u: 
+----------+-----+-------------------+
| Sentiment|count|           frakcija|
+----------+-----+-------------------+
|Irrelevant|12875|   0.17399589166982|
|  Positive|20655|0.27913671009243746|
|   Neutral|18108|0.24471593059084276|
|  Negative|22358| 0.3021514676468998|
+----------+-----+-------------------+



In [None]:
# REDUCING THE DATASET
# Proportional stratified sampling: every class gets the same fraction
# rowNum / total_rows, so each class keeps its share of the data and the sample
# has roughly rowNum rows in total. sampleBy draws each row independently, so
# the exact size varies around rowNum.
rowNum = 100
# Count once; calling data.count() inside the comprehension launched one full
# Spark job per sentiment class.
total_rows = data.count()
fractions = {row['Sentiment']: rowNum / total_rows for row in sentiment_counts.collect()}

# Stratified sampling using the per-class fractions (seeded for repeatability).
sampled = data.stat.sampleBy("Sentiment", fractions, seed=40)

print("Smanjen skup:")
sampled_total = sampled.count()
new_sentiment = sampled.groupBy("Sentiment").count().withColumn("frakcija", col("count") / sampled_total)
new_sentiment.show()

Smanjen skup:
+----------+-----+-------------------+
| Sentiment|count|           frakcija|
+----------+-----+-------------------+
|Irrelevant|   20|0.21505376344086022|
|   Neutral|   23|0.24731182795698925|
|  Positive|   27| 0.2903225806451613|
|  Negative|   23|0.24731182795698925|
+----------+-----+-------------------+



In [None]:
# From here on, work with the stratified sample instead of the full dataset.
data = sampled
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, size
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer, IndexToString,NGram
from nltk.stem import WordNetLemmatizer
from nltk.stem.porter import PorterStemmer
import nltk
import spacy

# NLTK resources. "punkt"/"punkt_tab" back nltk.word_tokenize; newer NLTK
# releases look up "punkt_tab" and raise LookupError when only "punkt" is
# present. "wordnet" and "omw-1.4" are presumably for WordNetLemmatizer.
# NOTE(review): downloads land on the driver's filesystem; in local mode the
# Spark workers share it, but on a real cluster every executor would need them.
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('wordnet')
nltk.download('omw-1.4')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data] Downloading package omw-1.4 to /root/nltk_data...


True

In [None]:
# WordNet lemmatizer. NOTE(review): not referenced by the cells shown here;
# the lemmatization below is done by spaCy.
lemmatizer = WordNetLemmatizer()

# spaCy pipeline used for lemmatization. The dependency parser and NER are
# disabled for efficiency, since only token lemmas are used.
_disabled_spacy_pipes = ["parser", "ner"]
spacy_nlp = spacy.load("en_core_web_sm", disable=_disabled_spacy_pipes)

def tokenize_and_lemmatize_partition(partition):
    """Turn one RDD partition of rows into ``(Sentiment, lemmas)`` pairs.

    Each row's ``Sentence`` is split with ``nltk.word_tokenize``. The tokens are
    re-joined with single spaces and run through the module-level spaCy
    pipeline ``spacy_nlp`` in batches of 50, and the ``lemma_`` of every spaCy
    token is collected.

    Parameters
    ----------
    partition : iterable of Row
        Rows exposing ``'Sentence'`` (text) and ``'Sentiment'`` (label) fields.

    Yields
    ------
    tuple
        ``(sentiment, lemmas)`` in the same order as the input rows, where
        ``lemmas`` is a list of strings.
    """
    # Pair every pre-tokenized text with its label so spaCy carries the label
    # through the pipe next to the resulting Doc (as_tuples=True).
    text_label_pairs = (
        (' '.join(nltk.word_tokenize(row['Sentence'])), row['Sentiment'])
        for row in partition
    )
    for doc, sentiment in spacy_nlp.pipe(text_label_pairs, batch_size=50, as_tuples=True):
        yield (sentiment, [token.lemma_ for token in doc])

In [None]:
# Split the sample into 5 partitions so the spaCy work is spread over 5 tasks.
data = data.repartition(5)

# Output schema of tokenize_and_lemmatize_partition. Passing it explicitly
# stops Spark from running the (expensive) lemmatization on a first batch of
# rows just to infer the column types.
lemmatized_schema = StructType([
    StructField("Sentiment", StringType(), True),
    StructField("lemmatized", ArrayType(StringType()), True),
])
# Cache the lemmatized rows: without it every later action (fit, show, count,
# randomSplit, each cross-validation fit and evaluation) re-runs spaCy.
data = data.rdd.mapPartitions(tokenize_and_lemmatize_partition).toDF(lemmatized_schema).cache()

# Map each Sentiment label to a numeric "Index" column (the classifiers' label).
stringIndexer = StringIndexer(inputCol="Sentiment", outputCol="Index")
data = stringIndexer.fit(data).transform(data)

# Remove stop words from the lemmas.
stopWordsRemover = StopWordsRemover(inputCol="lemmatized", outputCol="filtered")
data = stopWordsRemover.transform(data)

# Drop every row whose "filtered" column is empty after stop-word removal.
data = data.filter(size(col("filtered")) > 0)

# Show the result.
print("Dataset nakon tokenizacije, lemitizacije i uklanjanja stop reci:")
data.select("Sentiment", "lemmatized", "Index", "filtered").show()
data.count()

Dataset nakon tokenizacije, lemitizacije i uklanjanja stop reci:
+----------+--------------------+-----+--------------------+
| Sentiment|          lemmatized|Index|            filtered|
+----------+--------------------+-----+--------------------+
|Irrelevant|             [S4, c]|  3.0|             [S4, c]|
|  Positive|[I, ', m, actuall...|  0.0|[', m, actually, ...|
|   Neutral|[nice, video, by,...|  2.0|[nice, video, @, ...|
|   Neutral|[I, do, not, know...|  2.0|[know, ,, somehow...|
|  Positive|[lol, I, be, look...|  0.0|[lol, look, somet...|
|  Negative|            [wtf, ?]|  1.0|            [wtf, ?]|
|  Negative|[pilot, see, when...|  1.0|[pilot, see, usel...|
|  Positive|       [handy, work]|  0.0|       [handy, work]|
|   Neutral|[<, unk, >, just,...|  2.0|[<, unk, >, get, ...|
|  Negative|[@, Verizon, @, y...|  1.0|[@, Verizon, @, y...|
|  Negative|[sweet, !, more, ...|  1.0|[sweet, !, recycl...|
|Irrelevant|[FICK, you, @, sc...|  3.0|[FICK, @, scroffy...|
|Irrelevant|[HELLS, 

93

In [None]:
# 80/20 train/test split, seeded for repeatability.
train_data, test_data = data.randomSplit([0.8, 0.2], seed=43)

for _split_title, _split_frame in (("Trening podaci:", train_data), ("Test podaci:", test_data)):
    print(_split_title)
    _split_frame.show(truncate=False)

Trening podaci:
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+----------------------------------------------------------------------------------------------------------------------------------------------+
|Sentiment |lemmatized                                                                                                                                                                                      |Index|filtered                                                                                                                                      |
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+---------------------------------------------------------------

In [None]:
from pyspark.ml.feature import HashingTF, IDF, NGram, Word2Vec
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml import Pipeline

def create_pipeline(text_method, model):
    """Build an unfitted Spark ML Pipeline: text featurizers followed by ``model``.

    Parameters
    ----------
    text_method : str
        Selects how the token column ``"filtered"`` becomes the ``"features"``
        vector. Must be one of:

        - ``"hashingTF idf"``: hashed term frequencies of single tokens, IDF-weighted.
        - ``"ngram hashingTF idf"``: the same, but over bigrams only (no unigrams).
        - ``"word2vec"``: Word2Vec embeddings with ``vectorSize=3`` and ``minCount=0``.
    model : pyspark.ml estimator
        Classifier appended as the final stage; it is expected to read ``"features"``.

    Returns
    -------
    Pipeline
        Pipeline whose stages are the featurizers followed by ``model``.

    Raises
    ------
    ValueError
        If ``text_method`` is not one of the supported names. Previously such a
        call silently built a pipeline with no ``"features"`` column, which only
        failed later inside ``fit``.
    """
    if text_method == "hashingTF idf":
        stages = [
            HashingTF(inputCol="filtered", outputCol="rawFeatures"),
            IDF(inputCol="rawFeatures", outputCol="features"),
        ]
    elif text_method == "ngram hashingTF idf":
        stages = [
            NGram(n=2, inputCol="filtered", outputCol="bigrams"),
            HashingTF(inputCol="bigrams", outputCol="rawFeaturesBigrams"),
            IDF(inputCol="rawFeaturesBigrams", outputCol="features"),
        ]
    elif text_method == "word2vec":
        # minCount=0 keeps every token, which matters on a very small sample.
        stages = [Word2Vec(inputCol="filtered", outputCol="features", vectorSize=3, minCount=0)]
    else:
        raise ValueError(
            f"Unknown text_method {text_method!r}; expected one of "
            "'hashingTF idf', 'ngram hashingTF idf', 'word2vec'"
        )

    stages.append(model)
    return Pipeline(stages=stages)

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def _multiclass_evaluator(metric_name):
    """Return an evaluator that scores "prediction" against the "Index" label using ``metric_name``."""
    return MulticlassClassificationEvaluator(labelCol="Index", predictionCol="prediction", metricName=metric_name)


# Evaluators shared by train() (model selection uses accuracy) and learn().
evaluator_accuracy = _multiclass_evaluator("accuracy")
evaluator_precision = _multiclass_evaluator("weightedPrecision")
evaluator_recall = _multiclass_evaluator("weightedRecall")
evaluator_f1 = _multiclass_evaluator("f1")

def train(text_method, models, train_data):
    """Cross-validate every classifier in ``models`` on one text representation.

    For each ``(model_name, model)`` pair, this function:

    1. builds a pipeline with ``create_pipeline(text_method, model)``;
    2. picks a small hyper-parameter grid based on ``model_name``;
    3. fits a 3-fold ``CrossValidator``, scored with ``evaluator_accuracy``, on ``train_data``.

    Parameters
    ----------
    text_method : str
        Passed through to ``create_pipeline``.
    models : dict
        Maps a classifier name to an unfitted estimator.
    train_data : DataFrame
        Training rows for cross-validation.

    Returns
    -------
    list of (str, PipelineModel)
        ``("<model_name> with <text_method>", best pipeline)`` for each model,
        in the iteration order of ``models``.
    """
    # Grid axes per classifier, in the order they are added to the builder.
    # That order fixes the order of the generated param maps. A name that is
    # not listed gets an empty grid, i.e. only the default parameters.
    grid_axes_by_name = {
        "LogisticRegression": lambda est: [
            (est.regParam, [0.1, 0.01]),
            (est.elasticNetParam, [0.0, 0.5]),
        ],
        "DecisionTreeClassifier": lambda est: [
            (est.maxDepth, [5, 10]),
            (est.minInstancesPerNode, [1, 2]),
        ],
        "RandomForestClassifier": lambda est: [
            (est.numTrees, [20, 50]),
        ],
    }

    best_pipelines = []
    for model_name, model in models.items():
        print(f"Training {model_name} with {text_method}...")

        pipeline = create_pipeline(text_method, model)

        grid_builder = ParamGridBuilder()
        axes_for = grid_axes_by_name.get(model_name)
        if axes_for is not None:
            for param, values in axes_for(model):
                grid_builder.addGrid(param, values)

        crossval = CrossValidator(
            estimator=pipeline,
            estimatorParamMaps=grid_builder.build(),
            evaluator=evaluator_accuracy,
            numFolds=3,
        )
        best_pipeline = crossval.fit(train_data).bestModel
        best_pipelines.append((f"{model_name} with {text_method}", best_pipeline))

        print(f"Training completed for {model_name} with {text_method}")

    return best_pipelines

def learn(trained_models, test_data):
    """Evaluate fitted pipelines on held-out data.

    Despite its name, this function does not fit anything; it only scores the
    pipelines it is given.

    Parameters
    ----------
    trained_models : list of (str, PipelineModel)
        Pairs as returned by ``train``: a descriptive name and the best
        pipeline found by cross-validation.
    test_data : DataFrame
        Held-out rows with the input columns the pipelines expect and the
        ``"Index"`` label.

    Returns
    -------
    dict
        Maps each model name to a list of result dicts. Each dict has the keys
        "Accuracy", "Precision", "Recall", "F1 Score" and "Parameters"; the
        last one holds the final pipeline stage's param map, keyed by param name.
    """
    print("Evaluating models...")
    results = {}
    for model_name, bestModel in trained_models:
        # Predict on the test data and cache the result: four evaluators each
        # run a Spark job over it, which would otherwise recompute the whole
        # pipeline (and test_data's lineage) four times.
        predictions = bestModel.transform(test_data).cache()
        try:
            f1 = evaluator_f1.evaluate(predictions)
            accuracy = evaluator_accuracy.evaluate(predictions)
            precision = evaluator_precision.evaluate(predictions)
            recall = evaluator_recall.evaluate(predictions)
        finally:
            predictions.unpersist()

        # Hyper-parameters actually used by the classifier (last pipeline stage).
        bestModelParams = {param.name: value for param, value in bestModel.stages[-1].extractParamMap().items()}
        results.setdefault(model_name, []).append({
            "Accuracy": accuracy,
            "Precision": precision,
            "Recall": recall,
            "F1 Score": f1,
            "Parameters": bestModelParams
        })
    return results
# Text-representation methods understood by create_pipeline.
text_methods = ["hashingTF idf", "ngram hashingTF idf", "word2vec"]

# Classifiers to compare, keyed by class name; train() picks each
# hyper-parameter grid from these names. All read "features" and predict "Index".
models = {
    estimator_cls.__name__: estimator_cls(featuresCol="features", labelCol="Index")
    for estimator_cls in (DecisionTreeClassifier, LogisticRegression, RandomForestClassifier)
}

In [None]:
def train_models(text_methods, models, train_data):
    """Run ``train`` for every text method and concatenate the results.

    Returns a flat list (not a dictionary) of ``(name, best pipeline)`` pairs,
    grouped by text method in the order of ``text_methods``.
    """
    return [
        named_model
        for text_method in text_methods
        for named_model in train(text_method, models, train_data)
    ]

In [None]:
# Cross-validate every (text method, classifier) combination on the training split.
trained_models = train_models(text_methods=text_methods, models=models, train_data=train_data)

Training DecisionTreeClassifier with hashingTF idf...
Training completed for DecisionTreeClassifier with hashingTF idf
Training LogisticRegression with hashingTF idf...
Training completed for LogisticRegression with hashingTF idf
Training RandomForestClassifier with hashingTF idf...
Training completed for RandomForestClassifier with hashingTF idf
Training DecisionTreeClassifier with ngram hashingTF idf...
Training completed for DecisionTreeClassifier with ngram hashingTF idf
Training LogisticRegression with ngram hashingTF idf...
Training completed for LogisticRegression with ngram hashingTF idf
Training RandomForestClassifier with ngram hashingTF idf...
Training completed for RandomForestClassifier with ngram hashingTF idf
Training DecisionTreeClassifier with word2vec...
Training completed for DecisionTreeClassifier with word2vec
Training LogisticRegression with word2vec...
Training completed for LogisticRegression with word2vec
Training RandomForestClassifier with word2vec...
Trainin

In [None]:
# Score every trained pipeline on the held-out test split.
result = learn(trained_models=trained_models, test_data=test_data)
result

Evaluating models...


{'DecisionTreeClassifier with hashingTF idf': [{'Accuracy': 0.2,
   'Precision': 0.10428571428571429,
   'Recall': 0.2,
   'F1 Score': 0.13444444444444445,
   'Parameters': {'cacheNodeIds': False,
    'checkpointInterval': 10,
    'featuresCol': 'features',
    'impurity': 'gini',
    'labelCol': 'Index',
    'leafCol': '',
    'maxBins': 32,
    'maxDepth': 5,
    'maxMemoryInMB': 256,
    'minInfoGain': 0.0,
    'minInstancesPerNode': 1,
    'minWeightFractionPerNode': 0.0,
    'predictionCol': 'prediction',
    'probabilityCol': 'probability',
    'rawPredictionCol': 'rawPrediction',
    'seed': 5165298090080506173}}],
 'LogisticRegression with hashingTF idf': [{'Accuracy': 0.15,
   'Precision': 0.08787878787878788,
   'Recall': 0.15000000000000002,
   'F1 Score': 0.11058823529411765,
   'Parameters': {'aggregationDepth': 2,
    'elasticNetParam': 0.5,
    'family': 'auto',
    'featuresCol': 'features',
    'fitIntercept': True,
    'labelCol': 'Index',
    'maxBlockSizeInMB': 0.0,