In [1]:
import $ivy.`org.apache.spark::spark-sql:3.5.7`
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)
import org.apache.spark.sql._
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.functions._

val spark = SparkSession
                .builder()
                .master("local[*]")
                .appName("Dataframe API")
                .config("spark.log.level", "WARN")
                .getOrCreate()

import spark.implicits._

println(s"spark.version == ${spark.version}")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/02/09 14:07:33 INFO SparkContext: Running Spark version 3.5.7
26/02/09 14:07:33 INFO SparkContext: OS info Linux, 6.8.0-90-generic, amd64
26/02/09 14:07:33 INFO SparkContext: Java version 1.8.0_442
26/02/09 14:07:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting Spark log level to "WARN".


spark.version == 3.5.7


[32mimport [39m[36m$ivy.$[39m
[32mimport [39m[36morg.apache.log4j.{Level, Logger}[39m
[32mimport [39m[36morg.apache.spark.sql._[39m
[32mimport [39m[36morg.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}[39m
[32mimport [39m[36morg.apache.spark.sql.functions._[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@4b6a54f9
[32mimport [39m[36mspark.implicits._[39m

In [2]:
val countriesDF = spark.read.option("multiline", "true").json("data/countries.json")

26/02/09 14:07:46 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[36mcountriesDF[39m: [32mDataFrame[39m = [altSpellings: array<string>, area: double ... 21 more fields]

In [3]:
def getCountriesWithManyBorders(df: DataFrame, minBorders: Int = 5): DataFrame = {
  df.withColumn("NumBorders", size($"borders"))
    .filter($"NumBorders" >= minBorders)
    .withColumn("BorderCountries", concat_ws(",", $"borders"))
    .select(
      $"name.common".as("Country"),
      $"NumBorders",
      $"BorderCountries"
    )
}

defined [32mfunction[39m [36mgetCountriesWithManyBorders[39m

In [4]:
val countriesManyBorders = getCountriesWithManyBorders(countriesDF)
countriesManyBorders.count()
countriesManyBorders.show(5, false)

+------------+----------+-------------------------------+
|Country     |NumBorders|BorderCountries                |
+------------+----------+-------------------------------+
|Afghanistan |6         |IRN,PAK,TKM,UZB,TJK,CHN        |
|Argentina   |5         |BOL,BRA,CHL,PRY,URY            |
|Austria     |8         |CZE,DEU,HUN,ITA,LIE,SVK,SVN,CHE|
|Azerbaijan  |5         |ARM,GEO,IRN,RUS,TUR            |
|Burkina Faso|6         |BEN,CIV,GHA,MLI,NER,TGO        |
+------------+----------+-------------------------------+
only showing top 5 rows



[36mcountriesManyBorders[39m: [32mDataFrame[39m = [Country: string, NumBorders: int ... 1 more field]
[36mres4_1[39m: [32mLong[39m = [32m60L[39m

In [5]:
def getLanguageRanking(df: DataFrame): DataFrame = {
  val languageCols = df.select("languages.*").columns

  val dfWithMap = df.withColumn(
    "languages_map",
    map(languageCols.flatMap(c => Seq(lit(c), col(s"languages.$c"))): _*)
  )

val langExploded = dfWithMap
  .select(
    col("name.common").as("Country"),
    explode(col("languages_map")).as(Seq("lang_code", "Language"))
  )
  .filter(col("Language").isNotNull)

  langExploded
    .groupBy("Language")
    .agg(
      count("*").as("NumCountries"),
      collect_list("Country").as("Countries")
    )
    .orderBy(desc("NumCountries"))
}

defined [32mfunction[39m [36mgetLanguageRanking[39m

In [6]:
val languageRanking = getLanguageRanking(countriesDF)
languageRanking.count()
languageRanking.show(2)

+--------+------------+--------------------+
|Language|NumCountries|           Countries|
+--------+------------+--------------------+
| English|          91|[Sint Maarten, Se...|
|  French|          46|[Sint Maarten, Se...|
+--------+------------+--------------------+
only showing top 2 rows



[36mlanguageRanking[39m: [32mDataFrame[39m = [Language: string, NumCountries: bigint ... 1 more field]
[36mres6_1[39m: [32mLong[39m = [32m155L[39m

In [8]:
countriesManyBorders.coalesce(1).write.mode("overwrite").parquet("/tmp/output/countries_many_borders")
languageRanking.coalesce(1).write.mode("overwrite").parquet("/tmp/output/language_ranking")