# NLP RECSYS Pipeline




In [1]:
import com.johnsnowlabs.nlp.SparkNLP
import com.johnsnowlabs.nlp.annotator._
import com.johnsnowlabs.nlp.base._
import com.johnsnowlabs.ml.tensorflow.TensorflowBert
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{udf,to_timestamp}
import org.apache.spark.storage._
import org.apache.spark.ml.feature._
import org.apache.spark.ml.classification._
import org.apache.spark.ml.linalg.DenseVector

// Imports below are used for the custom ML Transformer
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, udf}
import org.apache.spark.sql.types.{DataTypes, StructType}


// Root directory of the challenge data (under the current user's HOME) and the
// base name of the parquet dataset to load.
val dataDir = s"${sys.env("HOME")}/recsys2020"
val dsName = "training1m"

In [2]:
// Load the training parquet file and keep only 1000 rows while experimenting.
val df = spark.read
  .parquet(s"$dataDir/$dsName.parquet")
  .limit(1000)
// Remove the limit after experiments; optionally add .persist(StorageLevel.MEMORY_ONLY)
df

[user_id: string, tweet_id: string ... 17 more fields]

In [3]:
/**
 * Pipeline stage that explodes an array column: every element of the array stored in
 * `inputCol` becomes its own row, with the element placed in `outputCol`.
 *
 * Based on the custom Transformer example at
 * https://docs.databricks.com/applications/machine-learning/mllib/advanced-mllib.html
 *
 * @param uid unique identifier of this stage. Params are keyed by their parent's uid, so the
 *            default is a freshly generated id; a fixed default would make every
 *            default-constructed instance share the same uid.
 */
class Exploder(
    override val uid: String = org.apache.spark.ml.util.Identifiable.randomUID("exploder")
) extends Transformer {

  // Transformer Params
  // Defining a Param requires 3 elements:
  //  - Param definition
  //  - Param getter method
  //  - Param setter method
  // (The getter and setter are technically not required, but they are nice standards to follow.)

  /**
   * Param for the input column name. The column must hold an array.
   */
  final val inputCol: Param[String] = new Param[String](this, "inputCol", "input column name")

  /** @return the configured input column name */
  final def getInputCol: String = $(inputCol)

  /** Sets the input column name and returns this stage for chaining. */
  final def setInputCol(value: String): Exploder = set(inputCol, value)

  /**
   * Param for the output column name. The column receives one array element per row.
   */
  final val outputCol: Param[String] = new Param[String](this, "outputCol", "output column name")

  /** @return the configured output column name */
  final def getOutputCol: String = $(outputCol)

  /** Sets the output column name and returns this stage for chaining. */
  final def setOutputCol(value: String): Exploder = set(outputCol, value)

  /**
   * This method implements the main transformation.
   * Its required semantics are fully defined by the method API: take a Dataset or DataFrame,
   * and return a DataFrame.
   *
   * Most Transformers are 1-to-1 row mappings which add one or more new columns and do not
   * remove any columns. This one is a flatMap-like expansion: each input row yields one output
   * row per element of the `inputCol` array. All other columns are kept (including `inputCol`
   * itself) and their values are repeated on every expanded row.
   *
   * Note that this relies on Spark's `explode`, so rows whose array is null or empty
   * produce no output rows.
   *
   * @param dataset input data containing the `inputCol` array column
   * @return the input with the additional `outputCol` column, one row per array element
   */
  override def transform(dataset: Dataset[_]): DataFrame = {
    dataset.withColumn($(outputCol), explode(col($(inputCol))))
  }

  /**
   * Check transform validity and derive the output schema from the input schema.
   *
   * We check validity for interactions between parameters during `transformSchema` and
   * raise an exception if any parameter value is invalid. Parameter value checks which
   * do not depend on other parameters are handled by `Param.validate()`.
   *
   * @param schema schema of the incoming dataset
   * @return `schema` with `outputCol` appended, typed as the element type of `inputCol`
   * @throws IllegalArgumentException if `inputCol` is missing from `schema` or is not an array
   */
  override def transformSchema(schema: StructType): StructType = {
    val inputName = $(inputCol)
    // `schema(name)` raises IllegalArgumentException when the column does not exist.
    schema(inputName).dataType match {
      case arrayType: ArrayType =>
        // Compute output type.
        // This is important to do correctly when plugging this Transformer into a Pipeline,
        // where downstream Pipeline stages may expect to use this Transformer's output as
        // their input.
        schema.add($(outputCol), arrayType.elementType)
      case other =>
        // Fail with an actionable message instead of a ClassCastException from a blind cast.
        throw new IllegalArgumentException(
          s"Exploder input column '$inputName' must be of ArrayType but was $other")
    }
  }

  /**
   * Creates a copy of this instance.
   * Requirements:
   *  - The copy must have the same UID.
   *  - The copy must have the same Params, with some possibly overwritten by the `extra`
   *    argument.
   *  - This should do a deep copy of any data members which are mutable.  That said,
   *    Transformers should generally be immutable (except for Params), so the `defaultCopy`
   *    method often suffices.
   * @param extra  Param values which will overwrite Params in the copy.
   */
  override def copy(extra: ParamMap): Transformer = defaultCopy(extra)
}



