In [1]:
spark

Intitializing Scala interpreter ...

Spark Web UI available at http://localhost:4040
SparkContext available as 'sc' (version = 3.0.2, master = local[*], app id = local-1623684744015)
SparkSession available as 'spark'


res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2b845953


# Pokedex DataFrame



the dataframe API in spark is a dynamically typed API which allows pandas/SQL
like requests in a distributed (and possibly noisy) environment.
It compiles to RDD operations on the low level, but the high level is
pretty straightforward.

Most of the time, you will only need to use that API, however, sometimes you will
need statically typed code and some other times (typically on custom aggregations)
you will need to create the low level RDD code yourself. Most of the time though,
you will use DataFrames and DataSets

## Loading the data

In [2]:
import org.apache.spark.sql.{functions => F}

import org.apache.spark.sql.{functions=>F}


In [3]:
// Your spark session has already been created

val df = spark
    .read
    .option("multiline", true)
    .option("header", true)
    .format("csv")
    .load("pokedex_(Update_05.20).csv")

df: org.apache.spark.sql.DataFrame = [_c0: string, pokedex_number: string ... 49 more fields]


In [4]:
df.columns

res1: Array[String] = Array(_c0, pokedex_number, name, german_name, japanese_name, generation, status, species, type_number, type_1, type_2, height_m, weight_kg, abilities_number, ability_1, ability_2, ability_hidden, total_points, hp, attack, defense, sp_attack, sp_defense, speed, catch_rate, base_friendship, base_experience, growth_rate, egg_type_number, egg_type_1, egg_type_2, percentage_male, egg_cycles, against_normal, against_fire, against_water, against_electric, against_grass, against_ice, against_fight, against_poison, against_ground, against_flying, against_psychic, against_bug, against_rock, against_ghost, against_dragon, against_dark, against_steel, "against_fairy")


In [5]:
df
    .select("name")
    .show(5)

+-------------+
|         name|
+-------------+
|    Bulbasaur|
|      Ivysaur|
|     Venusaur|
|Mega Venusaur|
|   Charmander|
+-------------+
only showing top 5 rows



## Correcting the types

In [6]:
df
    .select("_c0", "generation", "status", "height_m", "weight_kg", "speed")
    .dtypes

res3: Array[(String, String)] = Array((_c0,StringType), (generation,StringType), (status,StringType), (height_m,StringType), (weight_kg,StringType), (speed,StringType))


In [7]:
df
    .select("_c0", "generation", "status", "height_m", "weight_kg", "speed")
    .show(5)

+---+----------+------+--------+---------+-----+
|_c0|generation|status|height_m|weight_kg|speed|
+---+----------+------+--------+---------+-----+
|  0|         1|Normal|     0.7|      6.9|   45|
|  1|         1|Normal|     1.0|     13.0|   60|
|  2|         1|Normal|     2.0|    100.0|   80|
|  3|         1|Normal|     2.4|    155.5|   80|
|  4|         1|Normal|     0.6|      8.5|   65|
+---+----------+------+--------+---------+-----+
only showing top 5 rows



In [8]:
df
    .select("total_points", "hp", "attack", "defense", "sp_attack", "sp_defense",
          "speed", "catch_rate", "base_friendship", "base_experience")
    .show(5)

+------------+---+------+-------+---------+----------+-----+----------+---------------+---------------+
|total_points| hp|attack|defense|sp_attack|sp_defense|speed|catch_rate|base_friendship|base_experience|
+------------+---+------+-------+---------+----------+-----+----------+---------------+---------------+
|         318| 45|    49|     49|       65|        65|   45|        45|             70|             64|
|         405| 60|    62|     63|       80|        80|   60|        45|             70|            142|
|         525| 80|    82|     83|      100|       100|   80|        45|             70|            236|
|         625| 80|   100|    123|      122|       120|   80|        45|             70|            281|
|         309| 39|    52|     43|       60|        50|   65|        45|             70|             62|
+------------+---+------+-------+---------+----------+-----+----------+---------------+---------------+
only showing top 5 rows



