# SparkSession
A SparkSession can be used create DataFrame, register DataFrame as tables, execute SQL over tables, cache tables, and read parquet files.
The entry point to programming Spark with the Dataset and DataFrame API.

In [1]:
from datetime import datetime
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext

In [4]:
spark = (SparkSession.builder.appName("pyspark-sql-demo-{}".format(datetime.today()))
        .master("spark://spark-master:7077")      
        .getOrCreate())

sqlContext = SQLContext(spark)
# spark.sparkContext.getConf().getAll()

In [5]:
sc = spark.sparkContext
sc

# Truy vấn nâng cao

In [6]:
# load data
if "df_cars" in locals():
    df_cars.unpersist()
if "df_makers" in locals():
    df_makers.unpersist()
    
df_cars = spark.read \
         .format("csv") \
         .option("header", "true") \
         .load("s3a://warehouse/bronze/cars.csv")
            
df_makers = spark.read \
         .format("csv") \
         .option("header", "true") \
         .load("s3a://warehouse/bronze/makers.csv")        
            
# store as table
df_cars.cache()
df_makers.cache()
sqlContext.registerDataFrameAsTable(df_cars, "car")
sqlContext.registerDataFrameAsTable(df_makers, "maker")

In [7]:
# view schema and top data
print("# Cars schema")
df_cars.printSchema()
df_cars.show(5)

print("# Makers schema")
df_makers.printSchema()
df_makers.show(5)

# Cars schema
root
 |-- id: string (nullable = true)
 |-- car_name: string (nullable = true)
 |-- price: string (nullable = true)
 |-- maker_id: string (nullable = true)

+---+------------+-------+--------+
| id|    car_name|  price|maker_id|
+---+------------+-------+--------+
|  1|          X5|5616.38|       2|
|  2|     Compass| 1837.4|       7|
|  3|   Excursion|1949.55|       2|
|  4|    Escalade|8539.64|       3|
|  5|Express 3500|2685.09|       4|
+---+------------+-------+--------+
only showing top 5 rows

# Makers schema
root
 |-- id: string (nullable = true)
 |-- maker_name: string (nullable = true)
 |-- years: string (nullable = true)

+---+----------+-----+
| id|maker_name|years|
+---+----------+-----+
|  1|   Porsche| 2011|
|  2|    Nissan| 2011|
|  3|     Dodge| 2008|
|  4|  Cadillac| 2006|
|  5|Land Rover| 2011|
+---+----------+-----+
only showing top 5 rows



# Truy vấn lồng
Truy vấn lồng là một câu truy vấn mà ở bên trong nội dung của nó có chứa một câu truy vấn con khác.
- Truy vấn lồng phân cấp: Khi nội dung của câu truy vấn con độc lập với câu truy vấn cha.
- Truy vấn lồng tương quan: Khi nội dung của câu truy vấn con phụ thuộc vào câu truy vấn cha.

## Đặt tại mệnh đề SELECT
Kết quả của câu truy vấn sẽ như là một giá trị của một thuộc tính.

In [8]:
# Với mỗi hãng xe, cho biết tên của hãng và số lượng xe tương ứng
df_sub_queries = sqlContext.sql("""
    SELECT maker_name AS Hang_Xe, (
        SELECT COUNT(*)
        FROM car
        WHERE car.maker_id = maker.id
    ) AS SL_XE
    FROM maker
""")
df_sub_queries.show()

+----------+-----+
|   Hang_Xe|SL_XE|
+----------+-----+
|   Porsche|   91|
|    Nissan|  116|
|     Dodge|   98|
|  Cadillac|  104|
|Land Rover|   82|
|     Mazda|  101|
|     Isuzu|  100|
|   Hyundai|  109|
|   Hyundai|   96|
| Chevrolet|  103|
+----------+-----+