In [4]:
// Stage 1: wrap the raw text of the "sentence" column into a Spark NLP document annotation.
val doc = new DocumentAssembler()
  .setCleanupMode("shrink")
  .setInputCol("sentence")
  .setOutputCol("document")

// Stage 2: pretrained Universal Sentence Encoder producing one embedding per document.
val use = UniversalSentenceEncoder.pretrained()
  .setOutputCol("tweet_embeddings")
  .setInputCols(Array("document"))

// Stage 3: convert the embedding annotations into Spark ML vectors,
// keeping the original annotation columns.
val fin = new EmbeddingsFinisher()
  .setCleanAnnotations(false)
  .setOutputAsVector(true)
  .setInputCols(use.getOutputCol)
  .setOutputCols("finished_tweet_embeddings")

// Stage 4: the finisher emits an array of vectors; explode it into one vector per row.
val exploder = new Exploder()
  .setOutputCol("embedding_features")
  .setInputCol(fin.getOutputCols(0))

// Index tweet_type with a fixed, pre-built label list (no fitting required).
val tweetTypeIndexer = {
  val tweetTypeLabels = Array("TopLevel", "Retweet", "Quote", "Reply")
  new StringIndexerModel(tweetTypeLabels)
    .setOutputCol("tweet_type_idx")
    .setInputCol("tweet_type")
}

// One-hot encode the tweet type index.
val tweetTypeEncoder = new OneHotEncoder()
  .setOutputCol("tweet_type_onehot")
  .setInputCol(tweetTypeIndexer.getOutputCol)

// Gather the follower/following counts into one vector so they can be scaled together.
val scaleAss = {
  val countColumns = Array(
    "author_follower_count",
    "author_following_count",
    "user_follower_count",
    "user_following_count")
  new VectorAssembler()
    .setOutputCol("count_features")
    .setInputCols(countColumns)
}

// Scale the counts to unit standard deviation without centering them.
val scaler = new StandardScaler()
  .setWithMean(false)
  .setWithStd(true)
  .setOutputCol("count_features_scaled")
  .setInputCol(scaleAss.getOutputCol)

// Final feature vector: text embedding, tweet type, scaled counts and the boolean flags.
val ass = {
  val featureColumns = Array(
    "embedding_features",
    tweetTypeEncoder.getOutputCol,
    scaler.getOutputCol,
    "author_is_verified",
    "user_is_verified",
    "follows")
  new VectorAssembler()
    .setOutputCol("features")
    .setInputCols(featureColumns)
}

// Each classifier must get its own probability, prediction and raw-prediction column
// (setProbabilityCol, setPredictionCol, setRawPredictionCol); otherwise the classifiers
// collide on the default output column names inside the pipeline.

// Engagement types to predict; each one has a "has_<name>" label column.
val classNames = Array("retweet", "retweet_with_comment", "like", "reply")

// One gradient-boosted tree classifier per engagement type, all reading the same features.
val classifiers = classNames.map { className =>
  new GBTClassifier()
    .setFeaturesCol("features")
    .setLabelCol("has_" + className)
    .setPredictionCol("pred_" + className)
    .setRawPredictionCol("predraw_" + className)
    .setProbabilityCol("prob_" + className)
    .setFeatureSubsetStrategy("auto")
    .setMaxIter(10)
}

