## Data Transformation with Spark on Databricks

Data transformation is a critical step in the journey from raw data to actionable insights. It involves the process of cleaning, enriching, and structuring data to make it suitable for analysis. The importance of data transformation lies in its ability to:

- **Enhance Data Quality**: Transformations help in cleaning and validating data, ensuring that it is accurate and reliable for downstream analysis

- **Enable Analysis**: Well-transformed data is easier to analyze, allowing data scientists and analysts to derive meaningful patterns, trends, and insights

- **Support Decision-Making**: Businesses rely on high-quality, transformed data to make informed decisions, optimize processes, and gain a competitive edge

> Databricks leverages the power of Spark to ensure that data undergoes these transformations efficiently, providing a seamless and collaborative environment where the full potential of transformed data can be realized for robust analytics and decision-making.

## Apache Spark Architecture

Before we start leveraging Spark to perform data transformation, let's first understand the architecture underlying Apache Spark. Remember, Spark is a unified engine for large-scale distributed data processing on computer clusters.

<p align="center">
    <img src="images/SparkArchitecture.png" width="700" height="350"/>
</p>

### Cluster Manager

> Apache Spark's architecture relies on a *Cluster Manager*, a service that controls the machines of a computing cluster. The Cluster Manager is responsible for allocating resources (such as CPU and memory for executors) to Spark applications. Scheduling the individual tasks of a job onto those executors is then handled by the Spark Driver.
 
In Databricks, users do not interact with the Cluster Manager directly. They create and configure clusters through the Databricks interface, and the platform manages the underlying cluster resources automatically, including provisioning machines and allocating resources to Spark.

### Spark Application

> A *Spark Application* represents the entire computation process performed using Spark. It consists of the driver program (*Spark Driver*) and a set of executor programs (*Spark Executors*). The Spark Application defines the tasks to be executed on the Spark cluster, and it is submitted to the Cluster Manager for execution.

Databricks facilitates the submission and management of Spark Applications. Users write and run Spark code in Databricks notebooks, and Databricks manages the Spark Application lifecycle, including job submission and execution.

### Spark Executors

> **Spark Executors** are processes running on the worker nodes of the cluster that execute tasks. Executors hold data partitions in memory and store intermediate results. They improve performance by processing data close to where it is stored, minimizing data movement across the network.

Databricks manages Spark Executors transparently. When users run Spark jobs, the platform dynamically allocates and monitors Executors on the underlying infrastructure, so users do not need to configure or monitor individual Executors manually.

### Spark Driver

> The **Spark Driver** is the central control program of a Spark Application. It turns user code into jobs made up of stages and tasks, communicates with the Cluster Manager to acquire resources, and schedules those tasks across the Spark Executors. The Spark Driver oversees the execution flow and collects the final results.

In Databricks, each cluster runs its own Spark Driver on the cluster's driver node. When you run code in a notebook or job attached to a cluster, Databricks sends that code to the cluster's Spark Driver, which coordinates its execution.

### Spark Session

> The *Spark Session* (`SparkSession`) serves as the entry point for interacting with Spark. It manages configuration settings and provides a unified interface for operations such as reading data, creating DataFrames, and running SQL queries, so that different Spark components can be used through a single object.

Databricks abstracts the Spark Session away from users. In a Databricks notebook, a Spark Session is created behind the scenes and made available as the variable `spark`, so users can switch between Spark SQL, Python, and other languages without managing sessions explicitly.

## Spark SQL vs PySpark

Throughout this lesson, we will use examples of both Spark SQL and PySpark, as most data transformations can be achieved with either of them. This flexibility allows users to choose the approach that best aligns with their preference, expertise, and specific data processing requirements. Whether you favour the declarative nature of SQL or the programmatic flexibility of Python, Databricks accommodates both.

> **Spark SQL** is a module in Spark designed for structured data processing. It allows users to execute SQL queries on Spark data, providing a high-level interface for working with structured and semi-structured data.

Spark SQL is ideal for scenarios where you want to leverage the familiarity and expressiveness of SQL for querying and analyzing data. It's particularly well-suited for structured datasets and situations where SQL-like operations are preferable.

> **PySpark** is the Python API for Spark. It enables Python developers to harness the power of Spark for distributed data processing. PySpark provides a programmatic interface for working with Spark, allowing more flexibility in expressing complex data transformations and analytics.

PySpark is versatile and can be employed when you need more control and customization in your data processing tasks. It's suitable for scenarios where Python is the preferred language, or when you need to integrate Spark with Python-based libraries and tools.

As we've seen before, Databricks provides a unified platform where you can switch between Spark SQL and PySpark within the same notebook. This works because both Spark SQL and PySpark operate on the same underlying abstraction: the *DataFrame*. The DataFrame acts as a bridge, so the result of a SQL query can be processed further with the PySpark DataFrame API, and vice versa.

In the previous lesson, we focused on relational entities in Databricks, such as databases and tables. While effective, these are mainly worked with through SQL-based operations. With DataFrames, you can perform data manipulation and transformation using Spark SQL, benefiting from SQL syntax, and then switch to PySpark to apply more programmatic and customized transformations, all on the same DataFrame.

## DataFrames in Apache Spark

> A **DataFrame** is a distributed collection of data organized into named columns, similar to a table in a relational database. DataFrames serve as a fundamental abstraction, providing a structured and tabular representation of data.

The key features of DataFrames are:

- **Distributed Nature**: DataFrames in Spark are distributed across a cluster of machines, allowing for parallel processing. This distribution enables Spark to handle large-scale datasets by dividing them into smaller partitions and processing them in parallel.

- **Immutable Structure**: DataFrames are immutable, meaning they cannot be modified once created. Instead, each transformation on a DataFrame returns a new DataFrame with the desired changes. This immutability ensures data consistency and lets Spark record the lineage of transformations that produced each DataFrame.

- **Lazy Evaluation**: Spark employs lazy evaluation, meaning that transformations on DataFrames are not executed immediately. Instead, Spark builds a logical execution plan, and the actual computation is deferred until an action (such as `count()`, `show()`, or `collect()`) is triggered. Deferring execution lets Spark optimize the whole plan at once, minimizing unnecessary computations.

- **Schema**: DataFrames have a well-defined schema that specifies the names and types of columns. The schema provides structure to the data, allowing Spark to optimize query execution and enabling users to express complex transformations in a declarative manner.

DataFrames in Spark can be created from various data sources, including structured data formats like `CSV` and `JSON`, or external databases. We will look at this in detail in the next section.

## Reading and Loading Data

In the data transformation process, reading and loading data play a pivotal role. In this section, we will learn how to read and load data from various file types and examine practical examples.

### Reading `JSON` Data

#### Reading a Single `JSON` File

Let's start by exploring how to read `JSON` data. We'll cover different scenarios, including reading a single `JSON` file, a directory of `JSON` files, and multiple `JSON` files using a wildcard.

When dealing with a single `JSON` file, PySpark provides a straightforward method for reading the data into a DataFrame. The syntax is as follows:

```python
json_data_single_file = spark.read.json("path/to/single/json/file")
```

Let's see how we would use this in Databricks. Begin by downloading [this example `JSON` file](). Next, import it into Databricks using the **Data** explorer. In the **Data** explorer tab, use the **Create Table** button. Utilize the drag-and-drop functionality to upload the previously downloaded file. Note and copy the path at which the file will be uploaded within Databricks.

<p align="center">
    <img src="images/CreateTable.png" width="700" height="550"/>
</p>

Click the **Create Table with UI** button, select a cluster to preview the table, and click **Preview Table**. This should display the table preview and the inferred file type. Finally, click **Create Table** to finish.

Now, navigate to a Databricks Notebook and run the following PySpark code to read in the uploaded `JSON` file:

`json_data_from_dbfs1 = spark.read.json('/FileStore/tables/single_json_file_1.json')`

Replace the file path with the path of your uploaded file.

#### Checking DataFrame Contents

After creating the DataFrame, you might want to inspect its contents. You can use the `show()` method to display the first few rows of the DataFrame: `json_data_from_dbfs1.show()`.

<p align="center">
    <img src="images/ShowCommand.png" width="700" height="200"/>
</p>


This command prints a tabular representation of the DataFrame, providing an overview of the data's structure. You can adjust the number of rows displayed by specifying the desired value within the `show()` method (e.g., `show(10)` for the first 10 rows).

Alternatively, for a more interactive exploration, you can use the `display()` method:

`display(json_data_from_dbfs1)`

<p align="center">
    <img src="images/DisplayTable.png" width="700" height="200"/>
</p>


The `display()` method provides a feature-rich interface for exploring and interacting with the DataFrame, including filtering, sorting, and visualizations. It's a powerful tool for a comprehensive examination of your data.

#### Reading a Directory of `JSON` Files

To read a directory of `JSON` files, you can use a similar approach by specifying the directory path:

`json_data_directory = spark.read.json("path/to/json/files/directory")`

This command reads all the `JSON` files in the specified directory into a single PySpark DataFrame. Note that it assumes the directory contains only `JSON` files; any other files in it would also be parsed as `JSON`.

Let's look at an example to illustrate the process. Follow these steps:

- Begin by downloading [this file](), and [this file](), representing two different `JSON` files

- Import the files into Databricks using the **Data** explorer. In the **Data** explorer tab, make sure to modify the **DBFS target directory** to create a new folder where you can store your `JSON` files. See the example below for updating the first file:

<p align="center">
    <img src="images/NewDirectory.png" width="700" height="550"/>
</p>

- Upload both files to the same directory

- Run the following command to read in all the files in the directory: `json_directory = spark.read.json('/FileStore/tables/json_files')`

- Use the `display()` command to visualize the resulting DataFrame

#### Reading Multiple `JSON` Files Using a Wildcard

If a directory contains `JSON` files alongside files of other types and you want to read only the `JSON` files, you can use the wildcard (`*`) syntax:

`json_data_multiple_files = spark.read.json("path/to/json/files/*.json")`

This command reads all `JSON` files matching the wildcard pattern into a PySpark DataFrame.


### Reading `CSV` Data

When dealing with tabular data, especially in scenarios where structured data is stored in `CSV` format, Spark provides efficient methods for reading and loading such data into Spark DataFrames.

#### Reading a Single `CSV` File

To read a single `CSV` file into a DataFrame, you can use the following syntax:

```python
# Read a single CSV file into a PySpark DataFrame
csv_data_single_file = spark.read.csv("path/to/single/csv/file.csv", 
                                     header=True, 
                                     inferSchema=True, 
                                     sep=";")
```

This command reads a `CSV` file into a PySpark DataFrame. Here are some key parameters:

- `header=True`: Indicates that the first row contains column headers
- `inferSchema=True`: Attempts to infer the schema of the data
- `sep=";"`: Specifies the column delimiter. The default value is a comma (`,`), but in this example, it's set to a semicolon (`;`).

Other possible values for `sep` include, but are not limited to: `\t` for tab, `" "` for space, etc.

Let's look at an example to illustrate the process. Follow these steps:

- Begin by downloading [this CSV file]()

- Import the file into Databricks using the **Data** explorer. In the preview table tab, make sure to enable **First row is header** and the **Infer schema** options before creating the table.

<p align="center">
    <img src="images/CreateCSVTable.png" width="600" height="300"/>
</p>

- Run the following command in a Databricks Notebook to read in the `CSV` file: `csv_data_single_file = spark.read.csv("dbfs:/FileStore/tables/username.csv", header=True, inferSchema=True, sep=";")`

- Use the `display()` command to visualize the resulting DataFrame

#### Reading `CSV` Files from a Directory

If you have multiple `CSV` files in a directory, you can read them all into a DataFrame using a similar approach:

`csv_data_directory = spark.read.csv("path/to/csv/files/directory", header=True, inferSchema=True)`

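This command reads all `CSV` files in the specified directory into a single DataFrame. As with `JSON`, the directory should contain only `CSV` files, and they should share the same column layout.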

#### Handling `CSV` Files with Custom Schema

In some cases, you might want to specify a custom schema for your `CSV` data. You can achieve this by defining a schema and using it during the read operation:

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define a custom schema
custom_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    # Add more fields as needed
])

# Read CSV data with the custom schema
csv_data_custom_schema = spark.read.csv("path/to/csv/file.csv", header=True, schema=custom_schema)
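```

This command reads a `CSV` file into a PySpark DataFrame, applying the specified custom schema instead of inferring one. Because the schema is supplied up front, Spark does not need the extra pass over the data that `inferSchema=True` requires.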


### Reading Data from Cloud Storage

Data stored in cloud storage systems such as Amazon S3, Google Cloud Storage (GCS), or Azure Storage can be easily accessed with PySpark. Here's a general approach:

- **Store Credentials Safely**: Avoid hardcoding credentials directly in code. Use secure mechanisms such as environment variables or a secret management service (in Databricks, secret scopes).
- **Mount Storage to Databricks**: Mount your cloud storage to Databricks, providing a secure way to access data. This involves configuring storage-specific credentials within Databricks.

We will look at this workflow in more detail in a future lesson, using Amazon S3 as an example.

### Spark SQL for Reading Data

In addition to using PySpark DataFrames, you can leverage Spark SQL to query and interact with your data using SQL commands. Let's explore how you can use Spark SQL to read `JSON` and `CSV` files.

You can use Spark SQL to query `JSON` files directly. The following example illustrates how to select all columns from a `JSON` file:

```sql
SELECT * FROM json.`path/to/json/file`
```
This SQL command allows you to query the contents of the `JSON` file using Spark SQL.

Similarly, Spark SQL enables you to query `CSV` files using SQL commands. For instance, you can use the following SQL command to select all columns from a `CSV` file:

```sql
SELECT * FROM csv.`path/to/csv/file`
```
This SQL command provides an alternative way to interact with your `CSV` data using Spark SQL.

## Data Cleaning and Transformation

In the data analysis process, ensuring that the data is clean and appropriately formatted is crucial. Raw data often comes with inconsistencies, missing values, or formats that are not conducive to analysis. Cleaning and transforming data involves preparing it for further analysis, enhancing its quality, and making it suitable for downstream processes. In this section, we will look at different techniques for cleaning and transforming data using PySpark.

Let's start by considering the following example DataFrame representing information about individuals:

```python
# Create an example DataFrame
data = [("John", 30, "Male", "$500.00", "2022-01-01 08:30:00", "New York", "john.doe@example.com"),
        ("Alice", 25, "Female", "$700.50", "2022-01-02 15:45:30", "San Francisco", "alice.smith@example.com"),
        ("Bob", 28, "Male", "$650.00", "2022-01-03 12:15:00", "Los Angeles", "Unknown"),
        ("Eve", 35, "Female", "$600.75", "2022-01-04 10:00:45", "User Info Error", "eve.white@example.com")]

columns = ["Name", "Age", "Gender", "Salary", "Timestamp", "Location", "Email"]

example_df = spark.createDataFrame(data, columns)

# Show the original DataFrame
print("Original DataFrame:")
example_df.show()
```

This DataFrame will serve as our example throughout this section.

### 1. Replacing Missing Values

Handling missing values is a common task in data cleaning. Sometimes missing information is recorded as a placeholder string rather than as a null. The `replace` method allows us to replace specific values with designated replacements. In the example DataFrame, suppose we want to replace all occurrences of `'User Info Error'` with `None` (null) in the `Location` column:

```python
cleaned_df = example_df.replace({'User Info Error': None}, subset=['Location'])
```

In this command, `replace` initiates the replacement operation. `{'User Info Error': None}` defines the replacement rule, indicating that occurrences of `'User Info Error'` should be replaced with `None`. `subset=['Location']` specifies the column where the replacement should occur.

### 2. Updating Data Points

The `replace` method is not limited to handling null values; it can also be employed to update existing values. For instance, suppose we want to update all occurrences of `'Unknown'` in the `Email` column to `'Pending'`:

```python
cleaned_df = cleaned_df.replace({'Unknown': 'Pending'}, subset=['Email'])
```

In this example, `{'Unknown': 'Pending'}` defines the replacement rule, indicating that occurrences of `'Unknown'` should be replaced with `'Pending'` in the `Email` column.

### 3. Using `regexp_replace` for Column Transformations

Column transformations are essential for manipulating text-based columns. The `regexp_replace` function enables us to apply regular expression patterns to modify or clean column values.

The default syntax is as follows:

```python
from pyspark.sql.functions import regexp_replace
df = df.withColumn("ColumnName", regexp_replace("ColumnName", "pattern", "replacement"))
```
Let's break down the components:

- `.withColumn("ColumnName", ...)` : This method is used to add or replace a column in the DataFrame. In this case, it specifies that the operation is targeting a column named `"ColumnName"`.

- `regexp_replace("ColumnName", "pattern", "replacement")`: This is the PySpark `regexp_replace` function. It takes three arguments:
  - `"ColumnName"`: The name of the column to which the replacement will be applied
  - `"pattern"`: The regular expression pattern to search for in the values of the specified column
  - `"replacement"`: The string that will replace the matched pattern in the column values

Let's look at an example in our `cleaned_df` DataFrame:

```python
from pyspark.sql.functions import regexp_replace
cleaned_df = cleaned_df.withColumn("Salary", regexp_replace("Salary", "\\$", ""))
```
In the example above, the `regexp_replace` function removes dollar signs from the `"Salary"` column in the PySpark DataFrame. The Python string `"\\$"` becomes the regular expression `\$`, an escaped dollar sign (an unescaped `$` would instead match the end of the string), and each match is replaced with an empty string `""`.

### 4. Casting Columns to Different Data Types

Casting columns to different data types is a common operation, especially when the inferred schema needs adjustment. We can cast columns to ensure they are of the correct data type. The default syntax is as follows:

```python
df = df.withColumn("ColumnName", df["ColumnName"].cast("desired_type"))
```
The `cast` method converts the specified column to the desired data type. Its argument, `"desired_type"`, is the target data type for the column. Common target types include:

- **Numeric Types**: Casting to numeric types, such as `integer`, `double`, `float`, etc., is common when dealing with numerical data

- **String Type**: You can cast a column to the `string` type if you want to treat it as text

- **Boolean Type**: Casting to `boolean` is suitable for columns representing true/false or binary data

- **Timestamp Type**: For columns containing timestamp or date data, casting to `timestamp` is useful

Let's look at an example using our `cleaned_df`:

```python
cleaned_df = cleaned_df.withColumn("Salary", cleaned_df["Salary"].cast("float"))
```

Note that this example assumes the `regexp_replace` operation from the previous step has already been applied to the `"Salary"` column. That step removed the dollar signs, leaving a string representation of a numeric value. The subsequent cast to `"float"` then treats the `"Salary"` column as a floating-point numeric type. This is common when working with salary data or any other numerical values that may have decimal points.

### 5. Reordering Columns

Reordering columns can enhance DataFrame structure, providing better organization and simplifying data exploration. The syntax is as follows:

```python
cleaned_df = cleaned_df.select("Column1", "Column2", ...)
```

So for example, in our `cleaned_df`:

```python
cleaned_df = cleaned_df.select("Name", "Age", "Gender", "Salary", "Location", "Email", "Timestamp")
```

In this example, `select` returns the same columns with `Timestamp` moved to the end. Keep in mind that `select` keeps only the columns you list, so any column left out of the call is dropped from the result.

### 6. Transforming Columns to Timestamp Type

Transforming columns to the `timestamp` type is crucial for handling temporal data. The `to_timestamp` function converts a `string` representation of a timestamp into the `timestamp` type. The `cast` method can also convert strings that follow the standard `yyyy-MM-dd HH:mm:ss` layout, but `to_timestamp` additionally accepts an optional format pattern, so it can parse strings in other layouts as well (for example, `01/02/2022 15:45`). This makes `to_timestamp` the more flexible choice for time-related data.

The default syntax is:

```python
from pyspark.sql.functions import to_timestamp
df = df.withColumn("NewTimestampColumn", to_timestamp("ExistingTimestampColumn"))
```

Let's break down the components of this syntax:

- `"NewTimestampColumn"`: Specifies the name of the new column that will store the converted timestamps
- `to_timestamp("ExistingTimestampColumn")`: This is the PySpark `to_timestamp` function. It takes:
  - `"ExistingTimestampColumn"`: The name of the existing column containing string representations of timestamps that you want to convert
  - An optional second argument: a datetime pattern such as `"MM/dd/yyyy HH:mm"`, needed when the strings are not in the default format

Using our example `cleaned_df`:

```python
from pyspark.sql.functions import to_timestamp
cleaned_df = cleaned_df.withColumn("Timestamp", to_timestamp("Timestamp"))
```
In this example, the `"Timestamp"` column is transformed to a timestamp type using the `to_timestamp` function.

### 7. Creating New Columns Using Array Functions