In [57]:
import time
from pyspark.sql import SQLContext
from pyspark import SparkContext
#sc.stop()
sc = SparkContext()
sqlContext = SQLContext(sc)
data = sqlContext.read.format('com.databricks.spark.csv').options(header ='true',inferschema ='true').load('D:/jcu/5851/A4/unit_2020.csv')
#data = sqlContext.read.format('com.databricks.spark.csv').options(header ='true',inferschema ='true').load('D:/jcu/5851/A4/unit_2020_v2.csv')
print(data.count())

1200


In [58]:
data = data.select(['unit description','Category'])
data.printSchema()

root
 |-- unit description: string (nullable = true)
 |-- Category: string (nullable = true)



In [59]:
# Show the top 20 group label and count
from pyspark.sql.functions import col
data.groupBy('Category').count().orderBy(col('count').desc()).show()

+--------+-----+
|Category|count|
+--------+-----+
|    EDST|   68|
|    MMBA|   65|
|    LAWS|   48|
|    ACCG|   44|
|    COMP|   43|
|    TRAN|   41|
|    AFCP|   41|
|    STAT|   39|
|    MGMT|   36|
|    PICT|   36|
|    SPED|   32|
|    AFIN|   32|
|    PICX|   32|
|    MEDI|   29|
|    MKTG|   28|
|    PSYN|   27|
|    AFCL|   27|
|    PHTY|   25|
|    MMCC|   25|
|    GMBA|   24|
+--------+-----+
only showing top 20 rows



In [53]:
# NLP method 1 : word count vectors
# ML method 1 : Logistic Regression
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
 
# inputCol: description
# outputCol: stop words removed
regexTokenizer2 = RegexTokenizer(inputCol='unit description', outputCol='words', pattern='\\W')
# stop words
#add_stopwords = ['unit','study','course','studies','field','students', 'faculty','staff','be','work','form','this']
add_stopwords = ['unit','study','course','studies','field','students', 'faculty','staff','be','work','form','this','that','science','learn','university','is','are']
stopwords_remover2 = StopWordsRemover(inputCol='words', outputCol='filtered').setStopWords(add_stopwords)
# words vector
count_vectors2 = CountVectorizer(inputCol='filtered', outputCol='features', vocabSize=10000, minDF=5)

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol='Category', outputCol='label')
pipeline = Pipeline(stages=[regexTokenizer2, stopwords_remover2, count_vectors2, label_stringIdx])
# fit the pipeline to documents
pipeline_fit = pipeline.fit(data)
dataset = pipeline_fit.transform(data)
#dataset.filter(dataset['Category'] == 'ACST').select(['filtered','features','Category','label']).show(10)

In [54]:
# set seed for reproducibility
# training / test setting，7:3
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print('Training Dataset Count:{}'.format(trainingData.count()))
print('Test Dataset Count:{}'.format(testData.count()))

Training Dataset Count:829
Test Dataset Count:371


In [55]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
start_time = time.time()
lr = LogisticRegression(maxIter=100, regParam=0.3, elasticNetParam=0)

lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
#predictions.filter(predictions['Category'] == 'EDST').select('Category','prediction').orderBy('probability', accending=False).show(n=10, truncate=30)

# predictionCol: prediction column
evaluator2 = MulticlassClassificationEvaluator(predictionCol='prediction')
# accuracy
print("Accuracy: " + str(evaluator2.evaluate(predictions)))
end_time = time.time()
print("Running time: " + str(end_time - start_time))

Accuracy: 0.6020698463852643
Running time: 5.412540435791016


In [30]:
# NLP method 1 : word count vectors
# ML method 2 : Naive Bayes
from pyspark.ml.classification import NaiveBayes
start_time = time.time()
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)
predictions = model.transform(testData)
#predictions.filter(predictions['prediction'] == 16) \
#     .select( 'Category', 'probability', 'label', 'prediction') \
#     .orderBy('probability', ascending=False) \
#     .show(n=10, truncate=30)
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print("Accuracy: " + str(evaluator.evaluate(predictions)))
end_time = time.time()
print("Running time: " + str(end_time - start_time))

Accuracy: 0.3633177066932591
Running time: 1.7193336486816406


In [10]:
# NLP method 1 : word count vectors
# ML method 3 : Random Forest
from pyspark.ml.classification import RandomForestClassifier
start_time = time.time()
rf = RandomForestClassifier(labelCol='label', \
                             featuresCol='features', \
                             numTrees=100, \
                             maxDepth=10, \
                             maxBins=64)
# Train model with Training Data
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)
#predictions.filter(predictions['prediction'] == 16) \
#     .select('Category','probability','label','prediction') \
#     .orderBy('probability', ascending=False) \
#     .show(n = 10, truncate = 30)
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print("Accuracy: " + str(evaluator.evaluate(predictions)))
end_time = time.time()
print("Running time: " + str(end_time - start_time))