// Full pipeline: text embedding stages, tabular feature stages, feature assembly,
// then the per-engagement classifiers.
val pred_pipeline = {
  val featureStages = Array[org.apache.spark.ml.PipelineStage](
    doc,
    use,
    fin,
    exploder,
    scaleAss,
    scaler,
    tweetTypeIndexer,
    tweetTypeEncoder,
    ass)
  new Pipeline().setStages(featureStages ++ classifiers)
}

In [5]:
// Fit all stages of pred_pipeline on df; the fitted model is used for scoring below.
val fitted_pipeline = pred_pipeline.fit(df)

In [6]:
// Load the validation split stored next to the training data.
val val_df = spark.read.parquet(s"$dataDir/val.parquet")


In [7]:
// Score df with the fitted pipeline and print the first rows of the result.
// NOTE(review): this transforms the training frame `df`, not the `val_df` loaded in the
// previous cell -- confirm that scoring the training data is intended.
var finalDf = fitted_pipeline.transform(df)
finalDf.show()

+--------------------+--------------------+----------+-------------------------------------+---------------------+----------------------+------------------+-------------------+--------------------+----------------+-------+---------------+--------------------+-------------+---------------+-----------+------------------------+--------+---------+--------------------+--------------------+-------------------------+--------------------+--------------------+---------------------+--------------+-----------------+--------------------+--------------------+--------------------+------------+----------------------------+-------------------------+-------------------------+--------------------+--------------------+---------+--------------------+--------------------+----------+
|             user_id|            tweet_id|tweet_type|                             sentence|author_follower_count|author_following_count|author_is_verified|user_follower_count|user_following_count|user_is_verified|follows|tweet

In [8]:
// Only keep the relevant columns: for every engagement type, pull element 1 of the
// "prob_<class>" probability vector (the probability of label 1 for a binary classifier)
// into a plain Double column "out_<class>", then show them next to the row identifiers.

// Kept as Any => Double so the UDF stays untyped on its input, but the value is checked with
// a pattern match: any ML Vector (dense or sparse) is accepted, and anything else (including
// null) fails with an explicit message instead of a ClassCastException/NullPointerException.
val toArr: Any => Double = {
  case probability: org.apache.spark.ml.linalg.Vector => probability(1)
  case other =>
    throw new IllegalArgumentException(
      s"Expected an ML Vector of class probabilities but got: $other")
}
val toArrUdf = udf(toArr)

// Names of the extracted probability columns, in the same order as classNames.
val outputNames = classNames.map(className => "out_" + className)

// Add one "out_<class>" column per engagement type by folding over the class names,
// instead of reassigning a var from inside a for-comprehension.
val finalFinalDf = classNames.zip(outputNames).foldLeft(finalDf) {
  case (accumulated, (className, outputName)) =>
    accumulated.withColumn(outputName, toArrUdf(col("prob_" + className)))
}

finalFinalDf.selectExpr((Array("user_id", "tweet_id") ++ outputNames): _*).show()

+--------------------+--------------------+-------------------+------------------------+-------------------+-------------------+
|             user_id|            tweet_id|        out_retweet|out_retweet_with_comment|           out_like|          out_reply|
+--------------------+--------------------+-------------------+------------------------+-------------------+-------------------+
|D0EA9DDFE93EDA782...|FB6304C97F6CC05AF...|0.07695635807215107|     0.06587782434721734| 0.3139076565453959|0.06636797904544467|
|5CD17BD84873464C2...|8A025814338D4CAB5...|0.07194803392575488|     0.06587782434721712|0.20260358490988395|0.06636797904544467|
|6B8C5EFC300170EF8...|8E8EC11B9492B6C09...|0.09454389040798361|     0.06587782434721712|0.06064579290864147|0.06636797904544467|
|2F5D9BFEBEB24F290...|D59F5C535C7C892D7...| 0.0780199614242677|     0.06587782434721734| 0.7364543359391639|0.06663946585831226|
|073770D7360B4F399...|7D6700DA02116E216...|0.07404961521109132|     0.06587782434721712| 0.716254

In [9]:
// Write one headerless CSV output per engagement type with the columns
// tweet_id, user_id, out_<class>, saved under dataDir as "<class>.csv".
classNames.foreach { className =>
  val predictionColumn = col("out_" + className)
  val targetPath = dataDir + "/" + className + ".csv"
  finalFinalDf
    .select($"tweet_id", $"user_id", predictionColumn)
    .write
    .format("csv")
    .option("header", "false")
    .save(targetPath)
}