In [1]:
println(s"Current spark version is ${spark.version}")

Current spark version is 2.4.4


In [4]:
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, LongType, StringType}

val dataSchema = new StructType()
    .add("target", IntegerType)
    .add("id", LongType)
    .add("raw_timestamp", StringType)
    .add("query_status", StringType)
    .add("author", StringType)
    .add("tweet", StringType)

    
val dataPath= "/home/jovyan/data/training.1600000.processed.noemoticon.csv"

val raw_sentiment = spark.read
    .format("csv")
    .option("header",false)
    .schema(dataSchema)
    .load(dataPath)
    .selectExpr("(case when target=4 then 1 else 0 end) as label","tweet")

raw_sentiment.groupBy($"label").count.show

+-----+------+
|label| count|
+-----+------+
|    1|800000|
|    0|800000|
+-----+------+



dataSchema = StructType(StructField(target,IntegerType,true), StructField(id,LongType,true), StructField(raw_timestamp,StringType,true), StructField(query_status,StringType,true), StructField(author,StringType,true), StructField(tweet,StringType,true))
dataPath = /home/jovyan/data/training.1600000.processed.noemoticon.csv
raw_sentiment = [label: int, tweet: string]


[label: int, tweet: string]

In [3]:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.ml.feature.{HashingTF, Tokenizer, StopWordsRemover}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row


// разбиватель твита на массив слов
val tokenizer = new Tokenizer()
    .setInputCol("tweet")
    .setOutputCol("words")

// удалитель незначащих слов
val stopWordsRemover = new StopWordsRemover()
    .setInputCol(tokenizer.getOutputCol)
    .setOutputCol("cleaned_words")

// счетчик частоты слов
val hashingTF = new HashingTF()
    .setNumFeatures(1000)
    .setInputCol(stopWordsRemover.getOutputCol)
    .setOutputCol("features")

//val testT = tokenizer.transform(raw_sentiment)
//testT.show()

//val testWR = swRemover.transform(testT)
//testWR.show()

//val testTF = hashingTF.transform(testWR)

//testTF.select($"words", $"features").show(50, 200)

// классификатор RandomForest, кол-во деревьев - 3
val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setNumTrees(3)

// соединяем все элементы в pipeline
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, stopWordsRemover, hashingTF, rf))


tokenizer = tok_0a9e34b79e07
stopWordsRemover = stopWords_4f731dd6824a
hashingTF = hashingTF_920979ae9aba
rf = rfc_fbc1893e4117


pipeline: org.apache.spark.ml.P...


rfc_fbc1893e4117

In [18]:
// обучаем модель
val model = pipeline.fit(raw_sentiment)

model = pipeline_d732f58e5dab


pipeline_d732f58e5dab

In [19]:
// сохраняем модель
model.write.overwrite().save("/home/jovyan/models/spark-random_forest-ml-model")

In [4]:
// загружаем модель в новый объект
val sameModel = PipelineModel.load("/home/jovyan/models/spark-random_forest-ml-model")

sameModel = pipeline_d732f58e5dab


pipeline_d732f58e5dab

In [5]:
// запускаем модель
val predictionsDF = sameModel.transform(raw_sentiment)

predictionsDF.select($"label", $"probability", $"prediction").groupBy($"prediction").count.show
//.where("label = 0").show(100, 200)

+----------+-------+
|prediction|  count|
+----------+-------+
|       0.0| 218339|
|       1.0|1381661|
+----------+-------+



predictionsDF = [label: int, tweet: string ... 6 more fields]


[label: int, tweet: string ... 6 more fields]

In [6]:
import org.apache.spark.sql.functions._

val getProbability = udf((prediction: org.apache.spark.ml.linalg.Vector) => prediction(1))

getProbability = UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))


UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))

In [7]:
predictionsDF.select(getProbability($"probability").alias("clean_probability")).show

+-------------------+
|  clean_probability|
+-------------------+
| 0.5028275172582966|
| 0.5028275172582966|
| 0.5028275172582966|
| 0.5028275172582966|
| 0.5028275172582966|
| 0.5028275172582966|
| 0.5028275172582966|
| 0.5714848214956812|
| 0.5028275172582966|
| 0.5028275172582966|
| 0.5028275172582966|
| 0.5028275172582966|
| 0.5028275172582966|
| 0.5714848214956812|
| 0.5028275172582966|
|0.32599726727588957|
| 0.5028275172582966|
| 0.5028275172582966|
| 0.5028275172582966|
| 0.5028275172582966|
+-------------------+
only showing top 20 rows



In [16]:
%lsmagic

Available line magics:
%lsmagic %showtypes %showoutput %adddeps %truncation %addjar

Available cell magics:
%%sql %%html %%javascript %%dataframe %%scala

Type %<magic_name> for usage info.
         
