In [1]:
import random
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from pyspark.sql.functions import col
from functools import reduce
from pyspark.sql import DataFrame
import pyspark.sql.functions as sf
from pyspark.sql.window import Window
import time
import matplotlib.pyplot as plt

In [2]:
# Address of the standalone Spark master; swap in the real hostname or IP if needed.
master_url = "spark://spark-master:7077"

# Build (or reuse) the session against that master. Executors get 2g of memory
# and 2 cores each; the driver gets 2g of memory.
spark = (
    SparkSession.builder
    .appName("MyLocalSparkJob")
    .master(master_url)
    .config("spark.executor.memory", "2g")
    .config("spark.executor.cores", "2")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/24 17:54:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
def create_dataframe(num_rows, num_cols, seed=None):
    """Build a Spark DataFrame of random values with columns ``col0``, ``col1``, ...

    Every cell is drawn with ``random()``, but the schema declares each column
    as a nullable ``StringType``. That is why the plans printed in this
    notebook contain ``cast(colN as double)`` wherever a column is used in
    arithmetic or a comparison.

    Args:
        num_rows: Number of rows to generate.
        num_cols: Number of columns to generate.
        seed: Optional seed for reproducible data. When given, a private
            ``random.Random(seed)`` generator is used, so the same seed always
            yields the same values. When ``None`` (the default) the shared
            module-level ``random`` generator is used, exactly as before.

    Returns:
        The DataFrame created through the module-level ``spark`` session.
    """
    # A private generator keeps seeded runs from disturbing the global random state.
    rng = random if seed is None else random.Random(seed)

    # generate column names
    column_names = [f"col{i}" for i in range(num_cols)]

    # generate data (row-major: one inner list per row)
    data = [[rng.random() for _ in range(num_cols)] for _ in range(num_rows)]

    # define schema: every column is a nullable string
    schema = StructType([StructField(name, StringType(), True) for name in column_names])

    return spark.createDataFrame(data, schema)

In [4]:
# Two independent random frames with the same shape: 20 rows by 5 columns.
df = create_dataframe(num_rows=20, num_cols=5)
df2 = create_dataframe(num_rows=20, num_cols=5)

In [5]:
# Print the physical plan of df; the output below shows only the scan of the source RDD.
df.explain()

== Physical Plan ==
*(1) Scan ExistingRDD[col0#0,col1#1,col2#2,col3#3,col4#4]




In [6]:
from pyspark.sql.functions import col

# Source columns; each one yields a derived column named new_col<position>.
columns = ['col1', 'col2', 'col3']

for i, column in enumerate(columns, start=1):
    # col1 is the special case and gets a fixed +5; every other column is
    # offset by its 1-based position in the list.
    if column != 'col1':
        offset = i
        banner = f"Physical plan after adding new_col{i}:"
    else:
        offset = 5
        banner = f"Physical plan after adding 5 to {column}:"

    df = df.withColumn(f"new_col{i}", col(column) + offset)
    print(banner)

    # Print the plan, then run an action so the plan is actually executed.
    df.explain()
    df.count()

print("Final plan")
df.explain()


Physical plan after adding 5 to col1:
== Physical Plan ==
*(1) Project [col0#0, col1#1, col2#2, col3#3, col4#4, (cast(col1#1 as double) + 5.0) AS new_col1#20]
+- *(1) Scan ExistingRDD[col0#0,col1#1,col2#2,col3#3,col4#4]




                                                                                

Physical plan after adding new_col2:
== Physical Plan ==
*(1) Project [col0#0, col1#1, col2#2, col3#3, col4#4, (cast(col1#1 as double) + 5.0) AS new_col1#20, (cast(col2#2 as double) + 2.0) AS new_col2#38]
+- *(1) Scan ExistingRDD[col0#0,col1#1,col2#2,col3#3,col4#4]


Physical plan after adding new_col3:
== Physical Plan ==
*(1) Project [col0#0, col1#1, col2#2, col3#3, col4#4, (cast(col1#1 as double) + 5.0) AS new_col1#20, (cast(col2#2 as double) + 2.0) AS new_col2#38, (cast(col3#3 as double) + 3.0) AS new_col3#58]
+- *(1) Scan ExistingRDD[col0#0,col1#1,col2#2,col3#3,col4#4]


Final plan
== Physical Plan ==
*(1) Project [col0#0, col1#1, col2#2, col3#3, col4#4, (cast(col1#1 as double) + 5.0) AS new_col1#20, (cast(col2#2 as double) + 2.0) AS new_col2#38, (cast(col3#3 as double) + 3.0) AS new_col3#58]
+- *(1) Scan ExistingRDD[col0#0,col1#1,col2#2,col3#3,col4#4]




24/10/24 19:19:54 WARN HeartbeatReceiver: Removing executor 1 with no recent heartbeats: 1828636 ms exceeds timeout 120000 ms
24/10/24 19:19:54 WARN HeartbeatReceiver: Removing executor 0 with no recent heartbeats: 1832329 ms exceeds timeout 120000 ms
24/10/24 19:19:54 WARN HeartbeatReceiver: Removing executor 3 with no recent heartbeats: 1828271 ms exceeds timeout 120000 ms
24/10/24 19:19:54 ERROR TaskSchedulerImpl: Lost executor 1 on 172.18.0.7: Executor heartbeat timed out after 1828636 ms
24/10/24 19:19:54 ERROR TaskSchedulerImpl: Lost executor 0 on 172.18.0.6: Executor heartbeat timed out after 1832329 ms
24/10/24 19:19:54 ERROR TaskSchedulerImpl: Lost executor 3 on 172.18.0.3: Executor heartbeat timed out after 1828271 ms
24/10/24 19:42:02 WARN HeartbeatReceiver: Removing executor 5 with no recent heartbeats: 316774 ms exceeds timeout 120000 ms
24/10/24 19:42:02 WARN HeartbeatReceiver: Removing executor 4 with no recent heartbeats: 316430 ms exceeds timeout 120000 ms
24/10/24 19:

In [6]:
from pyspark.sql.functions import col

# Source columns; each one yields a derived column named new_col<position>.
columns = ['col1', 'col2', 'col3']

# Add each column's 1-based position to its values (col1 + 1, col2 + 2, col3 + 3).
for i, column in enumerate(columns, start=1):
    derived_name = f"new_col{i}"
    derived_value = col(column) + i
    df = df.withColumn(derived_name, derived_value)

# Print the resulting physical plan.
df.explain()


== Physical Plan ==
*(1) Project [col0#0, col1#1, col2#2, col3#3, col4#4, (cast(col1#1 as double) + 1.0) AS new_col1#20, (cast(col2#2 as double) + 2.0) AS new_col2#27, (cast(col3#3 as double) + 3.0) AS new_col3#35]
+- *(1) Scan ExistingRDD[col0#0,col1#1,col2#2,col3#3,col4#4]




In [15]:
from pyspark.sql.functions import col

# Columns to derive from; the i-th one produces new_col<i>.
columns = ['col1', 'col2', 'col3']

# Add i (the 1-based position) to each column and print the plan after every step.
for i, column in enumerate(columns, start=1):
    shifted = col(column) + i
    df = df.withColumn(f"new_col{i}", shifted)
    print(f"Physical plan after adding new_col{i}:")
    df.explain()

print("Final plan")
df.explain()

Physical plan after adding new_col1:
== Physical Plan ==
*(1) Project [col0#209, col1#210, col2#211, col3#212, col4#213, (cast(col1#210 as double) + 1.0) AS new_col1#313, (cast(col2#211 as double) + 2.0) AS new_col2#295, (cast(col3#212 as double) + 3.0) AS new_col3#304]
+- *(1) Scan ExistingRDD[col0#209,col1#210,col2#211,col3#212,col4#213]


Physical plan after adding new_col2:
== Physical Plan ==
*(1) Project [col0#209, col1#210, col2#211, col3#212, col4#213, (cast(col1#210 as double) + 1.0) AS new_col1#313, (cast(col2#211 as double) + 2.0) AS new_col2#322, (cast(col3#212 as double) + 3.0) AS new_col3#304]
+- *(1) Scan ExistingRDD[col0#209,col1#210,col2#211,col3#212,col4#213]


Physical plan after adding new_col3:
== Physical Plan ==
*(1) Project [col0#209, col1#210, col2#211, col3#212, col4#213, (cast(col1#210 as double) + 1.0) AS new_col1#313, (cast(col2#211 as double) + 2.0) AS new_col2#322, (cast(col3#212 as double) + 3.0) AS new_col3#331]
+- *(1) Scan ExistingRDD[col0#209,col1#21

In [13]:
# Print the current physical plan of df.
df.explain()

== Physical Plan ==
*(1) Project [col0#209, col1#210, col2#211, col3#212, col4#213, (cast(col1#210 as double) + 1.0) AS new_col1#239, (cast(col2#211 as double) + 2.0) AS new_col2#257, (cast(col3#212 as double) + 3.0) AS new_col3#277]
+- *(1) Scan ExistingRDD[col0#209,col1#210,col2#211,col3#212,col4#213]




In [6]:
def format_text(raw_text):
    """Print a debug string with every ``' | '`` marker pushed onto a new line.

    Args:
        raw_text: Text to print. ``bytes``/``bytearray`` input is decoded as
            UTF-8; ``str`` input is used as is; ``None`` prints nothing
            (presumably what ``toDebugString()`` yields when there is no
            debug string -- confirm against the PySpark docs).

    Returns:
        None. The formatted text is written to stdout.
    """
    # Nothing to show: return quietly instead of failing on None.decode.
    if raw_text is None:
        return

    # Decode byte input to a normal string; already-decoded text passes through.
    if isinstance(raw_text, (bytes, bytearray)):
        decoded_text = raw_text.decode('utf-8')
    else:
        decoded_text = raw_text

    # Insert a line break before each ' | ' marker to make the lineage easier to read.
    formatted_text = decoded_text.replace(' | ', '\n | ')

    print(formatted_text)


In [7]:
# NOTE(review): in the cells visible here, raw_text is only assigned in a later
# cell (raw_text = df.rdd.toDebugString()), so on a fresh kernel this call
# raises NameError unless that cell has been run first.
format_text(raw_text)

(8) MapPartitionsRDD[13] at javaToPython at NativeMethodAccessorImpl.java:0 []

 |  MapPartitionsRDD[12] at javaToPython at NativeMethodAccessorImpl.java:0 []

 |  SQLExecutionRDD[11] at javaToPython at NativeMethodAccessorImpl.java:0 []

 |  MapPartitionsRDD[10] at javaToPython at NativeMethodAccessorImpl.java:0 []

 |  MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0 []

 |  MapPartitionsRDD[3] at map at SerDeUtil.scala:69 []

 |  MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:117 []

 |  PythonRDD[1] at RDD at PythonRDD.scala:53 []

 |  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289 []


In [8]:
# Print the physical plan of df; the output below shows only the scan of the source RDD.
df.explain()

== Physical Plan ==
*(1) Scan ExistingRDD[col0#0,col1#1,col2#2,col3#3,col4#4]




In [11]:
# Keep only col1 and col2, then apply one filter per column (two separate
# filter calls, both with a 0.2 threshold).
df = (
    df.select("col1", "col2")
    .filter(col("col1") > 0.2)
    .filter(col("col2") > 0.2)
)

# Print the physical plan of the filtered projection.
df.explain()

== Physical Plan ==
*(1) Project [col1#1, col2#2]
+- *(1) Filter ((isnotnull(col1#1) AND isnotnull(col2#2)) AND ((cast(col1#1 as double) > 0.2) AND (cast(col2#2 as double) > 0.2)))
   +- *(1) Scan ExistingRDD[col0#0,col1#1,col2#2,col3#3,col4#4]




In [12]:
# raw_text = df.rdd.toDebugString()
# format_text(raw_text)

In [13]:
# Derive col6 as col1 + col2, then keep rows where col1 > 0.2 and col2 > 0.3.
df = (
    df
    .withColumn("col6", col("col1") + col("col2"))
    .filter((col("col1") > 0.2) & (col("col2") > 0.3))
)

# Capture the RDD lineage of the result and pretty-print it.
raw_text = df.rdd.toDebugString()
format_text(raw_text)

(8) MapPartitionsRDD[23] at javaToPython at NativeMethodAccessorImpl.java:0 []

 |  MapPartitionsRDD[22] at javaToPython at NativeMethodAccessorImpl.java:0 []

 |  SQLExecutionRDD[21] at javaToPython at NativeMethodAccessorImpl.java:0 []

 |  MapPartitionsRDD[20] at javaToPython at NativeMethodAccessorImpl.java:0 []

 |  MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0 []

 |  MapPartitionsRDD[3] at map at SerDeUtil.scala:69 []

 |  MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:117 []

 |  PythonRDD[1] at RDD at PythonRDD.scala:53 []

 |  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289 []


In [18]:
# Cast every column of df to FloatType, one withColumn call per column.
# The loop variable is deliberately not called `col`: that name is the
# pyspark.sql.functions.col helper imported at the top of the notebook, and
# rebinding it to a column-name string makes every later col("...") call fail
# with "'str' object is not callable".
for column_name in df.columns:
    df = df.withColumn(column_name, df[column_name].cast(FloatType()))

In [19]:
# Print the physical plan; the output below shows the float casts in the Project node.
df.explain()

== Physical Plan ==
*(1) Project [cast(col1#1 as float) AS col1#85, cast(col2#2 as float) AS col2#89, cast((cast(col1#1 as double) + cast(col2#2 as double)) as float) AS col6#93]
+- *(1) Filter ((isnotnull(col1#1) AND isnotnull(col2#2)) AND ((cast(col1#1 as double) > 0.2) AND ((cast(col2#2 as double) > 0.2) AND (cast(col2#2 as double) > 0.3))))
   +- *(1) Scan ExistingRDD[col0#0,col1#1,col2#2,col3#3,col4#4]




In [10]:
# Inefficient pattern under study: four back-to-back withColumn calls. The
# parsed plan printed below shows a separate Project node per call.
df = (
    df
    .withColumn("new_col1", col("col1") + 1)
    .withColumn("new_col2", col("col2") + 2)
    .withColumn("new_col3", col("col1") + 3)
    .withColumn("new_col4", col("col1") + 4)
)

# Print the extended plan output (logical stages as well as the physical plan).
df.explain(True)


== Parsed Logical Plan ==
'Project [col0#0, col1#1, col2#2, col3#3, col4#4, new_col1#20, new_col2#27, new_col3#35, ('col1 + 4) AS new_col4#44]
+- Project [col0#0, col1#1, col2#2, col3#3, col4#4, new_col1#20, new_col2#27, (cast(col1#1 as double) + cast(3 as double)) AS new_col3#35]
   +- Project [col0#0, col1#1, col2#2, col3#3, col4#4, new_col1#20, (cast(col2#2 as double) + cast(2 as double)) AS new_col2#27]
      +- Project [col0#0, col1#1, col2#2, col3#3, col4#4, (cast(col1#1 as double) + cast(1 as double)) AS new_col1#20]
         +- LogicalRDD [col0#0, col1#1, col2#2, col3#3, col4#4], false

== Analyzed Logical Plan ==
col0: string, col1: string, col2: string, col3: string, col4: string, new_col1: double, new_col2: double, new_col3: double, new_col4: double
Project [col0#0, col1#1, col2#2, col3#3, col4#4, new_col1#20, new_col2#27, new_col3#35, (cast(col1#1 as double) + cast(4 as double)) AS new_col4#44]
+- Project [col0#0, col1#1, col2#2, col3#3, col4#4, new_col1#20, new_col2#27, (c

In [11]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Sample UDF to simulate a complex transformation
def add_one_udf(col_val):
    """Return ``col_val + 1`` as a float, or ``None`` for a null input.

    The value is converted with ``float()`` first. The UDF is registered with
    ``DoubleType`` and the columns it is applied to are strings (see the
    analyzed plans in this notebook), so a plain ``col_val + 1`` would raise
    ``TypeError`` on string input and could return an ``int`` for integer input.

    Args:
        col_val: A number, a numeric string, or ``None``.

    Returns:
        ``float(col_val) + 1``, or ``None`` when ``col_val`` is ``None``.

    Raises:
        ValueError: If ``col_val`` is a string that is not a valid number.
    """
    # Propagate nulls instead of failing inside the Python worker.
    if col_val is None:
        return None
    return float(col_val) + 1

# Wrap the Python function as a Spark UDF that returns a double.
add_one = udf(add_one_udf, DoubleType())

# Inefficient pattern under study: one withColumn call per UDF application.
# Every df["..."] reference is taken from df as it was before this statement.
df = (
    df
    .withColumn("new_col1", add_one(df["col1"]))
    .withColumn("new_col2", add_one(df["col2"]))
    .withColumn("new_col3", add_one(df["col1"]))
    .withColumn("new_col4", add_one(df["col1"]))
)

# Print the extended plan output (logical stages as well as the physical plan).
df.explain(True)


== Parsed Logical Plan ==
Project [col0#0, col1#1, col2#2, col3#3, col4#4, new_col1#55, new_col2#66, new_col3#77, add_one_udf(col1#1)#87 AS new_col4#88]
+- Project [col0#0, col1#1, col2#2, col3#3, col4#4, new_col1#55, new_col2#66, add_one_udf(col1#1)#76 AS new_col3#77, new_col4#44]
   +- Project [col0#0, col1#1, col2#2, col3#3, col4#4, new_col1#55, add_one_udf(col2#2)#65 AS new_col2#66, new_col3#35, new_col4#44]
      +- Project [col0#0, col1#1, col2#2, col3#3, col4#4, add_one_udf(col1#1)#54 AS new_col1#55, new_col2#27, new_col3#35, new_col4#44]
         +- Project [col0#0, col1#1, col2#2, col3#3, col4#4, new_col1#20, new_col2#27, new_col3#35, (cast(col1#1 as double) + cast(4 as double)) AS new_col4#44]
            +- Project [col0#0, col1#1, col2#2, col3#3, col4#4, new_col1#20, new_col2#27, (cast(col1#1 as double) + cast(3 as double)) AS new_col3#35]
               +- Project [col0#0, col1#1, col2#2, col3#3, col4#4, new_col1#20, (cast(col2#2 as double) + cast(2 as double)) AS new_col2

In [5]:
from pyspark.sql.functions import col

# Columns to derive from; the i-th one produces new_col<i>.
columns = ['col1', 'col2', 'col3']

# Add i (the 1-based position) to each column, one withColumn call per column.
for i, column in enumerate(columns, start=1):
    new_name = f"new_col{i}"
    df = df.withColumn(new_name, col(column) + i)

# Print the extended plan output (logical stages as well as the physical plan).
df.explain(True)


== Parsed Logical Plan ==
'Project [col0#0, col1#1, col2#2, col3#3, col4#4, new_col1#20, new_col2#27, ('col3 + 3) AS new_col3#35]
+- Project [col0#0, col1#1, col2#2, col3#3, col4#4, new_col1#20, (cast(col2#2 as double) + cast(2 as double)) AS new_col2#27]
   +- Project [col0#0, col1#1, col2#2, col3#3, col4#4, (cast(col1#1 as double) + cast(1 as double)) AS new_col1#20]
      +- LogicalRDD [col0#0, col1#1, col2#2, col3#3, col4#4], false

== Analyzed Logical Plan ==
col0: string, col1: string, col2: string, col3: string, col4: string, new_col1: double, new_col2: double, new_col3: double
Project [col0#0, col1#1, col2#2, col3#3, col4#4, new_col1#20, new_col2#27, (cast(col3#3 as double) + cast(3 as double)) AS new_col3#35]
+- Project [col0#0, col1#1, col2#2, col3#3, col4#4, new_col1#20, (cast(col2#2 as double) + cast(2 as double)) AS new_col2#27]
   +- Project [col0#0, col1#1, col2#2, col3#3, col4#4, (cast(col1#1 as double) + cast(1 as double)) AS new_col1#20]
      +- LogicalRDD [col0#0, c

In [8]:

# Project col1/col2 and keep rows where both exceed 0.2 (single combined filter).
df2 = (
    df.select("col1", "col2")
    .filter((col("col1") > 0.2) & (col("col2") > 0.2))
)

# Print the extended plan output for the new frame.
df2.explain(True)


== Parsed Logical Plan ==
'Filter (('col1 > 0.2) AND ('col2 > 0.2))
+- Project [col1#1, col2#2]
   +- Filter (cast(col2#2 as double) > 0.2)
      +- Filter (cast(col1#1 as double) > 0.2)
         +- Project [col1#1, col2#2]
            +- LogicalRDD [col0#0, col1#1, col2#2, col3#3, col4#4], false

== Analyzed Logical Plan ==
col1: string, col2: string
Filter ((cast(col1#1 as double) > 0.2) AND (cast(col2#2 as double) > 0.2))
+- Project [col1#1, col2#2]
   +- Filter (cast(col2#2 as double) > 0.2)
      +- Filter (cast(col1#1 as double) > 0.2)
         +- Project [col1#1, col2#2]
            +- LogicalRDD [col0#0, col1#1, col2#2, col3#3, col4#4], false

== Optimized Logical Plan ==
Project [col1#1, col2#2]
+- Filter ((isnotnull(col1#1) AND isnotnull(col2#2)) AND ((cast(col1#1 as double) > 0.2) AND (cast(col2#2 as double) > 0.2)))
   +- LogicalRDD [col0#0, col1#1, col2#2, col3#3, col4#4], false

== Physical Plan ==
*(1) Project [col1#1, col2#2]
+- *(1) Filter ((isnotnull(col1#1) AND isnot

In [10]:

# Project col1/col2, derive col6 = col1 + col2 with withColumn, then filter.
# Parentheses replace the backslash continuations, so no dangling "\" is left
# at the end of the chain.
df3 = (
    df.select("col1", "col2")
    .withColumn("col6", col("col1") + col("col2"))
    .filter((col("col1") > 0.2) & (col("col2") > 0.3))
)

# Print the extended plan output for the new frame.
df3.explain(True)



== Parsed Logical Plan ==
'Filter (('col1 > 0.2) AND ('col2 > 0.3))
+- Project [col1#1, col2#2, (cast(col1#1 as double) + cast(col2#2 as double)) AS col6#43]
   +- Project [col1#1, col2#2]
      +- Filter (cast(col2#2 as double) > 0.2)
         +- Filter (cast(col1#1 as double) > 0.2)
            +- Project [col1#1, col2#2]
               +- LogicalRDD [col0#0, col1#1, col2#2, col3#3, col4#4], false

== Analyzed Logical Plan ==
col1: string, col2: string, col6: double
Filter ((cast(col1#1 as double) > 0.2) AND (cast(col2#2 as double) > 0.3))
+- Project [col1#1, col2#2, (cast(col1#1 as double) + cast(col2#2 as double)) AS col6#43]
   +- Project [col1#1, col2#2]
      +- Filter (cast(col2#2 as double) > 0.2)
         +- Filter (cast(col1#1 as double) > 0.2)
            +- Project [col1#1, col2#2]
               +- LogicalRDD [col0#0, col1#1, col2#2, col3#3, col4#4], false

== Optimized Logical Plan ==
Project [col1#1, col2#2, (cast(col1#1 as double) + cast(col2#2 as double)) AS col6#43]


In [11]:
# Same result columns as a select + withColumn, but col6 is computed inside the
# select itself; the filter is applied afterwards.
df4 = (
    df.select(
        "col1",
        "col2",
        (col("col1") + col("col2")).alias("col6"),
    )
    .filter((col("col1") > 0.2) & (col("col2") > 0.3))
)

# Print the extended plan output for the new frame.
df4.explain(True)

== Parsed Logical Plan ==
'Filter (('col1 > 0.2) AND ('col2 > 0.3))
+- Project [col1#1, col2#2, (cast(col1#1 as double) + cast(col2#2 as double)) AS col6#47]
   +- Filter (cast(col2#2 as double) > 0.2)
      +- Filter (cast(col1#1 as double) > 0.2)
         +- Project [col1#1, col2#2]
            +- LogicalRDD [col0#0, col1#1, col2#2, col3#3, col4#4], false

== Analyzed Logical Plan ==
col1: string, col2: string, col6: double
Filter ((cast(col1#1 as double) > 0.2) AND (cast(col2#2 as double) > 0.3))
+- Project [col1#1, col2#2, (cast(col1#1 as double) + cast(col2#2 as double)) AS col6#47]
   +- Filter (cast(col2#2 as double) > 0.2)
      +- Filter (cast(col1#1 as double) > 0.2)
         +- Project [col1#1, col2#2]
            +- LogicalRDD [col0#0, col1#1, col2#2, col3#3, col4#4], false

== Optimized Logical Plan ==
Project [col1#1, col2#2, (cast(col1#1 as double) + cast(col2#2 as double)) AS col6#47]
+- Filter ((isnotnull(col1#1) AND isnotnull(col2#2)) AND ((cast(col1#1 as double) > 0.

In [38]:
# Show the raw RDD lineage. As the output below shows, this is a bytes object with
# embedded "\n" separators; format_text() prints the same content more readably.
df.rdd.toDebugString()


b'(2) MapPartitionsRDD[36] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[35] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  SQLExecutionRDD[34] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[33] at javaToPython at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[11] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0 []\n |  MapPartitionsRDD[10] at map at SerDeUtil.scala:69 []\n |  MapPartitionsRDD[9] at mapPartitions at SerDeUtil.scala:117 []\n |  PythonRDD[8] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[7] at readRDDFromFile at PythonRDD.scala:289 []'

In [5]:
# On each pass, re-select col1/col2 and add a "> 0.4" filter on the current column.
for column in ["col1", "col2"]:
    df = df.select("col1", "col2").filter(col(column) > 0.4)

# Print the extended plan output (logical stages as well as the physical plan).
df.explain(True)

== Parsed Logical Plan ==
'Filter ('col2 > 0.4)
+- Project [col1#1, col2#2]
   +- Filter (cast(col1#1 as double) > 0.4)
      +- Project [col1#1, col2#2]
         +- LogicalRDD [col0#0, col1#1, col2#2, col3#3, col4#4], false

== Analyzed Logical Plan ==
col1: string, col2: string
Filter (cast(col2#2 as double) > 0.4)
+- Project [col1#1, col2#2]
   +- Filter (cast(col1#1 as double) > 0.4)
      +- Project [col1#1, col2#2]
         +- LogicalRDD [col0#0, col1#1, col2#2, col3#3, col4#4], false

== Optimized Logical Plan ==
Project [col1#1, col2#2]
+- Filter ((isnotnull(col1#1) AND isnotnull(col2#2)) AND ((cast(col1#1 as double) > 0.4) AND (cast(col2#2 as double) > 0.4)))
   +- LogicalRDD [col0#0, col1#1, col2#2, col3#3, col4#4], false

== Physical Plan ==
*(1) Project [col1#1, col2#2]
+- *(1) Filter ((isnotnull(col1#1) AND isnotnull(col2#2)) AND ((cast(col1#1 as double) > 0.4) AND (cast(col2#2 as double) > 0.4)))
   +- *(1) Scan ExistingRDD[col0#0,col1#1,col2#2,col3#3,col4#4]



In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import datetime

# Create (or reuse) the Spark session for this example.
spark = (
    SparkSession.builder
    .appName("Inefficient Transformations Example")
    .getOrCreate()
)

# Four sample transactions: (customer_id, transaction_amount, transaction_date, transaction_type).
data = [
    (1, 100.0, "2024-01-01", "debit"),
    (2, 200.0, "2024-01-02", "credit"),
    (3, 150.0, "2024-01-03", "debit"),
    (4, 300.0, "2024-01-04", "credit"),
]

# Build the DataFrame; only column names are given, so Spark infers the types.
df = spark.createDataFrame(
    data,
    schema=["customer_id", "transaction_amount", "transaction_date", "transaction_type"],
)

# Define a UDF for some complex logic (e.g., applying a discount)
def apply_discount(amount):
    """Apply a 10% discount to amounts strictly greater than 250.

    Args:
        amount: Transaction amount, or ``None`` for a null value.

    Returns:
        ``amount * 0.9`` when ``amount > 250``; ``amount`` unchanged otherwise;
        ``None`` when ``amount`` is ``None``.
    """
    # Nulls reach a Python UDF as None; `None > 250` would raise TypeError.
    if amount is None:
        return None
    if amount > 250:
        return amount * 0.9  # 10% discount
    return amount

# Wrap the discount rule as a Spark UDF that returns a double.
discount_udf = udf(apply_discount, DoubleType())

# Inefficient pattern under study: each pass adds two columns through two
# separate withColumn calls.
for i in range(1, 4):
    offset_name = f"transaction_amount_{i}"
    discount_name = f"discounted_amount_{i}"

    # Original amount shifted by the constant i.
    df = df.withColumn(offset_name, df["transaction_amount"] + i)
    # The discount is computed from the original transaction_amount column,
    # not from the shifted column added just above.
    df = df.withColumn(discount_name, discount_udf(df["transaction_amount"]))

# Display every column in full width.
df.show(truncate=False)

# Print the extended plan output (logical stages as well as the physical plan).
df.explain(True)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/14 19:15:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+-----------+------------------+----------------+----------------+--------------------+-------------------+--------------------+-------------------+--------------------+-------------------+
|customer_id|transaction_amount|transaction_date|transaction_type|transaction_amount_1|discounted_amount_1|transaction_amount_2|discounted_amount_2|transaction_amount_3|discounted_amount_3|
+-----------+------------------+----------------+----------------+--------------------+-------------------+--------------------+-------------------+--------------------+-------------------+
|1          |100.0             |2024-01-01      |debit           |101.0               |100.0              |102.0               |100.0              |103.0               |100.0              |
|2          |200.0             |2024-01-02      |credit          |201.0               |200.0              |202.0               |200.0              |203.0               |200.0              |
|3          |150.0             |2024-01-03      |d

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 35732)
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/local/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/usr/local/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/local/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/usr/local/lib/python3.11/site-packages/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/usr/local/lib/python3.11/site-packages/pyspark/accumulators.py", line 267, in poll
    if self.rfile in r and func():
                           ^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pyspark/accumulators.p

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create (or reuse) the Spark session for this example.
spark = SparkSession.builder.appName("ComplexExample").getOrCreate()

# Transactions: (customer_id, transaction_amount, transaction_date, transaction_type).
transactions_data = [
    (1, 100.0, "2024-01-01", "purchase"),
    (1, 200.0, "2024-01-02", "purchase"),
    (2, 150.0, "2024-01-01", "refund"),
    (3, 300.0, "2024-01-01", "purchase"),
]

# Customers: (customer_id, membership_status, registration_date).
customers_data = [
    (1, "gold", "2023-01-01"),
    (2, "silver", "2023-05-01"),
    (3, "gold", "2022-01-01"),
]

# Build both DataFrames; only column names are given, so Spark infers the types.
transactions_df = spark.createDataFrame(
    transactions_data,
    schema=["customer_id", "transaction_amount", "transaction_date", "transaction_type"],
)
customers_df = spark.createDataFrame(
    customers_data,
    schema=["customer_id", "membership_status", "registration_date"],
)

# Attach each transaction to its customer record (inner join on customer_id).
joined_df = transactions_df.join(customers_df, on="customer_id", how="inner")

# Per customer and membership status: summed amount and number of transactions.
totals_df = joined_df.groupBy("customer_id", "membership_status").agg(
    F.sum("transaction_amount").alias("total_amount"),
    F.count("transaction_type").alias("transaction_count"),
)

# Discount is 10% of the total for gold members, 5% for silver, 0 otherwise.
discount_expr = (
    F.when(F.col("membership_status") == "gold", F.col("total_amount") * 0.1)
    .when(F.col("membership_status") == "silver", F.col("total_amount") * 0.05)
    .otherwise(0)
)

# final_amount is what remains of the total once the discount is subtracted.
result_df = (
    totals_df
    .withColumn("discount", discount_expr)
    .withColumn("final_amount", F.col("total_amount") - F.col("discount"))
)

# Display every column in full width.
result_df.show(truncate=False)

# Print the extended plan output (logical stages as well as the physical plan).
result_df.explain(True)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/14 19:18:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+-----------+-----------------+------------+-----------------+--------+------------+
|customer_id|membership_status|total_amount|transaction_count|discount|final_amount|
+-----------+-----------------+------------+-----------------+--------+------------+
|1          |gold             |300.0       |2                |30.0    |270.0       |
|2          |silver           |150.0       |1                |7.5     |142.5       |
|3          |gold             |300.0       |1                |30.0    |270.0       |
+-----------+-----------------+------------+-----------------+--------+------------+

== Parsed Logical Plan ==
'Project [customer_id#0L, membership_status#9, total_amount#27, transaction_count#29L, discount#34, ('total_amount - 'discount) AS final_amount#40]
+- Project [customer_id#0L, membership_status#9, total_amount#27, transaction_count#29L, CASE WHEN (membership_status#9 = gold) THEN (total_amount#27 * 0.1) WHEN (membership_status#9 = silver) THEN (total_amount#27 * 0.05) ELSE ca

In [20]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf

def generate_test_data(spark, names, num_groups=3, overlap=True):
    """Generate one two-column DataFrame (``names``, ``group_id``) per group.

    Parameters:
        spark: Object whose ``createDataFrame(rows, column_names)`` builds each
            DataFrame (a ``SparkSession`` in this notebook).
        names (sequence of str): Base names that overlapping groups draw from.
            Not modified.
        num_groups (int): Number of groups (DataFrames) to generate.
        overlap (bool): If True, group ``i`` holds the first ``i + 2`` entries
            of ``names``, so each group contains every member of the previous
            one. If False, group ``i`` holds ``num_groups`` generated names of
            the form ``Person_<i>_<j>`` that appear in no other group.

    Returns:
        list: One DataFrame per group, in group order. Each row is
        ``(name, 'Group_<i>')``.
    """
    df_list = []

    for i in range(num_groups):
        if overlap:
            # Growing prefix of the base list: increasing overlap with each group.
            group_names = names[:i + 2]
        else:
            # Names unique to this group. The earlier code built this list but
            # then ignored it, so every "non-overlapping" group received the
            # complete base list instead.
            group_names = [f'Person_{i}_{j}' for j in range(num_groups)]

        # Create a DataFrame for the group
        group_df = spark.createDataFrame(
            [(name, f'Group_{i}') for name in group_names],
            ["names", "group_id"],
        )
        df_list.append(group_df)

    return df_list


In [35]:
# 100 base names, Person_0 .. Person_99; `names` and `person_list` refer to the same list.
person_list = [f"Person_{i}" for i in range(100)]
names = person_list

# One DataFrame per group (default settings), then union them by column name
# into a single cached DataFrame.
test_data = generate_test_data(spark, names)
df = reduce(DataFrame.unionByName, test_data).cache()


In [36]:
# Display the unioned rows; in the output below the same names recur under several group ids.
df.show()



+--------+--------+
|   names|group_id|
+--------+--------+
|Person_0| Group_0|
|Person_1| Group_0|
|Person_0| Group_1|
|Person_1| Group_1|
|Person_2| Group_1|
|Person_0| Group_2|
|Person_1| Group_2|
|Person_2| Group_2|
|Person_3| Group_2|
+--------+--------+



                                                                                

In [37]:
# Print the physical plan of the cached union.
df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- InMemoryTableScan [names#186, group_id#187]
      +- InMemoryRelation [names#186, group_id#187], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- Union
               :- *(1) Scan ExistingRDD[names#186,group_id#187]
               :- *(2) Scan ExistingRDD[names#190,group_id#191]
               +- *(3) Scan ExistingRDD[names#194,group_id#195]




In [32]:
# Count the rows in each group; the result has the columns group_id and count.
row_count = sf.count("*").alias("count")
df = df.groupBy("group_id").agg(row_count)

In [33]:
# Print the physical plan of the aggregation (partial and final HashAggregate in the output below).
df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[group_id#141], functions=[count(1)])
   +- Exchange hashpartitioning(group_id#141, 200), ENSURE_REQUIREMENTS, [plan_id=397]
      +- HashAggregate(keys=[group_id#141], functions=[partial_count(1)])
         +- Union
            :- Project [group_id#141]
            :  +- Scan ExistingRDD[names#140,group_id#141]
            :- Project [group_id#145]
            :  +- Scan ExistingRDD[names#144,group_id#145]
            +- Project [group_id#149]
               +- Scan ExistingRDD[names#148,group_id#149]




In [34]:
# Display the per-group row counts.
df.show()



+--------+-----+
|group_id|count|
+--------+-----+
| Group_1|    3|
| Group_2|    4|
| Group_0|    2|
+--------+-----+



                                                                                