# Glue Spark

- RDD: Datasets which storage data in distributed format
- Transformations: Datasets iterations
- Actions: Run the transformations on RDD
- Output data: Result of transformations

Spark runs lazy, so transformations are not immediately executed, they stay in a logical plan to be executed when the actions are called.

Actions examples:

- `take()`
- `collect()`
- `show()`
- `save()`
- `count()`

## Types of transformations

### Narrow

Act only on one partition of the output data (Map, Filter)

- Does not requires to share data within another workers
- Are independent of other partitions
- Are computationally efficient

### Wide

Can work on several partitions of output data (Join, GroupBy)

## Running locally

1. Docker Compose file:

```yaml
services:
  glue:
    image: amazon/aws-glue-libs:glue_libs_4.0.0_image_01
    container_name: glue-jupyter
    ports:
      - "8889:8888"
      - "4040:4040"
    volumes:
      - .:/home/glue_user/workspace/jupyter_workspace
      - ~/.aws:/home/glue_user/.aws:ro
    working_dir: /home/glue_user/workspace
    environment:
      AWS_PROFILE: default
      DISABLE_SSL: true
    command: >
      /home/glue_user/jupyter/jupyter_start.sh
```

2. Run image:

```sh
docker compose run --rm glue
```

3. Connect to jupyter server within vscode:

- Select kernel
- Existing jupyter server
- https://127.0.1:8889/lab


Import packages


In [None]:
from awsglue.context import GlueContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Create session


In [None]:
spark = SparkSession.builder.appName("example").getOrCreate()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Create a dataframe


In [None]:
data = [
    ("John", "Sales", 3000),
    ("Bryan", "Sales", 4200),
    ("Selena", "Sales", 4600),
    ("Alice", "Sales", 3500),
    ("Mark", "Sales", 3900),
    ("Sophia", "Sales", 4100),
    ("Daniel", "Sales", 3800),
    ("Emma", "Sales", 4400),
    ("Lucas", "Sales", 3600),
    ("Olivia", "Sales", 4700),
]
df = spark.createDataFrame(data, ["name", "department", "salary"])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Visualize data


In [None]:
df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+----------+------+
|  name|department|salary|
+------+----------+------+
|  John|     Sales|  3000|
| Bryan|     Sales|  4200|
|Selena|     Sales|  4600|
| Alice|     Sales|  3500|
|  Mark|     Sales|  3900|
|Sophia|     Sales|  4100|
|Daniel|     Sales|  3800|
|  Emma|     Sales|  4400|
| Lucas|     Sales|  3600|
|Olivia|     Sales|  4700|
+------+----------+------+

Filtering dataset (narrow)


In [None]:
df_filtered = df.filter(df["salary"] > 4000)
df_filtered.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+----------+------+
|  name|department|salary|
+------+----------+------+
| Bryan|     Sales|  4200|
|Selena|     Sales|  4600|
|Sophia|     Sales|  4100|
|  Emma|     Sales|  4400|
|Olivia|     Sales|  4700|
+------+----------+------+

Group By (Wide)


In [None]:
df_grouped = df.groupby("department").count()
df_grouped.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-----+
|department|count|
+----------+-----+
|     Sales|   10|
+----------+-----+

## Pushdown Predicate

Reduce the total data read applying conditional filters to the data columns.

- Depends on file formats (Parquet and ORC)

1. Query submitted to spark
2. Planning phase using Catalyst optimizer
3. Filters identification (predicate)
4. Pushdown: try to bring the filter to reading

Using pure spark `WHERE` clause is enough, when using _AWS Glue_, `pushdown_predicate` argument is needed.


## Catalyst

Is a mechanism to optimize spark jobs utilizing functional programming principles to identify logical plan improvements (performance and execution time)

Catalyst has 19 rules (Spark 3+) to improve query execution.

Example:

- Pushdown predicate
- Rearrange filters
- Decimal operations conversion to long integer
- Regex rewriting in Java native code (startswith and contains)
- If-else simplification


In [None]:
spark = SparkSession.builder.appName("catalyst-join").getOrCreate()

employees_data = [
    (1, "John", "Sales", 3000),
    (2, "Bryan", "Sales", 4200),
    (3, "Selena", "Sales", 4600),
    (4, "Alice", "Sales", 3500),
    (5, "Mark", "Sales", 3900),
]

df_employees = spark.createDataFrame(
    employees_data, ["employee_id", "name", "department", "base_salary"]
)

sales_data = [
    (1, 120_000),
    (2, 180_000),
    (3, 220_000),
    (4, 95_000),
    (5, 160_000),
]

df_sales = spark.createDataFrame(sales_data, ["employee_id", "annual_sales"])

spark.sparkContext.setJobGroup(
    "JOIN", "Join employees with annual sales performance"
)

df_join_result = df_employees.join(df_sales, on="employee_id", how="inner")

df_join_result.show()

df_join_result.explain(mode="formatted")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+------+----------+-----------+------------+
|employee_id|  name|department|base_salary|annual_sales|
+-----------+------+----------+-----------+------------+
|          1|  John|     Sales|       3000|      120000|
|          2| Bryan|     Sales|       4200|      180000|
|          3|Selena|     Sales|       4600|      220000|
|          4| Alice|     Sales|       3500|       95000|
|          5|  Mark|     Sales|       3900|      160000|
+-----------+------+----------+-----------+------------+

