In [1]:
import findspark
findspark.init()
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext

sc = SparkContext()
session = SparkSession.builder.appName("GeminiData").getOrCreate()

In [2]:
from pyspark.sql.types import *

def to_spark_df(fin):
    data = pd.read_csv(fin)
    data.fillna("", inplace=True)
    # Remove the html tags
    data = data.replace("<[^>]*>", "", regex=True)
    return(data)

# Load the train-test sets
train = to_spark_df("seed.csv")
test = to_spark_df("input_data.csv")
# Remove the unnamed columns
test = test.drop(test.columns[[17, 18, 19]],axis = 1)

In [3]:
# Define the  the schema for train and test datasets
mySchema = StructType([ StructField("_Id", StringType(), True)\
                       ,StructField("_PostTypeId", StringType(), True)\
                       ,StructField("_CreationDate", StringType(), True)\
                       ,StructField("_Score", StringType(), True)\
                       ,StructField("_ViewCount", StringType(), True)\
                       ,StructField("_Body", StringType(), True)\
                       ,StructField("_OwnerUserId", StringType(), True)\
                       ,StructField("_LastActivityDate", StringType(), True)\
                       ,StructField("_Title", StringType(), True)\
                       ,StructField("_Tags", StringType(), True)\
                       ,StructField("_AnswerCount", StringType(), True)\
                       ,StructField("_CommentCount", StringType(), True)\
                       ,StructField("_FavoriteCount", StringType(), True)\
                       ,StructField("_LastEditorUserId", StringType(), True)\
                       ,StructField("_AcceptedAnswerId", StringType(), True)\
                       ,StructField("_LastEditDate", StringType(), True)\
                       ,StructField("_ParentId", StringType(), True)\
                       ,StructField("_Category", StringType(), True)])
testSchema = StructType([ StructField("_Id", StringType(), True)\
                       ,StructField("_PostTypeId", StringType(), True)\
                       ,StructField("_CreationDate", StringType(), True)\
                       ,StructField("_Score", StringType(), True)\
                       ,StructField("_ViewCount", StringType(), True)\
                       ,StructField("_Body", StringType(), True)\
                       ,StructField("_OwnerUserId", StringType(), True)\
                       ,StructField("_LastActivityDate", StringType(), True)\
                       ,StructField("_Title", StringType(), True)\
                       ,StructField("_Tags", StringType(), True)\
                       ,StructField("_AnswerCount", StringType(), True)\
                       ,StructField("_CommentCount", StringType(), True)\
                       ,StructField("_FavoriteCount", StringType(), True)\
                       ,StructField("_LastEditorUserId", StringType(), True)\
                       ,StructField("_AcceptedAnswerId", StringType(), True)\
                       ,StructField("_LastEditDate", StringType(), True)\
                       ,StructField("_ParentId", StringType(), True)])
train = session.createDataFrame(train,schema=mySchema)
test = session.createDataFrame(test,schema=testSchema)

In [4]:
test.columns

['_Id',
 '_PostTypeId',
 '_CreationDate',
 '_Score',
 '_ViewCount',
 '_Body',
 '_OwnerUserId',
 '_LastActivityDate',
 '_Title',
 '_Tags',
 '_AnswerCount',
 '_CommentCount',
 '_FavoriteCount',
 '_LastEditorUserId',
 '_AcceptedAnswerId',
 '_LastEditDate',
 '_ParentId']

In [5]:
#train.show(2)

In [6]:
# drop the unnecessary columns 
drop_list = ['_Id', '_PostTypeId', '_CreationDate', '_Score', '_ViewCount', '_OwnerUserId', '_AnswerCount',
             '_CommentCount', '_FavoriteCount', '_LastEditorUserId', '_AcceptedAnswerId', '_LastEditDate',
             '_Tags', '_Title', '_ParentId', 'CreationDate', '_LastActivityDate', '_LastEditDate']

