# Pharmacy OTC Sales Data (2022)

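This notebook explores three ways to ingest CSV files from a volume into Delta tables: batch ingestion with CTAS, incremental ingestion with COPY INTO, and incremental ingestion with streaming tables.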

In [0]:
%sql
USE CATALOG training_catalog;

In [0]:
%sql
USE SCHEMA pharmacy_otc_sales_data_2022_db;

In [0]:
%sql
LIST "/Volumes/training_catalog/pharmacy_otc_sales_data_2022_db/training_files"

## Batch data ingestion with CTAS

This method ingests the data as an initial load: a `CREATE TABLE ... AS SELECT` (CTAS) statement creates the table and populates it in a single step.

### Using SQL

In [0]:
%sql
SELECT *
FROM read_files(
    "/Volumes/training_catalog/pharmacy_otc_sales_data_2022_db/training_files",
    FORMAT => "CSV"
)
LIMIT 10;

Create a managed Delta table

In [0]:
%sql
DROP TABLE IF EXISTS pharmacy_sales_bronze;

-- Create table
CREATE TABLE IF NOT EXISTS pharmacy_sales_bronze
AS
SELECT *
FROM read_files(
    "/Volumes/training_catalog/pharmacy_otc_sales_data_2022_db/training_files",
    FORMAT => "CSV",
    HEADER => TRUE,
    SCHEMA => "date STRING, product STRING, sales_person STRING, boxes_shipped STRING, amount STRING, country STRING"
);

-- Preview data
SELECT * FROM pharmacy_sales_bronze LIMIT 10;

In [0]:
%sql
DESCRIBE TABLE EXTENDED pharmacy_sales_bronze;

### Using Python

Read CSV files from a volume into a DataFrame

In [0]:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col

In [0]:
schema = StructType([
    StructField("date", StringType(), True),
    StructField("product", StringType(), True),
    StructField("sales_person", StringType(), True),
    StructField("boxes_shipped", StringType(), True),
    StructField("amount", StringType(), True),
    StructField("country", StringType(), True)
])

In [0]:
df = (spark.read
      .format("csv")
      .option("header", "true")
      .option("inferSchema", "false")
      # Pass the schema with .schema(); .option("schema", ...) is not a CSV reader option and is ignored
      .schema(schema)
      .load("/Volumes/training_catalog/pharmacy_otc_sales_data_2022_db/training_files")
)

In [0]:
# The schema is applied by position (enforceSchema defaults to true), so the header names
# in the files ("Date", "Sales Person", "Amount ($)", ...) are replaced by the schema names.
# Select the columns explicitly to fix their order.
df = df.select(col("date"),
               col("product"),
               col("sales_person"),
               col("boxes_shipped"),
               col("amount"),
               col("country"))
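The header names in the CSV files ("Date", "Sales Person", "Boxes Shipped", "Amount ($)", "Country") differ from the snake_case column names used in the tables. As a minimal sketch, the mapping between the two could be derived with a small helper instead of being typed by hand. The function name `to_snake_case` is hypothetical and not part of any library.

```python
import re

def to_snake_case(header: str) -> str:
    """Lowercase a header and collapse every run of non-alphanumeric characters into '_'."""
    cleaned = re.sub(r"[^0-9a-zA-Z]+", "_", header.strip().lower())
    return cleaned.strip("_")

# Header names as they appear in the source files
headers = ["Date", "Product", "Sales Person", "Boxes Shipped", "Amount ($)", "Country"]
column_names = [to_snake_case(h) for h in headers]
print(column_names)
```

The resulting list could be passed to `df.toDF(*column_names)` on a DataFrame that still carries the raw header names.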

In [0]:
df.display()

Write the data to a managed Delta table

In [0]:
df.write.mode("overwrite").saveAsTable("training_catalog.pharmacy_otc_sales_data_2022_db.pharmacy_sales_bronze_python")

Read and view table

In [0]:
df_table = spark.table("training_catalog.pharmacy_otc_sales_data_2022_db.pharmacy_sales_bronze_python")
df_table.display()

## Incremental ingestion with COPY INTO

This method ingests data incrementally. `COPY INTO` is idempotent: it loads only files that have not been ingested yet and skips the ones that were already loaded. If the source location has no new files, rerunning the statement inserts no rows.

### Using SQL
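The skip-already-loaded behavior can be pictured with a small pure-Python model. This is a conceptual sketch only, not how Databricks implements `COPY INTO`; the function `copy_into`, the file names, and the row values are all hypothetical placeholders.

```python
def copy_into(table_rows, loaded_files, source_files):
    """Append rows from files not seen before and return the number of rows inserted."""
    inserted = 0
    for path in sorted(source_files):
        if path in loaded_files:
            continue  # file was ingested by an earlier run, so skip it
        rows = source_files[path]
        table_rows.extend(rows)
        loaded_files.add(path)
        inserted += len(rows)
    return inserted

# Hypothetical source directory with a single file containing two rows
source_files = {"part_001.csv": [("row-1",), ("row-2",)]}
table_rows, loaded_files = [], set()

first_run = copy_into(table_rows, loaded_files, source_files)   # loads part_001.csv
second_run = copy_into(table_rows, loaded_files, source_files)  # nothing new to load
source_files["part_002.csv"] = [("row-3",)]                     # a new file arrives
third_run = copy_into(table_rows, loaded_files, source_files)   # loads only part_002.csv

print(first_run, second_run, third_run)
```

In this model the second run inserts nothing because no new file has arrived, which is the behavior the rerun cell later in this section demonstrates.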

Explore data

In [0]:
%sql
SELECT *
FROM read_files(
    "/Volumes/training_catalog/pharmacy_otc_sales_data_2022_db/training_files",
    FORMAT => "CSV"
)
LIMIT 10;

Create an empty table, then ingest data into it.

In [0]:
%sql
DROP TABLE IF EXISTS pharmacy_sales_bronze_ci;

-- Create table
CREATE TABLE IF NOT EXISTS pharmacy_sales_bronze_ci (
  date STRING,
  product STRING,
  sales_person STRING,
  boxes_shipped STRING,
  amount STRING,
  country STRING
);

-- Copy data into the table
-- The column list forces this schema during ingestion
-- header = false and skipRows = 1 prevent the header line from being inserted as a row
-- inferSchema = false because the schema was specified when the table was created
COPY INTO pharmacy_sales_bronze_ci (date, product, sales_person, boxes_shipped, amount, country)
FROM "/Volumes/training_catalog/pharmacy_otc_sales_data_2022_db/training_files/"
FILEFORMAT = csv
FORMAT_OPTIONS (
  "header" = "false",
  "inferSchema" = "false",
  "skipRows" = "1"
);
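The combination of a column list, `header = false`, and `skipRows = 1` amounts to mapping values to columns by position while discarding the header line. A minimal pure-Python sketch of that idea follows; it assumes nothing about the real file contents, and the function `read_by_position` and the sample values are hypothetical placeholders.

```python
import csv
import io

COLUMNS = ["date", "product", "sales_person", "boxes_shipped", "amount", "country"]

def read_by_position(csv_text, columns, skip_rows=1):
    """Drop the first `skip_rows` lines and assign column names by position."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    return [dict(zip(columns, row)) for row in rows[skip_rows:]]

# Header line as found in the source files, followed by one placeholder data row
sample = (
    "Date,Product,Sales Person,Boxes Shipped,Amount ($),Country\n"
    "d1,p1,s1,b1,a1,c1\n"
)
records = read_by_position(sample, COLUMNS)
print(records)
```

Because the names are assigned by position, the header text in the file never has to match the table's column names.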

In [0]:
%sql
SELECT * FROM pharmacy_sales_bronze_ci LIMIT 5;

Rerun the ingestion with no changes. Because the source files are the same, this run should insert no rows.

In [0]:
%sql
-- Copy data into a table
COPY INTO pharmacy_sales_bronze_ci (date, product, sales_person, boxes_shipped, amount, country)
FROM "/Volumes/training_catalog/pharmacy_otc_sales_data_2022_db/training_files/"
FILEFORMAT = csv
FORMAT_OPTIONS (
  "header" = "false",
  "inferSchema" = "false",
  "skipRows" = "1"
);

## Incremental ingestion with streaming tables

Streaming tables are a newer approach to incremental ingestion and are generally preferred over `COPY INTO`.

In [0]:
%sql
CREATE OR REFRESH STREAMING TABLE pharmacy_sales_bronze_autoloader
SCHEDULE EVERY 1 WEEK
AS
SELECT *
FROM STREAM read_files(
  "/Volumes/training_catalog/pharmacy_otc_sales_data_2022_db/training_files/",
  FORMAT => "CSV",
  SEP => ",",
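  HEADER => FALSE,
  -- Skip the header line of each file, as in the COPY INTO example, so it is not ingested as a data row
  skipRows => 1,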
  SCHEMA => "date STRING, product STRING, sales_person STRING, boxes_shipped STRING, amount STRING, country STRING"
);
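For comparison with the SQL statement above, incremental file ingestion can also be written in Python with Auto Loader (the `cloudFiles` source). The following is a sketch under stated assumptions, not a drop-in equivalent of a streaming table: the function name `ingest_with_auto_loader` and the `checkpoint_path` argument are hypothetical, and the checkpoint location must be a path you choose yourself.

```python
def ingest_with_auto_loader(spark, source_path, target_table, checkpoint_path):
    """Incrementally load new CSV files from source_path into target_table."""
    schema_ddl = (
        "date STRING, product STRING, sales_person STRING, "
        "boxes_shipped STRING, amount STRING, country STRING"
    )
    stream = (
        spark.readStream
        .format("cloudFiles")                 # Auto Loader source
        .option("cloudFiles.format", "csv")
        .option("header", "false")
        .option("skipRows", "1")              # skip the header line, same idea as the COPY INTO example
        .option("sep", ",")
        .schema(schema_ddl)
        .load(source_path)
    )
    return (
        stream.writeStream
        .option("checkpointLocation", checkpoint_path)  # tracks which files were already processed
        .trigger(availableNow=True)                      # process pending files, then stop
        .toTable(target_table)
    )
```

In a Databricks notebook this would be called with the notebook's `spark` session, the volume path used above, a target table name, and a checkpoint path of your choice.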