In [1]:
val df = spark.read.format("json")
  .load("./Spark-The-Definitive-Guide-master/data/flight-data/json/2015-summary.json")

df = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]


[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

In [2]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



# 綱要

In [3]:
spark.read.format("json").load("./Spark-The-Definitive-Guide-master/data/flight-data/json/2015-summary.json").schema

StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,true))

In [4]:
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
import org.apache.spark.sql.types.Metadata

In [5]:
val myManualSchema = StructType(Array(
  StructField("DEST_COUNTRY_NAME", StringType, true),
  StructField("ORIGIN_COUNTRY_NAME", StringType, true),
  StructField("count", LongType, false,
    Metadata.fromJson("{\"hello\":\"world\"}"))
))

myManualSchema = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,false))


StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,false))

In [6]:
val df = spark.read.format("json").schema(myManualSchema)
  .load("./Spark-The-Definitive-Guide-master/data/flight-data/json/2015-summary.json")

df = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]


[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

# 欄位與表達式

## column

In [7]:
import org.apache.spark.sql.functions.{col, column}
col("someColumnName")
column("someColumnName")

someColumnName

In [8]:
$"myColumn"
'myColumn

'myColumn

In [9]:
df.col("count")

count

In [10]:
(((col("someCol") + 5) * 200) - 6) < col("otherCol")

((((someCol + 5) * 200) - 6) < otherCol)

In [11]:
import org.apache.spark.sql.functions.expr
expr("(((someCol + 5) * 200) - 6) < otherCol")

((((someCol + 5) * 200) - 6) < otherCol)

## 存取Dataframe的欄位

In [12]:
spark.read.format("json").load("./Spark-The-Definitive-Guide-master/data/flight-data/json/2015-summary.json")
  .columns


Array(DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count)

# 紀錄與Row

In [13]:
df.first()

[United States,Romania,15]

## 建立Row物件

In [38]:
import org.apache.spark.sql.Row
val myRow = Row("Hello", null, 1, false)

myRow = [Hello,null,1,false]


[Hello,null,1,false]

In [39]:
myRow(0) // type Any

Hello

In [41]:
myRow.size() // String

Name: Unknown Error
Message: lastException: Throwable = null
<console>:34: error: Int does not take parameters
       myRow.size() // String
                 ^

StackTrace: 

In [42]:
myRow.getString(0) // String

Hello

In [43]:
myRow.getInt(2) // Int

1

# DataFrame轉換操作

## 建立DataFrame

In [27]:
val df = spark.read.format("json")
  .load("./Spark-The-Definitive-Guide-master/data/flight-data/json/2015-summary.json")

df = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]


[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

In [28]:
df.createOrReplaceTempView("dfTable")

In [45]:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}

In [60]:
val myManualSchema = new StructType(Array(
  new StructField("some", StringType, true),
  new StructField("col", StringType, true),
  new StructField("names", LongType, false)))
val myRows = Seq(Row("Hello", null, 1L))

myManualSchema = StructType(StructField(some,StringType,true), StructField(col,StringType,true), StructField(names,LongType,false))
myRows = List([Hello,null,1])


lastException: Throwable = null


List([Hello,null,1])

In [61]:
val myRDD = spark.sparkContext.parallelize(myRows)
val myDf = spark.createDataFrame(myRDD, myManualSchema)

myRDD = ParallelCollectionRDD[53] at parallelize at <console>:40
myDf = [some: string, col: string ... 1 more field]


[some: string, col: string ... 1 more field]

In [62]:
myDf.show()

+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|null|    1|
+-----+----+-----+



In [49]:
val myDF = Seq(("Hello", 2, null)).toDF("col1", "col2", "col3") 

myDF = [col1: string, col2: int ... 1 more field]


[col1: string, col2: int ... 1 more field]

## select 和 selectExpr

In [35]:
SELECT DEST_COUNTRY_NAME FROM dfTable 

Name: Unknown Error
Message: <console>:39: error: not found: value SELECT
       SELECT DEST_COUNTRY_NAME FROM dfTable ;
       ^
<console>:39: error: not found: value FROM
       SELECT DEST_COUNTRY_NAME FROM dfTable ;
                                ^

StackTrace: 

In [63]:
df.select("DEST_COUNTRY_NAME").show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [74]:
SELECT DEST_COUNTRY_NAME FROM df LIMIT 2

Name: Unknown Error
Message: <console>:1: error: ';' expected but integer literal found.
SELECT DEST_COUNTRY_NAME FROM df LIMIT 2
                                       ^

StackTrace: 

In [64]:
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [76]:
import org.apache.spark.sql.functions.{expr, col, column}
df.select(
    df.col("DEST_COUNTRY_NAME"),
    col("DEST_COUNTRY_NAME"),
    column("DEST_COUNTRY_NAME"),
    'DEST_COUNTRY_NAME,
    $"DEST_COUNTRY_NAME",
    expr("DEST_COUNTRY_NAME"))
  .show(2)

+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|    United States|    United States|    United States|
|    United States|    United States|    United States|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
only showing top 2 rows



In [77]:
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



In [78]:
df.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME"))
  .show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [79]:
df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)

+-------------+-----------------+
|newColumnName|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows



