<a href="https://colab.research.google.com/github/Q00/ml_category_classifier/blob/master/use_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
import findspark
findspark.init("spark-2.4.4-bin-hadoop2.7")# SPARK_HOME
from pyspark.sql import SparkSession, SQLContext
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
data = sqlContext.read.format('com.databricks.spark.csv').options(header='false', inferschema='true').load("./drive/My\ Drive/ColabNotebooks/train.csv")

In [0]:
#data.map(lambda line : line.split(',')).map(lambda field: (field[0],field[1])).keys().countByValue()
data.printSchema()


root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



In [0]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="_c1", outputCol="words", pattern="\\W")
# stop words
add_stopwords = ["the"] 
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

In [0]:
# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)


_c0 : 카테고리가 label index로 색인시작 ( 다섯가지)


In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "_c0", outputCol = "label")
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(10)

+-------------+--------------------+--------------------+--------------------+--------------------+-----+
|          _c0|                 _c1|               words|            filtered|            features|label|
+-------------+--------------------+--------------------+--------------------+--------------------+-----+
|         tech|tv future in the ...|[tv, future, in, ...|[tv, future, in, ...|(9447,[0,1,2,3,4,...|  3.0|
|     business|worldcom boss  le...|[worldcom, boss, ...|[worldcom, boss, ...|(9447,[0,1,2,3,4,...|  1.0|
|        sport|tigers wary of fa...|[tigers, wary, of...|[tigers, wary, of...|(9447,[0,1,2,3,4,...|  0.0|
|        sport|yeading face newc...|[yeading, face, n...|[yeading, face, n...|(9447,[0,1,2,3,4,...|  0.0|
|entertainment|ocean s twelve ra...|[ocean, s, twelve...|[ocean, s, twelve...|(9447,[0,1,2,3,4,...|  4.0|
|     politics|howard hits back ...|[howard, hits, ba...|[howard, hits, ba...|(9447,[0,1,2,3,4,...|  2.0|
|     politics|blair prepares to...|[blair, pr

In [0]:
# set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 1776
Test Dataset Count: 449


LogisticRegression 사용

In [0]:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
print(lrModel)

LogisticRegressionModel: uid = LogisticRegression_33019ad0b1fe, numClasses = 5, numFeatures = 9447


In [0]:
predictions = lrModel.transform(testData)
predictions.filter(predictions['prediction'] == 4.0) \
    .select("_c1","_c0","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 100)


+----------------------------------------------------------------------------------------------------+-------------+----------------------------------------------------------------------------------------------------+-----+----------+
|                                                                                                 _c1|          _c0|                                                                                         probability|label|prediction|
+----------------------------------------------------------------------------------------------------+-------------+----------------------------------------------------------------------------------------------------+-----+----------+
|uganda bans vagina monologues uganda s authorities have banned the play the vagina monologues  du...|entertainment|  [0.22142210008450433,0.06926105589862376,0.1771293046813308,0.09238374822022723,0.439803791115314]|  4.0|       4.0|
|soul sensation ready for awards south west teenage singing 

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.9733977646698799

TF-IDF


In [0]:
from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed = 100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("_c1","_c1","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+------------------------------+------------------------------+-----+----------+
|                           _c1|                           _c1|                   probability|label|prediction|
+------------------------------+------------------------------+------------------------------+-----+----------+
|all black magic: new zealan...|all black magic: new zealan...|[0.9999128693829448,1.76278...|  0.0|       0.0|
|britain boosted by holmes d...|britain boosted by holmes d...|[0.9996848851904876,3.03167...|  0.0|       0.0|
|celts savour grand slam pro...|celts savour grand slam pro...|[0.9931376466001954,0.00124...|  0.0|       0.0|
|johnson edges out rival sot...|johnson edges out rival sot...|[0.9922290391087946,0.00204...|  0.0|       0.0|
|parry firm over gerrard lis...|parry firm over gerrard lis...|[0.9896240446379182,7.72761...|  0.0|       0.0|
|safin cool on wimbledon new...|safin cool on wimbledon new...|[0.9891740344748176,0.00166...|  0.0|    

In [0]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.9688322104184124

cross-validation

In [0]:
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed = 100)

In [89]:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
#            .addGrid(model.maxIter, [10, 20, 50]) #Number of iterations
#            .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features
             .build())

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=5)
cvModel = cv.fit(trainingData)

predictions = cvModel.transform(testData)
# Evaluate best model
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.968762659639567

Naive Bayes

In [93]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)
predictions = model.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("_c1","_c0","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+-----+------------------------------+-----+----------+
|                           _c1|  _c0|                   probability|label|prediction|
+------------------------------+-----+------------------------------+-----+----------+
|bates seals takeover ken ba...|sport|[1.0,1.4405670836794528E-17...|  0.0|       0.0|
|pountney handed ban and fin...|sport|[1.0,1.5114820337199625E-28...|  0.0|       0.0|
|hereford 1-1 doncaster here...|sport|[1.0,2.1723135146840322E-33...|  0.0|       0.0|
|collins banned in landmark ...|sport|[1.0,2.0294308220167036E-33...|  0.0|       0.0|
|benitez  to launch moriente...|sport|[1.0,1.7173545683502412E-35...|  0.0|       0.0|
|tulu to appear at caledonia...|sport|[1.0,2.357169485349786E-36,...|  0.0|       0.0|
|beattie return calms attack...|sport|[1.0,3.827751047511699E-39,...|  0.0|       0.0|
|sprinter walker quits athle...|sport|[1.0,6.04513995079047E-42,5...|  0.0|       0.0|
|sociedad set to rescue mlad...|sport|[1.0,

In [94]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.9711195012749292

Random Forest

In [96]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 100, \
                            maxDepth = 4, \
                            maxBins = 32)
# Train model with Training Data
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("_c1","_c0","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+-----+------------------------------+-----+----------+
|                           _c1|  _c0|                   probability|label|prediction|
+------------------------------+-----+------------------------------+-----+----------+
|mido makes third apology ah...|sport|[0.44695067375157593,0.1507...|  0.0|       0.0|
|man utd stroll to cup win w...|sport|[0.4384247371503748,0.16193...|  0.0|       0.0|
|giggs handed wales leading ...|sport|[0.438297074131562,0.164717...|  0.0|       0.0|
|federer claims dubai crown ...|sport|[0.4381960984622244,0.17055...|  0.0|       0.0|
|sella wants michalak recall...|sport|[0.436927967182787,0.160053...|  0.0|       0.0|
|unclear future for striker ...|sport|[0.4325539543138061,0.17860...|  0.0|       0.0|
|dominici backs lacklustre f...|sport|[0.4317524002345162,0.17805...|  0.0|       0.0|
|scotland v italy (sat) murr...|sport|[0.4277126386948783,0.17995...|  0.0|       0.0|
|france v wales (sat) stade ...|sport|[0.42

In [97]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.8448518863348249