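An example of a **production** model: a Spark job that trains an XGBoost classifier on labeled company domains and then scores the unlabeled ones.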

```scala

package com.growbots.spark_models

import ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator
import org.apache.spark.ml.feature.{CountVectorizer, RegexTokenizer, VectorAssembler}
import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics


/**
  * Spark job that trains an XGBoost binary classifier on labeled company domains
  * and writes predicted probabilities for every domain that has no label yet.
  *
  * Text features are bag-of-words counts built from the company profile
  * (description, specialties, industry, address) and the crawled homepage HTML.
  */
object DomainQualification {
  import org.apache.spark.ml.PipelineStage

  val spark: SparkSession = create_spark_session()

  /**
    * Entry point: loads the data, reports a hold-out AUC, refits on all labeled
    * data and writes the scores of the unlabeled domains to parquet.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {

    // load labeled data
    val labeledDataRaw = load_labeled_dataset()
    labeledDataRaw.createOrReplaceTempView("labeled_data")
    labeledDataRaw.printSchema()

    println("Total domains %d".format(labeledDataRaw.count()))

    val homepages = spark.read.parquet("gs://ai-bucket/company_crawl/parts.parquet")
    homepages.createOrReplaceTempView("pages")

    val companyProfiles = spark.read.parquet("gs://ai-bucket/profiles/raw_company_data.parquet")
    companyProfiles.createOrReplaceTempView("company_profiles")
    companyProfiles.printSchema()

    // One row per labeled domain; MAX(...) collapses duplicate profile/page rows.
    val domainsLabeled = spark.sql(
      """
      SELECT company_profiles.domain,
             MAX(labeled_data.label) as label,
             MAX(COALESCE(company_profiles.description,'')) as description,
             MAX(COALESCE(company_profiles.specialties,"")) as specialties,
             MAX(COALESCE(pages.html,"")) as html,
             MAX(COALESCE(company_profiles.industry,'')) as industry,
             MAX(COALESCE(company_profiles.headquarters_address_raw,'')) as address
      FROM company_profiles
           INNER JOIN labeled_data ON company_profiles.domain = labeled_data.domain
           LEFT JOIN pages ON pages.domain = labeled_data.domain
      WHERE
           (company_profiles.description is not NULL or pages.html is not NULL)
      GROUP BY 1
      """).cache

    println("Total training examples %d".format(domainsLabeled.count()))

    // Only the pipeline is needed here; the bare estimator is returned for callers that want it.
    val (pipeline, _) = createPipeline()

    println("Validating")
    val Array(trainSet, testSet) = domainsLabeled.randomSplit(Array(0.6, 0.4))
    val validationModel = pipeline.fit(trainSet)
    val preds = validationModel
      .transform(testSet)
      .cache

    // BinaryClassificationMetrics expects (score, label) pairs, in that order.
    // NOTE(review): element 0 of "probabilities" is used as the score, as before;
    // confirm that it holds the positive-class probability for this XGBoost4J version.
    val scoreAndLabels = preds.rdd.map { row =>
      (row.getAs[Seq[Float]]("probabilities")(0).toDouble, row.getAs[Double]("label"))
    }

    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    println("Auc = " + metrics.areaUnderROC)
    // The cached predictions are not needed past this point.
    preds.unpersist()

    println("Training on all data")
    val fullModel = pipeline.fit(domainsLabeled)

    println("Predicting new profiles")
    // Anti-join: keep only profiles without a label. Pages must be joined on
    // company_profiles.domain because labeled_data.domain is NULL for every
    // row that survives the WHERE clause.
    val domainsNotLabeled = spark.sql(
      """
      SELECT company_profiles.domain,
             MAX(COALESCE(company_profiles.description,'')) as description,
             MAX(COALESCE(company_profiles.specialties,"")) as specialties,
             MAX(COALESCE(pages.html,"")) as html,
             MAX(COALESCE(company_profiles.industry,'')) as industry,
             MAX(COALESCE(company_profiles.headquarters_address_raw,'')) as address
      FROM company_profiles
           LEFT JOIN labeled_data ON company_profiles.domain = labeled_data.domain
           LEFT JOIN pages ON pages.domain = company_profiles.domain
      WHERE labeled_data.domain is NULL and
            (company_profiles.description is not NULL or pages.html is not NULL)
      GROUP BY 1
      """).repartition(200)

    fullModel.transform(domainsNotLabeled)
      .select("domain", "probabilities")
      .write.mode("overwrite")
      .parquet("gs://ai-bucket/notebooks-data/sdr/qualify_growbots_leads-v2/domains_not_labeled_with_probs_v2.parquet")
  }

  /** Returns the existing Spark session or creates one named "Domain Qualification". */
  def create_spark_session(): SparkSession =
    SparkSession
      .builder
      .appName("Domain Qualification")
      .getOrCreate()

  /**
    * Reads the labeled-domains CSV and returns it with columns `domain` and `label`.
    *
    * Exact duplicate rows are dropped and `label` is cast to double.
    */
  def load_labeled_dataset(): DataFrame = {
    val path = "gs://ai-bucket/notebooks-data/sdr/qualify_growbots_leads-v2/domains_qualified_v2.csv"
    val labeled = spark.read.option("inferSchema", "true").csv(path)
      .withColumnRenamed("_c0", "domain")
      .withColumnRenamed("_c1", "label").distinct()
    labeled.withColumn("label", labeled.col("label").cast("double"))
  }

  /**
    * Builds the tokenizer and count-vectorizer stages for one text column.
    *
    * Reads `column`, writes tokens to `<column>_words` and term counts to `<column>_tf`.
    */
  private def bagOfWordsStages(column: String): Seq[PipelineStage] = {
    // NOTE(review): in java.util.regex "(?u)" is UNICODE_CASE; it does not make \w
    // Unicode-aware the way scikit-learn's default pattern is. Confirm this is intended.
    val tokenizer = new RegexTokenizer()
      .setToLowercase(true)
      .setPattern("(?u)\\b\\w\\w+\\b") // default scikit-learn
      .setGaps(false)
      .setInputCol(column)
      .setOutputCol(s"${column}_words")

    val vectorizer = new CountVectorizer()
      .setMinDF(5)
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol(s"${column}_tf")

    Seq(tokenizer, vectorizer)
  }

  /**
    * Creates the full training pipeline.
    *
    * @return the pipeline (text featurization, vector assembly, XGBoost) together
    *         with the XGBoost estimator that is its last stage
    */
  def createPipeline(): (Pipeline, XGBoostEstimator) = {
    // Order matters: it fixes both the stage order and the layout of "features".
    val textColumns = Seq("description", "html", "specialties", "address", "industry")

    val featureStages: Seq[PipelineStage] = textColumns.flatMap(bagOfWordsStages)

    val assembler = new VectorAssembler()
      .setInputCols(textColumns.map(column => s"${column}_tf").toArray)
      .setOutputCol("features")

    val numRound = 50
    val numWorkers = 4
    val paramMap = Map[String, Any](
      "eta" -> 0.1f,
      "max_depth" -> 6,
      "min_child_weight" -> 3.0,
      "subsample" -> 1.0,
      "colsample_bytree" -> 0.82,
      "colsample_bylevel" -> 0.9,
      "base_score" -> 0.005,
      "eval_metric" -> "auc",
      "seed" -> 49,
      "silent" -> 1,
      "objective" -> "binary:logistic")

    val model = new XGBoostEstimator(xgboostParams = paramMap, round = numRound, nWorkers = numWorkers)

    val stages: Array[PipelineStage] =
      (featureStages ++ Seq[PipelineStage](assembler, model)).toArray

    val pipeline = new Pipeline().setStages(stages)

    (pipeline, model)
  }
}

```

Lucene tokenizer: a custom Spark ML transformer that tokenizes text with Lucene's `EnglishAnalyzer`.

```scala
package com.growbots.nlp

import org.apache.lucene.analysis.en.EnglishAnalyzer
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.feature.Tokenizer
import org.apache.spark.ml.util.{DefaultParamsReadable, _}
import org.apache.spark.sql.types.{ArrayType, DataType, StringType}

import scala.collection.mutable.ArrayBuffer

/**
  * Spark ML transformer that splits a string column into tokens using Lucene's
  * `EnglishAnalyzer`.
  *
  * @param uid unique identifier of this pipeline stage
  */
class LuceneTokenizer(override val uid: String)
  extends UnaryTransformer[String, Seq[String], LuceneTokenizer] with DefaultParamsWritable {

  /** Creates a tokenizer with a random uid prefixed by "lucene_tok". */
  def this() = this(Identifiable.randomUID("lucene_tok"))

  /**
    * Runs `text` through a fresh `EnglishAnalyzer` and returns the emitted terms, lower-cased.
    *
    * Both the analyzer and the token stream are closed even when tokenization fails.
    */
  private def tokenize(text: String): Seq[String] = {
    val analyzer = new EnglishAnalyzer()
    try {
      val tokenStream = analyzer.tokenStream("contents", text)
      try {
        val term = tokenStream.addAttribute(classOf[CharTermAttribute])

        // Consumer workflow: reset, incrementToken until exhausted, end, close.
        tokenStream.reset()

        val tokens = ArrayBuffer.empty[String]
        while (tokenStream.incrementToken()) {
          tokens += term.toString.toLowerCase
        }

        tokenStream.end()
        tokens
      } finally {
        tokenStream.close()
      }
    } finally {
      analyzer.close()
    }
  }

  override protected def createTransformFunc: String => Seq[String] = tokenize

  /** Rejects any input column that is not of string type. */
  override protected def validateInputType(inputType: DataType): Unit = {
    require(inputType == StringType, s"Input type must be string type but got $inputType.")
  }

  /** The output column is an array of strings. */
  override protected def outputDataType: DataType = new ArrayType(StringType, true)
}

/**
  * Companion providing `read` / `load` for persisted [[LuceneTokenizer]] stages.
  *
  * The reader is typed to `LuceneTokenizer`: the class is not a subclass of Spark's
  * `Tokenizer`, so a `Tokenizer`-typed `load` fails with a `ClassCastException`.
  */
object LuceneTokenizer extends DefaultParamsReadable[LuceneTokenizer] {
  /** Loads a `LuceneTokenizer` previously saved at `path`. */
  override def load(path: String): LuceneTokenizer = super.load(path)
}
```