- Importing 

### Mounting the Azure Storage container

In [0]:
#  dbutils.fs.mount(
#    source = "wasbs://files@gledisdatabricksstorage1.blob.core.windows.net/",
#    mount_point = "/mnt/files",
#    extra_configs  = {"fs.azure.account.key.gledisdatabricksstorage1.blob.core.windows.net" : "<storage-account-key>"}
#  )

### Loading the csv datasets

In [0]:
# Initializing all the necessary variables for the dataframes

customers = spark.read.csv("/mnt/files/olist_customers_dataset.csv", header=True, inferSchema=True)
order_items = spark.read.csv("/mnt/files/olist_order_items_dataset.csv", header=True, inferSchema=True)
order_payments = spark.read.csv("/mnt/files/olist_order_payments_dataset.csv", header=True, inferSchema=True)
order_reviews = spark.read.csv("/mnt/files/olist_order_reviews_dataset.csv", header=True, inferSchema=True)
orders = spark.read.csv("/mnt/files/olist_orders_dataset.csv", header=True, inferSchema=True)
products = spark.read.csv("/mnt/files/olist_products_dataset.csv", header=True, inferSchema=True)
sellers = spark.read.csv("/mnt/files/olist_sellers_dataset.csv", header=True, inferSchema=True)

### 4.1: Data cleaning and transformation.
- Take each dataset and drop its duplicates for a column(usually primary key/id) and duplicate rows entirely.
- Drop rows with missing data in necessary columns that will be used later in creating the fact/detail table.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

def drop_duplicates_by_column(df, column_name):
    """
    Drops all rows that have duplicate values in the specified column and duplicate rows entirely.
    
    Args:
        df (pandas.DataFrame): The DataFrame from which duplicates will be removed.
        column_name (str): The column name to check for duplicates.
        
    Returns:
        pyspark.sql.DataFrame: The cleaned DataFrame with duplicates removed.
    """
    before = df.count()  # Rows before dropping duplicates

    if column_name:
        df = df.dropDuplicates(subset=[column_name])  # Keep first occurrence without ordering

    df = df.dropDuplicates()  # Remove fully duplicate rows

    after = df.count()  # Rows after dropping duplicates
    print(f"Removed {before - after} duplicates from '{column_name}' column and overall dataset.")
    
    return df

In [0]:
print('Customers before: ' + str(customers.count))
customers = drop_duplicates_by_column(customers, 'customer_unique_id')
customers = customers.dropna(subset=['customer_unique_id', 'customer_city', 'customer_state', 'customer_zip_code_prefix'])
print('Customers after: ' + str(customers.count))

Customers before: <bound method DataFrame.count of DataFrame[customer_id: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string]>
Removed 3345 duplicates from 'customer_unique_id' column and overall dataset.
Customers after: <bound method DataFrame.count of DataFrame[customer_id: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string]>


In [0]:
print('Products before: ' + str(products.count))

products = drop_duplicates_by_column(products, 'product_id')

# Remove products without a category or name.
''' 
There are 2 approaches that can be taken.
1. Deleting the products missing a category / name. These could be taken as invalid entries.

products = products.dropna(
    subset=['product_category_name', 
            'product_name_lenght', 
           ])

2. Filling in the missing values with a default value, so even category "No Category " would show up.

'''


products = products.fillna({'product_category_name': 'No Category',
                                      'product_name_lenght': '0'})

products.show(5)

Products before: <bound method DataFrame.count of DataFrame[product_id: string, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_length_cm: int, product_height_cm: int, product_width_cm: int]>
Removed 0 duplicates from 'product_id' column and overall dataset.
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|00066f42aeeb9f300...|           perfumaria|                 53|                       596|         

In [0]:


''' 
Since the city/address will be used in a SQL Query later on, we should remove all sellers without a city, or put it as N/A
sellers = sellers.dropna(subset=['seller_city', 'seller_zip_code_prefix'])
'''
sellers = drop_duplicates_by_column(sellers, 'seller_id')
sellers = sellers.fillna({'seller_city': 'N/A', 'seller_zip_code_prefix': '0000'})

sellers.show(5)

