In [1]:
import warnings
warnings.filterwarnings("ignore")

In [2]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'

In [3]:
from pyspark.storagelevel import StorageLevel
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

In [4]:
spark = (
    SparkSession
    .builder
    .config("spark.driver.memory", "10g")
    .config("spark.sql.files.maxPartitionBytes", "268435456")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .master("local[*]")
    .getOrCreate()
)
sc = spark.sparkContext
sc.setLogLevel("ERROR")

In [7]:
customers_file = "part-00000-7951754c-3166-4cb7-8227-a5760c654c14-c000.snappy.parquet"

In [8]:
df_customers = spark.read.parquet(customers_file)

In [9]:
df_customers.show(5, False)

+----------+-------------+---+------+----------+-----+-----------+
|cust_id   |name         |age|gender|birthday  |zip  |city       |
+----------+-------------+---+------+----------+-----+-----------+
|C007YEYTX9|Aaron Abbott |34 |Female|7/13/1991 |97823|boston     |
|C00B971T1J|Aaron Austin |37 |Female|12/16/2004|30332|chicago    |
|C00WRSJF1Q|Aaron Barnes |29 |Female|3/11/1977 |23451|denver     |
|C01AZWQMF3|Aaron Barrett|31 |Male  |7/9/1998  |46613|los_angeles|
|C01BKUFRHA|Aaron Becker |54 |Male  |11/24/1979|40284|san_diego  |
+----------+-------------+---+------+----------+-----+-----------+
only showing top 5 rows



In [10]:
# creating df_base dataframe without caching. also creating two df's by using df1 and df2

In [11]:
df_base = (
    df_customers
    .filter(F.col("city") == "boston")
    .withColumn(
        "customer_group", 
        F.when(
            F.col("age").between(20, 30), 
            F.lit("young") 
        )
        .when(
            F.col("age").between(31, 50), 
            F.lit("mid") 
        )
        .when(
            F.col("age") > 51, 
            F.lit("old") 
        )
        .otherwise(F.lit("kid"))
     )
    .select("cust_id", "name", "age", "gender", "birthday", "zip", "city", "customer_group")
)

In [12]:
df_base.show(5, False)

+----------+--------------+---+------+---------+-----+------+--------------+
|cust_id   |name          |age|gender|birthday |zip  |city  |customer_group|
+----------+--------------+---+------+---------+-----+------+--------------+
|C007YEYTX9|Aaron Abbott  |34 |Female|7/13/1991|97823|boston|mid           |
|C08XAQUY73|Aaron Lambert |54 |Female|11/5/1966|75218|boston|old           |
|C094P1VXF9|Aaron Lindsey |24 |Male  |9/21/1990|29399|boston|young         |
|C097SHE1EF|Aaron Lopez   |22 |Female|4/18/2001|82129|boston|young         |
|C0DTC6436T|Aaron Schwartz|52 |Female|7/9/1962 |57192|boston|old           |
+----------+--------------+---+------+---------+-----+------+--------------+
only showing top 5 rows



In [13]:
df1 = (
    df_base
    .withColumn("test_column_1", F.lit("test_column_1"))
    .withColumn("birth_year", F.split("birthday", "/").getItem(2))
)

In [14]:
df1.explain(True)

