# Exercise 1: ETL with Spark

## What we will cover:
- Update Group Parameters
- Extract data from SQL
- Transform the Currencies to Euros
- Load Data to Delta tables
- Time Travel with Delta Tables

<div class="alert alert-block alert-danger">
<b>Important</b> Make sure you choose PySpark as your kernel
</div>

## Spark Architecture
![image.png](./images/exercise1/spark_archi.PNG)

## Apache Spark Architecture: Driver and Executors

### Cluster Manager:
- Manages resources across the cluster, allocating Executors to Spark applications.
- Examples include Apache Mesos, Apache Hadoop YARN, and Spark's standalone cluster manager.
- Monitors worker nodes and schedules resources on them; scheduling individual tasks is left to the Driver.

### Driver:
- Centralized control unit of a Spark application.
- Runs the main() function and creates a SparkContext.
- Manages the execution of the Spark program.
- Coordinates tasks and schedules their execution on Executors.
- Keeps track of the overall application state.
- Communicates with the cluster manager to acquire and allocate resources.

### Executors:
- Processes running on worker nodes that perform the actual data processing tasks.
- Assigned tasks by the Driver for parallel execution.
- Responsible for running the individual stages of a Spark application.
- Manage and cache data in-memory during computation for efficient processing.
- Communicate with each other and the Driver for task coordination.
- Executors are launched on worker nodes by the cluster manager.
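
The division of labour between Driver and Executors can be illustrated with a plain-Python analogy. This is not Spark code: the thread pool merely stands in for the Executors, the calling function plays the Driver, and the names `split_into_partitions`, `process_partition` and `run_job` are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Driver side: cut the dataset into roughly equal partitions."""
    return [data[i::num_partitions] for i in range(num_partitions)]


def process_partition(partition):
    """Executor side: one task works on one partition."""
    return sum(x * x for x in partition)


def run_job(data, num_executors=3):
    """Driver side: schedule one task per partition, then combine the results."""
    partitions = split_into_partitions(data, num_executors)
    with ThreadPoolExecutor(max_workers=num_executors) as pool:
        partial_results = list(pool.map(process_partition, partitions))
    return sum(partial_results)


total = run_job(list(range(10)))
print(total)  # sum of squares of 0..9
```

In real Spark the partitions live on different machines and the cluster manager decides where the Executors run; the sketch only shows the split, process and combine pattern.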

# 1. Setup Interactive Session

### We will use predefined notebook commands, called magic commands, to set up the Spark environment

In [3]:
%manage_spark


1. Run the `%manage_spark` command.
1. Click the `Add Endpoint` tab.
1. Leave the settings as-is and click on `Add Endpoint` on the right.

1. Click on `Create Session` tab.
1. Your endpoint should appear in `Endpoint`
1. Update the `Name` to your group name
1. Change the `Language` to `Python`
1. Click on `Create Session` on the right side

<!-- ![image.png](./images/exercise1/init_spark2.PNG) -->

<!-- ![image.png](./images/exercise1/init_spark.PNG) -->

### The Spark session will take a few minutes to start.

We will check the status of the session in the interim: 

1. Navigate back to the Unified Analytics dashboard.
1. In the sidebar navigation menu, select `Spark Interactive Sessions`.

![image.png](./images/exercise1/menu.PNG)

3. Here, you can check the status of your session. It will take 2-3 minutes to start. When the `State` says `Idle`, the session is ready. 

![image.png](./images/exercise1/session.PNG)

4. Scroll back up to the Notebook cell of the session (`%manage_spark` command). Under the `Manage Sessions` tab, confirm that the session is now shown as `Idle` too.

![image.png](./images/exercise1/session2.PNG)

In [None]:
%config_spark

<div class="alert alert-block alert-danger">
<b>Important</b> Do not restart the kernel
</div>

1. Run the `%config_spark` magic command.
2. Unless you want a specific configuration, leave the settings as-is and click on `Submit` at the bottom

![image.png](./images/exercise1/config.PNG)

### Set up the group parameter

In [None]:
# Set your group name
group_name = "user1"

### Import required libraries

In [None]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import udf, col, lit
from pyspark.sql.types import DoubleType
import os

### Set up the Spark session
Add the Delta extensions to the configuration so that the session can interact with Delta tables.

In [None]:
# Set up the Spark session
spark = SparkSession.builder \
    .appName("DataCleaningWithSpark") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

print("Pyspark session started")

# 2. Extract Data from SQL

## Set connection parameters 
<div class="alert alert-block alert-danger">
<b>Important</b> Make sure the values are correct
</div>

In [None]:
# Define the JDBC connection properties
mysql_properties = {
    "user": "root",
    "password": "Hpepoc@123",
    "driver": "com.mysql.cj.jdbc.Driver",
}

def get_jdbc_url_for_country(country):
    # Define JDBC URLs for different countries
    jdbc_urls = {
        "germany": "jdbc:mysql://mysql1.imported-db.svc.cluster.local:3306/end2end",
        "czech": "jdbc:mysql://mysql2.imported-db.svc.cluster.local:3306/end2end",
        "swiss": "jdbc:mysql://mariadb.imported-db.svc.cluster.local:3306/end2end",
    }

    # Return the JDBC URL for the specified country
    return jdbc_urls.get(country.lower(), None)

## Define a function to extract the data from SQL in a DataFrame

In [None]:
def load_data(spark, country):
    # Look up the JDBC URL for this country
    jdbc_url = get_jdbc_url_for_country(country)

    if jdbc_url is None:
        raise ValueError(f"JDBC URL not defined for country: {country}")

    # Specify the table name
    table_name = f"end2end.{country}"

    # Read data from the SQL database
    df = spark.read \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", table_name) \
        .option("user", mysql_properties["user"]) \
        .option("password", mysql_properties["password"]) \
        .option("driver", mysql_properties["driver"]) \
        .load()

    return df

## Define a function to convert the currency to Euro

In [None]:
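def clean_data(df, spark, country):
    # Convert an amount to EUR; amounts already in EUR (or in an unknown currency) pass through
    def to_eur(currency, amount):
        if amount is None:
            return None  # keep missing values as NULL instead of raising a TypeError
        amount = float(amount)  # a DoubleType UDF must return a Python float
        if currency == "CZK":
            return amount / CZK_TO_EUR_RATE
        if currency == "CHF":
            return amount / CHF_TO_EUR_RATE
        return amount

    convert_udf = udf(to_eur, DoubleType())

    # Apply the UDF to the DataFrame, then mark every row as EUR
    corrected_df = df.withColumn("totalsales", convert_udf(col("currency"), col("totalsales"))) \
                     .withColumn("currency", lit("EUR"))

    # Show the results
    corrected_df.show()

    return corrected_df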
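
The conversion itself is a division by the exchange rate, expressed as units of foreign currency per 1 EUR. A minimal plain-Python sketch of that arithmetic follows, assuming the same rates as the constants defined later in this notebook (25 for CZK, 1 for CHF). The `RATES_PER_EUR` dictionary and the standalone `to_eur` helper are illustrative names, not part of the exercise.

```python
# Illustrative rates only (assumptions); unit is "foreign currency per 1 EUR".
RATES_PER_EUR = {"CZK": 25.0, "CHF": 1.0}


def to_eur(currency, amount):
    """Divide by the rate; EUR and unknown currency codes pass through unchanged."""
    if amount is None:
        return None  # a missing amount stays missing
    return float(amount) / RATES_PER_EUR.get(currency, 1.0)


print(to_eur("CZK", 2500))  # 2500 CZK at 25 CZK per EUR
print(to_eur("EUR", 80))    # already in EUR
```

Checking a few values by hand like this is a quick way to confirm that the rate is applied in the right direction (divide, not multiply) before running the full job.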

## Define a function to save the data to a Delta table
<div class="alert alert-block alert-danger">
<b>Important</b> Make sure the path is correct
</div>

In [None]:
def write_data(df, country):
    # Python's os functions need a plain filesystem path; Spark takes a file:// URI
    local_path = f"/mounts/shared-volume/shared/end2end-delta/{group_name}/{country}"
    delta_path = f"file://{local_path}"

    # Create the directory if it doesn't exist yet
    os.makedirs(local_path, exist_ok=True)

    # Write the DataFrame as a Delta table, replacing any existing content
    df.write.format("delta").mode("overwrite").save(delta_path)
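
Spark addresses the table through a `file://` URI, while Python's `os` functions expect a plain filesystem path. Passing the URI to `os.path.exists` or `os.makedirs` makes Python treat `file:` as a relative directory name. The sketch below shows one way to keep the two forms in sync. The helper names `to_file_uri` and `to_local_path` are hypothetical, and the example path reuses `user1` and `czech` from this notebook.

```python
from urllib.parse import urlparse


def to_file_uri(local_path):
    """Turn an absolute POSIX path into a file:// URI for Spark."""
    if not local_path.startswith("/"):
        raise ValueError(f"expected an absolute path, got: {local_path}")
    return "file://" + local_path


def to_local_path(file_uri):
    """Turn a file:// URI back into a plain path for os functions."""
    parsed = urlparse(file_uri)
    if parsed.scheme != "file":
        raise ValueError(f"not a file URI: {file_uri}")
    return parsed.path


local = "/mounts/shared-volume/shared/end2end-delta/user1/czech"
uri = to_file_uri(local)
print(uri)
print(to_local_path(uri))
```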

## Define the country names & currency rates 
<div class="alert alert-block alert-danger">
<b>Important</b> Make sure the values are correct
</div>

In [None]:
# Constants
COUNTRY_LIST = ["czech", "germany", "swiss"]
CZK_TO_EUR_RATE = 25
CHF_TO_EUR_RATE = 1

## Run the ETL for each country

In [None]:
for country in COUNTRY_LIST:
    # Load data from the DBs
    df = load_data(spark, country)
    df.show()

    # Clean the data
    cleaned_df = clean_data(df, spark, country)
    
    # Write the cleaned data back to the Delta Table
    write_data(cleaned_df, country)

## Check if the Delta Tables were created

In [None]:
for country in COUNTRY_LIST:
    # List files in a directory
    files = os.listdir(f"/mounts/shared-volume/shared/end2end-delta/{group_name}/{country}")
    print("Table:", country)
    
    for file in files:
        if file.endswith(".parquet"):
            full_path = os.path.join(f"/mounts/shared-volume/shared/end2end-delta/{group_name}/{country}", file)
            print("Saved in:", full_path)

    print()
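
Parquet files alone do not make a Delta table: Delta Lake also keeps a transaction log in a `_delta_log` subdirectory, with one zero-padded JSON commit file per version. As a hedged sketch, the helper below (hypothetical name `list_delta_versions`) lists the versions found in that log. It is demonstrated on a throwaway directory that mimics the layout rather than on the workshop volume, and it ignores checkpoints and log cleanup, which can remove older commit files.

```python
import os
import re
import tempfile

# Commit files are named with a 20-digit, zero-padded version number.
COMMIT_FILE = re.compile(r"^(\d{20})\.json$")


def list_delta_versions(table_dir):
    """Return the sorted version numbers found in <table_dir>/_delta_log."""
    log_dir = os.path.join(table_dir, "_delta_log")
    if not os.path.isdir(log_dir):
        return []  # no transaction log, so not a Delta table
    versions = []
    for name in os.listdir(log_dir):
        match = COMMIT_FILE.match(name)
        if match:
            versions.append(int(match.group(1)))
    return sorted(versions)


# Demo on a temporary directory that imitates a table with two commits.
with tempfile.TemporaryDirectory() as demo_dir:
    demo_log = os.path.join(demo_dir, "_delta_log")
    os.makedirs(demo_log)
    for version in (0, 1):
        open(os.path.join(demo_log, f"{version:020d}.json"), "w").close()
    demo_versions = list_delta_versions(demo_dir)

print(demo_versions)
```

Pointing the helper at one of the country directories should return at least version 0 once the ETL has run.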

## Time Travel

1. Read the initial Delta table written by the ETL for `czech`:

In [None]:
from delta.tables import DeltaTable
# Set the parameters
country = "czech"
delta_path = f"file:///mounts/shared-volume/shared/end2end-delta/{group_name}/{country}"

# Read the Delta table using the load method
read_df = spark.read.format("delta").load(delta_path)

# Show the contents of the DataFrame
read_df.show()

2. Overwrite the Delta Table with Selected Columns:

In [None]:
# Select only a subset of columns for the new version of the Delta table
selected_columns = ["type", "unitprice", "qty", "totalsales"]
select_delta_path = f"file:///mounts/shared-volume/shared/end2end-delta/{group_name}/{country}"

# Overwrite the Delta table with the selected columns of the czech data read above
df_select = read_df.select(*selected_columns)
df_select.write.format("delta").mode("overwrite").save(select_delta_path)

3. Read the Delta table again: the columns that were not selected are still in the schema, but their values are now `NULL`

In [None]:
# Read the Delta table using the load method
read_df_select = spark.read.format("delta").load(delta_path)

# Show the contents of the DataFrame
read_df_select.show()

# Display the schema of the current version of the table
read_df_select.printSchema()

4. Display the versions available for this table:

In [None]:
# Create a DeltaTable object
delta_table = DeltaTable.forPath(spark, delta_path)

# Get the history of the Delta table
history_df = delta_table.history()

# List all versions with their timestamps, oldest first
versions_with_timestamp = history_df.select("version", "timestamp").orderBy("version").collect()

# Display the list of versions with timestamp
print("List of Delta Table Versions with Timestamp:")
for version_info in versions_with_timestamp:
    version = version_info["version"]
    timestamp = version_info["timestamp"]
    print(f"Version: {version}, Timestamp: {timestamp}")

5. Compare table content

In [None]:
# Read a specific version (e.g., version 0) of the Delta table
read_df_version_0 = spark.read.format("delta").option("versionAsOf", "0").load(delta_path)
read_df_version_0.show()

# Read a specific version (e.g., version 1) of the Delta table
read_df_version_1 = spark.read.format("delta").option("versionAsOf", "1").load(delta_path)
read_df_version_1.show()

6. Rollback to Initial Delta Table

In [None]:
# Read a specific version (e.g., version 0) of the Delta table
read_df_version_0 = spark.read.format("delta").option("versionAsOf", "0").load(delta_path)

# If you want to perform further actions or overwrite the current Delta table:
# Overwrite the current Delta table with version 0 data
read_df_version_0.write.format("delta").mode("overwrite").save(delta_path)
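
Rolling back by overwriting does not delete anything: it appends a new version whose content equals the old one, which is why the history shows one more entry afterwards. The toy model below illustrates this behaviour in plain Python. `ToyVersionedTable` is a hypothetical stand-in for a versioned table, not the Delta Lake API, and the sample rows are made up for the illustration.

```python
class ToyVersionedTable:
    """Append-only list of snapshots; every overwrite adds a new version."""

    def __init__(self):
        self._snapshots = []

    def overwrite(self, rows):
        """Store a new snapshot and return its version number."""
        self._snapshots.append(list(rows))
        return len(self._snapshots) - 1

    def read(self, version=None):
        """Read a given version, or the latest one when no version is given."""
        if version is None:
            version = len(self._snapshots) - 1
        return list(self._snapshots[version])

    def versions(self):
        return list(range(len(self._snapshots)))


table = ToyVersionedTable()
table.overwrite([{"type": "a", "totalsales": 10.0, "currency": "EUR"}])  # version 0
table.overwrite([{"type": "a", "totalsales": 10.0}])                     # version 1: fewer columns
table.overwrite(table.read(version=0))                                   # version 2: rollback

print(table.versions())
print(table.read() == table.read(version=0))
```

Versions 0 and 1 remain readable after the rollback, which matches what the history listing in the next steps is meant to show.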

7. Read the Delta table again: the original column values are back in the current version

In [None]:
# Read the Delta table using the load method
read_df_select = spark.read.format("delta").load(delta_path)

# Show the contents of the DataFrame
read_df_select.show()

# Display the schema of the current version of the table
read_df_select.printSchema()

8. Display the versions available for this table:

In [None]:
# Create a DeltaTable object
delta_table = DeltaTable.forPath(spark, delta_path)

# Get the history of the Delta table
history_df = delta_table.history()

# List all versions with their timestamps, oldest first
versions_with_timestamp = history_df.select("version", "timestamp").orderBy("version").collect()

# Display the list of versions with timestamp
print("List of Delta Table Versions with Timestamp:")
for version_info in versions_with_timestamp:
    version = version_info["version"]
    timestamp = version_info["timestamp"]
    print(f"Version: {version}, Timestamp: {timestamp}")

## END