## Spark Session

In [1]:
from pyspark.sql import SparkSession

appName = "BDP Spark SQL Example"

# Build the session, or reuse the one already running in this kernel.
spark = (
    SparkSession.builder
    .appName(appName)
    .getOrCreate()
)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
# Bare last expression: displays the session object created in the first cell.
spark

# Spark Parquet Dataset

In [3]:
# Create the HDFS landing directory for the customer Parquet file
# (-p also creates any missing parent directories).
!hdfs dfs -mkdir -p /user/bigdatapedia/input/customer/parquet

In [4]:
# Upload the local Parquet file to HDFS. -f overwrites an existing copy so the
# cell can be re-run; without it a second run fails with "File exists".
!hdfs dfs -put -f /home/bigdatapedia/data/customer_parq.parquet /user/bigdatapedia/input/customer/parquet/

In [5]:
# List the target directory (-h prints human-readable sizes) to confirm the upload.
!hdfs dfs -ls -h /user/bigdatapedia/input/customer/parquet/

Found 1 items
-rw-r--r--   3 bigdatapedia supergroup    248.7 K 2025-03-29 03:45 /user/bigdatapedia/input/customer/parquet/customer_parq.parquet


In [6]:
# Load the Parquet data under the customer directory into a DataFrame.
df_cust = spark.read.parquet(
    "/user/bigdatapedia/input/customer/parquet"
)

                                                                                

In [7]:
# Preview five rows; truncate=False prints full column values.
df_cust.show(n=5, truncate=False)

[Stage 1:>                                                          (0 + 1) / 1]

+-----------+--------------+--------------+--------------+-----------------+-----------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|customer_street        |customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+-----------------------+-------------+--------------+----------------+
|1          |Richard       |Hernandez     |XXXXXXXXX     |XXXXXXXXX        |6303 Heather Plaza     |Brownsville  |TX            |78521           |
|2          |Mary          |Barrett       |XXXXXXXXX     |XXXXXXXXX        |9526 Noble Embers Ridge|Littleton    |CO            |80126           |
|3          |Ann           |Smith         |XXXXXXXXX     |XXXXXXXXX        |3422 Blue Pioneer Bend |Caguas       |PR            |00725           |
|4          |Mary          |Jones         |XXXXXXXXX     |XXXXXXXXX        |8324 Little Common     |San Marcos   |CA  

                                                                                

#### Select

In [8]:
# Project only the identifying and location columns.
df_select = df_cust.select(
    [
        "customer_id",
        "customer_fname",
        "customer_lname",
        "customer_city",
        "customer_state",
    ]
)

In [9]:
# Preview the projected columns without truncating values.
df_select.show(n=5, truncate=False)

[Stage 2:>                                                          (0 + 1) / 1]

+-----------+--------------+--------------+-------------+--------------+
|customer_id|customer_fname|customer_lname|customer_city|customer_state|
+-----------+--------------+--------------+-------------+--------------+
|1          |Richard       |Hernandez     |Brownsville  |TX            |
|2          |Mary          |Barrett       |Littleton    |CO            |
|3          |Ann           |Smith         |Caguas       |PR            |
|4          |Mary          |Jones         |San Marcos   |CA            |
|5          |Robert        |Hudson        |Caguas       |PR            |
+-----------+--------------+--------------+-------------+--------------+
only showing top 5 rows



                                                                                

# Spark ORC Dataset

In [18]:
# Create the HDFS landing directory for the order ORC file
# (-p also creates any missing parent directories).
!hdfs dfs -mkdir -p /user/bigdatapedia/input/order/orc

In [19]:
# Upload the local ORC file to HDFS. -f overwrites an existing copy so the
# cell can be re-run; without it a second run fails with "File exists".
!hdfs dfs -put -f /home/bigdatapedia/data/neworders.snappy.orc /user/bigdatapedia/input/order/orc/

In [20]:
# List the target directory (-h prints human-readable sizes) to confirm the upload.
!hdfs dfs -ls -h /user/bigdatapedia/input/order/orc/

Found 1 items
-rw-r--r--   3 bigdatapedia supergroup    181.5 K 2025-03-29 04:04 /user/bigdatapedia/input/order/orc/neworders.snappy.orc


