In [0]:
%pip install nltk
%pip install mlflow
# Notebook-scoped dependencies: nltk supplies the SnowballStemmer used in the
# text-preprocessing cell.
# NOTE(review): versions are unpinned, and mlflow is not referenced in the cells
# visible here — pin versions (e.g. `%pip install nltk==<version>`) for
# reproducibility and drop mlflow if it is unused.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, lower, regexp_replace
from pyspark.ml.feature import Tokenizer, StopWordsRemover, IDF, HashingTF
from nltk.stem.snowball import SnowballStemmer
from pyspark.sql.types import StringType,ArrayType
from sklearn.feature_extraction.text import TfidfVectorizer,CountVectorizer
import hashlib
import sys
from pyspark.sql.types import IntegerType

In [0]:
# Feature tables for each split; HEADER=True takes column names from the first CSV row.
train, test = (
    spark.read.format("csv").load(csv_path, HEADER=True)
    for csv_path in ('/FileStore/tables/train.csv', '/FileStore/tables/test.csv')
)

In [0]:
# Label tables for each split, read with the same CSV/header settings as the features.
train_label, test_label = (
    spark.read.format("csv").load(label_path, HEADER=True)
    for label_path in ('/FileStore/tables/train_label.csv', '/FileStore/tables/test_label.csv')
)

In [0]:
def attach_labels(features, labels):
    """Inner-join a split with its label table and expose an integer ``label`` column.

    Parameters
    ----------
    features : pyspark.sql.DataFrame
        Feature rows keyed by ``Id``.
    labels : pyspark.sql.DataFrame
        Label rows keyed by ``Id`` carrying a ``Category`` column.

    Returns
    -------
    pyspark.sql.DataFrame
        Only rows whose ``Id`` appears in both inputs (inner join), with
        ``Category`` renamed to ``label`` and cast to an integer type.
    """
    return (
        features.join(labels, on="Id", how="inner")
        .withColumnRenamed("Category", "label")
        .withColumn("label", col("label").cast(IntegerType()))
    )


# NOTE: this rebinds `train` / `test`. Re-run the loading cells before re-running
# this one, otherwise the label table is joined onto already-labelled data.
train = attach_labels(train, train_label)
test = attach_labels(test, test_label)

In [0]:
# Test-set dimensions, printed as "(row count, column count)".
print(f"({test.count()}, {len(test.columns)})")

In [0]:
# Train-set dimensions as (row count, column count).
# Both numbers must come from `train`; the column count was previously read
# from `test` by mistake.
print((train.count(), len(train.columns)))

In [0]:
# Preview of the labelled test set (first 20 rows, long values truncated).
test.show(n=20, truncate=True)

In [0]:
# Preview of the labelled training set (first 20 rows, long values truncated).
train.show(n=20, truncate=True)

In [0]:
# Text-feature pipeline shared by both splits:
#   strip non-letters -> lowercase -> tokenize -> drop stop words -> stem
#   -> drop short tokens -> hashed term frequencies.
# The same transformer instances are applied to train and test so both land in
# the same hashed feature space.
MIN_TOKEN_LENGTH = 3  # stemmed tokens shorter than this are discarded

tokenizer = Tokenizer(inputCol='description', outputCol='words_token')
remover = StopWordsRemover(inputCol='words_token', outputCol='words_clean')
hashingTF = HashingTF(inputCol="words", outputCol='Term_frequency')

stemmer = SnowballStemmer(language='english')
# The return type must be declared explicitly: udf() defaults to StringType,
# which would turn each stemmed token list into a single string such as
# "[foo, bar]". The length filter would then iterate that string character by
# character and every document would end up with an empty word list.
stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens],
                  ArrayType(StringType()))
# Keep only tokens with at least MIN_TOKEN_LENGTH characters.
filter_length_udf = udf(
    lambda tokens: [token for token in tokens if len(token) >= MIN_TOKEN_LENGTH],
    ArrayType(StringType()))


