# Spark 1: Extract, transform, and load a CSV file with Spark (from bucket to database)

In this lesson we use PySpark to load some sales rows from CSV files located in a storage bucket and save them to `fact_sales` database table in PostgreSQL.

## Step 1: Add a file to the storage bucket

- Execute `taito open bucket` on command-line to open the locally running bucket on web browser.
  - TIP: You can alternatively use `taito open bucket:ENV` to connect to a non-local bucket (ENV is `dev`, `test`, `stag`, or `prod`).
- Login in with access key `minio` and secret key `secret1234`.
- Create a folder named `sales` and upload the Sales.csv file to the folder.

## Step 2: Execute the code

In [None]:
# Imports
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Load generic helper functions
%run ../../common/jupyter.ipynb
import src_common_database as db
%run ../../common/spark.ipynb
import src_common_util as util

# Use storage bucket defined with environment variables
bucket = os.environ['STORAGE_BUCKET_URL']
protocol = st.init_spark(sc)

In [None]:
# Read CSV files from the sales folder
df = spark.read.csv(protocol + bucket + "/sales",        # Read from /sales folder
                    pathGlobFilter="*.csv",              # Read only *.csv files
                    recursiveFileLookup=True,            # Read recursively also from subfolders
                    modifiedAfter="2021-04-01T00:00:00", # Fetch only files modified after this timestamp
                    header=True,                         # Each CSV file includes a header row with column names
                    ignoreLeadingWhiteSpace=True,        # Trim column values
                    ignoreTrailingWhiteSpace=True,       # Trim column values
                    mode="FAILFAST")                     # Do not allow invalid CSV

# DEBUG: Show the contents
df.show()

In [None]:
# Change dataframe schema to match the database table and
# generate unique key by concatenating order number and product SKU
db_df = df.select(
    concat(col("Order"), lit("."), col("Product")).alias("key"),
    col("Date").alias("date_key"),
    col("Product").alias("product_key"),
    col("Order").alias("order_number"),
    col("Quantity").alias("quantity").cast(IntegerType()),
    col("Price").alias("price").cast(FloatType())
);

# DEBUG: Show the renamed schema
db_df.printSchema()

In [None]:
# Insert data to the fact_sales database table
# NOTE: If you get "ERROR: duplicate key value violates unique constraint", execute `taito init --clean` to clean your database from old data.
db_df.write.mode("append").jdbc(db.get_jdbc_url(), "fact_sales", properties=db.get_jdbc_options())

# DEBUG: Show the data stored in database
spark.read.jdbc(db.get_jdbc_url(), "fact_sales", properties=db.get_jdbc_options()).show()

## Step 3: Connect to the database with Taito CLI

- Execute `taito db connect` on command-line to connect to the local database.
  - TIP: You can alternatively use `taito db connect:ENV` to connect to a non-local database (ENV is `dev`, `test`, `stag`, or `prod`).
- Show all sales rows with `select * from fact_sales`.

## Step 4: Change the implementation to update existing data and insert new data

Unfortunately Spark does not currently support upsert (see [SPARK-19335](https://issues.apache.org/jira/browse/SPARK-19335)). There are multiple ways to go around this, for example:

- Write data to a separate loading view that has a trigger that executes upsert for the target table on insert.
- Write data to a separate loading table that has a trigger that executes upsert for the target table on insert.
- Write data to a temporary table and then merge the data to the target table with a custom sql clause.
- Just overwrite all data in the target table, preferably with truncate mode to keep the table schema intact.

This is how you can implement the first option (loading view). Normally we would add a new database migration for this with `taito db add NAME`, but since our database tables are not yet in production, we can just modify the existing migrations and redeploy them.

TODO: UUSI MIGRAATIO NÄISTÄ!

1. Copy-paste the following content to the existing files: `database/deploy/fact_sales.sql`, `database/revert/fact_sales.sql`, and `database/verify/fact_sales.sql`.

```sql
-- Deploy fact_sales to pg

BEGIN;

CREATE TABLE fact_sales (
  key text PRIMARY KEY,
  date_key text NOT NULL REFERENCES dim_dates (key),
  product_key text NOT NULL REFERENCES dim_products (key),
  order_number text NOT NULL,
  quantity integer NOT NULL,
  price numeric(12,2) NOT NULL
);

CREATE VIEW load_sales AS SELECT * FROM fact_sales;

CREATE OR REPLACE FUNCTION load_sales() RETURNS TRIGGER AS $$
BEGIN
  INSERT INTO fact_sales VALUES (NEW.*)
  ON CONFLICT (key) DO
    UPDATE SET
      date_key = EXCLUDED.date_key,
      product_key = EXCLUDED.product_key,
      order_number = EXCLUDED.order_number,
      quantity = EXCLUDED.quantity,
      price = EXCLUDED.price;
  RETURN new;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER load_sales
INSTEAD OF INSERT ON load_sales
FOR EACH ROW EXECUTE PROCEDURE load_sales();

COMMIT;
```

```sql
-- Revert fact_sales from pg

BEGIN;

DROP TRIGGER load_sales ON load_sales;
DROP FUNCTION load_sales;
DROP VIEW load_sales;
DROP TABLE fact_sales;

COMMIT;
```

```sql
-- Verify fact_sales on pg

BEGIN;

SELECT key FROM load_sales LIMIT 1;
SELECT key FROM fact_sales LIMIT 1;

ROLLBACK;
```
    
2. Redeploy database migrations and example data to local database with `taito init --clean`.
3. Execute the following code to load CSV data to database yet again:

In [1]:
# Write the data to the "load_sales" view instead of "fact_sales" table
db_df.write.mode("append").jdbc(db.get_jdbc_url(), "load_sales", properties=db.get_jdbc_options())

# DEBUG: Show the data stored in fact_sales. You manual data changes should have been overwritten.
spark.read.jdbc(db.get_jdbc_url(), "fact_sales", properties=db.get_jdbc_options()).show()

NameError: name 'db_df' is not defined

4. Connect to the database with `taito db connect` and modify some quantity and price values manually in the `fact_sales` table. Also delete one of the rows.
5. Execute the following code to make sure your manual changes will be overwritten on data load. Note that the CSV data contains only 4 rows (orders 00000000003, 00000000004, and 00000000005). Other rows wont be overwritten.

In [2]:
# Write the data to the "load_sales" view instead of "fact_sales" table
db_df.write.mode("append").jdbc(db.get_jdbc_url(), "load_sales", properties=db.get_jdbc_options())

# DEBUG: Show the data stored in fact_sales. You manual data changes should have been overwritten.
spark.read.jdbc(db.get_jdbc_url(), "fact_sales", properties=db.get_jdbc_options()).show()

NameError: name 'db_df' is not defined

## Next lesson: [Spark 2 - Listen storage bucket for uploads with Spark](02.ipynb)