# Predictive Analysis

In [1]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import when, col, explode, max, avg, count, udf, expr, size, datediff
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier, MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, ArrayType, StringType

import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
spark = SparkSession.builder.appName("Forum Question Analyzer") \
    .config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.17.0")\
    .getOrCreate()

posts = spark.read \
    .format("com.databricks.spark.xml") \
    .option("rootTag", "posts") \
    .option("rowTag", "row") \
    .load("tex.stackexchange.com/Posts.xml") \
    .alias('posts')
users = spark.read \
    .format("com.databricks.spark.xml") \
    .option("rootTag", "users") \
    .option("rowTag", "row") \
    .load("tex.stackexchange.com/Users.xml") \
    .alias('users')
tags = spark.read \
    .format("com.databricks.spark.xml") \
    .option("rootTag", "tags") \
    .option("rowTag", "row") \
    .load("tex.stackexchange.com/Tags.xml") \
    .alias('tags')

### Feature Extraction

In [6]:
posts = posts.filter(posts._PostTypeId==1)
questions = posts.withColumn("_Tags", expr("split(substring(_Tags, 2, length(_Tags) - 2), '><')"))\
            .withColumn("_Body", size(expr("split(_Body, ' ')")))\
            .withColumn("_Title", size(expr("split(_Title, ' ')")))

questions = questions.join(users, questions._OwnerUserId == users._Id).select(
    questions._Id.alias("QuestionId"),
    questions._Body.alias("BodyLength"),
    questions._Title.alias("TitleLength"),
    questions._Tags.alias("TagsCounts"),
    size(questions._Tags).alias("NumberOfTags"),
    users._Id.alias("OwnerId"),
    users._DownVotes.alias("OwnerDownVotes"),
    users._UpVotes.alias("OwnerUpVotes"),
    users._Reputation.alias("OwnerReputation"),
    users._Views.alias("OwnerViews"),
    (questions._CreationDate - users._CreationDate).cast("integer").alias("UserExperience"),
    when(col("_AcceptedAnswerId").isNull(), 0).otherwise(1).alias("Accepted")
)

In [13]:
#Fast way to change tag names to tag counts from Tags table

tag_counts = tags.select("_TagName", "_Count").rdd.collectAsMap()

def replace_tags_with_counts(tags):
    return [tag_counts.get(tag, 0) for tag in tags]
print(tag_counts)
print(replace_tags_with_counts(["enumerate", "geometry"]))
replace_tags_with_counts_udf = udf(replace_tags_with_counts, IntegerType())

questions = questions.withColumn("TagsCounts", max(replace_tags_with_counts_udf(questions.TagsCounts)))

[2364, 1093]