Removed 0 duplicates from 'seller_id' column and overall dataset.
+--------------------+----------------------+-----------+------------+
|           seller_id|seller_zip_code_prefix|seller_city|seller_state|
+--------------------+----------------------+-----------+------------+
|0015a82c2db000af6...|                  9080|santo andre|          SP|
|001cca7ae9ae17fb1...|                 29156|  cariacica|          ES|
|001e6ad469a905060...|                 24754|sao goncalo|          RJ|
|002100f778ceb8431...|                 14405|     franca|          SP|
|003554e2dce176b55...|                 74565|    goiania|          GO|
+--------------------+----------------------+-----------+------------+
only showing top 5 rows


In [0]:
# There can be many payments for the same order so only general duplicates should be removed
order_payments = drop_duplicates_by_column(order_payments, '')

Removed 0 duplicates from '' column and overall dataset.


In [0]:
# There can be many items for the same order so only general duplicates should be removed
order_items = drop_duplicates_by_column(order_items, '')

Removed 0 duplicates from '' column and overall dataset.


In [0]:
order_reviews = drop_duplicates_by_column(order_reviews, 'review_id')
order_reviews = order_reviews.dropna(subset=['review_score'])
order_reviews = order_reviews.fillna({'review_comment_title': 'No Title Available',
                                      'review_comment_message': 'No Comment Available'})

Removed 1204 duplicates from 'review_id' column and overall dataset.


In [0]:
print('Orders before: ' + str(orders.count))

orders = drop_duplicates_by_column(orders, 'order_id')

# Since we need to calculate the Delivery Time, we need to remove rows missing the following columns: Order Delivered Customer Date, Order Delivered Carrier Date, Order Approved At

orders = orders.dropna(subset=['order_purchase_timestamp', 'order_approved_at', 'order_delivered_carrier_date', 'order_delivered_customer_date'])
print('Orders after: ' + str(orders.count))

Orders before: <bound method DataFrame.count of DataFrame[order_id: string, customer_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp]>
Removed 0 duplicates from 'order_id' column and overall dataset.
Orders after: <bound method DataFrame.count of DataFrame[order_id: string, customer_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp]>


### 4.2 Creating Calculated Columns

- Total Price: Sum of Product Price and Freight Value

In [0]:
order_items = order_items.withColumn('total_price', order_items['price'] + order_items['freight_value'])
order_items.show(5)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+------------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|       total_price|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+------------------+
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18|12.99|        12.79|             25.78|
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|             72.19|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51|199.9|        18.14|218.04000000000002|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30|199.0|        17.87|            216.87|
|00048cc3ae777c65d...|            

- Delivery Time: Difference between the delivery date and the order purchase date

In [0]:
from pyspark.sql import functions as F
# Calculate the Delivery Time as the difference between the delivered date and the purchase date in days. Add in order_items

orders_with_delivery_time = orders.withColumn('order_purchase_timestamp', F.to_timestamp('order_purchase_timestamp'))
orders_with_delivery_time = orders_with_delivery_time.withColumn('order_delivered_customer_date', F.to_timestamp('order_delivered_customer_date'))

orders_with_delivery_time = orders_with_delivery_time.withColumn('delivery_time', 
    (F.col('order_delivered_customer_date').cast('long') - F.col('order_purchase_timestamp').cast('long')) / 86400)  # Number of seconds in a day

orders_with_delivery_time.show(5)

order_items = order_items.join(
    orders_with_delivery_time[['order_id', 'delivery_time']], 
    on='order_id', 
    how='left'
)


+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|     delivery_time|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+------------------+
|00018f77f2f0320c5...|f6dd3ec061db4e398...|   delivered|     2017-04-26 10:53:06|2017-04-26 11:05:13|         2017-05-04 14:35:00|          2017-05-12 16:04:24|          2017-05-15 00:00:00|16.216180555555557|
|00042b26cf59d7ce6...|58dbd0b2d70206bf4...|   delivered|     2017-02-04 13:57:51|2017-02-04 14:10:13|         2017-02-16 09:46:09|          2017-03-01 16:42:31|

- Payment Count: Sum of payment installments for each order

In [0]:
# Group by 'order_id' and sum the 'payment_installments' for each order
payment_count = order_payments.groupBy('order_id').agg(
    F.sum('payment_installments').alias('total_payment_installments')
)
payment_count.show(5)

# Join the payment_count DataFrame with order_items on order_id
order_items = order_items.join(payment_count, on='order_id', how='left')
order_items.show(5)