DataFrame[order_id: int, order_date: timestamp, order_customer_id: int, order_status: string]

In [21]:
# Load the ORC data under the order directory into a DataFrame.
df_order = spark.read.orc(
    "/user/bigdatapedia/input/order/orc"
)

In [22]:
# Preview five orders; truncate=False prints full column values.
df_order.show(n=5, truncate=False)

[Stage 6:>                                                          (0 + 1) / 1]

+--------+-------------------+-----------------+---------------+
|order_id|order_date         |order_customer_id|order_status   |
+--------+-------------------+-----------------+---------------+
|1       |2013-07-25 00:00:00|11599            |CLOSED         |
|2       |2013-07-25 00:00:00|256              |PENDING_PAYMENT|
|3       |2013-07-25 00:00:00|12111            |COMPLETE       |
|4       |2013-07-25 00:00:00|8827             |CLOSED         |
|5       |2013-07-25 00:00:00|11318            |COMPLETE       |
+--------+-------------------+-----------------+---------------+
only showing top 5 rows



                                                                                

# DF Actions

In [None]:
# Transformations are lazy; nothing executes until an action is called:
# df -> t1 -> t2 -> t3 -> ... -> action

# Example: 100 GB source df -> filter (50 GB) -> join (20 GB) -> filter (5 GB) -> group by (1 GB) -> limit (1 MB target) -> action

In [52]:
# Chain of lazy transformations: project, keep California customers,
# order by id, and cap the result at five rows.
# Note: this rebinds df_select, replacing the unfiltered projection.
df_select = (
    df_cust
    .select("customer_id", "customer_fname", "customer_lname", "customer_city", "customer_state")
    .where("customer_state = 'CA'")
    .sort("customer_id")
    .limit(5)
)

In [50]:
# Action: run the chain and print up to five untruncated rows.
df_select.show(n=5, truncate=False)

+-----------+--------------+--------------+-------------+--------------+
|customer_id|customer_fname|customer_lname|customer_city|customer_state|
+-----------+--------------+--------------+-------------+--------------+
|4          |Mary          |Jones         |San Marcos   |CA            |
|14         |Katherine     |Smith         |Pico Rivera  |CA            |
|15         |Jane          |Luna          |Fontana      |CA            |
|18         |Robert        |Smith         |Martinez     |CA            |
|35         |Margaret      |Wright        |Oceanside    |CA            |
+-----------+--------------+--------------+-------------+--------------+



### show

In [53]:
# show() is an action: it prints the rows as a text table and returns nothing.
df_select.show(n=5, truncate=False)

+-----------+--------------+--------------+-------------+--------------+
|customer_id|customer_fname|customer_lname|customer_city|customer_state|
+-----------+--------------+--------------+-------------+--------------+
|4          |Mary          |Jones         |San Marcos   |CA            |
|14         |Katherine     |Smith         |Pico Rivera  |CA            |
|15         |Jane          |Luna          |Fontana      |CA            |
|18         |Robert        |Smith         |Martinez     |CA            |
|35         |Margaret      |Wright        |Oceanside    |CA            |
+-----------+--------------+--------------+-------------+--------------+



### count

In [54]:
# count() is an action: it returns the number of rows (5 here, because of the limit).
df_select.count()

5

### collect

In [55]:
# collect() returns every row to the driver as a list of Row objects;
# that is fine here because df_select is limited to five rows.
df_select.collect()

[Row(customer_id=4, customer_fname='Mary', customer_lname='Jones', customer_city='San Marcos', customer_state='CA'),
 Row(customer_id=14, customer_fname='Katherine', customer_lname='Smith', customer_city='Pico Rivera', customer_state='CA'),
 Row(customer_id=15, customer_fname='Jane', customer_lname='Luna', customer_city='Fontana', customer_state='CA'),
 Row(customer_id=18, customer_fname='Robert', customer_lname='Smith', customer_city='Martinez', customer_state='CA'),
 Row(customer_id=35, customer_fname='Margaret', customer_lname='Wright', customer_city='Oceanside', customer_state='CA')]

### head

In [56]:
# head() with no argument returns a single Row (not a list).
df_select.head()

