### 1. The Exploding GroupBy: Flattening Grouped Arrays With and Without `flatten`

In [0]:
# Sample rows covering the tricky cases: several arrays for one key,
# an empty array (key "B") and a NULL array (key "C").
data = [
    ("A", [1, 2, 3]),
    ("A", [4]),
    ("B", [5, 6]),
    ("B", []),
    ("C", None),
]
df = spark.createDataFrame(data, schema=["key", "values"])
display(df)

key,values
A,"List(1, 2, 3)"
A,List(4)
B,"List(5, 6)"
B,List()
C,


In [0]:
from pyspark.sql.functions import collect_list, flatten

# Grouping by key and collecting the array column yields an array of arrays,
# one inner array per input row. collect_list skips NULL inputs, so key "C"
# ends up with an empty outer array.
df1 = (
    df.groupBy("key")
    .agg(collect_list("values").alias("values"))
)
display(df1)

key,values
A,"List(List(1, 2, 3), List(4))"
B,"List(List(5, 6), List())"
C,List()


In [0]:
from pyspark.sql.functions import flatten

# flatten concatenates each group's inner arrays into a single array;
# empty inner arrays contribute nothing.
df2 = df1.withColumn("values", flatten(df1["values"]))
display(df2)

key,values
A,"List(1, 2, 3, 4)"
B,"List(5, 6)"
C,List()


In [0]:
# Without flatten: explode every array into one row per element, then
# re-collect per key (next cell).
# explode_outer (unlike explode) keeps rows whose array is empty or NULL and
# emits a single NULL element for them, so key "C" (whose only array is NULL)
# is not dropped here.

# Explicit import rather than `import *`, which would shadow Python builtins
# such as sum, max, min and round with their Spark counterparts.
from pyspark.sql.functions import collect_list, explode_outer

df3 = df.withColumn("flat_values", explode_outer("values"))
display(df3)

key,values,flat_values
A,"List(1, 2, 3)",1.0
A,"List(1, 2, 3)",2.0
A,"List(1, 2, 3)",3.0
A,List(4),4.0
B,"List(5, 6)",5.0
B,"List(5, 6)",6.0
B,List(),
C,,


In [0]:
# Re-collect the exploded elements per key. collect_list ignores NULLs, so the
# NULL placeholders produced by explode_outer disappear and key "C" gets an
# empty array -- the same contents as the flatten-based df2.
# NOTE: collect_list does not guarantee element order after a shuffle; the
# 1, 2, 3, 4 order shown in the output is not a contract.
df4 = df3.groupBy("key").agg(collect_list("flat_values").alias("values1"))
display(df4)

key,values1
A,"List(1, 2, 3, 4)"
B,"List(5, 6)"
C,List()


In [0]:
%sql
-- Recreate the table so this cell can be re-run: a plain CREATE TABLE fails
-- once Sales exists, and re-running the INSERT on its own would duplicate rows.
DROP TABLE IF EXISTS Sales;

CREATE TABLE Sales (
    id INT,
    product VARCHAR(50),
    amount INT
);

-- Insert the provided data; rows 2, 3 and 5 deliberately have NULL amounts.
INSERT INTO Sales (id, product, amount) VALUES
(1, 'A', 100),
(2, 'A', NULL),
(3, 'A', NULL),
(4, 'B', 200),
(5, 'B', NULL);

num_affected_rows,num_inserted_rows
5,5


### 2. The Phantom NULLs: Counting Products With More NULL Than Non-NULL Amounts

In [0]:
%sql
-- Inspect the sample rows, including the NULL amounts.
SELECT id, product, amount
FROM Sales

id,product,amount
1,A,100.0
2,A,
3,A,
4,B,200.0
5,B,


In [0]:
%sql
-- A product qualifies when its NULL amounts outnumber its non-NULL ones.
-- COUNT(amount) counts only non-NULL values, so COUNT(*) - COUNT(amount)
-- is the number of NULL amounts for that product.
SELECT COUNT(*) AS products_with_more_nulls
FROM (
    SELECT product
    FROM Sales
    GROUP BY product
    HAVING COUNT(*) - COUNT(amount) > COUNT(amount)
) AS qualifying_products;

products_with_more_nulls
1


### 3. Mask Phone Numbers in PySpark the Optimal Way

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def mask_phone_number(phone, keep_last=4, mask_char='*'):
    """Mask all but the last ``keep_last`` characters of ``phone``.

    Args:
        phone: The string to mask. ``None`` and empty strings are returned
            unchanged.
        keep_last: Number of trailing characters left visible (default 4).
        mask_char: Replacement for every hidden character (default ``'*'``).

    Returns:
        The masked string, or ``phone`` unchanged when it is falsy or shorter
        than ``keep_last`` characters.

    Raises:
        ValueError: If ``keep_last`` is negative.
    """
    if keep_last < 0:
        raise ValueError("keep_last must be non-negative")
    if not phone or len(phone) < keep_last:
        return phone
    # Slice by an explicit index instead of phone[-keep_last:], which would
    # return the whole string when keep_last == 0.
    hidden = len(phone) - keep_last
    return mask_char * hidden + phone[hidden:]

# Regular (non-vectorized) Python UDF wrapping the masker; returns strings.
mask_phone_udf = udf(f=mask_phone_number, returnType=StringType())

In [0]:
# Sample contacts to mask.
# NOTE(review): this rebinds `data` and `df` from section 1.
columns = ["name", "phone"]
data = [
    ("Alice", "1234567890"),
    ("Bob", "9876543210"),
    ("Charlie", "5551234567"),
]

df = spark.createDataFrame(data, schema=columns)
display(df)

# Add the masked column via the Python UDF and hide the raw number.
df_masked = (
    df.withColumn("masked_phone", mask_phone_udf("phone"))
    .drop("phone")
)
display(df_masked)

name,phone
Alice,1234567890
Bob,9876543210
Charlie,5551234567


name,masked_phone
Alice,******7890
Bob,******3210
Charlie,******4567


In [0]:
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf("string")
def mask_phone_pandas_udf(phone_series: pd.Series) -> pd.Series:
    """Vectorized mask: replace every character except the last 4 with '*'.

    Values shorter than 4 characters, empty strings and NULLs pass through
    unchanged, matching the row-at-a-time ``mask_phone_number`` UDF.
    """
    # Every character that still has at least 4 characters after it is masked.
    # A single vectorized .str.replace per batch avoids invoking a Python
    # lambda per row via .apply, which is the point of using a pandas UDF.
    # (?s) lets '.' match newlines too, so all characters are counted.
    return phone_series.str.replace(r"(?s).(?=.{4})", "*", regex=True)

# Cheaper still: the native expression regexp_replace("phone", "(?s).(?=.{4})", "*")
# produces the same mask without any Python round-trip.
df_masked = df.withColumn("masked_phone", mask_phone_pandas_udf("phone"))
display(df_masked)

name,phone,masked_phone
Alice,1234567890,******7890
Bob,9876543210,******3210
Charlie,5551234567,******4567
