# <center>POSREDNICI UMREŽENIH SUSTAVA<center>
<center>Ak. god. 2021./2022.<center>

    
## <center>2. laboratorijska vježba: Apache Spark<center>

In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("PUS Lab 2.").getOrCreate()

### Jednostavan primjer pipeline-a i učenja modela logističke regresije na klasifikaciji teksta

1) Uvezivanje potrebnih komponenti

In [3]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

2) Kreacija DataFrame objekta za učenje i testiranje modela

In [4]:
# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([(0, "a b c d e spark", 1.0),(1, "b d", 0.0),(2, "spark f g h", 1.0),(3, "hadoop mapreduce", 0.0)], ["id", "text", "label"])

# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([(4, "spark i j k"),(5, "l m n"),(6, "spark hadoop spark"),(7, "apache hadoop")], ["id", "text"])

3) Definicija pipeline-a

In [5]:
# Separate sentences to words
tokenizer = Tokenizer(inputCol="text", outputCol="words")

# Convert set of words to vectors (fixed len)
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(),
outputCol="features")

# Set model
lr = LogisticRegression(maxIter=10, regParam=0.001)

# Pipline object 
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

4) Učenje modela na podacima za učenje

In [6]:
model = pipeline.fit(training)

5) Izvršavanje predikcija na testnom skupu i njihov ispis:

In [7]:
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))

(4, spark i j k) --> prob=[0.6292098489668476,0.3707901510331524], prediction=0.000000
(5, l m n) --> prob=[0.984770006762304,0.015229993237696027], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.13412348342566055,0.8658765165743394], prediction=1.000000
(7, apache hadoop) --> prob=[0.9955732114398529,0.00442678856014711], prediction=0.000000


### Učenje modela za klasifikaciju sentimenta osvrta na filmJednostavan primjer pipeline-a i učenja modela logističke regresije na klasifikaciji teksta

1. Preuzimanje skupa pod, predprocesiranje, pretvorba u mala slova, izrada dataframea

In [8]:
import os
import re

In [9]:
def preprocess(data):
    return re.sub(r'[^A-Za-z0-9 ]+', "", data.lower())

def load(path, test):
    data = []
    labels = []
    file_id = 0 
    
    cur_path = path + "pos/"
    for file in os.listdir(cur_path):
        f = open(cur_path + file, "r", encoding="utf8")
        whole = f.readlines()
        text = preprocess(whole[0])
        if test:
            data.append((file_id, text))
            labels.append(1.0)
        else:
            data.append((file_id, text, 1.0))
        file_id += 1
        
    cur_path = path + "neg/"
    for file in os.listdir(cur_path):
        f = open(cur_path + file, "r", encoding="utf8")
        whole = f.readlines()
        text = preprocess(whole[0])
        if test:
            data.append((file_id, text))
            labels.append(0.0)
        else:
            data.append((file_id, text, 0.0))
        file_id += 1
        
    if test:
        return data, labels
    else:
        return data

In [10]:
train = load("D:/datasetPus/train/", False)
test, labels = load("D:/datasetPus/test/", True)

sc = spark.sparkContext

rdd_train = sc.parallelize(train, numSlices=100)
training = rdd_train.toDF(["id", "text", "label"])
rdd_test = sc.parallelize(test, numSlices=100)
test = rdd_test.toDF(["id", "text"])

2. Izrada pipelinea i vektorizacija

In [11]:
from pyspark.ml.feature import StopWordsRemover, CountVectorizer

In [12]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")

remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="no_stopwords") #, stopWords=StopWordsRemover.loadDefaultStopWords('english'))
cv = CountVectorizer(inputCol=remover.getOutputCol(), outputCol="features", vocabSize=1000)

lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, remover, cv, lr])


3. Učenje i evaluacija modela logističke regresije (točnost, preciznost i odziv), broj FN i FP

In [13]:
from pyspark.mllib.evaluation import MulticlassMetrics

