In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

import re, string

from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, HashingTF, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

from pyspark.ml.classification import LogisticRegression

In [2]:
# Spark configuration: local mode with 7 worker threads, app name "ModelTrain".
conf = SparkConf().setMaster("local[7]").setAppName("ModelTrain")

# Use getOrCreate so this cell can be re-run in a live kernel: constructing
# SparkContext directly raises ValueError once a context already exists.
# NOTE: if a context is already running, it is returned as-is and `conf`
# is not re-applied to it.
sc = SparkContext.getOrCreate(conf=conf)

# Obtain the session through the public builder API; it reuses the context above.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [3]:
# Load the sample data to train/test on.

# Cleaning patterns, applied by clean_tweet() in exactly this order.
MENTION_RE = re.compile(r'@[^\s]+')          # @mentions
# Any whitespace-delimited token containing "http:" or "https:".
# (The previous pattern `http?:` made the "p" optional and therefore never
# matched "https:" links, which then survived as junk tokens.)
URL_RE = re.compile(r"\S*https?:\S*")
SPECIAL_RE = re.compile(r'[^A-Za-z0-9 ]')    # anything but ASCII letters, digits, spaces
MULTISPACE_RE = re.compile(r'[ ]{2,}')       # runs of two or more spaces


def clean_tweet(text):
    """Normalize raw tweet text for tokenization.

    Removes @mentions, then URLs, then every character that is not an ASCII
    letter, digit or space; collapses runs of spaces; strips leading/trailing
    whitespace and lowercases the result.

    NOTE(review): any code that scores new text with the saved model should
    apply this same cleaning -- confirm the serving side matches.
    """
    text = MENTION_RE.sub('', text)
    text = URL_RE.sub('', text)
    text = SPECIAL_RE.sub('', text)
    text = MULTISPACE_RE.sub(' ', text)
    return text.strip().lower()


def parse_line(line):
    """Turn one raw CSV line into a (sentiment, cleaned_text) tuple.

    The line is split on the literal `","` separator. The sentiment is the
    second character of the first field (presumably skipping an opening
    quote -- assumes every field is double-quoted), and the text is the
    sixth field.

    Raises:
        IndexError: if the line has fewer than six `","`-separated fields.
    """
    fields = line.split('","')  # split once instead of once per column
    return (fields[0][1], clean_tweet(fields[5]))


def load_tweets(path):
    """Read the text file at `path` into an RDD of (sentiment, cleaned_text)."""
    return sc.textFile(path).map(parse_line)


dataTrain = load_tweets("./train.csv")
dataTest = load_tweets("./test.csv")


In [4]:
# Convert the (sentiment, text) RDDs into DataFrames for the ML pipeline.
dfTrain = dataTrain.toDF(['sentiment','text'])
dfTest = dataTest.toDF(['sentiment','text'])

# `df` used to be produced by a second, identical toDF() call on dataTrain.
# Keep the name as an alias of dfTrain rather than repeating the conversion.
df = dfTrain

In [5]:
tokenizer = Tokenizer().setInputCol('text').setOutputCol('words')
remover = StopWordsRemover(inputCol='words',outputCol='clean')
TF = HashingTF(numFeatures=2**16,inputCol="clean",outputCol='tf')
IDF = IDF(inputCol='tf',outputCol="features")
StringIndex = StringIndexer(inputCol="sentiment",outputCol="label")

In [6]:
Model = LogisticRegression(maxIter=300)
pipeline = Pipeline(stages=[tokenizer,remover,TF,IDF,StringIndex,Model])

In [7]:
# Fit every pipeline stage on the training DataFrame.
pipelineFit = pipeline.fit(dataset=dfTrain)

In [8]:
# Run the fitted pipeline over the test DataFrame to obtain predictions.
predict = pipelineFit.transform(dataset=dfTest)

In [9]:
# Sanity check: display the type of the object returned by transform().
type(predict)

pyspark.sql.dataframe.DataFrame

In [10]:
# Print the first 100 rows of cleaned text alongside the predicted label index.
(
    predict
    .select(["text", "prediction"])
    .show(n=100)
)

+--------------------+----------+
|                text|prediction|
+--------------------+----------+
|i loooooooovvvvvv...|       1.0|
|reading my kindle...|       1.0|
|ok first assesmen...|       1.0|
|youll love your k...|       1.0|
|fair enough but i...|       1.0|
|no it is too big ...|       1.0|
|fuck this economy...|       0.0|
|jquery is my new ...|       1.0|
|       loves twitter|       1.0|
|how can you not l...|       1.0|
|check this video ...|       1.0|
|i firmly believe ...|       0.0|
|house corresponde...|       0.0|
|watchin espnjus s...|       0.0|
|dear nike stop wi...|       0.0|
|lebron best athle...|       0.0|
|i was talking to ...|       0.0|
|       i love lebron|       1.0|
|lebron is a beast...|       0.0|
|  lebron is the boss|       1.0|
|lebron is a homet...|       1.0|
|lebron and zydrun...|       1.0|
|lebron is a beast...|       0.0|
|downloading apps ...|       0.0|
|good news just ha...|       1.0|
|awesome come back...|       1.0|
|in montreal f

In [11]:
# Persist the fitted pipeline to ./Model, replacing any previously saved copy.
(
    pipelineFit
    .write()
    .overwrite()
    .save("./Model")
)

In [12]:
# Display the type of the fitted pipeline object.
type(pipelineFit)

pyspark.ml.pipeline.PipelineModel