# Spark SQL Examples

In [None]:
%sql
-- Smoke test: ask the SQL engine for today's date.
SELECT current_date()

current_date()
2024-08-13


In [None]:
%fs ls dbfs:/public/retail_db



path,name,size,modificationTime
dbfs:/public/retail_db/README.md,README.md,806,1723569811738
dbfs:/public/retail_db/categories/,categories/,0,1723569814547
dbfs:/public/retail_db/create_db.sql,create_db.sql,10303297,1723569813282
dbfs:/public/retail_db/create_db_tables_pg.sql,create_db_tables_pg.sql,1748,1723569810915
dbfs:/public/retail_db/customers/,customers/,0,1723569808445
dbfs:/public/retail_db/departments/,departments/,0,1723569811129
dbfs:/public/retail_db/load_db_tables_pg.sql,load_db_tables_pg.sql,10297372,1723569815873
dbfs:/public/retail_db/order_items/,order_items/,0,1723569811943
dbfs:/public/retail_db/orders/,orders/,0,1723569813819
dbfs:/public/retail_db/products/,products/,0,1723569810303


In [None]:
%sql
-- Register the CSV files under /public/retail_db/orders as a temporary view
-- named `orders`, with an explicitly declared schema.
-- The dbfs:/ prefix is optional here.
-- To look at the raw lines instead, run:
-- SELECT * FROM TEXT.`/public/retail_db/orders`

CREATE OR REPLACE TEMPORARY VIEW orders (
  order_id INT,
  order_date STRING,  -- kept as text; sample values look like '2013-07-25 00:00:00.0'
  order_customer_id INT,
  order_status STRING
) USING CSV
OPTIONS (
  path='/public/retail_db/orders',
  sep=','
)

In [None]:
%sql
-- Preview the orders view, naming every declared column explicitly.
SELECT order_id,
       order_date,
       order_customer_id,
       order_status
FROM orders

order_id,order_date,order_customer_id,order_status
1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED
5,2013-07-25 00:00:00.0,11318,COMPLETE
6,2013-07-25 00:00:00.0,7130,COMPLETE
7,2013-07-25 00:00:00.0,4530,COMPLETE
8,2013-07-25 00:00:00.0,2911,PROCESSING
9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT
10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT


In [None]:
%sql
-- Register the CSV files under /public/retail_db/order_items as a temporary
-- view named `order_items`, with an explicitly declared schema.
-- No header option is set; the columns below are declared in file order.
CREATE OR REPLACE TEMPORARY VIEW order_items (
  order_item_id INT,
  order_item_order_id INT,  -- joined against orders.order_id in the revenue queries
  order_item_product_id INT,
  order_item_quantity INT,
  -- NOTE(review): FLOAT is an approximate type; consider DECIMAL if exact
  -- currency totals matter -- confirm before changing.
  order_item_subtotal FLOAT,
  -- NOTE(review): "products" looks like a typo for "product"; confirm no
  -- consumer depends on this name before renaming.
  order_item_products_price FLOAT
) USING CSV
OPTIONS (
  path='/public/retail_db/order_items',
  sep=','
)

In [None]:
%sql
-- Preview the order_items view, naming every declared column explicitly.
SELECT order_item_id,
       order_item_order_id,
       order_item_product_id,
       order_item_quantity,
       order_item_subtotal,
       order_item_products_price
FROM order_items

order_item_id,order_item_order_id,order_item_product_id,order_item_quantity,order_item_subtotal,order_item_products_price
1,1,957,1,299.98,299.98
2,2,1073,1,199.99,199.99
3,2,502,5,250.0,50.0
4,2,403,1,129.99,129.99
5,4,897,2,49.98,24.99
6,4,365,5,299.95,59.99
7,4,502,3,150.0,50.0
8,4,1014,4,199.92,49.98
9,5,957,1,299.98,299.98
10,5,365,5,299.95,59.99


# Compute Daily Product Revenue

In [None]:
%sql
-- Daily revenue per product: sum the item subtotals of COMPLETE and CLOSED
-- orders, grouped by order date and product, highest revenue first within
-- each date.
SELECT o.order_date,
       oi.order_item_product_id,
       round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders AS o
INNER JOIN order_items AS oi
  ON oi.order_item_order_id = o.order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
         oi.order_item_product_id
ORDER BY o.order_date ASC,
         revenue DESC

