# Question
Assume I have Dataframe #1 (csv like) of this type:
Dataframe #1:
```
id1,id2
id2,id3
idX,idY, idZ
etc.
```
(length of each row is different, up to M "columns")

And another Dataframe #2:
```id1,id2 -> data1
id3,id4 -> data2
id5, id6 -> data3
id2 -> data4
id1,id2,id3,id4,id100 -> data1700
etc.
```
(length of each row is different, I have more rows than in Dataframe #1, and they generally longer, up to N columns)

We want to get for each row of "Dataframe #1" the dataX **only if all the ids from Dataframe #1 are fully contained in Dataframe #2 row**.

Example:
First row of Dataframe #1, (id1,id2) are fully contained in first row of Dataframe #2, (id1,id2 -> data1) therefore we **return** data1,
it also fully contained in (id1,id2,id3,id4,id100 -> data1700) so we **return** data1700. It isn't fully contained in (id2 -> data4) therefore we will **NOT return** data4

How would you do such a thing using spark?

# Install Spark

In [4]:
!pip install pyspark==3.3.2


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.3.1[0m[39;49m -> [0m[32;49m23.1.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


# Create a Spark Session

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("InterviewQuestion").getOrCreate() # Check Spark Session Information
spark

# Solution

In [7]:
from pyspark.sql.functions import array_contains
from pyspark.sql.functions import col, array

# create example dataframes
df1 = spark.createDataFrame([(1, 2), (3, 4), (5, 6), (None, 9)], ["T1_id1", "T1_id2"])
df2 = spark.createDataFrame([(1, 2, 3, "data1"), (1, 2, None, "data2") ,(1, 3, None, "data3"), (3, None, None, "data4"), (4, None, None, "data5"), (5,6, None, "dataS"), (5,9, None, "dataX"), (None,9, None, "data9")], ["T2_id1", "T2_id2", "T2_id3", "data"])
# Note I have an implicit assumption that each row in df1/df2 is of the same size, and it's not necessary the case, one workaround could be padding nulls in order to make sure each row have the same amount of columns.

# perform cartesian join between df1 and df2
cartesian_df = df1.crossJoin(df2)

# filter the cartesian join to keep only the rows where the key from df1 is fully contained in the key from df2, if the key is None its considered "contained":
filtered_df = cartesian_df.filter(
    (col("T1_id1").isNull() | array_contains(array(col("T2_id1"), col("T2_id2"), col("T2_id3")), col("T1_id1"))) &
    (col("T1_id2").isNull() | array_contains(array(col("T2_id1"), col("T2_id2"), col("T2_id3")), col("T1_id2"))))

# print the output after every step - for debugging purposes, *NOT for production*
print("df1:")
df1.show(100)
print("df2:")
df2.show(100)
print("cartesian_df:")
cartesian_df.show(100)
print("filtered_df:")
filtered_df.show(100)

df1:
+------+------+
|T1_id1|T1_id2|
+------+------+
|     1|     2|
|     3|     4|
|     5|     6|
|  null|     9|
+------+------+

df2:
+------+------+------+-----+
|T2_id1|T2_id2|T2_id3| data|
+------+------+------+-----+
|     1|     2|     3|data1|
|     1|     2|  null|data2|
|     1|     3|  null|data3|
|     3|  null|  null|data4|
|     4|  null|  null|data5|
|     5|     6|  null|dataS|
|     5|     9|  null|dataX|
|  null|     9|  null|data9|
+------+------+------+-----+

cartesian_df:
+------+------+------+------+------+-----+
|T1_id1|T1_id2|T2_id1|T2_id2|T2_id3| data|
+------+------+------+------+------+-----+
|     1|     2|     1|     2|     3|data1|
|     1|     2|     1|     2|  null|data2|
|     1|     2|     1|     3|  null|data3|
|     1|     2|     3|  null|  null|data4|
|     1|     2|     4|  null|  null|data5|
|     1|     2|     5|     6|  null|dataS|
|     1|     2|     5|     9|  null|dataX|
|     1|     2|  null|     9|  null|data9|
|     3|     4|     1|   