Row(customer_id=4, customer_fname='Mary', customer_lname='Jones', customer_city='San Marcos', customer_state='CA')

In [57]:
# With an explicit n, head returns a list of the first n Rows.
df_select.head(n=2)

[Row(customer_id=4, customer_fname='Mary', customer_lname='Jones', customer_city='San Marcos', customer_state='CA'),
 Row(customer_id=14, customer_fname='Katherine', customer_lname='Smith', customer_city='Pico Rivera', customer_state='CA')]

### tail

In [59]:
# tail returns the last rows as a list of Row objects.
df_select.tail(num=2)

[Row(customer_id=18, customer_fname='Robert', customer_lname='Smith', customer_city='Martinez', customer_state='CA'),
 Row(customer_id=35, customer_fname='Margaret', customer_lname='Wright', customer_city='Oceanside', customer_state='CA')]

### take

In [61]:
# take returns the first rows as a list of Row objects.
df_select.take(num=3)

[Row(customer_id=4, customer_fname='Mary', customer_lname='Jones', customer_city='San Marcos', customer_state='CA'),
 Row(customer_id=14, customer_fname='Katherine', customer_lname='Smith', customer_city='Pico Rivera', customer_state='CA'),
 Row(customer_id=15, customer_fname='Jane', customer_lname='Luna', customer_city='Fontana', customer_state='CA')]

### corr

In [64]:
# Correlating a column with itself is always 1.0; this only demonstrates the call.
df_select.corr(col1="customer_id", col2="customer_id")

1.0

### cov

In [65]:
# Covariance of a column with itself is that column's sample variance.
df_select.cov(col1="customer_id", col2="customer_id")

126.7

### first

In [66]:
# first() returns the first Row; here it gives the same result as head().
df_select.first()

Row(customer_id=4, customer_fname='Mary', customer_lname='Jones', customer_city='San Marcos', customer_state='CA')

### write

In [68]:
# Write the rows as JSON files under the output directory.
# mode("overwrite") replaces any previous output so the cell can be re-run;
# the default save mode raises an error when the path already exists.
df_select.write.mode("overwrite").json("/user/bigdatapedia/output/json")

                                                                                

In [69]:
# List the output directory: a _SUCCESS marker plus the part file(s) written by Spark.
!hdfs dfs -ls /user/bigdatapedia/output/json

Found 2 items
-rw-r--r--   3 bigdatapedia supergroup          0 2025-03-29 04:51 /user/bigdatapedia/output/json/_SUCCESS
-rw-r--r--   3 bigdatapedia supergroup        600 2025-03-29 04:51 /user/bigdatapedia/output/json/part-00000-ac466b75-b57c-4ffd-9630-6232d7100da9-c000.json