+--------------------+--------------------------+
|            order_id|total_payment_installments|
+--------------------+--------------------------+
|629eb58d177eb9d9e...|                         1|
|e2b9380fcb4f1f7e2...|                         1|
|a3797015424a5a231...|                         1|
|e239d280236cdd3c4...|                         3|
|f44cb69655f8e4d13...|                         6|
+--------------------+--------------------------+
only showing top 5 rows
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+------------------+-----------------+--------------------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|       total_price|    delivery_time|total_payment_installments|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+------------------+-----------------+-----

- Profit Margin: Product Price - Freight Value 

In [0]:
# Create the 'profit_margin' column by subtracting 'freight_value' from 'price'
order_items = order_items.withColumn('profit_margin', F.col('price') - F.col('freight_value'))

# Show the results (top rows)
order_items.show(5)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+------------------+-----------------+--------------------------+-------------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|       total_price|    delivery_time|total_payment_installments|      profit_margin|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+------------------+-----------------+--------------------------+-------------------+
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18|12.99|        12.79|             25.78|6.147268518518518|                         2|0.20000000000000107|
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|             72.19|7.614421296296296|                         2|      

### 4.3 Using Window Functions Over Partitions (In pyspark)

- Total sales per customer, running total of product prices for each customers orders partitioned by customer id

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Aggregate order_items at the order_id level. Get order_id/price
order_totals = order_items.groupBy('order_id').agg(F.sum('price').alias('total_price'))

# Merge with orders to get customer_id from orders in a dataframe with total_price in it
merged = orders.join(order_totals, on='order_id', how='inner')

# Define the window specification that partitions by customer_id and orders by order_id
windowSpec = Window.partitionBy('customer_id').orderBy('order_id')

# Merge with customer_id window spec to get the running total sales over the specification
merged = merged.withColumn('running_total_sales', F.sum('total_price').over(windowSpec))

# Select only the customer_id, order_id and running total sales
result = merged.select('customer_id', 'order_id', 'running_total_sales')
result.show(5)

# Filter the result for the specific order_id to show correct calculations
filtered_result = result.filter(result['order_id'] == '11c177c8e97725db2631073c19f07b62')
filtered_result.show()

# order_id 11c177c8e97725db2631073c19f07b62 has 2 orders with each price being 179.99, cumulative sum is shown 359.98

+--------------------+--------------------+-------------------+
|         customer_id|            order_id|running_total_sales|
+--------------------+--------------------+-------------------+
|00050bf6e01e69d5c...|fa906f338cee30a98...|              69.99|
|000598caf2ef41174...|9b961b894e797f636...|             1107.0|
|000bf8121c3412d30...|bc3e295306ee4d3eb...|               30.0|
|00114026c1b7b52ab...|17a9050c446ea78f7...|               49.9|
|0013cd8e350a7cc76...|4ed7a5d31f58c9c3b...|               79.9|
+--------------------+--------------------+-------------------+
only showing top 5 rows
+--------------------+--------------------+-------------------+
|         customer_id|            order_id|running_total_sales|
+--------------------+--------------------+-------------------+
|b331b74b18dc79bcd...|11c177c8e97725db2...|             359.98|
+--------------------+--------------------+-------------------+



- Average Delivery Time from the moment a customer places an order to when it is delivered to their house


In [0]:
from pyspark.sql import functions as F

# Convert timestamps to datetime format and calculate Delivery Time (difference in days)
orders = orders.withColumn('order_purchase_timestamp', F.to_timestamp('order_purchase_timestamp'))
orders = orders.withColumn('order_delivered_customer_date', F.to_timestamp('order_delivered_customer_date'))
orders = orders.withColumn('delivery_time_days', F.datediff('order_delivered_customer_date', 'order_purchase_timestamp'))

# Merge order_items, orders, and products dataframes. Order Items is merged with orders to get the delivery time for each order in the order_items dataset. Products is merged with order_items to get the product category name for each order in the order_items dataset.
merged = order_items.join(orders.select('order_id', 'delivery_time_days'), on='order_id', how='inner')
merged = merged.join(products.select('product_id', 'product_category_name'), 
                     on='product_id', how='inner')

average_delivery_time = merged.groupBy('product_category_name').agg(F.avg('delivery_time_days').alias('average_delivery_time'))


average_delivery_time.show(5)

+---------------------+---------------------+
|product_category_name|average_delivery_time|
+---------------------+---------------------+
|                  pcs|    13.43718592964824|
|                bebes|    12.45137491616365|
|          No Category|   12.737630208333334|
|            cine_foto|                 10.5|
|                artes|   11.304568527918782|
+---------------------+---------------------+
only showing top 5 rows


