Problem Statement:

You want to fill the NULL values in the `brand` column by using `product_id` as a lookup key. Whenever a row has a NULL `brand`, take the brand from another row with the same `product_id` whose `brand` is not NULL. For example, the row `(NULL, 5678)` should become `(B, 5678)` because another row `(B, 5678)` exists. If a `product_id` has no non-null brand anywhere, its `brand` stays NULL.

In [1]:
from pyspark.sql import SparkSession

# Start Spark for this notebook, or attach to a session that is already running.
spark = (
    SparkSession.builder
    .appName("SparkBackFilling")
    .getOrCreate()
)

In [2]:
# Input:

# +-------+-----------+
# | brand | product_id|
# +-------+-----------+
# | A     | 1234      |
# | B     | 5678      |
# | NULL  | 5678      |
# | C     | 9101      |
# | NULL  | 1234      |
# | NULL  | 9101      |
# | D     | 1112      |
# +-------+-----------+

# Expected output:

# +-------+-----------+
# | brand | product_id|
# +-------+-----------+
# | A     | 1234      |
# | B     | 5678      |
# | B     | 5678      |
# | C     | 9101      |
# | A     | 1234      |
# | C     | 9101      |
# | D     | 1112      |
# +-------+-----------+



In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# getOrCreate() hands back an already-running session if there is one (e.g. the
# one built in the first cell), so this appName may not take effect in that case.
spark = SparkSession.builder.appName("BrandProductFilling").getOrCreate()

# Sample data with nulls in the brand column
data = [
    ('A', 1234),
    ('B', 5678),
    (None, 5678),
    ('C', 9101),
    (None, 1234),
    (None, 9101),
    ('D', 1112)
]

# Column names and their Spark SQL types, in the same order as the tuples above.
columns = ['brand', 'product_id']
column_types = ['STRING', 'LONG']

# Explicit DDL schema instead of type inference: inference raises an error when
# a column contains only None values. LONG matches what inference picks for
# Python ints, so the resulting DataFrame is unchanged for this data.
schema = ', '.join(f'{name} {dtype}' for name, dtype in zip(columns, column_types))
df = spark.createDataFrame(data, schema=schema)

df.show(truncate=False)


+-----+----------+
|brand|product_id|
+-----+----------+
|A    |1234      |
|B    |5678      |
|NULL |5678      |
|C    |9101      |
|NULL |1234      |
|NULL |9101      |
|D    |1112      |
+-----+----------+



In [19]:
import pyspark.sql.functions as fx
from pyspark.sql.window import Window

# NOTE(review): not used by the join-based approach in this cell; kept because
# it is a notebook-level name other cells might reference.
window_spec = Window.partitionBy('product_id')

# Rows that already carry a brand.
df_not_missing = df.filter(fx.col('brand').isNotNull())

# Collapse to exactly ONE brand per product_id before joining. Joining against
# df_not_missing directly multiplies rows whenever a product_id has several
# non-null brand rows (two 'B' rows for 5678 would turn 3 rows into 6).
# min() also makes the pick deterministic if a product_id has conflicting brands.
brand_lookup = (
    df_not_missing
    .groupBy('product_id')
    .agg(fx.min('brand').alias('brand'))
)

# Left join so every original row survives; a row keeps its own brand when it
# has one and otherwise takes the product-level brand (NULL if none exists).
df_filled = df.alias('df1').join(
    brand_lookup.alias('df2'),
    on='product_id',
    how='left',
).select(
    'df1.product_id',
    fx.coalesce('df1.brand', 'df2.brand').alias('brand'),
)

df_filled.show()

+----------+-----+
|product_id|brand|
+----------+-----+
|      1234|    A|
|      5678|    B|
|      5678|    B|
|      9101|    C|
|      1234|    A|
|      9101|    C|
|      1112|    D|
+----------+-----+



In [21]:
# All rows sharing a product_id. No orderBy on purpose: without an ordering the
# window frame spans the entire partition, so every row sees every brand of its
# product (adding orderBy would shrink the default frame to "up to current row").
by_product = Window.partitionBy("product_id")

# min() skips NULLs and, unlike first() over an unordered window, is
# deterministic when a product_id has several different non-null brands.
df3 = df.withColumn(
    "brand_filled",
    fx.min("brand").over(by_product),
)
df3.show()

# Keep the row's own brand when present; otherwise use the product-level fill.
df4 = df3.withColumn("brand_final", fx.coalesce("brand", "brand_filled"))

df4.select('product_id', 'brand_final').show()


+-----+----------+------------+
|brand|product_id|brand_filled|
+-----+----------+------------+
|    D|      1112|           D|
|    A|      1234|           A|
| NULL|      1234|           A|
|    B|      5678|           B|
| NULL|      5678|           B|
|    C|      9101|           C|
| NULL|      9101|           C|
+-----+----------+------------+

+----------+-----------+
|product_id|brand_final|
+----------+-----------+
|      1112|          D|
|      1234|          A|
|      1234|          A|
|      5678|          B|
|      5678|          B|
|      9101|          C|
|      9101|          C|
+----------+-----------+

