In [32]:
import pandas as pd
trip=pd.DataFrame({
    "origin":["PMI","ATH","JFK","HND"
             ],
    "destination":[
        "OPO",
        "BCN",
        "MAD",
        "LAX"
    ],
    "internal_flight_ids":[
        [2,1],
        [3],
        [5,4,6],
        [8,9,7,0]
    ]
})

In [4]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [34]:
trips=spark.createDataFrame(trip)

In [35]:
trips.show()

+------+-----------+-------------------+
|origin|destination|internal_flight_ids|
+------+-----------+-------------------+
|   PMI|        OPO|             [2, 1]|
|   ATH|        BCN|                [3]|
|   JFK|        MAD|          [5, 4, 6]|
|   HND|        LAX|       [8, 9, 7, 0]|
+------+-----------+-------------------+



In [37]:
flights=pd.DataFrame({
    "internal_flight_id":[0,1,2,3,4,5,6,7,8,9],
    "public_flight_number":[
        "FR5763","UT9586","B4325","RW35675","LP656",
    "NB4321","CX4599","AZ8844","KH8851","OP8777"
    ]
})

In [38]:
flight=spark.createDataFrame(flights)

In [39]:
flight.show()

+------------------+--------------------+
|internal_flight_id|public_flight_number|
+------------------+--------------------+
|                 0|              FR5763|
|                 1|              UT9586|
|                 2|               B4325|
|                 3|             RW35675|
|                 4|               LP656|
|                 5|              NB4321|
|                 6|              CX4599|
|                 7|              AZ8844|
|                 8|              KH8851|
|                 9|              OP8777|
+------------------+--------------------+



In [40]:
from pyspark.sql.functions import col,explode,posexplode,collect_list,monotonically_increasing_id
from pyspark.sql.window import Window

In [41]:
trips=trips.withColumn("row_id",monotonically_increasing_id())

In [42]:
trips.show()

+------+-----------+-------------------+-----------+
|origin|destination|internal_flight_ids|     row_id|
+------+-----------+-------------------+-----------+
|   PMI|        OPO|             [2, 1]| 8589934592|
|   ATH|        BCN|                [3]|25769803776|
|   JFK|        MAD|          [5, 4, 6]|42949672960|
|   HND|        LAX|       [8, 9, 7, 0]|60129542144|
+------+-----------+-------------------+-----------+



In [43]:
trip_explode=trips\
             .select(col("row_id"),explode(col("internal_flight_ids"))\
                    .alias("internal_flight_id"))
trip_explode.show()

+-----------+------------------+
|     row_id|internal_flight_id|
+-----------+------------------+
| 8589934592|                 2|
| 8589934592|                 1|
|25769803776|                 3|
|42949672960|                 5|
|42949672960|                 4|
|42949672960|                 6|
|60129542144|                 8|
|60129542144|                 9|
|60129542144|                 7|
|60129542144|                 0|
+-----------+------------------+



In [47]:
explode_with_flight=trip_explode.join(flight,trip_explode.internal_flight_id==flight.internal_flight_id)

explode_with_flight.show()

+-----------+------------------+------------------+--------------------+
|     row_id|internal_flight_id|internal_flight_id|public_flight_number|
+-----------+------------------+------------------+--------------------+
|60129542144|                 0|                 0|              FR5763|
|60129542144|                 7|                 7|              AZ8844|
|42949672960|                 6|                 6|              CX4599|
|60129542144|                 9|                 9|              OP8777|
|42949672960|                 5|                 5|              NB4321|
| 8589934592|                 1|                 1|              UT9586|
|25769803776|                 3|                 3|             RW35675|
|60129542144|                 8|                 8|              KH8851|
| 8589934592|                 2|                 2|               B4325|
|42949672960|                 4|                 4|               LP656|
+-----------+------------------+------------------+

In [50]:
final=explode_with_flight.groupBy("row_id")\
                          .agg(collect_list("public_flight_number")\
                              .alias("public_flight_numbers"))

final.show()

+-----------+---------------------+
|     row_id|public_flight_numbers|
+-----------+---------------------+
| 8589934592|      [UT9586, B4325]|
|60129542144| [FR5763, AZ8844, ...|
|42949672960| [CX4599, NB4321, ...|
|25769803776|            [RW35675]|
+-----------+---------------------+



In [51]:
trip_with_flight_number=final.join(trips,on="row_id")\
                        .drop("row_id")\
                         .drop("internal_flight_ids")

trip_with_flight_number.show()

+---------------------+------+-----------+
|public_flight_numbers|origin|destination|
+---------------------+------+-----------+
|      [UT9586, B4325]|   PMI|        OPO|
| [FR5763, AZ8844, ...|   HND|        LAX|
| [CX4599, NB4321, ...|   JFK|        MAD|
|            [RW35675]|   ATH|        BCN|
+---------------------+------+-----------+



In [None]:
# correct method

In [56]:
exploded = trips \
    .select(col("row_id"),
            posexplode(col("internal_flight_ids"))) \
    .withColumnRenamed("col", "internal_flight_id") \
    .withColumnRenamed("pos", "position")

exploded.show()

+-----------+--------+------------------+
|     row_id|position|internal_flight_id|
+-----------+--------+------------------+
| 8589934592|       0|                 2|
| 8589934592|       1|                 1|
|25769803776|       0|                 3|
|42949672960|       0|                 5|
|42949672960|       1|                 4|
|42949672960|       2|                 6|
|60129542144|       0|                 8|
|60129542144|       1|                 9|
|60129542144|       2|                 7|
|60129542144|       3|                 0|
+-----------+--------+------------------+



In [59]:
exploded_with_flight_number=exploded.join(flight,on="internal_flight_id")

collected=exploded_with_flight_number.withColumn("public_flight_number",
                                                collect_list("public_flight_number")
                                                .over(Window\
                                                     .partitionBy("row_id")\
                                                     .orderBy("position")\
                                                     .rowsBetween(Window.unboundedPreceding,
                                                                Window.unboundedFollowing)))\
                                 .select(["row_id","public_flight_number"])

collected.show()
                                    

+-----------+--------------------+
|     row_id|public_flight_number|
+-----------+--------------------+
| 8589934592|     [B4325, UT9586]|
| 8589934592|     [B4325, UT9586]|
|60129542144|[KH8851, OP8777, ...|
|60129542144|[KH8851, OP8777, ...|
|60129542144|[KH8851, OP8777, ...|
|60129542144|[KH8851, OP8777, ...|
|42949672960|[NB4321, LP656, C...|
|42949672960|[NB4321, LP656, C...|
|42949672960|[NB4321, LP656, C...|
|25769803776|           [RW35675]|
+-----------+--------------------+



In [60]:
trips_with_flight_numbers = collected \
    .dropDuplicates() \
    .join(trips, on="row_id") \
    .drop("row_id") \
    .drop("internal_flight_ids")
trips_with_flight_numbers.show()

+--------------------+------+-----------+
|public_flight_number|origin|destination|
+--------------------+------+-----------+
|     [B4325, UT9586]|   PMI|        OPO|
|[KH8851, OP8777, ...|   HND|        LAX|
|[NB4321, LP656, C...|   JFK|        MAD|
|           [RW35675]|   ATH|        BCN|
+--------------------+------+-----------+

