In [58]:
###IMPORTING THE LIBRARIES
#Session
from pyspark.sql import SparkSession

##Functions for creating the structure
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

##Functions for Data engineering
from pyspark.sql.functions import col,sum,avg,round,monotonically_increasing_id,lit

##Functions for the model
from pyspark.ml.feature import StringIndexer,OneHotEncoder,VectorAssembler
from pyspark.ml import Pipeline

###Functions for NLP
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF

##Importing the Machine learning models
from pyspark.ml.classification import LogisticRegression

##Importing the evaluators
from pyspark.ml.evaluation import MulticlassClassificationEvaluator,BinaryClassificationEvaluator

##Grid and cross validator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder,CrossValidator

#CREATING THE SESSION
sesion=SparkSession.builder.appName("sesion").getOrCreate()

In [59]:
schema = StructType([
        StructField("id", IntegerType()),
        StructField("text", StringType()),
        StructField("label", IntegerType())
    
])

In [61]:
df=sesion.read.csv("sms.csv",header=False,schema=schema,sep=";")
df.show(5)

+---+--------------------+-----+
| id|                text|label|
+---+--------------------+-----+
|  1|Sorry, I'll call ...|    0|
|  2|Dont worry. I gue...|    0|
|  3|Call FREEPHONE 08...|    1|
|  4|Win a 1000 cash p...|    1|
|  5|Go until jurong p...|    0|
+---+--------------------+-----+
only showing top 5 rows



In [62]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



In [76]:
##taking onlythe 10% of the raws for the training
df=df.sample(0.10)

In [71]:
# Break text into tokens at non-word characters
tokenizer = Tokenizer(inputCol='text', outputCol='words')

# Remove stop words
remover = StopWordsRemover(inputCol='words', outputCol='terms')

# Apply the hashing trick and transform to TF-IDF
hasher = HashingTF(inputCol='terms', outputCol="hash")
idf = IDF(inputCol="hash", outputCol="features")

# Create a logistic regression object and add everything to a pipeline
logistic = LogisticRegression()

##Creating the pipeline
pipeline = Pipeline(stages=[tokenizer, remover, hasher,idf, logistic])

In [72]:
# Create parameter grid
params = ParamGridBuilder()

# Add grid for hashing trick parameters
params = params.addGrid(hasher.numFeatures,[1024,4096,16384]) \
               .addGrid(hasher.binary, [True,False])

# Add grid for logistic regression parameters
params = params.addGrid(logistic.regParam,[0.01,0.1,1,10]) \
               .addGrid(logistic.elasticNetParam,[0.0,0.5,1])

# Build parameter grid
params = params.build()


## CV and AUC Metric

In [110]:
#Build a evaluator for AUC

evaluator_auc = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName="areaUnderROC")

# Creating a cross-validator.

cv = CrossValidator(estimator=pipeline,
          estimatorParamMaps=params,
          evaluator=evaluator_auc)

In [111]:
##Creating a training and testing dataset
sms_train,sms_test = df.randomSplit([0.8, 0.2], seed=420)

In [112]:
df.count()

42

In [113]:
##Training the model
cv=cv.fit(sms_train)
cv

CrossValidatorModel_c1c549c59a80

In [114]:
# Obtaining the best model
best_model = cv.bestModel

# Getting the classifier
rf_model = best_model.stages[-1] 

# Making predictions
predictions = best_model.transform(sms_test)

# Show the comparison
predictions.select("label","prediction").show(7)

# Calculating te AUC
auc_score =evaluator_auc.evaluate(predictions)

print(f"AUC score on the test data: {auc_score*100:.2f} %")


+-----+----------+
|label|prediction|
+-----+----------+
|    1|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
+-----+----------+
only showing top 7 rows

AUC score on the test data: 75.00 %


## CV and F1 Metric

In [115]:
#Build a evaluator for F1
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

# Creating a cross-validator.

cv = CrossValidator(estimator=pipeline,
          estimatorParamMaps=params,
          evaluator=evaluator_f1)

##Training the model
cv=cv.fit(sms_train)

In [116]:
# Obtaining the best model
best_model = cv.bestModel

# Getting the classifier
rf_model = best_model.stages[-1]  

# Making predictions
predictions = best_model.transform(sms_test)

# Show the comparison
predictions.select("label","prediction").show(5)

# Calculating the F1-score
f1_score =evaluator_auc.evaluate(predictions)

print(f"F1 score on the test data: {f1_score*100:.2f} %")


+-----+----------+
|label|prediction|
+-----+----------+
|    1|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 5 rows

F1 score on the test data: 75.00 %
