In [1]:
!pip install pyspark



In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
 data_path = '/content/drive/MyDrive/Colab Notebooks/DATA/'

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('nlp financial sentiment - projekat').config("spark.executor.instances", "4").config("spark.executor.cores", "8").config("spark.executor.memory", "12g").getOrCreate()

In [5]:
data = spark.read.csv(data_path+"data.csv",inferSchema=True, header= True)
data.show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
|Sentence                                                                                                                                                                                                                       |Sentiment|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
|The GeoSolutions technology will leverage Benefon 's GPS solutions by providing Location Based Search Technology , a Communities Platform , location relevant multimedia content and a new and powerful commercial model .     |positive |
|$ESI on lows, down $1.50 to $2.50 BK a real possibility

In [None]:
from pyspark.sql.functions import col

sentiment_counts =  data.groupBy("Sentiment").count()
sentiment_counts.show(truncate= False)

+--------------------------------------------------+-----+
|Sentiment                                         |count|
+--------------------------------------------------+-----+
|positive                                          |1852 |
|neutral                                           |3130 |
|negative                                          |859  |
| the damage is done. No one wants to touch it now"|1    |
+--------------------------------------------------+-----+



In [None]:
#Izbacujemo iz sentimenta The damage is done...
data = data.filter(data.Sentiment != " the damage is done. No one wants to touch it now\"")
sentiment_counts =  data.groupBy("Sentiment").count()
print("Pravilni sentimenti:")
sentiment_counts.show(truncate= False)

Pravilni sentimenti:
+---------+-----+
|Sentiment|count|
+---------+-----+
|positive |1852 |
|neutral  |3130 |
|negative |859  |
+---------+-----+



In [None]:
sentiment_frakcije = data.groupBy("Sentiment").count().withColumn("frakcija", col("count") / data.count())
#data_count = data.count()
#data_count.show() - 5841 redova
print("Udeo i broj svakog sentimenta u celom dataset-u: ")
sentiment_frakcije.show()

Udeo i broj svakog sentimenta u celom dataset-u: 
+---------+-----+------------------+
|Sentiment|count|          frakcija|
+---------+-----+------------------+
| positive| 1852|0.3170689950350967|
|  neutral| 3130|0.5358671460366375|
| negative|  859|0.1470638589282657|
+---------+-----+------------------+



In [None]:
# SMANJIVANJE DATASET-A ALI DA FRAKCIJA OSTANE ISTA, TJ. UDEO SVAKOG SENTIMENTA DA BUDE PRIBLIŽAN
broj_uzoraka = 80
frakcije = {row['Sentiment']: broj_uzoraka / data.count() for row in sentiment_frakcije.collect()}


# Stratifikovano uzorkovanje koristeći izračunate frakcije
sampled_df = data.stat.sampleBy("Sentiment", frakcije, seed=40)

#Broj svakog sentimenta u smanjenom skupu
print("Smanjen skup:")
sampled_df.groupBy("Sentiment").count().show()
sampled_df.show()

Smanjen skup:
+---------+-----+
|Sentiment|count|
+---------+-----+
| positive|   26|
|  neutral|   45|
| negative|   15|
+---------+-----+

+--------------------+---------+
|            Sentence|Sentiment|
+--------------------+---------+
|Costco: A Premier...| positive|
|Kauko-Telko 's ce...|  neutral|
|Finnish technolog...| positive|
|Finnish textiles ...|  neutral|
|There have not be...|  neutral|
|Net interest inco...| positive|
|No planned closin...|  neutral|
|Foodservice and c...|  neutral|
|The bridge is par...|  neutral|
|From Merisatama t...|  neutral|
|The combined capi...|  neutral|
|Finnish IT consul...| positive|
|Operating profit ...| negative|
|RECAP: $MSFT rumo...| positive|
|Known as Post Ban...| positive|
|Standard Chartere...| negative|
|In July-September...| positive|
|$EA points to the...| negative|
|The Polish busine...|  neutral|
|     $SPY weak close| negative|
+--------------------+---------+
only showing top 20 rows



In [None]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer, IndexToString,NGram
from pyspark.ml.classification import LogisticRegression , DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import spacy
import nltk
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from nltk.stem import WordNetLemmatizer
from pyspark.ml.feature import Word2Vec
import nltk
from pyspark.ml.tuning import CrossValidatorModel
nltk.download('punkt')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

