In [None]:
import csv
import os
import sys
# Spark imports
from pyspark.rdd import RDD
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import desc
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col, udf, array_contains
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql.types import ArrayType, StringType, FloatType, IntegerType
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import countDistinct
from pyspark.sql.types import DoubleType

import matplotlib.pyplot as plt

from csv import reader
from collections import defaultdict

In [None]:
def init_spark():
    """Return the SparkSession used by this notebook.

    The session is obtained through ``getOrCreate()``, so calling this
    function again hands back the already running session.
    """
    builder = SparkSession.builder.appName("Python Spark SQL basic example")
    builder = builder.config("spark.some.config.option", "some-value")
    return builder.getOrCreate()

Investigate the original dataset (obviously, it cannot be used). Take a look at https://stackoverflow.com/questions/13793529/r-error-invalid-type-list-for-variable to see how useless the Body column information could be!

The point here is that the Body column consists mostly of code and other noisy patterns that are not useful for our purpose. The most important information here is the connection between the titles of the questions and their tags. So, I removed the Body column from the dataset.

In [None]:
# Best-effort look at the original dataset (the one that still has the Body
# column). The raw file may be missing, so a failure here must not stop the
# notebook -- but it is reported instead of being silently swallowed.
filename1 = "./Train.csv"
try:
    spark = init_spark()

    df2 = spark.read.option("multiLine", 'true').option("escape","\'").csv(filename1, header=True)
    print(df2.count())
    df2.show(10)  # show() prints the rows itself and returns None
except Exception as exc:
    print("Skipping inspection of {}: {!r}".format(filename1, exc))

To remove the Body column, I read the whole dataset once using the Pandas library, dropped the column, and exported the result to a separate file that serves as our dataset. This part has been omitted from the notebook.

In [None]:
# Load the Body-less export of the training data and expose its tags as an RDD.
spark = init_spark()

filename = "./TrainWithoutBody.csv"
csvReader = spark.read.option("multiLine", 'true').option("escape", "\'")
df1 = (
    csvReader.csv(filename, header=True)
    .drop("_c0")   # presumably the index column written by the pandas export -- confirm
    .dropna()      # discard rows containing any missing value
)

rddTags = df1.select("Tags").rdd

# df1.count()

In [None]:
# Preview the first five rows of the cleaned dataset.
df1.show(5)

# Finding the 50 most used tags (one decision tree per most used tag)

In [None]:
# Count how often each tag occurs. A question's tags are space-separated and
# every "." is stripped from a tag name before it is counted.
tagOccurrences = (
    rddTags
    .filter(lambda row: row[0] is not None)
    .flatMap(lambda row: row[0].split(" "))
    .map(lambda tag: (tag.replace(".", ""), 1))
)
splittedTags = tagOccurrences.reduceByKey(lambda left, right: left + right)

# Order by descending usage count (collect() it to inspect the ranking).
splittedTags = splittedTags.sortBy(lambda pair: pair[1], ascending=False)

### Tag v/s Count Distribution for all tags

In [None]:
# Turn the (tag, count) ranking into a DataFrame and plot the whole distribution.
tagCountDF = splittedTags.toDF(["tag", "count"])
tagCountDF.show()
# cCount is a list of Rows (one per tag); later cells slice this list.
cCount = tagCountDF.select("count").collect()
# The x axis is sized from the collected list rather than from
# splittedTags.count(), which would launch another Spark job over the same
# lineage; this also guarantees that x and y have the same length.
plt.plot(range(len(cCount)), [row["count"] for row in cCount])
plt.title("Tag v/s Count distribution")
plt.ylabel("Tag Count")
plt.xlabel("Tag ID")
plt.grid()

### Tag v/s Count Distribution for the top 500 tags

In [None]:
# Zoom in on the (up to) 500 most used tags. The x axis is sized from the
# slice itself so the plot still works when fewer than 500 tags exist.
top500Counts = cCount[:500]
plt.plot(range(len(top500Counts)), [row["count"] for row in top500Counts])
plt.title("Tag v/s Count distribution for first 500 tags")
plt.ylabel("Tag Count")
plt.xlabel("Tag ID")
plt.grid()

### Tag v/s Count Distribution for the top 100 tags

In [None]:
# Zoom in on the (up to) 100 most used tags. The x axis is sized from the
# slice itself so the plot still works when fewer than 100 tags exist.
top100Counts = cCount[:100]
plt.plot(range(len(top100Counts)), [row["count"] for row in top100Counts])
plt.title("Tag v/s Count distribution for first 100 tags")
plt.ylabel("Tag Count")
plt.xlabel("Tag ID")
plt.grid()

### Tag v/s Count Distribution for the 10 best tags

