In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, RandomForestClassificationSummary, RandomForestClassificationModel, NaiveBayes, GBTClassifier,FMClassifier
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, ChiSqSelector, PCA
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [2]:
import warnings
warnings.filterwarnings('ignore')

# Projekt

W niniejszej analizie weźmiemy pod uwagę dane Travel z bazy danych StackOverflow. Ogólny zamysł tego zbioru danych polega na pytaniach oraz odpowiedziach użytkowników na temat podróżowania oraz turystyki wraz ze wszystkimi szczegółami.

Skupimy się na trzech tabelach takich jak Posts, Users oraz Tags. Wybierzemy te zmienne, które będą pasowały do specyfiki projektu oraz przydadzą się w modelowaniu i analizie. Dodatkowo stworzymy własne zmienne w celu ułatwienia stworzenia klasyfikatorów. Naszym zadaniem będzie zbudowanie modelu przewidującego, czy na dane pytanie zostanie udzielona zaakceptowana odpowiedź. Poniżej umieszczamy opis zmiennych, które weźmiemy pod uwagę w analizie.

### Tabela Posts (22 kolumn, 128164 wierszy)


Zawiera głównie informacje na temat nieusuniętych pytań oraz odpowiedzi. W naszym przypadku będziemy brali pod uwagę tylko pytania oraz poniższe zmienne

- Id (identyfikator posta),
- PostTypeId (1 = Pytanie, 2 = Odpowiedź),
- AcceptedAnswerId (identyfikator zaakceptowanej odpowiedzi),
- CreationDate (data utworzenia),
- ClosedDate (data zamknięcia),
- Score (różnica pomiędzy pozytywnymi, a negatywnymi reakcjami pod postem),
- Body (treść posta),
- OwnerUserId (numer identyfikacyjny twórcy),
- Tags (tagi umieszczone w pytaniu),
- CommentCount (ilość komentarzy),
- Title (tytuł pytania),
- ViewCount (liczba wyświetleń).

### Tabela Users (12 kolumn, 105401 wierszy)
zawiera liczne informacje odnośnie użytkowników portalu StackOverflow. Zmienne, które wykorzystamy w projekcie:

- Id (identyfikator),
- Location (lokalizacja użytkownika),
- UpVotes (liczba pozytywnych reakcji jakie otrzymał użytkownik),
- Views (liczba wyświetleń profilu użytkownika).

### Tabela Posts (5 kolumn, 1954 wierszy)
Posłuży nam ona jedynie do znalezienia najpopularniejszych tagów wykorzystując zmienną TagName i zliczając ilość wystąpień każdego tagu w tabeli.

In [3]:
spark = SparkSession.builder.appName("Travel").config("spark.jars.packages","com.databricks:spark-xml_2.12:0.17.0").getOrCreate()

Tworzymy trzy ramki danych ze zbioru TravelStack.

In [4]:
df_posts = spark.read.format("com.databricks.spark.xml").option("rowTag", "row").option("rootTag", "posts").load("../data/posts.xml")
df_users = spark.read.format("com.databricks.spark.xml").option("rowTag", "row").option("rootTag", "users").load("../data/users.xml")
df_tags = spark.read.format("com.databricks.spark.xml").option("rowTag", "row").option("rootTag", "tags").load("../data/tags.xml")

Poniżej przygotujemy funkcje, które będą pomocne przy tworzeniu nowych kolumn w naszych danych:

Funkcja if_NaN zwraca 1, gdy wartość wejściowa nie jest wartością brakującą (NaN), a 0 w przeciwnym przypadku. Użyjemy jej do określenia, czy pytanie posiada zaakceptowaną odpowiedź (zmienna AcceptedAnswerId).

In [5]:
@udf(returnType='int')
def if_NaN(x):
    if x is not None:
        return 1
    else:
        return 0

Funkcja slen przyjmuje argument typu string, oblicza długość ciągu znaków i zwraca tę wartość jako liczbę całkowitą. Użyjemy jej dla kolumn Body i Title, aby uczynić je bardziej przydatnymi w modelowaniu.

In [6]:
@udf(returnType='int') 
def slen(s):
    return len(s)

Funkcja if_Tag sprawdza, czy dany tag znajduje się w kolumnie Tags. Jeśli tag jest obecny w ciągu znaków, funkcja zwraca 1, w przeciwnym razie zwraca 0. Bierzemy pod uwagę pięć najpopularniejszych tagów.