In [9]:
df_sub_queries.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [maker_name#43 AS Hang_Xe#262, if (isnull(alwaysTrue#472)) 0 else count(1)#266L AS SL_XE#264L]
   +- BroadcastHashJoin [id#42], [maker_id#20], LeftOuter, BuildRight, false
      :- InMemoryTableScan [id#42, maker_name#43]
      :     +- InMemoryRelation [id#42, maker_name#43, years#44], StorageLevel(disk, memory, deserialized, 1 replicas)
      :           +- FileScan csv [id#42,maker_name#43,years#44] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3a://warehouse/bronze/makers.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,maker_name:string,years:string>
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]),false), [plan_id=219]
         +- HashAggregate(keys=[maker_id#20], functions=[count(1)])
            +- Exchange hashpartitioning(maker_id#20, 200), ENSURE_REQUIREMENTS, [plan_id=216]
               +- HashAggregate(keys=

## Đặt tại mệnh đề FROM:
Kết quả của câu truy vấn sẽ xem như là một bảng dữ liệu, do vậy có thể truy vấn từ bảng dữ liệu này.

In [10]:
# Cho biết tên và giá mỗi xe của hãng Nissan
df_sub_queries = sqlContext.sql("""
    SELECT T.car_name, T.price AS price_usd
    FROM (
        SELECT *
        FROM car
        WHERE maker_id = 2
    ) AS T
""")
df_sub_queries.show(5)

+---------+---------+
| car_name|price_usd|
+---------+---------+
|       X5|  5616.38|
|Excursion|  1949.55|
|   Tacoma|  5696.24|
|    Tahoe|  7247.84|
|  M-Class|  8541.21|
+---------+---------+
only showing top 5 rows



In [11]:
df_sub_queries.explain()

== Physical Plan ==
*(1) Project [car_name#18, price#19 AS price_usd#509]
+- *(1) Filter (isnotnull(maker_id#20) AND (cast(maker_id#20 as int) = 2))
   +- InMemoryTableScan [car_name#18, maker_id#20, price#19], [isnotnull(maker_id#20), (cast(maker_id#20 as int) = 2)]
         +- InMemoryRelation [id#17, car_name#18, price#19, maker_id#20], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- FileScan csv [id#17,car_name#18,price#19,maker_id#20] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3a://warehouse/bronze/cars.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,car_name:string,price:string,maker_id:string>




## Đặt tại mệnh đề WHERE:
Kết quả của câu truy vấn được sử dụng như một thành phần trong biểu thức điều kiện.

In [12]:
# Cho biết những xe có giá lớn hơn xe có mã = 5
df_sub_queries = sqlContext.sql("""
    SELECT car_name, price
    FROM car
    WHERE price > (
        SELECT price
        FROM car
        WHERE id = 5
    )
""")
df_sub_queries.show(5)

+--------+-------+
|car_name|  price|
+--------+-------+
|      X5|5616.38|
|Escalade|8539.64|
|  Virage|6297.87|
|    RX-8|7033.46|
| Caravan|3101.49|
+--------+-------+
only showing top 5 rows



In [13]:
df_sub_queries.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Filter (isnotnull(price#19) AND (price#19 > Subquery subquery#661, [id=#291]))
   :  +- Subquery subquery#661, [id=#291]
   :     +- AdaptiveSparkPlan isFinalPlan=false
   :        +- Project [price#664]
   :           +- Filter (isnotnull(id#662) AND (cast(id#662 as int) = 5))
   :              +- InMemoryTableScan [id#662, price#664], [isnotnull(id#662), (cast(id#662 as int) = 5)]
   :                    +- InMemoryRelation [id#662, car_name#663, price#664, maker_id#665], StorageLevel(disk, memory, deserialized, 1 replicas)
   :                          +- FileScan csv [id#17,car_name#18,price#19,maker_id#20] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3a://warehouse/bronze/cars.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,car_name:string,price:string,maker_id:string>
   +- InMemoryTableScan [car_name#18, price#19], [isnotnull(price#19), (price#19 > Subque

## Truy vấn lồng phân cấp với toán tử IN
Toán tử IN dùng để kiểm tra một giá trị có nằm trong một tập hợp nào đó hay không. Tập hợp đó có thể là kết quả của một câu truy vấn hoặc một tập hợp tường minh

In [14]:
# Cho biết các xe có giá nhỏ hơn 3000 USD
df_sub_queries = sqlContext.sql("""
    SELECT car_name, price
    FROM car
    WHERE id NOT IN (
        SELECT id
        FROM car
        WHERE price > 3000
    )
""")
df_sub_queries.show(5)

+------------+-------+
|    car_name|  price|
+------------+-------+
|     Compass| 1837.4|
|   Excursion|1949.55|
|Express 3500|2685.09|
|   Fleetwood|1016.99|
|   Cabriolet|1185.46|
+------------+-------+
only showing top 5 rows



In [15]:
df_sub_queries.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [car_name#18, price#19]
   +- BroadcastHashJoin [id#17], [id#1018], LeftAnti, BuildRight, true
      :- InMemoryTableScan [id#17, car_name#18, price#19]
      :     +- InMemoryRelation [id#17, car_name#18, price#19, maker_id#20], StorageLevel(disk, memory, deserialized, 1 replicas)
      :           +- FileScan csv [id#17,car_name#18,price#19,maker_id#20] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3a://warehouse/bronze/cars.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,car_name:string,price:string,maker_id:string>
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),true), [plan_id=383]
         +- Project [id#1018]
            +- Filter (isnotnull(price#1020) AND (cast(price#1020 as int) > 3000))
               +- InMemoryTableScan [id#1018, price#1020], [isnotnull(price#1020), (cast(price#1020 as int) > 3000)]
       

## Truy vấn lồng tương quan với EXISTS

In [16]:
# Tìm xe không phải hãng Nissan
df_sub_queries = sqlContext.sql("""
    SELECT car_name
    FROM car
    WHERE NOT EXISTS (
        SELECT *
        FROM maker
        WHERE car.maker_id = maker.id
        AND maker.id = 5
    )
""")
df_sub_queries.show(5)

+------------+
|    car_name|
+------------+
|          X5|
|     Compass|
|   Excursion|
|    Escalade|
|Express 3500|
+------------+
only showing top 5 rows



In [17]:
df_sub_queries.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [car_name#18]
   +- BroadcastHashJoin [maker_id#20], [id#42], LeftAnti, BuildRight, false
      :- InMemoryTableScan [car_name#18, maker_id#20]
      :     +- InMemoryRelation [id#17, car_name#18, price#19, maker_id#20], StorageLevel(disk, memory, deserialized, 1 replicas)
      :           +- FileScan csv [id#17,car_name#18,price#19,maker_id#20] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3a://warehouse/bronze/cars.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,car_name:string,price:string,maker_id:string>
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [plan_id=464]
         +- Filter (isnotnull(id#42) AND (cast(id#42 as int) = 5))
            +- InMemoryTableScan [id#42], [isnotnull(id#42), (cast(id#42 as int) = 5)]
                  +- InMemoryRelation [id#42, maker_name#43, years#44], StorageLevel(disk,

## Ví dụ khác về truy vấn lồng tương quan

In [18]:
# Cho biết các xe có giá lớn hơn giá trung bình của hãng xe đó sản xuất
df_sub_queries = sqlContext.sql("""
    SELECT car_name, price
    FROM car AS car1
    WHERE car1.price > (
        SELECT AVG(car2.price)
        FROM car AS car2
        WHERE car2.maker_id = car1.maker_id
    )
""")
df_sub_queries.show(5)

+--------+-------+
|car_name|  price|
+--------+-------+
|      X5|5616.38|
|Escalade|8539.64|
|  Virage|6297.87|
|    RX-8|7033.46|
|      X3|9361.88|
+--------+-------+
only showing top 5 rows



In [19]:
df_sub_queries.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [car_name#18, price#19]
   +- BroadcastHashJoin [maker_id#20], [maker_id#1646], Inner, BuildRight, (cast(price#19 as double) > avg(price)#1642), false
      :- Filter (isnotnull(price#19) AND isnotnull(maker_id#20))
      :  +- InMemoryTableScan [car_name#18, price#19, maker_id#20], [isnotnull(price#19), isnotnull(maker_id#20)]
      :        +- InMemoryRelation [id#17, car_name#18, price#19, maker_id#20], StorageLevel(disk, memory, deserialized, 1 replicas)
      :              +- FileScan csv [id#17,car_name#18,price#19,maker_id#20] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3a://warehouse/bronze/cars.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,car_name:string,price:string,maker_id:string>
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]),false), [plan_id=642]
         +- Filter isnotnull(avg(price)#1642)
       

In [20]:
# Cho biết các xe có giá cao nhất
df_sub_queries = sqlContext.sql("""
    SELECT car_name, price
    FROM car
    WHERE price = (
        SELECT MAX(price)
        FROM car
    )
""")
df_sub_queries.show(5)

+--------+-------+
|car_name|  price|
+--------+-------+
|     100|9986.79|
+--------+-------+



In [21]:
df_sub_queries.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Filter (isnotnull(price#19) AND (price#19 = Subquery subquery#1970, [id=#698]))
   :  +- Subquery subquery#1970, [id=#698]
   :     +- AdaptiveSparkPlan isFinalPlan=false
   :        +- SortAggregate(key=[], functions=[max(price#1975)])
   :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=696]
   :              +- SortAggregate(key=[], functions=[partial_max(price#1975)])
   :                 +- InMemoryTableScan [price#1975]
   :                       +- InMemoryRelation [id#1973, car_name#1974, price#1975, maker_id#1976], StorageLevel(disk, memory, deserialized, 1 replicas)
   :                             +- FileScan csv [id#17,car_name#18,price#19,maker_id#20] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3a://warehouse/bronze/cars.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,car_name:string,price:string,maker_id:string>
   +- InMemory

In [22]:
# Cho biết các hãng sản xuất nhiều xe nhất
df_sub_queries = sqlContext.sql("""
    SELECT maker_id
    FROM car
    GROUP BY maker_id
    HAVING COUNT(*) = (
        SELECT MAX(SL_XE)
        FROM (
            SELECT COUNT(*) AS SL_XE
            FROM car
            GROUP BY maker_id
        )
    )
""")
df_sub_queries.show(5)

+--------+
|maker_id|
+--------+
|       2|
+--------+



In [23]:
df_sub_queries.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [maker_id#20]
   +- Filter (count(1)#2340L = Subquery subquery#2334, [id=#901])
      :  +- Subquery subquery#2334, [id=#901]
      :     +- AdaptiveSparkPlan isFinalPlan=false
      :        +- HashAggregate(keys=[], functions=[max(SL_XE#2333L)])
      :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=899]
      :              +- HashAggregate(keys=[], functions=[partial_max(SL_XE#2333L)])
      :                 +- HashAggregate(keys=[maker_id#2344], functions=[count(1)])
      :                    +- Exchange hashpartitioning(maker_id#2344, 200), ENSURE_REQUIREMENTS, [plan_id=895]
      :                       +- HashAggregate(keys=[maker_id#2344], functions=[partial_count(1)])
      :                          +- InMemoryTableScan [maker_id#2344]
      :                                +- InMemoryRelation [id#2341, car_name#2342, price#2343, maker_id#2344], StorageLevel(disk, memory, deserialized,

# Truy vấn khác

In [24]:
# create inventory by joining cars and makers
if "df_inventory" in locals():
    df_inventory.unpersist()
    
df_inventory = sqlContext.sql("""
    SELECT car_name, price, maker_name, years
    FROM car
    JOIN maker
    ON car.maker_id = maker.id
""")
df_inventory.show(5)

# store as table
df_inventory.cache()
sqlContext.registerDataFrameAsTable(df_inventory, "inventory")

+------------+-------+----------+-----+
|    car_name|  price|maker_name|years|
+------------+-------+----------+-----+
|          X5|5616.38|    Nissan| 2011|
|     Compass| 1837.4|     Isuzu| 1998|
|   Excursion|1949.55|    Nissan| 2011|
|    Escalade|8539.64|     Dodge| 2008|
|Express 3500|2685.09|  Cadillac| 2006|
+------------+-------+----------+-----+
only showing top 5 rows



In [25]:
df_inventory.explain()

== Physical Plan ==
InMemoryTableScan [car_name#18, price#19, maker_name#43, years#44]
   +- InMemoryRelation [car_name#18, price#19, maker_name#43, years#44], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(2) Project [car_name#18, price#19, maker_name#43, years#44]
            +- *(2) BroadcastHashJoin [maker_id#20], [id#42], Inner, BuildRight, false
               :- *(2) Filter isnotnull(maker_id#20)
               :  +- InMemoryTableScan [car_name#18, price#19, maker_id#20], [isnotnull(maker_id#20)]
               :        +- InMemoryRelation [id#17, car_name#18, price#19, maker_id#20], StorageLevel(disk, memory, deserialized, 1 replicas)
               :              +- FileScan csv [id#17,car_name#18,price#19,maker_id#20] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[s3a://warehouse/bronze/cars.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,car_name:string,price:string,maker_id:string>
         

## Tổng hợp sử dụng CUBE

In [26]:
# Tổng hợp tổng giá trị của các xe theo:
# - Tên xe và tên hãng
# - Từng tên xe
# - Từng hãng
# - Tổng giá trị

# Output:
# - Ranger, Land Rover, 7535.8: tổng giá trị xe Ranger, hãng Land Rover là 7535.8
# - Chariot, null, 7867.58: tổng giá trị xe Chariot, hãng bất kỳ là 7867.58
# - null, null, 5396393.689999999: tổng giá trị các xe từ các hãng là 5396393.689999999
# - null, Cadillac, 544548.9600000001: tổng giá trị từ hãng Cadillac là 544548.9600000001

df_compute = sqlContext.sql("""
    SELECT car_name, maker_name, SUM(price) AS TotalPrice
    FROM inventory
    GROUP BY car_name, maker_name WITH CUBE
""")
df_compute.show(10)

+-------------+----------+----------+
|     car_name|maker_name|TotalPrice|
+-------------+----------+----------+
|      Impreza|     Isuzu|   8235.85|
|       Blazer|   Hyundai|    3030.1|
|        Amigo|    Nissan|  14082.82|
|         MX-5|   Hyundai|  14614.16|
|      Tribute|     Mazda|   9091.44|
|     Ram 2500|    Nissan|   8612.41|
|        Ghost|   Hyundai|   5244.89|
|Suburban 1500|      null|   11556.9|
|        Talon|     Dodge|   6080.36|
|          MPV|     Dodge|    1492.7|
+-------------+----------+----------+
only showing top 10 rows



In [27]:
df_compute.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[car_name#3105, maker_name#3106, spark_grouping_id#3104L], functions=[sum(cast(price#19 as double))])
   +- Exchange hashpartitioning(car_name#3105, maker_name#3106, spark_grouping_id#3104L, 200), ENSURE_REQUIREMENTS, [plan_id=1086]
      +- HashAggregate(keys=[car_name#3105, maker_name#3106, spark_grouping_id#3104L], functions=[partial_sum(cast(price#19 as double))])
         +- Expand [[price#19, car_name#18, maker_name#43, 0], [price#19, car_name#18, null, 1], [price#19, null, maker_name#43, 2], [price#19, null, null, 3]], [price#19, car_name#3105, maker_name#3106, spark_grouping_id#3104L]
            +- InMemoryTableScan [price#19, car_name#18, maker_name#43]
                  +- InMemoryRelation [car_name#18, price#19, maker_name#43, years#44], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *(2) Project [car_name#18, price#19, maker_name#43, years#44]
                    

## Tổng hợp sử dụng ROLLUP

In [28]:
# Tổng hợp tổng giá trị của các xe theo:
# - Tên xe và tên hãng
# - Từng tên xe
# - Tổng giá trị

# Output:
# - Ranger, Land Rover, 7535.8: tổng giá trị xe Ranger, hãng Land Rover là 7535.8
# - Chariot, null, 7867.58: tổng giá trị xe Chariot, hãng bất kỳ là 7867.58
# - null, null, 5396393.689999999: tổng giá trị các xe từ các hãng là 5396393.689999999

df_compute = sqlContext.sql("""
    SELECT car_name, maker_name, SUM(price) AS TotalPrice
    FROM inventory
    GROUP BY car_name, maker_name WITH ROLLUP
""")
df_compute.show(10)

+-------------+----------+----------+
|     car_name|maker_name|TotalPrice|
+-------------+----------+----------+
|      Impreza|     Isuzu|   8235.85|
|       Blazer|   Hyundai|    3030.1|
|        Amigo|    Nissan|  14082.82|
|         MX-5|   Hyundai|  14614.16|
|      Tribute|     Mazda|   9091.44|
|     Ram 2500|    Nissan|   8612.41|
|        Ghost|   Hyundai|   5244.89|
|Suburban 1500|      null|   11556.9|
|        Talon|     Dodge|   6080.36|
|          MPV|     Dodge|    1492.7|
+-------------+----------+----------+
only showing top 10 rows



In [29]:
df_compute.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[car_name#3327, maker_name#3328, spark_grouping_id#3326L], functions=[sum(cast(price#19 as double))])
   +- Exchange hashpartitioning(car_name#3327, maker_name#3328, spark_grouping_id#3326L, 200), ENSURE_REQUIREMENTS, [plan_id=1157]
      +- HashAggregate(keys=[car_name#3327, maker_name#3328, spark_grouping_id#3326L], functions=[partial_sum(cast(price#19 as double))])
         +- Expand [[price#19, car_name#18, maker_name#43, 0], [price#19, car_name#18, null, 1], [price#19, null, null, 3]], [price#19, car_name#3327, maker_name#3328, spark_grouping_id#3326L]
            +- InMemoryTableScan [price#19, car_name#18, maker_name#43]
                  +- InMemoryRelation [car_name#18, price#19, maker_name#43, years#44], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *(2) Project [car_name#18, price#19, maker_name#43, years#44]
                           +- *(2) BroadcastHashJoin [ma