In [None]:
%%configure -f
{
    "conf": {
        "spark.jars.packages": "com.johnsnowlabs.nlp:spark-nlp_2.12:4.3.1",
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
    }
}

# Install notebook-scoped Python packages on the cluster.
# The spark-nlp Python package is pinned to the same release as the spark-nlp
# JAR requested through "spark.jars.packages" (4.3.1). An unpinned install pulls
# the newest release, whose Python wrappers may not match the older JVM classes.
sc.install_pypi_package('spark-nlp==4.3.1', "https://pypi.org/simple")
sc.install_pypi_package('Cython==0.29.21', "https://pypi.org/simple")
sc.install_pypi_package('pandas==1.1.5', "https://pypi.org/simple")
# Optional extras, currently disabled:
# sc.install_pypi_package('scipy', "https://pypi.org/simple")
# sc.install_pypi_package('scikit-learn', "https://pypi.org/simple")
# sc.install_pypi_package('sentence-transformers', "https://pypi.org/simple")

from pyspark import keyword_only
from pyspark.ml import Pipeline, PipelineModel, Transformer
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, FloatType
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.annotator import *
from sparknlp.common import RegexRule
from sparknlp.base import *
import numpy as np
import re
from pyspark.ml.feature import VectorAssembler, StopWordsRemover, Word2Vec
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.util import Identifiable

from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

Let's start the pipeline

Firstly, I want to create a list of common stop words that includes oddities from the data itself (known from previous attempts at cleaning the data).

# Start from the default stop-word list shipped with PySpark ML's StopWordsRemover.
stop_words = set(StopWordsRemover().getStopWords())

# Dataset-specific stop words: Reddit markup fragments, stray single letters,
# contraction leftovers, filler words and common misspellings.
# Every entry is followed by a comma: adjacent string literals without one are
# silently concatenated by Python (e.g. '/u' "/r" "r/" becomes '/u/rr/').
custom_stopwords = {
    # contractions and Reddit user/subreddit prefixes
    "could've", "would've", "r", "u/", "u", "/u", "/r", "r/",
    # contraction leftovers and stray single letters
    "t", "ve", "s", "m", "ll", "nt", "d", "re", "n", "y", "b", "p", "f", "c",
    "e", "g", "h", "j", "k", "l", "o", "q", "v", "w", "x", "z", "a", "i",
    # HTML-entity residue
    "gt", "amp", "andnbsp",
    # filler and very common words
    "say", "go", "us", "like", "don", "just", "kinda", "want", "know", "think",
    "get", "though", "into", "unto", "onto", "meanwhile", "their",
    # apostrophe-less contractions and misspellings
    "dosnt", "couldnt", "wouldnt", "doesnt", "dont", "wont", "cant", "shant",
    "shouldnt", "andor", "andme", "ect", "eachother", "cuz", "bc", "thier",
    "alot", "atleast", "yall", "notall", "noone", "eithe",
    # elongated "so"
    "soo", "sooo", "soooo", "sooooo", "soooooo", "sooooooo",
    # miscellaneous fragments seen in the data
    "modmail", "los", "yoffe", "ou", "hai", "tion",
}

# combine the stop words to one list
stop_list = list(stop_words | custom_stopwords)

# Read the input dataset from S3 and mark it for caching, since it is reused below.
newer_df = (
    spark.read
    .option("inferSchema", "true")
    .parquet('s3://ethan-kozlowski-project/input.parquet')
)
newer_df.persist()

Let's take a little look inside

# Column names and types of the loaded data
newer_df.printSchema()

# Overall dimensions of the dataset
row_count = newer_df.count()
print(f"Number of rows: {row_count}")
column_count = len(newer_df.columns)
print(f"Number of columns: {column_count}")

We can see that there are around 300k entries, covering both posts and the comments on those posts. This data has been partially pre-cleaned, and posts with missing authors have been removed. The full_text column includes all text available for a post, with the title prepended when applicable. There is also information on the subreddit and on the subject (target) of the reply/comment. For original posts, the target, author, and indirect_target will all be the same person.