In [7]:
@udf(returnType='int')
def if_Tag1(x):
    tag = "<" + popular_tags[0][0] + ">"
    if tag in x:
        return 1
    else:
        return 0

@udf(returnType='int')
def if_Tag2(x):
    tag = "<" + popular_tags[1][0] + ">"
    if tag in x:
        return 1
    else:
        return 0

@udf(returnType='int')
def if_Tag3(x):
    tag = "<" + popular_tags[2][0] + ">"
    if tag in x:
        return 1
    else:
        return 0

@udf(returnType='int')
def if_Tag4(x):
    tag = "<" + popular_tags[3][0] + ">"
    if tag in x:
        return 1
    else:
        return 0

@udf(returnType='int')
def if_Tag5(x):
    tag = "<" + popular_tags[4][0] + ">"
    if tag in x:
        return 1
    else:
        return 0

Korzystając z tabeli Tags, znajdujemy najpopularniejsze tagi i przypisujemy je do zmiennej popular_tags.

In [8]:
popular_tags = df_tags.select(df_tags._TagName).sort("_Count", ascending = False).limit(5).collect()

Zaktualizowana ramka danych "Posts":
- zawiera tylko pytania (PostTypeId = 1),
- ma kolumnę logiczną AcceptedAnswerExist (1, jeśli istnieje, 0, jeśli brak danych),
- zawiera długość treści w kolumnach Body i Title,
- posiada kolumnę logiczną IfClosed (1, jeśli pytanie jest zamknięte, 0, jeśli jeszcze nie),
- posiada nowe kolumny (typu bool), które określają, czy pytanie ma adekwatny tag.

In [9]:
df_posts_updated = df_posts.select(if_NaN("_AcceptedAnswerId").alias("AcceptedAnswerExist"), df_posts._AnswerCount,
slen("_Body").alias("BodyLen"), df_posts._CommentCount, df_posts._CreationDate, if_NaN("_ClosedDate").alias("IfClosed"),
df_posts._Id, df_posts._OwnerUserId, df_posts._PostTypeId, df_posts._Score, df_posts._Tags, slen("_Title").alias("TitleLen"),
df_posts._ViewCount) \
.withColumn("TagVisas",if_Tag1("_Tags")) \
.withColumn("TagUsa", if_Tag2("_Tags")) \
.withColumn("TagUk", if_Tag3("_Tags")) \
.withColumn("TagAir-travel", if_Tag4("_Tags")) \
.withColumn("TagCustoms-and-immigration", if_Tag5("_Tags")) \
.filter("_PostTypeId == 1") \
.filter("_OwnerUserId NOT LIKE 'NaN' ")

Zaktualizowana ramka danych "Users":
- zawiera tylko kolumny Id, Location, UpVotes oraz Views.

In [10]:
df_users_new = df_users.select("_Id", "_Location", "_UpVotes", "_Views")

Łączenie wcześniej stworzonych tabel "Posts" oraz "Users".

In [11]:
df_main = df_posts_updated.join(df_users_new, df_posts_updated._OwnerUserId == df_users_new._Id, 'left_outer')

Poniżej przedstawiamy gotową do analizy tabelę.

In [12]:
df_main.toPandas()

Unnamed: 0,AcceptedAnswerExist,_AnswerCount,BodyLen,_CommentCount,_CreationDate,IfClosed,_Id,_OwnerUserId,_PostTypeId,_Score,...,_ViewCount,TagVisas,TagUsa,TagUk,TagAir-travel,TagCustoms-and-immigration,_Id.1,_Location,_UpVotes,_Views
0,1,3,504,3,2011-06-21 22:30:38.687,0,8,26,1,15,...,12989,0,0,0,0,0,26,"Oxford, United Kingdom",19507,7234
1,1,2,1034,0,2011-07-03 13:21:51.543,0,780,26,1,26,...,6276,0,0,1,0,0,26,"Oxford, United Kingdom",19507,7234
2,0,4,1399,4,2011-07-05 23:19:11.723,0,823,26,1,10,...,1742,0,0,0,0,0,26,"Oxford, United Kingdom",19507,7234
3,1,5,908,0,2011-07-09 22:26:20.707,0,886,26,1,17,...,10271,0,0,0,0,0,26,"Oxford, United Kingdom",19507,7234
4,1,3,1961,1,2011-07-10 21:07:46.663,0,910,26,1,6,...,1857,0,0,0,0,0,26,"Oxford, United Kingdom",19507,7234
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
46740,0,0,206,4,2023-10-05 06:18:47.033,0,183941,139575,1,4,...,133,0,0,0,0,0,139575,,0,0
46741,0,0,634,0,2023-11-16 14:36:11.810,1,184610,140604,1,2,...,48,1,0,0,0,0,140604,,0,0
46742,1,1,516,7,2023-05-04 12:45:11.337,0,180981,135572,1,0,...,161,0,0,0,0,0,135572,,0,2
46743,0,0,670,7,2023-06-18 12:15:38.257,0,181833,136645,1,1,...,159,0,0,0,0,1,136645,,0,0