train = train.select([column for column in train.columns if column not in drop_list])
test = test.select([column for column in test.columns if column not in drop_list])
train.show(5)

+--------------------+---------+
|               _Body|_Category|
+--------------------+---------+
|Are questions rel...|   bricks|
|What is a good ta...|   bricks|
|I've asked one, s...|   bricks|
|Lego Mindstorms a...|   bricks|
|I suspect that Mi...|   bricks|
+--------------------+---------+
only showing top 5 rows



In [7]:
train.printSchema()

root
 |-- _Body: string (nullable = true)
 |-- _Category: string (nullable = true)



In [8]:
from pyspark.sql.functions import col

# group by categories
train.groupBy("_Category") \
    .count() \
    .orderBy(col("count").desc() ) \
    .show()

+--------------+-----+
|     _Category|count|
+--------------+-----+
|   arabic.meta|   11|
|           avp|   10|
|     agur.meta|   10|
|        arabic|   10|
|        bricks|   10|
|          agur|   10|
|    3dprinting|   10|
|bioinformatics|   10|
|            ai|    9|
|          beer|    9|
+--------------+-----+



# Logistic Regression using Count Vector Features

In [9]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from nltk.corpus import stopwords 
from nltk.stem import PorterStemmer 
from sparknlp.base import *
from sparknlp.annotator import *

# Raw data annotation  using document assesmber
# documentAssembler = DocumentAssembler().setInputCol("_Body").setOutputCol("document")

# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="_Body", outputCol="token", pattern="\\W")

# stop words remover
stop_words = list(stopwords.words('english'))
[x.encode('utf-8') for x in stop_words]

stopwordsRemover = StopWordsRemover(inputCol="token", outputCol="filtered").setStopWords(stop_words)

# stemming the words
#stemmer = Stemmer().setInputCols(["stopFiltered"]).setOutputCol("filtered")

# to show tokens in human language
#finisher = Finisher().setInputCols(["filtered"])

# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)


In [10]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "_Category", outputCol = "label")

In [11]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(train)
dataset = pipelineFit.transform(train)

In [12]:
dataset.show(5)