In [9]:
df
    .select("percentage_male")
    .show(5)

+---------------+
|percentage_male|
+---------------+
|           87.5|
|           87.5|
|           87.5|
|           87.5|
|           87.5|
+---------------+
only showing top 5 rows



In [10]:
val df = spark
    .read
    .option("multiline", true)
    .option("header", true)
    .format("csv")
    .load("pokedex_(Update_05.20).csv")
    .withColumn("height_m",        col("height_m").cast("float"))
    .withColumn("weight_kg",       $"weight_kg".cast("float"))
    .withColumn("percentage_male", $"percentage_male".cast("float"))
    .withColumn("_c0",             $"_c0".cast("int"))
    .withColumn("pokedex_number",  $"pokedex_number".cast("int"))
    .withColumn("total_points",    $"total_points".cast("int"))
    .withColumn("hp",              $"hp".cast("int"))
    .withColumn("attack",          $"attack".cast("int"))
    .withColumn("defense",         $"defense".cast("int"))
    .withColumn("sp_attack",       $"sp_attack".cast("int"))
    .withColumn("sp_defense",      $"sp_defense".cast("int"))
    .withColumn("speed",           $"speed".cast("int"))
    .withColumn("catch_rate",      $"catch_rate".cast("int"))
    .withColumn("base_friendship", $"base_friendship".cast("int"))
    .withColumn("base_experience", $"base_experience".cast("int"))

df: org.apache.spark.sql.DataFrame = [_c0: int, pokedex_number: int ... 49 more fields]


## Analyse the data

Which pokemons are the highest?

Which pokemons are the heaviest?

Which pokemons have the biggest BMI (mass / heightÂ²)?

In [11]:

df.select(F.max(df("height_m")))

res7: org.apache.spark.sql.DataFrame = [max(height_m): float]


In [12]:
val max_height = df
    .select(F.max(df("height_m")))
    .take(1)(0)(0)

df
    .filter(df("height_m") === max_height)
    .select("name", "height_m")
    .show

+-------------------+--------+
|               name|height_m|
+-------------------+--------+
|Eternatus Eternamax|   100.0|
+-------------------+--------+



max_height: Any = 100.0


In [13]:
val max_weight = df
    .select(F.max(df("weight_kg")))
    .take(1)(0)(0)

df
    .where(df("weight_kg") === max_weight)
    .select("name", "weight_kg")
    .show

+----------+---------+
|      name|weight_kg|
+----------+---------+
|   Cosmoem|    999.9|
|Celesteela|    999.9|
+----------+---------+



max_weight: Any = 999.9


In [14]:
// that fails because you should not mix $ and ""

val imc = df.select("name", $"height_m" * $"height_m")

<console>: 27: error: overloaded method value select with alternatives:

In [15]:
val imc = df.select($"name", ($"height_m" * $"height_m").as("imc"))
val max_imc = imc
    .select(F.max($"imc"))
    .take(1)(0)(0)

imc
    .where($"imc" === max_imc)
    .show

+-------------------+-------+
|               name|    imc|
+-------------------+-------+
|Eternatus Eternamax|10000.0|
+-------------------+-------+



imc: org.apache.spark.sql.DataFrame = [name: string, imc: float]
max_imc: Any = 10000.0


## Group By and Join

Which pokemons are the heaviest in each generation?

In [16]:
val mass_by_gen = df
    .groupBy("generation")
    .agg(F.max($"weight_kg").as("weight_kg"))

mass_by_gen.show

+----------+---------+
|generation|weight_kg|
+----------+---------+
|         7|    999.9|
|         3|    999.7|
|         8|    950.0|
|         5|    345.0|
|         6|    610.0|
|         1|    460.0|
|         4|    750.0|
|         2|    740.0|
+----------+---------+



mass_by_gen: org.apache.spark.sql.DataFrame = [generation: string, weight_kg: float]