order_date,order_item_product_id,revenue
2013-07-25 00:00:00.0,1004,5599.72
2013-07-25 00:00:00.0,191,5099.49
2013-07-25 00:00:00.0,957,4499.7
2013-07-25 00:00:00.0,365,3359.44
2013-07-25 00:00:00.0,1073,2999.85
2013-07-25 00:00:00.0,1014,2798.88
2013-07-25 00:00:00.0,403,1949.85
2013-07-25 00:00:00.0,502,1650.0
2013-07-25 00:00:00.0,627,1079.73
2013-07-25 00:00:00.0,226,599.99


## Write Data to DBFS

In [None]:
%sql
-- Persist the daily product revenue result as Parquet files, replacing
-- whatever the target directory currently holds.
INSERT OVERWRITE DIRECTORY 'dbfs:/public/retail_db/daily_product_revenue'
USING PARQUET
SELECT o.order_date,
       oi.order_item_product_id,
       round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders AS o
INNER JOIN order_items AS oi
  ON oi.order_item_order_id = o.order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
         oi.order_item_product_id
ORDER BY o.order_date ASC,
         revenue DESC

In [None]:
%fs ls dbfs:/public/retail_db/daily_product_revenue

path,name,size,modificationTime
dbfs:/public/retail_db/daily_product_revenue/_SUCCESS,_SUCCESS,0,1723571790900
dbfs:/public/retail_db/daily_product_revenue/_committed_5860510935251631594,_committed_5860510935251631594,123,1723571790382
dbfs:/public/retail_db/daily_product_revenue/_started_5860510935251631594,_started_5860510935251631594,0,1723571786835
dbfs:/public/retail_db/daily_product_revenue/part-00000-tid-5860510935251631594-87f5d105-dace-4d4e-8a51-44bc395df48b-38-1-c000.snappy.parquet,part-00000-tid-5860510935251631594-87f5d105-dace-4d4e-8a51-44bc395df48b-38-1-c000.snappy.parquet,28796,1723571790083


In [None]:
%sql
-- Query the Parquet files in the daily_product_revenue directory directly,
-- sorted by date and then by descending revenue.
SELECT order_date,
       order_item_product_id,
       revenue
FROM PARQUET.`/public/retail_db/daily_product_revenue`
ORDER BY order_date ASC,
         revenue DESC

order_date,order_item_product_id,revenue
2013-07-25 00:00:00.0,1004,5599.72
2013-07-25 00:00:00.0,191,5099.49
2013-07-25 00:00:00.0,957,4499.7
2013-07-25 00:00:00.0,365,3359.44
2013-07-25 00:00:00.0,1073,2999.85
2013-07-25 00:00:00.0,1014,2798.88
2013-07-25 00:00:00.0,403,1949.85
2013-07-25 00:00:00.0,502,1650.0
2013-07-25 00:00:00.0,627,1079.73
2013-07-25 00:00:00.0,226,599.99


## PySpark Examples

In [None]:
%sql
-- Show the raw lines of the JSON schemas file; the TEXT source exposes each
-- line in a single `value` column.
SELECT value
FROM TEXT.`/public/retail_db/schemas.json`

