In [1]:
import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import Row, SQLContext
from pyspark.sql.functions import broadcast
from pyspark.sql.types import IntegerType

# Show the installed PySpark version. Only a cell's last expression is
# rendered, so this must stay at the bottom of the cell.
pyspark.__version__

# TODOs

To keep track of the original use cases:
- The output file names must be customized, so the Glue job uses `toPandas()` to generate the `.csv` files.
- Enable PyArrow to reduce the memory footprint of the Spark DataFrame → pandas DataFrame conversion.
- Reduce the number of workers (5 → 2 DPUs) and increase the size of each worker (16 → 32 GB). The exact distribution still needs to be calculated.
- Is there any other way to reduce the exchange (shuffle) across the cluster? (Trivially, with no cluster there is nothing to exchange — everything runs on a single machine.)

### Notes about PyArrow:
- Uses JVM memory (unclear whether on-heap or off-heap — TODO: verify).
- Supports only a subset of data types, and any version mismatch means it won't work.

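# Prerequisites

- Run `docker compose up --scale spark-worker=3` to spin up the cluster with three workers.
- Install the required Python packages: at least `pyspark`, plus `pandas` for `toPandas()` and `pyarrow` if Arrow conversion is enabled.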

In [2]:
# Session toggles. The defaults reproduce a plain session with no explicit
# master and no Arrow settings.
# Set SPARK_MASTER to e.g. "spark://localhost:7077" to target the docker-compose cluster.
SPARK_MASTER = None
# When True, use Arrow-based columnar transfer for `toPandas()`. Also enable
# self-destruct, which frees Arrow buffers during the conversion to lower
# peak memory at some CPU cost.
ENABLE_ARROW = False

spark_builder = SparkSession.builder.appName("testing optimization")
if SPARK_MASTER:
    spark_builder = spark_builder.master(SPARK_MASTER)
if ENABLE_ARROW:
    spark_builder = (
        spark_builder
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")
        .config("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", "true")
    )
spark = spark_builder.getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/17 10:53:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Run the broadcast join and aggregation

In [3]:
# Create DataFrame 1: people, keyed by `id`
df1 = spark.createDataFrame(
    data=[
        (1, "John", "USA"),
        (2, "Mary", "UK"),
        (3, "David", "USA"),
        (4, "Emily", "Canada"),
        (5, "Robert", "USA"),
    ],
    schema=["id", "name", "country"],
)

# Create DataFrame 2: cities, keyed by the same `id` values as DataFrame 1
df2 = spark.createDataFrame(
    data=[
        (1, "New York", "USA"),
        (2, "London", "UK"),
        (3, "Los Angeles", "USA"),
        (4, "Toronto", "Canada"),
        (5, "Chicago", "USA"),
    ],
    schema=["id", "city", "country"],
)


# Broadcast the small df2 so the join is done without shuffling it, then
# sum the ids per country.
df3 = (
    df1.join(F.broadcast(df2), df1.id == df2.id, "left")
    # Drop the left key so the unqualified "id" below resolves to df2.id.
    # NOTE(review): in a left join, df2.id and df2.country are null for df1
    # rows with no match; such rows would land in a null-country group and
    # contribute nothing to the sum (sum ignores nulls).
    .drop(df1.id)
    .groupBy(df2.country)
    .agg(F.sum("id").alias("sum_id"))
    # sum over the long `id` column yields a long; narrow it to int.
    # NOTE(review): values beyond the int range would not fit after this cast.
    .withColumn("sum_id", F.col("sum_id").cast(IntegerType()))
)

# Cheap schema sanity check before collecting (the original bare
# `df3.schema` expression here had no effect).
assert df3.columns == ["country", "sum_id"], df3.columns

# toPandas() collects the whole result to the driver.
pd_df = df3.toPandas()

pd_df

                                                                                

Unnamed: 0,country,sum_id
0,USA,9
1,UK,2
2,Canada,4


24/11/17 10:53:32 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [4]:
# Print the parsed, analyzed, optimized and physical plans for the join/aggregate.
df3.explain(extended=True)

== Parsed Logical Plan ==
'Project [country#8, cast('sum_id as int) AS sum_id#38]
+- Aggregate [country#8], [country#8, sum(id#6L) AS sum_id#35L]
   +- Project [name#1, country#2, id#6L, city#7, country#8]
      +- Join LeftOuter, (id#0L = id#6L)
         :- LogicalRDD [id#0L, name#1, country#2], false
         +- ResolvedHint (strategy=broadcast)
            +- LogicalRDD [id#6L, city#7, country#8], false

== Analyzed Logical Plan ==
country: string, sum_id: int
Project [country#8, cast(sum_id#35L as int) AS sum_id#38]
+- Aggregate [country#8], [country#8, sum(id#6L) AS sum_id#35L]
   +- Project [name#1, country#2, id#6L, city#7, country#8]
      +- Join LeftOuter, (id#0L = id#6L)
         :- LogicalRDD [id#0L, name#1, country#2], false
         +- ResolvedHint (strategy=broadcast)
            +- LogicalRDD [id#6L, city#7, country#8], false

== Optimized Logical Plan ==
Aggregate [country#8], [country#8, cast(sum(id#6L) as int) AS sum_id#38]
+- Project [id#6L, country#8]
   +- Join Le