In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org//dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q pyspark
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

from datetime import datetime

import findspark
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, lit, col, array, round, size, when, concat, concat_ws, array_except, array_remove, element_at

[K     |████████████████████████████████| 212.4 MB 65 kB/s 
[K     |████████████████████████████████| 198 kB 29.1 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
# Initialise findspark (SPARK_HOME is set in the setup cell), then obtain a
# local session that may use every available core.
findspark.init()

sessionBuilder = SparkSession.builder.master("local[*]").appName("spark_TC")
spark = sessionBuilder.getOrCreate()

In [None]:
# Reference data: (internal station id, public station name) pairs.
stations = [
    (0, "BAutogara"),
    (1, "BVAutogara"),
    (2, "SBAutogara"),
    (3, "CJAutogara"),
    (4, "MMAutogara"),
    (5, "ISAutogara"),
    (6, "CTAutogara"),
    (7, "TMAutogara"),
    (8, "BCAutogara"),
    (9, "MSAutogara"),
]

stationsColumns = ["internal_bus_station_id", "public_bus_station"]
stationsDF = spark.createDataFrame(data=stations, schema=stationsColumns)

# Number the rows by station id. Ordering the window by a constant literal
# leaves the row order (and therefore row_num) up to Spark, so the numbering
# is not guaranteed to follow the input order; an explicit sort key makes it
# deterministic and yields the same 1..N numbering for this data.
w = Window.orderBy(col("internal_bus_station_id"))
stationsDF = stationsDF.withColumn("row_num", row_number().over(w))
stationsDF = stationsDF.select("row_num", "internal_bus_station_id", "public_bus_station")

In [None]:
# Sample trips: (origin, destination, ordered internal station ids,
# timestamp recorded at each of those stations).
trips = [
    ("B", "MM", [0, 2, 4], [
        datetime(2020, 3, 1, 10, 10, 0),
        datetime(2020, 3, 1, 12, 20, 10),
        datetime(2020, 3, 1, 14, 10, 10),
    ]),
    ("BV", "IS", [1, 8, 3, 5], [
        datetime(2020, 3, 1, 8, 10, 0),
        datetime(2020, 3, 1, 12, 20, 10),
        datetime(2020, 3, 1, 15, 10, 10),
        datetime(2020, 3, 1, 15, 45, 10),
    ]),
    ("TM", "CT", [7, 2, 9, 4, 6], [
        datetime(2020, 4, 1, 10, 45, 0),
        datetime(2020, 4, 1, 12, 20, 10),
        datetime(2020, 4, 1, 19, 30, 10),
        datetime(2020, 4, 1, 21, 30, 10),
        datetime(2020, 4, 1, 22, 0, 10),
    ]),
    ("CJ", "BC", [3, 9, 5, 6, 7, 8], [
        datetime(2020, 5, 1, 7, 10, 0),
        datetime(2020, 5, 1, 12, 20, 10),
        datetime(2020, 5, 1, 13, 20, 10),
        datetime(2020, 5, 1, 14, 20, 10),
        datetime(2020, 5, 1, 15, 20, 10),
        datetime(2020, 5, 1, 21, 20, 10),
    ]),
]

tripsColumns = ["origin", "destination", "internal_bus_stations_ids", "triptimes"]

# Number the rows on the driver, in list order. A window ordered by a constant
# literal does not guarantee that row_number() follows the input order, and
# the trips have no natural sort key, so the position is attached up front.
numberedTrips = [(position, *trip) for position, trip in enumerate(trips, start=1)]
tripsDF = spark.createDataFrame(data=numberedTrips, schema=["row_num"] + tripsColumns)
# row_number() produced an integer column; keep that type for row_num.
tripsDF = tripsDF.withColumn("row_num", col("row_num").cast("int"))
tripsDF = tripsDF.select("row_num", "origin", "destination", "internal_bus_stations_ids", "triptimes")

In [None]:
stationsDF.show()
# stationsDF.printSchema()

# Timestamps cast to int give epoch seconds; the trip length is the gap
# between the first and the last recorded stop.
firstStopSeconds = col("triptimes")[0].cast("int")
lastStopSeconds = element_at(col("triptimes"), -1).cast("int")

tripsDF = (
    tripsDF
    # seconds -> minutes, rounded to whole minutes, rendered as "<n> min"
    .withColumn("duration", round(lastStopSeconds - firstStopSeconds) / 60)
    .withColumn("duration", round(col("duration"), 0))
    .withColumn("duration", concat(col("duration"), lit(" min")))
    # Key built by concatenating the station ids with no separator.
    .withColumn("unique_key", concat_ws("", col("internal_bus_stations_ids")))
)
# tripsDF = tripsDF.withColumn("duration_in_h", lit(element_at(tripsDF.triptimes, -1) - tripsDF.triptimes[0]))
tripsDF.show(truncate=False)
# tripsDF.printSchema()

# Aliased handles used by the joins and temp views in the following cells.
stations = stationsDF.alias("stations")
trips = tripsDF.alias("trips")

+-------+-----------------------+------------------+
|row_num|internal_bus_station_id|public_bus_station|
+-------+-----------------------+------------------+
|      1|                      0|         BAutogara|
|      2|                      1|        BVAutogara|
|      3|                      2|        SBAutogara|
|      4|                      3|        CJAutogara|
|      5|                      4|        MMAutogara|
|      6|                      5|        ISAutogara|
|      7|                      6|        CTAutogara|
|      8|                      7|        TMAutogara|
|      9|                      8|        BCAutogara|
|     10|                      9|        MSAutogara|
+-------+-----------------------+------------------+

+-------+------+-----------+-------------------------+------------------------------------------------------------------------------------------------------------------------------+---------+----------+
|row_num|origin|destination|internal_bus_stations_ids|

Identify each array's length and the maximum length in the `internal_bus_stations_ids` column, so that a dynamic number of bus stops is supported without hardcoding the array elements that will be selected.

In [None]:
# Station-id array of every trip together with its length.
columns = trips.select(trips.internal_bus_stations_ids, size("internal_bus_stations_ids").alias("size"))
columns.show()
# Longest array across all trips. The aggregate row is deliberately not bound
# to the name `max`, which would shadow the builtin for the rest of the session.
maxSizeRow = columns.agg({"size": "max"}).collect()[0]
maxArrLength = maxSizeRow["max(size)"]

+-------------------------+----+
|internal_bus_stations_ids|size|
+-------------------------+----+
|                [0, 2, 4]|   3|
|             [1, 8, 3, 5]|   4|
|          [7, 2, 9, 4, 6]|   5|
|       [3, 9, 5, 6, 7, 8]|   6|
+-------------------------+----+



Create a custom string to be passed to `spark.sql()` for retrieving and splitting all array elements into individual columns, regardless of the bus stations count.

In [None]:
tempViewName = "trips"
# createOrReplaceTempView keeps the cell re-runnable; createTempView raises
# when a view with this name already exists in the session.
trips.createOrReplaceTempView(tempViewName)

# One select item per array position: position i becomes column_{i+1}.
queryString = ", ".join(
    f"{tempViewName}.internal_bus_stations_ids[{i}] as column_{i+1}"
    for i in range(maxArrLength)
)

print(f"Dynamic SQL query: \n <SELECT {queryString} FROM {tempViewName}>")

Dynamic SQL query: 
 <SELECT trips.internal_bus_stations_ids[0] as column_1, trips.internal_bus_stations_ids[1] as column_2, trips.internal_bus_stations_ids[2] as column_3, trips.internal_bus_stations_ids[3] as column_4, trips.internal_bus_stations_ids[4] as column_5, trips.internal_bus_stations_ids[5] as column_6 FROM trips>


Split the internal_bus_stations_ids arrays into individual columns in a temporary table `tripsNameDF`.

In [None]:
# Run the generated SELECT: one output column per station position.
splitStationsQuery = f"SELECT {queryString} FROM {tempViewName}"
tripsNameDF = spark.sql(splitStationsQuery)
tripsNameDF.show()

+--------+--------+--------+--------+--------+--------+
|column_1|column_2|column_3|column_4|column_5|column_6|
+--------+--------+--------+--------+--------+--------+
|       0|       2|       4|    null|    null|    null|
|       1|       8|       3|       5|    null|    null|
|       7|       2|       9|       4|       6|    null|
|       3|       9|       5|       6|       7|       8|
+--------+--------+--------+--------+--------+--------+



**No longer in use.** Hardcoded array indexes PySpark SELECT and JOIN queries.

Use the individual IDs as matching terms for the left joins against the original `stations` data set.

In [None]:
# Snapshot of the id columns before the loop starts appending name columns.
columnNames = tripsNameDF.schema.names
joinedName = ""

for name in columnNames:
    # Look up the public name for this position's station id; the left join
    # keeps trips that have no station at this position.
    joinedName = f"{name}_public"
    matchesStation = tripsNameDF[name] == stations.internal_bus_station_id
    withStationName = tripsNameDF.join(stations, matchesStation, how="left")
    tripsNameDF = withStationName.select(
        tripsNameDF["*"],
        stations["public_bus_station"].alias(joinedName),
    )

# Replace the missing names (string columns) with empty strings.
tripsNameDF = tripsNameDF.na.fill("")
tripsNameDF.show()

+--------+--------+--------+--------+--------+--------+---------------+---------------+---------------+---------------+---------------+---------------+
|column_1|column_2|column_3|column_4|column_5|column_6|column_1_public|column_2_public|column_3_public|column_4_public|column_5_public|column_6_public|
+--------+--------+--------+--------+--------+--------+---------------+---------------+---------------+---------------+---------------+---------------+
|       0|       2|       4|    null|    null|    null|      BAutogara|     SBAutogara|     MMAutogara|               |               |               |
|       1|       8|       3|       5|    null|    null|     BVAutogara|     BCAutogara|     CJAutogara|     ISAutogara|               |               |
|       7|       2|       9|       4|       6|    null|     TMAutogara|     SBAutogara|     MSAutogara|     MMAutogara|     CTAutogara|               |
|       3|       9|       5|       6|       7|       8|     CJAutogara|     MSAutogara| 

Add the matching values in an array, saved in a new column called "public_bus_stops"

In [None]:
# Split the columns into the id columns and the "*_public" name columns.
columnNames = tripsNameDF.schema.names
public = [name for name in columnNames if "public" in name]
internal = [name for name in columnNames if "public" not in name]

# Pack the columns back into arrays. The empty-string placeholders for missing
# stops are dropped with array_remove, which keeps every remaining element;
# array_except would also de-duplicate the result and silently drop a station
# that a trip visits more than once.
tripsNameDF = tripsNameDF.select(
    array(internal).alias("internal_bus_stations"),
    array_remove(array(public), "").alias("public_bus_stops"),
)

# Join key: the station ids concatenated without a separator (concat_ws
# skips the null padding).
tripsNameDF = tripsNameDF.withColumn("unique_key_public_stops", concat_ws("", col("internal_bus_stations")))

tripsNameDF.show(truncate=False)

+---------------------------+------------------------------------------------------------------------+-----------------------+
|internal_bus_stations      |public_bus_stops                                                        |unique_key_public_stops|
+---------------------------+------------------------------------------------------------------------+-----------------------+
|[0, 2, 4, null, null, null]|[BAutogara, SBAutogara, MMAutogara]                                     |024                    |
|[1, 8, 3, 5, null, null]   |[BVAutogara, BCAutogara, CJAutogara, ISAutogara]                        |1835                   |
|[7, 2, 9, 4, 6, null]      |[TMAutogara, SBAutogara, MSAutogara, MMAutogara, CTAutogara]            |72946                  |
|[3, 9, 5, 6, 7, 8]         |[CJAutogara, MSAutogara, ISAutogara, CTAutogara, TMAutogara, BCAutogara]|395678                 |
+---------------------------+------------------------------------------------------------------------+---------

In [None]:
# Station ids, timestamps and the number of timestamps of every trip.
columns = trips.select(trips.internal_bus_stations_ids.alias("internal_bus_stations"), trips.triptimes, size("triptimes").alias("size"))
columns.show(truncate=False)
# Longest timestamp array across all trips. The aggregate row is deliberately
# not bound to the name `max`, which would shadow the builtin.
maxSizeRow = columns.agg({"size": "max"}).collect()[0]
maxArrLength = maxSizeRow["max(size)"]

+---------------------+------------------------------------------------------------------------------------------------------------------------------+----+
|internal_bus_stations|triptimes                                                                                                                     |size|
+---------------------+------------------------------------------------------------------------------------------------------------------------------+----+
|[0, 2, 4]            |[2020-03-01 10:10:00, 2020-03-01 12:20:10, 2020-03-01 14:10:10]                                                               |3   |
|[1, 8, 3, 5]         |[2020-03-01 08:10:00, 2020-03-01 12:20:10, 2020-03-01 15:10:10, 2020-03-01 15:45:10]                                          |4   |
|[7, 2, 9, 4, 6]      |[2020-04-01 10:45:00, 2020-04-01 12:20:10, 2020-04-01 19:30:10, 2020-04-01 21:30:10, 2020-04-01 22:00:10]                     |5   |
|[3, 9, 5, 6, 7, 8]   |[2020-05-01 07:10:00, 2020-05-01 12:20:10

In [None]:
tempViewName = "triptimes"
# createOrReplaceTempView keeps the cell re-runnable; createTempView raises
# when a view with this name already exists in the session.
trips.createOrReplaceTempView(tempViewName)

# The station-id array is carried along unchanged, followed by one select
# item per timestamp position (position i becomes column_{i+1}).
selectItems = [f"{tempViewName}.internal_bus_stations_ids as internal_bus_stations"]
selectItems += [
    f"{tempViewName}.triptimes[{i}] as column_{i+1}"
    for i in range(maxArrLength)
]
queryString = ", ".join(selectItems)

print(f"Dynamic SQL query: \n <SELECT {queryString} FROM {tempViewName}>")

Dynamic SQL query: 
 <SELECT triptimes.internal_bus_stations_ids as internal_bus_stations, triptimes.triptimes[0] as column_1, triptimes.triptimes[1] as column_2, triptimes.triptimes[2] as column_3, triptimes.triptimes[3] as column_4, triptimes.triptimes[4] as column_5, triptimes.triptimes[5] as column_6 FROM triptimes>


In [None]:
# Run the generated SELECT: the station ids plus one column per timestamp.
splitTimesQuery = f"SELECT {queryString} FROM {tempViewName}"
tripsDurationDF = spark.sql(splitTimesQuery)
tripsDurationDF.show()

+---------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|internal_bus_stations|           column_1|           column_2|           column_3|           column_4|           column_5|           column_6|
+---------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|            [0, 2, 4]|2020-03-01 10:10:00|2020-03-01 12:20:10|2020-03-01 14:10:10|               null|               null|               null|
|         [1, 8, 3, 5]|2020-03-01 08:10:00|2020-03-01 12:20:10|2020-03-01 15:10:10|2020-03-01 15:45:10|               null|               null|
|      [7, 2, 9, 4, 6]|2020-04-01 10:45:00|2020-04-01 12:20:10|2020-04-01 19:30:10|2020-04-01 21:30:10|2020-04-01 22:00:10|               null|
|   [3, 9, 5, 6, 7, 8]|2020-05-01 07:10:00|2020-05-01 12:20:10|2020-05-01 13:20:10|2020-05-01 14:20:10|2020-05-01 15:20:10|2020-05-01 21

In [None]:
columnNames = tripsDurationDF.schema.names
maxIndex = len(columnNames) - 1

# Index 0 is the station-id array; index 1 holds the first timestamp.
firstStopSeconds = col(columnNames[1]).cast("int")

# First attempt: last timestamp column minus the first one. This is null for
# every trip that does not fill the last column.
tripsDurationDF = tripsDurationDF.withColumn(
    "duration",
    col(columnNames[maxIndex]).cast("int") - firstStopSeconds,
)
tripsDurationDF.show()

# Walk the timestamp columns from last to first and, while the duration is
# still null, retry with that column as the end of the trip.
for i in reversed(range(1, maxIndex + 1)):
    candidate = col(columnNames[i]).cast("int") - firstStopSeconds
    tripsDurationDF = tripsDurationDF.withColumn(
        "duration",
        when(col("duration").isNull(), candidate).otherwise(col("duration")),
    )

# seconds -> minutes with two decimals, rendered as "<n> min"
tripsDurationDF = (
    tripsDurationDF
    .withColumn("duration", round(col("duration")) / 60)
    .withColumn("duration", round(col("duration"), 2))
    .withColumn("duration", concat(col("duration"), lit(" min")))
)
tripsDurationDF.show(truncate=False)

+---------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------+
|internal_bus_stations|           column_1|           column_2|           column_3|           column_4|           column_5|           column_6|duration|
+---------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------+
|            [0, 2, 4]|2020-03-01 10:10:00|2020-03-01 12:20:10|2020-03-01 14:10:10|               null|               null|               null|    null|
|         [1, 8, 3, 5]|2020-03-01 08:10:00|2020-03-01 12:20:10|2020-03-01 15:10:10|2020-03-01 15:45:10|               null|               null|    null|
|      [7, 2, 9, 4, 6]|2020-04-01 10:45:00|2020-04-01 12:20:10|2020-04-01 19:30:10|2020-04-01 21:30:10|2020-04-01 22:00:10|               null|    null|
|   [3, 9, 5, 6, 7, 8]|2020-05-01 07:10:00|2020-05-01 12:20:10|2020-05-01 13:20:10

In [None]:
columnNames = tripsDurationDF.schema.names
# Every column between the first one and the last one is a timestamp column.
timestamps = list(columnNames[1:-1])

# Fold the per-position timestamp columns back into a single array column.
tripsDurationDF = tripsDurationDF.select(
    tripsDurationDF["internal_bus_stations"],
    array(timestamps).alias("triptimes"),
    tripsDurationDF["duration"],
)

tripsDurationDF.show(truncate=False)

+---------------------+------------------------------------------------------------------------------------------------------------------------------+----------+
|internal_bus_stations|triptimes                                                                                                                     |duration  |
+---------------------+------------------------------------------------------------------------------------------------------------------------------+----------+
|[0, 2, 4]            |[2020-03-01 10:10:00, 2020-03-01 12:20:10, 2020-03-01 14:10:10, null, null, null]                                             |240.17 min|
|[1, 8, 3, 5]         |[2020-03-01 08:10:00, 2020-03-01 12:20:10, 2020-03-01 15:10:10, 2020-03-01 15:45:10, null, null]                              |455.17 min|
|[7, 2, 9, 4, 6]      |[2020-04-01 10:45:00, 2020-04-01 12:20:10, 2020-04-01 19:30:10, 2020-04-01 21:30:10, 2020-04-01 22:00:10, null]               |675.17 min|
|[3, 9, 5, 6, 7, 8]   |[2020

In [None]:
# Attach the computed duration to each trip by matching on the station-id array.
sameStations = trips.internal_bus_stations_ids == tripsDurationDF.internal_bus_stations
trips = trips.join(tripsDurationDF, sameStations).select(
    trips["row_num"],
    trips["unique_key"],
    trips["internal_bus_stations_ids"],
    trips["origin"],
    trips["destination"],
    tripsDurationDF["duration"],
)
trips.orderBy(["row_num"]).show(truncate=False)

+-------+----------+-------------------------+------+-----------+----------+
|row_num|unique_key|internal_bus_stations_ids|origin|destination|duration  |
+-------+----------+-------------------------+------+-----------+----------+
|1      |024       |[0, 2, 4]                |B     |MM         |240.17 min|
|2      |1835      |[1, 8, 3, 5]             |BV    |IS         |455.17 min|
|3      |72946     |[7, 2, 9, 4, 6]          |TM    |CT         |675.17 min|
|4      |395678    |[3, 9, 5, 6, 7, 8]       |CJ    |BC         |850.17 min|
+-------+----------+-------------------------+------+-----------+----------+



In [None]:
# Attach the public stop names to each trip by matching on the concatenated-id key.
sameKey = trips.unique_key == tripsNameDF.unique_key_public_stops
trips = trips.join(tripsNameDF, sameKey).select(
    trips["row_num"],
    trips["origin"],
    trips["destination"],
    tripsNameDF["public_bus_stops"],
    trips["duration"],
)
trips.orderBy(["row_num"]).show(truncate=False)

+-------+------+-----------+------------------------------------------------------------------------+----------+
|row_num|origin|destination|public_bus_stops                                                        |duration  |
+-------+------+-----------+------------------------------------------------------------------------+----------+
|1      |B     |MM         |[BAutogara, SBAutogara, MMAutogara]                                     |240.17 min|
|2      |BV    |IS         |[BVAutogara, BCAutogara, CJAutogara, ISAutogara]                        |455.17 min|
|3      |TM    |CT         |[TMAutogara, SBAutogara, MSAutogara, MMAutogara, CTAutogara]            |675.17 min|
|4      |CJ    |BC         |[CJAutogara, MSAutogara, ISAutogara, CTAutogara, TMAutogara, BCAutogara]|850.17 min|
+-------+------+-----------+------------------------------------------------------------------------+----------+