value
{
"""departments"": ["
{
"""column_name"": ""department_id"","
"""data_type"": ""integer"","
"""column_position"": 1"
"},"
{
"""column_name"": ""department_name"","
"""data_type"": ""string"","


### Read Column Details from the JSON Schema File

In [None]:
%python
import json

# wholetext=True loads the file as a single row, so .first().value is the
# complete JSON document as one string.
schemas_text = (
    spark.read
    .text('/public/retail_db/schemas.json', wholetext=True)
    .first()
    .value
)

# Keep only the column metadata for the `orders` dataset.
column_details = json.loads(schemas_text)['orders']

# Order the entries by their declared position so the names can be applied
# to the CSV fields positionally.
sorted_column_details = sorted(
    column_details,
    key=lambda detail: detail['column_position'],
)
sorted_column_names = [detail['column_name'] for detail in sorted_column_details]

# Read the CSV with inferred types, then rename the columns in order.
# Left as the last expression so the notebook displays the resulting schema.
spark.read.csv('/public/retail_db/orders', inferSchema=True).toDF(*sorted_column_names)


DataFrame[order_id: int, order_date: timestamp, order_customer_id: int, order_status: string]

### Get Order Count by Status

In [None]:
%python
import json
from pyspark.sql.functions import count, col

# wholetext=True loads the file as a single row, so .first().value is the
# complete JSON document as one string.
schemas_text = spark.read.text('/public/retail_db/schemas.json', wholetext=True).first().value

# Column metadata for the `orders` dataset only.
column_details = json.loads(schemas_text)['orders']

# The loop variables are named `column` rather than `col` so they do not
# shadow pyspark.sql.functions.col, which is imported above and used below.
sorted_column_details = sorted(column_details, key=lambda column: column['column_position'])

sorted_column_names = [column['column_name'] for column in sorted_column_details]

# Read the CSV with inferred types, then rename the columns in order.
orders = spark.read.csv('/public/retail_db/orders', inferSchema=True).toDF(*sorted_column_names)

# Number of orders per status, most frequent status first.
(
    orders
    .groupBy('order_status')
    .agg(count('*').alias('order_count'))
    .orderBy(col('order_count').desc())
    .show()
)

+---------------+-----------+
|   order_status|order_count|
+---------------+-----------+
|       COMPLETE|      22899|
|PENDING_PAYMENT|      15030|
|     PROCESSING|       8275|
|        PENDING|       7610|
|         CLOSED|       7556|
|        ON_HOLD|       3798|
|SUSPECTED_FRAUD|       1558|
|       CANCELED|       1428|
| PAYMENT_REVIEW|        729|
+---------------+-----------+



### Apply Multiple Schemas to Their Respective Datasets

In [None]:
%python
import json
from functools import lru_cache

base_dir = "/public/retail_db"
base_output_dir = '/public/retail_db_parquet'
dataset_list = [
    'departments',
    'categories',
    'products',
    'customers',
    'orders',
    'order_items'
]


@lru_cache(maxsize=None)
def _load_schemas(schemas_file):
    """Read and parse the JSON schemas file, once per distinct path.

    The result is cached so that looking up several datasets from the same
    file does not trigger a Spark read and a JSON parse for every lookup.
    Callers must treat the returned dict as read-only because it is shared.
    """
    # wholetext=True loads the file as a single row holding the whole document.
    schemas_text = spark.read.text(schemas_file, wholetext=True).first().value
    return json.loads(schemas_text)


def get_columns(schemas_file, dataset_name):
    """Return the column names of a dataset, ordered by column position.

    Args:
        schemas_file: Path of the JSON file mapping dataset names to lists of
            column entries with 'column_name' and 'column_position' keys.
        dataset_name: Key of the dataset to look up in that file.

    Returns:
        List of column names sorted by their 'column_position'.

    Raises:
        KeyError: If dataset_name is not present in the schemas file.
    """
    schemas = _load_schemas(schemas_file)
    if dataset_name not in schemas:
        raise KeyError(f'Dataset {dataset_name!r} not found in {schemas_file}')
    # sorted() builds a new list, so the cached schema data is never mutated.
    ordered_details = sorted(schemas[dataset_name], key=lambda detail: detail['column_position'])
    return [detail['column_name'] for detail in ordered_details]


schemas_file = f'{base_dir}/schemas.json'

for dataset in dataset_list:
    print(f'Processing {dataset} data')
    columns = get_columns(schemas_file, dataset)
    # Read the CSV with inferred types, then rename the columns in order.
    dataframe = spark.read.csv(f'{base_dir}/{dataset}', inferSchema=True).toDF(*columns)
    # Replace any previous output so the cell can be re-run safely.
    dataframe.write.mode('overwrite').parquet(f'{base_output_dir}/{dataset}')

Processing departments data
Processing categories data
Processing products data
Processing customers data
Processing orders data
Processing order_items data


In [None]:
%fs ls dbfs:/public/retail_db_parquet/categories

path,name,size,modificationTime
dbfs:/public/retail_db_parquet/categories/_SUCCESS,_SUCCESS,0,1723573716658
dbfs:/public/retail_db_parquet/categories/_committed_4542002153579187313,_committed_4542002153579187313,124,1723573716180
dbfs:/public/retail_db_parquet/categories/_started_4542002153579187313,_started_4542002153579187313,0,1723573715736
dbfs:/public/retail_db_parquet/categories/part-00000-tid-4542002153579187313-00a18a69-e752-4f3c-81ac-f2676d4504c9-114-1-c000.snappy.parquet,part-00000-tid-4542002153579187313-00a18a69-e752-4f3c-81ac-f2676d4504c9-114-1-c000.snappy.parquet,2049,1723573715966
