In [None]:
# findspark.init() puts the local Spark installation's pyspark on sys.path.
# It has to run BEFORE sparknlp is imported: sparknlp pulls in pyspark at
# import time, so importing it first fails on machines where pyspark is only
# reachable through findspark.
import findspark
findspark.init()

import sparknlp

# Start (or reuse) a Spark session configured for Spark NLP.
spark = sparknlp.start()
print('spark version ' + spark.version)

# Quick look at the raw pipe-separated history file, using its own header row
# for the column names.
history = spark.read.csv('sample_history.csv', sep='|', header=True)

print(history)
print(history.schema)

history.show(3)


In [None]:
from datetime import datetime, date
from pyspark.sql import *

from pyspark.sql.functions import *
from pyspark.ml import *
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.pretrained import *
from sparknlp.base import *

# Reload the pipe-separated history file and give its three columns the
# names used by the rest of the notebook.
history = (
    spark.read
    .csv('sample_history.csv', sep='|', header=True)
    .toDF("kind", "text", "url")
)

# Stage 1: turn the raw "text" column into a "document" annotation column.
document = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

# Stage 2: pretrained Universal Sentence Encoder (fetched on first use),
# embedding each document as a whole.
# A sentence detector could be inserted before this stage to train on, and
# get predictions for, individual sentences instead.
use = (
    UniversalSentenceEncoder.pretrained()
    .setInputCols(["document"])
    .setOutputCol("sentence_embeddings")
)

# Stage 3: trainable classifier on top of the embeddings; the target labels
# are read from the "category" column of the training data.
classsifierdl = (
    ClassifierDLApproach()
    .setInputCols(["sentence_embeddings"])
    .setOutputCol("predicted_category")
    .setLabelColumn("category")
    .setMaxEpochs(50)
    .setEnableOutputLogs(True)
)

# Chain the three stages: document -> embeddings -> classifier.
use_clf_pipeline = Pipeline(stages=[document, use, classsifierdl])

# Tiny hand-labeled training set: one example per category.
labeled_examples = [
    ['How to make a python3 spark dataframe', 'work'],
    ['where to go out and eat with friends?', 'fun'],
]
training_data = spark.createDataFrame(labeled_examples).toDF("text", "category")

# Fit all pipeline stages (this trains the classifier) on the labeled examples.
use_pipelineModel = use_clf_pipeline.fit(training_data)


In [None]:
# Run the fitted pipeline over the full history to attach predictions.
history_labeled = use_pipelineModel.transform(history)

# predicted_category is an array of annotations; only the first one is used.
first_prediction = history_labeled.predicted_category.getItem(0)
predicted_label = first_prediction.getItem("result")
(
    history_labeled
    # Filter while `kind` and the annotation struct are still part of the
    # frame, instead of relying on Spark to resolve columns that a preceding
    # select() has already dropped.
    .filter(history_labeled.kind == 'Searched')
    .filter(predicted_label == 'fun')
    .select(
        history_labeled.text,
        # Annotation metadata values are strings; cast to double so the
        # ordering below is numeric rather than lexicographic.
        first_prediction.metadata.getItem("fun").cast("double").alias("fun_score"),
        predicted_label.alias("predicted_category"),
    )
    .orderBy(desc("fun_score"))
    .show(300, truncate=300)
)


In [None]:
# Pull every labeled row back to the driver as a plain Python list so it can
# be sorted locally below. collect() materializes the whole DataFrame in
# driver memory, so this is only suitable for a small sample file.
data = history_labeled.collect()


def sortkey(row):
    """Return the "fun" score of a collected row as a float, for sorting.

    Reads ``row[5][0][4]["fun"]`` and converts it with ``float``.
    Presumably index 5 is the ``predicted_category`` annotation array,
    ``[0]`` its first annotation and ``[4]`` that annotation's metadata
    mapping -- confirm against ``history_labeled.columns``.

    Rows whose score cannot be read sort first with a score of 0: no
    annotation (IndexError), no "fun" entry (KeyError), a null column or
    null score (TypeError), or a non-numeric score string (ValueError).
    """
    try:
        return float(row[5][0][4]["fun"])
    except (IndexError, KeyError, TypeError, ValueError):
        # One malformed row must not abort the whole sort.
        return 0.0


# Order the collected rows in place by ascending "fun" score, so the
# highest-scoring rows end up at the tail of the list.
data.sort(key=sortkey)

# Print the second field (index 1) of the 20 highest-scoring rows.
top_rows = data[-20:]
for row in top_rows:
    print(row[1])