In [70]:
# Print the written part file(s): one JSON object per line, one line per row.
!hdfs dfs -cat /user/bigdatapedia/output/json/*.json

{"customer_id":4,"customer_fname":"Mary","customer_lname":"Jones","customer_city":"San Marcos","customer_state":"CA"}
{"customer_id":14,"customer_fname":"Katherine","customer_lname":"Smith","customer_city":"Pico Rivera","customer_state":"CA"}
{"customer_id":15,"customer_fname":"Jane","customer_lname":"Luna","customer_city":"Fontana","customer_state":"CA"}
{"customer_id":18,"customer_fname":"Robert","customer_lname":"Smith","customer_city":"Martinez","customer_state":"CA"}
{"customer_id":35,"customer_fname":"Margaret","customer_lname":"Wright","customer_city":"Oceanside","customer_state":"CA"}


# Spark SQL

In [10]:
# The "DF Actions" section rebinds df_select to a 5-row, CA-only frame.
# Rebuild the unfiltered projection here so the `customer` view (and the
# joins and plans that follow) cover every customer when the notebook is
# run top to bottom.
df_select = df_cust.select(
    "customer_id", "customer_fname", "customer_lname", "customer_city", "customer_state"
)
df_select.createOrReplaceTempView("customer")

In [17]:
# Confirm the temp view is registered (it is listed with isTemporary = true).
spark.sql("show tables").show()

25/03/29 04:00:49 WARN conf.HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist


+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        | customer|       true|
+--------+---------+-----------+



In [12]:
# Query the temp view with SQL and print five untruncated rows.
spark.sql("select * from customer").show(n=5, truncate=False)

+-----------+--------------+--------------+-------------+--------------+
|customer_id|customer_fname|customer_lname|customer_city|customer_state|
+-----------+--------------+--------------+-------------+--------------+
|1          |Richard       |Hernandez     |Brownsville  |TX            |
|2          |Mary          |Barrett       |Littleton    |CO            |
|3          |Ann           |Smith         |Caguas       |PR            |
|4          |Mary          |Jones         |San Marcos   |CA            |
|5          |Robert        |Hudson        |Caguas       |PR            |
+-----------+--------------+--------------+-------------+--------------+
only showing top 5 rows



In [13]:
# Print the physical plan of the DataFrame-API projection:
# a Parquet FileScan that reads only the selected columns.
df_select.explain()

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [customer_id#0,customer_fname#1,customer_lname#2,customer_city#6,customer_state#7] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[hdfs://hdfs-bigdatapedia:9000/user/bigdatapedia/input/customer/parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<customer_id:int,customer_fname:string,customer_lname:string,customer_city:string,customer_...




In [14]:
# The SQL query over the temp view yields the same physical plan as df_select.explain().
spark.sql("select * from customer").explain()

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [customer_id#0,customer_fname#1,customer_lname#2,customer_city#6,customer_state#7] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[hdfs://hdfs-bigdatapedia:9000/user/bigdatapedia/input/customer/parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<customer_id:int,customer_fname:string,customer_lname:string,customer_city:string,customer_...




### Join

In [26]:
# Register the orders DataFrame as a temp view for SQL queries.
# NOTE(review): `order` is also a SQL keyword; the queries in this notebook
# run with it unquoted, but a name such as `orders` may be safer — confirm.
df_order.createOrReplaceTempView("order")

In [None]:
# DataFrame-API form of the inner join between customers and their orders.
df_join = df_select.join(
    df_order,
    on=df_select["customer_id"] == df_order["order_customer_id"],
    how="inner",
)

In [28]:
# Same inner join expressed in SQL over the two temp views.
spark.sql("""
    select * from customer c
        inner join order o
        on c.customer_id = o.order_customer_id""").show(n=5, truncate=False)

[Stage 14:>                                                         (0 + 1) / 1]

+-----------+--------------+--------------+-------------+--------------+--------+-------------------+-----------------+------------+
|customer_id|customer_fname|customer_lname|customer_city|customer_state|order_id|order_date         |order_customer_id|order_status|
+-----------+--------------+--------------+-------------+--------------+--------+-------------------+-----------------+------------+
|148        |Stephanie     |Richards      |Caguas       |PR            |15061   |2013-10-28 00:00:00|148              |CLOSED      |
|148        |Stephanie     |Richards      |Caguas       |PR            |59569   |2013-10-03 00:00:00|148              |COMPLETE    |
|148        |Stephanie     |Richards      |Caguas       |PR            |61124   |2013-12-02 00:00:00|148              |CLOSED      |
|463        |Harry         |Smith         |Caguas       |PR            |6857    |2013-09-06 00:00:00|463              |COMPLETE    |
|463        |Harry         |Smith         |Caguas       |PR          

                                                                                

In [29]:
# Physical plan for the SQL join: a SortMergeJoin with a hash-partitioned
# Exchange and a Sort on each side.
spark.sql("""
    select * from customer c
        inner join order o
        on c.customer_id = o.order_customer_id""").explain()

== Physical Plan ==
*(5) SortMergeJoin [customer_id#0], [order_customer_id#198], Inner
:- *(2) Sort [customer_id#0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(customer_id#0, 200), ENSURE_REQUIREMENTS, [id=#428]
:     +- *(1) Filter isnotnull(customer_id#0)
:        +- *(1) ColumnarToRow
:           +- FileScan parquet [customer_id#0,customer_fname#1,customer_lname#2,customer_city#6,customer_state#7] Batched: true, DataFilters: [isnotnull(customer_id#0)], Format: Parquet, Location: InMemoryFileIndex[hdfs://hdfs-bigdatapedia:9000/user/bigdatapedia/input/customer/parquet], PartitionFilters: [], PushedFilters: [IsNotNull(customer_id)], ReadSchema: struct<customer_id:int,customer_fname:string,customer_lname:string,customer_city:string,customer_...
+- *(4) Sort [order_customer_id#198 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(order_customer_id#198, 200), ENSURE_REQUIREMENTS, [id=#437]
      +- *(3) Filter isnotnull(order_customer_id#198)
         +- *(3) Column

In [23]:
# Inner-join customers to orders on the customer id.
join_condition = df_select["customer_id"] == df_order["order_customer_id"]
df_join = df_select.join(df_order, on=join_condition, how="inner")

In [24]:
# Preview five joined rows without truncating values.
df_join.show(n=5, truncate=False)

[Stage 9:>                                                          (0 + 1) / 1]

+-----------+--------------+--------------+-------------+--------------+--------+-------------------+-----------------+------------+
|customer_id|customer_fname|customer_lname|customer_city|customer_state|order_id|order_date         |order_customer_id|order_status|
+-----------+--------------+--------------+-------------+--------------+--------+-------------------+-----------------+------------+
|148        |Stephanie     |Richards      |Caguas       |PR            |15061   |2013-10-28 00:00:00|148              |CLOSED      |
|148        |Stephanie     |Richards      |Caguas       |PR            |59569   |2013-10-03 00:00:00|148              |COMPLETE    |
|148        |Stephanie     |Richards      |Caguas       |PR            |61124   |2013-12-02 00:00:00|148              |CLOSED      |
|463        |Harry         |Smith         |Caguas       |PR            |6857    |2013-09-06 00:00:00|463              |COMPLETE    |
|463        |Harry         |Smith         |Caguas       |PR          

                                                                                

In [25]:
# Plan for the DataFrame-API join: the same SortMergeJoin shape as the SQL version.
df_join.explain()

== Physical Plan ==
*(5) SortMergeJoin [customer_id#0], [order_customer_id#198], Inner
:- *(2) Sort [customer_id#0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(customer_id#0, 200), ENSURE_REQUIREMENTS, [id=#230]
:     +- *(1) Filter isnotnull(customer_id#0)
:        +- *(1) ColumnarToRow
:           +- FileScan parquet [customer_id#0,customer_fname#1,customer_lname#2,customer_city#6,customer_state#7] Batched: true, DataFilters: [isnotnull(customer_id#0)], Format: Parquet, Location: InMemoryFileIndex[hdfs://hdfs-bigdatapedia:9000/user/bigdatapedia/input/customer/parquet], PartitionFilters: [], PushedFilters: [IsNotNull(customer_id)], ReadSchema: struct<customer_id:int,customer_fname:string,customer_lname:string,customer_city:string,customer_...
+- *(4) Sort [order_customer_id#198 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(order_customer_id#198, 200), ENSURE_REQUIREMENTS, [id=#239]
      +- *(3) Filter isnotnull(order_customer_id#198)
         +- *(3) Column

In [25]:
# NOTE(review): this cell repeats the previous df_join.explain() call
# (same execution count, same output) and can probably be removed.
df_join.explain()

== Physical Plan ==
*(5) SortMergeJoin [customer_id#0], [order_customer_id#198], Inner
:- *(2) Sort [customer_id#0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(customer_id#0, 200), ENSURE_REQUIREMENTS, [id=#230]
:     +- *(1) Filter isnotnull(customer_id#0)
:        +- *(1) ColumnarToRow
:           +- FileScan parquet [customer_id#0,customer_fname#1,customer_lname#2,customer_city#6,customer_state#7] Batched: true, DataFilters: [isnotnull(customer_id#0)], Format: Parquet, Location: InMemoryFileIndex[hdfs://hdfs-bigdatapedia:9000/user/bigdatapedia/input/customer/parquet], PartitionFilters: [], PushedFilters: [IsNotNull(customer_id)], ReadSchema: struct<customer_id:int,customer_fname:string,customer_lname:string,customer_city:string,customer_...
+- *(4) Sort [order_customer_id#198 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(order_customer_id#198, 200), ENSURE_REQUIREMENTS, [id=#239]
      +- *(3) Filter isnotnull(order_customer_id#198)
         +- *(3) Column

### Group By

In [30]:
# Count customers per city with SQL and print ten untruncated rows.
spark.sql("""
    select customer_city, count(*) 
        from customer 
        group by customer_city""").show(n=10, truncate=False)

                                                                                

+---------------+--------+
|customer_city  |count(1)|
+---------------+--------+
|Hanover        |9       |
|Caguas         |4584    |
|Corona         |25      |
|Tempe          |35      |
|Bowling Green  |8       |
|Springfield    |3       |
|Lawrenceville  |12      |
|North Las Vegas|12      |
|Palatine       |8       |
|Phoenix        |64      |
+---------------+--------+
only showing top 10 rows



In [32]:
# DataFrame-API equivalent of the SQL aggregation: customers per city.
df_group = df_cust.groupBy("customer_city").count()

In [33]:
# Print ten per-city counts without truncating values.
df_group.show(n=10, truncate=False)

                                                                                

+---------------+-----+
|customer_city  |count|
+---------------+-----+
|Hanover        |9    |
|Caguas         |4584 |
|Corona         |25   |
|Tempe          |35   |
|Bowling Green  |8    |
|Springfield    |3    |
|Lawrenceville  |12   |
|North Las Vegas|12   |
|Palatine       |8    |
|Phoenix        |64   |
+---------------+-----+
only showing top 10 rows



In [35]:
# Plan shows a two-phase aggregation: a partial count per partition,
# an Exchange on customer_city, then the final count.
spark.sql("""
    select customer_city, count(*) 
        from customer 
        group by customer_city""").explain()

== Physical Plan ==
*(2) HashAggregate(keys=[customer_city#6], functions=[count(1)])
+- Exchange hashpartitioning(customer_city#6, 200), ENSURE_REQUIREMENTS, [id=#548]
   +- *(1) HashAggregate(keys=[customer_city#6], functions=[partial_count(1)])
      +- *(1) ColumnarToRow
         +- FileScan parquet [customer_city#6] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[hdfs://hdfs-bigdatapedia:9000/user/bigdatapedia/input/customer/parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<customer_city:string>




In [34]:
# The DataFrame-API aggregation produces the same plan shape as the SQL query.
df_group.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[customer_city#6], functions=[count(1)])
+- Exchange hashpartitioning(customer_city#6, 200), ENSURE_REQUIREMENTS, [id=#524]
   +- *(1) HashAggregate(keys=[customer_city#6], functions=[partial_count(1)])
      +- *(1) ColumnarToRow
         +- FileScan parquet [customer_city#6] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[hdfs://hdfs-bigdatapedia:9000/user/bigdatapedia/input/customer/parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<customer_city:string>




### UDF

In [36]:
def upperConvert(x):
    """Return ``x`` upper-cased, or ``None`` when ``x`` is ``None``.

    When this function is registered as a Spark UDF, SQL NULL values arrive
    as ``None``. Calling ``.upper()`` on ``None`` raises ``AttributeError``
    and fails the query, so NULLs are passed through unchanged.

    Args:
        x: The string to convert, or ``None``.

    Returns:
        The upper-cased string, or ``None`` for a ``None`` input.
    """
    if x is None:
        return None
    return x.upper()

In [37]:
# Sanity-check the plain Python function before registering it as a UDF.
upperConvert("hello")

'HELLO'

In [38]:
from pyspark.sql.types import StringType

In [40]:
# Expose the Python function to Spark SQL under the name `upper_udf`.
spark.udf.register(
    name="upper_udf",
    f=upperConvert,
    returnType=StringType(),
)

<function __main__.upperConvert(x)>

In [42]:
# Call the registered UDF from SQL and print five untruncated rows.
spark.sql("select upper_udf(customer_fname) as customer_first from customer").show(n=5, truncate=False)

[Stage 25:>                                                         (0 + 1) / 1]

+--------------+
|customer_first|
+--------------+
|RICHARD       |
|MARY          |
|ANN           |
|MARY          |
|ROBERT        |
+--------------+
only showing top 5 rows



                                                                                