In [13]:
df_classification = df_main.drop("_CreationDate","_Id","_OwnerUserId","_PostTypeId","_Tags","_Location")

## Preoprocessing

W pierwszym kroku na naszych danych przeprowadzimy normalizacje danych numerycznych, oraz zbudujemy wektor, którym będziemy się posługiwać przy budowaniu pipelinów.

Poniżej przedstawiamy funkcję, która buduje i ewaluuje model w oparciu o wybrany typ klasyfikatora. Zbiór danych dzielimy na 5 części i wykonujemy na nim walidację krzyżową. Ostateczne parametry modelu są wybierane na podstawie statystyki skuteczności predykcji.

In [100]:
def buildAndEvaluateModel(train, test, stages, classifierType, *args, **kwargs):
    classifier = classifierType(featuresCol="final_features", labelCol="AcceptedAnswerExist", *args, **kwargs)
    stages.append(classifier)
    pipeline = Pipeline(stages=stages)
    
    binaryEvaluator = BinaryClassificationEvaluator(labelCol="AcceptedAnswerExist", rawPredictionCol="rawPrediction",
                                                    metricName="areaUnderROC")
    multiEvaluator = MulticlassClassificationEvaluator(labelCol="AcceptedAnswerExist", predictionCol="prediction",
                                                       metricName="accuracy")
    paramGrid = ParamGridBuilder().build() 
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=multiEvaluator,  
                              numFolds=5) 

    cvModel = crossval.fit(train)
    predictions = cvModel.transform(test)
    
    auc = binaryEvaluator.evaluate(predictions)
    print("AUC:", auc)
    accuracy = multiEvaluator.evaluate(predictions)
    print("Skuteczność:", accuracy)
    predictionAndLabels = predictions.select("prediction", "AcceptedAnswerExist").rdd.map(
        lambda row: (float(row[0]), float(row[1])))
    metrics = MulticlassMetrics(predictionAndLabels)
    confusionMatrix = metrics.confusionMatrix().toArray()
    print("Macierz pomyłek:\n", confusionMatrix)
    
    TP = confusionMatrix[0, 0]
    FN = confusionMatrix[0, 1]
    FP = confusionMatrix[1, 0]
    TN = confusionMatrix[1, 1]
    
    TPR = TP / (TP + FN) if (TP + FN) != 0 else 0
    TNR = TN / (TN + FP) if (TN + FP) != 0 else 0
    
    informedness = TPR + TNR - 1
    print("Informedness:\n", informedness)

    return {"accuracy": accuracy, "AUC": auc, "confusionMatrix": confusionMatrix, "Informedness": informedness}

In [112]:
def simulations(df, assembling_vectors, model_types, *args, **kwargs):
    results = {}
    for model in model_types:
        train, test = df.randomSplit([0.8, 0.2])
        stages = assembling_vectors()
        name = model.__name__
        print("\n\nWyniki dla "+ name)
        try:
            results[name] = buildAndEvaluateModel(train, test, stages, model,  *args, **kwargs)
        except Exception as e:
            print(f"Błąd: {e}")
    return results

Na początek przeprowadźmy symulacje dla 5 wybranych klasyfikatorów. Bedzie to regresja logistyczna, klasyfikator Factorization Machines, drzewo decyzyjne, oraz oparte na drzewach lasy losowe i Gradient Boost Trees. Przyjmujemy parametry domyślne i wszystkie kolumny naszego zbioru danych.

