A solution to problem stated in this post - 
https://medium.com/bluekiri/check-your-pyspark-abilities-by-solving-this-quick-challenge-86f563a343dd

In [6]:
import pandas as pd

# create dataset for internal flights
trips = pd.DataFrame({
    "origin": [
        "PMI",
        "ATH",
        "JFK",
        "HND"
    ],
    "destination": [
        "OPO",
        "BCN",
        "MAD",
        "LAX"
    ],
    "internal_flight_ids": [
        [2, 1],
        [3],
        [5, 4, 6],
        [8, 9, 7, 0]
    ]    
})

trips

Unnamed: 0,origin,destination,internal_flight_ids
0,PMI,OPO,"[2, 1]"
1,ATH,BCN,[3]
2,JFK,MAD,"[5, 4, 6]"
3,HND,LAX,"[8, 9, 7, 0]"


In [7]:
# create dataset with public and internal flights reference information
flights = pd.DataFrame({
    "internal_flight_id": [
        0, 1, 2, 3, 4, 5, 6, 7, 8, 9
    ],
    "public_flight_number": [
        "FR5763", "UT9586", "B4325", "RW35675", "LP656",
        "NB4321", "CX4599", "AZ8844", "KH8851", "OP8777"
    ]
})
flights

Unnamed: 0,internal_flight_id,public_flight_number
0,0,FR5763
1,1,UT9586
2,2,B4325
3,3,RW35675
4,4,LP656
5,5,NB4321
6,6,CX4599
7,7,AZ8844
8,8,KH8851
9,9,OP8777


In [8]:
# convert into spark dataframe
tripsDF = spark.createDataFrame(trips)
tripsDF.show(10)

+------+-----------+-------------------+
|origin|destination|internal_flight_ids|
+------+-----------+-------------------+
|   PMI|        OPO|             [2, 1]|
|   ATH|        BCN|                [3]|
|   JFK|        MAD|          [5, 4, 6]|
|   HND|        LAX|       [8, 9, 7, 0]|
+------+-----------+-------------------+



In [9]:
# same for flights
flightDF = spark.createDataFrame(flights)
flightDF.show(5)

+------------------+--------------------+
|internal_flight_id|public_flight_number|
+------------------+--------------------+
|                 0|              FR5763|
|                 1|              UT9586|
|                 2|               B4325|
|                 3|             RW35675|
|                 4|               LP656|
+------------------+--------------------+
only showing top 5 rows



In [13]:
tripsDF.createOrReplaceTempView("trips_table")
flightDF.createOrReplaceTempView("flights_table")

In [22]:
from pyspark.sql.functions import col, explode, posexplode, monotonically_increasing_id, collect_list
from pyspark.sql.window import Window

t = tripsDF.withColumn("rowid", monotonically_increasing_id())
t.show()


+------+-----------+-------------------+-----------+
|origin|destination|internal_flight_ids|      rowid|
+------+-----------+-------------------+-----------+
|   PMI|        OPO|             [2, 1]|          0|
|   ATH|        BCN|                [3]| 8589934592|
|   JFK|        MAD|          [5, 4, 6]|17179869184|
|   HND|        LAX|       [8, 9, 7, 0]|25769803776|
+------+-----------+-------------------+-----------+



In [30]:
exploded = t.select("rowid", explode(col("internal_flight_ids")).alias("internal_flight_id"))
exploded.show()

+-----------+------------------+
|      rowid|internal_flight_id|
+-----------+------------------+
|          0|                 2|
|          0|                 1|
| 8589934592|                 3|
|17179869184|                 5|
|17179869184|                 4|
|17179869184|                 6|
|25769803776|                 8|
|25769803776|                 9|
|25769803776|                 7|
|25769803776|                 0|
+-----------+------------------+



In [33]:
# join exploded with flights data
exploded_with_flight_number = exploded.join(flightDF, on="internal_flight_id")
exploded_with_flight_number.show()

+------------------+-----------+--------------------+
|internal_flight_id|      rowid|public_flight_number|
+------------------+-----------+--------------------+
|                 0|25769803776|              FR5763|
|                 7|25769803776|              AZ8844|
|                 6|17179869184|              CX4599|
|                 9|25769803776|              OP8777|
|                 5|17179869184|              NB4321|
|                 1|          0|              UT9586|
|                 3| 8589934592|             RW35675|
|                 8|25769803776|              KH8851|
|                 2|          0|               B4325|
|                 4|17179869184|               LP656|
+------------------+-----------+--------------------+



In [38]:
# group and collect the list of flights
collected_flights = exploded_with_flight_number.groupBy("rowid")\
    .agg(collect_list("public_flight_number"))\
    .alias("public_fligh_number")

collected_flights.show(5, False)

+-----------+----------------------------------+
|rowid      |collect_list(public_flight_number)|
+-----------+----------------------------------+
|8589934592 |[RW35675]                         |
|0          |[UT9586, B4325]                   |
|25769803776|[FR5763, AZ8844, OP8777, KH8851]  |
|17179869184|[CX4599, NB4321, LP656]           |
+-----------+----------------------------------+



