In [1]:
/* Add Deps */
%AddDeps edu.stanford.nlp stanford-corenlp 3.7.0
%AddDeps com.google.protobuf protobuf-java 2.6.1
%AddDeps com.databricks spark-csv_2.10 1.5.0 --transitive

// Non-repo dependencies 
%AddJar file:lib/corenlp-models.jar
%AddJar file:SE/target/scala-2.10/se_2.10-1.1.jar

Marking edu.stanford.nlp:stanford-corenlp:3.7.0 for download
Preparing to fetch from:
-> file:/tmp/toree_add_deps2551422517658262765/
-> https://repo1.maven.org/maven2
-> New file at /tmp/toree_add_deps2551422517658262765/https/repo1.maven.org/maven2/edu/stanford/nlp/stanford-corenlp/3.7.0/stanford-corenlp-3.7.0.jar
Marking com.google.protobuf:protobuf-java:2.6.1 for download
Preparing to fetch from:
-> file:/tmp/toree_add_deps2551422517658262765/
-> https://repo1.maven.org/maven2
-> New file at /tmp/toree_add_deps2551422517658262765/https/repo1.maven.org/maven2/com/google/protobuf/protobuf-java/2.6.1/protobuf-java-2.6.1.jar
Marking com.databricks:spark-csv_2.10:1.5.0 for download
Preparing to fetch from:
-> file:/tmp/toree_add_deps2551422517658262765/
-> https://repo1.maven.org/maven2
-> New file at /tmp/toree_add_deps2551422517658262765/https/repo1.maven.org/maven2/org/apache/commons/commons-csv/1.1/commons-csv-1.1.jar
-> New file at /tmp/toree_add_deps2551422517658262765/https/repo1

In [2]:
import com.evan.kaggle.se.FeatureEngineering._
val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(sc)
import sqlContext.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import scala.collection.immutable.HashSet

In [3]:
val files = List("cooking")//, "crypto", "robotics", "biology", "travel", "diy")

/* Load and print all files */
val df_all = files.map(f => {
                        sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("../dat/"+f+"_clean.csv")
                   }).reduce(_ unionAll _)

In [4]:
val clean_udf = udf( (s:String) => s.replaceAll("[^\\p{ASCII}]+", ""))

In [5]:
val mkFeat_UDF = udf((t: String, c: String, ta: String) => makeTrFeatures(5)(t, c, ta))

In [7]:
val featDF = df_all.select(explode(mkFeat_UDF($"title", $"content", $"tags")))
val featDF2 = featDF.select("col.*")

In [8]:
featDF2.show

+---------+-------+---------+------------------+--------+--------+-------+-----+
|    nGram|posTags|  depTags|            relPos|numWords|hasUpper|isTitle|isTag|
+---------+-------+---------+------------------+--------+--------+-------+-----+
|      <p>|     NN| compound|               0.0|       1|   false|  false|false|
|       my|   PRP$|nmod:poss|0.1111111111111111|       1|    true|  false|false|
|chocolate|     NN| compound|0.2222222222222222|       1|   false|  false|false|
|     chip|    NNS| compound|0.3333333333333333|       1|   false|  false|false|
|   cookie|    NNS|    nsubj|0.4444444444444444|       1|   false|  false| true|
|       be|    VBP|      cop|0.5555555555555556|       1|   false|  false|false|
|   always|     RB|   advmod|0.6666666666666666|       1|   false|  false|false|
|      too|     RB|   advmod|0.7777777777777778|       1|   false|  false|false|
|    crisp|     JJ|     root|0.8888888888888888|       1|   false|  false|false|
|      how|    WRB|   advmod

In [9]:
featDF2.count

7183278

In [10]:
featDF2.wite.save("mypar.parquet)

Name: Compile Error
Message: <console>:1: error: unclosed string literal
       featDF2.wite.save("mypar.parquet)
                         ^
StackTrace: 

In [15]:
val sub = trainingDF.sample(false, .001)

In [26]:
val replace_dash = udf( (s:String) => s.replaceAll("-", " "))

In [27]:
val data = sub.withColumn("posTags", replace_dash($"posTags")).
    withColumn("depTags", replace_dash($"depTags"))

In [None]:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}

val tokenizer = new Tokenizer().setInputCol("posTags").
  setInputCol("depTags").setOutputCol("tags")
val wordsData = tokenizer.transform(data)
val hashingTF = new HashingTF().
  setInputCol("tags").setOutputCol("hashedTags").setNumFeatures(20)
val featurizedData = hashingTF.transform(wordsData)
val idf = new IDF().setInputCol("hashedTags").setOutputCol("idfTags")
val idfModel = idf.fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData)

// Split the data into training and test sets (30% held out for testing)
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a RandomForest model.
val rf = new RandomForestClassifier().
  setLabelCol("isTag").
  setFeaturesCol("idfTags").
  .setNumTrees(10)

// Chain indexers and forest in a Pipeline
val pipeline = new Pipeline().
setStages(Array(rescaledData, rf))

// Train model.  This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test error
val evaluator = new MulticlassClassificationEvaluator().
setLabelCol("indexedLabel").
setPredictionCol("prediction").
setMetricName("precision")
val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))

val rfModel = model.stages(2).asInstanceOf[RandomForestClassificationModel]
println("Learned classification forest model:\n" + rfModel.toDebugString)