def build_term_frequency(df):
    """Run the text-feature pipeline on one split.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        A split with ``Id``, ``description``, ``gender`` and ``label`` columns.

    Returns
    -------
    tuple
        ``(document_cleaned, term_frequency)``: the frame with the filtered
        ``words`` column, and that same frame with the ``Term_frequency``
        vector column added by ``hashingTF``.
    """
    cleaned = (
        # Guard against null descriptions: the Python UDFs below iterate their
        # input and cannot handle None, so missing text is treated as empty.
        df.fillna({'description': ''})
        .select("Id",
                lower(regexp_replace('description', "[^a-zA-Z\\s]", "")).alias('description'),
                "gender", "label")
    )
    without_stop_words = remover.transform(tokenizer.transform(cleaned))
    stemmed = without_stop_words.withColumn("words_stemmed", stemmer_udf("words_clean"))
    document_cleaned = stemmed.withColumn('words', filter_length_udf(col('words_stemmed')))
    return document_cleaned, hashingTF.transform(document_cleaned)


document_cleaned_train, term_frequency_train = build_term_frequency(train)
document_cleaned_test, term_frequency_test = build_term_frequency(test)

In [0]:
# Hashed term-frequency vectors for the training split (first 20 rows).
term_frequency_train.show(n=20, truncate=True)

In [0]:
# LogisticRegression below is built without featuresCol, so it reads its
# default "features" column; expose the TF vectors under that name.
term_frequency_train, term_frequency_test = (
    frame.withColumnRenamed('Term_frequency', 'features')
    for frame in (term_frequency_train, term_frequency_test)
)

In [0]:
# (column name, Spark SQL type) pairs for the training frame, including the
# renamed 'features' vector column.
term_frequency_train.dtypes

In [0]:
# Re-display the training frame after the rename to confirm the 'features' column.
term_frequency_train.show(n=20, truncate=True)

In [0]:
# First model: logistic regression classifier.
# CrossValidator, ParamGridBuilder, BinaryClassificationEvaluator and numpy are
# only referenced by the disabled cross-validation sketch below.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import numpy as np
# Disabled experiment: 5-fold cross-validated grid search over regParam
# (10 values from 0.3 down to 0.01) and elasticNetParam (6 values, 0.3 to 0.8).
# NOTE(review): BinaryClassificationEvaluator assumes a binary label, while the
# final cell scores with MulticlassClassificationEvaluator — confirm how many
# Category classes exist before re-enabling; delete this block if obsolete.
#lr = LogisticRegression(maxIter = 10)

#paramGrid_lr = ParamGridBuilder() \
#    .addGrid(lr.regParam, np.linspace(0.3, 0.01, 10)) \
#    .addGrid(lr.elasticNetParam, np.linspace(0.3, 0.8, 6)) \
#    .build()
#crossval_lr = CrossValidator(estimator=lr,
#                          estimatorParamMaps=paramGrid_lr,
#                          evaluator=BinaryClassificationEvaluator(),
#                          numFolds= 5)  
#cvModel_lr = crossval_lr.fit(term_frequency_train)
#best_model_lr = cvModel_lr.bestModel.summary
#best_model_lr.predictions.columns

In [0]:
# Baseline logistic regression capped at 10 optimizer iterations; every other
# parameter keeps its Spark default.
lr = LogisticRegression(maxIter=10)
model = lr.fit(dataset=term_frequency_train)

In [0]:
# Score the held-out test split with the fitted model.
predictions = model.transform(dataset=term_frequency_test)

In [0]:
# Predicted class for the first 20 test rows.
predictions.select(col("prediction")).show(n=20)

In [0]:
# F1 score of the test-set predictions (Spark's 'f1' metric name, computed
# against the integer 'label' column).
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
my_mc_lr = MulticlassClassificationEvaluator(
    metricName='f1',
    labelCol='label',
    predictionCol='prediction',
)
my_mc_lr.evaluate(predictions)