AnalysisException: [MISSING_GROUP_BY] The query does not include a GROUP BY clause. Add GROUP BY or turn it into the window functions using OVER clauses.;
Aggregate [QuestionId#458L, BodyLength#459, TitleLength#460, max(replace_tags_with_counts(TagsCounts#640)#657) AS TagsCounts#659, NumberOfTags#462, OwnerId#463L, OwnerDownVotes#464L, OwnerUpVotes#465L, OwnerReputation#466L, OwnerViews#467L, UserExperience#468, Accepted#469]
+- Project [QuestionId#458L, BodyLength#459, TitleLength#460, replace_tags_with_counts(TagsCounts#550)#639 AS TagsCounts#640, NumberOfTags#462, OwnerId#463L, OwnerDownVotes#464L, OwnerUpVotes#465L, OwnerReputation#466L, OwnerViews#467L, UserExperience#468, Accepted#469]
   +- Project [QuestionId#458L, BodyLength#459, TitleLength#460, replace_tags_with_counts(TagsCounts#461)#549 AS TagsCounts#550, NumberOfTags#462, OwnerId#463L, OwnerDownVotes#464L, OwnerUpVotes#465L, OwnerReputation#466L, OwnerViews#467L, UserExperience#468, Accepted#469]
      +- Project [_Id#9L AS QuestionId#458L, _Body#344 AS BodyLength#459, _Title#367 AS TitleLength#460, _Tags#321 AS TagsCounts#461, size(_Tags#321, true) AS NumberOfTags#462, _Id#49L AS OwnerId#463L, _DownVotes#48L AS OwnerDownVotes#464L, _UpVotes#53L AS OwnerUpVotes#465L, _Reputation#52L AS OwnerReputation#466L, _Views#54L AS OwnerViews#467L, cast((_CreationDate#7 - _CreationDate#46) as int) AS UserExperience#468, CASE WHEN isnull(_AcceptedAnswerId#0L) THEN 0 ELSE 1 END AS Accepted#469]
         +- Join Inner, (_OwnerUserId#15L = _Id#49L)
            :- Project [_AcceptedAnswerId#0L, _AnswerCount#1L, _Body#344, _ClosedDate#3, _CommentCount#4L, _CommunityOwnedDate#5, _ContentLicense#6, _CreationDate#7, _FavoriteCount#8L, _Id#9L, _LastActivityDate#10, _LastEditDate#11, _LastEditorDisplayName#12, _LastEditorUserId#13L, _OwnerDisplayName#14, _OwnerUserId#15L, _ParentId#16L, _PostTypeId#17L, _Score#18L, _Tags#321, size(split(_Title#20,  , -1), true) AS _Title#367, _ViewCount#21L]
            :  +- Project [_AcceptedAnswerId#0L, _AnswerCount#1L, size(split(_Body#2,  , -1), true) AS _Body#344, _ClosedDate#3, _CommentCount#4L, _CommunityOwnedDate#5, _ContentLicense#6, _CreationDate#7, _FavoriteCount#8L, _Id#9L, _LastActivityDate#10, _LastEditDate#11, _LastEditorDisplayName#12, _LastEditorUserId#13L, _OwnerDisplayName#14, _OwnerUserId#15L, _ParentId#16L, _PostTypeId#17L, _Score#18L, _Tags#321, _Title#20, _ViewCount#21L]
            :     +- Project [_AcceptedAnswerId#0L, _AnswerCount#1L, _Body#2, _ClosedDate#3, _CommentCount#4L, _CommunityOwnedDate#5, _ContentLicense#6, _CreationDate#7, _FavoriteCount#8L, _Id#9L, _LastActivityDate#10, _LastEditDate#11, _LastEditorDisplayName#12, _LastEditorUserId#13L, _OwnerDisplayName#14, _OwnerUserId#15L, _ParentId#16L, _PostTypeId#17L, _Score#18L, split(substring(_Tags#19, 2, (length(_Tags#19) - 2)), ><, -1) AS _Tags#321, _Title#20, _ViewCount#21L]
            :        +- Filter (_PostTypeId#17L = cast(1 as bigint))
            :           +- Filter (_PostTypeId#17L = cast(1 as bigint))
            :              +- SubqueryAlias posts
            :                 +- Relation [_AcceptedAnswerId#0L,_AnswerCount#1L,_Body#2,_ClosedDate#3,_CommentCount#4L,_CommunityOwnedDate#5,_ContentLicense#6,_CreationDate#7,_FavoriteCount#8L,_Id#9L,_LastActivityDate#10,_LastEditDate#11,_LastEditorDisplayName#12,_LastEditorUserId#13L,_OwnerDisplayName#14,_OwnerUserId#15L,_ParentId#16L,_PostTypeId#17L,_Score#18L,_Tags#19,_Title#20,_ViewCount#21L] XmlRelation(com.databricks.spark.xml.DefaultSource$$Lambda$1152/0x0000021e1082ccf0@7783adf,Some(tex.stackexchange.com/Posts.xml),Map(roottag -> posts, rowtag -> row, path -> tex.stackexchange.com/Posts.xml),null)
            +- SubqueryAlias users
               +- Relation [_AboutMe#44,_AccountId#45L,_CreationDate#46,_DisplayName#47,_DownVotes#48L,_Id#49L,_LastAccessDate#50,_Location#51,_Reputation#52L,_UpVotes#53L,_Views#54L,_WebsiteUrl#55] XmlRelation(com.databricks.spark.xml.DefaultSource$$Lambda$1152/0x0000021e1082ccf0@153d8f9c,Some(tex.stackexchange.com/Users.xml),Map(roottag -> users, rowtag -> row, path -> tex.stackexchange.com/Users.xml),null)


In [9]:
questions.show(5, truncate=False)
# posts_tags = posts.select(col("_Id").alias("_Id"),
#         explode(col('_Tags')).alias("tag"))\
#         .filter(posts._PostTypeId == 1)

# posts_tags_score = \
#         posts_tags.join(tags, posts_tags.tag == tags._TagName)\
#         .select(posts_tags._Id, tags._Count).groupby(posts_tags._Id)\
#                 .agg(max(tags._Count).alias("max_tag_count"),
#                      avg(tags._Count).alias("avg_tag_count"),
#                      count(tags._Count).alias("number_of_tags"))


PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "C:\Users\jurek\AppData\Local\Temp\ipykernel_1084\1988812052.py", line 6, in replace_tags_with_counts
  File "C:\Spark\spark-3.5.0-bin-hadoop3\python\lib\pyspark.zip\pyspark\sql\utils.py", line 174, in wrapped
    return f(*args, **kwargs)
  File "C:\Spark\spark-3.5.0-bin-hadoop3\python\lib\pyspark.zip\pyspark\sql\functions.py", line 687, in max
    return _invoke_function_over_columns("max", col)
  File "C:\Spark\spark-3.5.0-bin-hadoop3\python\lib\pyspark.zip\pyspark\sql\functions.py", line 105, in _invoke_function_over_columns
    return _invoke_function(name, *(_to_java_column(col) for col in cols))
  File "C:\Spark\spark-3.5.0-bin-hadoop3\python\lib\pyspark.zip\pyspark\sql\functions.py", line 105, in <genexpr>
    return _invoke_function(name, *(_to_java_column(col) for col in cols))
  File "C:\Spark\spark-3.5.0-bin-hadoop3\python\lib\pyspark.zip\pyspark\sql\column.py", line 65, in _to_java_column
    raise PySparkTypeError(
pyspark.errors.exceptions.base.PySparkTypeError: [NOT_COLUMN_OR_STR] Argument `col` should be a Column or str, got list.


In [6]:
features = ['title_length', 'question_length', 'author_reputation',
            'max_tag_count', 'avg_tag_count', 'number_of_tags', 'proper_title']
assembler = VectorAssembler(inputCols=features, outputCol="features")

### Data Preparation

### Model Training

In [7]:
train, test = questions.randomSplit([0.7, 0.3], seed=12345)

In [8]:
# Logistic Regression model
lr = LogisticRegression(labelCol="accepted", featuresCol="features")
lr_pipeline = Pipeline(stages=[assembler, lr])
lr_model = lr_pipeline.fit(train)

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "c:\Users\jurek\anaconda3\lib\socket.py", line 707, in readinto
    raise
socket.timeout: timed out


In [None]:
# Random Forest model
rf = RandomForestClassifier(labelCol="accepted", featuresCol="features", numTrees=10)
rf_pipeline = Pipeline(stages=[assembler, rf])
rf_model = rf_pipeline.fit(train)

In [None]:
# Gradient Boosting model
gbt = GBTClassifier(labelCol="accepted", featuresCol="features", maxIter=10)
gbt_pipeline = Pipeline(stages=[assembler, gbt])
gbt_model = gbt_pipeline.fit(train)

In [None]:
# Neural Network model
layers = [len(features), 10, 5, 2]  # Adjust layer sizes as needed
nn = MultilayerPerceptronClassifier(labelCol="accepted", featuresCol="features", layers=layers, blockSize=128, seed=1234)
nn_pipeline = Pipeline(stages=[assembler, nn])
nn_model = nn_pipeline.fit(train)

### Predictions

In [None]:
lr_predictions = lr_model.transform(test)
rf_predictions = rf_model.transform(test)
gbt_predictions = gbt_model.transform(test)
nn_predictions = nn_model.transform(test)

### Model Evaluation

In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="accepted", metricName="accuracy")
print('===== Accuracy =====')
print('Logistic Regression:', evaluator.evaluate(lr_predictions))
print('Random Forest:      ', evaluator.evaluate(rf_predictions))
print('Gradient Boosting:  ', evaluator.evaluate(gbt_predictions))
print('Neural Network:     ', evaluator.evaluate(nn_predictions))

===== Accuracy =====
Logistic Regression: 0.6023578712851592