# Preview the first rows; cell values longer than 15 characters are cut off for display.
newer_df.show(truncate=15)

# Wrap the raw text column into Spark NLP "document" annotations
documentAssembler = (
    DocumentAssembler()
    .setInputCol("full_text")
    .setOutputCol("document")
)

# Split each document into tokens
tokenizer = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("tokens")
)

# Normalize the tokens
normalizer = (
    Normalizer()
    .setInputCols(["tokens"])
    .setOutputCol("norm_tokens")
)

# Drop every normalized token found in the combined stop-word list
stopwords_cleaner = (
    StopWordsCleaner()
    .setInputCols(["norm_tokens"])
    .setOutputCol("clean_tokens")
    .setStopWords(stop_list)
)

# Pretrained English part-of-speech tagger applied to the cleaned tokens
pos = (
    PerceptronModel.pretrained("pos_anc", 'en')
    .setInputCols("document", "clean_tokens")
    .setOutputCol("pos")
)


# #  bert embeddings
# bert_embeddings = BertEmbeddings.pretrained('bert_base_uncased', 'en') \
#     .setInputCols(["document", "clean_tokens"]) \
#     .setOutputCol("features") \
#     .setCaseSensitive(False) \
#     .setMaxSentenceLength(512)

# # required for classifier DL later
# sentence_embeddings = SentenceEmbeddings()\
#     .setInputCols(["document", "features"])\
#     .setOutputCol("sentence_embeddings")\
#     .setPoolingStrategy("AVERAGE")

# # deep learning classifier
# classifier_dl = ClassifierDLApproach() \
#     .setInputCols(["sentence_embeddings"]) \
#     .setOutputCol("pred_cat_is_adoptee") \
#     .setLabelColumn("is_adoptee") \
#     .setMaxEpochs(5) \
#     .setEnableOutputLogs(True)



class TokenCountTransformer(Transformer):
    """Pipeline stage that adds a column with the element count of an array column.

    Args:
        inputCol: Name of the array column whose elements are counted
            (here, a column of token annotations).
        outputCol: Name of the integer column that receives the count.

    Both arguments must be passed by keyword (enforced by ``@keyword_only``).
    """

    @keyword_only
    def __init__(self, inputCol, outputCol):
        super(TokenCountTransformer, self).__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol

    def _transform(self, ddf):
        """Return ``ddf`` with ``outputCol`` set to the length of ``inputCol``.

        Uses Spark's built-in ``size`` instead of a Python UDF, so the count is
        computed inside the JVM without shipping every row to a Python worker.
        NOTE: for a null array ``size`` yields -1 or NULL (depending on Spark
        configuration), whereas the previous UDF raised an error on ``len(None)``.
        """
        return ddf.withColumn(self.outputCol, F.size(ddf[self.inputCol]))

# Number of annotations in the raw "tokens" column
token_count_transformer = TokenCountTransformer(inputCol="tokens", outputCol="num_tokens")

# Number of annotations left in the "clean_tokens" column
norm_token_count_transformer = TokenCountTransformer(
    outputCol="num_norm_tokens",
    inputCol="clean_tokens",
)


# Convert the cleaned token annotations into a plain array column and
# drop the intermediate annotation columns.
finisher = (
    Finisher()
    .setInputCols(["clean_tokens"])
    .setOutputCols(["finished_tokens"])
    .setOutputAsArray(True)
    .setCleanAnnotations(True)
)

# Word2Vec over the finished tokens; writes a 300-dimensional "features" vector per row
word2Vec = Word2Vec(
    inputCol="finished_tokens",
    outputCol="features",
    vectorSize=300,
    minCount=5,
    windowSize=5,
    seed=42,
)


# Full text-to-vector pipeline. The stage order matters: each stage reads
# columns produced by an earlier one.
vector_pipeline = Pipeline().setStages([
    documentAssembler,
    tokenizer,
    normalizer,
    stopwords_cleaner,
    pos,
    token_count_transformer,
    norm_token_count_transformer,
    # disabled deep-learning stages:
    #     bert_embeddings,
    #     sentence_embeddings,
    #     classifier_dl,
    finisher,
    word2Vec,
])


