# Sales Data Preparation

Remove underlying files stored during the initial development. This will ensure a fresh start.

In [0]:
dbutils.fs.rm("dbfs:/FileStore/salesdata/published", recurse=True)


Out[1]: True

## Initiate Spark Session

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

In [0]:
spark = SparkSession.builder.appName("SalesDataPrep").getOrCreate()

## Import Data Set into DataFrame

In [0]:
schema = StructType([
    StructField("Order ID", StringType(), True), 
    StructField("Product", StringType(), True), 
    StructField("Quantity Ordered", StringType(), True), 
    StructField("Price Each", StringType(), True), 
    StructField("Order Date", StringType(), True), 
    StructField("Purchase Address", StringType(), True)
])

In [0]:
data_fpath = "dbfs:/FileStore/shared_uploads/sales/input"

sales_data_df = (spark.read.format("csv")
                 .option("header", True)
                 .schema(schema)
                 .load(data_fpath))

In [0]:
sales_data_df.show(10)

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  295665|  Macbook Pro Laptop|               1|      1700|12/30/19 00:01|136 Church St, Ne...|
|  295666|  LG Washing Machine|               1|     600.0|12/29/19 07:03|562 2nd St, New Y...|
|  295667|USB-C Charging Cable|               1|     11.95|12/12/19 18:21|277 Main St, New ...|
|  295668|    27in FHD Monitor|               1|    149.99|12/22/19 15:13|410 6th St, San F...|
|  295669|USB-C Charging Cable|               1|     11.95|12/18/19 12:38|43 Hill St, Atlan...|
|  295670|AA Batteries (4-p...|               1|      3.84|12/31/19 22:58|200 Jefferson St,...|
|  295671|USB-C Charging Cable|               1|     11.95|12/16/19 15:10|928 12th St, Port...|
|  295672|USB-C Charging Cable|         

In [0]:
sales_data_df.printSchema()

root
 |-- Order ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: string (nullable = true)
 |-- Price Each: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)



## Clean Up the Underlying Directory

To keep the table name but want to clear the old data, delete the contents of the directory at
`dbfs:/user/hive.warehouse/sales_db.db/sales_raw` with Databricks File System commands so the folder is empty before creating the table.

In [0]:
# List the files to check contents
dbutils.fs.ls("dbfs:/user/hive/warehouse/sales_db.db/sales_raw")