In [14]:
def get_values(metrics):
    print("Accuracy (točnost) = ", metrics.accuracy)
    print("Precision (preciznost) = ", metrics.precision(1.0))
    print("Recall (odziv) = ", metrics.recall(1.0))
    print("Confusion matrix (matrica konfuzije) = \n", metrics.confusionMatrix().toArray())
    # TP FP
    # FN TN

def evaluate(model, test, labels):
    prediction = model.transform(test)
    selected = prediction.select("id", "text", "prediction")
    
    data = []
    for row in selected.collect():
        file_id, text, prediction = row
        data.append((prediction, labels[file_id]))
        
    predictionAndLabels = sc.parallelize(data)
                                         
    metrics = MulticlassMetrics(predictionAndLabels)
    get_values(metrics)

In [15]:
model = pipeline.fit(training)
evaluate(model, test, labels)



Accuracy (točnost) =  0.8542
Precision (preciznost) =  0.8460877042132416
Recall (odziv) =  0.86592
Confusion matrix (matrica konfuzije) = 
 [[10531.  1969.]
 [ 1676. 10824.]]


4. 2-grami kao ulaz u model

In [16]:
from pyspark.ml.feature import NGram

In [17]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")

remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="no_stopwords")
ngram = NGram(n=2, inputCol=remover.getOutputCol(), outputCol="ngrams")
cv = CountVectorizer(inputCol=ngram.getOutputCol(), outputCol="features", vocabSize=1000)

lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, remover, ngram, cv, lr])

model = pipeline.fit(training)
evaluate(model, test, labels)

Accuracy (točnost) =  0.74324
Precision (preciznost) =  0.725238906585673
Recall (odziv) =  0.7832
Confusion matrix (matrica konfuzije) = 
 [[8791. 3709.]
 [2710. 9790.]]


5. Spajanje vektora značajki jednorječne vektorizacije i 2-gram vektorizacije

In [18]:
from pyspark.ml.feature import VectorAssembler

In [19]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")

remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="no_stopwords")
ngram = NGram(n=2, inputCol=remover.getOutputCol(), outputCol="ngrams")
cv = CountVectorizer(inputCol=remover.getOutputCol(), outputCol="vectors", vocabSize=1000)
cv_n = CountVectorizer(inputCol=ngram.getOutputCol(), outputCol="vectors_n", vocabSize=1000)
vecassembler = VectorAssembler(inputCols=[cv.getOutputCol(), cv_n.getOutputCol()], outputCol="features")

lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, remover, ngram, cv, cv_n, vecassembler, lr])

model = pipeline.fit(training)
evaluate(model, test, labels)

Accuracy (točnost) =  0.85432
Precision (preciznost) =  0.8489599747872676
Recall (odziv) =  0.862
Confusion matrix (matrica konfuzije) = 
 [[10583.  1917.]
 [ 1725. 10775.]]


6. SVM i naivni Bayesov umjesto logističke regresije

In [20]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import NaiveBayes

In [21]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="no_stopwords")
cv = CountVectorizer(inputCol=remover.getOutputCol(), outputCol="features", vocabSize=1000)

In [22]:
print("1. SVM")
svm = LinearSVC(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, remover, cv, svm])
model = pipeline.fit(training)
evaluate(model, test, labels)

1. SVM
Accuracy (točnost) =  0.85656
Precision (preciznost) =  0.8443294190358467
Recall (odziv) =  0.87432
Confusion matrix (matrica konfuzije) = 
 [[10485.  2015.]
 [ 1571. 10929.]]


In [23]:
print("2. Naive Bayes")
nb = NaiveBayes(smoothing=1.0, modelType='multinomial')
pipeline = Pipeline(stages=[tokenizer, remover, cv, nb])
model = pipeline.fit(training)
evaluate(model, test, labels)

2. Naive Bayes
Accuracy (točnost) =  0.828
Precision (preciznost) =  0.8274760383386581
Recall (odziv) =  0.8288
Confusion matrix (matrica konfuzije) = 
 [[10340.  2160.]
 [ 2140. 10360.]]