# Fit the pipeline on the full dataset, then apply it to that same dataset
fitted_vector_pipeline = vector_pipeline.fit(newer_df)
feature_df = fitted_vector_pipeline.transform(newer_df)

# Quick look at the label next to its feature vector
feature_df.select("is_adoptee", "features").show(truncate=10)

# Keep only rows whose is_adoptee value is not -1 (presumably -1 marks rows
# without a label; confirm against the data preparation step).
# The condition references the column by name so that it resolves against
# feature_df itself rather than against a column object of another DataFrame.
labeled_df = feature_df.filter(F.col("is_adoptee") != -1)

# Reproducible 80/20 split; both parts are cached because cross-validation
# and evaluation read them repeatedly.
train_df, test_df = labeled_df.randomSplit([0.8, 0.2], seed=42)
train_df.persist()
test_df.persist()

# Modelling pipelines: assemble the feature vector, then classify
assembler = (
    VectorAssembler()
    .setInputCols(["features"])
    .setOutputCol("assembled_features")
)

# Candidate classifiers, both reading the assembled vector and the is_adoptee label
lr = LogisticRegression(labelCol="is_adoptee", featuresCol="assembled_features")
rf = RandomForestClassifier(labelCol="is_adoptee", featuresCol="assembled_features")

# One pipeline per classifier, sharing the same assembler stage
lr_pipeline = Pipeline().setStages([assembler, lr])
rf_pipeline = Pipeline().setStages([assembler, rf])


# Hyper-parameter search space for logistic regression (3 x 3 combinations)
lr_grid_builder = ParamGridBuilder()
lr_grid_builder.addGrid(lr.regParam, [0.01, 0.1, 1.0])
lr_grid_builder.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
lr_param_grid = lr_grid_builder.build()

# Hyper-parameter search space for the random forest (3 x 3 combinations)
rf_grid_builder = ParamGridBuilder()
rf_grid_builder.addGrid(rf.numTrees, [10, 50, 100])
rf_grid_builder.addGrid(rf.maxDepth, [5, 10, 20])
rf_param_grid = rf_grid_builder.build()

# Both searches are scored by area under the ROC curve on the is_adoptee label
evaluator = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    labelCol="is_adoptee",
)

# 5-fold cross-validation for each model family
lr_cv = CrossValidator(
    numFolds=5,
    estimator=lr_pipeline,
    estimatorParamMaps=lr_param_grid,
    evaluator=evaluator,
)

rf_cv = CrossValidator(
    numFolds=5,
    estimator=rf_pipeline,
    estimatorParamMaps=rf_param_grid,
    evaluator=evaluator,
)

# Logistic regression: cross-validated fit on the training split,
# then AUC of the selected model on the held-out test split
lr_cv_model = lr_cv.fit(train_df)
lr_preds = lr_cv_model.transform(test_df)
lr_auc = evaluator.evaluate(lr_preds)

# Random forest: same procedure
rf_cv_model = rf_cv.fit(train_df)
rf_preds = rf_cv_model.transform(test_df)
rf_auc = evaluator.evaluate(rf_preds)

# Report both scores together
print(f"Logistic Regression AUC: {lr_auc}")
print(f"Random Forest AUC: {rf_auc}")

# Save the best model found by each cross-validation run.
# write().overwrite() replaces a model left behind by an earlier run; a plain
# save() raises an error when the target path already exists, which would make
# the notebook fail on every re-run.
lr_cv_model.bestModel.write().overwrite().save("s3://ethan-kozlowski-project/lr_best_model")
rf_cv_model.bestModel.write().overwrite().save("s3://ethan-kozlowski-project/rf_best_model")

# Save DataFrame to S3 in Parquet format.
# The original statement wrote a name `result` that is never defined in this
# notebook, which raises NameError.
# NOTE(review): feature_df (the output of the text-to-vector pipeline) is assumed
# to be the intended frame; confirm, or swap in lr_preds / rf_preds if the
# predictions were meant to be stored instead.
feature_df.write.mode("overwrite").parquet("s3://ethan-kozlowski-project/output")