# Spark Dataframe and Spark SQL

* Comparing Spark Dataframe and Spark RDD
* Spark Dataframe Operations
* Spark SQL Operations
* Save dataframe into MinIO
    * Write a dataframe in Parquet format
    * Write a dataframe in Delta Lake format

In [90]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
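# Note: importing sum, max and round this way shadows Python's built-ins of the same name for the rest of the notebook
from pyspark.sql.functions import sum, count, max, avg, asc, desc, col, date_format, round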

In [76]:
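spark = (
    SparkSession.builder
    .appName('Ingest checkin table into bronze')
    .master('spark://spark-master:7077')  # standalone cluster master
    .config("spark.executor.memory", "2g")  # memory per executor
    .config("spark.sql.shuffle.partitions", "100")  # partitions produced by shuffles (joins, groupBy, windows)
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")  # Hive Metastore that stores table metadata
    # SimpleAWSCredentialsProvider authenticates with fs.s3a.access.key / fs.s3a.secret.key
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
    .config('spark.sql.warehouse.dir', 's3a://lakehouse/')  # default location of managed tables
    .enableHiveSupport()
    .getOrCreate()
)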

In [77]:
spark

## Comparing Spark Dataframe and Spark RDD

### 1. Spark RDD

In [4]:
text_rdd = spark.sparkContext.textFile("s3a://lakehouse/test_csv/data.txt")
text_rdd.collect()

['Apache Spark has its architectural foundation in the resilient distributed dataset (RDD), a read-only multiset of data items distributed over a cluster of machines, that is maintained in a fault-tolerant way. The Dataframe API was released as an abstraction on top of the RDD, followed by the Dataset API. In Spark 1.x, the RDD was the primary application programming interface (API), but as of Spark 2.x use of the Dataset API is encouraged even though the RDD API is not deprecated. The RDD technology still underlies the Dataset API.',
 '',
 "Spark and its RDDs were developed in 2012 in response to limitations in the MapReduce cluster computing paradigm, which forces a particular linear dataflow structure on distributed programs: MapReduce programs read input data from disk, map a function across the data, reduce the results of the map, and store reduction results on disk. Spark's RDDs function as a working set for distributed programs that offers a (deliberately) restricted form of dis

In [5]:
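# seqFunc folds a value into the per-key accumulator inside one partition;
# combFunc merges the accumulators of the same key coming from different partitions.
seqFunc = (lambda x, y: x+y)
combFunc = (lambda c1, c2: c1+c2)

# flatMap splits lines into words, map pairs each word with 1,
# aggregateByKey sums the 1s per word (zero value 0), sortByKey(False) sorts words descending.
wordcount_rdd = text_rdd.flatMap(lambda line: line.split(' '))\
                        .map(lambda word: (word, 1))\
                        .aggregateByKey(0, seqFunc, combFunc)\
                        .sortByKey(False)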

for word in wordcount_rdd.take(20):
    print(f"{word[0]}: {word[1]}")

working: 1
which: 1
were: 1
way.: 1
was: 2
use: 1
underlies: 1
top: 1
to: 1
though: 1
the: 12
that: 2
technology: 1
structure: 1
store: 1
still: 1
shared: 1
set: 1
results: 2
restricted: 1
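To make the roles of `seqFunc` and `combFunc` concrete, here is a plain-Python model of what `aggregateByKey` computes, runnable without Spark. The helper `aggregate_by_key` and the two-partition split are hypothetical, for illustration only: each partition folds its values into a per-key accumulator that starts from the zero value, and the per-partition accumulators are then merged.

```python
def aggregate_by_key(partitions, zero, seq_func, comb_func):
    """Hypothetical plain-Python model of RDD.aggregateByKey.

    partitions: list of partitions, each a list of (key, value) pairs.
    """
    # Step 1 (seqFunc): inside each partition, fold values into a per-key
    # accumulator that starts from the zero value.
    per_partition = []
    for part in partitions:
        acc = {}
        for key, value in part:
            acc[key] = seq_func(acc.get(key, zero), value)
        per_partition.append(acc)

    # Step 2 (combFunc): merge the accumulators of the same key across partitions.
    merged = {}
    for acc in per_partition:
        for key, partial in acc.items():
            merged[key] = comb_func(merged[key], partial) if key in merged else partial
    return merged


lines = ["the RDD API", "the Dataset API"]
pairs = [(word, 1) for line in lines for word in line.split(" ")]  # flatMap + map
partitions = [pairs[:3], pairs[3:]]  # pretend the pairs live in two partitions

counts = aggregate_by_key(partitions, 0, lambda x, y: x + y, lambda c1, c2: c1 + c2)
for word, n in sorted(counts.items(), reverse=True):  # like sortByKey(False)
    print(f"{word}: {n}")
```

With a zero value of 0 and addition for both functions, this is the same as `reduceByKey(lambda a, b: a + b)`; `aggregateByKey` pays off when the accumulator type differs from the value type, for example when collecting (sum, count) pairs to compute an average.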


### 2. Spark Dataframe

In [6]:
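# spark.read.text returns a single string column named "value", one row per line
text_df = spark.read.text("s3a://lakehouse/test_csv/data.txt")
columns = ['text']
text_df = text_df.toDF(*columns)  # rename "value" to "text"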
text_df.show()

+--------------------+
|                text|
+--------------------+
|Apache Spark has ...|
|                    |
|Spark and its RDD...|
+--------------------+



In [7]:
# split each line on spaces, explode the array into one row per word, then count rows per word
wordcount_df = text_df.selectExpr("explode(split(text, ' ')) as word")\
                    .groupBy('word')\
                    .agg(count('word').alias('count'))\
                    .sort(desc('word'))
wordcount_df.show(20)

+----------+-----+
|      word|count|
+----------+-----+
|   working|    1|
|     which|    1|
|      were|    1|
|      way.|    1|
|       was|    2|
|       use|    1|
| underlies|    1|
|       top|    1|
|        to|    1|
|    though|    1|
|       the|   12|
|      that|    2|
|technology|    1|
| structure|    1|
|     store|    1|
|     still|    1|
|    shared|    1|
|       set|    1|
|   results|    2|
|restricted|    1|
+----------+-----+
only showing top 20 rows
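`explode` turns each element of the array produced by `split` into its own row, so the DataFrame pipeline has the same shape as `flatMap` in the RDD version. The plain-Python sketch below, built around a hypothetical `word_count` helper, mirrors that shape and also shows a side effect worth knowing: with Python's `str.split`, which the RDD version uses, an empty line (like the blank second line of `data.txt`) yields one empty-string token, so `''` is counted as a word unless it is filtered out (in the DataFrame version, one option is `.filter(col('word') != '')`).

```python
from collections import Counter


def word_count(lines):
    """Hypothetical plain-Python model of
    selectExpr("explode(split(text, ' ')) as word").groupBy('word').agg(count('word'))."""
    # split + explode: every token of every line becomes its own "row"
    words = [word for line in lines for word in line.split(" ")]
    # groupBy('word') + count
    return Counter(words)


lines = ["Spark has RDDs", "", "Spark has DataFrames"]
counts = word_count(lines)
for word, n in sorted(counts.items(), reverse=True):  # like sort(desc('word'))
    print(f"{word!r}: {n}")
```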



## Spark Dataframe Operations

### 1. Loading data from MinIO

In [57]:
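# inferSchema guesses column types from the data; samplingRatio=0.1 bases that guess on about 10% of the rows
customer_df = spark.read.csv("s3a://lakehouse/test_csv/customer.csv", header=True, inferSchema=True, samplingRatio=0.1)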
print("Number of rows: ", customer_df.count())
customer_df.show(5)

Number of rows:  599
+-----------+--------+----------+---------+--------------------+----------+----------+-------------------+-------------------+------+
|customer_id|store_id|first_name|last_name|               email|address_id|activebool|        create_date|        last_update|active|
+-----------+--------+----------+---------+--------------------+----------+----------+-------------------+-------------------+------+
|          1|       1|      MARY|    SMITH|MARY.SMITH@sakila...|         5|         t|2006-02-14 00:00:00|2006-02-15 09:57:20|     1|
|          2|       1|  PATRICIA|  JOHNSON|PATRICIA.JOHNSON@...|         6|         t|2006-02-14 00:00:00|2006-02-15 09:57:20|     1|
|          3|       1|     LINDA| WILLIAMS|LINDA.WILLIAMS@sa...|         7|         t|2006-02-14 00:00:00|2006-02-15 09:57:20|     1|
|          4|       2|   BARBARA|    JONES|BARBARA.JONES@sak...|         8|         t|2006-02-14 00:00:00|2006-02-15 09:57:20|     1|
|          5|       1| ELIZABETH|    BROW

In [9]:
customer_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address_id: integer (nullable = true)
 |-- activebool: string (nullable = true)
 |-- create_date: timestamp (nullable = true)
 |-- last_update: timestamp (nullable = true)
 |-- active: integer (nullable = true)



### 2. Selecting columns

In [10]:
## TODO: selecting columns by select()
temp_df = customer_df.select("customer_id", "email", "address_id")
temp_df.show()

+-----------+--------------------+----------+
|customer_id|               email|address_id|
+-----------+--------------------+----------+
|          1|MARY.SMITH@sakila...|         5|
|          2|PATRICIA.JOHNSON@...|         6|
|          3|LINDA.WILLIAMS@sa...|         7|
|          4|BARBARA.JONES@sak...|         8|
|          5|ELIZABETH.BROWN@s...|         9|
|          6|JENNIFER.DAVIS@sa...|        10|
|          7|MARIA.MILLER@saki...|        11|
|          8|SUSAN.WILSON@saki...|        12|
|          9|MARGARET.MOORE@sa...|        13|
|         10|DOROTHY.TAYLOR@sa...|        14|
|         11|LISA.ANDERSON@sak...|        15|
|         12|NANCY.THOMAS@saki...|        16|
|         13|KAREN.JACKSON@sak...|        17|
|         14|BETTY.WHITE@sakil...|        18|
|         15|HELEN.HARRIS@saki...|        19|
|         16|SANDRA.MARTIN@sak...|        20|
|         17|DONNA.THOMPSON@sa...|        21|
|         18|CAROL.GARCIA@saki...|        22|
|         19|RUTH.MARTINEZ@sak...|

### 3. Filtering records by condition

In [11]:
## TODO: filtering records by filter()
temp_df = customer_df.filter(customer_df.active==1).orderBy(col("address_id").asc())
print("Number of rows: ", temp_df.count())
temp_df.show(5)

Number of rows:  584
+-----------+--------+----------+---------+--------------------+----------+----------+-------------------+-------------------+------+
|customer_id|store_id|first_name|last_name|               email|address_id|activebool|        create_date|        last_update|active|
+-----------+--------+----------+---------+--------------------+----------+----------+-------------------+-------------------+------+
|          1|       1|      MARY|    SMITH|MARY.SMITH@sakila...|         5|         t|2006-02-14 00:00:00|2006-02-15 09:57:20|     1|
|          2|       1|  PATRICIA|  JOHNSON|PATRICIA.JOHNSON@...|         6|         t|2006-02-14 00:00:00|2006-02-15 09:57:20|     1|
|          3|       1|     LINDA| WILLIAMS|LINDA.WILLIAMS@sa...|         7|         t|2006-02-14 00:00:00|2006-02-15 09:57:20|     1|
|          4|       2|   BARBARA|    JONES|BARBARA.JONES@sak...|         8|         t|2006-02-14 00:00:00|2006-02-15 09:57:20|     1|
|          5|       1| ELIZABETH|    BROW

### 4. Grouping records with aggregation functions

In [13]:
## TODO: Grouping records by using groupBy()
temp_df = customer_df.groupBy("store_id")\
                    .agg(count("customer_id").alias("quantity"), max("last_update").alias("recent_activity_customer_date"))
temp_df.show()

+--------+--------+-----------------------------+
|store_id|quantity|recent_activity_customer_date|
+--------+--------+-----------------------------+
|       2|     273|          2006-02-15 09:57:20|
|       1|     326|          2006-02-15 09:57:20|
+--------+--------+-----------------------------+
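`groupBy(...).agg(...)` computes several aggregates per group in a single pass. As a plain-Python sketch on hypothetical rows (not the real customer table), counting customers and tracking the latest `last_update` per `store_id` looks like this; both `count(col)` and `max(col)` skip nulls.

```python
from datetime import datetime

# Hypothetical sample rows: (customer_id, store_id, last_update)
rows = [
    (1, 1, datetime(2006, 2, 15, 9, 57, 20)),
    (2, 1, datetime(2006, 2, 15, 9, 57, 20)),
    (4, 2, datetime(2006, 2, 15, 9, 57, 20)),
    (7, 2, datetime(2006, 2, 16, 8, 0, 0)),
]

# store_id -> (quantity, recent_activity_customer_date)
groups = {}
for customer_id, store_id, last_update in rows:
    quantity, latest = groups.get(store_id, (0, None))
    if customer_id is not None:  # count("customer_id") skips nulls
        quantity += 1
    if last_update is not None and (latest is None or last_update > latest):
        latest = last_update  # max("last_update") keeps the most recent value
    groups[store_id] = (quantity, latest)

for store_id, (quantity, latest) in groups.items():
    print(store_id, quantity, latest)
```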



### 5. Joining the dataframes

In [58]:
## TODO: load the rental data, then join it with customer_df by join()
rental_df = spark.read.csv("s3a://lakehouse/test_csv/rental.csv", header=True, inferSchema=True)
print("Number of rows: ", rental_df.count())
rental_df.show(5)

Number of rows:  36938
+---------+-------------------+------------+-----------+-------------------+--------+-------------------+
|rental_id|        rental_date|inventory_id|customer_id|        return_date|staff_id|        last_update|
+---------+-------------------+------------+-----------+-------------------+--------+-------------------+
|   281475|2010-01-01 04:21:21|        1372|        238|2010-01-03 00:00:00|      68|2010-01-01 02:18:47|
|   281476|2010-01-01 07:47:23|        3430|        167|2010-01-03 04:21:21|     122|2010-01-01 04:05:03|
|   281477|2010-01-01 09:15:59|        1161|        446|2010-01-05 07:47:23|       1|2010-01-01 05:08:26|
|   281478|2010-01-01 15:20:16|         743|        565|2010-01-06 09:15:59|     120|2010-01-01 06:03:18|
|   281479|2010-01-01 20:41:05|        1919|        573|2010-01-06 15:20:16|      21|2010-01-01 06:20:04|
+---------+-------------------+------------+-----------+-------------------+--------+-------------------+
only showing top 5 rows

In [15]:
joined_df = customer_df.join(other=rental_df, on="customer_id", how="inner")\
                    .select("customer_id", "store_id", "email", "rental_id", "rental_date", "staff_id")
joined_df.show(5)

+-----------+--------+--------------------+---------+-------------------+--------+
|customer_id|store_id|               email|rental_id|        rental_date|staff_id|
+-----------+--------+--------------------+---------+-------------------+--------+
|        238|       1|NELLIE.GARRETT@sa...|   281475|2010-01-01 04:21:21|      68|
|        167|       2|SALLY.PIERCE@saki...|   281476|2010-01-01 07:47:23|     122|
|        446|       2|THEODORE.CULP@sak...|   281477|2010-01-01 09:15:59|       1|
|        565|       2|JAIME.NETTLES@sak...|   281478|2010-01-01 15:20:16|     120|
|        573|       1|BYRON.BOX@sakilac...|   281479|2010-01-01 20:41:05|      21|
+-----------+--------+--------------------+---------+-------------------+--------+
only showing top 5 rows
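An inner join keeps only rows whose key appears on both sides, and a key that matches several rows on the other side produces one output row per match. The sketch below models that result as a plain-Python hash join on hypothetical rows; Spark chooses its own physical strategy (for example broadcast or sort-merge join), so this illustrates the semantics, not Spark's execution.

```python
from collections import defaultdict

customers = [  # hypothetical (customer_id, store_id, email)
    (238, 1, "nellie@example.com"),
    (167, 2, "sally@example.com"),
    (999, 1, "no.rentals@example.com"),  # no rentals: dropped by the inner join
]
rentals = [  # hypothetical (rental_id, customer_id, staff_id)
    (281475, 238, 68),
    (281476, 167, 122),
    (281480, 238, 5),
    (281481, 404, 7),  # unknown customer: dropped by the inner join
]

# Build phase: index one side by the join key.
customers_by_id = defaultdict(list)
for row in customers:
    customers_by_id[row[0]].append(row)

# Probe phase: each rental yields one output row per matching customer.
joined = [
    (cust_id, store_id, email, rental_id, staff_id)
    for rental_id, cust_id, staff_id in rentals
    for _, store_id, email in customers_by_id.get(cust_id, [])
]
for row in joined:
    print(row)
```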



### 6. Sorting records by columns

In [16]:
sorted_df = rental_df.orderBy(col("rental_date").desc(), col("customer_id").asc())
sorted_df.show(5)

+---------+-------------------+------------+-----------+-------------------+--------+-------------------+
|rental_id|        rental_date|inventory_id|customer_id|        return_date|staff_id|        last_update|
+---------+-------------------+------------+-----------+-------------------+--------+-------------------+
|   318412|2020-12-30 00:23:16|        2200|         44|2020-12-30 23:48:02|      76|2015-03-31 13:16:31|
|   318411|2020-12-29 23:48:02|        2217|         65|2021-01-03 22:59:50|      11|2015-03-31 11:32:29|
|   318410|2020-12-29 22:59:50|        1440|        235|2021-01-02 22:33:28|       7|2015-03-31 10:00:55|
|   318409|2020-12-29 22:33:28|        2809|        346|2020-12-31 22:14:03|      64|2015-03-31 09:13:12|
|   318408|2020-12-29 22:14:03|        3089|        554|2020-12-31 21:10:26|      78|2015-03-31 08:50:00|
+---------+-------------------+------------+-----------+-------------------+--------+-------------------+
only showing top 5 rows
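`orderBy(col("rental_date").desc(), col("customer_id").asc())` sorts by the first key and uses the second only to break ties. A plain-Python equivalent on hypothetical rows relies on Python's stable sort: sort by the secondary key first, then by the primary key in reverse, and rows with equal dates keep their customer order.

```python
from datetime import datetime

rentals = [  # hypothetical (rental_id, rental_date, customer_id)
    (1, datetime(2020, 12, 29, 22, 14, 3), 554),
    (2, datetime(2020, 12, 30, 0, 23, 16), 44),
    (3, datetime(2020, 12, 29, 22, 14, 3), 12),
]

# Secondary key first: customer_id ascending ...
by_customer = sorted(rentals, key=lambda r: r[2])
# ... then primary key: rental_date descending; reverse=True keeps the sort stable.
ordered = sorted(by_customer, key=lambda r: r[1], reverse=True)
for row in ordered:
    print(row)
```

One difference the sketch does not cover: Spark places nulls first for ascending and last for descending keys by default, and `Column.asc_nulls_last()` / `Column.desc_nulls_first()` change that.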



### 7. Window Functions

In [78]:
payment_df = spark.read.csv("s3a://lakehouse/test_csv/payment.csv", header=True, inferSchema=True)
payment_df.show(5)

+----------+-----------+--------+---------+------+-------------------+-------------------+
|payment_id|customer_id|staff_id|rental_id|amount|       payment_date|        last_update|
+----------+-----------+--------+---------+------+-------------------+-------------------+
|     40237|        238|      71|   281475|  39.3|2010-01-03 00:00:00|2010-01-01 00:10:25|
|     40238|        167|      80|   281476|  83.7|2010-01-03 04:21:21|2010-01-01 00:21:13|
|     40239|        446|     121|   281477|  38.5|2010-01-05 07:47:23|2010-01-01 00:33:58|
|     40240|        565|      19|   281478|  14.8|2010-01-06 09:15:59|2010-01-01 00:48:18|
|     40241|        573|      64|   281479|  67.0|2010-01-06 15:20:16|2010-01-01 01:03:01|
+----------+-----------+--------+---------+------+-------------------+-------------------+
only showing top 5 rows



- **Using a window function to find each payment's percentage of the customer's monthly total**

In [93]:
## TODO: define a window
window_spec = Window.partitionBy(col("customer_id"), date_format(col("payment_date"), "yyyy-MM"))

## TODO: compute aggregation over the window
percent_df = payment_df.withColumn(
    "percent_in_mounth", round((col("amount")*100/sum("amount").over(window_spec)), 2)
).orderBy(
    col("customer_id").asc()
)

percent_df.show(5)

+----------+-----------+--------+---------+------+-------------------+-------------------+-----------------+
|payment_id|customer_id|staff_id|rental_id|amount|       payment_date|        last_update|percent_in_mounth|
+----------+-----------+--------+---------+------+-------------------+-------------------+-----------------+
|     40396|          1|     113|   281651|  53.4|2010-01-22 08:02:16|2010-01-02 10:55:56|            100.0|
|     40707|          1|       3|   281990|  60.3|2010-03-02 14:44:06|2010-01-05 08:34:39|            53.79|
|     40806|          1|       2|   282094|   7.6|2010-03-10 16:55:31|2010-01-06 06:56:51|             6.78|
|     40843|          1|      12|   282140|  44.2|2010-03-22 08:20:36|2010-01-06 14:47:55|            39.43|
|     41432|          1|      78|   282774|  49.3|2010-07-30 06:14:08|2010-01-12 01:37:03|            100.0|
+----------+-----------+--------+---------+------+-------------------+-------------------+-----------------+
only showing top 5 rows
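Because `window_spec` has only `partitionBy` and no `orderBy`, its frame is the whole partition, so `sum("amount").over(window_spec)` is the total for one customer in one calendar month and every row is divided by that total (adding an `orderBy` would turn it into a running total). The plain-Python check below uses customer 1's four payments from the output above and reproduces the `percent_in_mounth` values; the helper `percent_in_month` is hypothetical. Run it in a plain Python session, since in this notebook `round` refers to the PySpark function imported at the top. Python's `round` and Spark's `round` (HALF_UP) can differ on exact ties, which these values do not hit.

```python
from collections import defaultdict
from datetime import datetime

# (payment_id, customer_id, amount, payment_date) taken from the output above
payments = [
    (40396, 1, 53.4, datetime(2010, 1, 22, 8, 2, 16)),
    (40707, 1, 60.3, datetime(2010, 3, 2, 14, 44, 6)),
    (40806, 1, 7.6, datetime(2010, 3, 10, 16, 55, 31)),
    (40843, 1, 44.2, datetime(2010, 3, 22, 8, 20, 36)),
]


def percent_in_month(rows):
    """Hypothetical model of
    round(amount * 100 / sum(amount) OVER (PARTITION BY customer_id, yyyy-MM), 2)."""
    # Pass 1: total amount per (customer, month) partition.
    totals = defaultdict(float)
    for _, customer_id, amount, paid_at in rows:
        totals[(customer_id, paid_at.strftime("%Y-%m"))] += amount
    # Pass 2: divide each row by its own partition total.
    return {
        payment_id: round(amount * 100 / totals[(customer_id, paid_at.strftime("%Y-%m"))], 2)
        for payment_id, customer_id, amount, paid_at in rows
    }


print(percent_in_month(payments))
```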

- **Using a window function for a moving average over the current payment and the 7 preceding payments**

In [92]:
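## TODO: define a window
# Frame = current row plus the 7 preceding rows by payment_date (up to 8 rows), not 7 calendar days.
# With no partitionBy, Spark moves all rows into a single partition to evaluate this window.
window_spec = Window.orderBy("payment_date").rowsBetween(-7, 0)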

## TODO: compute aggregation over the window
avgmove_df = payment_df.withColumn(
    "avg_m3", avg("amount").over(window_spec)
)
avgmove_df.show(5)

+----------+-----------+--------+---------+------+-------------------+-------------------+------+
|payment_id|customer_id|staff_id|rental_id|amount|       payment_date|        last_update|avg_m3|
+----------+-----------+--------+---------+------+-------------------+-------------------+------+
|     40237|        238|      71|   281475|  39.3|2010-01-03 00:00:00|2010-01-01 00:10:25|  39.3|
|     40238|        167|      80|   281476|  83.7|2010-01-03 04:21:21|2010-01-01 00:21:13|  61.5|
|     40253|         18|      12|   281492|  93.0|2010-01-03 19:38:45|2010-01-01 03:48:42|  72.0|
|     40255|        312|      72|   281494|  63.9|2010-01-04 00:13:16|2010-01-01 04:15:37|69.975|
|     40258|         16|      16|   281497|  61.8|2010-01-04 04:14:58|2010-01-01 04:53:17| 68.34|
+----------+-----------+--------+---------+------+-------------------+-------------------+------+
only showing top 5 rows
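`rowsBetween(-7, 0)` defines the frame by row position: the current row plus up to 7 preceding rows, however far apart their `payment_date` values are. A calendar-based 7-day average needs a range frame over time instead; in PySpark this is commonly written as `Window.orderBy(col("payment_date").cast("long")).rangeBetween(-7 * 86400, 0)`, since casting a timestamp to long gives seconds since the epoch. The plain-Python sketch below models both frames; `rows_moving_avg` and `range_moving_avg` are hypothetical helpers, `doc_payments` are the first five rows from the output above, and `sparse` is made-up data where the two frames disagree. Run it in a plain Python session, since in this notebook `max` refers to the PySpark function.

```python
import math
from datetime import datetime, timedelta

# First five payments in payment_date order, from the output above: (amount, payment_date)
doc_payments = [
    (39.3, datetime(2010, 1, 3, 0, 0, 0)),
    (83.7, datetime(2010, 1, 3, 4, 21, 21)),
    (93.0, datetime(2010, 1, 3, 19, 38, 45)),
    (63.9, datetime(2010, 1, 4, 0, 13, 16)),
    (61.8, datetime(2010, 1, 4, 4, 14, 58)),
]

# Hypothetical payments spread over several weeks
sparse = [
    (10.0, datetime(2010, 1, 1)),
    (20.0, datetime(2010, 1, 2)),
    (30.0, datetime(2010, 1, 20)),
]


def rows_moving_avg(rows, preceding=7):
    """Model of avg("amount") over rowsBetween(-preceding, 0); rows must be sorted by date."""
    averages = []
    for i in range(len(rows)):
        frame = [amount for amount, _ in rows[max(0, i - preceding): i + 1]]
        averages.append(math.fsum(frame) / len(frame))
    return averages


def range_moving_avg(rows, days=7):
    """Model of avg("amount") over rangeBetween(-days * 86400, 0) on epoch seconds:
    the frame holds every row whose timestamp is within `days` days up to the current one."""
    averages = []
    for _, current in rows:
        frame = [amount for amount, ts in rows if current - timedelta(days=days) <= ts <= current]
        averages.append(math.fsum(frame) / len(frame))
    return averages


print(rows_moving_avg(doc_payments))
print(rows_moving_avg(sparse), range_moving_avg(sparse))
```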



## Spark SQL Operations

### 1. Registering dataframes as temporary views

In [60]:
customer_df.createOrReplaceTempView("vw_customer")
rental_df.createOrReplaceTempView("vw_rental")
payment_df.createOrReplaceTempView("vw_payment")

In [61]:
spark.sql("SELECT current_database()").show()
spark.sql("SHOW TABLES").show()

+------------------+
|current_database()|
+------------------+
|           default|
+------------------+

+---------+-----------+-----------+
|namespace|  tableName|isTemporary|
+---------+-----------+-----------+
|         |vw_customer|      false|
|         | vw_payment|      false|
|         |  vw_rental|      false|
+---------+-----------+-----------+



In [62]:
print("Number of rows in vw_customer", spark.sql("SELECT * FROM vw_customer").count())
print("Number of rows in vw_payment", spark.sql("SELECT * FROM vw_payment").count())
print("Number of rows in vw_rental", spark.sql("SELECT * FROM vw_rental").count())

Number of rows in vw_customer 599
Number of rows in vw_payment 33286
Number of rows in vw_rental 36938


### 2. Manipulating data with SQL queries

- **Joining views and creating a new view**

In [63]:
sqlStr = """ 
    SELECT
        c.customer_id, c.active, CONCAT(c.first_name,' ', c.last_name) AS full_name, r.rental_id, r.rental_date, p.amount, p.payment_date
    FROM vw_customer c
    INNER JOIN vw_rental r ON r.customer_id=c.customer_id
    INNER JOIN vw_payment p ON p.rental_id=r.rental_id
    ORDER BY c.customer_id, rental_date
"""
activities_df = spark.sql(sqlStr)
activities_df.createOrReplaceTempView("vw_activity")

In [64]:
spark.sql("SELECT * FROM vw_activity LIMIT 5").show()

+-----------+------+----------+---------+-------------------+------+-------------------+
|customer_id|active| full_name|rental_id|        rental_date|amount|       payment_date|
+-----------+------+----------+---------+-------------------+------+-------------------+
|          1|     1|MARY SMITH|   281651|2010-01-21 09:37:31|  53.4|2010-01-22 08:02:16|
|          1|     1|MARY SMITH|   281990|2010-02-25 20:16:23|  60.3|2010-03-02 14:44:06|
|          1|     1|MARY SMITH|   282094|2010-03-07 19:34:22|   7.6|2010-03-10 16:55:31|
|          1|     1|MARY SMITH|   282140|2010-03-19 14:24:00|  44.2|2010-03-22 08:20:36|
|          1|     1|MARY SMITH|   282774|2010-07-25 09:54:25|  49.3|2010-07-30 06:14:08|
+-----------+------+----------+---------+-------------------+------+-------------------+



- **Window Function in Spark SQL**

In [65]:
sqlStr = """
    SELECT DISTINCT
        customer_id, full_name,
        DATEDIFF(latest_activity_date, earliest_activity_date) AS recency,
        AVG(amount) OVER (PARTITION BY customer_id) AS avg_amount
    FROM (
        SELECT
            *,
            MAX(rental_date) OVER (PARTITION BY customer_id) AS latest_activity_date,
            MIN(rental_date) OVER (PARTITION BY customer_id) AS earliest_activity_date
        FROM vw_activity ac
    )
"""
recency_df = spark.sql(sqlStr)

In [66]:
recency_df.show(5)

+-----------+----------------+-------+------------------+
|customer_id|       full_name|recency|        avg_amount|
+-----------+----------------+-------+------------------+
|          1|      MARY SMITH|   3992| 49.03203883495146|
|          2|PATRICIA JOHNSON|   3963| 46.43461538461538|
|          3|  LINDA WILLIAMS|   3406| 45.36190476190477|
|          4|   BARBARA JONES|   3922| 50.99411764705882|
|          5| ELIZABETH BROWN|   3889|51.247272727272716|
+-----------+----------------+-------+------------------+
only showing top 5 rows
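In this query `recency` is the number of days between a customer's first and last rental, and `avg_amount` is the customer's mean payment. Both window functions use `PARTITION BY customer_id` without `ORDER BY`, so every row of a customer carries the same values and `SELECT DISTINCT` collapses them to one row per customer. The plain-Python sketch below computes the same per-customer figures on hypothetical rows; `recency_by_customer` is an illustrative helper, and `DATEDIFF` is modeled on calendar dates because it ignores the time of day. Run it in a plain Python session, since in this notebook `sum` and `max` refer to the PySpark functions.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical vw_activity rows: (customer_id, full_name, rental_date, amount)
activity = [
    (1, "MARY SMITH", datetime(2010, 1, 21, 9, 37, 31), 53.4),
    (1, "MARY SMITH", datetime(2010, 2, 25, 20, 16, 23), 60.3),
    (2, "PATRICIA JOHNSON", datetime(2010, 3, 1, 23, 0, 0), 40.0),
    (2, "PATRICIA JOHNSON", datetime(2010, 3, 2, 1, 0, 0), 20.0),
]


def recency_by_customer(rows):
    """Hypothetical model of DATEDIFF(MAX(rental_date), MIN(rental_date)) and AVG(amount) per customer."""
    groups = defaultdict(list)
    for customer_id, full_name, rental_date, amount in rows:
        groups[(customer_id, full_name)].append((rental_date, amount))
    result = {}
    for key, items in groups.items():
        dates = [d for d, _ in items]
        amounts = [a for _, a in items]
        # DATEDIFF compares calendar dates, so the time of day is dropped first
        recency = (max(dates).date() - min(dates).date()).days
        result[key] = (recency, sum(amounts) / len(amounts))
    return result


for key, (recency, avg_amount) in recency_by_customer(activity).items():
    print(key, recency, avg_amount)
```

Assuming each `customer_id` has a single `full_name`, the same result can be written without `DISTINCT` over window functions: `SELECT customer_id, full_name, DATEDIFF(MAX(rental_date), MIN(rental_date)) AS recency, AVG(amount) AS avg_amount FROM vw_activity GROUP BY customer_id, full_name`.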



## Save data into MinIO

In [73]:
spark.sql("USE test_db")
spark.sql("SHOW TABLES").show()

# spark.sql("DROP TABLE recency_02")

+---------+-----------+-----------+
|namespace|  tableName|isTemporary|
+---------+-----------+-----------+
|  test_db|    recency|      false|
|         |vw_activity|      false|
|         |vw_customer|      false|
|         | vw_payment|      false|
|         |  vw_rental|      false|
+---------+-----------+-----------+



- **Save the dataframe as Parquet**

In [28]:
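# mode("overwrite") makes the cell re-runnable; the default mode raises an error if the path already exists
recency_df.write.format("parquet").mode("overwrite").save("s3a://lakehouse/test_parquet/recency")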

- **Save the dataframe as a Delta Lake table**

In [68]:
recency_df.write.format("delta").mode("overwrite").saveAsTable("test_db.recency")

- **Check the table's metadata in the Hive Metastore**

In [69]:
spark.sql("DESCRIBE EXTENDED recency").show()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|         customer_id|                 int|       |
|           full_name|              string|       |
|             recency|                 int|       |
|          avg_amount|              double|       |
|                    |                    |       |
|      # Partitioning|                    |       |
|     Not partitioned|                    |       |
|                    |                    |       |
|# Detailed Table ...|                    |       |
|                Name|     test_db.recency|       |
|            Location|s3a://lakehouse/t...|       |
|            Provider|               delta|       |
|               Owner|              jovyan|       |
|    Table Properties|[delta.minReaderV...|       |
+--------------------+--------------------+-------+



- **Read the Delta table back from MinIO**

In [71]:
delta_df = spark.read.format("delta").load("s3a://lakehouse/test_db.db/recency")
delta_df.show(5)

+-----------+----------------+-------+------------------+
|customer_id|       full_name|recency|        avg_amount|
+-----------+----------------+-------+------------------+
|          1|      MARY SMITH|   3992| 49.03203883495146|
|          2|PATRICIA JOHNSON|   3963| 46.43461538461538|
|          3|  LINDA WILLIAMS|   3406| 45.36190476190477|
|          4|   BARBARA JONES|   3922| 50.99411764705882|
|          5| ELIZABETH BROWN|   3889|51.247272727272716|
+-----------+----------------+-------+------------------+
only showing top 5 rows



## Stop the Spark Session

In [74]:
spark.stop()