In [99]:
def normalization():
    columns_to_normalize = ["_AnswerCount", "_CommentCount", "BodyLen", "_Score", "TitleLen", "_ViewCount", "_UpVotes",
                            "_Views"]
    num_assembler = VectorAssembler(inputCols=columns_to_normalize, outputCol="numeric_features")
    scaler = StandardScaler(inputCol="numeric_features", outputCol="scaled_features")
    
    
    final_columns = ["scaled_features", "TagVisas", "TagUsa", "TagUk", "TagAir-travel", "TagCustoms-and-immigration",
                     "IfClosed"]
    final_assembler = VectorAssembler(inputCols=final_columns, outputCol="final_features")
    
    stages = [num_assembler, scaler, final_assembler]
    return stages



Wyniki dla LogisticRegression


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "C:\Users\barcha\sparkVenv\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\barcha\sparkVenv\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "C:\Users\barcha\AppData\Local\Programs\Python\Python38\lib\socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


Błąd dla LogisticRegression


Wyniki dla FMClassifier
Błąd dla FMClassifier


Wyniki dla DecisionTreeClassifier
Błąd dla DecisionTreeClassifier


Wyniki dla RandomForestClassifier
Błąd dla RandomForestClassifier


Wyniki dla GBTClassifier
Błąd dla GBTClassifier


In [17]:
model_types = [LogisticRegression, FMClassifier, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier]
results_of_simulations = simulations(df_classification, normalization, model_types)



Wyniki dla LogisticRegression
AUC: 0.7425692459126626
Skuteczność: 0.6894776359540921
Macierz pomyłek:
 [[4925.  709.]
 [2186. 1503.]]
Informedness:
 0.28158439163222404


Wyniki dla FMClassifier
AUC: 0.7567089909239982
Skuteczność: 0.6951625013407702
Macierz pomyłek:
 [[4761.  873.]
 [1969. 1720.]]
Informedness:
 0.3112989398583301


Wyniki dla DecisionTreeClassifier
AUC: 0.7521847036248283
Skuteczność: 0.7948085380242411
Macierz pomyłek:
 [[4297. 1337.]
 [ 576. 3113.]]
Informedness:
 0.6065509305168355


Wyniki dla RandomForestClassifier
AUC: 0.8483989906382017
Skuteczność: 0.7937359219135471
Macierz pomyłek:
 [[4277. 1357.]
 [ 566. 3123.]]
Informedness:
 0.6057118164865314


Wyniki dla GBTClassifier
AUC: 0.8624501571558576
Skuteczność: 0.7974900783009761
Macierz pomyłek:
 [[4341. 1293.]
 [ 595. 3094.]]
Informedness:
 0.6092102099007182


In [18]:
import pickle as pkl

In [19]:
with open('results.pickle', 'wb') as plik:
    pkl.dump(results_of_simulations, plik)

In [111]:
def selecting_columns():
    columns_to_normalize = ["_AnswerCount", "_CommentCount", "BodyLen", "_Score", "TitleLen", "_ViewCount", "_UpVotes",
                            "_Views"]
    num_assembler = VectorAssembler(inputCols=columns_to_normalize, outputCol="numeric_features")
    scaler = StandardScaler(inputCol="numeric_features", outputCol="scaled_features")
    
    
    binaryFeatures = ['IfClosed', 'TagVisas', 'TagUsa', 'TagUk', 'TagAir-travel', 'TagCustoms-and-immigration']
    binaryFeaturesDF = df_classification.select(*binaryFeatures, "AcceptedAnswerExist")
    
    binaryAssembler = VectorAssembler(inputCols=binaryFeatures, outputCol="binaryFeatures")
    df_binary = binaryAssembler.transform(binaryFeaturesDF)
    
    selector = ChiSqSelector(numTopFeatures=3, featuresCol="binaryFeatures", labelCol="AcceptedAnswerExist", outputCol="selectedBinaryFeatures")
    selectedBinaryFeatures = [binaryFeatures[i] for i in selector.fit(df_binary).selectedFeatures]
    
    final_columns = ["scaled_features"]
    final_columns.extend(selectedBinaryFeatures)
    final_assembler_selected = VectorAssembler(inputCols=final_columns, outputCol="final_features")
    
    stages = [num_assembler, scaler, binaryAssembler, selector, final_assembler]
    return stages