In [None]:
#Tokenizacija
def tokenize(text):
    return nltk.word_tokenize(text)

# funkcija Lemizacije
def lemmatize(tokens):
    spacy_nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"])
    return [token.lemma_ for token in spacy_nlp(' '.join(tokens))]

# UDF
tokenize_udf = udf(tokenize, ArrayType(StringType()))
lemmatize_udf = udf(lemmatize, ArrayType(StringType()))

# Tokenizacija i lemizacija
data = sampled_df.withColumn("tokens", tokenize_udf("Sentence"))
data = data.withColumn("lemmatized", lemmatize_udf("tokens"))

data = data.drop('Sentence')
data = data.drop('tokens')


stringIndexer = StringIndexer(inputCol="Sentiment", outputCol="labelIndex")
data = stringIndexer.fit(data).transform(data)


stopWordsRemover = StopWordsRemover(inputCol="lemmatized", outputCol="filtered")
data = stopWordsRemover.transform(data)

In [None]:
print("Podaci nakon encodinga i stopWordsRemovera:")
data.show()

Podaci nakon encodinga i stopWordsRemovera:
+---------+--------------------+----------+--------------------+
|Sentiment|          lemmatized|labelIndex|            filtered|
+---------+--------------------+----------+--------------------+
| positive|[costco, :, a, Pr...|       1.0|[costco, :, Premi...|
|  neutral|[Kauko, -, Telko,...|       0.0|[Kauko, -, Telko,...|
| positive|[finnish, technol...|       1.0|[finnish, technol...|
|  neutral|[finnish, textile...|       0.0|[finnish, textile...|
|  neutral|[there, have, not...|       0.0|[previous, share,...|
| positive|[net, interest, i...|       1.0|[net, interest, i...|
|  neutral|[no, planned, clo...|       0.0|[planned, closing...|
|  neutral|[foodservice, and...|       0.0|[foodservice, con...|
|  neutral|[the, bridge, be,...|       0.0|[bridge, part, hi...|
|  neutral|[from, Merisatama...|       0.0|[Merisatama, far,...|
|  neutral|[the, combined, c...|       0.0|[combined, capita...|
| positive|[finnish, IT, con...|       1.0|[fi

In [None]:
#PODELA NA TRENING I TEST
train_data, test_data = data.randomSplit([0.8, 0.2], seed=43)


print("Trening podaci:")
train_data.show()

print("Test podaci:")
test_data.show()

Trening podaci:
+---------+--------------------+----------+--------------------+
|Sentiment|          lemmatized|labelIndex|            filtered|
+---------+--------------------+----------+--------------------+
| negative|[$, ACOM, http, :...|       2.0|[$, ACOM, http, :...|
| negative|[$, EA, point, to...|       2.0|[$, EA, point, tw...|
| negative|[$, QIHU, totally...|       2.0|[$, QIHU, totally...|
| negative|[$, RNN, More, bl...|       2.0|[$, RNN, bleed, M...|
| negative|[EMSA, Deputy, Ch...|       2.0|[EMSA, Deputy, Ch...|
| negative|[Finnish, Vaahto,...|       2.0|[Finnish, Vaahto,...|
| negative|[Standard, Charte...|       2.0|[Standard, Charte...|
| negative|[on, the, other, ...|       2.0|[hand, ,, finnish...|
| negative|[operate, profit,...|       2.0|[operate, profit,...|
| negative|[shell, share, pr...|       2.0|[shell, share, pr...|
| negative|[the, payment, of...|       2.0|[payment, 2.779, ...|
|  neutral|[Amanda, say, tha...|       0.0|[Amanda, say, alr...|
|  neutra

In [None]:
from pyspark.ml.tuning import CrossValidatorModel

In [None]:
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures")
ngramHashingTF = HashingTF(inputCol="ngrams", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")
ngram = NGram(n=2, inputCol="filtered", outputCol="ngrams")
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="filtered", outputCol="features")

models = [LogisticRegression(maxIter=20, featuresCol="features", labelCol="labelIndex"),
          DecisionTreeClassifier(labelCol="labelIndex", featuresCol="features"),
          RandomForestClassifier(labelCol="labelIndex", featuresCol="features")]

featureTransformers = [[hashingTF, idf], [ngram, ngramHashingTF, idf], [word2Vec]]

pipelines = [Pipeline(stages= transformers + [model])
             for model in models for transformers in featureTransformers]

evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="accuracy")
evaluator_precision = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="weightedPrecision")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="weightedRecall")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="f1")

In [None]:
# Definisanje paramGrid-a za svaki model posebno
paramGrids = {
    LogisticRegression: ParamGridBuilder()
        .addGrid(LogisticRegression.regParam, [0.0,0.1, 0.01])
        .build(),
    DecisionTreeClassifier: ParamGridBuilder()
        .addGrid(DecisionTreeClassifier.maxDepth, [4,5,6])
        .build(),
    RandomForestClassifier: ParamGridBuilder()
        .addGrid(RandomForestClassifier.numTrees, [5,10,20])
        .build()
}

results = {}

model_names = {
    LogisticRegression: "LogisticRegression",
    DecisionTreeClassifier: "DecisionTreeClassifier",
    RandomForestClassifier: "RandomForestClassifier"
}

In [None]:
def learn():
    for i, pipeline in enumerate(pipelines):

        # Identifikacija modela u trenutnom pipeline-u
        model_type = type(pipeline.getStages()[-1])

        # Odabir odgovarajućeg paramGrid-a za model
        paramGrid = paramGrids[model_type]

        print("proba1")
        crossval = CrossValidator(estimator=pipeline,
                                  estimatorParamMaps=paramGrid,
                                  evaluator=evaluator,
                                  numFolds=3)

        # Pokretanje cross-validacije
        print("proba2")
        cvModel = crossval.fit(train_data)

        # Dobijanje najboljeg modela i evaluacija
        bestModel = cvModel.bestModel
        accuracy = evaluator.evaluate(bestModel.transform(test_data))
        precision = evaluator_precision.evaluate(bestModel.transform(test_data))
        recall = evaluator_recall.evaluate(bestModel.transform(test_data))
        f1 = evaluator_f1.evaluate(bestModel.transform(test_data))

        transformers_description = ' + '.join([type(t).__name__ for t in pipeline.getStages()[:-1]])
        model_name = f"{model_names[model_type]} with {transformers_description}"

        # Dobijanje parametara najboljeg modela
        bestModelParams = {param.name: value for param, value in bestModel.stages[-1].extractParamMap().items()}
        print("proba3")
        # Dinamičko dodavanje rezultata u rečnik
        if model_name not in results:
            results[model_name] = []  # Inicijalizacija liste ako ključ ne postoji

        # Dodavanje informacija u rezultate
        results[model_name].append({
            "Accuracy": accuracy,
            "Precision": precision,
            "Recall": recall,
            "F1 Score": f1,
            "Parameters": bestModelParams
        })
    return results

In [None]:
rezultat = learn()

proba1
proba2
proba3
proba1
proba2
proba3
proba1
proba2
proba3
proba1
proba2
proba3
proba1
proba2
proba3
proba1
proba2
proba3
proba1
proba2
proba3
proba1
proba2
proba3
proba1
proba2
proba3


In [None]:
rezultat

{'LogisticRegression with HashingTF + IDF': [{'Accuracy': 0.4117647058823529,
   'Precision': 0.34313725490196084,
   'Recall': 0.4117647058823529,
   'F1 Score': 0.37433155080213915,
   'Parameters': {'aggregationDepth': 2,
    'elasticNetParam': 0.0,
    'family': 'auto',
    'featuresCol': 'features',
    'fitIntercept': True,
    'labelCol': 'labelIndex',
    'maxBlockSizeInMB': 0.0,
    'maxIter': 20,
    'predictionCol': 'prediction',
    'probabilityCol': 'probability',
    'rawPredictionCol': 'rawPrediction',
    'regParam': 0.0,
    'standardization': True,
    'threshold': 0.5,
    'tol': 1e-06}}],
 'LogisticRegression with NGram + HashingTF + IDF': [{'Accuracy': 0.5882352941176471,
   'Precision': 0.36764705882352944,
   'Recall': 0.5882352941176471,
   'F1 Score': 0.45248868778280543,
   'Parameters': {'aggregationDepth': 2,
    'elasticNetParam': 0.0,
    'family': 'auto',
    'featuresCol': 'features',
    'fitIntercept': True,
    'labelCol': 'labelIndex',
    'maxBlockS