== Parsed Logical Plan ==
'Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, customer_group#44, test_column_1#94, split('birthday, /, -1)[2] AS birth_year#104]
+- Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, customer_group#44, test_column_1 AS test_column_1#94]
   +- Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, customer_group#44]
      +- Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, CASE WHEN ((cast(age#2 as int) >= 20) AND (cast(age#2 as int) <= 30)) THEN young WHEN ((cast(age#2 as int) >= 31) AND (cast(age#2 as int) <= 50)) THEN mid WHEN (cast(age#2 as int) > 51) THEN old ELSE kid END AS customer_group#44]
         +- Filter (city#6 = boston)
            +- Relation [cust_id#0,name#1,age#2,gender#3,birthday#4,zip#5,city#6] parquet

== Analyzed Logical Plan ==
cust_id: string, name: string, age: string, gender: string, birthday: string, zip: string, city: string, customer_group: stri

In [None]:
== Physical Plan ==
*(1) Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, CASE WHEN ((cast(age#2 as int) >= 20) AND (cast(age#2 as int) <= 30)) THEN young WHEN ((cast(age#2 as int) >= 31) AND (cast(age#2 as int) <= 50)) THEN mid WHEN (cast(age#2 as int) > 51) THEN old ELSE kid END AS customer_group#44, test_column_1 AS test_column_1#94, split(birthday#4, /, -1)[2] AS birth_year#104]
+- *(1) Filter (isnotnull(city#6) AND (city#6 = boston))
   +- *(1) ColumnarToRow
      +- FileScan parquet [cust_id#0,name#1,age#2,gender#3,birthday#4,zip#5,city#6] Batched: true, DataFilters: [isnotnull(city#6), (city#6 = boston)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jovyan/part-00000-7951754c-3166-4cb7-8227-a5760c654c14-c000..., PartitionFilters: [], PushedFilters: [IsNotNull(city), EqualTo(city,boston)], ReadSchema: struct<cust_id:string,name:string,age:string,gender:string,birthday:string,zip:string,city:string>


In [15]:
df2 = (
    df_base
    .withColumn("test_column_2", F.lit("test_column_2"))
    .withColumn("birth_month", F.split("birthday", "/").getItem(1))
)

df2.explain(True)

== Parsed Logical Plan ==
'Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, customer_group#44, test_column_2#115, split('birthday, /, -1)[1] AS birth_month#125]
+- Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, customer_group#44, test_column_2 AS test_column_2#115]
   +- Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, customer_group#44]
      +- Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, CASE WHEN ((cast(age#2 as int) >= 20) AND (cast(age#2 as int) <= 30)) THEN young WHEN ((cast(age#2 as int) >= 31) AND (cast(age#2 as int) <= 50)) THEN mid WHEN (cast(age#2 as int) > 51) THEN old ELSE kid END AS customer_group#44]
         +- Filter (city#6 = boston)
            +- Relation [cust_id#0,name#1,age#2,gender#3,birthday#4,zip#5,city#6] parquet

== Analyzed Logical Plan ==
cust_id: string, name: string, age: string, gender: string, birthday: string, zip: string, city: string, customer_group: s

In [None]:
== Physical Plan ==
*(1) Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, CASE WHEN ((cast(age#2 as int) >= 20) AND (cast(age#2 as int) <= 30)) THEN young WHEN ((cast(age#2 as int) >= 31) AND (cast(age#2 as int) <= 50)) THEN mid WHEN (cast(age#2 as int) > 51) THEN old ELSE kid END AS customer_group#44, test_column_2 AS test_column_2#115, split(birthday#4, /, -1)[1] AS birth_month#125]
+- *(1) Filter (isnotnull(city#6) AND (city#6 = boston))
   +- *(1) ColumnarToRow
      +- FileScan parquet [cust_id#0,name#1,age#2,gender#3,birthday#4,zip#5,city#6] Batched: true, DataFilters: [isnotnull(city#6), (city#6 = boston)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jovyan/part-00000-7951754c-3166-4cb7-8227-a5760c654c14-c000..., PartitionFilters: [], PushedFilters: [IsNotNull(city), EqualTo(city,boston)], ReadSchema: struct<cust_id:string,name:string,age:string,gender:string,birthday:string,zip:string,city:string>


In [None]:
#lets cache df_base and create two more df's and check the physical plan

In [16]:
df_base.cache() 

DataFrame[cust_id: string, name: string, age: string, gender: string, birthday: string, zip: string, city: string, customer_group: string]

In [17]:
df3 = (
    df_base
    .withColumn("test_column_1", F.lit("test_column_1"))
    .withColumn("birth_year", F.split("birthday", "/").getItem(2))
)

In [19]:
df3.explain(True)

== Parsed Logical Plan ==
'Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, customer_group#44, test_column_1#176, split('birthday, /, -1)[2] AS birth_year#186]
+- Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, customer_group#44, test_column_1 AS test_column_1#176]
   +- Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, customer_group#44]
      +- Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, CASE WHEN ((cast(age#2 as int) >= 20) AND (cast(age#2 as int) <= 30)) THEN young WHEN ((cast(age#2 as int) >= 31) AND (cast(age#2 as int) <= 50)) THEN mid WHEN (cast(age#2 as int) > 51) THEN old ELSE kid END AS customer_group#44]
         +- Filter (city#6 = boston)
            +- Relation [cust_id#0,name#1,age#2,gender#3,birthday#4,zip#5,city#6] parquet

== Analyzed Logical Plan ==
cust_id: string, name: string, age: string, gender: string, birthday: string, zip: string, city: string, customer_group: st

In [None]:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, customer_group#44, test_column_1 AS test_column_1#176, split(birthday#4, /, -1)[2] AS birth_year#186]
   +- InMemoryTableScan [age#2, birthday#4, city#6, cust_id#0, customer_group#44, gender#3, name#1, zip#5]
         +- InMemoryRelation [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, customer_group#44], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, CASE WHEN ((cast(age#2 as int) >= 20) AND (cast(age#2 as int) <= 30)) THEN young WHEN ((cast(age#2 as int) >= 31) AND (cast(age#2 as int) <= 50)) THEN mid WHEN (cast(age#2 as int) > 51) THEN old ELSE kid END AS customer_group#44]
                  +- *(1) Filter (isnotnull(city#6) AND (city#6 = boston))
                     +- *(1) ColumnarToRow
                        +- FileScan parquet [cust_id#0,name#1,age#2,gender#3,birthday#4,zip#5,city#6] Batched: true, DataFilters: [isnotnull(city#6), (city#6 = boston)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jovyan/part-00000-7951754c-3166-4cb7-8227-a5760c654c14-c000..., PartitionFilters: [], PushedFilters: [IsNotNull(city), EqualTo(city,boston)], ReadSchema: struct<cust_id:string,name:string,age:string,gender:string,birthday:string,zip:string,city:string>

In [18]:
df4 = (
    df_base
    .withColumn("test_column_2", F.lit("test_column_2"))
    .withColumn("birth_month", F.split("birthday", "/").getItem(1))
)

In [20]:
df4.explain(True)

== Parsed Logical Plan ==
'Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, customer_group#44, test_column_2#197, split('birthday, /, -1)[1] AS birth_month#207]
+- Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, customer_group#44, test_column_2 AS test_column_2#197]
   +- Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, customer_group#44]
      +- Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, CASE WHEN ((cast(age#2 as int) >= 20) AND (cast(age#2 as int) <= 30)) THEN young WHEN ((cast(age#2 as int) >= 31) AND (cast(age#2 as int) <= 50)) THEN mid WHEN (cast(age#2 as int) > 51) THEN old ELSE kid END AS customer_group#44]
         +- Filter (city#6 = boston)
            +- Relation [cust_id#0,name#1,age#2,gender#3,birthday#4,zip#5,city#6] parquet

== Analyzed Logical Plan ==
cust_id: string, name: string, age: string, gender: string, birthday: string, zip: string, city: string, customer_group: s

In [None]:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, customer_group#44, test_column_2 AS test_column_2#197, split(birthday#4, /, -1)[1] AS birth_month#207]
   +- InMemoryTableScan [age#2, birthday#4, city#6, cust_id#0, customer_group#44, gender#3, name#1, zip#5]
         +- InMemoryRelation [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, customer_group#44], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) Project [cust_id#0, name#1, age#2, gender#3, birthday#4, zip#5, city#6, CASE WHEN ((cast(age#2 as int) >= 20) AND (cast(age#2 as int) <= 30)) THEN young WHEN ((cast(age#2 as int) >= 31) AND (cast(age#2 as int) <= 50)) THEN mid WHEN (cast(age#2 as int) > 51) THEN old ELSE kid END AS customer_group#44]
                  +- *(1) Filter (isnotnull(city#6) AND (city#6 = boston))
                     +- *(1) ColumnarToRow
                        +- FileScan parquet [cust_id#0,name#1,age#2,gender#3,birthday#4,zip#5,city#6] Batched: true, DataFilters: [isnotnull(city#6), (city#6 = boston)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jovyan/part-00000-7951754c-3166-4cb7-8227-a5760c654c14-c000..., PartitionFilters: [], PushedFilters: [IsNotNull(city), EqualTo(city,boston)], ReadSchema: struct<cust_id:string,name:string,age:string,gender:string,birthday:string,zip:string,city:string>


In [None]:
StorageLevel Types:
(As of Spark 3.4)

DISK_ONLY: CPU efficient, memory efficient, slow to access, data is serialized when stored on disk

DISK_ONLY_2: disk only, replicated 2x

DISK_ONLY_3: disk only, replicated 3x

MEMORY_AND_DISK: spills to disk if there's no space in memory

MEMORY_AND_DISK_2: memory and disk, replicated 2x

MEMORY_AND_DISK_DESER(default): same as MEMORY_AND_DISK, deserialized in both for fast access

MEMORY_ONLY: CPU efficient, memory intensive

MEMORY_ONLY_2: memory only, replicated 2x - for resilience, if one executor fails

In [None]:
When to use what?
Storage Level    Space used  CPU time  In memory  On-disk  Serialized
---------------------------------------------------------------------
MEMORY_ONLY          High        Low       Y          N        N         
MEMORY_ONLY_SER      Low         High      Y          N        Y     
MEMORY_AND_DISK      High        Medium    Some       Some     Some  
MEMORY_AND_DISK_SER  Low         High      Some       Some     Y     
DISK_ONLY            Low         High      N          Y        Y     

In [21]:
df_base.unpersist()
df_base.persist(StorageLevel.MEMORY_ONLY)

df2 = (
    df_base
    .withColumn("test_column_1", F.lit("test_column_1"))
    .withColumn("birth_year", F.split("birthday", "/").getItem(2))
)

df1.show(5, False)

DataFrame[cust_id: string, name: string, age: string, gender: string, birthday: string, zip: string, city: string, customer_group: string]

DataFrame[cust_id: string, name: string, age: string, gender: string, birthday: string, zip: string, city: string, customer_group: string]

+----------+--------------+---+------+---------+-----+------+--------------+-------------+----------+
|cust_id   |name          |age|gender|birthday |zip  |city  |customer_group|test_column_1|birth_year|
+----------+--------------+---+------+---------+-----+------+--------------+-------------+----------+
|C007YEYTX9|Aaron Abbott  |34 |Female|7/13/1991|97823|boston|mid           |test_column_1|1991      |
|C08XAQUY73|Aaron Lambert |54 |Female|11/5/1966|75218|boston|old           |test_column_1|1966      |
|C094P1VXF9|Aaron Lindsey |24 |Male  |9/21/1990|29399|boston|young         |test_column_1|1990      |
|C097SHE1EF|Aaron Lopez   |22 |Female|4/18/2001|82129|boston|young         |test_column_1|2001      |
|C0DTC6436T|Aaron Schwartz|52 |Female|7/9/1962 |57192|boston|old           |test_column_1|1962      |
+----------+--------------+---+------+---------+-----+------+--------------+-------------+----------+
only showing top 5 rows



In [22]:
spark.stop()