In [None]:
!pip install pyspark
!apt-get install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

!pip install graphframes

import pyspark
from pyspark.sql import SparkSession
from functools import reduce
from pyspark.sql.functions import col, lit, when
from graphframes import GraphFrame

import pyspark.sql.functions as F
from math import radians, cos, sin, asin, sqrt

Selecting previously unselected package libxtst6:amd64.
(Reading database ... 123629 files and directories currently installed.)
Preparing to unpack .../libxtst6_2%3a1.2.3-1build4_amd64.deb ...
Unpacking libxtst6:amd64 (2:1.2.3-1build4) ...
Selecting previously unselected package openjdk-8-jre-headless:amd64.
Preparing to unpack .../openjdk-8-jre-headless_8u422-b05-1~22.04_amd64.deb ...
Unpacking openjdk-8-jre-headless:amd64 (8u422-b05-1~22.04) ...
Selecting previously unselected package openjdk-8-jdk-headless:amd64.
Preparing to unpack .../openjdk-8-jdk-headless_8u422-b05-1~22.04_amd64.deb ...
Unpacking openjdk-8-jdk-headless:amd64 (8u422-b05-1~22.04) ...
Setting up libxtst6:amd64 (2:1.2.3-1build4) ...
Setting up openjdk-8-jre-headless:amd64 (8u422-b05-1~22.04) ...
update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/orbd to provide /usr/bin/orbd (orbd) in auto mode
update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/servertool to provide /usr/bin/serv

In [None]:
# Start (or reuse) a Spark session with master "local[*]" and ask Spark to pull
# the GraphFrames package (0.8.2 build for Spark 3.2 / Scala 2.12) via
# spark.jars.packages.
# NOTE(review): pyspark is installed without a version pin in the setup cell;
# confirm the installed Spark version is compatible with this GraphFrames build.
spark = SparkSession.builder.master("local[*]").config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.2-s_2.12").getOrCreate()

In [None]:
def createTableFromCsv(tableName, fileLocation):
  """Read a comma-separated file with a header row and expose it as a temp view.

  Column types are inferred by Spark. The resulting DataFrame is registered
  under `tableName` (replacing any existing temp view of that name) so it can
  be queried through `spark.sql` / `spark.table`. Nothing is returned.

  Args:
    tableName: name of the temporary view to create or replace.
    fileLocation: path of the CSV-formatted file to load.
  """
  reader_options = {
    "inferSchema": True,
    "header": True,
    "sep": ",",
  }

  table_df = spark.read.format("csv").options(**reader_options).load(fileLocation)
  table_df.createOrReplaceTempView(tableName)

In [None]:
# Register the three input files as temp views, in the same order as before.
for table_name, file_location in (
    ("airlines", "/content/data/airlines.dat"),
    ("airports", "/content/data/airports.dat"),
    ("routes", "/content/data/routes.dat"),
):
    createTableFromCsv(table_name, file_location)

In [None]:
# Read the registered "airlines" temp view back as a DataFrame and preview it.
airlines = spark.table("airlines")
airlines.show()

+---------+--------------------+-----+----+----+---------------+--------------+------+
|AirlineId|                Name|Alias|IATA|ICAO|       CallSign|       Country|Active|
+---------+--------------------+-----+----+----+---------------+--------------+------+
|        1|      Private flight|   \N|   -| N/A|           NULL|          NULL|     Y|
|        2|         135 Airways|   \N|NULL| GNL|        GENERAL| United States|     N|
|        3|       1Time Airline|   \N|  1T| RNX|        NEXTIME|  South Africa|     Y|
|        4|2 Sqn No 1 Elemen...|   \N|NULL| WYT|           NULL|United Kingdom|     N|
|        5|     213 Flight Unit|   \N|NULL| TFU|           NULL|        Russia|     N|
|        6|223 Flight Unit S...|   \N|NULL| CHD| CHKALOVSK-AVIA|        Russia|     N|
|        7|   224th Flight Unit|   \N|NULL| TTF|     CARGO UNIT|        Russia|     N|
|        8|         247 Jet Ltd|   \N|NULL| TWF|   CLOUD RUNNER|United Kingdom|     N|
|        9|         3D Aviation|   \N|NULL|

In [None]:
# Read the registered "airports" temp view back as a DataFrame and preview it.
airports = spark.table("airports")
airports.show()