In [39]:
# notice the order of flights is wrong in last column; this is because of explode funtion runs in parallel in spark
t.join(collected_flights, on="rowid").drop("rowid").show()

+------+-----------+-------------------+----------------------------------+
|origin|destination|internal_flight_ids|collect_list(public_flight_number)|
+------+-----------+-------------------+----------------------------------+
|   ATH|        BCN|                [3]|                         [RW35675]|
|   PMI|        OPO|             [2, 1]|                   [UT9586, B4325]|
|   HND|        LAX|       [8, 9, 7, 0]|              [FR5763, AZ8844, ...|
|   JFK|        MAD|          [5, 4, 6]|              [CX4599, NB4321, ...|
+------+-----------+-------------------+----------------------------------+



## Correct Solution is 

In [40]:
# correct solution is using posexplode
tt = tripsDF.withColumn("rowid", monotonically_increasing_id())
tt.show()

+------+-----------+-------------------+-----------+
|origin|destination|internal_flight_ids|      rowid|
+------+-----------+-------------------+-----------+
|   PMI|        OPO|             [2, 1]|          0|
|   ATH|        BCN|                [3]| 8589934592|
|   JFK|        MAD|          [5, 4, 6]|17179869184|
|   HND|        LAX|       [8, 9, 7, 0]|25769803776|
+------+-----------+-------------------+-----------+



In [42]:
# next posexplode the internal_flight_ids
exploded_flights = tt.select("rowid", posexplode("internal_flight_ids"))\
    .withColumnRenamed("pos", "position")\
    .withColumnRenamed("col", "internal_flight_id")

exploded_flights.show()

+-----------+--------+------------------+
|      rowid|position|internal_flight_id|
+-----------+--------+------------------+
|          0|       0|                 2|
|          0|       1|                 1|
| 8589934592|       0|                 3|
|17179869184|       0|                 5|
|17179869184|       1|                 4|
|17179869184|       2|                 6|
|25769803776|       0|                 8|
|25769803776|       1|                 9|
|25769803776|       2|                 7|
|25769803776|       3|                 0|
+-----------+--------+------------------+



In [44]:
# now map it with flights data
exploded_with_flights = exploded_flights.join(flightDF, on="internal_flight_id")
exploded_with_flights.show()

+------------------+-----------+--------+--------------------+
|internal_flight_id|      rowid|position|public_flight_number|
+------------------+-----------+--------+--------------------+
|                 0|25769803776|       3|              FR5763|
|                 7|25769803776|       2|              AZ8844|
|                 6|17179869184|       2|              CX4599|
|                 9|25769803776|       1|              OP8777|
|                 5|17179869184|       0|              NB4321|
|                 1|          0|       1|              UT9586|
|                 3| 8589934592|       0|             RW35675|
|                 8|25769803776|       0|              KH8851|
|                 2|          0|       0|               B4325|
|                 4|17179869184|       1|               LP656|
+------------------+-----------+--------+--------------------+



In [46]:
# next group and collect list order by position
collected = exploded_with_flights\
    .withColumn("public_flight_numbers", collect_list("public_flight_number")\
    .over(Window \
         .partitionBy("rowid")\
         .orderBy("position")\
         .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)))\
    .select("rowid", "public_flight_numbers")

collected.show(10, False)
    

+-----------+--------------------------------+
|rowid      |public_flight_numbers           |
+-----------+--------------------------------+
|8589934592 |[RW35675]                       |
|0          |[B4325, UT9586]                 |
|0          |[B4325, UT9586]                 |
|25769803776|[KH8851, OP8777, AZ8844, FR5763]|
|25769803776|[KH8851, OP8777, AZ8844, FR5763]|
|25769803776|[KH8851, OP8777, AZ8844, FR5763]|
|25769803776|[KH8851, OP8777, AZ8844, FR5763]|
|17179869184|[NB4321, LP656, CX4599]         |
|17179869184|[NB4321, LP656, CX4599]         |
|17179869184|[NB4321, LP656, CX4599]         |
+-----------+--------------------------------+



In [52]:
# remove duplicates and combine it with actual trips data
final = collected.dropDuplicates()\
    .join(tt, on="rowid")\
    .drop("rowid")\
    .select("origin", "destination", "internal_flight_ids", "public_flight_numbers")

final.show(truncate=False)

+------+-----------+-------------------+--------------------------------+
|origin|destination|internal_flight_ids|public_flight_numbers           |
+------+-----------+-------------------+--------------------------------+
|ATH   |BCN        |[3]                |[RW35675]                       |
|PMI   |OPO        |[2, 1]             |[B4325, UT9586]                 |
|HND   |LAX        |[8, 9, 7, 0]       |[KH8851, OP8777, AZ8844, FR5763]|
|JFK   |MAD        |[5, 4, 6]          |[NB4321, LP656, CX4599]         |
+------+-----------+-------------------+--------------------------------+



In [65]:
#final.rdd.map(lambda x: [flightDF['public_flight_number'] for i in x if i == flightDF['internal_flight_id'] ]).take(2)