In [17]:
mass_by_gen
    .join(
        df.select("name", "generation", "weight_kg"),
        df("generation") === mass_by_gen("generation") && df("weight_kg") === mass_by_gen("weight_kg"),
        "inner"
    )
    .show()

+----------+---------+--------------------+----------+---------+
|generation|weight_kg|                name|generation|weight_kg|
+----------+---------+--------------------+----------+---------+
|         1|    460.0|             Snorlax|         1|    460.0|
|         2|    740.0|        Mega Steelix|         2|    740.0|
|         3|    999.7|      Primal Groudon|         3|    999.7|
|         4|    750.0|Giratina Altered ...|         4|    750.0|
|         5|    345.0|              Zekrom|         5|    345.0|
|         6|    610.0|Zygarde Complete ...|         6|    610.0|
|         7|    999.9|             Cosmoem|         7|    999.9|
|         7|    999.9|          Celesteela|         7|    999.9|
|         8|    950.0|           Eternatus|         8|    950.0|
+----------+---------+--------------------+----------+---------+



# Pokedex Dataset

DataSets are the statically typed equivalent of DataFrames, when working
with datasets, all the types are checked by the Scala compiler, which means
that you will be able to spot mistakes earlier in your process

DataFrames are DataSets with type Row, DataFrame = DataSet\[Row\]
and the type row can take data of any schema

Compute the maximum hp using the typed Dataset API with map and reduce

In [18]:
case class PokedexRow (
    _c0: Integer,
    pokedex_number: Integer,
    name: String,
    german_name: String,
    japanese_name: String,
    generation: String,
    status: String,
    species: String,
    type_number: String,
    type_1: String,
    type_2: String,
    height_m: Float,
    weight_kg: Float,
    abilities_number: String,
    ability_1: String,
    ability_2: String,
    ability_hidden: String,
    total_points: Integer,
    hp: Integer,
    attack: Integer,
    defense: Integer,
    sp_attack: Integer,
    sp_defense: Integer,
    speed: Integer,
    catch_rate: Integer,
    base_friendship: Integer,
    base_experience: Integer,
    growth_rate: String,
    egg_type_number: String,
    egg_type_1: String,
    egg_type_2: String,
    percentage_male: Float,
    egg_cycles: String,
    against_normal: String,
    against_fire: String,
    against_water: String,
    against_electric: String,
    against_grass: String,
    against_ice: String,
    against_fight: String,
    against_poison: String,
    against_ground: String,
    against_flying: String,
    against_psychic: String,
    against_bug: String,
    against_rock: String,
    against_ghost: String,
    against_dragon: String,
    against_dark: String,
    against_steel: String
)

defined class PokedexRow


In [19]:
val df = spark
    .read
    .option("multiline", true)
    .option("header", true)
    .format("csv")
    .load("pokedex_(Update_05.20).csv")
    .withColumn("height_m",        col("height_m").cast("float"))
    .withColumn("weight_kg",       $"weight_kg".cast("float"))
    .withColumn("percentage_male", $"percentage_male".cast("float"))
    .withColumn("_c0",             $"_c0".cast("int"))
    .withColumn("pokedex_number",  $"pokedex_number".cast("int"))
    .withColumn("total_points",    $"total_points".cast("int"))
    .withColumn("hp",              $"hp".cast("int"))
    .withColumn("attack",          $"attack".cast("int"))
    .withColumn("defense",         $"defense".cast("int"))
    .withColumn("sp_attack",       $"sp_attack".cast("int"))
    .withColumn("sp_defense",      $"sp_defense".cast("int"))
    .withColumn("speed",           $"speed".cast("int"))
    .withColumn("catch_rate",      $"catch_rate".cast("int"))
    .withColumn("base_friendship", $"base_friendship".cast("int"))
    .withColumn("base_experience", $"base_experience".cast("int"))
    .na.drop
    .as[PokedexRow]

df