+---------+--------------------+--------------+----------------+----+----+------------------+-------------------+--------+--------+---+--------------------+-------+-----------+
|AirportId|                Name|          City|         Country|IATA|ICAO|          Latitude|          Longitude|Altitude|Timezone|DST|        TimezoneName|   Type|     Source|
+---------+--------------------+--------------+----------------+----+----+------------------+-------------------+--------+--------+---+--------------------+-------+-----------+
|        1|      Goroka Airport|        Goroka|Papua New Guinea| GKA|AYGA|-6.081689834590001|      145.391998291|    5282|      10|  U|Pacific/Port_Moresby|airport|OurAirports|
|        2|      Madang Airport|        Madang|Papua New Guinea| MAG|AYMD|    -5.20707988739|      145.789001465|      20|      10|  U|Pacific/Port_Moresby|airport|OurAirports|
|        3|Mount Hagen Kagam...|   Mount Hagen|Papua New Guinea| HGU|AYMH|-5.826789855957031| 144.29600524902344|  

In [None]:
# Read the registered "routes" temp view back as a DataFrame and preview it.
routes = spark.table("routes")
routes.show()

+-------+---------+-------------+---------------+------------------+--------------------+---------+-----+---------+
|Airline|AirlineId|SourceAirport|SourceAirportId|DestinationAirport|DestinationAirportId|Codeshare|Stops|Equipment|
+-------+---------+-------------+---------------+------------------+--------------------+---------+-----+---------+
|     2B|      410|          AER|           2965|               KZN|                2990|     NULL|    0|      CR2|
|     2B|      410|          ASF|           2966|               KZN|                2990|     NULL|    0|      CR2|
|     2B|      410|          ASF|           2966|               MRV|                2962|     NULL|    0|      CR2|
|     2B|      410|          CEK|           2968|               KZN|                2990|     NULL|    0|      CR2|
|     2B|      410|          CEK|           2968|               OVB|                4078|     NULL|    0|      CR2|
|     2B|      410|          DME|           4029|               KZN|    

# **Task 1**
Find the airline with the largest total flight distance. Do the same for the smallest total flight distance.