### 4.4: Saving Processed Data

- Creating the fact table ( order_items + calculated columns from 4.2)

In [0]:
# Reset the dataframes from previous modifications

customers = spark.read.csv("/mnt/files/olist_customers_dataset.csv", header=True, inferSchema=True)
order_items = spark.read.csv("/mnt/files/olist_order_items_dataset.csv", header=True, inferSchema=True)
order_payments = spark.read.csv("/mnt/files/olist_order_payments_dataset.csv", header=True, inferSchema=True)
order_reviews = spark.read.csv("/mnt/files/olist_order_reviews_dataset.csv", header=True, inferSchema=True)
orders = spark.read.csv("/mnt/files/olist_orders_dataset.csv", header=True, inferSchema=True)
products = spark.read.csv("/mnt/files/olist_products_dataset.csv", header=True, inferSchema=True)
sellers = spark.read.csv("/mnt/files/olist_sellers_dataset.csv", header=True, inferSchema=True)

In [0]:
from pyspark.sql import functions as F

# 1. Total Price: price + freight_value
order_items = order_items.withColumn('total_price', order_items['price'] + order_items['freight_value'])

# 2. Calculate the Delivery Time as the difference between the delivered date and the purchase date in days. Add it in order_items dataframe

orders_with_delivery_time = orders.withColumn('order_purchase_timestamp', F.to_timestamp('order_purchase_timestamp'))
orders_with_delivery_time = orders_with_delivery_time.withColumn('order_delivered_customer_date', F.to_timestamp('order_delivered_customer_date'))

orders_with_delivery_time = orders_with_delivery_time.withColumn('delivery_time', 
    (F.col('order_delivered_customer_date').cast('long') - F.col('order_purchase_timestamp').cast('long')) / 86400)  # Number of seconds in a day

order_items = order_items.join(
    orders_with_delivery_time[['order_id', 'delivery_time']], 
    on='order_id', 
    how='left'
)

# 3. Payment Count (Total Payment Installments for each order)
# Group by 'order_id' and sum the 'payment_installments' for each order
payment_count = order_payments.groupBy('order_id').agg(
    F.sum('payment_installments').alias('total_payment_installments')
)
order_items = order_items.join(payment_count, on='order_id', how='left')


# 4. Profit Margin: price - freight_value
order_items = order_items.withColumn('profit_margin', F.col('price') - F.col('freight_value'))

# Populate the fact_table dataframe with all the order_items and customer_id from the orders dataframe. This is needed to query the number of orders from each state from the customer dimension, in order to keep the link: customers <--> orders <--> order_items

fact_table = order_items \
    .join(orders.select('order_id', 'customer_id'), on='order_id', how='left')

# Display the final fact table
fact_table.show(15)


+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+------------------+------------------+--------------------------+-------------------+--------------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|       total_price|     delivery_time|total_payment_installments|      profit_margin|         customer_id|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+------------------+------------------+--------------------------+-------------------+--------------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|        13.29|             72.19| 7.614421296296296|                         2|              45.61|3ce436f183e68e078...|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:1

- Creating the dimension tables needed. Only date dimension needs to be created. Customers, Products and Sellers are already in a dataframe

In [0]:
date_dim = orders.select(
    'order_id',
    'order_purchase_timestamp',
    'order_delivered_customer_date'
)

date_dim.show(5)

+--------------------+------------------------+-----------------------------+
|            order_id|order_purchase_timestamp|order_delivered_customer_date|
+--------------------+------------------------+-----------------------------+
|e481f51cbdc54678b...|     2017-10-02 10:56:33|          2017-10-10 21:25:13|
|53cdb2fc8bc7dce0b...|     2018-07-24 20:41:37|          2018-08-07 15:27:45|
|47770eb9100c2d0c4...|     2018-08-08 08:38:49|          2018-08-17 18:06:29|
|949d5b44dbf5de918...|     2017-11-18 19:28:06|          2017-12-02 00:28:42|
|ad21c59c0840e6cb8...|     2018-02-13 21:18:39|          2018-02-16 18:17:02|
+--------------------+------------------------+-----------------------------+
only showing top 5 rows


- Write all the dataframes in delta tables in the /mnt/files/deltas directory