In [None]:
# Bar chart of the (up to) 10 most used tags, labelled with the tag names.
cTag = tagCountDF.select("tag").take(10)
# Pair each tag Row with its count Row; both are sized from cTag so the chart
# still works when fewer than 10 tags exist.
topCounts = cCount[:len(cTag)]

plt.figure(figsize=(10, 6), dpi=80)
plt.bar(
    range(len(cTag)),
    [row["count"] for row in topCounts],
    tick_label=[row["tag"] for row in cTag],
)
plt.title("Tag v/s Count distribution for first 10 tags")
plt.ylabel("Tag Count")
plt.xlabel("Tag ID")
plt.grid()

In [None]:
# Keep only the tag names; the usage counts remain available in splittedTags.
splittedTagsSorted = splittedTags.map(lambda r: r[0])

# take(50) fetches just the first 50 names of the sorted RDD instead of
# collecting every tag to the driver and slicing afterwards.
mostUsedTags = splittedTagsSorted.take(50)

In [None]:
# Display the list of most used tag names selected in the previous cell.
mostUsedTags

# Sampling

In [None]:
# Continue with a 0.1% random sample of the questions (seed 42).
df1 = df1.sample(fraction=0.001, seed=42)

# Clean up tags to include only most-used tags

In [None]:
def cleanUpTags(tags):
    """Keep only the tags of one question that belong to ``mostUsedTags``.

    ``mostUsedTags`` is built from tag names with every "." removed, so each
    raw tag gets the same normalisation before the membership test; without
    it a dotted tag (e.g. "asp.net") could never match its entry in the list.
    The normalised names are returned, which are the names the per-tag label
    columns are looked up by.

    :param tags: space-separated tag string of one question.
    :return: list of normalised tag names that appear in ``mostUsedTags``.
    """
    normalisedTags = (tag.replace(".", "") for tag in tags.split(" "))
    return [tag for tag in normalisedTags if tag in mostUsedTags]

# Wrap cleanUpTags as a Spark UDF that returns an array of strings.
cleanUpTagsUDF = udf(cleanUpTags, ArrayType(StringType()))

# Add a "cleantags" column holding each question's tags as filtered by cleanUpTags.
df1 = df1.withColumn("cleantags",  cleanUpTagsUDF(col("tags")))

# Remove questions that do not include the top 50 tags

In [None]:
def filterEmptyRows(tags):
    """Return the number of entries in ``tags``.

    The caller keeps a row only when this count is greater than zero.

    :param tags: sequence of tag names of one question.
    :return: length of ``tags``.
    """
    tagCount = len(tags)
    return tagCount

# Spark UDF version of filterEmptyRows, returning an integer.
filterEmptyRowsUDF = udf(filterEmptyRows, IntegerType())

# Keep only the questions that still have at least one entry in "cleantags".
hasMostUsedTag = filterEmptyRowsUDF(col("cleantags")) > 0
df1 = df1.where(hasMostUsedTag)

df1.show()

# Subject titles to TF-IDF

In [None]:
# Step 1: split each question title into tokens ("transformed_tfidf" column).
tokenizer = Tokenizer(inputCol="Title", outputCol="transformed_tfidf")
wordsData = tokenizer.transform(df1)

# Step 2: hash the tokens into term-frequency vectors ("rawFeatures" column).
hashingTF = HashingTF(inputCol="transformed_tfidf", outputCol="rawFeatures")
featurizedData = hashingTF.transform(wordsData)

# Step 3: fit IDF on the term frequencies and rescale them into the final
# "features" column; df1 now carries the columns added in all three steps.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
df1 = idfModel.transform(featurizedData)

## Add 0/1 column for each tag

In [None]:
# Add one integer column per most used tag: 1 when "cleantags" contains the
# tag, 0 otherwise. These columns are the labels of the per-tag classifiers.
for tag in mostUsedTags:
    df1 = df1.withColumn(tag, (array_contains(col("cleantags"), tag)).cast('integer'))

# Spot-check one label column against the tag list. The frame built above is
# df1; no variable named df is defined at this point of the notebook.
df1.select("cleantags", "Javascript").show()

# Split data to train and test sets

In [None]:
# Split the rows with weights 0.7 (train) and 0.3 (test) using seed 1234.
splitWeights = [.7, .3]
train_data, test_data = df1.randomSplit(splitWeights, seed=1234)

# Train DecisionTreeClassifiers

<h3> Handling class imbalance using the undersampling method </h3>

