# Objectives
- Querying data files
- Writing to tables
- Performing advanced ETL operations
- Discover the potential of higher-order functions and user-defined functions (UDFs) in Spark SQL

# Querying Data Files
To query a file directly, use the `SELECT * FROM` syntax followed by the file format and the path to the file:
```sql
SELECT * FROM file_format.`/path/to/file`
```
The file path is enclosed in **backticks** to prevent syntax errors and ensure that the path is interpreted correctly.

A file path in this context can refer to:
- A single file
- A path containing a wildcard character (`*`) to read multiple files at once; or
- An entire directory, assuming that all files within that directory share the same format and schema
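The three kinds of path above can be illustrated with a small plain-Python sketch. It is not how Spark resolves paths (Spark follows Hadoop-style glob rules and has details this sketch ignores). The function `resolve_path` and the file names are hypothetical, chosen to mirror the `export_*.json` files used below.

```python
import glob
import os
import tempfile


def resolve_path(path):
    """Expand a file path, a wildcard pattern, or a directory into a
    sorted list of files (simplified illustration, not Spark's logic)."""
    if os.path.isdir(path):
        # A directory stands for every entry directly inside it.
        path = os.path.join(path, "*")
    # glob expands wildcards; a plain path matches only itself (if it exists).
    return sorted(p for p in glob.glob(path) if os.path.isfile(p))


# Usage: a throwaway directory standing in for students-json.
root = tempfile.mkdtemp()
for name in ["export_001.json", "export_002.json", "copy_001.json"]:
    open(os.path.join(root, name), "w").close()

single = resolve_path(os.path.join(root, "export_001.json"))
matched = resolve_path(os.path.join(root, "export_*.json"))
everything = resolve_path(root)
print(len(single), len(matched), len(everything))  # 1 2 3
```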

We can now demonstrate extracting data directly from files using a real-world dataset representing an online school environment. This dataset consists of three tables:
- Students
- Enrollments
- Courses

We begin by running a helper notebook, "School-Setup", found in the `Includes` subfolder. This helper notebook downloads the dataset to the Databricks File System (DBFS) and prepares the working environment:

```
%run ./Includes/School-Setup
```

## Querying JSON Format
The student data in this dataset is formatted in JSON. The `dataset_school` variable referenced in the following command is defined in our "School-Setup" notebook. It points to the location where the dataset files are stored on the file system.

```python
%python
files = dbutils.fs.ls(f"{dataset_school}/students-json")
display(files)
```

The output above shows that there are 6 JSON files in the `students-json` folder.

### Reading a single data file
To read a single JSON file, use `SELECT * FROM json.` followed by the full path to the file between backticks. The path uses the `${dataset.school}` placeholder, configured in the "School-Setup" notebook, to reference the location where the dataset files are stored:

```sql
SELECT * FROM json.`${dataset.school}/students-json/export_001.json`
```

The result displays the extracted student data, including:
- Student ID
- Email
- GPA score
- Profile information (in JSON format); and
- The last updated timestamp

### Querying multiple files
To query multiple files at once, use the wildcard character (`*`) in the path. For example, the following query reads all JSON files whose names start with `export_`:

```sql
SELECT * FROM json.`${dataset.school}/students-json/export_*.json`
```

### Querying an entire directory
You can query an entire directory of files, provided all files in the directory share the same format and schema. In the following query, the directory path is specified instead of an individual file:

```sql
SELECT * FROM json.`${dataset.school}/students-json`
```

#### Recording the source file
When reading multiple files, the built-in Spark SQL function `input_file_name()` is useful: it records the source data file of each record. This helps when troubleshooting data issues, because it shows exactly which file a record came from.

```sql
SELECT *, input_file_name() source_file FROM json.`${dataset.school}/students-json`
```

In addition to the original columns, the output above includes a new `source_file` column that records the file each record was read from.
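To make the idea concrete, here is a plain-Python analogue of `input_file_name()`. It reads every file in a directory, assuming one JSON object per line (the default layout Spark's JSON reader expects), and tags each record with its source path. The function name and sample records are hypothetical.

```python
import json
import os
import tempfile


def read_json_lines_with_source(directory):
    """Read every JSON-lines file in `directory` and add a `source_file`
    field holding the path of the file each record came from."""
    rows = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        with open(path, encoding="utf-8") as handle:
            for line in handle:
                if line.strip():  # skip blank lines
                    record = json.loads(line)
                    record["source_file"] = path
                    rows.append(record)
    return rows


# Usage with two hypothetical export files.
root = tempfile.mkdtemp()
samples = {
    "export_001.json": '{"student_id": "S1", "email": "s1@example.com"}\n',
    "export_002.json": '{"student_id": "S2", "email": null}\n',
}
for name, text in samples.items():
    with open(os.path.join(root, name), "w", encoding="utf-8") as handle:
        handle.write(text)

rows = read_json_lines_with_source(root)
for row in rows:
    print(row["student_id"], os.path.basename(row["source_file"]))
```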

## Querying Using the text Format
When dealing with a variety of text-based files, including formats such as JSON, CSV, TSV, and TXT, Databricks provides the flexibility to handle them using the text format:
```sql
SELECT * FROM text.`/path/to/file`
```
This format extracts the data as raw strings, which is particularly useful when input data might be corrupted or contain anomalies.
Working with raw strings lets you apply custom parsing logic to extract the relevant values from text-based files.

```sql
SELECT * FROM text.`${dataset.school}/students-json`
```

The output above displays the student data as raw strings. Each line of the files is loaded as one record in a single string column named `value`.

From this result, you can apply custom parsing or transformation techniques to extract specific fields, correct anomalies, or reformat the data as needed for subsequent analysis.
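The following is a minimal sketch of such custom parsing in plain Python. The function treats each raw line like a value in the `value` column. It keeps the lines that parse as JSON and sets aside the ones that do not, so that they can be inspected instead of failing the whole load. The function name and sample lines are hypothetical; inside Spark you would apply SQL string or JSON functions to the `value` column instead.

```python
import json


def parse_raw_lines(lines):
    """Split raw text lines into parsed JSON records and rejected lines."""
    parsed, rejected = [], []
    for value in lines:
        try:
            parsed.append(json.loads(value))
        except json.JSONDecodeError:
            rejected.append(value)  # keep the raw string for inspection
    return parsed, rejected


raw = [
    '{"student_id": "S1", "gpa": 3.5}',
    '{"student_id": "S2", "gpa": 3.1',  # truncated: missing closing brace
    '{"student_id": "S3", "gpa": 2.9}',
]
good, bad = parse_raw_lines(raw)
print(len(good), len(bad))  # 2 1
```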

## Querying Using binaryFile Format
There are scenarios where the binary representation of file content is essential, such as when working with images or unstructured data. In such cases, the `binaryFile` format is suited for this task:
```sql
SELECT * FROM binaryFile.`/path/sample_image.png`
```
In the sample query provided, the `binaryFile` format is employed to query an image (`sample_image.png`), allowing you to work directly with the binary representation of the file's content.

We can use the `binaryFile` format to extract the raw bytes and some metadata of the student files:

```sql
SELECT * FROM binaryFile.`${dataset.school}/students-json`
```

The output of the query provides the following details about each source file:
- `path` provides the location of the source file on the storage
- `modificationTime` gives the last modification time of the file
- `length` indicates the size of the file
- `content` contains the raw bytes of the file

By using the `binaryFile` format, you can access both the content and the metadata of files, offering a detailed view of your dataset.
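A rough plain-Python equivalent of the four `binaryFile` columns can be built with `os.stat` and a binary read. This only illustrates what the columns contain, and `binary_file_rows` is a hypothetical helper. Spark reports `modificationTime` as a timestamp; here it is a local `datetime`.

```python
import datetime
import os
import tempfile


def binary_file_rows(directory):
    """Return one dict per file with the fields exposed by the binaryFile
    format: path, modificationTime, length and content (raw bytes)."""
    rows = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        info = os.stat(path)
        with open(path, "rb") as handle:
            content = handle.read()
        rows.append(
            {
                "path": path,
                "modificationTime": datetime.datetime.fromtimestamp(info.st_mtime),
                "length": info.st_size,
                "content": content,
            }
        )
    return rows


# Usage with a single hypothetical student file.
root = tempfile.mkdtemp()
with open(os.path.join(root, "export_001.json"), "wb") as handle:
    handle.write(b'{"student_id": "S1"}\n')

for row in binary_file_rows(root):
    print(os.path.basename(row["path"]), row["length"], row["content"][:15])
```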

## Querying Non-Self-Describing Formats
The previous querying approach is particularly effective with self-describing formats that possess a well-defined schema, such as JSON and Parquet. By nature, these formats offer a built-in structure that makes it easy to retrieve and interpret data using `SELECT` queries.

However, when dealing with non-self-describing formats such as CSV, the `SELECT` statement may not be as informative. Unlike JSON and Parquet, CSV files lack a predefined schema, making the format less suitable for direct querying. In such cases, additional steps, such as defining a schema, may be necessary for effective data extraction and analysis.

```sql
SELECT * FROM csv.`${dataset.school}/courses-csv`
```

As the output above shows, the data is not parsed correctly:
- The header row is extracted as a table row; and
- All columns are loaded into a single column, `_c0`.

This happens because of the delimiter, the symbol used to separate columns in the file, which in this case is a semicolon rather than the default comma.
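The effect of the delimiter can be reproduced with Python's standard `csv` module. The two sample lines below are hypothetical but follow the same semicolon-separated layout as the course files, with column names taken from the `courses_csv` definition in this section.

```python
import csv
import io

# Hypothetical sample: a header row plus one semicolon-separated record.
sample = (
    "course_id;title;instructor;category;price\n"
    "C01;Intro to Python;Jane Doe;Computer Science;19.99\n"
)

with_comma = list(csv.reader(io.StringIO(sample)))  # default delimiter ","
with_semicolon = list(csv.reader(io.StringIO(sample), delimiter=";"))

print(len(with_comma[1]))      # 1: the whole line lands in one column
print(len(with_semicolon[1]))  # 5: one value per column
```

In both cases the header is still returned as an ordinary row (index 0). This mirrors the first symptom above, which Spark addresses with its `header` option.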

This issue highlights a challenge with querying files without a well-defined schema, particularly in formats like CSV. In the upcoming sections, we will learn how to address this challenge.

## Registering Tables from Files with CTAS
CTAS (`CREATE TABLE AS SELECT`) statements allow you to register tables from files, particularly from sources with a well-defined schema such as Parquet files. This process is crucial for loading data into a Lakehouse:
```sql
CREATE TABLE table_name AS
SELECT * FROM file_format.`/path/to/file`
```

CTAS statements simplify the process of creating Delta Lake tables by automatically inferring schema information from the query results. This eliminates the need for manual schema declaration. 

In the following example, we create the `students` table and populate it with the student data using a CTAS statement:

```sql
DROP TABLE IF EXISTS students;
CREATE TABLE students AS SELECT * FROM json.`${dataset.school}/students-json`;
DESCRIBE EXTENDED students
```

The output above displays the metadata of the `students` table.

- The `Provider` value confirms that a Delta Lake table was created. The CTAS statement extracted the data from the JSON files and loaded it into the `students` table in Delta format (that is, Parquet data files along with a Delta transaction log).
- This table is a _managed_ table, as indicated by the `Type` value.
- The schema was inferred automatically from the query results, a feature common to CTAS statements. This makes CTAS statements a suitable choice for ingesting external data from sources with well-defined schemas, such as Parquet files.

However, it is important to note that CTAS statements come with certain limitations. One such limitation is that CTAS statements do not support specifying additional file options. This becomes a challenge when trying to ingest data from CSV files or other formats that require specific configurations.  

```sql
DROP TABLE IF EXISTS courses_unparsed;
CREATE TABLE courses_unparsed AS SELECT * FROM csv.`${dataset.school}/courses-csv`;
SELECT * FROM courses_unparsed;
```

The output shows that we have successfully created a Delta Lake table, but the data is not parsed correctly.
CSV files typically require options such as the delimiter or encoding to be specified when the data is loaded. To meet this requirement, we will now explore an alternative solution.

## Registering Tables on Foreign Data Sources
In scenarios where additional file options are necessary, an alternative solution is to use the regular `CREATE TABLE` statement, but with the `USING` keyword. 

The `USING` keyword provides more flexibility. It lets you specify the type of foreign data source, such as CSV, along with any additional file options, such as the delimiter and whether a header is present:
```sql
CREATE TABLE table_name (col_name1 col_type1, ...)
USING data_source
OPTIONS (key1 = val1, key2 = val2, ...)
LOCATION path
```

It is crucial to note that:
- This method creates an external table that references the files in place. The data is not moved or converted to Delta Lake during table creation.
- Unlike CTAS statements, which infer schema information automatically, this method typically involves declaring the schema explicitly, which gives you more control over the schema definition.

### Examples of foreign data sources

#### Example 1: CSV foreign source
To work with CSV files stored in an external location, the following example creates a table using a CSV foreign source:

```sql
CREATE TABLE csv_external_table
  (col_name1 col_type1, ...)
USING CSV
OPTIONS (header = "true", delimiter = ";")
LOCATION '/path/to/csv/files'
```

This code sample:
- Creates an external table that points to the CSV files located in the specified path.
- Sets the `header` option to indicate that the files contain a header row.
- Sets the `delimiter` option to a semicolon instead of the default comma.

Let's apply this method to our courses data:

```sql
CREATE TABLE IF NOT EXISTS courses_csv
(course_id STRING, title STRING, instructor STRING, category STRING, price DOUBLE)
USING CSV
OPTIONS (header = "True", delimiter = ";")
LOCATION "${dataset.school}/courses-csv"
```

In this example, the `courses_csv` table is created by:
- Specifying the CSV format as a foreign source, 
- Indicating the presence of a header in the files, 
- Defining the semicolon as the delimiter, and lastly, 
- Specifying the location of the source files.

Once the table is created, querying it shows that we have the courses' data in a well-defined structured form:

```sql
SELECT * FROM courses_csv
```

**NOTE**
When working with CSV files as a data source, maintaining the column order is crucial, especially if additional data files will be added to the source directory.
- Spark relies on the column order specified during table creation to apply column names and data types to the values in the CSV files. If the column order in a file changes, values end up under the wrong columns, compromising the integrity of the loaded data.
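The following small plain-Python sketch shows why the order matters. In this model, values are matched to the declared columns purely by position, and the header names are not consulted. The sample rows are hypothetical, and `load_positional` is an illustrative helper, not a Spark API.

```python
import csv
import io

# Column order declared at table creation (as in courses_csv).
SCHEMA = ["course_id", "title", "instructor", "category", "price"]


def load_positional(text, delimiter=";"):
    """Skip the header row and assign values to SCHEMA by position only."""
    reader = csv.reader(io.StringIO(text), delimiter=delimiter)
    next(reader)  # the header is skipped, not used to match column names
    return [dict(zip(SCHEMA, row)) for row in reader]


expected_order = (
    "course_id;title;instructor;category;price\n"
    "C01;Intro;Jane Doe;Computer Science;19.99\n"
)
swapped_order = (
    "title;course_id;instructor;category;price\n"
    "Intro;C01;Jane Doe;Computer Science;19.99\n"
)

print(load_positional(expected_order)[0]["course_id"])  # C01
print(load_positional(swapped_order)[0]["course_id"])   # Intro (wrong value)
```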

#### Example 2: Databases as foreign data sources
Another scenario where the `CREATE TABLE` statement with the `USING` keyword proves useful is creating a table over a JDBC connection, which references data in an external SQL database.
- This approach connects to the external database through options such as the connection URL, user name, password, and the specific database table containing the data.

Here's an example of creating an external table using a JDBC connection:

```sql
/*
CREATE TABLE jdbc_external_table
USING JDBC
OPTIONS (
  url = 'jdbc:mysql://your_database_server:port',
  dbtable = 'your_table',
  user = 'your_username',
  password = 'your_password'
);
*/
```

In the example above, the following apply:
- The `url` option specifies the JDBC connection string to your external database.
- The `dbtable` option indicates the specific table within the external database.
- The `user` and `password` options provide the credentials required for authentication.

By creating an external table using a JDBC connection, you can access and query data from the external database without physically moving or duplicating the data.

### A limitation of using a foreign data source
Tables having foreign data sources are not Delta tables. Performance benefits and features offered by Delta Lake, such as time travel and guaranteed access to the most recent version of the data, are not available for these tables. This limitation becomes especially noticeable when dealing with large database tables, potentially leading to performance issues. 

```sql
DESCRIBE EXTENDED courses_csv
```

The output reveals that
- The table is an external table, and that it is not a Delta table, as indicated in the `Provider` value. This means that no data conversion to the Delta format occurred during table creation; instead, the table simply points to the CSV files stored in the external location.
- Additionally, the `Storage Properties` value captures all metadata and options specified during table creation, ensuring that data in the location is always read with these specified options. 

### Impact of not having a Delta table
Unlike Delta Lake tables which guarantee querying the most recent version of source data, tables registered against other data sources, like CSV, may represent outdated cached data. 

To illustrate this, we will add new data and observe the resulting behaviour of the table. 
First, let's check the number of files in the `courses` directory:

```python
%python
files = dbutils.fs.ls(f"{dataset_school}/courses-csv")
display(files)
```

The directory currently contains 4 files. Each file contains three records, so the table holds 12 records, as the following count confirms:

```sql
SELECT COUNT(*) FROM courses_csv
```

Now, let's run the following Python command to duplicate and rename one of these files. This action simulates the ingestion of new CSV files by a source system:

```python
%python
dbutils.fs.cp(f"{dataset_school}/courses-csv/export_001.csv", f"{dataset_school}/courses-csv/copy_001.csv")
```

After this operation, exploring the courses directory confirms that the new file has been added:

```python
%python
files = dbutils.fs.ls(f"{dataset_school}/courses-csv")
display(files)
```

Despite the new file in the directory, the table does not immediately reflect the change: the count remains 12 instead of the expected 15:

```sql
SELECT COUNT(*) FROM courses_csv
```

Spark automatically caches the underlying data in local storage to speed up subsequent queries. However, external CSV files do not signal Spark to refresh this cached data. Consequently, the new data remains invisible until the cache is manually refreshed with the `REFRESH TABLE` command:

```sql
REFRESH TABLE courses_csv
```

Note that this command invalidates the table cache, so the original data source must be rescanned to reload all data into memory. This can be particularly time-consuming with large datasets.

Upon refreshing the table, querying it again retrieves the updated count:

```sql
SELECT COUNT(*) FROM courses_csv
```

This observation emphasises the trade-offs and considerations associated with choosing between Delta tables and foreign data sources when working with Databricks.
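The caching behaviour can be mimicked with a toy model. The hypothetical `CachedFileTable` class below is not Spark's actual cache. It reads the files once, serves later counts from memory, and rescans the directory only after `refresh()`. The file names and row counts mirror the walkthrough above: four files of three rows each, followed by one copied file.

```python
import os
import shutil
import tempfile


class CachedFileTable:
    """Toy model of a table over a directory whose rows are cached on the
    first query and only re-read after refresh()."""

    def __init__(self, directory):
        self.directory = directory
        self._rows = None  # cached rows; None means "not loaded yet"

    def _load(self):
        if self._rows is None:
            rows = []
            for name in sorted(os.listdir(self.directory)):
                path = os.path.join(self.directory, name)
                with open(path, encoding="utf-8") as handle:
                    rows.extend(line for line in handle if line.strip())
            self._rows = rows
        return self._rows

    def count(self):
        return len(self._load())

    def refresh(self):
        # Like REFRESH TABLE: drop the cache so the next query rescans.
        self._rows = None


root = tempfile.mkdtemp()
for i in range(1, 5):
    with open(os.path.join(root, f"export_00{i}.csv"), "w", encoding="utf-8") as handle:
        handle.write("row\nrow\nrow\n")

table = CachedFileTable(root)
before = table.count()  # 12 rows from four files
shutil.copy(os.path.join(root, "export_001.csv"), os.path.join(root, "copy_001.csv"))
stale = table.count()   # still 12: served from the cache
table.refresh()
after = table.count()   # 15 after the rescan
print(before, stale, after)
```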

### A hybrid approach
To address this limitation and leverage the advantages of Delta Lake, a workaround involves:
- Creating a temporary view that refers to the foreign data source, then
- Executing a CTAS statement on this temporary view to extract data from the external source and load it into a Delta table.

Here's an illustrative example of this process:


```sql
/*
CREATE TEMP VIEW foreign_source_tmp_vw (
  col1 col1_type, ...
)
USING data_source
OPTIONS (key1 = "val1", key2 = "val2", ..., path = "/path/to/data");

CREATE TABLE delta_table AS SELECT * FROM foreign_source_tmp_vw;
*/
```

In the example above, a temporary view is created that refers to a foreign data source. A Delta Lake table is then created by executing a CTAS statement on the temporary view. This copies the data into Delta format (Parquet data files plus a transaction log in JSON format).

In the same way, we can apply this approach to the course data, delivered in CSV format:
- We first create a temporary view and configure it with the required file options. Then,
- We execute a CTAS statement to copy the data from the temporary view into a Delta Lake table named `courses`:

```sql
CREATE TEMP VIEW courses_tmp_vw (course_id STRING, title STRING, instructor STRING, category STRING, price DOUBLE)
USING CSV
OPTIONS (
  path = "${dataset.school}/courses-csv/export_*.csv",
  header = "true",
  delimiter = ";"
);

CREATE TABLE courses AS SELECT * FROM courses_tmp_vw;
```

The following command displays the metadata of the `courses` table, and its output confirms that it is a Delta Lake table:

```sql
DESCRIBE EXTENDED courses
```

Finally, querying the table confirms that it contains well-parsed data from the CSV files:

```sql
SELECT * FROM courses
```

# Writing to Tables
We begin by using a CTAS statement to create the `enrollments` Delta table from Parquet files:

```
%run ./Includes/School-Setup
```

```sql
CREATE TABLE enrollments AS SELECT * FROM parquet.`${dataset.school}/enrollments`
```

Once the table is created, we proceed to query its content:

```sql
SELECT * FROM enrollments
```

Since Parquet files have a well-defined schema, we observe that Delta Lake has accurately captured the schema and successfully extracted the data. 

## Replacing Data
You can completely replace the content of a Delta Lake table either by:
- Overwriting the existing table, or
- Dropping and re-creating it.

However, overwriting Delta tables offers several advantages over dropping and re-creating them:

| Category                        | Drop and Recreate Table                                                                 | Overwrite Table                                                                       |
|---------------------------------|------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------|
| **Processing time**             | Time-consuming as it involves recursively listing directories and deleting large files. | Fast process since the updated data is just a new table version.                      |
| **Leveraging Delta’s time travel** | Deletes the old versions of the table, making its historical data unavailable for retrieval. | Preserves the old table versions, allowing easy retrieval of historical data.         |
| **Concurrency**                | Concurrent queries are unable to access the table while the operation is ongoing.       | Concurrent queries can continue reading the table seamlessly while the operation is in progress. |
| **ACID guarantees**            | If the operation fails, the table cannot be reverted to its original state.             | If the operation fails, the table will revert to its previous state.                  |
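The history-related rows of the table can be illustrated with a toy model in plain Python. `VersionedTable` is hypothetical and only loosely mimics a Delta transaction log. An overwrite appends a new snapshot, so earlier versions stay readable, whereas dropping and re-creating starts a fresh object with no history.

```python
class VersionedTable:
    """Toy table that keeps every committed snapshot, loosely like Delta."""

    def __init__(self, rows):
        self.versions = [list(rows)]  # version 0

    def overwrite(self, rows):
        # A new version is appended; older snapshots are left untouched.
        self.versions.append(list(rows))

    def read(self, version=None):
        # Latest version by default, or a specific one ("time travel").
        return self.versions[-1 if version is None else version]


overwritten = VersionedTable(["old row"])
overwritten.overwrite(["new row"])
print(overwritten.read(), overwritten.read(version=0))  # ['new row'] ['old row']

# Drop and re-create: the new object has no memory of the old rows.
recreated = VersionedTable(["new row"])
print(len(recreated.versions))  # 1
```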

In Databricks, there are two methods to completely replace the content of Delta Lake tables:
- `CREATE OR REPLACE TABLE` statements
- `INSERT OVERWRITE` statements

### CREATE OR REPLACE TABLE statements
This is also known as a **CRAS** (**CREATE OR REPLACE TABLE AS SELECT**) statement. It fully replaces the content of a table each time it executes:

```sql
CREATE OR REPLACE TABLE enrollments AS SELECT * FROM parquet.`${dataset.school}/enrollments`
```

Upon executing this statement, the `enrollments` table will be overwritten with the newer data.

```sql
DESCRIBE HISTORY enrollments
```

As illustrated in the output, version 0 corresponds to the CTAS statement, while the CRAS statement generated a new table version (version 1) reflecting the state of the table after the overwrite.

### INSERT OVERWRITE

```sql
INSERT OVERWRITE enrollments SELECT * FROM parquet.`${dataset.school}/enrollments`
```

While the above statement achieves a similar outcome to the `CREATE OR REPLACE TABLE` approach, there are some key differences to consider. Unlike the CRAS statement, which can create a new table if one does not exist, `INSERT OVERWRITE` can only overwrite an existing table.

```sql
DESCRIBE HISTORY enrollments
```

Delta Lake records the `INSERT OVERWRITE` operation as a standard `WRITE` operation. However, the mode of this operation is marked as `OVERWRITE` in the `operationParameters` field. This indicates that the existing data was replaced with the new records from the query.

One significant aspect of `INSERT OVERWRITE` is that it replaces all existing data in the target (table or partition) only if the new data matches the existing schema. As a result, it cannot accidentally modify the table structure, which makes `INSERT OVERWRITE` a safer approach for overwriting existing tables.

When attempting to overwrite data with a schema that differs from the existing schema, a schema mismatch error will be generated. For example, let's consider adding an extra column containing the source file name to our table:

```sql
INSERT OVERWRITE enrollments SELECT *, input_file_name() FROM parquet.`${dataset.school}/enrollments`
```

The previous command fails with a schema mismatch error, because the schema of the new data does not match the existing schema of the `enrollments` table.

**Delta Lake tables are, by definition, schema-on-write**, which means Delta Lake enforces schema consistency during write operations. Any attempt to write data with a schema that differs from the table's schema is rejected to maintain data integrity.
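Schema enforcement can be sketched in plain Python as a check that runs before any data is replaced. This is a deliberately simplified stand-in: `overwrite_with_schema_check` and the column names are hypothetical, and the check compares only column names, whereas Delta Lake also checks data types.

```python
class SchemaMismatchError(Exception):
    """Raised when incoming rows do not match the table's columns."""


def overwrite_with_schema_check(table_columns, new_rows):
    """Return new_rows as the table's new content, but only if every row
    has exactly the table's columns; otherwise raise and change nothing."""
    expected = set(table_columns)
    for row in new_rows:
        extra = set(row) - expected
        missing = expected - set(row)
        if extra or missing:
            raise SchemaMismatchError(f"extra={sorted(extra)} missing={sorted(missing)}")
    return [dict(row) for row in new_rows]


columns = ["enrollment_id", "student_id"]  # hypothetical table schema
table = overwrite_with_schema_check(columns, [{"enrollment_id": 1, "student_id": "S1"}])

try:
    # Like adding input_file_name() as an extra column: rejected.
    table = overwrite_with_schema_check(
        columns, [{"enrollment_id": 2, "student_id": "S2", "source_file": "f.parquet"}]
    )
except SchemaMismatchError as err:
    print("rejected:", err)
```

Because the check runs before `table` is reassigned, a rejected write leaves the previous content in place.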

## Appending Data
One of the simplest methods to append records to Delta Lake tables is the `INSERT INTO` statement. This statement allows you to easily add new data to existing tables from the result of a SQL query.

```sql
INSERT INTO enrollments SELECT * FROM parquet.`${dataset.school}/enrollments-new`
```

By executing the above `INSERT INTO` statement, we insert 700 new records into our table.

```sql
SELECT COUNT(1) FROM enrollments
```

While the `INSERT INTO` statement provides a convenient means of appending records to tables, it lacks built-in mechanisms to prevent the insertion of duplicate data. This means that if the insertion query is executed multiple times, it will write the same records to the target table repeatedly, leading to the creation of duplicate entries. To address this issue effectively, we use an alternative method: the `MERGE INTO` statement.

## Merging Data
The `MERGE INTO` statement enables you to perform **upsert** operations (inserting new data and updating existing records) and even delete records, all within a single statement. Here's the basic syntax:

```sql
/*
MERGE INTO target_table AS target
USING source_table AS source
ON target.id = source.id
WHEN MATCHED THEN
  UPDATE SET target.name = source.name
WHEN NOT MATCHED THEN
  INSERT (id, name) VALUES (source.id, source.name)
*/
```

| Clause             | Purpose                                                               |
|--------------------|-----------------------------------------------------------------------|
| MERGE INTO         | Target table you want to update or insert into                        |
| USING              | Source table or subquery providing new data                           |
| ON                 | Match condition between source and target rows                        |
| WHEN MATCHED       | What to do when a matching row exists (usually an UPDATE)             |
| WHEN NOT MATCHED   | What to do when no matching row exists in target (usually an INSERT)  |

```
%run ./Includes/School-Setup
```

For example, we can use the `MERGE INTO` statement to update student data with modified email addresses and add new students into the table. 

To accomplish this, we first create a temporary view containing the updated student data. This will serve as the source from which we will merge changes into our students table:

```sql
CREATE OR REPLACE TEMP VIEW students_updates AS SELECT * FROM json.`${dataset.school}/students-json-new`
```

The following merge operation is executed to merge changes from the `students_updates` temporary view into the target `students` table, using the student ID as the key for matching records:

```sql
MERGE INTO students c
USING students_updates u
ON c.student_id = u.student_id
WHEN MATCHED AND c.email IS NULL AND u.email IS NOT NULL THEN
  UPDATE SET email = u.email, updated = u.updated
WHEN NOT MATCHED THEN INSERT *
```

Within the above `MERGE INTO` statement, we define two primary actions based on the matching status of records:

- **Update action (WHEN MATCHED clause)**: When a match is found between the source and target records, an update action is performed on the email address and the last updated timestamp. We add extra conditions: _the email address in the target row must be null, while the corresponding record in the `students_updates` view contains a non-null email address_. Only for such records are the email field and the last updated timestamp updated in the target table.
- **Insert action (WHEN NOT MATCHED clause)**: Records in the `students_updates` view that do not match any existing student by student ID are inserted. This ensures that all new students are added to our target table.
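The logic of this `MERGE INTO` statement can be sketched in plain Python with a dictionary keyed on `student_id`. This only models the semantics and does not reflect how Delta Lake executes a merge. `merge_students` and the sample rows are hypothetical. The sketch assumes each `student_id` appears at most once in the updates (Delta Lake raises an error when several source rows try to modify the same target row).

```python
def merge_students(target, updates):
    """Model of the MERGE above, keyed on student_id.

    WHEN MATCHED AND target email IS NULL AND source email IS NOT NULL:
        update email and updated.
    WHEN NOT MATCHED: insert the whole source row.
    """
    merged = {row["student_id"]: dict(row) for row in target}  # copy target
    for source in updates:
        current = merged.get(source["student_id"])
        if current is None:
            merged[source["student_id"]] = dict(source)  # INSERT *
        elif current["email"] is None and source["email"] is not None:
            current["email"] = source["email"]  # UPDATE SET email = ...
            current["updated"] = source["updated"]
    return list(merged.values())


students = [
    {"student_id": "S1", "email": None, "updated": "2024-01-01"},
    {"student_id": "S2", "email": "s2@example.com", "updated": "2024-01-01"},
]
students_updates = [
    {"student_id": "S1", "email": "s1@example.com", "updated": "2024-02-01"},
    {"student_id": "S2", "email": "other@example.com", "updated": "2024-02-01"},
    {"student_id": "S3", "email": "s3@example.com", "updated": "2024-02-01"},
]
result = merge_students(students, students_updates)
for row in result:
    print(row)
```

S1 gets its missing email filled in, S2 keeps its existing email because the extra condition fails, and S3 is inserted.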

**Some advantages of using MERGE INTO statements include**:
- The ability to execute updates, inserts, and deletes within a single atomic transaction. This ensures data consistency and integrity by treating all operations as a single unit, thereby minimising the risk of inconsistencies or partial changes to the table.
- An effective way to prevent duplicates during record insertion, since the `WHEN NOT MATCHED` clause only inserts source rows that have no match in the target.

Let's consider another scenario where we have a set of new courses to be inserted, delivered in CSV format. To facilitate this, we'll establish a temporary view based on this new data:

```sql
CREATE OR REPLACE TEMP VIEW courses_updates (course_id STRING, title STRING, instructor STRING, category STRING, price DOUBLE)
USING CSV
OPTIONS (
  path = "${dataset.school}/courses-csv-new",
  header = "true",
  delimiter = ";"
);
```

Now, we can use the `MERGE INTO` statement to synchronise the `courses` table with the information sourced from the temporary view `courses_updates`. 

In this scenario, we exclusively focus on the condition where there is no match. This implies that we will only insert new data if it does not already exist in our target table, based on the unique key comprising both the `course_id` and the `title` fields.

Among the new courses, our interest lies only in inserting those categorised under _computer science_:

```sql
MERGE INTO courses c
USING courses_updates u
ON c.course_id = u.course_id AND c.title = u.title
WHEN NOT MATCHED AND u.category = 'Computer Science' THEN INSERT *
```

Executing the query inserted three new records, all in the Computer Science category. This pattern is called an **insert-only merge**. Running the same statement a second time demonstrates how the merge prevents duplicate records:

```sql
MERGE INTO courses c
USING courses_updates u
ON c.course_id = u.course_id AND c.title = u.title
WHEN NOT MATCHED AND u.category = 'Computer Science' THEN INSERT *
```

The second execution of our merge statement did not reinsert the records, because they already exist in the table.
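The contrast between `INSERT INTO` and an insert-only merge can be reproduced in plain Python. Both functions below are hypothetical models of the semantics, not Databricks APIs. The merge uses the same composite key (`course_id`, `title`) and category filter as the statement above, and assumes keys are unique within the updates.

```python
def append_all(target, updates):
    """INSERT INTO model: appends every update row, duplicates included."""
    return target + [dict(row) for row in updates]


def insert_only_merge(target, updates, category="Computer Science"):
    """Insert-only MERGE model: add rows whose (course_id, title) key is not
    already in the target and whose category matches; never update."""
    existing = {(row["course_id"], row["title"]) for row in target}
    new_rows = [
        dict(row)
        for row in updates
        if (row["course_id"], row["title"]) not in existing
        and row["category"] == category
    ]
    return target + new_rows


courses = [{"course_id": "C01", "title": "Intro", "category": "Computer Science"}]
courses_updates = [
    {"course_id": "C02", "title": "Databases", "category": "Computer Science"},
    {"course_id": "C03", "title": "Statistics", "category": "Mathematics"},
]

merged_once = insert_only_merge(courses, courses_updates)
merged_twice = insert_only_merge(merged_once, courses_updates)
appended_twice = append_all(append_all(courses, courses_updates), courses_updates)
print(len(merged_once), len(merged_twice), len(appended_twice))  # 2 2 5
```

Re-running the merge is idempotent, while appending the same batch twice duplicates every row in it.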