In [8]:
#Importing Necessary libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext 
from pyspark.sql import SQLContext 
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Build (or reuse, via getOrCreate) the SparkSession used by every cell below.
# The session targets YARN with a fixed executor footprint: dynamic allocation
# is switched off, so the instance/core/memory values here are requested as-is.
spark = (
    SparkSession.builder
    .master("yarn")
    # Port-binding retry limit; set exactly once (repeating the same key adds nothing).
    .config("spark.port.maxRetries", 100)
    # Executor sizing: 6 executors x 4 cores, 16G each.
    .config("spark.executor.instances", "6")
    .config("spark.executor.cores", "4")
    .config("spark.executor.memory", "16G")
    .config("spark.driver.memory", "4G")
    .config("spark.dynamicAllocation.enabled", "false")
    # YARN queue the application is submitted to.
    .config("spark.yarn.queue", "Low")
    .appName("TestJoin")
    .getOrCreate()
)

In [9]:
# Build two small in-memory DataFrames used to exercise the join/update logic.

# df1: (colA, colB) records.
list_1 = [
    ('Bob', 1),
    ('Sue', 3),
    ('Paul', 4),
    ('Alice', 2),
    ('Josh', 2),
]
df1 = spark.createDataFrame(list_1, schema=['colA', 'colB'])

# df2: (colA, colB, colC) records; only some colA values also appear in df1.
list_2 = [
    ('Bob', 3, 'XYZ'),
    ('Sue', 2, 'XYZ'),
    ('Pam', 2, 'XYZ'),
    ('Arthur', 2, 'XYZ'),
    ('Josh', 1, 'ABC'),
]
df2 = spark.createDataFrame(list_2, schema=['colA', 'colB', 'colC'])

# Display both frames so the inputs are visible next to the results.
df1.show()
df2.show()

+-----+----+
| colA|colB|
+-----+----+
|  Bob|   1|
|  Sue|   3|
| Paul|   4|
|Alice|   2|
| Josh|   2|
+-----+----+

+------+----+----+
|  colA|colB|colC|
+------+----+----+
|   Bob|   3| XYZ|
|   Sue|   2| XYZ|
|   Pam|   2| XYZ|
|Arthur|   2| XYZ|
|  Josh|   1| ABC|
+------+----+----+



In [10]:
# Temporary DataFrame: left-join df1 onto df2 by colA, carrying df1's colB
# along under the name "df1_colB".
#
# df1.colB is renamed *before* the join so the joined frame never contains two
# columns called colB; the resulting column order is colA, colB, colC, df1_colB.
# Rows of df2 with no matching colA in df1 keep a null df1_colB -- nulls are
# NOT replaced with 0 here.

temp_df = df2.join(
    df1.withColumnRenamed("colB", "df1_colB"),
    on="colA",
    how="left",
)

temp_df.show()

+------+----+----+--------+
|  colA|colB|colC|df1_colB|
+------+----+----+--------+
|   Sue|   2| XYZ|       3|
|  Josh|   1| ABC|       2|
|Arthur|   2| XYZ|    null|
|   Bob|   3| XYZ|       1|
|   Pam|   2| XYZ|    null|
+------+----+----+--------+



In [11]:
# Update df2.colB from the joined frame (temp_df).
# colB is replaced by colB + df1_colB only when ALL of these hold:
#   1) colB is less than or equal to df1_colB
#   2) the row found a match in df1 on colA (i.e. df1_colB is not null)
#   3) colC == 'XYZ'
# In every other case colB keeps its current value.
#
# NOTE(review): the comparison is inclusive (<=); the previous wording of this
# comment said "less than" -- confirm whether equal values should be updated.
# NOTE: this rebinds df2, which the join cell reads; re-running the join cell
# after this one would join the already-updated frame.

should_add = (
    (col("colB") <= col("df1_colB"))
    & col("df1_colB").isNotNull()
    & (col("colC") == 'XYZ')
)
updated_colB = when(should_add, col("colB") + col("df1_colB")).otherwise(col("colB"))

# Overwrite colB with the conditional value, then discard the helper column.
df2 = temp_df.withColumn("colB", updated_colB).drop("df1_colB")

df2.show()

+------+----+----+
|  colA|colB|colC|
+------+----+----+
|   Sue|   5| XYZ|
|  Josh|   1| ABC|
|Arthur|   2| XYZ|
|   Bob|   3| XYZ|
|   Pam|   2| XYZ|
+------+----+----+

