In [1]:
!pip install pyspark py4j

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=33a5bad72d916f30af55c44c00af7377e966e63cb8a15a77c4896a6fd5e62cd7
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("remove redundant pair") \
    .getOrCreate()

spark.getActiveSession()

In [3]:
data = [
    ('apple', 'samsung', 2020, 1, 2, 1, 2),
    ('samsung', 'apple', 2020, 1, 2, 1, 2),
    ('apple', 'samsung', 2021, 1, 2, 5, 3),
    ('samsung', 'apple', 2021, 5, 3, 1, 2),
    ('google', None, 2020, 5, 9, None, None),
    ('oneplus', 'nothing', 2020, 5, 9, 6, 3)
]
schema = 'brand1 string , brand2 string , year int , custom1 int, custom2 int , custom3 int , custom4 int'

df = spark.createDataFrame(data = data , schema = schema)
df.show()

+-------+-------+----+-------+-------+-------+-------+
| brand1| brand2|year|custom1|custom2|custom3|custom4|
+-------+-------+----+-------+-------+-------+-------+
|  apple|samsung|2020|      1|      2|      1|      2|
|samsung|  apple|2020|      1|      2|      1|      2|
|  apple|samsung|2021|      1|      2|      5|      3|
|samsung|  apple|2021|      5|      3|      1|      2|
| google|   NULL|2020|      5|      9|   NULL|   NULL|
|oneplus|nothing|2020|      5|      9|      6|      3|
+-------+-------+----+-------+-------+-------+-------+



In [10]:
from pyspark.sql.functions import *
# df1 = df.withColumn("key", concat_ws("",df.brand1,df.brand2, df.year))
df1 = df.withColumn("key", when(col("brand1") < col("brand2"), concat(df.brand1, df.brand2, df.year)).otherwise(concat(df.brand2, df.brand1, df.year)))
df1.show()

+-------+-------+----+-------+-------+-------+-------+------------------+
| brand1| brand2|year|custom1|custom2|custom3|custom4|               key|
+-------+-------+----+-------+-------+-------+-------+------------------+
|  apple|samsung|2020|      1|      2|      1|      2|  applesamsung2020|
|samsung|  apple|2020|      1|      2|      1|      2|  applesamsung2020|
|  apple|samsung|2021|      1|      2|      5|      3|  applesamsung2021|
|samsung|  apple|2021|      5|      3|      1|      2|  applesamsung2021|
| google|   NULL|2020|      5|      9|   NULL|   NULL|              NULL|
|oneplus|nothing|2020|      5|      9|      6|      3|nothingoneplus2020|
+-------+-------+----+-------+-------+-------+-------+------------------+



In [15]:
#creating a row_number
from pyspark.sql import Window
window_df = Window.partitionBy("key").orderBy(col("key"))

In [16]:
row_df = df1.withColumn("r_num",row_number().over(window_df))
row_df.show()

+-------+-------+----+-------+-------+-------+-------+------------------+-----+
| brand1| brand2|year|custom1|custom2|custom3|custom4|               key|r_num|
+-------+-------+----+-------+-------+-------+-------+------------------+-----+
| google|   NULL|2020|      5|      9|   NULL|   NULL|              NULL|    1|
|  apple|samsung|2020|      1|      2|      1|      2|  applesamsung2020|    1|
|samsung|  apple|2020|      1|      2|      1|      2|  applesamsung2020|    2|
|  apple|samsung|2021|      1|      2|      5|      3|  applesamsung2021|    1|
|samsung|  apple|2021|      5|      3|      1|      2|  applesamsung2021|    2|
|oneplus|nothing|2020|      5|      9|      6|      3|nothingoneplus2020|    1|
+-------+-------+----+-------+-------+-------+-------+------------------+-----+



In [21]:
final_df = row_df.filter((col("r_num") == 1) | ((col("custom1") != col("custom3")) | (col("custom2") != col("custom4"))))
final_df.show()

+-------+-------+----+-------+-------+-------+-------+------------------+-----+
| brand1| brand2|year|custom1|custom2|custom3|custom4|               key|r_num|
+-------+-------+----+-------+-------+-------+-------+------------------+-----+
| google|   NULL|2020|      5|      9|   NULL|   NULL|              NULL|    1|
|  apple|samsung|2020|      1|      2|      1|      2|  applesamsung2020|    1|
|  apple|samsung|2021|      1|      2|      5|      3|  applesamsung2021|    1|
|samsung|  apple|2021|      5|      3|      1|      2|  applesamsung2021|    2|
|oneplus|nothing|2020|      5|      9|      6|      3|nothingoneplus2020|    1|
+-------+-------+----+-------+-------+-------+-------+------------------+-----+



In [23]:
final_df.select("brand1","brand2","year","custom1","custom2","custom3","custom4").show()

+-------+-------+----+-------+-------+-------+-------+
| brand1| brand2|year|custom1|custom2|custom3|custom4|
+-------+-------+----+-------+-------+-------+-------+
| google|   NULL|2020|      5|      9|   NULL|   NULL|
|  apple|samsung|2020|      1|      2|      1|      2|
|  apple|samsung|2021|      1|      2|      5|      3|
|samsung|  apple|2021|      5|      3|      1|      2|
|oneplus|nothing|2020|      5|      9|      6|      3|
+-------+-------+----+-------+-------+-------+-------+

