# Spylon Kernel Test with Spark 3.4.0

This has been updated from Spark 2.4. I use a local SBT installation via /misc/build/0/classes. This is similar to the PySpark spark0 notebook.

This must use the same Scala version as Spark, which is 2.12 for this build (it was 2.11 for Spark 2.4); the spark-nlp_2.12 package loaded below has to match it.

I haven't recompiled the Scala source code in src - the artikus.spark classes.

Once a Spark context is instantiated, its web UI should be accessible at http://j1:4040 if the host of this notebook is j1. The hostname is taken from `spark.driver.host`.

In [1]:
%%python
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

## Of no use for a Spylon notebook

## Configuration and Initialization of Spark

Note that we can set things like driver memory etc.

If `launcher._spark_home` is not set it will default to looking at the `SPARK_HOME` environment variable.

I run on a cluster owned by the hadoop user who is a member of my group devel.

I build new Scala features and access them via /misc/build/0/classes. I have to restart the kernel to pick up any new classes, and relaunch Spark to pick up changes.

I can't change `spark.sql.warehouse.dir`: the value set below appears in the configuration, but the default database still reports a different location.

This loads external JARs (com.johnsnowlabs.nlp) and their dependencies with Ivy. These go to .ivy2/cache.

In [2]:
%%init_spark
launcher.master = "yarn"
launcher.conf.spark.app.name = "spark-lda"
# Four executors, each with 4 cores and 4g of memory
launcher.num_executors = 4
launcher.executor_cores = 4
launcher.driver_memory = '4g'
launcher.conf.set("spark.driver.cores", 4)
launcher.conf.set("spark.executor.cores", 4)
launcher.conf.set("spark.executor.memory", "4g")
launcher.conf.set("spark.executor.instances", 4)
# Hive catalog and warehouse location
launcher.conf.set("spark.sql.warehouse.dir", "file:///home/hadoop/data/hive")
launcher.conf.set("spark.sql.catalogImplementation", "hive")
launcher.conf.set("spark.hadoop.fs.permissions.umask-mode", "002")
# Spark NLP and its dependencies are resolved through Ivy
launcher.conf.set("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:4.4.2")
# Local Scala build output and the PostgreSQL JDBC driver
launcher.conf.set("spark.driver.extraClassPath", ":/misc/build/0/classes/:/usr/share/java/postgresql.jar")

## Spark Configuration

Some basic operations.


In [3]:
spark // spark is the SQL session

Intitializing Scala interpreter ...

Spark Web UI available at http://k1:8088/proxy/application_1684145519388_0001
SparkContext available as 'sc' (version = 3.4.0, master = yarn, app id = application_1684145519388_0001)
SparkSession available as 'spark'


res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@228596c2


In [3]:
spark.version

res1: String = 3.4.0


In [4]:
spark.conf.getAll foreach (x => println(x._1 + " --> " + x._2))

spark.sql.warehouse.dir --> file:/home/hadoop/data/hive
spark.hadoop.fs.permissions.umask-mode --> 002
spark.executor.extraJavaOptions --> -Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false
spark.driver.host --> j1.host0
spark.serializer.objectSt

In [5]:
var dbs1 = spark.catalog.listDatabases()
dbs1.show
spark.catalog.listCatalogs().show()

+-------+-------------+--------------------+--------------------+
|   name|      catalog|         description|         locationUri|
+-------+-------------+--------------------+--------------------+
|default|spark_catalog|Default Hive data...|file:/misc/build/...|
+-------+-------------+--------------------+--------------------+

+-------------+-----------+
|         name|description|
+-------------+-----------+
|spark_catalog|       null|
+-------------+-----------+



dbs1: org.apache.spark.sql.Dataset[org.apache.spark.sql.catalog.Database] = [name: string, catalog: string ... 2 more fields]


In [6]:
val d0 = spark.catalog.listDatabases().take(1)
d0(0).locationUri

d0: Array[org.apache.spark.sql.catalog.Database] = Array(Database[name='default', catalog='spark_catalog', description='Default Hive database', path='file:/misc/build/0/spark-eg0/spark-warehouse'])
res4: String = file:/misc/build/0/spark-eg0/spark-warehouse


In [20]:
var df0 = spark.sql("show databases")
df0.show()
df0 = spark.sql("show tables")
df0.show()
df0 = spark.sql("select count(*) from finalTable")

+---------+
|namespace|
+---------+
|  default|
+---------+

+---------+----------+-----------+
|namespace| tableName|isTemporary|
+---------+----------+-----------+
|  default|finaltable|      false|
+---------+----------+-----------+



df0: org.apache.spark.sql.DataFrame = [count(1): bigint]
df0: org.apache.spark.sql.DataFrame = [count(1): bigint]
df0: org.apache.spark.sql.DataFrame = [count(1): bigint]


## Using local Scala Builds

In [2]:
import artikus.spark.U

Intitializing Scala interpreter ...

Spark Web UI available at http://j1.host0:4040
SparkContext available as 'sc' (version = 3.4.0, master = local[*], app id = local-1684091628830)
SparkSession available as 'spark'


import artikus.spark.U


In [3]:
val cl = spark.getClass().getClassLoader()
cl.asInstanceOf[java.net.URLClassLoader].getURLs.map(x => x.toString())

java.lang.ClassCastException:  class jdk.internal.loader.ClassLoaders$AppClassLoader cannot be cast to class java.net.URLClassLoader (jdk.internal.loader.ClassLoaders$AppClassLoader and java.net.URLClassLoader are in module java.base of loader 'bootstrap')
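
The cast fails because, from Java 9 onward, the application class loader is no longer a `URLClassLoader`. A minimal sketch of one alternative, assuming the JVM's standard `java.class.path` system property is enough for inspection (entries that Spark or Ivy add at runtime may not appear in it):

```scala
import java.io.File

// Read the JVM class path from the standard system property instead of
// casting the class loader, which only worked on Java 8 and earlier.
val classPathEntries: Seq[String] =
  System.getProperty("java.class.path", "")
    .split(File.pathSeparator)
    .filter(_.nonEmpty)
    .toSeq

classPathEntries.foreach(println)
```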

In [4]:
// These are from the /misc/build/0/classes
U.identity
U.printClass(spark)
U.alert("hello")

class org.apache.spark.sql.SparkSession
hello


In [None]:
U.classes(spark)

In [None]:
U.flist(".")

## SparkSession operations

Basic operations
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkSession.html#createDataset

In [None]:
val strings = spark.emptyDataset[String]
strings.printSchema

In [None]:
val one = spark.createDataset(Seq(1))
one.show
one.printSchema

In [None]:
// Importing the implicits requires a value named "spark" in scope.
import spark.implicits._

val one = Seq(1).toDS
one.show
one.printSchema

In [None]:
// Using spark.range()
val range0 = spark.range(start = 0, end = 4, step = 2, numPartitions = 5)
range0.show

In [None]:
// More packing

In [None]:
val sc = spark.sparkContext

In [None]:
val data = Seq("a", "b", "c", "d") zip (0 to 4)

U.printClass(data)
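
Note that `zip` stops at the shorter of its two inputs, so the range `0 to 4` contributes only four of its five values. A small plain-Scala sketch of that behaviour (the names `letters`, `indices`, `zipped` and `indexed` are illustrative only):

```scala
// zip stops at the shorter collection, so the fifth index (4) is dropped.
val letters = Seq("a", "b", "c", "d")
val indices = 0 to 4

val zipped = letters zip indices
println(zipped)

// zipWithIndex produces the same pairs without a separate range.
val indexed = letters.zipWithIndex
println(zipped == indexed)
```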

In [None]:
// :: binds more tightly than zip, so the parentheses only make the grouping explicit
val data = Seq("foo", "bar", "baz") zip (1 :: 2 :: 3 :: Nil)
val data1 = Seq("foo", "bar", "bar") zip (4 :: 5 :: 6 :: Nil)

In [None]:
val ds = spark.createDataset(data)

val ds1 = sc.parallelize(data)

U.printClass(ds)
U.printClass(ds1)

val ds2 = sc.parallelize(data1)

ds1.join(ds2).take(5)
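
`join` on pair RDDs is an inner join on the key, so the duplicated `bar` key in `data1` should yield two output rows and `baz` should be dropped. A minimal sketch of the same semantics with plain Scala collections; no Spark is involved, and an RDD would not guarantee this ordering:

```scala
val data  = Seq("foo", "bar", "baz") zip Seq(1, 2, 3)
val data1 = Seq("foo", "bar", "bar") zip Seq(4, 5, 6)

// Inner join on the key: every matching pair of values is emitted,
// and keys present on only one side are dropped.
val joined = for {
  (k, v)   <- data
  (k1, v1) <- data1
  if k == k1
} yield (k, (v, v1))

println(joined)
```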

In [None]:
// Local file URI
// non-existent file loads
// /misc/build/0/prog-scala-2nd-ed-code-examples
val local2 = U.local1(".")

In [None]:
val f1 = "rev-users.csv"
val file = sc.textFile(local2(f1).toString())
U.printClass(file)

In [None]:
// This file has a header row
// Take the first row, index into it, split and return a sequence
val h2 = file.take(1)(0).split(",").toSeq

// Get the remainder by using subtract
// convert the header row back to an RDD using parallelize
val r1 = file.subtract(sc.parallelize(file.take(1)))

In [None]:
// Look at the underlying row
r1.take(1)

In [None]:
// Now map over the quantities
// The transformations are only applied when we take(), use the column names from h2.
val df0 = r1.map(_.split(",")).map{case Array(a,b,c,d,e,f,g,h,i,j,k,l) => 
(a,b.toInt,c,d,e,f.toInt,g,h,i,j.toInt,k.toInt,l.toInt)}.toDF(h2:_*)
df0.take(1)
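
The `case Array(a, ..., l)` pattern throws a `scala.MatchError` for any line that does not split into exactly twelve fields, and `split(",")` silently drops trailing empty fields. A hedged sketch of a more defensive parser in plain Scala, using hypothetical three-column rows and a hypothetical `User` class rather than the real twelve-column file:

```scala
import scala.util.Try

// Hypothetical three-column rows; the real file has twelve columns.
final case class User(id: String, birthYear: Int, country: String)

// split with limit -1 keeps trailing empty fields. Lines with the wrong
// number of fields or a non-numeric year give None instead of an exception.
def parseUser(line: String): Option[User] =
  line.split(",", -1) match {
    case Array(id, year, country) =>
      Try(year.trim.toInt).toOption.map(y => User(id, y, country))
    case _ => None
  }

val lines = Seq("user_0,1989,PL", "user_1,,GB", "user_2,1975", "user_3,1990,")
val parsed = lines.flatMap(parseUser)
println(parsed)
```

On an RDD the same function could be applied with `flatMap`, which would skip malformed lines rather than fail the job.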

In [None]:
val f2 = "rev-devices.csv"
val file2 = sc.textFile(local2(f2).toString())
U.printClass(file2)

In [None]:
// textFile is lazy, so a missing file only raises an error here, when take() runs.
// An empty file returns an empty array.
val lens = file.map(s => s.length)
file.take(5)
lens.take(5)

In [None]:
val x0 = file.take(1)

// Some arbitrary file processing - append a number to each line
val pairs = file.map(s => (s, 911))
val counts = pairs.reduceByKey((a, b) => a + b)

In [None]:
val counts1 = counts.repartition(1)

U.rmdir("counts1")
counts1.saveAsTextFile(local2("counts1").toString())

In [None]:
val pairs = file.map(x => (x.split(",")(0), x))

val pairs1 = pairs.join(pairs)

In [None]:
// Make some (K, V) tuples

println(x0(0))

val x1 = x0(0).split(",").toSeq

In [None]:
val df0 = file.map(_.split(",")).map{case Array(a,b,c,d,e,f,g,h,i,j,k,l) => 
(a,b,c,d,e,f,g,h,i,j,k,l)}.toDF(x1:_*)

In [None]:
// The x1:_* is to be preferred to this

// val fileToDf = file.map(_.split(",")).map{case Array(a,b,c,d,e,f,g,h,i,j,k,l) => 
// (a,b,c,d,e,f,g,h,i,j,k,l)}.toDF("user_id", "birth_year", "country", "city", "created_date", "user_settings_crypto_unlocked", "plan", "attributes_notifications_marketing_push", "attributes_notifications_marketing_email", "num_contacts", "num_referrals", "num_successful_referrals")

In [None]:
// Use r1 (header row removed) so that b.toInt does not fail on the header.
val df0 = r1.map(_.split(",")).map{case Array(a,b,c,d,e,f,g,h,i,j,k,l) => 
(a,b.toInt,c,d,e,f,g,h,i,j,k,l)}.toDF(x1:_*)

In [None]:
df0.show(3)

In [None]:
file.map(_.split(",")).take(1)

In [None]:
val df1 = file.subtract(sc.parallelize(file.take(1)))

In [None]:
U.printClass(sc)

In [None]:
df1.take(1)

In [None]:
// Load a text file as an RDD of lines using the implicit SparkContext.
// The sep argument is not used yet.
def split(f1: String, sep: String)(implicit sc: org.apache.spark.SparkContext): org.apache.spark.rdd.RDD[String] = {
    sc.textFile(f1)
}

In [None]:
split(local2(f1).toString(), ",")(sc)

In [None]:
U.printClass(sc)

## MLLib

Using LDA.

https://medium.com/analytics-vidhya/distributed-topic-modelling-using-spark-nlp-and-spark-mllib-lda-6db3f06a4da3

In [3]:
val url = "file:///a/l/X-image/cache/data/abcnews-date-text.csv"

Intitializing Scala interpreter ...

Spark Web UI available at http://k1:8088/proxy/application_1684147981528_0001
SparkContext available as 'sc' (version = 3.4.0, master = yarn, app id = application_1684147981528_0001)
SparkSession available as 'spark'


url: String = file:///a/l/X-image/cache/data/abcnews-date-text.csv


In [4]:
val type0="csv"
val infer_schema="true"
val first_row_is_header = "true"
val delimiter=","

type0: String = csv
infer_schema: String = true
first_row_is_header: String = true
delimiter: String = ,


In [5]:
val df0 = spark.read.format(type0)
  .option("inferSchema", infer_schema)
  .option("header", first_row_is_header)
  .option("sep", delimiter)
  .load(url)

df0: org.apache.spark.sql.DataFrame = [publish_date: int, headline_text: string]


In [6]:
df0.count()

res0: Long = 1082168


In [7]:
import com.johnsnowlabs.nlp.DocumentAssembler
import com.johnsnowlabs.nlp.annotators.Tokenizer
import com.johnsnowlabs.nlp.annotators.Normalizer
import com.johnsnowlabs.nlp.annotators.StopWordsCleaner
import com.johnsnowlabs.nlp.annotators.Stemmer
import com.johnsnowlabs.nlp.Finisher

import com.johnsnowlabs.nlp.DocumentAssembler
import com.johnsnowlabs.nlp.annotators.Tokenizer
import com.johnsnowlabs.nlp.annotators.Normalizer
import com.johnsnowlabs.nlp.annotators.StopWordsCleaner
import com.johnsnowlabs.nlp.annotators.Stemmer
import com.johnsnowlabs.nlp.Finisher


In [10]:
// Convert the text column into a Spark NLP document
val document_assembler = new DocumentAssembler().setInputCol("headline_text").setOutputCol("document").setCleanupMode("shrink") 

// Split each document into an array of tokens
val tokenizer = new Tokenizer().setInputCols(Array("document")).setOutputCol("token")

// Clean unwanted characters and garbage from the tokens
val normalizer = new Normalizer().setInputCols(Array("token")).setOutputCol("normalized")

// Remove stopwords
val stopwords_cleaner = new StopWordsCleaner().setInputCols("normalized").setOutputCol("cleanTokens").setCaseSensitive(false)

// Stem the words to bring them to their root form
val stemmer = new Stemmer().setInputCols(Array("cleanTokens")).setOutputCol("stem")

// Finisher is the most important annotator. 
// Spark NLP adds its own structure when each row in the dataframe is converted to a document. 
// Finisher brings back the expected structure, i.e. an array of tokens.
val finisher = new Finisher().setInputCols(Array("stem")).setOutputCols(Array("tokens"))
    .setOutputAsArray(true).setCleanAnnotations(false)

document_assembler: com.johnsnowlabs.nlp.DocumentAssembler = document_c879f8ee263e
tokenizer: com.johnsnowlabs.nlp.annotators.Tokenizer = REGEX_TOKENIZER_84c9b7813d48
normalizer: com.johnsnowlabs.nlp.annotators.Normalizer = NORMALIZER_37d0d62bb66f
stopwords_cleaner: com.johnsnowlabs.nlp.annotators.StopWordsCleaner = STOPWORDS_CLEANER_e6ad3685cf9e
stemmer: com.johnsnowlabs.nlp.annotators.Stemmer = STEMMER_075cdcf04187
finisher: com.johnsnowlabs.nlp.Finisher = finisher_02ed352a6956


In [14]:
import org.apache.spark.ml.Pipeline

import org.apache.spark.ml.Pipeline


In [17]:
// Build an ML pipeline so that each stage is executed in sequence. 
// The same pipeline can also be used to test the model. 
val stages=Array(document_assembler, tokenizer, normalizer, stopwords_cleaner, stemmer, finisher)
val nlp_pipeline = new Pipeline().setStages(stages)

stages: Array[org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable] = Array(document_c879f8ee263e, REGEX_TOKENIZER_84c9b7813d48, NORMALIZER_37d0d62bb66f, STOPWORDS_CLEANER_e6ad3685cf9e, STEMMER_075cdcf04187, finisher_02ed352a6956)
nlp_pipeline: org.apache.spark.ml.Pipeline = pipeline_80f149cb34b5


In [19]:
//  apply the pipeline to transform dataframe.
val nlp_model = nlp_pipeline.fit(df0) 
val processed_df0  = nlp_model.transform(df0)

nlp_model: org.apache.spark.ml.PipelineModel = pipeline_80f149cb34b5
processed_df0: org.apache.spark.sql.DataFrame = [publish_date: int, headline_text: string ... 6 more fields]


In [28]:
// The NLP pipeline creates intermediate columns that we don't need, so select only the columns we need.
val tokens_df0 = processed_df0.select("publish_date","tokens").limit(10000)
tokens_df0.show()

+------------+--------------------+
|publish_date|              tokens|
+------------+--------------------+
|    20030219|[aba, decid, comm...|
|    20030219|[act, fire, wit, ...|
|    20030219|[g, call, infrast...|
|    20030219|[air, nz, staff, ...|
|    20030219|[air, nz, strike,...|
|    20030219|[ambiti, olsson, ...|
|    20030219|[antic, delight, ...|
|    20030219|[aussi, qualifi, ...|
|    20030219|[aust, address, u...|
|    20030219|[australia, lock,...|
|    20030219|[australia, contr...|
|    20030219|[barca, take, rec...|
|    20030219|[bathhous, plan, ...|
|    20030219|[big, hope, launc...|
|    20030219|[big, plan, boost...|
|    20030219|[blizzard, buri, ...|
|    20030219|[brigadi, dismiss...|
|    20030219|[british, combat,...|
|    20030219|[bryant, lead, la...|
|    20030219|[bushfir, victim,...|
+------------+--------------------+
only showing top 20 rows



tokens_df0: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [publish_date: int, tokens: array<string>]


## Features

The next step generates features from the text. Latent Dirichlet Allocation requires a data-specific vocabulary to perform topic modeling.

In [29]:
import org.apache.spark.ml.feature.CountVectorizer

import org.apache.spark.ml.feature.CountVectorizer


In [34]:
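// Note: setMinTF(3.0) ignores terms that occur fewer than 3 times within a single document.
// For short headlines, setMinDF(3.0) (minimum number of documents) may be the intended filter.
val cv = new CountVectorizer().setInputCol("tokens").setOutputCol("features").setVocabSize(500).setMinTF(3.0)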

// train the model
val cv_model = cv.fit(tokens_df0)
// transform the data. Output column name will be features.
val vectorized_tokens = cv_model.transform(tokens_df0)

cv: org.apache.spark.ml.feature.CountVectorizer = cntVec_91a623e3720b
cv_model: org.apache.spark.ml.feature.CountVectorizerModel = CountVectorizerModel: uid=cntVec_91a623e3720b, vocabularySize=500
vectorized_tokens: org.apache.spark.sql.DataFrame = [publish_date: int, tokens: array<string> ... 1 more field]
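
The effect of `minTF` is easy to confuse with that of `minDF`. As documented for `CountVectorizer`, `minTF` filters terms by their count inside each document, while `minDF` filters the vocabulary by the number of documents a term appears in. A plain-Scala sketch of the two filters on three hypothetical tokenized headlines; it only mimics the documented meaning of the parameters and does not call Spark:

```scala
// Three hypothetical tokenized headlines.
val docs: Seq[Seq[String]] = Seq(
  Seq("iraq", "war", "plan"),
  Seq("iraq", "war", "protest"),
  Seq("iraq", "rain", "rain", "rain")
)

// Term frequency inside each document.
val termCounts: Seq[Map[String, Int]] =
  docs.map(_.groupBy(identity).map { case (t, ts) => t -> ts.size })

// minTF = 3: within each document keep only terms that occur at least 3 times.
val keptByMinTF: Seq[Map[String, Int]] =
  termCounts.map(_.filter { case (_, n) => n >= 3 })

// minDF = 3: keep in the vocabulary only terms found in at least 3 documents.
val docFreq: Map[String, Int] =
  docs.flatMap(_.distinct).groupBy(identity).map { case (t, ts) => t -> ts.size }
val vocabByMinDF: Set[String] = docFreq.filter { case (_, n) => n >= 3 }.keySet

println(keptByMinTF)
println(vocabByMinDF)
```

With headline-length documents, very few terms repeat three times in one document, so a per-document threshold of 3 leaves most feature vectors empty.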


## Build Model


In [58]:
import org.apache.spark.ml.clustering.LDA
import org.apache.spark.sql.Row

import org.apache.spark.ml.clustering.LDA
import org.apache.spark.sql.Row


In [37]:
val num_topics = 3
val lda = new LDA().setK(num_topics).setMaxIter(10)
val model = lda.fit(vectorized_tokens)

val ll = model.logLikelihood(vectorized_tokens)
val lp = model.logPerplexity(vectorized_tokens)

num_topics: Int = 3
lda: org.apache.spark.ml.clustering.LDA = lda_310b7cc9f6b5
model: org.apache.spark.ml.clustering.LDAModel = LocalLDAModel: uid=lda_310b7cc9f6b5, k=3, numFeatures=500
ll: Double = -443.6715451674518
lp: Double = 147.89051505581727


In [38]:
println("The lower bound on the log likelihood of the entire corpus: " + ll.toString())
println("The upper bound on perplexity: " + lp.toString())

The lower bound on the log likelihood of the entire corpus: -443.6715451674518
The upper bound on perplexity: 147.89051505581727
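
The two figures are related. Assuming `logPerplexity` is the negative log likelihood divided by the number of tokens in the corpus (as in the MLlib implementation), their ratio gives the token count the model was actually evaluated on:

```scala
// Values copied from the output above.
val ll = -443.6715451674518
val lp = 147.89051505581727

// Assumption: logPerplexity = -logLikelihood / corpusTokenCount.
val impliedTokenCount = -ll / lp
println(impliedTokenCount)
```

A ratio of about 3 would mean that only three token occurrences survived vectorization across the 10,000 headlines, which would be consistent with `setMinTF(3.0)` discarding nearly every term.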


## Visualization

The results from the algorithm need to be restructured.

In [43]:
val vocab = cv_model.vocabulary
val topics = model.describeTopics()
val topics_rdd = topics.rdd

vocab: Array[String] = Array(u, iraq, war, polic, govt, man, plan, new, sai, iraqi, council, win, fire, claim, call, charg, protest, warn, kill, report, mai, back, nsw, baghdad, urg, world, court, face, death, fund, water, anti, take, troop, get, crash, forc, cup, continu, qld, set, hope, rain, un, attack, mp, pm, hospit, meet, miss, open, concern, lead, bomb, hit, wa, ban, australia, aust, boost, two, air, final, support, group, murder, health, seek, secur, deni, vic, probe, missil, coast, farmer, car, end, welcom, consid, fight, year, investig, sa, drought, move, still, offer, elect, help, green, first, sar, union, australian, saddam, home, minist, korea, woman, dead, go, coalit, defend, oil, howard, chang, case, make, arrest, work, begin, jail, top, hous, strike, act, race, rise, com...


In [177]:
// Define the schema and make a data frame with it

import org.apache.spark.sql.types._

val schema = new StructType()
  .add(StructField("id", IntegerType, false))
  .add(StructField("indices", ArrayType(IntegerType, true)))
  .add(StructField("scores", ArrayType(DoubleType, true)))

import org.apache.spark.sql.types._
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,false),StructField(indices,ArrayType(IntegerType,true),true),StructField(scores,ArrayType(DoubleType,true),true))


In [178]:
import spark.implicits._
// import org.apache.spark.sql.functions.explode

val df1 = spark.createDataFrame(topics_rdd, schema)

df1.printSchema()

root
 |-- id: integer (nullable = false)
 |-- indices: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- scores: array (nullable = true)
 |    |-- element: double (containsNull = true)



import spark.implicits._
df1: org.apache.spark.sql.DataFrame = [id: int, indices: array<int> ... 1 more field]


In [179]:
// Using the column names, create a case class that holds the arrays
import scala.collection.mutable.WrappedArray
case class tab1(id: Int, indices: WrappedArray[Int], scores: WrappedArray[Double])

defined class tab1


In [183]:
// Cast the dataframe to be of that type.
val df2 = df1.as[tab1]

df2: org.apache.spark.sql.Dataset[tab1] = [id: int, indices: array<int> ... 1 more field]


In [217]:
// The columns can now be accessed as typed members.
// Each term index is looked up in vocab and paired with its score.

val df3 = df2.map(x => x.indices.map(vocab).zip(x.scores)).collect.toList

df3: List[scala.collection.mutable.WrappedArray[(String, Double)]] = List(WrappedArray((candid,0.002735799497557659), (deni,0.0025591722001258605), (near,0.0025502052891865985), (nat,0.002516747150826247), (ga,0.002501361246567455), (develop,0.002480011336368051), (east,0.002455541382486463), (help,0.0024241943218035176), (investig,0.0024160843277695725), (leagu,0.0024157930401469477)), WrappedArray((test,0.009028235749467671), (accid,0.0025592625162857503), (franc,0.0025456931174624147), (labor,0.002532968828928002), (women,0.00250200850524915), (woman,0.002499764010434832), (candid,0.0024880345393256766), (rate,0.0024847397012931613), (council,0.002472794651023874), (crisi,0.0024608628428185626)), WrappedArray((low,0.002682624379971457), (ta,0.0025324007707322785), (trial,0.0025110323...


In [222]:
df3.map(y => { println("::"); y.map(x => println(x._1 + " :: " + x._2) ) } )

::
candid :: 0.002735799497557659
deni :: 0.0025591722001258605
near :: 0.0025502052891865985
nat :: 0.002516747150826247
ga :: 0.002501361246567455
develop :: 0.002480011336368051
east :: 0.002455541382486463
help :: 0.0024241943218035176
investig :: 0.0024160843277695725
leagu :: 0.0024157930401469477
::
test :: 0.009028235749467671
accid :: 0.0025592625162857503
franc :: 0.0025456931174624147
labor :: 0.002532968828928002
women :: 0.00250200850524915
woman :: 0.002499764010434832
candid :: 0.0024880345393256766
rate :: 0.0024847397012931613
council :: 0.002472794651023874
crisi :: 0.0024608628428185626
::
low :: 0.002682624379971457
ta :: 0.0025324007707322785
trial :: 0.0025110323104668604
championship :: 0.0025025121433745346
send :: 0.0024781076031899284
farmer :: 0.002456418441272121
question :: 0.0024508460402524797
doubl :: 0.00244639243175977
shoot :: 0.0024322606979841842
season :: 0.0024320392453472716


res138: List[scala.collection.mutable.WrappedArray[Unit]] = List(WrappedArray((), (), (), (), (), (), (), (), (), ()), WrappedArray((), (), (), (), (), (), (), (), (), ()), WrappedArray((), (), (), (), (), (), (), (), (), ()))
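
The `res138` value above is a list of `Unit` values because `map` collects the result of each `println`. A small plain-Scala sketch of the same lookup and printing on hypothetical data (the `vocab` and `topics` values here are made up), using `foreach` for the side effect:

```scala
// Hypothetical vocabulary and topics; each topic is (term index, weight) pairs.
val vocab = Array("iraq", "war", "polic", "govt")
val topics: Seq[Seq[(Int, Double)]] = Seq(
  Seq(2 -> 0.4, 0 -> 0.35),
  Seq(3 -> 0.5, 1 -> 0.3)
)

// Replace each term index with the vocabulary word it refers to.
val named: Seq[Seq[(String, Double)]] =
  topics.map(_.map { case (i, w) => (vocab(i), w) })

// foreach runs the side effect and returns Unit, not a list of Units.
named.foreach { topic =>
  println("::")
  topic.foreach { case (term, w) => println(s"$term :: $w") }
}
```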


In [105]:
df1.createOrReplaceTempView("topics")

In [110]:
spark.sql("show tables").show
spark.sql("select count(*) from topics").show

+---------+----------+-----------+
|namespace| tableName|isTemporary|
+---------+----------+-----------+
|  default|finaltable|      false|
|         |    topics|       true|
+---------+----------+-----------+

+--------+
|count(1)|
+--------+
|       3|
+--------+

