Instaliranje pyspark biblioteke

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=a706d14b56fdacfcc76c45af59f82180d7738fa46164e73bcce50676a456f357
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


Povezivanje (mount-ovanje) licnog drajva odakle ce se ucitavati podaci za rad

In [None]:
from google.colab import drive
# Mount the user's Google Drive at /content/drive; the CSV files read later in
# this notebook are addressed by absolute paths under this mount point.
drive.mount('/content/drive')

Mounted at /content/drive


Ucitavanje biblioteka i/ili funkcija koje ce biti koriscene


In [None]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, SQLTransformer, HashingTF, IDF, NGram, Word2Vec, StringIndexer
from pyspark.sql.functions import col, udf, when
from nltk.stem import PorterStemmer
from pyspark.sql.types import ArrayType, StringType, IntegerType, StructType, StructField, LongType, StringType
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql import SparkSession, Row

Kreiranje (ili preuzimanje vec postojece) Spark sesije sa nazivom aplikacije "BigData - Project 2", kao i kreiranje seme za kasnije ucitavanje podataka

In [None]:
# Reuse the active Spark session if one exists, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("BigData - Project 2")
    .getOrCreate()
)

# Explicit schema for the tweet CSV files, listed in column order.
# All four columns are nullable; loadData() drops rows containing nulls.
schema = StructType(
    [
        StructField("tweetID", IntegerType(), nullable=True),
        StructField("entity", StringType(), nullable=True),
        StructField("sentiment", StringType(), nullable=True),
        StructField("content", StringType(), nullable=True),
    ]
)

Definisanje f-je za ucitavanje podataka sa drajva

In [None]:
def loadData(name):
    """Read one tweet CSV from the project's Drive folder.

    The file is parsed with the module-level ``schema`` and a comma separator.
    No header option is set, so every line is treated as data. Any row with a
    null in at least one column is discarded.

    Args:
        name: file name inside ``/content/drive/MyDrive/BIGDATA/Data/twiter/``.

    Returns:
        Spark DataFrame with the four schema columns and no null values.
    """
    base_dir = '/content/drive/MyDrive/BIGDATA/Data/twiter/'
    reader = spark.read.schema(schema).option("sep", ",")
    raw = reader.csv(base_dir + name)
    return raw.na.drop()

Definisanje potrebnih transformacija za sredjivanje teksta kako bi bio prikladan za kasnije kreiranje modela

In [None]:
# Stage 1: split each tweet into tokens on every non-word character.
regexTokenizer = RegexTokenizer(
    inputCol="content",
    outputCol="words",
    pattern="\\W",
)

# Stage 2: remove stop words. Spark's built-in list is extended with web/Twitter
# noise and with contraction fragments that splitting on \W leaves behind
# (e.g. "isn't" -> "isn", "t").
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
current_stop_words = remover.getStopWords()
new_stop_words = [
    "rt", "http", "https", "www", "com", "amp", "via", "co",
    "u", "ll", "ve", "re", "m", "isn", "aren", "e", "im",
]
updated_stop_words = [*current_stop_words, *new_stop_words]
remover.setStopWords(updated_stop_words)

# Stage 3: encode the sentiment string as a numeric class label.
labelIndexer = StringIndexer(inputCol="sentiment", outputCol="label")

cleaningData = Pipeline(stages=[regexTokenizer, remover, labelIndexer])

Ucitavanje train i test tabele sa drajva

In [None]:
# Raw training and validation tweets (rows with nulls already removed by loadData).
loadTrain, loadTest = (
    loadData(file_name)
    for file_name in ('twitter_training.csv', 'twitter_validation.csv')
)

Definisanje toka podataka (pipeline) i transformacija prethodno ucitanih tabela

In [None]:
# Fit the cleaning stages on the training split only; this is where the
# StringIndexer learns its label mapping. The same fitted stages are then
# applied to both splits so train and test share one encoding.
pipelineCleaning = cleaningData.fit(loadTrain)
training, test = (pipelineCleaning.transform(frame) for frame in (loadTrain, loadTest))

Definisanje i pozivanje UDF funkcije za stemovanje (svodjenje reci na njihov koren, npr. "running" -> "run")

In [None]:
stemmer = PorterStemmer()


def stem_text(tokens):
    """Return the Porter stem of every token in *tokens*, keeping their order."""
    return list(map(stemmer.stem, tokens))


# Wrap as a Spark UDF (array<string> -> array<string>) and add a "stem" column
# computed from the stop-word-filtered tokens of both splits.
stem_text_udf = udf(stem_text, ArrayType(StringType()))
training, test = (
    frame.withColumn("stem", stem_text_udf("filtered")) for frame in (training, test)
)

Definisanje potrebnih transformacija dokumenata (reci) u zahtevane numericke vrednosti

In [None]:
# Bag-of-stems: term frequencies hashed into 30 buckets, then IDF-weighted.
# NOTE(review): 30 buckets forces heavy hash collisions for a tweet vocabulary
# (Spark's HashingTF default is 2**18); presumably kept small for runtime -- confirm.
hashing = HashingTF(numFeatures=30, inputCol="stem", outputCol="hash")
idf = IDF(inputCol="hash", outputCol="features")

# Bigram variant: 2-grams of the stems, hashed the same way. It writes the same
# "hash" column as `hashing`, so the two must never share one pipeline.
ngram = NGram(n=2, inputCol="stem", outputCol="ngrams")
hashing_ng = HashingTF(numFeatures=30, inputCol="ngrams", outputCol="hash")

# Dense alternative: 3-dimensional Word2Vec document vectors built from the stems.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="stem", outputCol="features")

Definisanje potrebnih nizova transformacija i modela od kojih ce se kasnije kreirati tokovi podataka sa svim mogucim kombinacijama

In [None]:
# Feature-extraction variants; each list is placed in front of a classifier.
featureTransformers = [
    [hashing, idf],              # stems -> hashed TF -> IDF
    [ngram, hashing_ng, idf],    # stem bigrams -> hashed TF -> IDF
    [word2Vec],                  # stems -> Word2Vec vector
]

# Classifiers to compare; every one reads "features" and predicts "label".
models = [
    LogisticRegression(maxIter=20, labelCol="label", featuresCol="features"),
    DecisionTreeClassifier(labelCol="label", featuresCol="features"),
    RandomForestClassifier(labelCol="label", featuresCol="features"),
]


Kreiranje tokova podataka za sve kombinacije transformacija i modela, tj. 9 kombinacija

In [None]:
# Classifier-major Cartesian product: each classifier is paired with every
# feature variant in turn (3 x 3 = 9 pipelines). Estimator instances are
# shared between pipelines; each fit works on its own copy.
pipelines = [
    Pipeline(stages=[*feature_stages, classifier])
    for classifier in models
    for feature_stages in featureTransformers
]

In [None]:
def _param_grid_for(estimator):
    """Return the hyper-parameter grid (a list of param maps) for *estimator*.

    The Params are taken from the estimator *instance* (``estimator.regParam``),
    not from the class (``LogisticRegression.regParam``). A class-level Param
    has a placeholder parent, so it never matches the instance's own Param when
    CrossValidator copies the estimator with a param map. The grid values would
    then be silently ignored and every grid point trained with defaults.

    Unknown classifier types get a single empty param map (defaults only).
    """
    if isinstance(estimator, LogisticRegression):
        builder = (ParamGridBuilder()
                   .addGrid(estimator.regParam, [0.01, 0.1, 1.0])
                   .addGrid(estimator.elasticNetParam, [0.0, 0.5, 1.0]))
    elif isinstance(estimator, DecisionTreeClassifier):
        builder = (ParamGridBuilder()
                   .addGrid(estimator.maxDepth, [5, 10, 20])
                   .addGrid(estimator.minInstancesPerNode, [1, 5, 10]))
    elif isinstance(estimator, RandomForestClassifier):
        builder = (ParamGridBuilder()
                   .addGrid(estimator.numTrees, [10, 20, 30])
                   .addGrid(estimator.maxDepth, [5, 10, 15]))
    else:
        builder = ParamGridBuilder()
    return builder.build()


# One grid per classifier class, bound to the concrete instances in `models`
# (the same instances used as the last stage of every pipeline).
paramGrids = {type(estimator): _param_grid_for(estimator) for estimator in models}

In [None]:
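#training.show()
# Label indices assigned by the StringIndexer (default order: most frequent
# sentiment first), as observed on the training data:
#0.0 Irrelevant
#1.0 POSITIVE
#2.0 NEUTRAL
#3.0 NEGATIVE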

Postavljanje evaluatora metrike za modele (tj. merenje tacnosti modela). Kreiranje f-je koja za svaki tok podataka, pomocu cross-validation pretrage po mrezi (grid) hiper-parametara, trazi najbolji model i meri njegovu tacnost na test skupu

In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")


def _param_grid_for(estimator):
    """Return the hyper-parameter grid (a list of param maps) for *estimator*.

    Params are taken from the estimator instance so they match the stage
    inside the pipeline when CrossValidator copies it with each param map.
    Unknown classifier types get a single empty param map (defaults only)
    instead of an unbuilt ParamGridBuilder, which CrossValidator cannot use.
    """
    if isinstance(estimator, LogisticRegression):
        builder = (ParamGridBuilder()
                   .addGrid(estimator.regParam, [0.01, 0.1, 1.0])
                   .addGrid(estimator.elasticNetParam, [0.0, 0.5, 1.0]))
    elif isinstance(estimator, DecisionTreeClassifier):
        builder = (ParamGridBuilder()
                   .addGrid(estimator.maxDepth, [5, 10, 20])
                   .addGrid(estimator.minInstancesPerNode, [1, 5, 10]))
    elif isinstance(estimator, RandomForestClassifier):
        builder = (ParamGridBuilder()
                   .addGrid(estimator.numTrees, [10, 20, 30])
                   .addGrid(estimator.maxDepth, [5, 10, 15]))
    else:
        builder = ParamGridBuilder()
    return builder.build()


def execute():
    """Cross-validate every pipeline in ``pipelines`` and score it on ``test``.

    For each pipeline, a 3-fold CrossValidator searches the final classifier's
    hyper-parameter grid on ``training``, selecting by ``evaluator`` (accuracy).
    The best model it returns is then scored on ``test``, and the accuracy is
    printed.

    Returns:
        dict mapping "<Classifier> with <Stage + Stage ...>" to
        ``{"model": best PipelineModel, "accuracy": test accuracy}``.
    """
    results = {}
    for pipeline in pipelines:
        stages = pipeline.getStages()
        classifier = stages[-1]

        crossval = CrossValidator(estimator=pipeline,
                                  estimatorParamMaps=_param_grid_for(classifier),
                                  evaluator=evaluator,
                                  numFolds=3)

        # Run cross-validation (feature stages are refit inside every fold).
        cvModel = crossval.fit(training)

        # Evaluate the selected model on the held-out test split.
        bestModel = cvModel.bestModel
        performance = evaluator.evaluate(bestModel.transform(test))
        print(f"Accuracy iznosi: {performance}")

        transformers_description = ' + '.join(type(t).__name__ for t in stages[:-1])
        model_name = f"{type(classifier).__name__} with {transformers_description}"
        results[model_name] = {
            "model": bestModel,
            "accuracy": performance,
        }

    return results

Pozivanje prethodno definisane f-je i stampanje rezultata

In [None]:
# Cross-validate all pipelines, keep the {description: {"model", "accuracy"}}
# mapping in `results`, and print it in one step.
print(results := execute())

Accuracy iznosi: 0.341
Accuracy iznosi: 0.303
Accuracy iznosi: 0.379
Accuracy iznosi: 0.667
Accuracy iznosi: 0.597
Accuracy iznosi: 0.56
Accuracy iznosi: 0.66
Accuracy iznosi: 0.612
Accuracy iznosi: 0.536
{'LogisticRegression with HashingTF + IDF': {'model': PipelineModel_cfda652fbe50, 'accuracy': 0.341}, 'LogisticRegression with NGram + HashingTF + IDF': {'model': PipelineModel_8bb49be265a9, 'accuracy': 0.303}, 'LogisticRegression with Word2Vec': {'model': PipelineModel_c9195b9e2465, 'accuracy': 0.379}, 'DecisionTreeClassifier with HashingTF + IDF': {'model': PipelineModel_1342f1023d3e, 'accuracy': 0.667}, 'DecisionTreeClassifier with NGram + HashingTF + IDF': {'model': PipelineModel_6e09f143b786, 'accuracy': 0.597}, 'DecisionTreeClassifier with Word2Vec': {'model': PipelineModel_a947789bd220, 'accuracy': 0.56}, 'RandomForestClassifier with HashingTF + IDF': {'model': PipelineModel_a89973d12ecb, 'accuracy': 0.66}, 'RandomForestClassifier with NGram + HashingTF + IDF': {'model': Pipeli

In [None]:
# Per-pipeline summary. Each value is a dict holding the fitted model and its
# test accuracy; print only the accuracy under the "Accuracy" label (printing
# the whole dict here dumped the model object as well).
for key, value in results.items():
    print(f"Model: {key}")
    print(f"Accuracy je: {value['accuracy']}")
    print("\n")

Model: LogisticRegression with HashingTF + IDF
Accuracy je: {'model': PipelineModel_cfda652fbe50, 'accuracy': 0.341}


Model: LogisticRegression with NGram + HashingTF + IDF
Accuracy je: {'model': PipelineModel_8bb49be265a9, 'accuracy': 0.303}


Model: LogisticRegression with Word2Vec
Accuracy je: {'model': PipelineModel_c9195b9e2465, 'accuracy': 0.379}


Model: DecisionTreeClassifier with HashingTF + IDF
Accuracy je: {'model': PipelineModel_1342f1023d3e, 'accuracy': 0.667}


Model: DecisionTreeClassifier with NGram + HashingTF + IDF
Accuracy je: {'model': PipelineModel_6e09f143b786, 'accuracy': 0.597}


Model: DecisionTreeClassifier with Word2Vec
Accuracy je: {'model': PipelineModel_a947789bd220, 'accuracy': 0.56}


Model: RandomForestClassifier with HashingTF + IDF
Accuracy je: {'model': PipelineModel_a89973d12ecb, 'accuracy': 0.66}


Model: RandomForestClassifier with NGram + HashingTF + IDF
Accuracy je: {'model': PipelineModel_fdd495c4b84c, 'accuracy': 0.612}


Model: RandomForestC

In [None]:
# Earlier draft ("our original"): fit each classifier directly on precomputed
# feature columns instead of placing the feature stages inside a Pipeline.
#
# The columns it trains on ("idf", "idf_ng", "w2v") are not produced by any
# cell above, so build them here with the same settings as the pipeline stages.
# NOTE: unlike execute(), these featurizers are fitted once on the whole
# training split, so IDF/Word2Vec statistics are shared across the CV folds.
featurizer = Pipeline(stages=[
    HashingTF(inputCol="stem", outputCol="tf", numFeatures=30),
    IDF(inputCol="tf", outputCol="idf"),
    NGram(n=2, inputCol="stem", outputCol="stem_ngrams"),
    HashingTF(inputCol="stem_ngrams", outputCol="tf_ng", numFeatures=30),
    IDF(inputCol="tf_ng", outputCol="idf_ng"),
    Word2Vec(vectorSize=3, minCount=0, inputCol="stem", outputCol="w2v"),
]).fit(training)
# Cached because every cross-validation fit below re-reads the training features.
training_features = featurizer.transform(training).cache()
test_features = featurizer.transform(test)

feature_alg = ['idf', 'idf_ng', 'w2v']
# NOTE: this rebinds `results`, replacing whatever execute() returned earlier.
results = {}
evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName='accuracy')
multiclass_evaluator = MulticlassClassificationEvaluator(labelCol="label")

for feature in feature_alg:
    classifiers = {
        "Logistic Regression": LogisticRegression(labelCol="label", featuresCol=feature),
        "Decision Tree": DecisionTreeClassifier(labelCol="label", featuresCol=feature),
        "Random Forest": RandomForestClassifier(labelCol="label", featuresCol=feature),
    }

    # Cross-validation and scoring now run for EVERY classifier. Previously they
    # sat outside this loop, so only the last one (Random Forest) was tuned.
    for key, model in classifiers.items():
        if key == "Logistic Regression":
            param_grid = ParamGridBuilder() \
                .addGrid(model.regParam, [0.01, 0.1, 1.0]) \
                .addGrid(model.elasticNetParam, [0.0, 0.5, 1.0]) \
                .build()
        elif key == "Decision Tree":
            param_grid = ParamGridBuilder() \
                .addGrid(model.maxDepth, [5, 10, 20]) \
                .addGrid(model.minInstancesPerNode, [1, 5, 10]) \
                .build()
        else:
            param_grid = ParamGridBuilder() \
                .addGrid(model.numTrees, [10, 20, 30]) \
                .addGrid(model.maxDepth, [5, 10, 15]) \
                .build()

        crossval = CrossValidator(estimator=model,
                                  estimatorParamMaps=param_grid,
                                  evaluator=evaluator,
                                  numFolds=3)
        best_model = crossval.fit(training_features)

        # Score on the held-out test split. Scoring on the training rows (as the
        # draft did) only measures how well the model fits data it has seen.
        predictions = best_model.transform(test_features)

        accuracy = multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName: "accuracy"})
        precision = multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName: "weightedPrecision"})

        print("Accuracy: ", accuracy)
        print("Prec: ", precision)

        results[(key, feature)] = {
            "model": best_model,
            "accuracy": accuracy,
            "precision": precision,
        }

# The draft iterated an undefined name `alg`; the collected mapping is `results`.
for key, value in results.items():
    print(f"Ključ: {key}")
    print(f"Vrednost: {value}")
    print("\n")