In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct
from pyspark.ml.feature import StopWordsRemover, Tokenizer
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.feature import Normalizer
from pyspark.ml import Pipeline

spark = SparkSession.builder \
.appName('code_6_of_10_data_mine_giuseppe_schintu') \
.master('local[*]') \
.config('spark.sql.execution.arrow.pyspark.enabled', True) \
.config('spark.sql.session.timeZone', 'UTC') \
.config('spark.driver.memory','8G') \
.config('spark.ui.showConsoleProgress', True) \
.config('spark.sql.repl.eagerEval.enabled', True) \
.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/24 17:01:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### => user-defined functions (lemmatize, classify_sentence, author_fullname)

In [3]:
# Tokenize sentence (lemmatize words, remove stop words and punctuations, strip off html and digits)
# Returns a struct of tokens, tokens count, punctuation count

import string
import unicodedata
import re
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType, ArrayType, IntegerType, StringType

wnl = WordNetLemmatizer()
stopwords_set = set(stopwords.words('english'))
list_punct = set(string.punctuation)
url_pattern = re.compile(r'https?.+|[^(a-zA-Z)(0-9)\s]')
number_pattern = re.compile(r'\d+')

def lemmatize(text):
    """
    param: sentence
    return: tokens, tokens count, punctuation count
    """
    punctuation_table = str.maketrans('', '', string.punctuation)
    
    punct_count = text.translate(punctuation_table).count('')
    
    text = (unicodedata.normalize('NFKD', text)
            .encode('ascii', 'ignore')
            .decode('utf-8', 'ignore')
            .lower())
    
    # remove urls
    text = url_pattern.sub(' ', text)
    # remove numbers
    text = number_pattern.sub(' ', text)
    
    words = text.split()
    # remove stopwords and strings of length <= 2
    words = [wnl.lemmatize(word) for word in words if word not in stopwords_set and len(word) > 2]
    word_count = len(words)
    
    return words, word_count, punct_count


# Register lemmatizer as an UDF
lemma_schema = StructType([
    StructField("words", ArrayType(StringType()), False),
    StructField("word_count", IntegerType(), False),
    StructField("punct_count", IntegerType(), False)
])

udf_lemmatize = F.udf(lemmatize, lemma_schema)

#Create function and register in SQL for further use
def classify_sentence(words):
    if words > 30:
        return "Wordy"
    elif words < 7:
        return "Pity"
    elif words <= 30 and words >=7:
        return "Not Wordy"

spark.udf.register("classify_sentence", classify_sentence,StringType())

def author_fullname(author):
    if author == "EAP":
        return "Edgar Allan Poe"
    elif author == "HPL":
        return "H.P. Lovecraft"
    elif author == "MWS":
        return "Mary Shelley"

spark.udf.register("author_fullname", author_fullname,StringType())


<function __main__.author_fullname(author)>

# => `ASSIGNMENT 5`

In [4]:
#may use in other places
rnd_seed = 42

from pyspark.ml import Pipeline
from pyspark.ml.feature import StopWordsRemover, Tokenizer, CountVectorizer, IDF
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import concat_ws
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.window import Window
from pyspark.sql.functions import (
    col, 
    count,
    regexp_replace,
    row_number,
    udf,
)
from pyspark.ml.feature import Word2Vec

# Set the logging level to ERROR
spark.sparkContext.setLogLevel("ERROR")

# Read train and test data files into Spark DataFrames
train_data_path = "train.csv"
#test_data_path = ""

train_data = spark.read.csv(train_data_path, header=True, inferSchema=True, escape='"', sep=",")
#test_data = spark.read.csv(test_data_path, header=True, inferSchema=True)

# Set the seed for reproducibility
rnd_seed = 42

# Use the UDF to extract the tokens and count features
lemma_train_df = (train_data
 .withColumn('lemmatize', udf_lemmatize('text'))
)

lemma_train_df.cache()

lemma_train_df = lemma_train_df.select(F.col("id"),
                    F.col("text"),
                    F.col("author"), 
                    F.col("lemmatize.words").alias("words"),
                    F.col("lemmatize.word_count").alias("word_count"),
                    F.col("lemmatize.punct_count").alias("punct_count")
                   ).cache()

lemma_train_df = lemma_train_df.withColumn("words_str", concat_ws(" ", lemma_train_df.words))


