## Some Caveats
- The types of the columns disappear when untyped transformations are used
- The names of the columns sometimes disappear, partially or completely, when typed transformations are used
- Missing values need explicit handling


DataFrame + Data Types (`case class`) = Dataset

## Untyped transformations 
- For example, if you add a new column to your Dataset, the result is a DataFrame, even if you define the type of the new column.

If you want to keep working with Datasets, the steps for untyped transformations are the following:

* Have a Dataset
* Apply the function to it
* The result is a DataFrame
* Convert the result back to a Dataset by defining the names and types of the columns in a case class
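A minimal standalone sketch of these four steps. It assumes Spark 2.x or later with Scala 2 on the classpath and creates its own local `SparkSession` (inside the notebook, `spark` already exists). The case classes `Person` and `PersonWithMeters` and the `height_m` column are hypothetical, made up for this illustration.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical case classes, declared at top level so Spark can derive encoders for them
case class Person(name: String, height: Int)
case class PersonWithMeters(name: String, height: Int, height_m: Double)

object UntypedTransformDemo {
  // Local session; inside the notebook `spark` already exists
  val spark: SparkSession =
    SparkSession.builder().master("local[1]").appName("untyped-demo").getOrCreate()
  import spark.implicits._

  // Step 1: have a Dataset
  val people: Dataset[Person] = Seq(Person("Yoda", 66), Person("Han Solo", 180)).toDS()

  // Steps 2-3: withColumn is untyped, so the result is a DataFrame,
  // even though the new column is cast to a concrete type
  val withMeters: DataFrame =
    people.withColumn("height_m", (col("height") / 100).cast("double"))

  // Step 4: describe the new shape in a case class and convert back
  val typedAgain: Dataset[PersonWithMeters] = withMeters.as[PersonWithMeters]

  // Typed field access (_.height_m, _.name) works again
  val tallNames: Array[String] =
    typedAgain.filter(_.height_m > 1.0).map(_.name).collect()
}
```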

## Typed transformations
When we use typed transformations, the output is a Dataset with proper types. But if the set of columns changes (fewer or more columns, or newly created columns), the column names we see in the display are valid only in the DataFrame sense ("columnName"). The Dataset reference `_.columnName` won't work in these cases, but you can refer to the columns as `._1`, `._2`, etc. If you want proper column names, use a case class again.
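A small sketch of the difference, assuming Spark 2.x or later with Scala 2 and a local session; the case classes `Hero` and `HeroWeight` are hypothetical. Mapping to a tuple keeps the types but renames the columns to `_1` and `_2`, while mapping to a case class keeps usable names.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical case classes for this sketch
case class Hero(name: String, height: Int, weight: Int)
case class HeroWeight(name: String, weight: Int)

object TypedTransformDemo {
  val spark: SparkSession =
    SparkSession.builder().master("local[1]").appName("typed-demo").getOrCreate()
  import spark.implicits._

  val heroes: Dataset[Hero] =
    Seq(Hero("Yoda", 66, 17), Hero("Chewbacca", 228, 112)).toDS()

  // map to a tuple: still typed, but the columns are now called _1 and _2
  val asTuple: Dataset[(String, Int)] = heroes.map(h => (h.name, h.weight))
  val tupleColumns: Seq[String] = asTuple.columns.toSeq
  // typed code has to use the positional accessors
  val heavyViaTuple: Array[String] = asTuple.filter(_._2 > 100).map(_._1).collect()

  // map to a case class: the names survive in both the DataFrame and the Dataset sense
  val asCaseClass: Dataset[HeroWeight] = heroes.map(h => HeroWeight(h.name, h.weight))
  val caseClassColumns: Seq[String] = asCaseClass.columns.toSeq
  val heavyViaName: Array[String] = asCaseClass.filter(_.weight > 100).map(_.name).collect()
}
```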



In [1]:
import org.apache.spark.sql.{Dataset, DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql._

val sqlContext = spark.sqlContext

# 1. CREATING DATASETS
## MANUALLY
- Define the data as a sequence.
- Convert the sequence to a DataFrame and define the column names.
- Define the types of the columns in a case class (its field names must match the column names).
- Convert the DataFrame to a Dataset.
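Outside the notebook, the same steps also need a `SparkSession` and `import spark.implicits._`. The cells here call `toDF` and use `$"..."` without importing anything, so the kernel presumably provides these implicits. A sketch assuming Spark 2.x or later with Scala 2, with a hypothetical `FriendRow` case class; the `toDS()` line is an alternative that builds a Dataset directly from case class instances and is not what the notebook uses.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical case class; its field names must match the DataFrame's column names
case class FriendRow(name: String, friends: String)

object CreateDatasetDemo {
  val spark: SparkSession =
    SparkSession.builder().master("local[1]").appName("create-demo").getOrCreate()
  // Brings toDF, toDS, the as[...] encoders and the $"col" syntax into scope
  import spark.implicits._

  // Steps 1-2: a sequence of tuples -> DataFrame with named columns
  val df: DataFrame =
    Seq(("Yoda", "Obi-Wan Kenobi"), ("R2-D2", "C-3PO")).toDF("name", "friends")

  // Steps 3-4: convert to a Dataset; columns are matched to fields by name
  val ds: Dataset[FriendRow] = df.as[FriendRow]

  // Alternative: build the Dataset straight from case class instances
  val direct: Dataset[FriendRow] = Seq(FriendRow("C-3PO", "R2-D2")).toDS()

  val r2Friends: Array[String] = ds.filter(_.name == "R2-D2").map(_.friends).collect()
}
```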

In [2]:
//In this example we create a small DataFrame with two columns: the first column contains the names of Star Wars
//characters and the second one lists the names of their friends.
val df = Seq(("Yoda",             "Obi-Wan Kenobi"),
             ("Anakin Skywalker", "Sheev Palpatine"),
             ("Luke Skywalker",   "Han Solo, Leia Skywalker"),
             ("Leia Skywalker",   "Obi-Wan Kenobi"),
             ("Sheev Palpatine",  "Anakin Skywalker"),
             ("Han Solo",         "Leia Skywalker, Luke Skywalker, Obi-Wan Kenobi, Chewbacca"),
             ("Obi-Wan Kenobi",   "Yoda, Qui-Gon Jinn"),
             ("R2-D2",            "C-3PO"),
             ("C-3PO",            "R2-D2"),
             ("Darth Maul",       "Sheev Palpatine"),
             ("Chewbacca",        "Han Solo"),
             ("Lando Calrissian", "Han Solo"),
             ("Jabba",            "Boba Fett")
            ).toDF("name", "friends")

In [3]:
df.show()

+----------------+--------------------+
|            name|             friends|
+----------------+--------------------+
|            Yoda|      Obi-Wan Kenobi|
|Anakin Skywalker|     Sheev Palpatine|
|  Luke Skywalker|Han Solo, Leia Sk...|
|  Leia Skywalker|      Obi-Wan Kenobi|
| Sheev Palpatine|    Anakin Skywalker|
|        Han Solo|Leia Skywalker, L...|
|  Obi-Wan Kenobi|  Yoda, Qui-Gon Jinn|
|           R2-D2|               C-3PO|
|           C-3PO|               R2-D2|
|      Darth Maul|     Sheev Palpatine|
|       Chewbacca|            Han Solo|
|Lando Calrissian|            Han Solo|
|           Jabba|           Boba Fett|
+----------------+--------------------+



In [4]:
df.dtypes

Array((name,StringType), (friends,StringType))

In [5]:
case class Friends(name: String, friends: String)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this) //Ensure the case class is available in other cells

In [6]:
val friends_ds = df.as[Friends]

In [7]:
friends_ds.show()

+----------------+--------------------+
|            name|             friends|
+----------------+--------------------+
|            Yoda|      Obi-Wan Kenobi|
|Anakin Skywalker|     Sheev Palpatine|
|  Luke Skywalker|Han Solo, Leia Sk...|
|  Leia Skywalker|      Obi-Wan Kenobi|
| Sheev Palpatine|    Anakin Skywalker|
|        Han Solo|Leia Skywalker, L...|
|  Obi-Wan Kenobi|  Yoda, Qui-Gon Jinn|
|           R2-D2|               C-3PO|
|           C-3PO|               R2-D2|
|      Darth Maul|     Sheev Palpatine|
|       Chewbacca|            Han Solo|
|Lando Calrissian|            Han Solo|
|           Jabba|           Boba Fett|
+----------------+--------------------+



## MISSING VALUES

In [8]:
case class Friends_Missing(Who: String, friends: Option[String])
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

In [9]:
val ds_missing = Seq( 
                      ("Yoda",             Some("Obi-Wan Kenobi")),
                      ("Anakin Skywalker", Some("Sheev Palpatine")),
                      ("Luke Skywalker",   None),
                      ("Leia Skywalker",   Some("Obi-Wan Kenobi")),
                      ("Sheev Palpatine",  Some("Anakin Skywalker")),
                      ("Han Solo",         Some("Leia Skywalker, Luke Skywalker, Obi-Wan Kenobi"))
                    ).toDF("Who", "friends").as[Friends_Missing]

In [36]:
ds_missing.show()

+----------------+--------------------+
|             Who|             friends|
+----------------+--------------------+
|            Yoda|      Obi-Wan Kenobi|
|Anakin Skywalker|     Sheev Palpatine|
|  Luke Skywalker|                null|
|  Leia Skywalker|      Obi-Wan Kenobi|
| Sheev Palpatine|    Anakin Skywalker|
|        Han Solo|Leia Skywalker, L...|
+----------------+--------------------+



## READING FROM CSV

The steps for reading a CSV file:

- Define the names and the types of the columns in a case class. Note that the field names must be identical to the column names in the header of the file!
- Read the CSV file into a DataFrame.
- Convert it into a Dataset.

The read returns a DataFrame; as we have seen earlier, the `.as[Characters]` at the end of the expression converts it to a Dataset.
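A self-contained sketch of these steps, assuming Spark 2.x or later with Scala 2. Instead of downloading `StarWars.csv`, it writes a tiny hypothetical sample with the same `;`-separated layout to a temporary file; the case class `CsvCharacter` is also hypothetical. Empty fields are read as `null` and decoded as `None` in the `Option` fields.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import org.apache.spark.sql.{Dataset, SparkSession}

// Step 1: field names match the header below; possibly missing columns are Option
case class CsvCharacter(name: String, height: Int, weight: Option[Int], eyecolor: Option[String])

object ReadCsvDemo {
  val spark: SparkSession =
    SparkSession.builder().master("local[1]").appName("csv-demo").getOrCreate()
  import spark.implicits._

  // Hypothetical sample in the same ';'-separated layout as StarWars.csv
  val csvText: String =
    """name;height;weight;eyecolor
      |Yoda;66;17;brown
      |R2-D2;96;32;
      |Jabba;390;;yellow
      |""".stripMargin
  val path: Path = Files.createTempFile("starwars-sample", ".csv")
  Files.write(path, csvText.getBytes(StandardCharsets.UTF_8))

  // Steps 2-3: read into a DataFrame, then convert with as[...]
  val characters: Dataset[CsvCharacter] = spark.read
    .option("header", "true")
    .option("delimiter", ";")
    .option("inferSchema", "true")
    .csv(path.toString)
    .as[CsvCharacter]

  // Empty fields arrive as null and are decoded as None
  val noWeight: Array[String] = characters.filter(_.weight.isEmpty).map(_.name).collect()
  val noEyecolor: Array[String] = characters.filter(_.eyecolor.isEmpty).map(_.name).collect()
}
```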

In [1]:
%%pyspark
import os
os.system("wget https://www.balabit.com/blog/wp-content/uploads/2016/12/StarWars.csv")

--2017-10-26 23:22:25--  https://www.balabit.com/blog/wp-content/uploads/2016/12/StarWars.csv
Resolving www.balabit.com (www.balabit.com)... 52.16.164.233, 52.212.217.223
Connecting to www.balabit.com (www.balabit.com)|52.16.164.233|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 995 [application/octet-stream]
Saving to: ‘StarWars.csv’

     0K                                                       100% 85.3M=0s

2017-10-26 23:22:26 (85.3 MB/s) - ‘StarWars.csv’ saved [995/995]



0

In [12]:
case class Characters(name: String, 
                      height: Integer, 
                      weight: Option[Integer], 
                      eyecolor: Option[String], 
                      haircolor: Option[String], 
                      jedi: String,
                      species: String)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

In [13]:
val characters_ds: Dataset[Characters] = sqlContext.
                                                read.
                                                option("header", "true").
                                                option("delimiter", ";").
                                                option("inferSchema", "true").
                                                csv("StarWars.csv").
                                                as[Characters]

Some explanation of the read options:

- `option("header", "true")`: the column names are defined in the first row of the file
- `option("delimiter", ";")`: the fields are separated by `;`
- `option("inferSchema", "true")`: detect the column types automatically. The schema can also be given manually (see "Defining the schema of the DataFrame manually" below).

In [14]:
characters_ds.show()

+----------------+------+------+--------+---------+-------+-----------+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|
+----------------+------+------+--------+---------+-------+-----------+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|
|   Padme Amidala|   165|    45|   brown|    brown|no_jedi|      human|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|
|    Qui-Gon Jinn|   193|    89|    blue|    brown|   jedi|      human|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|
|           R2-D2|    96|    32|    null|     null|no_jedi|      droid|
|           C-3PO|   167|    75|    null|     null|no_jedi|      droid|
|            Yoda|    66|    17|   brown|    brown|   jedi|     

In [15]:
characters_ds.dtypes

Array((name,StringType), (height,IntegerType), (weight,IntegerType), (eyecolor,StringType), (haircolor,StringType), (jedi,StringType), (species,StringType))

In [16]:
characters_ds.filter(x => x.eyecolor == "brown").show()

+----+------+------+--------+---------+----+-------+
|name|height|weight|eyecolor|haircolor|jedi|species|
+----+------+------+--------+---------+----+-------+
+----+------+------+--------+---------+----+-------+



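It doesn't work!  
This is because of the `Option` type in the `case class`: `x.eyecolor` is an `Option[String]`, and an `Option` is never equal to the plain `String` "brown", so the filter keeps no rows.  
Although the display does not show which columns are Option types, we have to keep it in mind when working with them later.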
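The same pitfall can be reproduced in plain Scala, without Spark. In this sketch the two values stand in for `x.eyecolor`. It is written for Scala 2, as used by Spark 2.x; Scala 3 may reject the naive comparison at compile time. `Option.contains` is offered as an alternative to `== Some("brown")`, not as what the notebook uses.

```scala
object OptionComparisonDemo {
  // In a Dataset[Characters] the field x.eyecolor has type Option[String]
  val brownEyes: Option[String] = Some("brown")
  val unknownEyes: Option[String] = None

  // Compiles in Scala 2 (at most with a warning), but an Option never equals a String
  val naive: Boolean = brownEyes == "brown"

  // What cell In [17] does: compare with a wrapped value
  val wrapped: Boolean = brownEyes == Some("brown")

  // Equivalent alternative; a None simply yields false
  val viaContains: Boolean = brownEyes.contains("brown")
  val unknownViaContains: Boolean = unknownEyes.contains("brown")
}
```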

In [17]:
characters_ds.filter(x => x.eyecolor == Some("brown")).show()

+----------------+------+------+--------+---------+-------+-------+
|            name|height|weight|eyecolor|haircolor|   jedi|species|
+----------------+------+------+--------+---------+-------+-------+
|   Padme Amidala|   165|    45|   brown|    brown|no_jedi|  human|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|  human|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|  human|
|            Yoda|    66|    17|   brown|    brown|   jedi|   yoda|
|           Dooku|   193|    86|   brown|    brown|   jedi|  human|
|Lando Calrissian|   178|    79|   brown|    blank|no_jedi|  human|
|       Boba Fett|   183|    78|   brown|    black|no_jedi|  human|
|      Jango Fett|   183|    79|   brown|    black|no_jedi|  human|
+----------------+------+------+--------+---------+-------+-------+



What happens if we do not account for null values in a non-string column? Let's see!

In [18]:

case class Characters_BadType(name: String,
                      height: Integer, 
                      weight: Integer, 
                      eyecolor: String, 
                      haircolor: String, 
                      jedi: String,
                      species: String)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

In [20]:
val characters_BadType_ds: Dataset[Characters_BadType] = sqlContext.
                                                              read.
                                                              option("header", "true").
                                                              option("delimiter", ";").
                                                              option("inferSchema", "true").
                                                              csv("StarWars.csv").
                                                              as[Characters_BadType]

In [21]:
characters_BadType_ds.show()

+----------------+------+------+--------+---------+-------+-----------+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|
+----------------+------+------+--------+---------+-------+-----------+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|
|   Padme Amidala|   165|    45|   brown|    brown|no_jedi|      human|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|
|    Qui-Gon Jinn|   193|    89|    blue|    brown|   jedi|      human|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|
|           R2-D2|    96|    32|    null|     null|no_jedi|      droid|
|           C-3PO|   167|    75|    null|     null|no_jedi|      droid|
|            Yoda|    66|    17|   brown|    brown|   jedi|     

In [22]:
val characters_BadType_ds2 = characters_BadType_ds.filter(x=> x.jedi=="no_jedi")

In [23]:
characters_BadType_ds2.show()

+----------------+------+------+--------+---------+-------+-----------+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|
+----------------+------+------+--------+---------+-------+-----------+
|   Padme Amidala|   165|    45|   brown|    brown|no_jedi|      human|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|
|           R2-D2|    96|    32|    null|     null|no_jedi|      droid|
|           C-3PO|   167|    75|    null|     null|no_jedi|      droid|
|      Darth Maul|   175|    80|  yellow|     none|no_jedi|dathomirian|
|       Chewbacca|   228|   112|    blue|    brown|no_jedi|    wookiee|
|           Jabba|   390|  null|  yellow|     none|no_jedi|       hutt|
|Lando Calrissian|   178|    79|   brown|    blank|no_jedi|      human|
|       Boba Fett|   183|    78|   brown|    black|no_jedi|     

In [24]:
characters_BadType_ds2.filter(x=> x.haircolor=="brown").show()

+--------------+------+------+--------+---------+-------+-------+
|          name|height|weight|eyecolor|haircolor|   jedi|species|
+--------------+------+------+--------+---------+-------+-------+
| Padme Amidala|   165|    45|   brown|    brown|no_jedi|  human|
|Leia Skywalker|   150|    49|   brown|    brown|no_jedi|  human|
|      Han Solo|   180|    80|   brown|    brown|no_jedi|  human|
|     Chewbacca|   228|   112|    blue|    brown|no_jedi|wookiee|
+--------------+------+------+--------+---------+-------+-------+



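**Error expected:** Jabba's `weight` is null, and `x.weight > 79` implicitly unboxes the `Integer` into an `Int` (`scala.Predef.Integer2int` in the stack trace below), which throws a `NullPointerException`.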

In [25]:
characters_BadType_ds2.filter(x=> x.weight>79).show()


Name: org.apache.spark.SparkException
Message: Job aborted due to stage failure: Task 0 in stage 10.0 failed 1 times, most recent failure: Lost task 0.0 in stage 10.0 (TID 10, localhost, executor driver): java.lang.NullPointerException
	at scala.Predef$.Integer2int(Predef.scala:362)
	at $line74.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:43)
	at $line74.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:43)
	at org.apache.spark.sql.catalyst.optimizer.CombineTypedFilters$$anonfun$org$apache$spark$sql$catalyst$optimizer$CombineTypedFilters$$combineFilterFunction$4.apply(objects.scala:96)
	at org.apache.spark.sql.catalyst.optimizer.CombineTypedFilters$$anonfun$org$apache$spark$sql$catalyst$optimizer$CombineTypedFilters$$combineFilterFunction$4.apply(objects.scala:95)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(Buff

In [26]:
characters_BadType_ds2.filter(x=> x.weight!=null && x.weight>79).show()

+----------+------+------+--------+---------+-------+-----------+
|      name|height|weight|eyecolor|haircolor|   jedi|    species|
+----------+------+------+--------+---------+-------+-----------+
|  Han Solo|   180|    80|   brown|    brown|no_jedi|      human|
|Darth Maul|   175|    80|  yellow|     none|no_jedi|dathomirian|
| Chewbacca|   228|   112|    blue|    brown|no_jedi|    wookiee|
+----------+------+------+--------+---------+-------+-----------+



The conclusion: if you cannot be sure that a column has all its values defined, it is safer to use `Option` in the case class to handle missing values. Use types without `Option[]` only for columns where you are 100% sure that no values can be missing (this applies to numeric, string and all other types).
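The difference can be illustrated in plain Scala, without Spark. `BadTypeRow` and `SafeRow` below are hypothetical stand-ins for `Characters_BadType` and `Characters`, reduced to two fields; the three filters mirror cells In [25], In [26] and the `Option`-based approach recommended above.

```scala
object NullableFieldDemo {
  // Hypothetical stand-ins for Characters_BadType and Characters
  case class BadTypeRow(name: String, weight: Integer)
  case class SafeRow(name: String, weight: Option[Integer])

  val badRows: List[BadTypeRow] = List(BadTypeRow("Han Solo", 80), BadTypeRow("Jabba", null))
  // Option(null) is None, which is how a null value ends up in an Option field
  val safeRows: List[SafeRow] = badRows.map(r => SafeRow(r.name, Option(r.weight)))

  // Like cell In [25]: `r.weight > 79` unboxes the Integer (Predef.Integer2int) and throws on null
  def unguardedThrows: Boolean =
    try { badRows.filter(r => r.weight > 79); false }
    catch { case _: NullPointerException => true }

  // Like cell In [26]: an explicit null check avoids the exception
  val heavyChecked: List[String] =
    badRows.filter(r => r.weight != null && r.weight > 79).map(_.name)

  // With Option the missing value cannot be forgotten: None simply fails the predicate
  val heavyOption: List[String] = safeRows.filter(_.weight.exists(_ > 79)).map(_.name)
}
```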

### DEFINING THE SCHEMA OF THE DATAFRAME MANUALLY
This part concerns DataFrames rather than Datasets.
When reading the CSV file into a DataFrame, we can define the schema manually. This suggests that we could control (or detect) missing values during reading by setting nullable=false in the schema. Let's try it. The first step is to create the schema manually by defining the column names, their types and whether they are nullable. Before creating the schema, import the necessary types.

In [27]:
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

In [28]:
val DF_schema = StructType(Array(
  StructField("name", StringType, false),
  StructField("height", IntegerType, false),
  StructField("weight", IntegerType, false),
  StructField("eyecolor", StringType, false),
  StructField("haircolor", StringType, false),
  StructField("jedi", StringType, false),
  StructField("species", StringType, false)))

In [29]:
DF_schema.printTreeString

root
 |-- name: string (nullable = false)
 |-- height: integer (nullable = false)
 |-- weight: integer (nullable = false)
 |-- eyecolor: string (nullable = false)
 |-- haircolor: string (nullable = false)
 |-- jedi: string (nullable = false)
 |-- species: string (nullable = false)



In [32]:
val characters1_df = sqlContext.
  read.
  option("header", "true").
  option("delimiter", ";").
  schema(DF_schema).
  csv("StarWars.csv") // csv() sets the format itself, so format("com.databricks.spark.csv") is not needed

In [33]:
characters1_df.show()

+----------------+------+------+--------+---------+-------+-----------+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|
+----------------+------+------+--------+---------+-------+-----------+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|
|   Padme Amidala|   165|    45|   brown|    brown|no_jedi|      human|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|
|    Qui-Gon Jinn|   193|    89|    blue|    brown|   jedi|      human|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|
|           R2-D2|    96|    32|    null|     null|no_jedi|      droid|
|           C-3PO|   167|    75|    null|     null|no_jedi|      droid|
|            Yoda|    66|    17|   brown|    brown|   jedi|     

In [34]:
characters1_df.printSchema

root
 |-- name: string (nullable = true)
 |-- height: integer (nullable = true)
 |-- weight: integer (nullable = true)
 |-- eyecolor: string (nullable = true)
 |-- haircolor: string (nullable = true)
 |-- jedi: string (nullable = true)
 |-- species: string (nullable = true)



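Did you notice? **nullable** has become **true** in the DataFrame. Spark treats the columns of file-based sources such as CSV as nullable regardless of the schema we pass, so `nullable = false` neither rejects nor flags missing values (R2-D2's missing eye color is still read as `null`).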
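A sketch that reproduces this and then detects the missing values explicitly after reading. It assumes Spark 2.x or later with Scala 2 and writes a hypothetical two-column sample to a temporary file; checking with `isNull` afterwards is one possible workaround, not the notebook's method.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object NullableSchemaDemo {
  val spark: SparkSession =
    SparkSession.builder().master("local[1]").appName("nullable-demo").getOrCreate()

  // Hypothetical sample; Jabba's weight is missing
  val path: Path = Files.createTempFile("weights", ".csv")
  Files.write(path, "name;weight\nYoda;17\nJabba;\n".getBytes(StandardCharsets.UTF_8))

  // Ask for non-nullable columns, as in DF_schema above
  val schema: StructType = StructType(Array(
    StructField("name", StringType, nullable = false),
    StructField("weight", IntegerType, nullable = false)))

  val df: DataFrame = spark.read
    .option("header", "true")
    .option("delimiter", ";")
    .schema(schema)
    .csv(path.toString)

  // The loaded schema is nullable anyway...
  val weightNullable: Boolean = df.schema("weight").nullable
  // ...so missing values have to be detected explicitly after reading
  val missingWeights: Long = df.filter(col("weight").isNull).count()
}
```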

In [35]:
characters1_df.filter($"weight"<75).show()

+--------------+------+------+--------+---------+-------+-------+
|          name|height|weight|eyecolor|haircolor|   jedi|species|
+--------------+------+------+--------+---------+-------+-------+
| Padme Amidala|   165|    45|   brown|    brown|no_jedi|  human|
|Leia Skywalker|   150|    49|   brown|    brown|no_jedi|  human|
|         R2-D2|    96|    32|    null|     null|no_jedi|  droid|
|          Yoda|    66|    17|   brown|    brown|   jedi|   yoda|
+--------------+------+------+--------+---------+-------+-------+



# 2. JOINING DATASETS

We continue with the following two Datasets: `friends_ds`, created manually, and `characters_ds`, read from the CSV file. Let's join them on the names of the characters.

**Inner join**  
With an inner join, the result contains only the keys present in both Datasets.
Unfortunately, the default join syntax in Spark keeps the key columns from both Datasets. Since both Datasets have a "name" column, the result is a DataFrame with two columns called "name", and referring to `name` later on fails with the following error:
`Reference 'name' is ambiguous`

In [37]:
val bad_join_df = characters_ds.join(friends_ds, characters_ds.col("name") === friends_ds.col("name"))

In [38]:
bad_join_df.show()

+----------------+------+------+--------+---------+-------+-----------+----------------+--------------------+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|            name|             friends|
+----------------+------+------+--------+---------+-------+-----------+----------------+--------------------+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|Anakin Skywalker|     Sheev Palpatine|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|  Luke Skywalker|Han Solo, Leia Sk...|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|  Leia Skywalker|      Obi-Wan Kenobi|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|  Obi-Wan Kenobi|  Yoda, Qui-Gon Jinn|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|        Han Solo|Leia Skywalker, L...|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human| Sheev Palpatine|    Anakin Skywalker|
|         

**Did you notice?** There are two `name` columns.

In [40]:
bad_join_df.select($"name")

Name: org.apache.spark.sql.AnalysisException
Message: Reference 'name' is ambiguous, could be: name#50, name#5.;
StackTrace: org.apache.spark.sql.AnalysisException: Reference 'name' is ambiguous, could be: name#50, name#5.;
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:171)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$6$$anonfun$39.apply(Analyzer.scala:868)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$6$$anonfun$39.apply(Analyzer.scala:868)
  at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$6.applyOrElse(Analyzer.scala:868)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$Resolv

The solution to the problem above is to pass the join key as `Seq("name")` when the key columns have the same name in both Datasets; the result then has a single `name` column.

In [42]:
val sw_df = characters_ds.join(friends_ds, Seq("name"))

In [43]:
sw_df.show()

+----------------+------+------+--------+---------+-------+-----------+--------------------+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|             friends|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|     Sheev Palpatine|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|Han Solo, Leia Sk...|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|      Obi-Wan Kenobi|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|  Yoda, Qui-Gon Jinn|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|Leia Skywalker, L...|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|    Anakin Skywalker|
|           R2-D2|    96|    32|    null|     null|no_jedi|      droid|               C-3PO|
|           C-3PO|   167|    75|    null|     null|no_jedi|      droid

Although we joined two Datasets whose column types were all defined, the result of the join is a DataFrame.
To get a Dataset again, create a case class with the names and types of the joined columns and convert the DataFrame to a Dataset.
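A sketch of the join variants discussed above, assuming Spark 2.x or later with Scala 2 and hypothetical case classes `JediInfo`, `FriendInfo` and `JediFriends`. It also shows two alternatives the notebook does not use: selecting each `name` column through its parent Dataset, and `joinWith`, which returns typed pairs instead of a DataFrame.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical case classes for this sketch
case class JediInfo(name: String, jedi: String)
case class FriendInfo(name: String, friends: String)
case class JediFriends(name: String, jedi: String, friends: String)

object JoinDemo {
  val spark: SparkSession =
    SparkSession.builder().master("local[1]").appName("join-demo").getOrCreate()
  import spark.implicits._

  val jedi: Dataset[JediInfo] =
    Seq(JediInfo("Yoda", "jedi"), JediInfo("Han Solo", "no_jedi")).toDS()
  val friends: Dataset[FriendInfo] =
    Seq(FriendInfo("Yoda", "Obi-Wan Kenobi"), FriendInfo("Han Solo", "Chewbacca")).toDS()

  // Join on an equality expression: the result keeps both "name" columns...
  val twoNames: DataFrame = jedi.join(friends, jedi.col("name") === friends.col("name"))
  // ...which can still be told apart through their parent Datasets
  val picked: DataFrame = twoNames.select(jedi.col("name"), friends.col("friends"))

  // Join on Seq("name"): a single "name" column, then back to a Dataset via a case class
  val joined: Dataset[JediFriends] = jedi.join(friends, Seq("name")).as[JediFriends]
  val yodaFriends: Array[String] =
    joined.filter(_.name == "Yoda").map(_.friends).collect()

  // joinWith keeps both sides typed, as pairs
  val pairs: Dataset[(JediInfo, FriendInfo)] =
    jedi.joinWith(friends, jedi.col("name") === friends.col("name"))
  val hanFriends: Array[String] =
    pairs.filter(_._1.name == "Han Solo").map(_._2.friends).collect()
}
```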

In [45]:
case class SW(name: String,
              height: Integer,
              weight: Option[Integer],
              eyecolor: Option[String],
              haircolor: Option[String],
              jedi: String,
              species: String,
              friends: String)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

In [46]:
val sw_ds = sw_df.as[SW]

In [47]:
sw_ds.show()

+----------------+------+------+--------+---------+-------+-----------+--------------------+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|             friends|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|     Sheev Palpatine|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|Han Solo, Leia Sk...|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|      Obi-Wan Kenobi|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|  Yoda, Qui-Gon Jinn|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|Leia Skywalker, L...|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|    Anakin Skywalker|
|           R2-D2|    96|    32|    null|     null|no_jedi|      droid|               C-3PO|
|           C-3PO|   167|    75|    null|     null|no_jedi|      droid

## Other joins
If we need to keep all the keys from one of the Datasets, we can use `"left_outer"` or `"right_outer"` accordingly.
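After an outer join, every column coming from the non-preserved side can be null, so a case class for the result should declare those fields as `Option`. A sketch assuming Spark 2.x or later with Scala 2; the case classes `SpeciesInfo`, `FriendsMaybe` and `SpeciesFriends` are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical case classes for this sketch
case class SpeciesInfo(name: String, species: String)
case class FriendsMaybe(Who: String, friends: Option[String])
// Columns from the right side may be null after a left outer join, hence Option
case class SpeciesFriends(name: String, species: String, Who: Option[String], friends: Option[String])

object LeftOuterDemo {
  val spark: SparkSession =
    SparkSession.builder().master("local[1]").appName("left-outer-demo").getOrCreate()
  import spark.implicits._

  val characters: Dataset[SpeciesInfo] =
    Seq(SpeciesInfo("Luke Skywalker", "human"), SpeciesInfo("Padme Amidala", "human")).toDS()
  val friendships: Dataset[FriendsMaybe] =
    Seq(FriendsMaybe("Luke Skywalker", Some("Han Solo"))).toDS()

  // Keep every character, matched or not
  val joined: DataFrame =
    characters.join(friendships, characters.col("name") === friendships.col("Who"), "left_outer")

  // Back to a typed Dataset; unmatched rows decode to None
  val typed: Dataset[SpeciesFriends] = joined.as[SpeciesFriends]
  val withoutMatch: Array[String] = typed.filter(_.Who.isEmpty).map(_.name).collect()
}
```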

In [48]:
characters_ds.join(ds_missing, characters_ds.col("name") === ds_missing.col("Who"), "left_outer").show()

+----------------+------+------+--------+---------+-------+-----------+----------------+--------------------+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|             Who|             friends|
+----------------+------+------+--------+---------+-------+-----------+----------------+--------------------+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|Anakin Skywalker|     Sheev Palpatine|
|   Padme Amidala|   165|    45|   brown|    brown|no_jedi|      human|            null|                null|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|  Luke Skywalker|                null|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|  Leia Skywalker|      Obi-Wan Kenobi|
|    Qui-Gon Jinn|   193|    89|    blue|    brown|   jedi|      human|            null|                null|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|            null|                null|
|        H

# 3. SELECTING COLUMNS
The first surprise was how overcomplicated it is to select some columns from a Dataset. We already know the names and the types of the columns, but if we need only some of them, the names or the types have to be defined again. Let's see the possibilities:

- If we use `map`, the result is a Dataset, so the column types are inherited, but the column names are lost.
- If we use `select` with the column names, the result is a DataFrame, so the column types are lost.
- If we use `select` and provide the column names AND the column types, the result is a Dataset with seemingly proper column names and proper types.

    - `map`: Dataset -> Dataset, without the column names
    - `select` + column names -> DataFrame, without the column types
    - `select` + column names & types -> Dataset

In [49]:
sw_ds.map(x => (x.name, x.weight))

[_1: string, _2: int]

In [50]:
sw_ds.map(x => (x.name, x.weight)).show()

+----------------+----+
|              _1|  _2|
+----------------+----+
|Anakin Skywalker|  84|
|  Luke Skywalker|  77|
|  Leia Skywalker|  49|
|  Obi-Wan Kenobi|  77|
|        Han Solo|  80|
| Sheev Palpatine|  75|
|           R2-D2|  32|
|           C-3PO|  75|
|            Yoda|  17|
|      Darth Maul|  80|
|       Chewbacca| 112|
|           Jabba|null|
|Lando Calrissian|  79|
+----------------+----+



In [51]:
sw_ds.select("name", "weight")

[name: string, weight: int]

In [52]:
sw_ds.select("name", "weight").show()

+----------------+------+
|            name|weight|
+----------------+------+
|Anakin Skywalker|    84|
|  Luke Skywalker|    77|
|  Leia Skywalker|    49|
|  Obi-Wan Kenobi|    77|
|        Han Solo|    80|
| Sheev Palpatine|    75|
|           R2-D2|    32|
|           C-3PO|    75|
|            Yoda|    17|
|      Darth Maul|    80|
|       Chewbacca|   112|
|           Jabba|  null|
|Lando Calrissian|    79|
+----------------+------+



In [53]:
sw_ds.select($"name".as[String], $"weight".as[Integer])

[name: string, weight: int]

In [54]:
sw_ds.select($"name".as[String], $"weight".as[Integer]).show()

+----------------+------+
|            name|weight|
+----------------+------+
|Anakin Skywalker|    84|
|  Luke Skywalker|    77|
|  Leia Skywalker|    49|
|  Obi-Wan Kenobi|    77|
|        Han Solo|    80|
| Sheev Palpatine|    75|
|           R2-D2|    32|
|           C-3PO|    75|
|            Yoda|    17|
|      Darth Maul|    80|
|       Chewbacca|   112|
|           Jabba|  null|
|Lando Calrissian|    79|
+----------------+------+



This last solution seems to work well but it has two problems:

1. The result is a `Dataset[(String, Integer)]`. Although the display shows the column names, these names are valid only when the Dataset is used as a DataFrame. So we can refer to the columns as "weight" in untyped expressions (for example `.select("weight")`), but we cannot use the column names in typed expressions where `_.weight` is needed. For example, using `groupByKey(_.weight)` or `.map(x => x.weight)` after this selection step results in the following error:
   `error: value weight is not a member of (String, Integer)`
   Instead of the column names, typed operations have to refer to the columns as `._1` or `._2`. So although the names are inherited in the DataFrame sense, they are lost in the Dataset sense.
2. When defining `$"weight".as[Integer]` we cannot use `$"weight".as[Option[Integer]]`, and because that column has a missing value this can lead to a `NullPointerException`, for example with `filter(x => x._2 > 79)`.

Whichever way the selection is done, you end up creating a proper case class. All three approaches can be corrected easily with a new case class:

In [55]:
case class NameWeight(name: String, weight: Option[Integer])
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

In [57]:

//1. corrected
sw_ds.map(x => NameWeight(x.name, x.weight))

[name: string, weight: int]

In [58]:
sw_ds.map(x => NameWeight(x.name, x.weight)).show()

+----------------+------+
|            name|weight|
+----------------+------+
|Anakin Skywalker|    84|
|  Luke Skywalker|    77|
|  Leia Skywalker|    49|
|  Obi-Wan Kenobi|    77|
|        Han Solo|    80|
| Sheev Palpatine|    75|
|           R2-D2|    32|
|           C-3PO|    75|
|            Yoda|    17|
|      Darth Maul|    80|
|       Chewbacca|   112|
|           Jabba|  null|
|Lando Calrissian|    79|
+----------------+------+



In [59]:
//2. corrected
sw_ds.select("name", "weight").as[NameWeight]

[name: string, weight: int]

In [60]:
sw_ds.select("name", "weight").as[NameWeight].show()

+----------------+------+
|            name|weight|
+----------------+------+
|Anakin Skywalker|    84|
|  Luke Skywalker|    77|
|  Leia Skywalker|    49|
|  Obi-Wan Kenobi|    77|
|        Han Solo|    80|
| Sheev Palpatine|    75|
|           R2-D2|    32|
|           C-3PO|    75|
|            Yoda|    17|
|      Darth Maul|    80|
|       Chewbacca|   112|
|           Jabba|  null|
|Lando Calrissian|    79|
+----------------+------+



In [61]:
//3. corrected
sw_ds.select($"name".as[String], $"weight".as[Integer]).as[NameWeight]

[name: string, weight: int]

In [62]:
sw_ds.select($"name".as[String], $"weight".as[Integer]).as[NameWeight].show()

+----------------+------+
|            name|weight|
+----------------+------+
|Anakin Skywalker|    84|
|  Luke Skywalker|    77|
|  Leia Skywalker|    49|
|  Obi-Wan Kenobi|    77|
|        Han Solo|    80|
| Sheev Palpatine|    75|
|           R2-D2|    32|
|           C-3PO|    75|
|            Yoda|    17|
|      Darth Maul|    80|
|       Chewbacca|   112|
|           Jabba|  null|
|Lando Calrissian|    79|
+----------------+------+



# 4. RENAMING COLUMNS
Renaming columns returns a DataFrame. (At least, I could not find a column-renaming function that produces a Dataset.)

- withColumnRenamed renames the columns one by one; the result is a DataFrame.
- Alternatively, we can convert the Dataset to a DataFrame with .toDF and define all new column names in one step; the result is, of course, a DataFrame.

Whichever way is used to rename the columns, the result is a DataFrame. Finally, we have to create a case class with the new column names and types and convert the DataFrame back to a Dataset.

In [63]:
sw_ds.withColumnRenamed("name", "Name")

[Name: string, height: int ... 6 more fields]

In [64]:
sw_ds.withColumnRenamed("name", "Name").show()

+----------------+------+------+--------+---------+-------+-----------+--------------------+
|            Name|height|weight|eyecolor|haircolor|   jedi|    species|             friends|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|     Sheev Palpatine|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|Han Solo, Leia Sk...|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|      Obi-Wan Kenobi|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|  Yoda, Qui-Gon Jinn|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|Leia Skywalker, L...|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|    Anakin Skywalker|
|           R2-D2|    96|    32|    null|     null|no_jedi|      droid|               C-3PO|
|           C-3PO|   167|    75|    null|     null|no_jedi|      droid

The output shows that the result is a DataFrame.
We can rename more columns by chaining this function.

In [66]:
sw_ds.withColumnRenamed("name", "Who").withColumnRenamed("jedi", "Religion").show()

+----------------+------+------+--------+---------+--------+-----------+--------------------+
|             Who|height|weight|eyecolor|haircolor|Religion|    species|             friends|
+----------------+------+------+--------+---------+--------+-----------+--------------------+
|Anakin Skywalker|   188|    84|    blue|    blond|    jedi|      human|     Sheev Palpatine|
|  Luke Skywalker|   172|    77|    blue|    blond|    jedi|      human|Han Solo, Leia Sk...|
|  Leia Skywalker|   150|    49|   brown|    brown| no_jedi|      human|      Obi-Wan Kenobi|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|    jedi|      human|  Yoda, Qui-Gon Jinn|
|        Han Solo|   180|    80|   brown|    brown| no_jedi|      human|Leia Skywalker, L...|
| Sheev Palpatine|   173|    75|    blue|      red| no_jedi|      human|    Anakin Skywalker|
|           R2-D2|    96|    32|    null|     null| no_jedi|      droid|               C-3PO|
|           C-3PO|   167|    75|    null|     null| no_jedi|

If we would like to rename all the columns, a shorter way is to convert the Dataset into a DataFrame with .toDF and pass all the new column names at once.

Matching the DataFrame column names to the case class fields is not case sensitive (Spark's default setting, spark.sql.caseSensitive = false). If you changed only the letter case of the column names, your original case class still works. But if a new column name differs in at least one letter, a new case class with the proper column names is needed.

In [67]:
sw_ds.toDF(Seq("Name", "Height", "Weight", "Eyecolor", "Haircolor", "Jedi", "Species", "Friends"): _*).as[SW]

[Name: string, Height: int ... 6 more fields]

In [69]:
sw_ds.toDF(Seq("Name", "Height", "Weight", "Eyecolor", "Haircolor", "Jedi", "Species", "Friends"): _*).as[SW].show()

+----------------+------+------+--------+---------+-------+-----------+--------------------+
|            Name|Height|Weight|Eyecolor|Haircolor|   Jedi|    Species|             Friends|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|     Sheev Palpatine|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|Han Solo, Leia Sk...|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|      Obi-Wan Kenobi|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|  Yoda, Qui-Gon Jinn|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|Leia Skywalker, L...|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|    Anakin Skywalker|
|           R2-D2|    96|    32|    null|     null|no_jedi|      droid|               C-3PO|
|           C-3PO|   167|    75|    null|     null|no_jedi|      droid

In [70]:
// sw_ds.toDF(Seq("WHO", "Height", "Weight", "Eyecolor", "Haircolor", "Jedi", "Species", "Friends"): _*).as[S

Name: Syntax Error.
Message: 
StackTrace: 

In [71]:
case class SW2(WHO: String,
               height: Integer, 
               weight: Option[Integer], 
               eyecolor: Option[String], 
               haircolor: Option[String], 
               jedi: String,
               species: String,
               friends: String)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

In [72]:
sw_ds.toDF(Seq("WHO", "Height", "Weight", "Eyecolor", "Haircolor", "Jedi", "Species", "Friends"): _*).as[SW2]

[WHO: string, Height: int ... 6 more fields]

In [73]:
sw_ds.toDF(Seq("WHO", "Height", "Weight", "Eyecolor", "Haircolor", "Jedi", "Species", "Friends"): _*).as[SW2].show()

+----------------+------+------+--------+---------+-------+-----------+--------------------+
|             WHO|Height|Weight|Eyecolor|Haircolor|   Jedi|    Species|             Friends|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|     Sheev Palpatine|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|Han Solo, Leia Sk...|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|      Obi-Wan Kenobi|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|  Yoda, Qui-Gon Jinn|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|Leia Skywalker, L...|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|    Anakin Skywalker|
|           R2-D2|    96|    32|    null|     null|no_jedi|      droid|               C-3PO|
|           C-3PO|   167|    75|    null|     null|no_jedi|      droid
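
The whole renaming round-trip (rename, get a DataFrame, convert back with a new case class) can be sketched in one place. This is a minimal, self-contained sketch for spark-shell or a Scala script with Spark on the classpath; the two-column case classes Person and PersonRenamed and the sample rows are hypothetical, not the sw_ds data.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical case classes: the shape before and after renaming
case class Person(name: String, jedi: String)
case class PersonRenamed(who: String, religion: String)

val spark = SparkSession.builder().master("local[*]").appName("rename-sketch").getOrCreate()
import spark.implicits._

val people: Dataset[Person] = Seq(Person("Yoda", "jedi"), Person("Han Solo", "no_jedi")).toDS()

// Way 1: rename column by column; every withColumnRenamed call returns a DataFrame,
// and .as[PersonRenamed] turns it back into a typed Dataset
val renamed1: Dataset[PersonRenamed] =
  people.withColumnRenamed("name", "who").withColumnRenamed("jedi", "religion").as[PersonRenamed]

// Way 2: give all new names at once with toDF, then convert
val renamed2: Dataset[PersonRenamed] = people.toDF("who", "religion").as[PersonRenamed]

// Typed field access works again on the converted Dataset
renamed2.map(_.who).show()
```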

# 5. ADDING NEW COLUMNS
There are several ways to add new columns to your Dataset, depending on what kind of column is created. I show the main types with examples. Regardless of how the new column is added and whether its type is defined or not, the result is a DataFrame. So if you need a Dataset, define a proper case class and convert the DataFrame into a Dataset.

### CONSTANT COLUMN
Adding a constant column is easy: use the withColumn function with the name of the new column and lit() with the value inside the brackets. The result is a DataFrame even if you define the type of the new column, as in sw_ds.withColumn("count", lit(1).as[Integer]).

In [74]:
sw_ds.withColumn("count", lit(1))

[name: string, height: int ... 7 more fields]

In [75]:
sw_ds.withColumn("count", lit(1)).show()

+----------------+------+------+--------+---------+-------+-----------+--------------------+-----+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|             friends|count|
+----------------+------+------+--------+---------+-------+-----------+--------------------+-----+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|     Sheev Palpatine|    1|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|Han Solo, Leia Sk...|    1|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|      Obi-Wan Kenobi|    1|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|  Yoda, Qui-Gon Jinn|    1|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|Leia Skywalker, L...|    1|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|    Anakin Skywalker|    1|
|           R2-D2|    96|    32|    null|     null|no_jedi|      droid|               C-3PO|    1|
|         
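
To get a typed Dataset back after adding a constant column, the case class just needs one more field for the new column. A minimal sketch, assuming hypothetical case classes Person and PersonCount and toy data rather than the notebook's SW class:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.lit

// Hypothetical case classes: the original shape and the shape with the new column
case class Person(name: String, jedi: String)
case class PersonCount(name: String, jedi: String, count: Int)

val spark = SparkSession.builder().master("local[*]").appName("constant-column-sketch").getOrCreate()
import spark.implicits._

val people = Seq(Person("Yoda", "jedi"), Person("Han Solo", "no_jedi")).toDS()

// withColumn returns a DataFrame even though lit(1) has a known integer type
val withCount = people.withColumn("count", lit(1))

// The extra case class field names and types the new column, giving a Dataset again
val typed: Dataset[PersonCount] = withCount.as[PersonCount]
typed.map(_.count).show()
```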

### EXPRESSION1 – TYPE1
In these expressions the function needs only a single column as input, so we can simply pass its name as a string: "column_name". In the example I calculate the natural logarithm of the weight of each character.

In [76]:
sw_ds.withColumn("log_weight", log("weight"))

[name: string, height: int ... 7 more fields]

In [77]:
sw_ds.withColumn("log_weight", log("weight")).show()

+----------------+------+------+--------+---------+-------+-----------+--------------------+------------------+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|             friends|        log_weight|
+----------------+------+------+--------+---------+-------+-----------+--------------------+------------------+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|     Sheev Palpatine| 4.430816798843313|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|Han Solo, Leia Sk...| 4.343805421853684|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|      Obi-Wan Kenobi|3.8918202981106265|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|  Yoda, Qui-Gon Jinn| 4.343805421853684|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|Leia Skywalker, L...| 4.382026634673881|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|    Anakin Skywalker|  4.31748811

### EXPRESSION2 – TYPE2
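When the transformation needs more than a single column name as input (for example arithmetic between columns), we have to work with Column objects, such as dataset_name("column_name"), when referring to a column of the original Dataset.

For example, we can calculate the body mass index (BMI) of the characters: the weight in kg divided by the square of the height in metres. As the height is given in cm, height*height is divided by 10000.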

In [78]:
sw_ds.withColumn("BMI", sw_ds("weight")/(sw_ds("height")*sw_ds("height")/10000))

[name: string, height: int ... 7 more fields]

In [79]:
sw_ds.withColumn("BMI", sw_ds("weight")/(sw_ds("height")*sw_ds("height")/10000)).show()

+----------------+------+------+--------+---------+-------+-----------+--------------------+------------------+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|             friends|               BMI|
+----------------+------+------+--------+---------+-------+-----------+--------------------+------------------+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|     Sheev Palpatine| 23.76641014033499|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|Han Solo, Leia Sk...|  26.0275824770146|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|      Obi-Wan Kenobi| 21.77777777777778|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|  Yoda, Qui-Gon Jinn|23.245984784446325|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|Leia Skywalker, L...|24.691358024691358|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|    Anakin Skywalker| 25.05930702
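
The dataset_name("column_name") form is not the only way to build Column objects: with import spark.implicits._ the $"column_name" interpolator works too, and so does col("column_name") from org.apache.spark.sql.functions. A small sketch with a hypothetical Body case class (height in cm, weight in kg) writes the same BMI expression all three ways:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical case class: height in cm, weight in kg
case class Body(name: String, height: Int, weight: Int)

val spark = SparkSession.builder().master("local[*]").appName("bmi-sketch").getOrCreate()
import spark.implicits._

val ds = Seq(Body("Anakin Skywalker", 188, 84), Body("Yoda", 66, 17)).toDS()

// BMI = weight / (height in m)^2; height is in cm, hence the division by 10000
val bmi1 = ds.withColumn("BMI", ds("weight") / (ds("height") * ds("height") / 10000))
val bmi2 = ds.withColumn("BMI", $"weight" / ($"height" * $"height" / 10000))
val bmi3 = ds.withColumn("BMI", col("weight") / (col("height") * col("height") / 10000))

bmi2.show()
```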

### USER DEFINED FUNCTION
Finally, if the expression is defined not for columns but for plain Scala values such as Integer or String, we have to wrap it in a user-defined function (UDF).

The example creates a column containing a tuple made from two columns of the Dataset. Tuples can be built from plain values, so we define a UDF and then apply it to columns.

In [80]:
import scala.reflect.runtime.universe.TypeTag
def createTuple2[Type_x: TypeTag, Type_y: TypeTag] = udf[(Type_x, Type_y), Type_x, Type_y]((x: Type_x, y: Type_y) => (x, y))

In [81]:
sw_ds.withColumn("Jedi_Species", createTuple2[String, String].apply(sw_ds("jedi"), sw_ds("species")))

[name: string, height: int ... 7 more fields]

In [82]:
sw_ds.withColumn("Jedi_Species", createTuple2[String, String].apply(sw_ds("jedi"), sw_ds("species"))).show()

+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|             friends|        Jedi_Species|
+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|     Sheev Palpatine|        [jedi,human]|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|Han Solo, Leia Sk...|        [jedi,human]|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|      Obi-Wan Kenobi|     [no_jedi,human]|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|  Yoda, Qui-Gon Jinn|        [jedi,human]|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|Leia Skywalker, L...|     [no_jedi,human]|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|    Anakin Skywal

We can create a tuple column from columns with missing values as well.

In [83]:
sw_ds.withColumn("Name_Weight", createTuple2[String, Option[Integer]].apply(sw_ds("name"), sw_ds("weight")))

[name: string, height: int ... 7 more fields]

In [84]:
sw_ds.withColumn("Name_Weight", createTuple2[String, Option[Integer]].apply(sw_ds("name"), sw_ds("weight"))).show()

+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|             friends|         Name_Weight|
+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|     Sheev Palpatine|[Anakin Skywalker...|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|Han Solo, Leia Sk...| [Luke Skywalker,77]|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|      Obi-Wan Kenobi| [Leia Skywalker,49]|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|  Yoda, Qui-Gon Jinn| [Obi-Wan Kenobi,77]|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|Leia Skywalker, L...|       [Han Solo,80]|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|    Anakin Skywal
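
The generic createTuple2 above is one kind of UDF; a plain, non-generic UDF is even shorter. A minimal sketch, where the first_name column and the Person case class are hypothetical: the lambda works on a String, udf turns it into a function on Columns, and the result of withColumn is again a DataFrame.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Hypothetical case class for the sketch
case class Person(name: String, species: String)

val spark = SparkSession.builder().master("local[*]").appName("udf-sketch").getOrCreate()
import spark.implicits._

// A plain Scala function on String, wrapped as a UDF so it can take a Column.
// Note: a null name would throw a NullPointerException inside this lambda.
val firstWord = udf((s: String) => s.split(" ").head)

val people = Seq(Person("Luke Skywalker", "human"), Person("Chewbacca", "wookiee")).toDS()
val withFirst = people.withColumn("first_name", firstWord($"name"))
withFirst.show()
```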

# 6. FILTERING ROWS
By filtering we keep a subset of the rows of the Dataset. The good news is that the names and the types of the columns do not change at all, so the result of a filter is always a Dataset with proper column names. But we have to be very careful with columns containing missing values: in the filter function we have to define how to treat both the defined values and the missing values. Let’s see examples.
Filter a string column with no missing values: select the humans in the Dataset.

In [85]:
sw_ds.filter(x => x.species=="human")

[name: string, height: int ... 6 more fields]

In [86]:
sw_ds.filter(x => x.species=="human").show()

+----------------+------+------+--------+---------+-------+-------+--------------------+
|            name|height|weight|eyecolor|haircolor|   jedi|species|             friends|
+----------------+------+------+--------+---------+-------+-------+--------------------+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|  human|     Sheev Palpatine|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|  human|Han Solo, Leia Sk...|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|  human|      Obi-Wan Kenobi|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|  human|  Yoda, Qui-Gon Jinn|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|  human|Leia Skywalker, L...|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|  human|    Anakin Skywalker|
|Lando Calrissian|   178|    79|   brown|    blank|no_jedi|  human|            Han Solo|
+----------------+------+------+--------+---------+-------+-------+--------------------+



The same syntax compiles for eyecolor (which contains missing values) without any error or warning. But the result is empty, although there are characters with brown eyes.

In [87]:
sw_ds.filter(x => x.eyecolor== "brown").show()

+----+------+------+--------+---------+----+-------+-------+
|name|height|weight|eyecolor|haircolor|jedi|species|friends|
+----+------+------+--------+---------+----+-------+-------+
+----+------+------+--------+---------+----+-------+-------+



The reason is that Scala's == compiles for operands of different types, but a String is never equal to an Option[String]: defined values are wrapped as Some(value) and missing values are None. There are two ways to handle the situation:

- Use Some(value) in the filter
- Use the .getOrElse() function and define what should be returned for missing values. In the example I use .getOrElse("") which returns the value if it is defined, or an empty string if the value is missing in the record

So let’s see both ways:

In [88]:
sw_ds.filter(x => x.eyecolor == Some("brown")).show()

+----------------+------+------+--------+---------+-------+-------+--------------------+
|            name|height|weight|eyecolor|haircolor|   jedi|species|             friends|
+----------------+------+------+--------+---------+-------+-------+--------------------+
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|  human|      Obi-Wan Kenobi|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|  human|Leia Skywalker, L...|
|            Yoda|    66|    17|   brown|    brown|   jedi|   yoda|      Obi-Wan Kenobi|
|Lando Calrissian|   178|    79|   brown|    blank|no_jedi|  human|            Han Solo|
+----------------+------+------+--------+---------+-------+-------+--------------------+



In [89]:
sw_ds.filter(x => x.eyecolor.getOrElse("") == "brown").show()

+----------------+------+------+--------+---------+-------+-------+--------------------+
|            name|height|weight|eyecolor|haircolor|   jedi|species|             friends|
+----------------+------+------+--------+---------+-------+-------+--------------------+
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|  human|      Obi-Wan Kenobi|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|  human|Leia Skywalker, L...|
|            Yoda|    66|    17|   brown|    brown|   jedi|   yoda|      Obi-Wan Kenobi|
|Lando Calrissian|   178|    79|   brown|    blank|no_jedi|  human|            Han Solo|
+----------------+------+------+--------+---------+-------+-------+--------------------+
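
Scala's Option also has helpers that read more directly than comparing with Some(...) or using getOrElse(""): contains(value) is true only for a matching Some, and exists(predicate) is false for None. A minimal sketch on a hypothetical Eyes case class:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical case class with an optional string column
case class Eyes(name: String, eyecolor: Option[String])

val spark = SparkSession.builder().master("local[*]").appName("option-filter-sketch").getOrCreate()
import spark.implicits._

val ds = Seq(
  Eyes("Leia Skywalker", Some("brown")),
  Eyes("R2-D2", None),
  Eyes("Luke Skywalker", Some("blue"))
).toDS()

// contains: true only for Some("brown"); rows with None are dropped
val brown1 = ds.filter(x => x.eyecolor.contains("brown"))
// exists: takes a predicate, e.g. a case-insensitive match; also false for None
val brown2 = ds.filter(x => x.eyecolor.exists(_.equalsIgnoreCase("BROWN")))
brown1.show()
```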



Filtering numeric columns without missing values works as expected: filter the characters whose height is less than 100 cm.

In [90]:
sw_ds.filter(x =>  x.height<100).show()

+-----+------+------+--------+---------+-------+-------+--------------+
| name|height|weight|eyecolor|haircolor|   jedi|species|       friends|
+-----+------+------+--------+---------+-------+-------+--------------+
|R2-D2|    96|    32|    null|     null|no_jedi|  droid|         C-3PO|
| Yoda|    66|    17|   brown|    brown|   jedi|   yoda|Obi-Wan Kenobi|
+-----+------+------+--------+---------+-------+-------+--------------+



If a numeric column may contain missing values (for example its type is Option[Integer]), the syntax above does not compile.   
sw_ds.filter(x => x.weight >= 79)   
ends in   
…error: value >= is not a member of Option[Integer] …   

The solution is to use pattern matching and define the filter explicitly for Some() values and for None (missing) values.   



In [91]:
sw_ds.filter(x => x.weight match {case Some(y) => y>=79
                                  case None => false} ).show()

+----------------+------+------+--------+---------+-------+-----------+--------------------+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|             friends|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|     Sheev Palpatine|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|Leia Skywalker, L...|
|      Darth Maul|   175|    80|  yellow|     none|no_jedi|dathomirian|     Sheev Palpatine|
|       Chewbacca|   228|   112|    blue|    brown|no_jedi|    wookiee|            Han Solo|
|Lando Calrissian|   178|    79|   brown|    blank|no_jedi|      human|            Han Solo|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
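
The same pattern match can be written more compactly with Option.exists, which returns false for None exactly like the case None => false branch; Option.forall returns true for None, for when missing values should be kept instead. A sketch on a hypothetical Weight case class, keeping the Option[Integer] type used in this notebook:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical case class mirroring the notebook's Option[Integer] weight
case class Weight(name: String, weight: Option[Integer])

val spark = SparkSession.builder().master("local[*]").appName("option-numeric-sketch").getOrCreate()
import spark.implicits._

val ds = Seq(
  Weight("Chewbacca", Some(112: Integer)),
  Weight("Yoda", Some(17: Integer)),
  Weight("Jabba", None)
).toDS()

// Equivalent to: match { case Some(y) => y >= 79; case None => false }
val heavy = ds.filter(x => x.weight.exists(w => w >= 79))
// forall is true for None, so rows with a missing weight are kept as well
val heavyOrUnknown = ds.filter(x => x.weight.forall(w => w >= 79))
heavy.show()
```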



# 7. GROUPBY AND AGGREGATING
Calculate a function (mean, min, max etc.) of numeric columns by groups defined in a key column. The syntax is as expected, but we have to define the type of the result columns in the aggregation function: agg on a grouped Dataset expects typed columns, hence the .as[...] after each aggregate.

- the key for the grouping is given in: groupByKey(_.columnName)
- the aggregation functions are given in .agg( function_name1("columnName1").as[new_type1], function_name2("columnName2").as[new_type2] )

We can define several aggregation functions for different columns within one aggregation.

In [92]:
sw_ds.groupByKey(_.species).agg(max($"height").as[Integer], min($"height").as[Integer], mean($"weight").as[Double], count($"species").as[Long] )

[value: string, max(height): int ... 3 more fields]

In [93]:
sw_ds.groupByKey(_.species).agg(max($"height").as[Integer], min($"height").as[Integer], mean($"weight").as[Double], count($"species").as[Long] ).show()

+-----------+-----------+-----------+-----------------+--------------+
|      value|max(height)|min(height)|      avg(weight)|count(species)|
+-----------+-----------+-----------+-----------------+--------------+
|       hutt|        390|        390|             null|             1|
|      human|        188|        150|74.42857142857143|             7|
|dathomirian|        175|        175|             80.0|             1|
|       yoda|         66|         66|             17.0|             1|
|    wookiee|        228|        228|            112.0|             1|
|      droid|        167|         96|             53.5|             2|
+-----------+-----------+-----------+-----------------+--------------+



The same works when the key column contains missing values. Jabba is not included in the mean of the yellow group, as his weight is not known.

In [94]:
sw_ds.groupByKey(_.eyecolor).agg(mean($"weight").as[Double])

[key: struct<value: string>, avg(weight): double]

In [95]:
sw_ds.groupByKey(_.eyecolor).agg(mean($"weight").as[Double]).show()

+----------+-----------+
|       key|avg(weight)|
+----------+-----------+
|  [yellow]|       80.0|
|    [null]|       53.5|
|[bluegray]|       77.0|
|   [brown]|      56.25|
|    [blue]|       87.0|
+----------+-----------+



The key can contain missing values; they form a separate group in groupByKey. The columns in the aggregation function may also contain missing values, which are ignored in the numerical computations.

Please note that the output is a Dataset with proper column types, but the column names can be used only as DataFrame columns ("columnName"); as Dataset fields they can be referred to only as ._1, ._2 etc. For example, .map(x => x.value) won’t work.
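
To get readable field names back, the tuple result can be turned into a case class, either by mapping each tuple or by renaming the columns positionally with toDF and converting with .as. A minimal sketch; the Person and SpeciesStats case classes and the sample data are hypothetical:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.{count, max}

// Hypothetical input and result types
case class Person(name: String, species: String, height: Int)
case class SpeciesStats(species: String, maxHeight: Int, n: Long)

val spark = SparkSession.builder().master("local[*]").appName("groupbykey-names-sketch").getOrCreate()
import spark.implicits._

val ds = Seq(
  Person("Luke Skywalker", "human", 172),
  Person("Leia Skywalker", "human", 150),
  Person("Yoda", "yoda", 66)
).toDS()

// Dataset[(String, Int, Long)]: typed, but the fields are only _1, _2, _3
val stats = ds.groupByKey(_.species).agg(max($"height").as[Int], count($"name").as[Long])

// Option 1: map every tuple into the case class
val named1: Dataset[SpeciesStats] = stats.map { case (s, h, n) => SpeciesStats(s, h, n) }
// Option 2: rename the columns by position, then convert
val named2: Dataset[SpeciesStats] = stats.toDF("species", "maxHeight", "n").as[SpeciesStats]

named1.map(_.maxHeight).show()
```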

### GROUPBY MULTIPLE KEYS  
For using multiple keys in groupByKey create a tuple from the key columns.

In [97]:
sw_ds.groupByKey(x=>(x.species, x.jedi, x.haircolor)).agg(mean($"weight").as[Double], count($"species").as[Long])

[key: struct<_1: string, _2: string ... 1 more field>, avg(weight): double ... 1 more field]

In [98]:
sw_ds.groupByKey(x=>(x.species, x.jedi, x.haircolor)).agg(mean($"weight").as[Double], count($"species").as[Long] ).show()

+--------------------+-----------+--------------+
|                 key|avg(weight)|count(species)|
+--------------------+-----------+--------------+
| [hutt,no_jedi,none]|       null|             1|
|[human,no_jedi,bl...|       79.0|             1|
|[dathomirian,no_j...|       80.0|             1|
| [human,no_jedi,red]|       75.0|             1|
|[droid,no_jedi,null]|       53.5|             2|
|[wookiee,no_jedi,...|      112.0|             1|
|[human,no_jedi,br...|       64.5|             2|
|   [yoda,jedi,brown]|       17.0|             1|
| [human,jedi,auburn]|       77.0|             1|
|  [human,jedi,blond]|       80.5|             2|
+--------------------+-----------+--------------+



# 8. SORTING BY ROWS
Sorting is easy; there is no surprise in the syntax, and the result is a Dataset with the original column names and types.

In [100]:
sw_ds.orderBy($"species".desc, $"weight")

[name: string, height: int ... 6 more fields]

In [101]:
sw_ds.orderBy($"species".desc, $"weight").show()

+----------------+------+------+--------+---------+-------+-----------+--------------------+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|             friends|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
|            Yoda|    66|    17|   brown|    brown|   jedi|       yoda|      Obi-Wan Kenobi|
|       Chewbacca|   228|   112|    blue|    brown|no_jedi|    wookiee|            Han Solo|
|           Jabba|   390|  null|  yellow|     none|no_jedi|       hutt|           Boba Fett|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|      Obi-Wan Kenobi|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|    Anakin Skywalker|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|Han Solo, Leia Sk...|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|  Yoda, Qui-Gon Jinn|
|Lando Calrissian|   178|    79|   brown|    blank|no_jedi|      human

# 9. APPENDING DATASETS
Appending two Datasets with the same case class definition is a cake-walk: use union.

In [102]:
sw_ds.union(sw_ds)

[name: string, height: int ... 6 more fields]

In [103]:
sw_ds.union(sw_ds).show()

+----------------+------+------+--------+---------+-------+-----------+--------------------+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|             friends|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|     Sheev Palpatine|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|Han Solo, Leia Sk...|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|      Obi-Wan Kenobi|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|  Yoda, Qui-Gon Jinn|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|Leia Skywalker, L...|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|    Anakin Skywalker|
|           R2-D2|    96|    32|    null|     null|no_jedi|      droid|               C-3PO|
|           C-3PO|   167|    75|    null|     null|no_jedi|      droid
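
One caveat worth knowing: union matches columns by position, not by name, so two Datasets of the same case class whose underlying columns come in a different order get silently mixed up. unionByName (available since Spark 2.3) matches the columns by name instead. A sketch with a hypothetical two-field Pair case class:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical case class for the sketch
case class Pair(a: String, b: String)

val spark = SparkSession.builder().master("local[*]").appName("union-sketch").getOrCreate()
import spark.implicits._

val ds1 = Seq(Pair("a1", "b1")).toDS()
// Same case class, but the underlying columns are in the order (b, a)
val ds2 = Seq(("b2", "a2")).toDF("b", "a").as[Pair]

// Positional: "b2" ends up in column a
val byPosition = ds1.union(ds2)
// By name: the values stay in their own columns
val byName = ds1.unionByName(ds2)

byPosition.show()
byName.show()
```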

# 10. OTHER USEFUL FUNCTIONS
### DESCRIBE THE DATASET
- get the number of records by using count()
- get the number of columns by using .columns.size
- get the schema by using printSchema or by dtypes

In [106]:
sw_ds.count()

13

In [107]:
sw_ds.columns.size

8

In [108]:
sw_ds.printSchema

root
 |-- name: string (nullable = true)
 |-- height: integer (nullable = true)
 |-- weight: integer (nullable = true)
 |-- eyecolor: string (nullable = true)
 |-- haircolor: string (nullable = true)
 |-- jedi: string (nullable = true)
 |-- species: string (nullable = true)
 |-- friends: string (nullable = true)



In [109]:
sw_ds.dtypes

Array((name,StringType), (height,IntegerType), (weight,IntegerType), (eyecolor,StringType), (haircolor,StringType), (jedi,StringType), (species,StringType), (friends,StringType))

### SOME MORE AGGREGATION FUNCTIONS
Calculate the (Pearson) correlation between columns, optionally by groups.

In [110]:
sw_ds.agg(corr($"height", $"weight").as[Double])

[corr(height, weight): double]

In [111]:
sw_ds.agg(corr($"height", $"weight").as[Double]).show()

+--------------------+
|corr(height, weight)|
+--------------------+
|  0.9823964963433257|
+--------------------+



In [112]:
sw_ds.groupByKey(_.jedi).agg(corr($"height", $"weight").as[Double]).show()

+-------+--------------------+
|  value|corr(height, weight)|
+-------+--------------------+
|no_jedi|  0.9749158985081434|
|   jedi|  0.9973783324232722|
+-------+--------------------+



In [113]:
// Get the first value by group (which row counts as "first" within a group is not guaranteed after a shuffle)
sw_ds.groupByKey(_.species).agg(first($"name").as[String]).show()


+-----------+------------------+
|      value|first(name, false)|
+-----------+------------------+
|       hutt|             Jabba|
|      human|  Anakin Skywalker|
|dathomirian|        Darth Maul|
|       yoda|              Yoda|
|    wookiee|         Chewbacca|
|      droid|             R2-D2|
+-----------+------------------+



### OTHER USEFUL FUNCTIONS FOR CREATING NEW COLUMNS
The following examples add new columns to the Dataset, so the result is a DataFrame. To get a Dataset again, create a proper case class and convert the result into a Dataset.

The first example computes the hash of a column with the hash() function.

In [114]:
sw_ds.withColumn("hashed_hair", hash(sw_ds("haircolor"))).show()

+----------------+------+------+--------+---------+-------+-----------+--------------------+-----------+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|             friends|hashed_hair|
+----------------+------+------+--------+---------+-------+-----------+--------------------+-----------+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|     Sheev Palpatine| -519935767|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|Han Solo, Leia Sk...| -519935767|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|      Obi-Wan Kenobi| 1075090752|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|  Yoda, Qui-Gon Jinn| 1156710799|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|Leia Skywalker, L...| 1075090752|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|    Anakin Skywalker| 1862204291|
|           R2-D2|    96|    32|    null|     null|no_j

The next example calculates the size of a collection: the number of friends listed in the friends column. We need two steps:

- split the friends column at the string ", ". This way we get an Array of Strings. Note that map drops the column names, so after this step we have to refer to the split column as _2
- use size() to get the number of items in that Array

In [116]:
sw_ds.
  map(x => (x.name, x.friends.split(", ")) ).
  withColumn("NrOfFriendsListed", size($"_2")).show()

+----------------+--------------------+-----------------+
|              _1|                  _2|NrOfFriendsListed|
+----------------+--------------------+-----------------+
|Anakin Skywalker|   [Sheev Palpatine]|                1|
|  Luke Skywalker|[Han Solo, Leia S...|                2|
|  Leia Skywalker|    [Obi-Wan Kenobi]|                1|
|  Obi-Wan Kenobi|[Yoda, Qui-Gon Jinn]|                2|
|        Han Solo|[Leia Skywalker, ...|                4|
| Sheev Palpatine|  [Anakin Skywalker]|                1|
|           R2-D2|             [C-3PO]|                1|
|           C-3PO|             [R2-D2]|                1|
|            Yoda|    [Obi-Wan Kenobi]|                1|
|      Darth Maul|   [Sheev Palpatine]|                1|
|       Chewbacca|          [Han Solo]|                1|
|           Jabba|         [Boba Fett]|                1|
|Lando Calrissian|          [Han Solo]|                1|
+----------------+--------------------+-----------------+
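
An alternative, sketched under the assumption that the friends are always separated by ", ": Spark's own split function (which takes a regular expression) works on the column directly inside withColumn, so no map is needed and the original column names survive. Person is a hypothetical case class:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{size, split}

// Hypothetical case class for the sketch
case class Person(name: String, friends: String)

val spark = SparkSession.builder().master("local[*]").appName("split-size-sketch").getOrCreate()
import spark.implicits._

val ds = Seq(
  Person("Luke Skywalker", "Han Solo, Leia Skywalker"),
  Person("Yoda", "Obi-Wan Kenobi")
).toDS()

// split returns an array column; size counts its elements
val counted = ds.withColumn("NrOfFriendsListed", size(split($"friends", ", ")))
counted.show()
```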



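Add a monotonically increasing id in a new column using the function monotonically_increasing_id. The ids are guaranteed to be unique and increasing, but not consecutive; the 0, 1, 2, … sequence in this small example is not guaranteed in general.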

In [117]:
sw_ds.withColumn("id", monotonically_increasing_id).show()

+----------------+------+------+--------+---------+-------+-----------+--------------------+---+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|             friends| id|
+----------------+------+------+--------+---------+-------+-----------+--------------------+---+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|     Sheev Palpatine|  0|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|Han Solo, Leia Sk...|  1|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|      Obi-Wan Kenobi|  2|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|  Yoda, Qui-Gon Jinn|  3|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|Leia Skywalker, L...|  4|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|    Anakin Skywalker|  5|
|           R2-D2|    96|    32|    null|     null|no_jedi|      droid|               C-3PO|  6|
|           C-3PO|   167|    7
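
The IDs above happen to be 0, 1, 2, and so on, which is what you get when the rows sit in a single partition. According to the Spark API docs, the current implementation puts the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits, so the IDs jump from one partition to the next. The plain Scala sketch below reproduces that layout. The helper sketchMonotonicId is hypothetical, not a Spark function:

```scala
// Hypothetical helper mirroring the documented ID layout:
// partition index in the upper 31 bits, record number within the partition in the lower 33 bits
def sketchMonotonicId(partitionId: Int, recordNumber: Long): Long =
  (partitionId.toLong << 33) + recordNumber

// Three rows in partition 0 followed by two rows in partition 1
val ids: Seq[Long] =
  Seq((0, 0L), (0, 1L), (0, 2L), (1, 0L), (1, 1L)).map { case (p, r) => sketchMonotonicId(p, r) }

println(ids.mkString(", "))
```

If you need consecutive numbers, one option is zipWithIndex on the underlying RDD (sw_ds.rdd.zipWithIndex). According to the RDD docs, it triggers an extra Spark job when there is more than one partition.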

In [119]:
//Create a column containing random numbers, uniformly distributed in [0.0, 1.0).
sw_ds.withColumn("random",rand).show()

+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|             friends|              random|
+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|     Sheev Palpatine|   0.628230272334296|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|Han Solo, Leia Sk...|  0.9741523947080084|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|      Obi-Wan Kenobi| 0.12308529058626771|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|  Yoda, Qui-Gon Jinn|  0.3266210948490569|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|Leia Skywalker, L...| 0.18923685456926975|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|    Anakin Skywal

In [120]:
//Calculate the length of the strings in a column, for example the length of the character names in our Dataset.
sw_ds.withColumn("name_lenth", length(sw_ds("name"))).show()

+----------------+------+------+--------+---------+-------+-----------+--------------------+----------+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|             friends|name_lenth|
+----------------+------+------+--------+---------+-------+-----------+--------------------+----------+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|     Sheev Palpatine|        16|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|Han Solo, Leia Sk...|        14|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|      Obi-Wan Kenobi|        14|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|  Yoda, Qui-Gon Jinn|        14|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|Leia Skywalker, L...|         8|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|    Anakin Skywalker|        15|
|           R2-D2|    96|    32|    null|     null|no_jedi|     

In [121]:
//We can also compute the Levenshtein distance between two string columns, here the character names and their species:
sw_ds.withColumn("name_species_diff", levenshtein(sw_ds("name"), sw_ds("species"))).show()

+----------------+------+------+--------+---------+-------+-----------+--------------------+-----------------+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|             friends|name_species_diff|
+----------------+------+------+--------+---------+-------+-----------+--------------------+-----------------+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|     Sheev Palpatine|               15|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|Han Solo, Leia Sk...|               12|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|      Obi-Wan Kenobi|               13|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|  Yoda, Qui-Gon Jinn|               12|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|Leia Skywalker, L...|                8|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|    Anakin Skywalker|               12|
|
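
levenshtein returns the minimum number of single-character insertions, deletions and substitutions needed to turn one string into the other. To check the numbers above by hand, here is a textbook dynamic-programming sketch in plain Scala. levenshteinSketch is a hypothetical helper, not Spark's own implementation. Like the Spark function, it compares characters case-sensitively, so "H" and "h" count as different:

```scala
// Hypothetical helper: classic two-row dynamic programming for the edit distance
def levenshteinSketch(a: String, b: String): Int = {
  // prev(j) = distance between the first i - 1 characters of a and the first j characters of b
  var prev: Array[Int] = (0 to b.length).toArray
  for (i <- 1 to a.length) {
    val curr = new Array[Int](b.length + 1)
    curr(0) = i // delete all i characters of a
    for (j <- 1 to b.length) {
      val substitution = if (a(i - 1) == b(j - 1)) 0 else 1 // case-sensitive comparison
      curr(j) = math.min(
        math.min(curr(j - 1) + 1, prev(j) + 1), // insertion, deletion
        prev(j - 1) + substitution             // match or substitution
      )
    }
    prev = curr
  }
  prev(b.length)
}

println(levenshteinSketch("Han Solo", "human")) // 8, as in the Spark output above
```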

In [122]:
//Finally, locate finds the position of a substring within a string (1-based, 0 if the substring is not found). In this example we look for the first occurrence of the letter "S" in the character names.
sw_ds.withColumn("Loc_y", locate("S", sw_ds("name"))).show()

+----------------+------+------+--------+---------+-------+-----------+--------------------+-----+
|            name|height|weight|eyecolor|haircolor|   jedi|    species|             friends|Loc_y|
+----------------+------+------+--------+---------+-------+-----------+--------------------+-----+
|Anakin Skywalker|   188|    84|    blue|    blond|   jedi|      human|     Sheev Palpatine|    8|
|  Luke Skywalker|   172|    77|    blue|    blond|   jedi|      human|Han Solo, Leia Sk...|    6|
|  Leia Skywalker|   150|    49|   brown|    brown|no_jedi|      human|      Obi-Wan Kenobi|    6|
|  Obi-Wan Kenobi|   182|    77|bluegray|   auburn|   jedi|      human|  Yoda, Qui-Gon Jinn|    0|
|        Han Solo|   180|    80|   brown|    brown|no_jedi|      human|Leia Skywalker, L...|    5|
| Sheev Palpatine|   173|    75|    blue|      red|no_jedi|      human|    Anakin Skywalker|    1|
|           R2-D2|    96|    32|    null|     null|no_jedi|      droid|               C-3PO|    0|
|         
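
locate counts positions from 1 and returns 0 when the substring does not occur. That is why "Sheev Palpatine" gets 1 and "Obi-Wan Kenobi" gets 0. For non-null strings, plain Scala gives the same number as indexOf plus one. locateSketch below is a hypothetical helper for illustration, and it leaves out null handling:

```scala
// Hypothetical helper: 1-based position of the first occurrence of substr in str, 0 if absent.
// Null handling is left out of this sketch.
def locateSketch(substr: String, str: String): Int = str.indexOf(substr) + 1

val names = Seq("Anakin Skywalker", "Luke Skywalker", "Obi-Wan Kenobi", "Han Solo", "Sheev Palpatine")
names.foreach(name => println(s"$name -> ${locateSketch("S", name)}"))
```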

## 11. FRIENDCOUNT EXAMPLE  

THERE IS NO SPARK TUTORIAL WITHOUT THE BELOVED WORDCOUNT EXAMPLE 😉

I prepared a slightly modified version of the wordcount task. Let’s calculate how many times a character was referred to as a friend in the friends column.

I solve this problem in two ways.

First solution
In the first solution I use only the friends column and do the following steps:

- map – select the column friends
- flatMap and split – split the strings in the friends column at ", ", so that every full name ends up in a separate row
- groupByKey – the key is the new (split) column
- count – get the counts

So the result is how many times a character was mentioned as a friend.

If you wanted to run a classic wordcount, you would split the text at spaces using split(" ").

In [124]:
sw_ds.
  map(x => x.friends).
  flatMap(_.split(", ")).
  groupByKey(_.toString).
  count().
  show()


+----------------+--------+
|           value|count(1)|
+----------------+--------+
|           C-3PO|       1|
|        Han Solo|       3|
| Sheev Palpatine|       2|
|  Leia Skywalker|       2|
|       Boba Fett|       1|
|    Qui-Gon Jinn|       1|
|            Yoda|       1|
|  Luke Skywalker|       1|
|  Obi-Wan Kenobi|       3|
|Anakin Skywalker|       1|
|       Chewbacca|       1|
|           R2-D2|       1|
+----------------+--------+
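
The same flatMap, group and count pipeline can be tried on a plain Scala Seq, which makes it easy to check the numbers above without a cluster. This is a sketch, not the Spark code. friendsColumn holds the friends strings copied from the sw_ds output, and groupBy(identity) followed by size stands in for groupByKey(_.toString).count():

```scala
// friends column of sw_ds, copied from the notebook output (one string per character)
val friendsColumn = Seq(
  "Sheev Palpatine", "Han Solo, Leia Skywalker", "Obi-Wan Kenobi", "Yoda, Qui-Gon Jinn",
  "Leia Skywalker, Luke Skywalker, Obi-Wan Kenobi, Chewbacca", "Anakin Skywalker",
  "C-3PO", "R2-D2", "Obi-Wan Kenobi", "Sheev Palpatine", "Han Solo", "Boba Fett", "Han Solo"
)

// flatMap + split: one element per referred friend
// groupBy + size: the counterpart of groupByKey(...).count()
val friendCount: Map[String, Int] =
  friendsColumn
    .flatMap(_.split(", "))
    .groupBy(identity)
    .map { case (friend, refs) => friend -> refs.size }

friendCount.foreach { case (friend, n) => println(s"$friend: $n") }
```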



If the friends column has missing values, i.e. its type is Option[String], we have to handle them, for example with .getOrElse(""), which replaces a missing value with an empty string.

In [126]:
ds_missing.
  map(x => x.friends).
  flatMap(_.getOrElse("").split(", ")).
  groupByKey(_.toString).
  count().
  show()

+----------------+--------+
|           value|count(1)|
+----------------+--------+
| Sheev Palpatine|       1|
|  Leia Skywalker|       1|
|  Luke Skywalker|       1|
|  Obi-Wan Kenobi|       3|
|Anakin Skywalker|       1|
|                |       1|
+----------------+--------+
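
The empty key in this output comes from the missing value. getOrElse("") turns None into an empty string, and "".split(", ") returns an array with a single empty item, so the missing row is counted once under "". The plain Scala sketch below shows this, together with an alternative that drops missing values before splitting. friendsOpt is a made-up, three-row stand-in for the friends column of ds_missing, and countItems is a hypothetical helper:

```scala
// Made-up stand-in for an Option[String] friends column: two known values and one missing value
val friendsOpt: Seq[Option[String]] = Seq(Some("Sheev Palpatine"), Some("Obi-Wan Kenobi"), None)

// Hypothetical helper: count occurrences of each item
def countItems(items: Seq[String]): Map[String, Int] =
  items.groupBy(identity).map { case (item, occurrences) => item -> occurrences.size }

// As in the notebook: None -> "" -> Array(""), so the missing row is counted under ""
val withEmptyKey = countItems(friendsOpt.flatMap(_.getOrElse("").split(", ")))

// Alternative: flatten drops the None values before splitting, so no "" key appears
val withoutEmptyKey = countItems(friendsOpt.flatten.flatMap(_.split(", ")))

println(withEmptyKey)
println(withoutEmptyKey)
```

In the Dataset version, the corresponding change would be flatMap(_.toSeq.flatMap(_.split(", "))) in place of flatMap(_.getOrElse("").split(", ")). This has not been tested against ds_missing.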



Second solution
In the second solution I keep the name column from the original Dataset as well, so we get a name – friend pair in a new row for every friend referred to. This can be useful for more complex questions (for example, how many friends of a character have the letter “S” in their names). From the same Dataset we can also count the number of friends listed by each character and the number of times a character was referred to as a friend.

To get the name – friend pair Dataset, do the following steps:

- use map to select the name column and the friends column split at the string ", "
- use withColumn to create a new column containing the exploded split friends. explode creates a new row for every item in the split friends column. The first argument of withColumn is the name of the newly created column; if we write _2 here, we overwrite the split friends column.
Let’s see the code in action:

In [127]:
import org.apache.spark.sql.functions.explode

In [128]:
sw_ds.
  map(x => (x.name, x.friends.split(", ")) ).
  withColumn("friend", explode($"_2")).
  show()

+----------------+--------------------+----------------+
|              _1|                  _2|          friend|
+----------------+--------------------+----------------+
|Anakin Skywalker|   [Sheev Palpatine]| Sheev Palpatine|
|  Luke Skywalker|[Han Solo, Leia S...|        Han Solo|
|  Luke Skywalker|[Han Solo, Leia S...|  Leia Skywalker|
|  Leia Skywalker|    [Obi-Wan Kenobi]|  Obi-Wan Kenobi|
|  Obi-Wan Kenobi|[Yoda, Qui-Gon Jinn]|            Yoda|
|  Obi-Wan Kenobi|[Yoda, Qui-Gon Jinn]|    Qui-Gon Jinn|
|        Han Solo|[Leia Skywalker, ...|  Leia Skywalker|
|        Han Solo|[Leia Skywalker, ...|  Luke Skywalker|
|        Han Solo|[Leia Skywalker, ...|  Obi-Wan Kenobi|
|        Han Solo|[Leia Skywalker, ...|       Chewbacca|
| Sheev Palpatine|  [Anakin Skywalker]|Anakin Skywalker|
|           R2-D2|             [C-3PO]|           C-3PO|
|           C-3PO|             [R2-D2]|           R2-D2|
|            Yoda|    [Obi-Wan Kenobi]|  Obi-Wan Kenobi|
|      Darth Maul|   [Sheev Pal

In the example above, the _2 column contains the result of the split, and the third column, friend, holds one item of the _2 array per row. In the next example I use _2 as the name of the new column, which overwrites the split friends column, and then rename the columns. Then a new case class is defined and the result is converted to a Dataset, on which we will do more transformations.

In [129]:
case class NameFriend(name: String, friend: String)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

In [130]:
val NameFriend_df = sw_ds.
  map(x => (x.name, x.friends.split(", ")) ).
  withColumn("_2", explode($"_2")).
  toDF("name", "friend")

In [131]:
val NameFriend_ds = NameFriend_df.as[NameFriend]

In [132]:
NameFriend_ds.show()

+----------------+----------------+
|            name|          friend|
+----------------+----------------+
|Anakin Skywalker| Sheev Palpatine|
|  Luke Skywalker|        Han Solo|
|  Luke Skywalker|  Leia Skywalker|
|  Leia Skywalker|  Obi-Wan Kenobi|
|  Obi-Wan Kenobi|            Yoda|
|  Obi-Wan Kenobi|    Qui-Gon Jinn|
|        Han Solo|  Leia Skywalker|
|        Han Solo|  Luke Skywalker|
|        Han Solo|  Obi-Wan Kenobi|
|        Han Solo|       Chewbacca|
| Sheev Palpatine|Anakin Skywalker|
|           R2-D2|           C-3PO|
|           C-3PO|           R2-D2|
|            Yoda|  Obi-Wan Kenobi|
|      Darth Maul| Sheev Palpatine|
|       Chewbacca|        Han Solo|
|           Jabba|       Boba Fett|
|Lando Calrissian|        Han Solo|
+----------------+----------------+
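
Logically, splitting and exploding is a flatMap that emits one (name, friend) pair per item of the split array. The sketch below shows that in plain Scala on the 13 (name, friends) rows of sw_ds, copied from the outputs above. swRows and nameFriendPairs are names used only for this illustration.

The lambda returns a collection of pairs, so the same shape should also work as a typed Dataset.flatMap, giving a Dataset[NameFriend] directly without the DataFrame detour. One example is sw_ds.flatMap(x => x.friends.split(", ").map(f => NameFriend(x.name, f))). Treat that line as an untested alternative.

```scala
// (name, friends) rows of sw_ds, copied from the notebook output
val swRows = Seq(
  ("Anakin Skywalker", "Sheev Palpatine"),
  ("Luke Skywalker",   "Han Solo, Leia Skywalker"),
  ("Leia Skywalker",   "Obi-Wan Kenobi"),
  ("Obi-Wan Kenobi",   "Yoda, Qui-Gon Jinn"),
  ("Han Solo",         "Leia Skywalker, Luke Skywalker, Obi-Wan Kenobi, Chewbacca"),
  ("Sheev Palpatine",  "Anakin Skywalker"),
  ("R2-D2",            "C-3PO"),
  ("C-3PO",            "R2-D2"),
  ("Yoda",             "Obi-Wan Kenobi"),
  ("Darth Maul",       "Sheev Palpatine"),
  ("Chewbacca",        "Han Solo"),
  ("Jabba",            "Boba Fett"),
  ("Lando Calrissian", "Han Solo")
)

// split + explode: one (name, friend) pair for every item of the split friends string
val nameFriendPairs: Seq[(String, String)] = swRows.flatMap { case (name, friends) =>
  friends.split(", ").toSeq.map(friend => (name, friend))
}

nameFriendPairs.foreach { case (name, friend) => println(s"$name | $friend") }
```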



Finally we will answer three different questions using the NameFriend_ds Dataset:

A. How many times was each character referred to as a friend?

Solution:

- groupByKey – the key is the friend column, i.e. the split and exploded referred friends
- count – calculate the number of occurrences of each referred friend
- orderBy – sort the values by decreasing popularity

In [133]:
//A.
NameFriend_ds.
  groupByKey(_.friend).
  count().
  orderBy($"count(1)".desc).
  show()

+----------------+--------+
|           value|count(1)|
+----------------+--------+
|  Obi-Wan Kenobi|       3|
|        Han Solo|       3|
| Sheev Palpatine|       2|
|  Leia Skywalker|       2|
|           C-3PO|       1|
|    Qui-Gon Jinn|       1|
|            Yoda|       1|
|       Chewbacca|       1|
|       Boba Fett|       1|
|           R2-D2|       1|
|  Luke Skywalker|       1|
|Anakin Skywalker|       1|
+----------------+--------+
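
Several characters share the same count. Sorting only by count(1) does not guarantee the order of rows within a tie, so the rows with count 1 may come out in a different order on another run. A secondary sort key makes the output deterministic. In the notebook that could be orderBy($"count(1)".desc, $"value"), where value is the key column shown above. The plain Scala sketch below applies the same two-key ordering to the counts from the output:

```scala
// (friend, count) pairs copied from the output above
val counts: Seq[(String, Long)] = Seq(
  "Obi-Wan Kenobi" -> 3L, "Han Solo" -> 3L, "Sheev Palpatine" -> 2L, "Leia Skywalker" -> 2L,
  "C-3PO" -> 1L, "Qui-Gon Jinn" -> 1L, "Yoda" -> 1L, "Chewbacca" -> 1L,
  "Boba Fett" -> 1L, "R2-D2" -> 1L, "Luke Skywalker" -> 1L, "Anakin Skywalker" -> 1L
)

// Primary key: count, descending (negated); secondary key: name, ascending, to break ties
val sorted = counts.sortBy { case (friend, n) => (-n, friend) }

sorted.foreach { case (friend, n) => println(s"$friend: $n") }
```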



Han Solo and Obi-Wan Kenobi are the most popular: each was mentioned as a friend by 3 other characters.

B. How many friends did each character list?

Solution:

- groupByKey – the key is the name of the character
- count – calculate the number of occurrences of each name
- orderBy – sort the values by decreasing number of listed friends

In [134]:
//B.
NameFriend_ds.
  groupByKey(_.name).
  count().
  orderBy($"count(1)".desc).
  show()

+----------------+--------+
|           value|count(1)|
+----------------+--------+
|        Han Solo|       4|
|  Obi-Wan Kenobi|       2|
|  Luke Skywalker|       2|
|           C-3PO|       1|
| Sheev Palpatine|       1|
|           Jabba|       1|
|      Darth Maul|       1|
|  Leia Skywalker|       1|
|           R2-D2|       1|
|            Yoda|       1|
|Lando Calrissian|       1|
|Anakin Skywalker|       1|
|       Chewbacca|       1|
+----------------+--------+



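Han Solo listed 4 friends, Obi-Wan Kenobi and Luke Skywalker 2 each, and every other character 1. These numbers match the NrOfFriendsListed column computed earlier with size().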

C. How many of the friends listed by each character have the letter “S” in their names?

Solution:

- create a case class with an additional Integer column
- withColumn – add the new column with the position of the letter “S” in the friend's name
- convert the result into a Dataset
- filter rows where the position of “S” is greater than 0 (the remaining rows contain a friend with the letter “S” in their name)
- groupByKey – the key is the name of the character
- count the number of remaining rows per character
- orderBy – sort the values by decreasing number of friends with the letter “S” in their names

In [135]:
case class NameFriendS_ds(name: String, friend: String, S_in_friend:Integer)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

In [136]:
//C.
NameFriend_ds.
 withColumn("S_in_friend", locate("S", NameFriend_ds("friend"))).
 as[NameFriendS_ds].
 filter(x=>x.S_in_friend>0).
 groupByKey(_.name).
 count().
 orderBy($"count(1)".desc).
 show()

+----------------+--------+
|           value|count(1)|
+----------------+--------+
|        Han Solo|       2|
|  Luke Skywalker|       2|
|Anakin Skywalker|       1|
|      Darth Maul|       1|
|       Chewbacca|       1|
|Lando Calrissian|       1|
| Sheev Palpatine|       1|
+----------------+--------+
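
NameFriend_ds is already typed, so a contains check inside filter could express the same condition as locate(...) > 0. That would avoid both the extra column and the second case class, for example NameFriend_ds.filter(_.friend.contains("S")).groupByKey(_.name).count(). Treat this as an untested alternative. The plain Scala sketch below runs the filter, group and count steps on the 18 name-friend pairs from the NameFriend_ds output:

```scala
// The 18 (name, friend) pairs of NameFriend_ds, copied from the output above
val pairs = Seq(
  "Anakin Skywalker" -> "Sheev Palpatine", "Luke Skywalker" -> "Han Solo",
  "Luke Skywalker" -> "Leia Skywalker", "Leia Skywalker" -> "Obi-Wan Kenobi",
  "Obi-Wan Kenobi" -> "Yoda", "Obi-Wan Kenobi" -> "Qui-Gon Jinn",
  "Han Solo" -> "Leia Skywalker", "Han Solo" -> "Luke Skywalker",
  "Han Solo" -> "Obi-Wan Kenobi", "Han Solo" -> "Chewbacca",
  "Sheev Palpatine" -> "Anakin Skywalker", "R2-D2" -> "C-3PO",
  "C-3PO" -> "R2-D2", "Yoda" -> "Obi-Wan Kenobi",
  "Darth Maul" -> "Sheev Palpatine", "Chewbacca" -> "Han Solo",
  "Jabba" -> "Boba Fett", "Lando Calrissian" -> "Han Solo"
)

// filter: keep friends whose name contains "S" (the same test as locate("S", friend) > 0)
// groupBy + size: count the remaining friends per character
val sFriendsPerName: Map[String, Int] =
  pairs
    .filter { case (_, friend) => friend.contains("S") }
    .groupBy { case (name, _) => name }
    .map { case (name, kept) => name -> kept.size }

sFriendsPerName.toSeq.sortBy { case (name, n) => (-n, name) }.foreach { case (name, n) => println(s"$name: $n") }
```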



We can see, for example, that Han Solo and Luke Skywalker each have two friends whose names contain the letter “S”. Characters missing from the output have no friends with the letter “S” in their names.