+--------------------+---------+--------------------+--------------------+--------------------+-----+
|               _Body|_Category|               token|            filtered|            features|label|
+--------------------+---------+--------------------+--------------------+--------------------+-----+
|Are questions rel...|   bricks|[are, questions, ...|[questions, relat...|(149,[0,1,9,14,31...|  6.0|
|What is a good ta...|   bricks|[what, is, a, goo...|[good, tag, purch...|(149,[18,20,24,40...|  6.0|
|I've asked one, s...|   bricks|[i, ve, asked, on...|[asked, one, let,...|(149,[7,10,54,55,...|  6.0|
|Lego Mindstorms a...|   bricks|[lego, mindstorms...|[lego, mindstorms...|(149,[9,10,48,78,...|  6.0|
|I suspect that Mi...|   bricks|[i, suspect, that...|[suspect, mindsto...|(149,[5,11,37,99]...|  6.0|
+--------------------+---------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



In [13]:
### Randomly split data into training and test sets. set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 67
Test Dataset Count: 32


In [14]:
# Build the model
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

# Train model with Training Data
lrModel = lr.fit(trainingData)

In [15]:
predictions = lrModel.transform(testData)

predictions.select("_Body","_Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+--------------+------------------------------+-----+----------+
|                         _Body|     _Category|                   probability|label|prediction|
+------------------------------+--------------+------------------------------+-----+----------+
|I have a proposal - tool qu...|bioinformatics|[0.42239703411687485,0.1003...|  7.0|       0.0|
|As discussed in the proposa...|   arabic.meta|[0.2461195377708617,0.02767...|  0.0|       7.0|
|While allowing non-classica...|   arabic.meta|[0.12516369619240336,0.0810...|  0.0|       9.0|
|The recommendation I made f...|   arabic.meta|[0.09746141150971549,0.2207...|  0.0|       5.0|
|With foreign words, it seem...|        arabic|[0.09327622090526022,0.0734...|  5.0|       5.0|
|I think your suggestion of ...|     agur.meta|[0.0922003785988423,0.07016...|  3.0|       3.0|
|Since there's so many varie...|          beer|[0.0893719873598987,0.03829...|  8.0|       9.0|
|I was about to ask a beginn...|        

In [16]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.39062500000000006

In [17]:
# Apply the transformations on test dataset(input_data.csv)
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors])

# Fit the pipeline to testing documents.
pipelineTrainFit = pipeline.fit(test)
testDataset = pipelineFit.transform(test)
testDataset.columns

['_Body', 'token', 'filtered', 'features']

In [19]:
# Predicting the categories for test dataset
predictions = lrModel.transform(testDataset)

predictions.filter(predictions['prediction'] != 0) \
    .select("_Body","probability","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+------------------------------+----------+
|                         _Body|                   probability|prediction|
+------------------------------+------------------------------+----------+
|When I first saw this site,...|[0.33065033137249006,0.0065...|       9.0|
|I have been thinking about ...|[0.26373920261779965,1.9255...|       9.0|
|As discussed in the proposa...|[0.2461195377708617,0.02767...|       7.0|
|There is no rule for this. ...|[0.21733248142421754,0.0592...|       5.0|
|I recently posted this ques...|[0.18769205170915432,0.0639...|       6.0|
|There are a few words that ...|[0.18247981079906478,0.1374...|       5.0|
|This question is gathering ...|[0.1682867788998153,0.06583...|       8.0|
|No, this will not insult an...|[0.16002323537979013,0.0734...|       5.0|
|I think the obsolescence cy...|[0.1455522366884405,0.06839...|       9.0|
|At the beginning of speech,...|[0.143730544523447,0.419216...|       1.0|
+------------------------

In [20]:
# evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
# evaluator.evaluate(predictions)

# Logistic Regression using TF-IDF Features

In [21]:
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml import Pipeline
# Add HashingTF and IDF to transformation
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms

# Redo Pipeline
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])

In [22]:
pipelineFit = pipeline.fit(train)
dataset = pipelineFit.transform(train)
### Randomly split data into training and test sets. set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)

# Build the model
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

# Train model with Training Data
lrModel = lr.fit(trainingData)
dataset.show(5)

+--------------------+---------+--------------------+--------------------+--------------------+--------------------+-----+
|               _Body|_Category|               token|            filtered|         rawFeatures|            features|label|
+--------------------+---------+--------------------+--------------------+--------------------+--------------------+-----+
|Are questions rel...|   bricks|[are, questions, ...|[questions, relat...|(262144,[9559,152...|(262144,[9559,152...|  6.0|
|What is a good ta...|   bricks|[what, is, a, goo...|[good, tag, purch...|(262144,[15664,35...|(262144,[15664,35...|  6.0|
|I've asked one, s...|   bricks|[i, ve, asked, on...|[asked, one, let,...|(262144,[31463,53...|(262144,[31463,53...|  6.0|
|Lego Mindstorms a...|   bricks|[lego, mindstorms...|[lego, mindstorms...|(262144,[38068,57...|(262144,[38068,57...|  6.0|
|I suspect that Mi...|   bricks|[i, suspect, that...|[suspect, mindsto...|(262144,[47032,49...|(262144,[47032,49...|  6.0|
+---------------

In [23]:
predictions = lrModel.transform(testData)

predictions.select("_Body","_Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+---------+------------------------------+-----+----------+
|                         _Body|_Category|                   probability|label|prediction|
+------------------------------+---------+------------------------------+-----+----------+
|I have heard that Arabs did...|   arabic|[0.0909090909090909,0.10389...|  5.0|       2.0|
|I think this is a good idea...|      avp|[0.0909090909090909,0.10389...|  4.0|       2.0|
|I was about to ask a beginn...|      avp|[0.0909090909090909,0.10389...|  4.0|       2.0|
|I've been waiting for this ...|      avp|[0.0909090909090909,0.10389...|  4.0|       2.0|
|My advise would be that if ...|      avp|[0.0909090909090909,0.10389...|  4.0|       2.0|
|Since there's so many varie...|     beer|[0.0909090909090909,0.10389...|  8.0|       2.0|
|The Arabic alphabet lacks t...|   arabic|[0.0909090909090909,0.10389...|  5.0|       2.0|
|With foreign words, it seem...|   arabic|[0.0909090909090909,0.10389...|  5.0|       2.0|

In [24]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.007352941176470588

# Cross validation using Count Vector Features

In [25]:
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

pipelineFit = pipeline.fit(train)
dataset = pipelineFit.transform(train)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)

# Build the model
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
             .build())

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=5)

# Run cross validations
# cvModel uses the best model found from the Cross Validation
cvModel = cv.fit(trainingData)

# Test the model to measure the accuracy on new data
predictions = cvModel.transform(testData)

# Evaluate best model
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.3853805916305917

# Naive Bayes

In [26]:
from pyspark.ml.classification import NaiveBayes

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1)

# train the model
model = nb.fit(trainingData)


predictions = model.transform(testData)
predictions.select("_Body","_Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+-----------+------------------------------+-----+----------+
|                         _Body|  _Category|                   probability|label|prediction|
+------------------------------+-----------+------------------------------+-----+----------+
|As discussed in the proposa...|arabic.meta|[0.9999999999712739,5.34425...|  0.0|       0.0|
|While allowing non-classica...|arabic.meta|[0.9992658459820413,1.67714...|  0.0|       0.0|
|ÙØ§ Ø§ÙÙØ±Ù Ø¨ÙÙ Ø§Ù...|arabic.meta|[0.9986421823280933,2.61985...|  0.0|       0.0|
|The Arabic alphabet lacks t...|     arabic|[0.6167785581497183,5.44033...|  5.0|       0.0|
|So far most Augur SE answer...|  agur.meta|[0.24641193000535663,2.9074...|  3.0|       2.0|
|It's simply too broad. What...|arabic.meta|[0.20860792513353935,0.0055...|  0.0|       9.0|
|With foreign words, it seem...|     arabic|[0.19990976255238269,0.0079...|  5.0|       5.0|
|My advise would be that if ...|        avp|[0.10912919293256443,3.441

In [27]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.4359375

# Random Forest

In [28]:
from pyspark.ml.classification import RandomForestClassifier

# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 100, \
                            maxDepth = 4, \
                            maxBins = 32)

# Train model with Training Data
rfModel = rf.fit(trainingData)

predictions = model.transform(testData)
predictions.select("_Body","_Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+-----------+------------------------------+-----+----------+
|                         _Body|  _Category|                   probability|label|prediction|
+------------------------------+-----------+------------------------------+-----+----------+
|As discussed in the proposa...|arabic.meta|[0.9999999999712739,5.34425...|  0.0|       0.0|
|While allowing non-classica...|arabic.meta|[0.9992658459820413,1.67714...|  0.0|       0.0|
|ÙØ§ Ø§ÙÙØ±Ù Ø¨ÙÙ Ø§Ù...|arabic.meta|[0.9986421823280933,2.61985...|  0.0|       0.0|
|The Arabic alphabet lacks t...|     arabic|[0.6167785581497183,5.44033...|  5.0|       0.0|
|So far most Augur SE answer...|  agur.meta|[0.24641193000535663,2.9074...|  3.0|       2.0|
|It's simply too broad. What...|arabic.meta|[0.20860792513353935,0.0055...|  0.0|       9.0|
|With foreign words, it seem...|     arabic|[0.19990976255238269,0.0079...|  5.0|       5.0|
|My advise would be that if ...|        avp|[0.10912919293256443,3.441

In [29]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.4359375