#test
'''
#Word2Vec does not work in this scenario as it returns negative values, and it is incompatible with NaiveBayes. But, we tried...
# Define text preprocessing pipeline
tokenizer = Tokenizer(inputCol="words_str", outputCol="tokens")
lemma_train_df = tokenizer.transform(lemma_train_df)

stopwords_remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered_tokens")
lemma_train_df = stopwords_remover.transform(lemma_train_df)

word2vec = Word2Vec(vectorSize=500, minCount=0, inputCol="filtered_tokens", outputCol="features")
model = word2vec.fit(lemma_train_df)
lemma_train_df = model.transform(lemma_train_df)

lemma_train_df.show()
'''

#test end



# Define text preprocessing pipeline
tokenizer = Tokenizer(inputCol="words_str", outputCol="tokens")
stopwords_remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered_tokens")

# TF-IDF vectorization
cv = CountVectorizer(inputCol="filtered_tokens", outputCol="raw_features")
idf = IDF(inputCol=cv.getOutputCol(), outputCol="features_idf")
normalizer = Normalizer(inputCol=idf.getOutputCol(), outputCol="features")
label_indexer = StringIndexer(inputCol = "author", outputCol = "label").setHandleInvalid("keep")

preprocessing_pipeline = Pipeline(stages=[tokenizer, stopwords_remover, cv, idf, normalizer, label_indexer])

# Apply text preprocessing pipeline on train and test data
preprocessed_train_data = preprocessing_pipeline.fit(lemma_train_df).transform(lemma_train_df)


# Convert author_label from float to int
#processed_data = preprocessed_train_data.withColumn("label", col("label").cast("integer"))

#Balance data technique ..
'''
# Add an ID column to the data
windowSpec = Window.partitionBy("author").orderBy("id")
processed_data_with_id = preprocessed_train_data.withColumn("id", row_number().over(windowSpec))

# Group the data by author and get the count of each author
author_counts = processed_data_with_id.groupBy("author").count()
total_test_proportion = 0.20
test_sizes = author_counts.select("author", (col("count") * total_test_proportion).alias("holdout"))
data_with_test_sizes = processed_data_with_id.join(test_sizes, "author")

# Split the data into train and test based on the calculated test sizes
preprocessed_train_data = data_with_test_sizes.filter(col("id") > col("holdout")).drop("holdout", "id")
preprocessed_test_data = data_with_test_sizes.filter(col("id") <= col("holdout")).drop("holdout", "id")
preprocessed_train_data = preprocessed_train_data.withColumn("label", col("label").cast("double"))
preprocessed_test_data = preprocessed_test_data.withColumn("label", col("label").cast("double"))
print("Author counts in the train set:")
preprocessed_train_data.groupBy("author").count().show()
print("Author counts in the test set:")
preprocessed_test_data.groupBy("author").count().show()

'''
#random split seems to work better the commented out balance Data Technique
preprocessed_train_data, preprocessed_test_data = preprocessed_train_data.randomSplit([0.90, 0.10], seed=42)


# Prepare data for Naive Bayes classification
train_dataset = preprocessed_train_data.select("features", "label")
test_dataset = preprocessed_test_data.select("features", "label")

from pyspark.ml.classification import LogisticRegression


# Train Naive Bayes model
naive_bayes = NaiveBayes()
naive_bayes_model = naive_bayes.fit(train_dataset)

# Make predictions on test data using Naive Bayes model
predictions = naive_bayes_model.transform(test_dataset)

# Evaluate the predictions
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy (No Hyperparameters): {:.2%}".format(accuracy))

#hyperparameters

# Use ParamGridBuilder to construct a grid of parameters to search over
paramGrid = ParamGridBuilder() \
    .addGrid(naive_bayes.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0]) \
    .build()

# Define cross-validation
crossval = CrossValidator(estimator=naive_bayes,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=3)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cv_model = crossval.fit(train_dataset)

# Make predictions on test data using the model with the best set of parameters.
cv_predictions = cv_model.transform(test_dataset)

# Evaluate the model
cv_accuracy = evaluator.evaluate(cv_predictions)
print("Accuracy with CV and ParamGrid: {:.2%}".format(cv_accuracy))


                                                                                

Accuracy (No Hyperparameters): 83.05%


                                                                                

Accuracy with CV and ParamGrid: 84.57%
