In [None]:
import os
import sys

import sparknlp

from sparknlp.base import *
from sparknlp.common import *
from sparknlp.annotator import *

from pyspark.ml import Pipeline
from pyspark.sql import SparkSession, Row

import pandas as pd

spark = sparknlp.start()

print("Spark NLP version: ", sparknlp.version())
print("Apache Spark version: ", spark.version)

spark

In [None]:
titleDF = spark.read \
      .option("header", True) \
      .csv("data/training_all_data.csv", sep=r'@')


In [None]:
titleDF.select('description').distinct().count()

In [None]:
titleDF.filter(titleDF.title.isNull())

In [None]:
from pyspark.sql.functions import col

titleDF.groupBy("title") \
    .count() \
    .orderBy(col("count").desc()) \
    .head(2)

In [None]:
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, OneHotEncoder, StringIndexer, VectorAssembler, SQLTransformer

In [None]:
%%time

document_assembler = DocumentAssembler() \
      .setInputCol("description") \
      .setOutputCol("document")
    
tokenizer = Tokenizer() \
      .setInputCols(["document"]) \
      .setOutputCol("token")
      
normalizer = Normalizer() \
      .setInputCols(["token"]) \
      .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)

stemmer = Stemmer() \
      .setInputCols(["cleanTokens"]) \
      .setOutputCol("stem")

finisher = Finisher() \
      .setInputCols(["stem"]) \
      .setOutputCols(["token_features"]) \
      .setOutputAsArray(True) \
      .setCleanAnnotations(False)

countVectors = CountVectorizer(inputCol="token_features", outputCol="features", vocabSize=10000, minDF=5)

label_stringIdx = StringIndexer(inputCol = "title", outputCol = "label")

nlp_pipeline = Pipeline(
    stages=[document_assembler, 
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            stemmer, 
            finisher,
            countVectors,
            label_stringIdx])
nlp_model = nlp_pipeline.fit(titleDF)

processed = nlp_model.transform(titleDF)

processed.count()

In [None]:
processed.select('description','token_features').show(truncate=50)

In [None]:
processed.select('description','features','label').show()

In [None]:
# set seed for reproducibility
(trainingData, testData) = processed.randomSplit([0.8, 0.2], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

In [None]:
trainingData.printSchema()

In [None]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=10000, regParam=0.3, elasticNetParam=0)

lrModel = lr.fit(trainingData)

predictions = lrModel.transform(testData)

predictions.filter(predictions['prediction'] == 0) \
    .select("description","title","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

evaluator.evaluate(predictions)

In [None]:
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
y_true = predictions.select("label")
y_true = y_true.toPandas()

y_pred = predictions.select("prediction")
y_pred = y_pred.toPandas()

In [None]:
y_pred.prediction.value_counts()

In [None]:
cnf_matrix = confusion_matrix(list(y_true.label.astype(int)), list(y_pred.prediction.astype(int)))
cnf_matrix

In [None]:
predictions

In [None]:
print(classification_report(y_true.label, y_pred.prediction))
print(accuracy_score(y_true.label, y_pred.prediction))

# Testing

In [None]:
from pyspark.ml.linalg import Vectors,Matrix

In [None]:
trainingData.select('title','label').distinct().show(50)

In [None]:
("project", "Experienced Data Scientist with a demonstrated history of working in the information technology and services industry. Skilled in Python (Programming Language), SQL and Data Science. Strong engineering professional graduated from Tbilisi State University.")
('project','We are seeking a full-time AP Clerk to support our accounting team. Job descriptions: Timely process vendor invoices for multi-entities Weekly payment process and positive pay uploading Maintain accurate vendor wire templates and process wire payments Create/import inbound reports Credit card transaction reconciliations Travel expense / employee reimbursement report Investigate aging AP and reconcile AP balance with vendors’ statements of accounts Maintain Fixed Assets log Ad hoc projects and misc. tasks Education and skills requirements: Associate degree in Accounting or Finance or high school diploma Very organized and detail oriented Positive work attitude and multi-task Proficient in Microsoft Office especially Excel; ERP experience (NetSuite) is preferred but not required. Others: This position will be in person to the office. Job Type: Full-time Pay: $40')

In [None]:
columns = ["title","description"]
data = [('project','designs and builds computer programs that power mobile devices, desktop computers, and even cars. They not only identify user needs but also create new applications for any given market while making improvements based on feedback from users.')]
rdd = spark.sparkContext.parallelize(data)
input_data = spark.createDataFrame(data).toDF(*columns)
input_data.head()

In [None]:
huge_data = titleDF.union(input_data)
nlp_model = nlp_pipeline.fit(huge_data)

In [333]:
huge_data = titleDF.union(input_data)
nlp_model = nlp_pipeline.fit(huge_data)

In [334]:
dfFromRDD2 = spark.createDataFrame(data).toDF(*columns)
test = nlp_model.transform(dfFromRDD2)

In [335]:
row = Matrix(rdd, 2)
lrModel.predict(test.head().features)

0.0