In [0]:
try:
  import databricks.koalas as ks
except:
  import pyspark.pandas as ks

import re
from pyspark.sql import DataFrame as sdf

# Reading the Data

In [0]:
# read from the koalas/spark
air_line = ks.read_table("airline_csv")
air_line.head()

Unnamed: 0,label,review
0,ham,"Go until jurong point, crazy.. Available only ..."
1,ham,Ok lar... Joking wif u oni...
2,spam,Free entry in 2 a wkly comp to win FA Cup fina...
3,ham,U dun say so early hor... U c already then say...
4,ham,"Nah I don't think he goes to usf, he lives aro..."


In [0]:
# read directly into the spark dataframe
spark_df = spark.read.table("airline_csv")
spark_df.show()

+-----+--------------------+
|label|              review|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if that�s th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



# Start Preprocessing with koalas

In [0]:
df_clean = (
air_line
[['label','review']]
# convert to lower case
.assign(clean_document= lambda x: x.review.str.lower())
# remove punctuation : first way
.assign(clean_document= lambda x: x.clean_document.str.replace(r"[^\w\s]","",regex=True))
# remove numbers: second way, with compile
.assign(clean_document= lambda x: x.clean_document.str.replace(re.compile("\d"),"",regex=True))
# remove leading & trainling spaces 
.assign(clean_document = lambda x: x.clean_document.str.strip())
# remove all the '\n' values with space
.assign(clean_document= lambda x: x.clean_document.str.replace(re.compile(r"\n"),"",regex=True ))
# tokenize 
.assign(tokenized = lambda x: x.clean_document.str.split(" "))
# lastly, convert spam and ham into integers
.pipe(lambda x: ks.sql("select * , case when label = 'spam' then 1 else 0 end as label_coded from {x}"))
)



In [0]:
df_clean.head()

Unnamed: 0,label,review,clean_document,tokenized,label_coded
0,ham,"Go until jurong point, crazy.. Available only ...",go until jurong point crazy available only in ...,"[go, until, jurong, point, crazy, available, o...",0
1,ham,Ok lar... Joking wif u oni...,ok lar joking wif u oni,"[ok, lar, joking, wif, u, oni]",0
2,spam,Free entry in 2 a wkly comp to win FA Cup fina...,free entry in a wkly comp to win fa cup final...,"[free, entry, in, , a, wkly, comp, to, win, fa...",1
3,ham,U dun say so early hor... U c already then say...,u dun say so early hor u c already then say,"[u, dun, say, so, early, hor, u, c, already, t...",0
4,ham,"Nah I don't think he goes to usf, he lives aro...",nah i dont think he goes to usf he lives aroun...,"[nah, i, dont, think, he, goes, to, usf, he, l...",0


# Apply Stop Words

In [0]:
# to remove stopword , we need to come back to spark dataframe
spark_df = df_clean.to_spark()

In [0]:
spark_df.show()

+-----+--------------------+--------------------+--------------------+-----------+
|label|              review|      clean_document|           tokenized|label_coded|
+-----+--------------------+--------------------+--------------------+-----------+
|  ham|Go until jurong p...|go until jurong p...|[go, until, juron...|          0|
|  ham|Ok lar... Joking ...|ok lar joking wif...|[ok, lar, joking,...|          0|
| spam|Free entry in 2 a...|free entry in  a ...|[free, entry, in,...|          1|
|  ham|U dun say so earl...|u dun say so earl...|[u, dun, say, so,...|          0|
|  ham|Nah I don't think...|nah i dont think ...|[nah, i, dont, th...|          0|
| spam|FreeMsg Hey there...|freemsg hey there...|[freemsg, hey, th...|          1|
|  ham|Even my brother i...|even my brother i...|[even, my, brothe...|          0|
|  ham|As per your reque...|as per your reque...|[as, per, your, r...|          0|
| spam|WINNER!! As a val...|winner as a value...|[winner, as, a, v...|          1|
| sp

In [0]:
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import NGram

In [0]:
# Instantiate stop words 
stop = StopWordsRemover(inputCols=['tokenized'],outputCols=['SW_removed'])
# Instantiate ngram 
ngr = NGram(n=2,inputCol='SW_removed',outputCol='ngrammed',)

# Apply tf-idf

In [0]:
# lets get the verctor assambler
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF

In [0]:
# instantiate the tf-idf
tf = CountVectorizer(inputCol='ngrammed',outputCol='tf_column')
idf = IDF(inputCol='tf_column',outputCol='tf_idf')

# Get the Vector Assembler

In [0]:
# vector assembler is the last building block for the feature transformation
from pyspark.ml.feature import VectorAssembler
Vec_assembler = VectorAssembler(inputCols=['tf_idf',],outputCol='features')

# Build the pipeline

In [0]:
from pyspark.ml import Pipeline

In [0]:
# apply the pipe, learn from the data (.fit) and then transform the data
pipe = Pipeline(stages=[stop,ngr,tf,idf,Vec_assembler])
fitted_pipe =  pipe.fit(spark_df)
transformed_data = fitted_pipe.transform(spark_df)

In [0]:
transformed_data.show()

+-----+--------------------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|label|              review|      clean_document|           tokenized|label_coded|          SW_removed|            ngrammed|           tf_column|              tf_idf|            features|
+-----+--------------------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  ham|Go until jurong p...|go until jurong p...|[go, until, juron...|          0|[go, jurong, poin...|[go jurong, juron...|(31876,[7332,1279...|(31876,[7332,1279...|(31876,[7332,1279...|
|  ham|Ok lar... Joking ...|ok lar joking wif...|[ok, lar, joking,...|          0|[ok, lar, joking,...|[ok lar, lar joki...|(31876,[1435,4419...|(31876,[1435,4419...|(31876,[1435,4419...|
| spam|Free entry in 2 a...|free entry in  a ...|[free, entr

# Apply the modeling

In [0]:
from pyspark.ml.classification import LogisticRegression

In [0]:
# start the model
lr = LogisticRegression(featuresCol='features',labelCol='label_coded')

In [0]:
# train the model
fitted_model = lr.fit(transformed_data)

In [0]:
# predict the model
prediction = fitted_model.transform(transformed_data)

In [0]:
prediction.show()

+-----+--------------------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|              review|      clean_document|           tokenized|label_coded|          SW_removed|            ngrammed|           tf_column|              tf_idf|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  ham|Go until jurong p...|go until jurong p...|[go, until, juron...|          0|[go, jurong, poin...|[go jurong, juron...|(31876,[7332,1279...|(31876,[7332,1279...|(31876,[7332,1279...|[24.5231203318958...|[0.99999999997762...|       0.0|
|  ham|Ok lar... Joking ...|ok lar j

# Evaluate your model

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [0]:
# instantiate evaluator
mc = MulticlassClassificationEvaluator(labelCol='label_coded',metricName='weightedRecall',)

In [0]:
# run the evaluator
mc.evaluate(prediction)

Out[32]: 0.9996411912450665

# you can put everything togather in the pipeline

In [0]:
pipe = Pipeline(stages=[stop,ngr,tf,idf,Vec_assembler,lr])
fitted_pipe =  pipe.fit(spark_df)
transformed_data = fitted_pipe.transform(spark_df)

In [0]:
# you can now evaluate on transformed data !!!!!