In [0]:
customers.write.format("delta").mode("overwrite").save("/mnt/files/deltas/customers")
products.write.format("delta").mode("overwrite").save("/mnt/files/deltas/products")
sellers.write.format("delta").mode("overwrite").save("/mnt/files/deltas/sellers")
fact_table.write.format("delta").mode("overwrite").save("/mnt/files/deltas/orders")
date_dim.write.format("delta").mode("overwrite").save("/mnt/files/deltas/date_dim")

print("DataFrames saved to delta tables succesfully.")

DataFrames saved to delta tables succesfully.


- Creating the sql tables in order to write the queries needed

In [0]:
customers.createOrReplaceTempView("customers")
products.createOrReplaceTempView("products")
sellers.createOrReplaceTempView("sellers")
fact_table.createOrReplaceTempView("orders")
date_dim.createOrReplaceTempView("date")


+---------------------+------------------+
|product_category_name|       total_sales|
+---------------------+------------------+
|         beleza_saude|1258681.3400000052|
|   relogios_presentes|1205005.6800000032|
|      cama_mesa_banho| 1036988.680000001|
|        esporte_lazer| 988048.9699999947|
| informatica_acess...| 911954.3199999924|
|     moveis_decoracao|  729762.489999984|
|           cool_stuff|  635290.849999994|
| utilidades_domest...|   632248.65999999|
|           automotivo| 592720.1099999945|
|   ferramentas_jardim|485256.45999999484|
|           brinquedos|483946.59999999625|
|                bebes|411764.88999999815|
|           perfumaria|  399124.869999999|
|            telefonia|323667.53000000207|
|    moveis_escritorio| 273960.7000000008|
|            papelaria| 230943.2300000003|
|                  pcs|222963.12999999998|
|             pet_shop|214315.41000000027|
| instrumentos_musi...| 191498.8800000003|
|      eletroportateis|190648.58000000007|
+----------

- Query total sales per product category from the fact table.

In [0]:
spark.sql("SELECT p.product_category_name, SUM(o.price) AS total_sales FROM orders o JOIN products p ON o.product_id = p.product_id GROUP BY p.product_category_name ORDER BY total_sales DESC").show()

- Query the average delivery time per seller from the fact table.

In [0]:
spark.sql("SELECT seller_id, IFNULL(AVG(delivery_time), 0) AS avg_delivery_time FROM orders GROUP BY seller_id").show()

+--------------------+------------------+
|           seller_id| avg_delivery_time|
+--------------------+------------------+
|ff063b022a9a0aab9...| 9.401329571759259|
|8e6cc767478edae94...|16.496237675754458|
|a49928bcdf77c55c6...|16.937956508190883|
|da7039f29f90ce5b4...| 8.862591306584363|
|062ce95fa2ad4dfae...|13.336690207156307|
|2009a095de2a2a416...| 9.829877507716049|
|0ea22c1cfbdc755f8...|11.021397274633127|
|6eeed17989b0ae47c...| 6.500030864197531|
|e63e8bfa530fb1691...| 9.344903273809525|
|4d600e08ecbe08258...|12.936089616402116|
|9803a40e82e45418a...|15.709532536008233|
|b3f19518fcec265b2...| 10.66591724537037|
|ec8879960bd2221d5...| 16.49511937830688|
|0b64bcdb0784abc13...| 8.495092592592593|
|c522be04e020c1e7b...|15.649359567901234|
|9c068d10aca38e85c...| 16.52933142701525|
|297d5eccd19fa9a83...| 6.882055844907407|
|9b1050e85becf3ae9...|               0.0|
|e38db885400cd35c7...|12.689117476851852|
|13fa2a6c6b9d0f43c...| 20.52068634259259|
+--------------------+------------

- Query the number of orders from each state from the customer dimension.

In [0]:
spark.sql("SELECT c.customer_state, COUNT(o.order_id) AS num_orders FROM orders o JOIN customers c ON o.customer_id = c.customer_id GROUP BY c.customer_state ORDER BY customer_state ASC").show()

+--------------+----------+
|customer_state|num_orders|
+--------------+----------+
|            AC|        92|
|            AL|       444|
|            AM|       165|
|            AP|        82|
|            BA|      3799|
|            CE|      1478|
|            DF|      2406|
|            ES|      2256|
|            GO|      2333|
|            MA|       824|
|            MG|     13129|
|            MS|       819|
|            MT|      1055|
|            PA|      1080|
|            PB|       602|
|            PE|      1806|
|            PI|       542|
|            PR|      5740|
|            RJ|     14579|
|            RN|       529|
+--------------+----------+
only showing top 20 rows
