In [None]:
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql.functions import col
from pyspark.sql import Row

from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier


from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator

#from pyspark.ml import PipelineModel



In [None]:
# Reuse the already-running SparkContext when this cell is executed again.
# A bare SparkContext() raises ValueError if a context is already active in
# the kernel, which made the cell fail on every re-run.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# NOTE(review): absolute, machine-specific path -- consider moving it to a
# configurable data directory so the notebook runs on other machines.
DATA_PATH = '/Users/cagri/Desktop/cagridata.csv'

# The file is read without a header row and with schema inference enabled;
# later cells refer to the columns by position-based names such as "_c0" and "_c5".
data = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header='false', inferschema='true')
    .load(DATA_PATH)
)

In [None]:
# Bare expression: the notebook displays the SparkContext created above,
# which is a quick check that the context exists.
sc

In [None]:
# Preview the first 10 rows of the loaded DataFrame.
data.show(10)

In [None]:

# Print the column names and the types produced by schema inference.
data.printSchema()

In [None]:




# Label distribution: number of rows per distinct value of "_c0" (the column
# that is later indexed into "label"), most frequent first.
# groupBy().count() already yields exactly one row per key, so the former
# trailing .distinct() could not remove anything; because it ran after the
# sort it only added a shuffle that may discard the descending order.
data.groupBy("_c0") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()




# 70/30 train/test split; the fixed seed keeps the split reproducible.
trainingData, testData = data.randomSplit(weights=[0.7, 0.3], seed=100)

# Report the size of each split.
for _caption, _split in (
    ("Training Dataset Count: ", trainingData),
    ("Test Dataset Count: ", testData),
):
    print(_caption + str(_split.count()))

# ---- Pipeline stages: raw text in "_c5" -> bag-of-words -> random forest ----

# Stage 1: tokenize the text column. With RegexTokenizer's default settings
# the pattern acts as a delimiter, so "_c5" is split on non-word characters.
regexTokenizer = RegexTokenizer(
    pattern="\\W",
    inputCol="_c5",
    outputCol="words",
)

# Stage 2: drop noise tokens.
# NOTE(review): supplying stopWords replaces the remover's default list rather
# than extending it -- confirm that is intended given the name "add_stopwords".
add_stopwords = ["http","https","amp","rt","t","c","the","@"]
stopwordsRemover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
    stopWords=add_stopwords,
)

# Stage 3: term-count vectors over a vocabulary of at most 10000 terms; terms
# appearing in fewer than 5 documents are excluded.
countVectors = CountVectorizer(
    minDF=5,
    vocabSize=10000,
    inputCol="filtered",
    outputCol="features",
)

# Stage 4: map the raw class column "_c0" to a numeric "label" column.
# NOTE(review): presumably a class that occurs only in the test split would
# make transform() fail under the default invalid-label handling -- verify.
label_stringIdx = StringIndexer(inputCol="_c0", outputCol="label")

# Stage 5: random forest classifier; the seed makes training reproducible.
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=100,
    seed=100,
)

# Chain the stages in execution order; nothing is fitted until pipeline.fit().
pipeline = Pipeline(
    stages=[
        regexTokenizer,
        stopwordsRemover,
        countVectors,
        label_stringIdx,
        rf,
    ]
)






In [None]:
# Fit all pipeline stages on the training split only, then apply the fitted
# pipeline to the held-out test split to produce predictions.
pipelineFit = pipeline.fit(trainingData)
predictions = pipelineFit.transform(testData)



In [None]:
# Quick look at the transformed test rows (show() with its default row limit).
predictions.show()

In [None]:
# Show 30 test rows with text, raw class, class probabilities, indexed label
# and predicted label, sorted descending on the "probability" column; cell
# values are truncated to 40 characters.
# NOTE(review): "probability" is a vector column, so this sort compares whole
# vectors rather than the winning-class probability -- confirm this is the
# ranking that was intended.
(
    predictions
    .select("_c5", "_c0", "probability", "label", "prediction")
    .orderBy(col("probability").desc())
    .show(n=30, truncate=40)
)



In [None]:
# Score the test-set predictions: accuracy of "prediction" against "label".
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy: %g" % accuracy)

In [None]:
# Persist the fitted pipeline. A plain save() raises once the target path
# already exists, so re-running the notebook failed at this cell; going
# through the writer with overwrite() makes the cell re-runnable.
# NOTE(review): the pipeline ends in a RandomForestClassifier, yet the path is
# named "logreg.model". The name is kept unchanged in case other code loads it
# -- consider renaming it together with any loaders.
pipelineFit.write().overwrite().save("logreg.model")