Accuracy: 0.36603501384644005
Running time: 9.490335464477539


In [11]:
# NLP method 2 : TF-IDF
# ML method 1 : Logistic Regression
from pyspark.ml.feature import HashingTF, IDF
start_time = time.time()
hashingTF = HashingTF(inputCol='filtered', outputCol='rawFeatures', numFeatures=10000)
idf = IDF(inputCol='rawFeatures', outputCol='features', minDocFreq=5)
pipeline = Pipeline(stages=[regexTokenizer2, stopwords_remover2, hashingTF, idf, label_stringIdx])
pipeline_fit = pipeline.fit(data)
dataset = pipeline_fit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
 
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lr_model = lr.fit(trainingData)
predictions = lr_model.transform(testData)
#predictions.filter(predictions['Category'] == 'EDST').select('Category','prediction').\
#orderBy('probability', ascending=False).show(n=10, truncate=30)
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print("Accuracy: " + str(evaluator.evaluate(predictions)))
end_time = time.time()
print("Running time: " + str(end_time - start_time))

Accuracy: 0.4568939323939996
Running time: 8.052201986312866


In [12]:
# NLP method 2 : TF-IDF
# ML method 2 : Naive Bayes
#from pyspark.ml.classification import NaiveBayes
start_time = time.time()
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)
predictions = model.transform(testData)
#predictions.filter(predictions['prediction'] == 16) \
#     .select( 'Category', 'probability', 'label', 'prediction') \
#     .orderBy('probability', ascending=False) \
#     .show(n=10, truncate=30)
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print("Accuracy: " + str(evaluator.evaluate(predictions)))
end_time = time.time()
print("Running time: " + str(end_time - start_time))

Accuracy: 0.44377322242332906
Running time: 3.4365761280059814


In [13]:
# NLP method 2 : TF-IDF
# ML method 3 : Random Forest
#from pyspark.ml.classification import RandomForestClassifier
start_time = time.time()
rf = RandomForestClassifier(labelCol='label', \
                             featuresCol='features', \
                             numTrees=100, \
                             maxDepth=10, \
                             maxBins=64)
# Train model with Training Data
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)
#predictions.filter(predictions['prediction'] == 16) \
#     .select('Category','probability','label','prediction') \
#     .orderBy('probability', ascending=False) \
#     .show(n = 10, truncate = 30)
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print("Accuracy: " + str(evaluator.evaluate(predictions)))
end_time = time.time()
print("Running time: " + str(end_time - start_time))

Accuracy: 0.3558465402119936
Running time: 11.208366632461548


In [18]:
# NLP method 3 : tagging + TF-IDF
# ML method 1 : Logistic Regression
import nltk
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.ml.feature import HashingTF, IDF
#from nltk.stem.porter import *

tags = set(['NN','NNS','NNP','NNPS','JJ','VB','VBG','VBN'])

def pos_tag(text):
    text2 = nltk.word_tokenize(text.lower())
    pos_tags = nltk.pos_tag(text2)
    ret = []
    for word,pos in pos_tags:
        if (pos in tags and word not in add_stopwords):
            ret.append(word)
    ret= sorted(set(ret))
    return ret
udfValueToList = udf(pos_tag, ArrayType(StringType()))
data = data.withColumn('filtered2', udfValueToList('unit description'))


hashingTF = HashingTF(inputCol='filtered2', outputCol='rawFeatures', numFeatures=10000)
idf = IDF(inputCol='rawFeatures', outputCol='features', minDocFreq=5)
pipeline = Pipeline(stages=[hashingTF,idf, label_stringIdx])
pipeline_fit = pipeline.fit(data)
dataset = pipeline_fit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)

In [19]:
# NLP method 3 : tagging + TF-IDF
# ML method 1 : Logistic Regression
start_time = time.time()
lr = LogisticRegression(maxIter=30, regParam=0.3, elasticNetParam=0)
lr_model = lr.fit(trainingData)
predictions = lr_model.transform(testData)
#predictions.filter(predictions['Category'] == 'EDST').select('Category','prediction').\
#orderBy('probability', ascending=False).show(n=10, truncate=30)
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print("Accuracy: " + str(evaluator.evaluate(predictions)))
end_time = time.time()
print("Running time: " + str(end_time - start_time))

Accuracy: 0.4462921180450076
Running time: 217.89178943634033


In [16]:
# NLP method 3 : tagging + TF-IDF
# ML method 2 : Naive Bayes
start_time = time.time()
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)
predictions = model.transform(testData)
#predictions.filter(predictions['Category'] == 'EDST').select('Category','prediction').\
#orderBy('probability', ascending=False).show(n=10, truncate=30)
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print("Accuracy: " + str(evaluator.evaluate(predictions)))
end_time = time.time()
print("Running time: " + str(end_time - start_time))

