In [1]:
import spark.implicits._
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

Intitializing Scala interpreter ...

Spark Web UI available at http://host.docker.internal:4040
SparkContext available as 'sc' (version = 3.5.0, master = local[*], app id = local-1696246811124)
SparkSession available as 'spark'


import spark.implicits._
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}


## Datasets

In [2]:
case class Customer(name: String, surname: String, age: Int)
object Customer {
    val customers = Seq(Customer("Ivan", "Ivanov", 22))
    val customerDF = spark.sparkContext.parallelize(customers).toDF().as[Customer]
}

defined class Customer
defined object Customer


In [3]:
Customer.customerDF.filter(_.age > 20).show()

+----+-------+---+
|name|surname|age|
+----+-------+---+
|Ivan| Ivanov| 22|
+----+-------+---+



In [4]:
Customer.customerDF.filter(_.age > 20).explain()

== Physical Plan ==
*(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, $line10.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Customer, true])).name, true, false, true) AS name#4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, $line10.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Customer, true])).surname, true, false, true) AS surname#5, knownnotnull(assertnotnull(input[0, $line10.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Customer, true])).age AS age#6]
+- *(1) Filter $line12.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$3991/0x000000010179b840@12282797.apply
   +- Scan[obj#3]




---------------------------

## DataFrame

In [5]:
  val columns = Seq("StudentID", "Course")
  val data = Seq(("1", "Spark"), ("2", "Scala"), ("3", "Java"))

columns: Seq[String] = List(StudentID, Course)
data: Seq[(String, String)] = List((1,Spark), (2,Scala), (3,Java))


In [6]:
def fromRDD(): DataFrame = spark.sparkContext.parallelize(data).toDF(columns: _*)
fromRDD().show
fromRDD().printSchema

+---------+------+
|StudentID|Course|
+---------+------+
|        1| Spark|
|        2| Scala|
|        3|  Java|
+---------+------+

root
 |-- StudentID: string (nullable = true)
 |-- Course: string (nullable = true)



fromRDD: ()org.apache.spark.sql.DataFrame


In [7]:
def fromList() = data.toDF()
fromList().show

+---+-----+
| _1|   _2|
+---+-----+
|  1|Spark|
|  2|Scala|
|  3| Java|
+---+-----+



fromList: ()org.apache.spark.sql.DataFrame


In [8]:
def createDataFrame() = spark.createDataFrame(data)
createDataFrame().show

+---+-----+
| _1|   _2|
+---+-----+
|  1|Spark|
|  2|Scala|
|  3| Java|
+---+-----+



createDataFrame: ()org.apache.spark.sql.DataFrame


In [9]:
def withSchema(): DataFrame = {
    val schema = StructType(Array(
        StructField("StudentID", IntegerType, false),
        StructField("Course", StringType, true))
                           )
    val rdd = spark.sparkContext.parallelize(Seq(
    Row(1, "Scala"),
    Row(2, "spark")))
    spark.createDataFrame(rdd, schema)
}

withSchema: ()org.apache.spark.sql.DataFrame


In [10]:
withSchema().show
withSchema().printSchema

+---------+------+
|StudentID|Course|
+---------+------+
|        1| Scala|
|        2| spark|
+---------+------+

root
 |-- StudentID: integer (nullable = false)
 |-- Course: string (nullable = true)



In [11]:
val data = spark.read
    .format("json")
//     .schema - для задания схемы
    .option("mode", "FAILFAST")
    .load("customer_data.json")


data: org.apache.spark.sql.DataFrame = [Address: string, Birthdate: string ... 5 more fields]


In [12]:
data.show(2)
data.printSchema()

+--------------------+-------------------+--------------+----------+--------------------+---------------+----------------+
|             Address|          Birthdate|       Country|CustomerID|               Email|           Name|        Username|
+--------------------+-------------------+--------------+----------+--------------------+---------------+----------------+
|Unit 1047 Box 408...|1994-02-20 00:46:27|United Kingdom|     12346|cooperalexis@hotm...|  Lindsay Cowan|valenciajennifer|
|55711 Janet Plaza...|1988-06-21 00:15:34|       Iceland|     12347|timothy78@hotmail...|Katherine David|      hillrachel|
+--------------------+-------------------+--------------+----------+--------------------+---------------+----------------+
only showing top 2 rows

root
 |-- Address: string (nullable = true)
 |-- Birthdate: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Name: string (nullable = true

-----------------------------------

## Base Operations

### SELECT

In [13]:
import org.apache.spark.sql.DataFrame
// import org.apache.spark.sql.functions.{array, col, count, date_format, desc, lit}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window


In [14]:
data.select("Country").show(4)

+--------------+
|       Country|
+--------------+
|United Kingdom|
|       Iceland|
|       Finland|
|         Italy|
+--------------+
only showing top 4 rows



In [15]:
data.select(col("Country").isNotNull.as("NotNull")).show(2)

+-------+
|NotNull|
+-------+
|   true|
|   true|
+-------+
only showing top 2 rows



In [16]:
data.select($"Country").show(2)

+--------------+
|       Country|
+--------------+
|United Kingdom|
|       Iceland|
+--------------+
only showing top 2 rows



In [17]:
data.selectExpr("Country as Strana").show(2)

+--------------+
|        Strana|
+--------------+
|United Kingdom|
|       Iceland|
+--------------+
only showing top 2 rows



In [18]:
val w = Window.partitionBy("country").orderBy("Name")

w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@3e32e4dd


In [19]:
// data.withColumn("new_col", row_number() over w).show(2)  -можно так
data.withColumn("new_col2", row_number.over(w)).show(2)

+--------------------+-------------------+---------+----------+-------------------+---------------+-------------+--------+
|             Address|          Birthdate|  Country|CustomerID|              Email|           Name|     Username|new_col2|
+--------------------+-------------------+---------+----------+-------------------+---------------+-------------+--------+
|15528 Tyler Mount...|1967-11-29 02:49:52|Australia|     12424|leslieday@gmail.com|Adrienne Obrien|       rwelch|       1|
|7193 Gibbs Key\nP...|1993-04-02 13:58:45|Australia|     12388|  cfoster@gmail.com| David Lawrence|thomasmendoza|       2|
+--------------------+-------------------+---------+----------+-------------------+---------------+-------------+--------+
only showing top 2 rows



In [20]:
data.withColumn("new_col2", col("Country")).drop("Country").show(2)  //дублирование

+--------------------+-------------------+----------+--------------------+---------------+----------------+--------------+
|             Address|          Birthdate|CustomerID|               Email|           Name|        Username|      new_col2|
+--------------------+-------------------+----------+--------------------+---------------+----------------+--------------+
|Unit 1047 Box 408...|1994-02-20 00:46:27|     12346|cooperalexis@hotm...|  Lindsay Cowan|valenciajennifer|United Kingdom|
|55711 Janet Plaza...|1988-06-21 00:15:34|     12347|timothy78@hotmail...|Katherine David|      hillrachel|       Iceland|
+--------------------+-------------------+----------+--------------------+---------------+----------------+--------------+
only showing top 2 rows



In [21]:
data.withColumn("flag", lit(false)).drop("Country").show(2) // lit -> превращает значение в столбец значений

+--------------------+-------------------+----------+--------------------+---------------+----------------+-----+
|             Address|          Birthdate|CustomerID|               Email|           Name|        Username| flag|
+--------------------+-------------------+----------+--------------------+---------------+----------------+-----+
|Unit 1047 Box 408...|1994-02-20 00:46:27|     12346|cooperalexis@hotm...|  Lindsay Cowan|valenciajennifer|false|
|55711 Janet Plaza...|1988-06-21 00:15:34|     12347|timothy78@hotmail...|Katherine David|      hillrachel|false|
+--------------------+-------------------+----------+--------------------+---------------+----------------+-----+
only showing top 2 rows



In [22]:
// переимнование столбца
data.withColumnRenamed("Iceland", "strana").show(2)  // медлено - лучше использовать Селект эс

+--------------------+-------------------+--------------+----------+--------------------+---------------+----------------+
|             Address|          Birthdate|       Country|CustomerID|               Email|           Name|        Username|
+--------------------+-------------------+--------------+----------+--------------------+---------------+----------------+
|Unit 1047 Box 408...|1994-02-20 00:46:27|United Kingdom|     12346|cooperalexis@hotm...|  Lindsay Cowan|valenciajennifer|
|55711 Janet Plaza...|1988-06-21 00:15:34|       Iceland|     12347|timothy78@hotmail...|Katherine David|      hillrachel|
+--------------------+-------------------+--------------+----------+--------------------+---------------+----------------+
only showing top 2 rows



In [23]:
// data.createGlobalTempView("customer_table")
// spark.sql("SELECT 'Country' FROM customer_table").show(2)

### FILTER

filter = where (можно заменить)

In [24]:
data.filter("Country = 'Iceland' AND Name = 'Katherine David'").show // SQL

+--------------------+-------------------+-------+----------+--------------------+---------------+----------+
|             Address|          Birthdate|Country|CustomerID|               Email|           Name|  Username|
+--------------------+-------------------+-------+----------+--------------------+---------------+----------+
|55711 Janet Plaza...|1988-06-21 00:15:34|Iceland|     12347|timothy78@hotmail...|Katherine David|hillrachel|
+--------------------+-------------------+-------+----------+--------------------+---------------+----------+



In [25]:
val filterBy = col("Country") === "Iceland"

filterBy: org.apache.spark.sql.Column = (Country = Iceland)


In [26]:
data.where(filterBy &&  col("Name") === "Katherine David").show // Scala

+--------------------+-------------------+-------+----------+--------------------+---------------+----------+
|             Address|          Birthdate|Country|CustomerID|               Email|           Name|  Username|
+--------------------+-------------------+-------+----------+--------------------+---------------+----------+
|55711 Janet Plaza...|1988-06-21 00:15:34|Iceland|     12347|timothy78@hotmail...|Katherine David|hillrachel|
+--------------------+-------------------+-------+----------+--------------------+---------------+----------+



### SORT

In [27]:
data.sort("CustomerId").show(2)

+--------------------+-------------------+--------------+----------+--------------------+---------------+----------------+
|             Address|          Birthdate|       Country|CustomerID|               Email|           Name|        Username|
+--------------------+-------------------+--------------+----------+--------------------+---------------+----------------+
|Unit 1047 Box 408...|1994-02-20 00:46:27|United Kingdom|     12346|cooperalexis@hotm...|  Lindsay Cowan|valenciajennifer|
|55711 Janet Plaza...|1988-06-21 00:15:34|       Iceland|     12347|timothy78@hotmail...|Katherine David|      hillrachel|
+--------------------+-------------------+--------------+----------+--------------------+---------------+----------------+
only showing top 2 rows



In [28]:
data.orderBy(col("CustomerId").desc).show(2)

+--------------------+-------------------+--------------+----------+-------------------+-----------------+--------+
|             Address|          Birthdate|       Country|CustomerID|              Email|             Name|Username|
+--------------------+-------------------+--------------+----------+-------------------+-----------------+--------+
|6942 Connie Skywa...|1973-10-24 00:52:10|United Kingdom|     12989|amber97@hotmail.com|Brandon Contreras|  ecasey|
|79375 David Neck\...|1971-05-04 22:20:10|United Kingdom|     12988|  erica98@gmail.com|   Gabriel Romero| qknight|
+--------------------+-------------------+--------------+----------+-------------------+-----------------+--------+
only showing top 2 rows



In [29]:
data.orderBy(desc("CustomerId")).show(2)

+--------------------+-------------------+--------------+----------+-------------------+-----------------+--------+
|             Address|          Birthdate|       Country|CustomerID|              Email|             Name|Username|
+--------------------+-------------------+--------------+----------+-------------------+-----------------+--------+
|6942 Connie Skywa...|1973-10-24 00:52:10|United Kingdom|     12989|amber97@hotmail.com|Brandon Contreras|  ecasey|
|79375 David Neck\...|1971-05-04 22:20:10|United Kingdom|     12988|  erica98@gmail.com|   Gabriel Romero| qknight|
+--------------------+-------------------+--------------+----------+-------------------+-----------------+--------+
only showing top 2 rows



### Repartition

In [30]:
println("Num partotions: ", data.rdd.getNumPartitions)
val repartDf = data.repartition(5, col("Country"))
println("Num partotions after: ", repartDf.rdd.getNumPartitions)

(Num partotions: ,1)
(Num partotions after: ,5)


repartDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Address: string, Birthdate: string ... 5 more fields]


In [31]:
println("Num partotions after coalesce: ", repartDf.coalesce(3).rdd.getNumPartitions) //схлопывание

(Num partotions after coalesce: ,3)


### GroupBy, JOIN

In [32]:
data.groupBy(col("country"))
    .agg(count(lit(1)), max("customerId"))
    .show(5)

+---------+--------+---------------+
|  country|count(1)|max(customerId)|
+---------+--------+---------------+
|Australia|       8|          12434|
|  Austria|      11|          12865|
|  Bahrain|       2|          12355|
|  Belgium|      25|          12876|
|   Brazil|       1|          12769|
+---------+--------+---------------+
only showing top 5 rows



In [33]:
val retail = spark.read
    .format("json")
//     .schema - для задания схемы
    .option("mode", "FAILFAST")
    .load("retail_data.json")

retail: org.apache.spark.sql.DataFrame = [CustomerID: string, Description: string ... 5 more fields]


In [34]:
retail.show(2)

+----------+--------------------+---------------+---------+--------+---------+---------+
|CustomerID|         Description|    InvoiceDate|InvoiceNo|Quantity|StockCode|UnitPrice|
+----------+--------------------+---------------+---------+--------+---------+---------+
|     12346|MEDIUM CERAMIC TO...|1/18/2011 10:01|   541431|   74215|    23166|     1.04|
|     12346|MEDIUM CERAMIC TO...|1/18/2011 10:17|  C541433|  -74215|    23166|     1.04|
+----------+--------------------+---------------+---------+--------+---------+---------+
only showing top 2 rows



In [35]:
data.join(retail, "CustomerId", "left").show(2)

+----------+--------------------+-------------------+--------------+--------------------+-------------+----------------+--------------------+---------------+---------+--------+---------+---------+
|CustomerID|             Address|          Birthdate|       Country|               Email|         Name|        Username|         Description|    InvoiceDate|InvoiceNo|Quantity|StockCode|UnitPrice|
+----------+--------------------+-------------------+--------------+--------------------+-------------+----------------+--------------------+---------------+---------+--------+---------+---------+
|     12346|Unit 1047 Box 408...|1994-02-20 00:46:27|United Kingdom|cooperalexis@hotm...|Lindsay Cowan|valenciajennifer|MEDIUM CERAMIC TO...|1/18/2011 10:17|  C541433|  -74215|    23166|     1.04|
|     12346|Unit 1047 Box 408...|1994-02-20 00:46:27|United Kingdom|cooperalexis@hotm...|Lindsay Cowan|valenciajennifer|MEDIUM CERAMIC TO...|1/18/2011 10:01|   541431|   74215|    23166|     1.04|
+----------+---

In [36]:
data.join(retail, data("CustomerId") === retail("customerId"), "left").show(2) 
// оставит два столбца с CustomerId - может выдать ошибку если к ним обращаться.
// Если название столбцов одинаковое то использовать вариант выше

+--------------------+-------------------+--------------+----------+--------------------+-------------+----------------+----------+--------------------+---------------+---------+--------+---------+---------+
|             Address|          Birthdate|       Country|CustomerID|               Email|         Name|        Username|CustomerID|         Description|    InvoiceDate|InvoiceNo|Quantity|StockCode|UnitPrice|
+--------------------+-------------------+--------------+----------+--------------------+-------------+----------------+----------+--------------------+---------------+---------+--------+---------+---------+
|Unit 1047 Box 408...|1994-02-20 00:46:27|United Kingdom|     12346|cooperalexis@hotm...|Lindsay Cowan|valenciajennifer|     12346|MEDIUM CERAMIC TO...|1/18/2011 10:17|  C541433|  -74215|    23166|     1.04|
|Unit 1047 Box 408...|1994-02-20 00:46:27|United Kingdom|     12346|cooperalexis@hotm...|Lindsay Cowan|valenciajennifer|     12346|MEDIUM CERAMIC TO...|1/18/2011 10:01|

-----------------------------------------

## RDD

In [37]:
case class Student(id : Int, name : String, surname : String, courses : Seq[Course])

case class Course(title: String)

object Student {

  def getStudentsSample : Seq[Student] = {
    Seq(
      Student(1, "John", "Doe", Seq(Course.sparkCourse, Course.scalaCourse)),
      Student(2, "Ivan", "Ivanov", Seq(Course.javaCourse)),
      Student(3, "Jack", "Toe", Seq(Course.javaCourse, Course.sparkCourse, Course.scalaCourse))
    )
  }
}

object Course {

  def sparkCourse: Course = Course("Spark")
  def scalaCourse: Course = Course("Scala")
  def javaCourse: Course = Course("Java")

}

defined class Student
defined class Course
defined object Student
defined object Course


In [38]:
import java.io.{File, PrintWriter}
import scala.util.Random

import java.io.{File, PrintWriter}
import scala.util.Random


In [39]:
val studentsRDD = spark.sparkContext.parallelize(Student.getStudentsSample)

studentsRDD: org.apache.spark.rdd.RDD[Student] = ParallelCollectionRDD[132] at parallelize at <console>:37


In [40]:
println("Filter John count: ", studentsRDD.filter(_.name == "John").count)
println("Filter Java course count: ", studentsRDD.filter(student => student.courses.contains(Course("Java"))).count)

(Filter John count: ,1)
(Filter Java course count: ,2)


In [41]:
studentsRDD.map(_.copy(name = "ModifiedName")).foreach(println)

Student(2,ModifiedName,Ivanov,List(Course(Java)))
Student(3,ModifiedName,Toe,List(Course(Java), Course(Spark), Course(Scala)))
Student(1,ModifiedName,Doe,List(Course(Spark), Course(Scala)))


In [42]:
studentsRDD.sortBy(_.surname).foreach(println)

Student(2,Ivan,Ivanov,List(Course(Java)))
Student(3,Jack,Toe,List(Course(Java), Course(Spark), Course(Scala)))
Student(1,John,Doe,List(Course(Spark), Course(Scala)))


In [43]:
// studentsRDD.foreachPartition(part => {
//       val randomFileName = new Random().nextInt()
//       val pw = new PrintWriter(new File(s"random-file-${randomFileName}.txt"))
//       while (part.hasNext) {
//         pw.write(part.next().toString)
//       }
//       pw.close()
//     })