== Physical Plan ==
AdaptiveSparkPlan (11)
+- Project (10)
   +- SortMergeJoin Inner (9)
      :- Sort (4)
      :  +- Exchange (3)
      :     +- Filter (2)
      :        +- Scan ExistingRDD (1)
      +- Sort (8)
         +- Exchange (7)
            +- Filter (6)
               +- Scan ExistingRDD (5)


(1) Scan ExistingRDD
Output [4]: [employee_id#50L, name#51, department#52, base_salary#53L]
Arguments: [employee_id#50L, name#51, department#52, base_salary#53L], MapPartitionsRDD

## Joins

- [different-types-of-join-strategies-in-apache-spark](https://oindrila-chakraborty88.medium.com/different-types-of-join-strategies-in-apache-spark-5c0066999d0d)

```mermaid
flowchart LR
A[Start] --> B{Has user hint?};
B --> |Yes| C[Apply desired user join]
B --> |No| D{Dataset size allows to apply broadcast?}
D --> |Yes| E[Broadcast Join]
D --> |No| F{It fits on worker memory?}
F --> |Yes| G[Shuffle Hash Join]
F --> |No| H[Sort Merge Join - Default]
```

### Broadcast Join

When `broadcast` join type is used, it avoids shuffle that is an expensive network exchange between nodes. This improve time, performance and cost.

When a dataset small, the duplication of the dataset in in all nodes is more effective to do joins. Spark defaults to a limit of _10MB_ to do it.


In [None]:
spark = SparkSession.builder.appName("broadcast-join-example").getOrCreate()

countries_data = [
    (1, "Brazil"),
    (2, "USA"),
    (3, "Germany"),
]

df_countries = spark.createDataFrame(
    countries_data, ["country_id", "country_name"]
)

customers_data = [
    (101, "Alice", 1),
    (102, "Bob", 2),
    (103, "Carol", 1),
    (104, "Dave", 3),
]

df_customers = spark.createDataFrame(
    customers_data, ["customer_id", "customer_name", "country_id"]
)

spark.sparkContext.setJobGroup(
    "BROADCAST_JOIN", "Join customers with country lookup using broadcast"
)

df_result = df_customers.join(
    broadcast(df_countries), on="country_id", how="inner"
)

df_result.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-----------+-------------+------------+
|country_id|customer_id|customer_name|country_name|
+----------+-----------+-------------+------------+
|         1|        101|        Alice|      Brazil|
|         2|        102|          Bob|         USA|
|         1|        103|        Carol|      Brazil|
|         3|        104|         Dave|     Germany|
+----------+-----------+-------------+------------+

Increasing the limit for broadcast join


In [None]:
spark = (
    SparkSession.builder.appName("boardcast-join-threshold")
    .config(
        "spark.sql.autoBroadcastJoinThreshold",
        str(3 * 1024 * 1024 * 1024),  # 3GB
    )
    .getOrCreate()
)

glue_context = GlueContext(spark)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

After `BroadcastJoin`, `ShuffleHashJoin` is more performant because it does not sort values like `SortMergeJoin` that is the _Spark default_


In [None]:
spark = (
    SparkSession.builder.appName("hash-shuffle-join")
    .config("spark.sql.join.preferSortMergeJoin", "false")
    .config("spark.sql.autoBroadcastJoinThreshold", -1)
    .getOrCreate()
)

glue_context = GlueContext(spark)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

When we have a large volume of data and neither of previous joins work `SortMergeJoin` will be the most flexible. The join occurs through sorted keys in each partition


In [None]:
spark = (
    SparkSession.builder.appName("sort-merge-join")
    .config("spark.sql.join.preferSortMergeJoin", "true")
    .config("spark.sql.autoBroadcastJoinThreshold", -1)
    .getOrCreate()
)

glue_context = GlueContext(spark)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Repartition

Partition manipulation method, can be triggered manually or by `AdaptativeQueryExecution`

- Do not do shuffle
- Create partitions with different sizes


## Coalesce

Partition manipulation method, can be triggered manually or by `AdaptativeQueryExecution`

- Increase or decrease the number of partitions
- Create partitions with equal sizes using shuffle


## Shuffle


## Spills

When data does not fit on memory or storage. Some times can lead to `Out Of Memory Error`


## Persist

Stores the already done computations in partitions avoiding redoing shuffles or other operations. It should be applied when the same dataset will be used multiple times


## Configs


In [None]:
spark = SparkSession.builder.appName("optimized-spark-session")


config_params = {}

for key, value in config_params.items():
    spark = spark.config(key, value)

spark = spark.enableHiveSupport().getOrCreate()

all_configs = spark.sparkContext.getConf().getAll()

for config in all_configs:
    print(f"{config[0]} = {config[1]}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

spark.master = local
spark.eventLog.enabled = true
spark.network.crypto.keyLength = 256
spark.network.crypto.enabled = true
spark.repl.local.jars = file:///home/glue_user/livy/rsc-jars/arpack_combined_all-0.1.jar,file:///home/glue_user/livy/rsc-jars/core-1.1.2.jar,file:///home/glue_user/livy/rsc-jars/jniloader-1.1.jar,file:///home/glue_user/livy/rsc-jars/livy-api-0.7.1-incubating.jar,file:///home/glue_user/livy/rsc-jars/livy-rsc-0.7.1-incubating.jar,file:///home/glue_user/livy/rsc-jars/livy-thriftserver-session-0.7.1-incubating.jar,file:///home/glue_user/livy/rsc-jars/native_ref-java-1.1.jar,file:///home/glue_user/livy/rsc-jars/native_system-java-1.1.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-linux-armhf-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-linux-i686-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-linux-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-osx-x86_64-1.1-natives.jar,file:/