In [None]:
def haversine(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometres between two points given in degrees.

    Args:
        lat1, lon1: latitude and longitude of the first point, in degrees.
        lat2, lon2: latitude and longitude of the second point, in degrees.

    Returns:
        The haversine distance as a float (Earth radius taken as 6371 km), or
        None when any coordinate is None, so a row with a missing coordinate
        yields a null distance instead of failing the whole Spark job.
    """
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return None

    R = 6371.0  # Earth radius in km
    delta_lat = radians(lat2 - lat1)
    delta_lon = radians(lon2 - lon1)

    a = sin(delta_lat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(delta_lon / 2) ** 2
    # Floating-point rounding can leave `a` marginally outside [0, 1] for
    # (near-)antipodal points, which would make asin raise ValueError.
    a = min(1.0, max(0.0, a))
    c = 2 * asin(sqrt(a))

    return R * c

# Wrap haversine as a Spark UDF with a "double" return type so it can be
# applied to DataFrame columns.
haversine_udf = F.udf(haversine, "double")


In [None]:
# Attach coordinates to every route and compute its length:
#   1. join on the source airport      -> SourceLatitude / SourceLongitude
#   2. join on the destination airport -> DestinationLatitude / DestinationLongitude
#   3. apply the haversine UDF         -> Distance
# Both joins use the default (inner) join type, so routes whose airport id has
# no match in `airports` are dropped. The coordinate columns are helper columns
# only and are removed by the final select.
routesWithDistance = routes.join(airports, routes.SourceAirportId == airports.AirportId) \
  .select("Airline", "AirlineId", "SourceAirport", "SourceAirportId",
          "DestinationAirport", "DestinationAirportId",
          "Codeshare", "Stops", "Equipment",
          F.col("Latitude").cast("double").alias("SourceLatitude"),
          F.col("Longitude").cast("double").alias("SourceLongitude")) \
  .join(airports, routes.DestinationAirportId == airports.AirportId) \
  .select("Airline", "AirlineId", "SourceAirport", "SourceAirportId",
          "DestinationAirport", "DestinationAirportId",
          "Codeshare", "Stops", "Equipment",
          "SourceLatitude", "SourceLongitude",
          F.col("Latitude").cast("double").alias("DestinationLatitude"),
          F.col("Longitude").cast("double").alias("DestinationLongitude")) \
  .withColumn("Distance", haversine_udf(
    F.col("SourceLatitude"), F.col("SourceLongitude"),
    F.col("DestinationLatitude"), F.col("DestinationLongitude")
  )) \
  .select("Airline", "AirlineId", "SourceAirport", "SourceAirportId",
          "DestinationAirport", "DestinationAirportId",
          "Codeshare", "Stops", "Equipment", "Distance")

routesWithDistance.show()

+-------+---------+-------------+---------------+------------------+--------------------+---------+-----+---------+------------------+
|Airline|AirlineId|SourceAirport|SourceAirportId|DestinationAirport|DestinationAirportId|Codeshare|Stops|Equipment|          Distance|
+-------+---------+-------------+---------------+------------------+--------------------+---------+-----+---------+------------------+
|     2B|      410|          AER|           2965|               KZN|                2990|     NULL|    0|      CR2|1506.8256414050907|
|     2B|      410|          ASF|           2966|               KZN|                2990|     NULL|    0|      CR2|1040.4383197669076|
|     2B|      410|          ASF|           2966|               MRV|                2962|     NULL|    0|      CR2|  448.164908709546|
|     2B|      410|          CEK|           2968|               KZN|                2990|     NULL|    0|      CR2| 770.5085001497806|
|     2B|      410|          CEK|           2968|      

Let's prepare a graph that we will work with.

In [None]:
# Vertices: one row per airport, with AirportId renamed to "id" and every
# other airport attribute kept under its original name.
airport_attributes = [
    "Name", "City", "Country", "IATA", "ICAO",
    "Latitude", "Longitude", "Altitude",
    "Timezone", "DST", "TimezoneName", "Type", "Source",
]
vertices = airports.select(F.col("AirportId").alias("id"), *airport_attributes)

# Edges: one row per route, endpoints renamed to "src"/"dst", carrying the
# airline id and the computed distance as edge attributes.
edges = routesWithDistance.select(
    F.col("SourceAirportId").alias("src"),
    F.col("DestinationAirportId").alias("dst"),
    "AirlineId",
    "Distance",
)

g = GraphFrame(vertices, edges)
g.edges.show()



+----+----+---------+------------------+
| src| dst|AirlineId|          Distance|
+----+----+---------+------------------+
|2965|2990|      410|1506.8256414050907|
|2966|2990|      410|1040.4383197669076|
|2966|2962|      410|  448.164908709546|
|2968|2990|      410| 770.5085001497806|
|2968|4078|      410|1338.6314665862067|
|4029|2990|      410| 715.6493504840489|
|4029|6969|      410| 892.3827877543457|
|4029|6160|      410| 951.4321982829719|
|6156|2952|      410|1171.8814952837668|
|6156|2990|      410| 1008.253110089581|
|2922|6969|      410|1685.4265243719856|
|2952|6156|      410|1171.8814952837668|
|2990|2965|      410|1506.8256414050907|
|2990|2966|      410|1040.4383197669076|
|2990|2968|      410| 770.5085001497806|
|2990|4029|      410| 715.6493504840489|
|2990|6156|      410| 1008.253110089581|
|2990|2948|      410| 1216.844567482482|
|2990|2975|      410| 723.5232035200229|
|2948|2990|      410| 1216.844567482482|
+----+----+---------+------------------+
only showing top

In [None]:
# Total distance per airline: sum of the Distance of all of its route edges.
airlineDistances = (
    g.edges
    .groupBy("AirlineId")
    .agg(F.sum("Distance").alias("TotalDistance"))
)

# Top row of each ordering gives the airline with the largest / smallest total.
maxDistanceAirline = airlineDistances.orderBy(F.col("TotalDistance").desc()).limit(1)
minDistanceAirline = airlineDistances.orderBy(F.col("TotalDistance").asc()).limit(1)

maxDistanceAirline.show()
minDistanceAirline.show()

+---------+-----------------+
|AirlineId|    TotalDistance|
+---------+-----------------+
|       24|5433778.009419079|
+---------+-----------------+

+---------+----------------+
|AirlineId|   TotalDistance|
+---------+----------------+
|    18700|38.1607383153748|
+---------+----------------+



# **Task 2**
Find all possible flights between Poland and Belgium (with no more than 2 stopovers) and a path length of <5. Use motif and other solution approaches, then compare their results in terms of performance and implementation complexity.

Let's search for all possible routes using the **motif** approach.

In [None]:
def find_routes_motif(start_country, end_country, max_transfers):
    """Show all routes from `start_country` to `end_country` using motif search.

    One motif query is run for each number of transfers from 0 (direct flight)
    up to `max_transfers`. For every query the motif pattern is printed and the
    de-duplicated matches are displayed with `show()`. Nothing is returned.

    Args:
        start_country: value of the vertex `Country` attribute for the origin.
        end_country: value of the vertex `Country` attribute for the destination.
        max_transfers: maximum number of intermediate airports.
    """
    def create_path_pattern(num_transfers):
        # Chain of num_transfers + 1 edges: (v0)-[e0]->(v1);(v1)-[e1]->(v2);...
        return ";".join(
            f"(v{index})-[e{index}]->(v{index + 1})" for index in range(num_transfers + 1)
        )

    def create_filters(num_transfers):
        # Forbid flying straight back to the airport just left
        # (v2 != v0, v3 != v1, ...). Empty string when there are no transfers.
        return " AND ".join(
            f"v{index + 1} != v{index - 1}" for index in range(1, num_transfers + 1)
        )

    for i in range(max_transfers + 1):
        path_pattern = create_path_pattern(i)
        print(f"Routes for path: {path_pattern}")

        # Compare against the country names as values rather than splicing them
        # into a SQL string, so a name containing an apostrophe does not break
        # (or alter) the filter expression.
        flights = g.find(path_pattern) \
            .filter(F.col("v0.Country") == start_country) \
            .filter(F.col(f"v{i + 1}.Country") == end_country)

        filters = create_filters(i)
        if filters:
            flights = flights.filter(filters)

        flights = flights.drop_duplicates()
        flights.show()

find_routes_motif("Poland", "Belgium", 2)


Routes for path: (v0)-[e0]->(v1)
+--------------------+--------------------+--------------------+
|                  v0|                  e0|                  v1|
+--------------------+--------------------+--------------------+
|{679, Warsaw Chop...|{679, 304, 5461, ...|{304, Brussels So...|
|{668, Gdańsk Lech...|{668, 302, 2245, ...|{302, Brussels Ai...|
|{8414, Modlin Air...|{8414, 304, 4296,...|{304, Brussels So...|
|{669, John Paul I...|{669, 304, 4296, ...|{304, Brussels So...|
|{680, Copernicus ...|{680, 302, 2245, ...|{302, Brussels Ai...|
|{679, Warsaw Chop...|{679, 302, 3210, ...|{302, Brussels Ai...|
+--------------------+--------------------+--------------------+

Routes for path: (v0)-[e0]->(v1);(v1)-[e1]->(v2)
+--------------------+--------------------+--------------------+--------------------+--------------------+
|                  v0|                  e0|                  v1|                  e1|                  v2|
+--------------------+--------------------+----------

Let's search for all possible routes using an **iterative approach**.

In [None]:
def find_routes_iterative(start_country, end_country, max_transfers):
    """Show all routes from `start_country` to `end_country` by iterative joins.

    Starting from every edge that leaves an airport in `start_country`, the
    current frontier of paths is extended by one flight per iteration, up to
    `max_transfers` times. Paths of every length whose last airport lies in
    `end_country` are de-duplicated and displayed; nothing is returned.

    Only the newest level of paths is extended in each iteration; extending
    the accumulated union would regenerate the shorter paths' extensions on
    every pass. A path may not fly straight back to the airport it has just
    left (A->B->A), which matches the rule applied by the motif-based search.

    Args:
        start_country: value of the vertex `Country` attribute for the origin.
        end_country: value of the vertex `Country` attribute for the destination.
        max_transfers: maximum number of intermediate airports.
    """
    start_airports = g.vertices.filter(F.col("Country") == start_country).select("id")

    # Level 0: direct flights out of the start country. `previous_airport`
    # remembers where the last flight departed from, for the backtrack check.
    frontier = g.edges.join(start_airports, g.edges.src == start_airports.id) \
        .select(F.concat_ws("->", F.col("src"), F.col("dst")).alias("path"),
                F.col("src").alias("previous_airport"),
                F.col("dst").alias("current_airport")) \
        .drop_duplicates()

    all_routes = frontier

    for _ in range(max_transfers):
        # Several airlines can serve the same src/dst pair, so de-duplicate
        # each level to keep the frontier from multiplying needlessly.
        frontier = frontier.alias("r") \
            .join(g.edges.alias("e"), F.col("r.current_airport") == F.col("e.src")) \
            .filter(F.col("e.dst") != F.col("r.previous_airport")) \
            .select(F.concat_ws("->", F.col("r.path"), F.col("e.dst")).alias("path"),
                    F.col("r.current_airport").alias("previous_airport"),
                    F.col("e.dst").alias("current_airport")) \
            .drop_duplicates()

        all_routes = all_routes.union(frontier)

    end_airports = g.vertices.filter(F.col("Country") == end_country).select("id")

    valid_routes = all_routes.join(end_airports, all_routes.current_airport == end_airports.id) \
                             .select("path")

    valid_routes.drop_duplicates().show(truncate=False)

find_routes_iterative("Poland", "Belgium", 2)


+---------------+
|path           |
+---------------+
|8414->304      |
|679->304       |
|668->302       |
|680->302       |
|679->302       |
|669->304       |
|669->1198->304 |
|675->478->302  |
|8414->1515->304|
|668->478->304  |
|669->523->302  |
|675->523->302  |
|679->3953->302 |
|668->535->304  |
|680->1606->302 |
|679->580->302  |
|679->340->302  |
|8414->1218->304|
|8414->1562->304|
|8414->3998->310|
+---------------+
only showing top 20 rows



# **Task 3**
Find the airports with the **fewest** and **most** flights.

In [None]:
# Per-airport counts of outgoing and incoming route edges.
outDegrees = g.outDegrees
inDegress = g.inDegrees

# Outer join keeps airports that appear on only one side; the missing count is
# filled with 0 before the two are added up.
totalDegrees = (
    outDegrees.join(inDegress, on="id", how="outer")
    .fillna(0, subset=["outDegree", "inDegree"])
    .withColumn("totalDegree", F.col("outDegree") + F.col("inDegree"))
)

# Fetch both extremes in one aggregation.
degree_extremes = totalDegrees.agg(F.max("totalDegree"), F.min("totalDegree")).collect()[0]
maxDegreeValue = degree_extremes[0]
minDegreeValue = degree_extremes[1]

# All airports tied for the largest / smallest total degree.
maxDegreeVertices = totalDegrees.where(F.col("totalDegree") == maxDegreeValue)
minDegreeVertices = totalDegrees.where(F.col("totalDegree") == minDegreeValue)

Airports with the **fewest** flights.

In [None]:
# Look up the airport names for the minimum-degree vertices. The join key is
# taken from minDegreeVertices itself, the frame actually being joined.
minDegreeVertices.join(airports, minDegreeVertices.id == airports.AirportId) \
  .select("AirportId", "Name", F.col("totalDegree").alias("RoutesCount")).show()

+---------+--------------------+-----------+
|AirportId|                Name|RoutesCount|
+---------+--------------------+-----------+
|     1040|     Kalemie Airport|          1|
|     1463|    Filippos Airport|          1|
|     1602|Ovda Internationa...|          1|
|     2697|Teniente Coronel ...|          1|
|     3968|Massawa Internati...|          1|
|     4125|       Utila Airport|          1|
|     5653|       Lodja Airport|          1|
|     5884|Niue Internationa...|          1|
|     6321|    Portland Airport|          1|
|     6372| Dalanzadgad Airport|          1|
|     6766| Port Heiden Airport|          1|
|     7152|North Whale Seapl...|          1|
|     7158| Pilot Point Airport|          1|
|     7161|      Karluk Airport|          1|
|     7172|Port Williams Sea...|          1|
|     7176|Zachar Bay Seapla...|          1|
|     7369|Santana do Aragua...|          1|
|     7370|      Breves Airport|          1|
|     7374|Santa Terezinha A...|          1|
|     8207

Airports with the **most** flights.

In [None]:
# Look up the airport names for the maximum-degree vertices and show the count
# under the friendlier column name RoutesCount.
(
    maxDegreeVertices
    .join(airports, maxDegreeVertices.id == airports.AirportId)
    .select("AirportId", "Name", F.col("totalDegree").alias("RoutesCount"))
    .show()
)

+---------+--------------------+-----------+
|AirportId|                Name|RoutesCount|
+---------+--------------------+-----------+
|     3682|Hartsfield Jackso...|       1826|
+---------+--------------------+-----------+



# **Task 4**
Find the **shortest** and **longest** route between two given airports, considering the flight **distance** as the route length. The search should include routes with **layovers** (up to **4 stopovers**, meaning a maximum of **5 flights**).

In [None]:
def find_routes_with_bfs(source_airport_iata, destination_airport_iata, max_transfers):
    """Print the shortest-by-distance route among the paths returned by BFS.

    Runs GraphFrames `bfs` between the airports with the given IATA codes,
    allowing at most `max_transfers + 1` flights, sums the `Distance` of every
    edge column present in the result, and prints the route with the smallest
    total. Prints a message instead when no route is found. Returns nothing.

    NOTE(review): per the GraphFrames documentation, bfs returns only the
    paths with the fewest edges, so the minimum here is taken among those
    paths, not among all routes within the transfer limit - confirm this is
    the intended meaning of "shortest".

    Args:
        source_airport_iata: IATA code of the departure airport.
        destination_airport_iata: IATA code of the arrival airport.
        max_transfers: maximum number of intermediate airports.
    """
    possible_routes = g.bfs(
        fromExpr=f"IATA = '{source_airport_iata}'",
        toExpr=f"IATA = '{destination_airport_iata}'",
        maxPathLength=max_transfers + 1
    )

    edge_columns = [f"e{i}" for i in range(max_transfers + 1) if f"e{i}" in possible_routes.columns]

    # Start the sum from a literal Column so the expression is still a valid
    # Column when the result has no edge columns at all (no path found).
    total_distance = sum(
        (F.expr(f"{edge_column}['Distance']") for edge_column in edge_columns),
        F.lit(0.0),
    )

    # Ignore routes whose distance could not be computed, so a null total is
    # never picked as the minimum.
    routes_with_total_distance = possible_routes \
        .withColumn("total_distance", total_distance) \
        .filter(F.col("total_distance").isNotNull())

    shortest_route = routes_with_total_distance.orderBy(F.col("total_distance").asc()).first()

    # first() returns None on an empty result.
    if shortest_route is None:
        print(f"No route found from {source_airport_iata} to {destination_airport_iata} "
              f"with at most {max_transfers} transfers")
        return

    print(f"Shortest route distance: {shortest_route['total_distance']} km")
    print(shortest_route)


def find_routes(source_airport_iata, destination_airport_iata, max_transfers):
    """Print the shortest and longest route (by distance) found via motif search.

    For every number of flights from 1 up to `max_transfers + 1`, a motif
    query finds all paths between the two airports that never fly straight
    back to the airport just left. The `Distance` of each path's edges is
    summed, and the overall minimum and maximum across all path lengths are
    printed. Prints a message instead when no route exists. Returns nothing.

    Args:
        source_airport_iata: IATA code of the departure airport.
        destination_airport_iata: IATA code of the arrival airport.
        max_transfers: maximum number of intermediate airports.
    """
    global_shortest_distance = None
    global_longest_distance = None

    for i in range(max_transfers + 1):
        # i transfers -> i + 1 chained edges: (v0)-[e0]->(v1);(v1)-[e1]->(v2);...
        path_pattern = ";".join(f"(v{j})-[e{j}]->(v{j + 1})" for j in range(i + 1))
        # v2 != v0, v3 != v1, ... ; empty for a direct flight.
        backtrack_filter = " AND ".join(f"v{j + 1} != v{j - 1}" for j in range(1, i + 1))

        possible_routes = g.find(path_pattern) \
            .filter(F.col("v0.IATA") == source_airport_iata) \
            .filter(F.col(f"v{i + 1}.IATA") == destination_airport_iata)

        if backtrack_filter:
            possible_routes = possible_routes.filter(backtrack_filter)

        edge_columns = [f"e{j}" for j in range(i + 1) if f"e{j}" in possible_routes.columns]

        total_distance = sum(
            (F.expr(f"{edge_column}['Distance']") for edge_column in edge_columns),
            F.lit(0.0),
        )

        # Ignore routes whose distance could not be computed, so the
        # comparisons below never see a null total.
        routes_with_total_distance = possible_routes \
            .withColumn("total_distance", total_distance) \
            .filter(F.col("total_distance").isNotNull())

        shortest_distance = routes_with_total_distance.orderBy(F.col("total_distance").asc()).first()
        if shortest_distance is None:
            # first() returns None on an empty result: there is no route with
            # exactly i + 1 flights, so this length contributes nothing.
            continue

        if global_shortest_distance is None or global_shortest_distance["total_distance"] > shortest_distance["total_distance"]:
            global_shortest_distance = shortest_distance

        longest_distance = routes_with_total_distance.orderBy(F.col("total_distance").desc()).first()
        if longest_distance is not None and (
            global_longest_distance is None
            or global_longest_distance["total_distance"] < longest_distance["total_distance"]
        ):
            global_longest_distance = longest_distance

    if global_shortest_distance is None or global_longest_distance is None:
        print(f"No route found from {source_airport_iata} to {destination_airport_iata} "
              f"with at most {max_transfers} transfers")
        return

    print(f"Shortest route distance: {global_shortest_distance['total_distance']} km")
    print(global_shortest_distance)
    print(f"Longest route distance: {global_longest_distance['total_distance']} km")
    print(global_longest_distance)


The **shortest path** found using the **BFS algorithm**:

In [None]:
# KBP -> FLR with at most 3 transfers (passed to bfs as maxPathLength = 4).
find_routes_with_bfs("KBP", "FLR", 3)

Shortest route distance: 1700.271218171912 km
Row(from=Row(id=2939, Name='Boryspil International Airport', City='Kiev', Country='Ukraine', IATA='KBP', ICAO='UKBB', Latitude=50.345001220703125, Longitude=30.894699096679688, Altitude=427, Timezone='2', DST='E', TimezoneName='Europe/Kiev', Type='airport', Source='OurAirports'), e0=Row(src='2939', dst='1613', AirlineId='5282', Distance=1067.6599203771211), v1=Row(id=1613, Name='Vienna International Airport', City='Vienna', Country='Austria', IATA='VIE', ICAO='LOWW', Latitude=48.110298156738, Longitude=16.569700241089, Altitude=600, Timezone='1', DST='E', TimezoneName='Europe/Vienna', Type='airport', Source='OurAirports'), e1=Row(src='1613', dst='1563', AirlineId='214', Distance=632.611297794791), to=Row(id=1563, Name='Peretola Airport', City='Florence', Country='Italy', IATA='FLR', ICAO='LIRQ', Latitude=43.810001, Longitude=11.2051, Altitude=142, Timezone='1', DST='E', TimezoneName='Europe/Rome', Type='airport', Source='OurAirports'), tota

The **shortest and longest paths** found using the **motif approach**:

In [None]:
# KBP -> FLR with at most 3 transfers, i.e. motif patterns of 1 to 4 flights.
find_routes("KBP", "FLR", 3)

Shortest route distance: 1700.271218171912 km
Row(v0=Row(id=2939, Name='Boryspil International Airport', City='Kiev', Country='Ukraine', IATA='KBP', ICAO='UKBB', Latitude=50.345001220703125, Longitude=30.894699096679688, Altitude=427, Timezone='2', DST='E', TimezoneName='Europe/Kiev', Type='airport', Source='OurAirports'), e0=Row(src='2939', dst='1613', AirlineId='491', Distance=1067.6599203771211), v1=Row(id=1613, Name='Vienna International Airport', City='Vienna', Country='Austria', IATA='VIE', ICAO='LOWW', Latitude=48.110298156738, Longitude=16.569700241089, Altitude=600, Timezone='1', DST='E', TimezoneName='Europe/Vienna', Type='airport', Source='OurAirports'), e1=Row(src='1613', dst='1563', AirlineId='491', Distance=632.611297794791), v2=Row(id=1563, Name='Peretola Airport', City='Florence', Country='Italy', IATA='FLR', ICAO='LIRQ', Latitude=43.810001, Longitude=11.2051, Altitude=142, Timezone='1', DST='E', TimezoneName='Europe/Rome', Type='airport', Source='OurAirports'), total_d

# **Task 5**
Identify large airport clusters (at least 5 airports) based on flight connections. Then, analyze the results of clustering.

In [None]:
# Assign every airport a community label with label propagation (10 iterations).
communities = g.labelPropagation(maxIter=10)

# Number of airports per label.
community_sizes = communities.groupBy("label").agg(F.count("id").alias("size"))

# Only communities with at least 5 airports count as "large".
large_community_sizes = community_sizes.filter("size >= 5")

print("The list of  graphs clusters and their sizes")
large_community_sizes.orderBy("size").show()

# Keep only the airports that belong to a large community.
large_communities = communities.join(large_community_sizes, on="label")

# One row per large community: its distinct IATA codes joined by spaces, and
# its size, ordered by size and then by label.
large_communities_summary = (
    large_communities
    .select("label", "IATA", "size")
    .groupBy("label")
    .agg(
        F.concat_ws(" ", F.collect_set("IATA")).alias("airports"),
        F.max("size").alias("size"),
    )
    .orderBy("size", "label")
)

large_communities_summary.show(truncate=False)


Список кластерів графа і їх розмірів:
+-----+----+
|label|size|
+-----+----+
| 7424|   5|
|  897|   5|
| 1638|   5|
| 7239|   5|
| 3794|   6|
| 7111|   6|
| 3043|   6|
| 5520|   6|
| 3322|   6|
| 3576|   6|
|  155|   6|
|  916|   6|
| 2851|   6|
| 3304|   6|
| 3615|   7|
|  532|   7|
|  149|   7|
| 7146|   7|
| 6374|   7|
| 2003|   7|
+-----+----+
only showing top 20 rows

+-----+---------------------------+----+
|label|airports                   |size|
+-----+---------------------------+----+
|897  |POG DLA MDK GOU LJA        |5   |
|1638 |FMI TSH GOM KGA BKY        |5   |
|7239 |SLQ RDV CKD CHU SRV        |5   |
|7424 |KTL MRE LKG ASV UKA        |5   |
|155  |YUB YSY YPC YGH ZFM YVQ    |6   |
|916  |RRG RUN TNR ZSE WMR TLE    |6   |
|2851 |SNV CAJ VLN PZO CCS PBL    |6   |
|3043 |DIB IXS GAU PBH CCU IMF    |6   |
|3304 |LBP LKH BBN LGL MUR ODN    |6   |
|3322 |AUU KWM ONG EDR IRG CUQ    |6   |
|3576 |TME RVE ACR LET SVI LCR    |6   |
|3794 |EGX KCL WSN IGG KPV AKN    |6   |
|5520 |YQ

Airport clusters:

In [None]:
# Show the per-community airport lists again, without truncating the IATA column.
large_communities_summary.show(truncate=False)

+-----+---------------------------+----+
|label|airports                   |size|
+-----+---------------------------+----+
|897  |POG DLA GOU LJA MDK        |5   |
|1638 |FMI TSH GOM KGA BKY        |5   |
|7239 |SLQ RDV CKD CHU SRV        |5   |
|7424 |KTL MRE LKG ASV UKA        |5   |
|155  |YUB YSY YPC YGH ZFM YVQ    |6   |
|916  |RRG RUN ZSE TNR WMR TLE    |6   |
|2851 |SNV CAJ VLN PZO CCS PBL    |6   |
|3043 |DIB IXS GAU PBH CCU IMF    |6   |
|3304 |BBN LBP LKH LGL MUR ODN    |6   |
|3322 |AUU KWM ONG EDR CUQ IRG    |6   |
|3576 |TME RVE ACR LET SVI LCR    |6   |
|3794 |EGX KCL WSN IGG KPV AKN    |6   |
|5520 |YQC AKV YPH YTQ XGR YZG    |6   |
|7111 |HUS GAL NUL HSL KYU KAL    |6   |
|149  |YKU YUY YMO ZEM YPO YMT ZKE|7   |
|169  |KEW YRL YHP ZSJ YPM YNO YVZ|7   |
|532  |KOI NRL PPW EOI SOY NDY WRY|7   |
|2003 |TGJ TOU KNQ ILP KOC MEE UVE|7   |
|3615 |WAA UNK KKA OME SKK GLV SMK|7   |
|6374 |COQ ULO BYN MXV HVD DLZ ULG|7   |
+-----+---------------------------+----+
only showing top

# **Task 6**
Find subgraphs with at least 2 nodes that are connected to each other but have no connections to other airports.

In [None]:

# Set a checkpoint directory before running connectedComponents.
# NOTE(review): the GraphFrames documentation states that the default
# connectedComponents algorithm requires one - confirm for the version in use.
spark.sparkContext.setCheckpointDir(dirName="/content/temp/graphframes_cps")

components = g.connectedComponents()

# Size of every connected component, keeping only those with 2+ airports.
component_sizes = (
    components
    .groupBy("component")
    .agg(F.count("id").alias("size"))
    .filter("size >= 2")
)

# The printed heading is Ukrainian for "List of graph components and their sizes:".
print("Список компонентів графа і їх розмірів:")
component_sizes.orderBy("component").show()

# One row per component: its distinct IATA codes joined by spaces, and its
# size. The small size table is broadcast to the join.
subgraphs = (
    components
    .join(F.broadcast(component_sizes), on="component")
    .select("component", "IATA", "size")
    .groupBy("component")
    .agg(
        F.concat_ws(" ", F.collect_set("IATA")).alias("airports"),
        F.max("size").alias("size"),
    )
    .orderBy("size", "component")
)

subgraphs.show(truncate=False)


Список компонентів графа і їх розмірів:
+---------+----+
|component|size|
+---------+----+
|        1|3304|
|     1998|  10|
|     3726|   4|
|     3860|   4|
|     5642|   4|
|     6448|   2|
|     7309|   2|
+---------+----+

+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Isolated graphs

In [None]:
# Show the components again with default truncation, so the long airport lists
# of the largest components are shortened.
subgraphs.show()

+---------+--------------------+----+
|component|            airports|size|
+---------+--------------------+----+
|     6448|             GCW BLD|   2|
|     7309|             SSB SPB|   2|
|     3726|     ESD CLM BFI FRD|   4|
|     3860|     AKB DUT KQA IKO|   4|
|     5642|     MPA ERS NDU OND|   4|
|     1998|TGJ TOU GEA KNQ L...|  10|
|        1|BRW ANS TPQ OLA M...|3304|
+---------+--------------------+----+