In [85]:
df.selectExpr(
    "*", // include all original columns
    "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry").show(2)
  


+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [87]:
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show()

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



## 轉換成Sark型別(Literal)

In [3]:
import org.apache.spark.sql.functions.lit
df.select(expr("*"), lit(1).as("One")).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



In [4]:
df.withColumn("numberOne", lit(1)).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [5]:
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))
  .show(10)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
|    United States|            Ireland|  344|        false|
|            Egypt|      United States|   15|        false|
|    United States|              India|   62|        false|
|    United States|          Singapore|    1|        false|
|    United States|            Grenada|   62|        false|
|       Costa Rica|      United States|  588|        false|
|          Senegal|      United States|   40|        false|
|          Moldova|      United States|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 10 rows



In [6]:
df.withColumn("Destination", expr("DEST_COUNTRY_NAME")).columns

Array(DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count, Destination)

## 重新命名欄位

In [7]:
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns


Array(dest, ORIGIN_COUNTRY_NAME, count)

## 保留字與關鍵字

In [8]:
import org.apache.spark.sql.functions.expr

In [9]:
val dfWithLongColName = df.withColumn(
  "This Long Column-Name",
  expr("ORIGIN_COUNTRY_NAME"))

dfWithLongColName = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 2 more fields]


[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 2 more fields]

In [15]:
dfWithLongColName.columns

Array(DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count, This Long Column-Name)

In [11]:
dfWithLongColName.selectExpr(
    "`This Long Column-Name`",
    "`This Long Column-Name` as `new col`")
  .show()

+---------------------+----------------+
|This Long Column-Name|         new col|
+---------------------+----------------+
|              Romania|         Romania|
|              Croatia|         Croatia|
|              Ireland|         Ireland|
|        United States|   United States|
|                India|           India|
|            Singapore|       Singapore|
|              Grenada|         Grenada|
|        United States|   United States|
|        United States|   United States|
|        United States|   United States|
|         Sint Maarten|    Sint Maarten|
|     Marshall Islands|Marshall Islands|
|        United States|   United States|
|        United States|   United States|
|        United States|   United States|
|        United States|   United States|
|             Paraguay|        Paraguay|
|        United States|   United States|
|        United States|   United States|
|            Gibraltar|       Gibraltar|
+---------------------+----------------+
only showing top

In [12]:
dfWithLongColName.createOrReplaceTempView("dfTableLong")

In [13]:
dfWithLongColName.select(col("This Long Column-Name")).columns


Array(This Long Column-Name)

## 刪除欄位

In [24]:
dfWithLongColName.columns

Array(DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count, This Long Column-Name)

In [14]:
df.drop("ORIGIN_COUNTRY_NAME").columns


Array(DEST_COUNTRY_NAME, count)

In [25]:
dfWithLongColName.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").columns

Array(count, This Long Column-Name)

In [None]:
df.withColumn("count2", col("count").cast("long"))

In [None]:
df.filter(col("count") < 2).show(2)
df.where("count < 2").show(2)

In [None]:
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") =!= "Croatia")
  .show(2)

In [None]:
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

In [None]:
df.select("ORIGIN_COUNTRY_NAME").distinct().count()

In [None]:
val seed = 5
val withReplacement = false
val fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

In [None]:
val dataFrames = df.randomSplit(Array(0.25, 0.75), seed)
dataFrames(0).count() > dataFrames(1).count() // False

In [None]:
import org.apache.spark.sql.Row
val schema = df.schema

In [None]:
val newRows = Seq(
  Row("New Country", "Other Country", 5L),
  Row("New Country 2", "Other Country 3", 1L)
)

In [None]:
val parallelizedRows = spark.sparkContext.parallelize(newRows)

In [None]:
val newDF = spark.createDataFrame(parallelizedRows, schema)

In [None]:
df.union(newDF)
  .where("count = 1")
  .where($"ORIGIN_COUNTRY_NAME" =!= "United States")
  .show() // get all of them and we'll see our new rows at the end

In [None]:
df.sort("count").show(5)

In [None]:
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)

In [None]:
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)

In [None]:
import org.apache.spark.sql.functions.{desc, asc}

In [None]:
df.orderBy(expr("count desc")).show(2)

In [None]:
df.orderBy(desc("count"), asc("DEST_COUNTRY_NAME")).show(2)

In [None]:
spark.read.format("json").load("./Spark-The-Definitive-Guide-master/data/flight-data/json/*-summary.json")
  .sortWithinPartitions("count")

In [None]:
df.limit(5).show()

In [None]:
df.orderBy(expr("count desc")).limit(6).show()

In [None]:
df.rdd.getNumPartitions // 1

In [None]:
df.repartition(5)

In [None]:
df.repartition(col("DEST_COUNTRY_NAME"))

In [None]:
df.repartition(5, col("DEST_COUNTRY_NAME"))

In [None]:
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

In [None]:
val collectDF = df.limit(10)

In [None]:
collectDF.take(5) // take works with an Integer count

In [None]:
collectDF.show() // this prints it out nicely

In [None]:
collectDF.show(5, false)

In [None]:
collectDF.collect()

In [None]:
collectDF.toLocalIterator()