In [113]:
model_types = [LogisticRegression, FMClassifier, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier]
results_of_simulations_selected_features = simulations(df_classification, selecting_columns, model_types)



Wyniki dla LogisticRegression
AUC: 0.748041939313625
Skuteczność: 0.6928104575163399
Macierz pomyłek:
 [[4973.  716.]
 [2151. 1493.]]
Informedness:
 0.28385768248429044


Wyniki dla FMClassifier
AUC: 0.7592655911307917
Skuteczność: 0.6952835188129306
Macierz pomyłek:
 [[4869.  830.]
 [2045. 1691.]]
Informedness:
 0.3069835404460679


Wyniki dla DecisionTreeClassifier
AUC: 0.7415387111733964
Skuteczność: 0.7915683329757562
Macierz pomyłek:
 [[4327. 1364.]
 [ 579. 3052.]]
Informedness:
 0.6008631137182836


Wyniki dla RandomForestClassifier
AUC: 0.8509638996338359
Skuteczność: 0.7961962394640155
Macierz pomyłek:
 [[4295. 1316.]
 [ 570. 3073.]]
Informedness:
 0.608996249817706


Wyniki dla GBTClassifier
Błąd: An error occurred while calling o71164.evaluate.
: org.apache.spark.SparkException: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark 

In [116]:
model_types = [GBTClassifier]
results_of_simulations_selected_GBT = simulations(df_classification, selecting_columns, model_types)



Wyniki dla GBTClassifier
Błąd: An error occurred while calling o73227.fit.
: org.apache.spark.SparkException: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.notEnoughMemoryToBuildAndBroadcastTableError(QueryExecutionErrors.scala:2212)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:187)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:223)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:6

In [114]:
with open('resultsSelected.pickle', 'wb') as plik:
    pkl.dump(results_of_simulations_selected_features, plik)

In [ ]:
def prepareData(df, inputCols, outputCol="features", pcaK=3):
    
    assembler = VectorAssembler(inputCols=inputCols, outputCol="assembledFeatures")
    scaler = StandardScaler(inputCol="assembledFeatures", outputCol="scaledFeatures", withStd=True, withMean=True)
    pca = PCA(k=pcaK, inputCol="scaledFeatures", outputCol=outputCol)
    
    pipeline = Pipeline(stages=[assembler, scaler, pca])
    transformedData = pipeline.fit(df).transform(df)
    train, test = transformedData.randomSplit([0.8, 0.2])
    
    return train, test

## Wyniki

Najpierw spójrzmy na wyniki dla modeli, które uwzględniają wszystkie zmienne, które zostały po wstępnej selekcji. Widać, że  modele oparte na drzewach są o wiele lepsze w zasadzie w każdym z kryteriów. Skuteczność i krytrium Informedness dają podobne rezultaty, choć w każdym z nich najlepsze są modele GBT. Natomiast las losowy daje wyjątkowow dużą wartość jeśli chodzi o pole pod krzywą ROC. 

Tabela metryk, dla modelu z  uwzględnieniem wszystkich zmiennych: 

| Metryka     | Regresja Logistyczna | Factorial Matrix | Drzewo Decyzyjne   | Las Losowy         | Gradient-Boosted Trees  |
|-------------|----------------------|------------------|--------------------|--------------------|-------------------------|
| Skuteczność | 0.6895               | 0.6952           | 0.7948 | 0.7937| 0.7975                  |
| AUC         | 0.7426 | 0.7567 | 0.7521 | 0.8484 | 0.7974                  |
| Informedness| 0.2816 | 0.3113 | 0.6066 | 0.6057 | 0.6092                  |

Tabela metryk, dla modelu z wybranymi zmiennymi binarnymi: 

| Metryka     | Regresja Logistyczna | Factorial Matrix | Drzewo Decyzyjne   | Las Losowy | Gradient-Boosted Trees  |
|-------------|----------------------|------------------|--------------------|------------|-------------------------|
| Skuteczność | 0.6928               | 0.6953           | 0.7415 | 0.7962     | 0.7975                  |
| AUC         | 0.7426 | 0.7567           | 0.7521 | 0.8484     | 0.7974                  |
| Informedness| 0.2816 | 0.3113           | 0.6066 | 0.6057     | 0.6092                  |