df: org.apache.spark.sql.Dataset[PokedexRow] = [_c0: int, pokedex_number: int ... 49 more fields]
res13: org.apache.spark.sql.Dataset[PokedexRow] = [_c0: int, pokedex_number: int ... 49 more fields]


In [20]:
df
    .map(_.hp)
    .reduce((x: Integer, y: Integer) => new Integer(Integer.max(x, y)))

res14: Integer = 130


# RDD API

The RDD API is low level and most of the time you won't need to use it.
However, in som cases, your Spark jobs might fail because you have a lot of
data and you need to optimise your code if you don't want the cluster to
give up on your jobs, so it is useful to know how to use RDDs.

get the total weight of each generation of pokemons with the RDD API

In [21]:
import org.apache.spark.rdd.RDD

val dfRDD: RDD[(String, Float)] = df
    .select("generation", "weight_kg")
    .as[(String, Float)]
    .rdd

import org.apache.spark.rdd.RDD
dfRDD: org.apache.spark.rdd.RDD[(String, Float)] = MapPartitionsRDD[87] at rdd at <console>:31


In [22]:
/*
You should use reduceByKey and not groupByKey
because grouping will have to relocate data
belonging in the same group to one cluster
that might be quite intensive on the data
while reduceByKey does not need that
*/

dfRDD
    .reduceByKey(_+_)
    .take(10)

res15: Array[(String, Float)] = Array((4,524.19995), (7,189.0), (5,414.30002), (6,497.0), (2,380.7), (3,777.0), (1,895.9))


# TF IDF IMDB

In [23]:
val df = spark
    .read
    .option("multiline", true)
    .option("header", true)
    .format("csv")
    .load("IMDB Dataset.csv")
df

df: org.apache.spark.sql.DataFrame = [review: string, sentiment: string]
res16: org.apache.spark.sql.DataFrame = [review: string, sentiment: string]


## Tokenizing

In [24]:
import org.apache.spark.ml.feature.Tokenizer

val tkn = new Tokenizer()
    .setInputCol("review")
    .setOutputCol("review_toks")
val tokenized = tkn.transform(df)
tokenized.show()