Out[8]: [FileInfo(path='dbfs:/user/hive/warehouse/sales_db.db/sales_raw/_delta_log/', name='_delta_log/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/user/hive/warehouse/sales_db.db/sales_raw/part-00000-59e51fe1-808e-45a9-8f52-2d68e9bff2af-c000.snappy.parquet', name='part-00000-59e51fe1-808e-45a9-8f52-2d68e9bff2af-c000.snappy.parquet', size=1046001, modificationTime=1741818759000),
 FileInfo(path='dbfs:/user/hive/warehouse/sales_db.db/sales_raw/part-00001-d28bb312-0411-4f03-9bce-045660a557cb-c000.snappy.parquet', name='part-00001-d28bb312-0411-4f03-9bce-045660a557cb-c000.snappy.parquet', size=841317, modificationTime=1741818759000),
 FileInfo(path='dbfs:/user/hive/warehouse/sales_db.db/sales_raw/part-00002-1b750e73-fa71-46cd-940b-7fa2457b2da7-c000.snappy.parquet', name='part-00002-1b750e73-fa71-46cd-940b-7fa2457b2da7-c000.snappy.parquet', size=752974, modificationTime=1741818758000),
 FileInfo(path='dbfs:/user/hive/warehouse/sales_db.db/sales_raw/part-00003-dc446da5-a860-4272-a0

In [0]:
# Remove the directory recursively
dbutils.fs.rm("dbfs:/user/hive/warehouse/sales_db.db/sales_raw", recurse=True)


Out[9]: True

## Create Database and Table (`sales_raw`)

In [0]:
spark.sql("CREATE DATABASE IF NOT EXISTS sales_db")

Out[10]: DataFrame[]

In [0]:
spark.sql("USE sales_db")

Out[11]: DataFrame[]

In [0]:
spark.sql("DROP TABLE IF EXISTS sales_raw")

Out[12]: DataFrame[]

In [0]:
spark.sql("""
          CREATE TABLE IF NOT EXISTS sales_raw (
              OrderId STRING,
              Product STRING,
              QuantityOrdered STRING,
              PriceEach STRING,
              OrderDate STRING,
              PurchaseAddress STRING
          )
          """)

Out[13]: DataFrame[]

## Load DataFrame to Database Table

### Create Temporary Table (`tmp_sales_raw`)

In [0]:
sales_data_df.createOrReplaceTempView("tmp_sales_raw")

In [0]:
spark.sql("SELECT * FROM tmp_sales_raw").show(10, truncate=False)

+--------+--------------------------+----------------+----------+--------------+-----------------------------------------+
|Order ID|Product                   |Quantity Ordered|Price Each|Order Date    |Purchase Address                         |
+--------+--------------------------+----------------+----------+--------------+-----------------------------------------+
|295665  |Macbook Pro Laptop        |1               |1700      |12/30/19 00:01|136 Church St, New York City, NY 10001   |
|295666  |LG Washing Machine        |1               |600.0     |12/29/19 07:03|562 2nd St, New York City, NY 10001      |
|295667  |USB-C Charging Cable      |1               |11.95     |12/12/19 18:21|277 Main St, New York City, NY 10001     |
|295668  |27in FHD Monitor          |1               |149.99    |12/22/19 15:13|410 6th St, San Francisco, CA 94016      |
|295669  |USB-C Charging Cable      |1               |11.95     |12/18/19 12:38|43 Hill St, Atlanta, GA 30301            |
|295670  |AA Bat

In [0]:
spark.sql("DESCRIBE tmp_sales_raw").show()

+----------------+---------+-------+
|        col_name|data_type|comment|
+----------------+---------+-------+
|        Order ID|   string|   null|
|         Product|   string|   null|
|Quantity Ordered|   string|   null|
|      Price Each|   string|   null|
|      Order Date|   string|   null|
|Purchase Address|   string|   null|
+----------------+---------+-------+



### Insert `tmp_sales_raw` Data Into `sales_raw`

In [0]:
spark.sql("""
            INSERT OVERWRITE sales_db.sales_raw
            SELECT `Order ID`,
                   `Product`,
                   `Quantity Ordered`,
                   `Price Each`,
                   `Order Date`,
                   `Purchase Address`
            FROM tmp_sales_raw
          """)

Out[17]: DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
spark.sql("SELECT * FROM sales_raw").show(10)

+-------+--------------------+---------------+---------+--------------+--------------------+
|OrderId|             Product|QuantityOrdered|PriceEach|     OrderDate|     PurchaseAddress|
+-------+--------------------+---------------+---------+--------------+--------------------+
| 295665|  Macbook Pro Laptop|              1|     1700|12/30/19 00:01|136 Church St, Ne...|
| 295666|  LG Washing Machine|              1|    600.0|12/29/19 07:03|562 2nd St, New Y...|
| 295667|USB-C Charging Cable|              1|    11.95|12/12/19 18:21|277 Main St, New ...|
| 295668|    27in FHD Monitor|              1|   149.99|12/22/19 15:13|410 6th St, San F...|
| 295669|USB-C Charging Cable|              1|    11.95|12/18/19 12:38|43 Hill St, Atlan...|
| 295670|AA Batteries (4-p...|              1|     3.84|12/31/19 22:58|200 Jefferson St,...|
| 295671|USB-C Charging Cable|              1|    11.95|12/16/19 15:10|928 12th St, Port...|
| 295672|USB-C Charging Cable|              2|    11.95|12/13/19 09:29

In [0]:
spark.sql("SELECT count(*) FROM sales_db.sales_raw").show()

+--------+
|count(1)|
+--------+
|  186850|
+--------+



## Data Transformation

### Update Data Types and Add New Columns

In [0]:
tmp_sales_df = spark.sql("""
                         
SELECT 
  cast(OrderId AS INT) AS OrderID,
  Product,
  QuantityOrdered,
  cast(PriceEach AS DECIMAL(10,2)) AS PriceEach,
  to_timestamp(OrderDate, 'MM/dd/yy HH:mm') AS OrderDate,
  PurchaseAddress,
  trim(split(PurchaseAddress, ",")[1]) AS City, -- new column
  trim(substring(split(PurchaseAddress, ",")[2], 1, 3)) AS State, -- new column
  year(to_timestamp(OrderDate, 'MM/dd/yy HH:mm')) AS ReportYear, -- new column
  month(to_timestamp(OrderDate, 'MM/dd/yy HH:mm')) AS ReportMonth -- new column
FROM sales_raw
WHERE OrderID != "Order ID" 
  AND OrderID IS NOT NULL; -- ignoring bad records

""")

### Create Temporary Table (`tmp_sales`)

In [0]:
tmp_sales_df.createOrReplaceTempView("tmp_sales")

In [0]:
spark.sql("DESCRIBE tmp_sales").show()

+---------------+-------------+-------+
|       col_name|    data_type|comment|
+---------------+-------------+-------+
|        OrderID|          int|   null|
|        Product|       string|   null|
|QuantityOrdered|       string|   null|
|      PriceEach|decimal(10,2)|   null|
|      OrderDate|    timestamp|   null|
|PurchaseAddress|       string|   null|
|           City|       string|   null|
|          State|       string|   null|
|     ReportYear|          int|   null|
|    ReportMonth|          int|   null|
+---------------+-------------+-------+



## Create Final Table (`sales`)

In [0]:
spark.sql("DROP TABLE IF EXISTS sales_db.sales;")

Out[23]: DataFrame[]

### Assign Parquet, Partition, Compression, and Location Options

In [0]:
spark.sql("""

CREATE TABLE sales (
    OrderId INT,
    Product STRING,
    Quantity INT,
    Price DECIMAL(10, 2),
    OrderDate TIMESTAMP,
    PurchaseAddress STRING,
    City STRING,
    State STRING,
    ReportYear INT,
    ReportMonth INT
)
USING PARQUET 
PARTITIONED BY (ReportYear, ReportMonth)
OPTIONS('compression' = 'snappy')
LOCATION 'dbfs:/FileStore/salesdata/published';

""")

Out[24]: DataFrame[]

In [0]:
spark.sql("DESCRIBE sales").show()

+--------------------+-------------+-------+
|            col_name|    data_type|comment|
+--------------------+-------------+-------+
|             OrderId|          int|   null|
|             Product|       string|   null|
|            Quantity|          int|   null|
|               Price|decimal(10,2)|   null|
|           OrderDate|    timestamp|   null|
|     PurchaseAddress|       string|   null|
|                City|       string|   null|
|               State|       string|   null|
|          ReportYear|          int|   null|
|         ReportMonth|          int|   null|
|# Partition Infor...|             |       |
|          # col_name|    data_type|comment|
|          ReportYear|          int|   null|
|         ReportMonth|          int|   null|
+--------------------+-------------+-------+



### Insert `tmp_sales` Data Into `sales`

In [0]:
spark.sql("""

INSERT INTO sales
    SELECT 
        OrderID,
        Product,
        cast(QuantityOrdered AS INT) AS Quantity,
        cast(PriceEach AS DECIMAL(10, 2)) AS Price,
        OrderDate,
        PurchaseAddress,
        City,
        State,
        ReportYear,
        ReportMonth
    FROM tmp_sales;

""")

Out[26]: DataFrame[]

In [0]:
spark.sql(
"""

SELECT *
    FROM sales
    LIMIT 10;

""").show()

+-------+--------------------+--------+------+-------------------+--------------------+-------------+-----+----------+-----------+
|OrderId|             Product|Quantity| Price|          OrderDate|     PurchaseAddress|         City|State|ReportYear|ReportMonth|
+-------+--------------------+--------+------+-------------------+--------------------+-------------+-----+----------+-----------+
| 141234|              iPhone|       1|700.00|2019-01-22 21:25:00|944 Walnut St, Bo...|       Boston|   MA|      2019|          1|
| 141235|Lightning Chargin...|       1| 14.95|2019-01-28 14:15:00|185 Maple St, Por...|     Portland|   OR|      2019|          1|
| 141236|    Wired Headphones|       2| 11.99|2019-01-17 13:33:00|538 Adams St, San...|San Francisco|   CA|      2019|          1|
| 141237|    27in FHD Monitor|       1|149.99|2019-01-05 20:33:00|738 10th St, Los ...|  Los Angeles|   CA|      2019|          1|
| 141238|    Wired Headphones|       1| 11.99|2019-01-25 11:59:00|387 10th St, Aust