In [None]:
def underSample(training_data, etag, seed=None):
    """Undersample the rows labelled 0 so they roughly match the rows labelled 1.

    Rows where column ``etag`` equals 0 are sampled without replacement with
    fraction ``1 / ratio``, where ``ratio`` is the integer part of
    (#rows labelled 0) / (#rows labelled 1); all rows labelled 1 are kept.

    When there are no rows labelled 1, or the rows labelled 0 are not more
    numerous than them, there is nothing to undersample and the input is
    returned unchanged (the previous code raised ZeroDivisionError here).

    :param training_data: DataFrame with a 0/1 column named ``etag``.
    :param etag: name of the label column.
    :param seed: optional seed for the sampling; ``None`` keeps it unseeded.
    :return: DataFrame made of the sampled 0-rows followed by the 1-rows.
    """
    major_df = training_data.filter(col(etag) == 0)
    minor_df = training_data.filter(col(etag) == 1)
    major_count = major_df.count()
    minor_count = minor_df.count()

    # Integer ratio of the two class sizes; 0 when it cannot be computed.
    ratio = major_count // minor_count if minor_count else 0
    if ratio < 1:
        return training_data

    sampled_majority_df = major_df.sample(False, 1 / ratio, seed)
    return sampled_majority_df.union(minor_df)

<h3> Handling class imbalance by assigning class weights (higher weight to the minority class) </h3>

In [None]:
def classWeight(training_data, etag):
    """Attach a per-row weight column for the label column ``etag``.

    ``balancingRatio`` is the share of rows whose label equals 1. Rows
    labelled 0 receive ``balancingRatio`` as weight and every other row
    receives ``1 - balancingRatio``.

    :param training_data: DataFrame with a label column named ``etag``.
    :param etag: name of the label column.
    :return: ``training_data`` with an extra double column "classWeightCol".
    """
    positiveCount = training_data.filter(col(etag) == 1).count()
    balancingRatio = positiveCount / training_data.count()

    def weightFor(label):
        # Label 0 gets the share of 1-rows; anything else gets the complement.
        if label == 0:
            return balancingRatio
        return 1.0 - balancingRatio

    calculateWeights = udf(weightFor, DoubleType())
    return training_data.withColumn("classWeightCol", calculateWeights(col(etag)))

In [None]:
def _safeRatio(numerator, denominator):
    """Return ``numerator / denominator``, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


def trainWithUndersampling(training_data, etag, testing_data=None):
    """Train a decision tree for one tag and score it on the test split.

    Despite the name, undersampling is currently disabled; class imbalance is
    handled through the "classWeightCol" column added by ``classWeight``.

    :param training_data: DataFrame with a "features" column and a 0/1 label
        column named ``etag``.
    :param etag: name of the label column (one of the most used tags).
    :param testing_data: DataFrame to evaluate on; defaults to the
        notebook-level ``test_data`` split.
    :return: tuple ``(model, pred, precision, recall, f1)`` where ``precision``
        and ``recall`` are percentages (0-100) and ``f1`` is on the 0-1 scale.
    """
    if testing_data is None:
        testing_data = test_data  # notebook-level split created by randomSplit

    # training_data = underSample(training_data, etag) #Commented for now
    training_data = classWeight(training_data, etag) #Can be commented for speedup

    model = DecisionTreeClassifier(featuresCol="features", labelCol=etag, maxDepth=4, impurity="gini", weightCol="classWeightCol").fit(training_data)

    # Using model without assigning classweight
    # model = DecisionTreeClassifier(featuresCol="features", labelCol=etag, maxDepth=4, impurity="gini").fit(training_data)
    pred = model.transform(testing_data)

    tp = pred.filter((col(etag) == 1) & (col("prediction") == 1)).count()
    fp = pred.filter((col(etag) == 0) & (col("prediction") == 1)).count()
    fn = pred.filter((col(etag) == 1) & (col("prediction") == 0)).count()

    # A tag with no predicted or no actual positives scores 0 instead of
    # raising ZeroDivisionError.
    precision = _safeRatio(tp, tp + fp)
    recall = _safeRatio(tp, tp + fn)
    # F1 is the harmonic mean of precision and recall: 2PR / (P + R).
    f1 = _safeRatio(2 * precision * recall, precision + recall)

    return model, pred, precision * 100, recall * 100, f1

## Train models for each tag and obtain precision, recall, and f-measure

In [None]:
# Per-tag results, keyed by tag name: fitted model, prediction DataFrame and
# the three scores returned by trainWithUndersampling.
dtcs = defaultdict()
preds = defaultdict()
precisions = defaultdict()
recalls = defaultdict()
fmeasures = defaultdict()

for tag in mostUsedTags:
    # Skip a tag that already has a trained model.
    if tag in dtcs:
        continue
    tagModel, tagPred, tagPrecision, tagRecall, tagF1 = trainWithUndersampling(train_data, tag)
    dtcs[tag] = tagModel
    preds[tag] = tagPred
    precisions[tag] = tagPrecision
    recalls[tag] = tagRecall
    fmeasures[tag] = tagF1

In [None]:
# print(precisions)
# print(recalls)
# print(fmeasures)

# Subject titles to Word2Vec

In [None]:
# use df as your dataframe and add your column to it 