+--------------------+--------------------+--------------------+
|              review|           sentiment|         review_toks|
+--------------------+--------------------+--------------------+
|One of the other ...|            positive|[one, of, the, ot...|
|"A wonderful litt...| not only is it w...|["a, wonderful, l...|
|"I thought this w...| but spirited you...|["i, thought, thi...|
|Basically there's...|            negative|[basically, there...|
|"Petter Mattei's ...| power and succes...|["petter, mattei'...|
|"Probably my all-...| but that only ma...|["probably, my, a...|
|I sure would like...|            positive|[i, sure, would, ...|
|This show was an ...|            negative|[this, show, was,...|
|Encouraged by the...|            negative|[encouraged, by, ...|
|If you like origi...|            positive|[if, you, like, o...|
|"Phil the Alien i...|            negative|["phil, the, alie...|
|I saw this movie ...|            negative|[i, saw, this, mo...|
|"So im not a big ...| me

import org.apache.spark.ml.feature.Tokenizer
tkn: org.apache.spark.ml.feature.Tokenizer = tok_b91ad610c05a
tokenized: org.apache.spark.sql.DataFrame = [review: string, sentiment: string ... 1 more field]


## Removing stop words

In [25]:
import org.apache.spark.ml.feature.StopWordsRemover

val englishStopWords = StopWordsRemover.loadDefaultStopWords("english")
val stops = new StopWordsRemover()
    .setStopWords(englishStopWords)
    .setInputCol("review_toks")
    .setOutputCol("review_toks_sw")

val sw = stops.transform(tokenized)

sw.show

+--------------------+--------------------+--------------------+--------------------+
|              review|           sentiment|         review_toks|      review_toks_sw|
+--------------------+--------------------+--------------------+--------------------+
|One of the other ...|            positive|[one, of, the, ot...|[one, reviewers, ...|
|"A wonderful litt...| not only is it w...|["a, wonderful, l...|["a, wonderful, l...|
|"I thought this w...| but spirited you...|["i, thought, thi...|["i, thought, won...|
|Basically there's...|            negative|[basically, there...|[basically, famil...|
|"Petter Mattei's ...| power and succes...|["petter, mattei'...|["petter, mattei'...|
|"Probably my all-...| but that only ma...|["probably, my, a...|["probably, all-t...|
|I sure would like...|            positive|[i, sure, would, ...|[sure, like, see,...|
|This show was an ...|            negative|[this, show, was,...|[show, amazing,, ...|
|Encouraged by the...|            negative|[encouraged

import org.apache.spark.ml.feature.StopWordsRemover
englishStopWords: Array[String] = Array(i, me, my, myself, we, our, ours, ourselves, you, your, yours, yourself, yourselves, he, him, his, himself, she, her, hers, herself, it, its, itself, they, them, their, theirs, themselves, what, which, who, whom, this, that, these, those, am, is, are, was, were, be, been, being, have, has, had, having, do, does, did, doing, a, an, the, and, but, if, or, because, as, until, while, of, at, by, for, with, about, against, between, into, through, during, before, after, above, below, to, from, up, down, in, out, on, off, over, under, again, further, then, once, here, there, when, where, why, how, all, any, both, each, few, more, most, other, some, such, no, nor, not, only, own, same, so, than, too, ver...


## Computing TF-IDF

In [26]:
import org.apache.spark.ml.feature.{HashingTF, IDF}


val tf = new HashingTF()
    .setInputCol("review_toks_sw")
    .setOutputCol("TFOut")
    .setNumFeatures(10000)

val idf = new IDF()
    .setInputCol("TFOut")
    .setOutputCol("IDFOut")
    .setMinDocFreq(2)

val tfdat = tf.transform(sw)

val tfIdfOut = idf.fit(tfdat)
    .transform(tfdat)

tfIdfOut.select("TFOut", "IDFOut").show

+--------------------+--------------------+
|               TFOut|              IDFOut|
+--------------------+--------------------+
|(10000,[157,281,2...|(10000,[157,281,2...|
|(10000,[157,1077,...|(10000,[157,1077,...|
|(10000,[55,88,157...|(10000,[55,88,157...|
|(10000,[144,157,1...|(10000,[144,157,1...|
|(10000,[566,620,1...|(10000,[566,620,1...|
|(10000,[281,331,8...|(10000,[281,331,8...|
|(10000,[24,42,65,...|(10000,[24,42,65,...|
|(10000,[78,370,53...|(10000,[78,370,53...|
|(10000,[1,167,332...|(10000,[1,167,332...|
|(10000,[78,1277,1...|(10000,[78,1277,1...|
|(10000,[167,274,2...|(10000,[167,274,2...|
|(10000,[167,263,4...|(10000,[167,263,4...|
|(10000,[150,234,3...|(10000,[150,234,3...|
|(10000,[281,440,4...|(10000,[281,440,4...|
|(10000,[734,815,1...|(10000,[734,815,1...|
|(10000,[261,793,8...|(10000,[261,793,8...|
|(10000,[164,167,4...|(10000,[164,167,4...|
|(10000,[25,51,72,...|(10000,[25,51,72,...|
|(10000,[91,261,29...|(10000,[91,261,29...|
|(10000,[49,292,30...|(10000,[49

import org.apache.spark.ml.feature.{HashingTF, IDF}
tf: org.apache.spark.ml.feature.HashingTF = HashingTF: uid=hashingTF_f22010f16283, binary=false, numFeatures=10000
idf: org.apache.spark.ml.feature.IDF = idf_1e2fa8a5fb12
tfdat: org.apache.spark.sql.DataFrame = [review: string, sentiment: string ... 3 more fields]
tfIdfOut: org.apache.spark.sql.DataFrame = [review: string, sentiment: string ... 4 more fields]