Accuracy: 0.42009022259059275
Running time: 195.4695110321045


In [17]:
# NLP method 3 : tagging + TF-IDF
# ML method 3 : Random Forest
start_time = time.time()
rf = RandomForestClassifier(labelCol='label', \
                             featuresCol='features', \
                             numTrees=100, \
                             maxDepth=10, \
                             maxBins=64)
# Train model with Training Data
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)
#predictions.filter(predictions['Category'] == 'EDST').select('Category','prediction').\
#orderBy('probability', ascending=False).show(n=10, truncate=30)
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print("Accuracy: " + str(evaluator.evaluate(predictions)))
end_time = time.time()
print("Running time: " + str(end_time - start_time))

Accuracy: 0.3451141350948708
Running time: 343.14731669425964


In [32]:
# NLP method 4 : word2vec
# ML method 1 : Logistic Regression
from pyspark.ml.feature import Word2Vec
#from pyspark.ml.classification import LogisticRegression
 
word2vec = Word2Vec(inputCol="filtered", outputCol="features")
#label_stringIdx = StringIndexer(inputCol='Category', outputCol='label')
pipeline = Pipeline(stages=[regexTokenizer2,stopwords_remover2,word2vec, label_stringIdx])
pipeline_fit = pipeline.fit(data)
dataset = pipeline_fit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)

In [33]:
# NLP method 4 : word2vec
# ML method 1 : Logistic Regression
start_time = time.time()
lr = LogisticRegression(maxIter=100, regParam=0.3, elasticNetParam=0)
lr_model = lr.fit(trainingData)
predictions = lr_model.transform(testData)
#predictions.filter(predictions['Category'] == 'EDST').select('Category','prediction').\
#orderBy('probability', ascending=False).show(n=10, truncate=30)
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print("Accuracy: " + str(evaluator.evaluate(predictions)))
end_time = time.time()
print("Running time: " + str(end_time - start_time))

Accuracy: 0.11781491525923361
Running time: 5.182934045791626


In [48]:
''' Error due to minus value
# NLP method 4 : word2vec
# ML method 2 : Naive Bayes
start_time = time.time()
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)
predictions = model.transform(testData)
#predictions.filter(predictions['Category'] == 'EDST').select('Category','prediction').\
#orderBy('probability', ascending=False).show(n=10, truncate=30)
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print("Accuracy: " + str(evaluator.evaluate(predictions)))
end_time = time.time()
print("Running time: " + str(end_time - start_time))
'''

' Error due to minus value\n# NLP method 4 : word2vec\n# ML method 2 : Naive Bayes\nstart_time = time.time()\nnb = NaiveBayes(smoothing=1)\nmodel = nb.fit(trainingData)\npredictions = model.transform(testData)\n#predictions.filter(predictions[\'Category\'] == \'EDST\').select(\'Category\',\'prediction\').#orderBy(\'probability\', ascending=False).show(n=10, truncate=30)\nevaluator = MulticlassClassificationEvaluator(predictionCol=\'prediction\')\nprint("Accuracy: " + str(evaluator.evaluate(predictions)))\nend_time = time.time()\nprint("Running time: " + str(end_time - start_time))\n'

In [34]:
# NLP method 4 : word2vec
# ML method 3 : Random Forest
start_time = time.time()
rf = RandomForestClassifier(labelCol='label', \
                             featuresCol='features', \
                             numTrees=100, \
                             maxDepth=10, \
                             maxBins=64)
# Train model with Training Data
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)
#predictions.filter(predictions['Category'] == 'EDST').select('Category','prediction').\
#orderBy('probability', ascending=False).show(n=10, truncate=30)
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print("Accuracy: " + str(evaluator.evaluate(predictions)))
end_time = time.time()
print("Running time: " + str(end_time - start_time))

Accuracy: 0.23010140579723676
Running time: 60.165497064590454


In [49]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

pipeline = Pipeline(stages=[regexTokenizer2, stopwords_remover2, count_vectors2, label_stringIdx])

pipeline_fit = pipeline.fit(data)
dataset = pipeline_fit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
lr = LogisticRegression(maxIter=100, regParam=0.3, elasticNetParam=0)

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5])
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2])
             .addGrid(lr.maxIter, [10, 50, 100])
#              .addGrid(idf.numFeatures, [10, 100, 1000])
             .build())


evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
cv = CrossValidator(estimator=lr,\
                   estimatorParamMaps=paramGrid,\
                   evaluator=evaluator2,\
                   numFolds=10)
cv_model = cv.fit(trainingData)
predictions = cv_model.transform(testData)
 
# model assessment

print("Accuracy: " + str(evaluator.evaluate(predictions)))
end_time = time.time()
print("Running time: " + str(end_time - start_time))

Accuracy: 0.4794474837038655
Running time: 2362.891200065613
