In [1]:
! pip install -q pyspark==3.2.0 spark-nlp==3.4.2

[K     |████████████████████████████████| 281.3 MB 38 kB/s 
[K     |████████████████████████████████| 142 kB 14.6 MB/s 
[K     |████████████████████████████████| 198 kB 46.8 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
import sparknlp

spark = sparknlp.start(gpu = True, spark32=True) 
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline
import pandas as pd
import os

print("Spark NLP version", sparknlp.version())          # check για versions sparknlp και pyspark
print("Apache Spark version:", spark.version)

spark

Spark NLP version 3.4.2
Apache Spark version: 3.2.0


In [3]:
df = spark.read.option("header", "true")\
    .option("sep", ",")\
    .option("multiLine", "true")\
    .option("quote", "\"")\
    .option("escape", "\"") \
    .option("ignoreTrailingWhiteSpace", True) \
    .csv("job_postings.csv")

df = df.select("description", "fraudulent") # κρατάμε με select εδώ τις δύο στήλες που χρειαζόμαστε
df.show()
df.printSchema() # βλέπουμε το dataset και ελέγχουμε αν φορτώθηκε σωστά το dataset

+--------------------+----------+
|         description|fraudulent|
+--------------------+----------+
|Food52, a fast-gr...|         0|
|Organised - Focus...|         0|
|Our client, locat...|         0|
|THE COMPANY: ESRI...|         0|
|JOB TITLE: Itemiz...|         0|
|Job OverviewApex ...|         0|
|Your Responsibili...|         0|
|Who is Airenvy?He...|         0|
|Implementation/Co...|         0|
|The Customer Serv...|         0|
|Position : #URL_8...|         0|
|TransferWise is t...|         0|
|The Applications ...|         0|
|Event Industry In...|         0|
|Are you intereste...|         0|
|About Vault Drago...|         0|
|We are looking fo...|         0|
|Government fundin...|         0|
|Kettle is hiring ...|         0|
|Experienced Proce...|         0|
+--------------------+----------+
only showing top 20 rows

root
 |-- description: string (nullable = true)
 |-- fraudulent: string (nullable = true)



In [4]:
from pyspark.sql.functions import *
from sparknlp.base import *
from sparknlp.annotator import *
from sklearn.metrics import classification_report, accuracy_score
# we will try oversampling on our dataset ( it is very imbalanced though since 95% are zeros on fraudulent col)

major_df = df.filter(col("fraudulent") == 0)
minor_df = df.filter(col("fraudulent") == 1)
ratio = int(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))
# duplicate the minority rows (ones)
a = range(ratio)
oversampled_df = minor_df.withColumn("dummy", explode(array([lit(x) for x in a]))).drop('dummy')
# combine both oversampled minority rows and previous majority rows
df = major_df.unionAll(oversampled_df)
# Maybe this gives us a more balanced dataset
df.groupBy('fraudulent').count().show()   # για να δούμε τώρα αν έφτιαξε το dataset , έφτιαξε αρκετά 
# δημιουργήσαμε νέους άσους(εγγραφές με fraudulent 1 ), ωστόσο τα αποτελέσματα θα είναι λίγο πλασματικά 
# μιας και απλά αυξήσαμε σχεδόν για να φτάσουν στα ίδια μεγέθη

ratio: 19
+----------+-----+
|fraudulent|count|
+----------+-----+
|         0|17014|
|         1|16454|
+----------+-----+



In [5]:
#Oversampled dataset Word Embeddings Classifierdl 

document_assembler = DocumentAssembler()\
    .setInputCol("description")\
    .setOutputCol("document")

tokenizer = Tokenizer()\
    .setInputCols(["document"])\
    .setOutputCol("tokens")

normalizer = Normalizer()\
    .setInputCols(["tokens"])\
    .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
    .setInputCols("normalized")\
    .setOutputCol("cleanTokens")\
    .setCaseSensitive(False)

lemma = LemmatizerModel.pretrained("lemma_antbnc")\
    .setInputCols(["cleanTokens"])\
    .setOutputCol("lemma")

word_embeddings = WordEmbeddingsModel().pretrained()\
    .setInputCols(["document", "lemma"])\
    .setOutputCol("embeddings")\
    .setCaseSensitive(False)

embeddingsSentence = SentenceEmbeddings() \
    .setInputCols(["document", "embeddings"]) \
    .setOutputCol("sentence_embeddings") \
    .setPoolingStrategy("AVERAGE")

clf_dl = ClassifierDLApproach()\
    .setInputCols(["sentence_embeddings"])\
    .setOutputCol("class")\
    .setLabelColumn("fraudulent")\
    .setMaxEpochs(7)\
    .setEnableOutputLogs(True)

clf_pipeline = Pipeline(stages=
        [document_assembler, 
         tokenizer, 
         normalizer, 
         stopwords_cleaner,
         lemma,
         word_embeddings,
         embeddingsSentence,
         clf_dl])



splits = df.randomSplit([0.8, 0.2])     # έχει αλλάξει το δf αυτό δεν είναι το προηγούμενο
train = splits[0]
test = splits[1]

clf_pipelineModel = clf_pipeline.fit(train)

preds = clf_pipelineModel.transform(test)
preds_df = preds.select('fraudulent','description',"class.result").toPandas()
preds_df["result"] = preds_df["result"].apply(lambda x: x[0])
print(classification_report(preds_df.fraudulent, preds_df.result))

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]
glove_100d download started this may take some time.
Approximate size to download 145.3 MB
[OK!]
              precision    recall  f1-score   support

           0       0.91      0.95      0.93      3385
           1       0.95      0.90      0.92      3248

    accuracy                           0.93      6633
   macro avg       0.93      0.93      0.93      6633
weighted avg       0.93      0.93      0.93      6633



In [8]:
# επειδή και εδώ αναμέναμε κακά αποτελέσματα, κάναμε πρώτα oversampling στο αρχικό df και μετά κάναμε word embeddings
# ας δούμε τώρα τι επιστρλεφει το classification report στο dataset το αρχικό
# Χωρίς oversampling word embeddings


df = spark.read.option("header", "true")\
    .option("sep", ",")\
    .option("multiLine", "true")\
    .option("quote", "\"")\
    .option("escape", "\"") \
    .option("ignoreTrailingWhiteSpace", True) \
    .csv("job_postings.csv")

c = df.groupBy("fraudulent").count()
c.show()

+----------+-----+
|fraudulent|count|
+----------+-----+
|         0|17014|
|         1|  866|
+----------+-----+



In [9]:
#CLASSIFIERDL USING WORD EMBEDDINGS (PRETRAINED)

document_assembler = DocumentAssembler()\
    .setInputCol("description")\
    .setOutputCol("document")

tokenizer = Tokenizer()\
    .setInputCols(["document"])\
    .setOutputCol("tokens")

normalizer = Normalizer()\
    .setInputCols(["tokens"])\
    .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
    .setInputCols("normalized")\
    .setOutputCol("cleanTokens")\
    .setCaseSensitive(False)

lemma = LemmatizerModel.pretrained("lemma_antbnc")\
    .setInputCols(["cleanTokens"])\
    .setOutputCol("lemma")

word_embeddings = WordEmbeddingsModel().pretrained()\
    .setInputCols(["document", "lemma"])\
    .setOutputCol("embeddings")\
    .setCaseSensitive(False)

embeddingsSentence = SentenceEmbeddings() \
    .setInputCols(["document", "embeddings"]) \
    .setOutputCol("sentence_embeddings") \
    .setPoolingStrategy("AVERAGE")

clf_dl = ClassifierDLApproach()\
    .setInputCols(["sentence_embeddings"])\
    .setOutputCol("class")\
    .setLabelColumn("fraudulent")\
    .setMaxEpochs(7)\
    .setEnableOutputLogs(True)

clf_pipeline = Pipeline(stages=
        [document_assembler, 
         tokenizer, 
         normalizer, 
         stopwords_cleaner,
         lemma,
         word_embeddings,
         embeddingsSentence,
         clf_dl])



splits = df.randomSplit([0.8, 0.2])     
train = splits[0]
test = splits[1]

clf_pipelineModel = clf_pipeline.fit(train)

preds = clf_pipelineModel.transform(test)
preds_df = preds.select('fraudulent','description',"class.result").toPandas()
preds_df["result"] = preds_df["result"].apply(lambda x: x[0])
print(classification_report(preds_df.fraudulent, preds_df.result))


lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]
glove_100d download started this may take some time.
Approximate size to download 145.3 MB
[OK!]
              precision    recall  f1-score   support

           0       0.95      1.00      0.98      3384
           1       0.00      0.00      0.00       167

    accuracy                           0.95      3551
   macro avg       0.48      0.50      0.49      3551
weighted avg       0.91      0.95      0.93      3551



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
