In [83]:
# Location of the lab input tables; each CSV has a header row.
lab5_dir = "/data/students/bigdata_internet/lab5"
airports_path = lab5_dir + "/airports.csv"
airlines_path = lab5_dir + "/airlines.csv"
routes_path = lab5_dir + "/routes.csv"

In [84]:
# Read each table with its header row and let Spark infer the column types.
csv_options = dict(format="csv", header=True, inferSchema=True)
airports_df = spark.read.load(airports_path, **csv_options)
airlines_df = spark.read.load(airlines_path, **csv_options)
routes_df = spark.read.load(routes_path, **csv_options)


In [85]:
def countAirports(country):
    """Per-row marker for counting airports.

    Always returns 1, so summing the result over a group yields the number of
    rows (airports) in that group. ``country`` is ignored; it exists only so
    the function can be called as a one-argument SQL UDF.
    """
    return 1

In [86]:
# Expose countAirports to SQL expressions as isAirportInstance(country) -> int.
spark.udf.register("isAirportInstance", countAirports, "integer")

<function __main__.<lambda>(country)>

In [87]:
# One row per airport with a constant 1 in "instance". The alias is given in the
# SQL expression itself instead of renaming Spark's auto-generated column name.
# (Equivalent to the simpler airports_df.groupBy("country").count() downstream.)
country_total_airports_df = airports_df.selectExpr(
    "country",
    "isAirportInstance(country) AS instance",
)

In [88]:
# Summing the per-row 1s gives the number of airports listed for each country.
country_total_df = (
    country_total_airports_df
    .groupBy("country")
    .agg({"instance": "sum"})
    .withColumnRenamed("sum(instance)", "total_airports")
)

In [89]:
# Countries with at least 100 airports. NOTE: the comparison is >=, so a
# country with exactly 100 airports is kept despite the variable name.
country_more_than_100 = country_total_df.where(country_total_df.total_airports >= 100)

In [90]:
# Print the qualifying countries (default show limit of 20 rows).
country_more_than_100.show(n=20)

+--------------+--------------+
|       country|total_airports|
+--------------+--------------+
|        Russia|           264|
|       Germany|           249|
|        France|           217|
|         India|           148|
|         China|           241|
| United States|          1512|
|     Indonesia|           145|
|        Canada|           430|
|        Brazil|           264|
|         Japan|           123|
|     Australia|           334|
|United Kingdom|           167|
+--------------+--------------+



In [91]:
def countFlights(airline):
    """Per-row marker for counting flights (routes).

    Always returns 1, so summing it over a group counts the routes in the
    group. ``airline`` is ignored; it is present only so the function can be
    used as a one-argument SQL UDF.
    """
    return 1

In [92]:
# Expose countFlights to SQL expressions as isFlight(airline_id) -> int.
spark.udf.register("isFlight", countFlights, "integer")

<function __main__.<lambda>(airline_id)>

In [93]:
# One row per route with a constant 1 in "flight". Call the UDF by its
# registered name (isFlight, previously misspelled isFLight) and alias the
# result directly rather than renaming the auto-generated column name.
airline_flights_df = routes_df.selectExpr(
    "airline_id",
    "isFlight(airline_id) AS flight",
)

In [94]:
# Number of routes operated by each airline id.
airline_total_flights_df = (
    airline_flights_df
    .groupBy("airline_id")
    .agg({"flight": "sum"})
    .withColumnRenamed("sum(flight)", "total_flights")
)

In [95]:
# Lookup table: airline id -> display name and ICAO code.
airline_names = airlines_df.select(["airline_id", "name", "icao"])

In [96]:
# Attach names/ICAO codes to the per-airline route counts.
airline_joined = (
    airline_total_flights_df
    .join(
        airline_names,
        airline_total_flights_df["airline_id"] == airline_names["airline_id"],
        how="inner",
    )
    .select("name", "icao", "total_flights")
)

In [97]:
# Airlines ranked by number of routes, busiest first.
airlines_sorted = airline_joined.orderBy("total_flights", ascending=False)

In [98]:
# Top-10 airlines by route count.
airlines_sorted.show(n=10)

+--------------------+----+-------------+
|                name|icao|total_flights|
+--------------------+----+-------------+
|             Ryanair| RYR|         2484|
|   American Airlines| AAL|         2354|
|     United Airlines| UAL|         2180|
|     Delta Air Lines| DAL|         1981|
|          US Airways| USA|         1960|
|China Southern Ai...| CSN|         1454|
|China Eastern Air...| CES|         1263|
|           Air China| CCA|         1260|
|  Southwest Airlines| SWA|         1146|
|             easyJet| EZY|         1130|
+--------------------+----+-------------+
only showing top 10 rows



In [99]:
# One row per route keyed by its source airport, with a constant 1 in
# "flight_n". Use the registered UDF name (isFlight, previously misspelled
# isFLight) and alias the result instead of renaming the auto-generated name.
airline_source_df = routes_df.selectExpr(
    "airport_source",
    "isFlight(airline_id) AS flight_n",
)

In [100]:
# Number of departing routes per source airport code.
airport_departures = (
    airline_source_df
    .groupBy("airport_source")
    .agg({"flight_n": "sum"})
    .withColumnRenamed("sum(flight_n)", "total_departures")
)

In [101]:
# Source airports ranked by departures, busiest first.
ordered_airport_departures = airport_departures.orderBy("total_departures", ascending=False)

In [102]:
# Lookup table: airport IATA code -> airport name.
airportName_iata = airports_df.select(airports_df["name"], airports_df["iata"])

In [103]:
# Attach airport names to the per-airport departure counts. The ranking sort is
# applied after the join: Spark does not guarantee that a join preserves the
# row order of its inputs, so sorting ordered_airport_departures beforehand is
# not enough for show(10) to list the busiest airports.
airport_name_iata_departingflights = (
    ordered_airport_departures
    .join(
        airportName_iata,
        ordered_airport_departures.airport_source == airportName_iata.iata,
        "inner",
    )
    .select("name", "iata", "total_departures")
    .sort("total_departures", ascending=False)
)

In [104]:
# Top-10 airports by number of departing routes.
airport_name_iata_departingflights.show(n=10)

+--------------------+----+----------------+
|                name|iata|total_departures|
+--------------------+----+----------------+
|Hartsfield Jackso...| ATL|             915|
|Chicago O'Hare In...| ORD|             558|
|Beijing Capital I...| PEK|             535|
|London Heathrow A...| LHR|             527|
|Charles de Gaulle...| CDG|             524|
|Frankfurt am Main...| FRA|             497|
|Los Angeles Inter...| LAX|             492|
|Dallas Fort Worth...| DFW|             469|
|John F Kennedy In...| JFK|             456|
|Amsterdam Airport...| AMS|             453|
+--------------------+----+----------------+
only showing top 10 rows



In [105]:
def finderroneouslines(a_source_id,a_dest_id,a_source,a_dest):
    """Return True when a route row is usable, False when any endpoint field is missing.

    Despite the name (and the ``IsErroneous`` UDF it is registered as), the
    result is True for *valid* rows, so it can be used directly as a
    ``DataFrame.filter`` predicate to keep the non-erroneous routes.

    A field counts as missing when it is null (``None``) or equals the textual
    null marker ``\\N`` (NOTE(review): assumed to be the marker used in the
    routes CSV -- confirm against the data). The doubly-escaped ``\\\\N`` that
    the previous check compared against is still treated as missing.

    Args:
        a_source_id: source airport id of the route.
        a_dest_id: destination airport id of the route.
        a_source: source airport code of the route.
        a_dest: destination airport code of the route.

    Returns:
        bool: False if any of the four fields is missing, True otherwise.
    """
    # '\\N' is the single-backslash marker \N; '\\\\N' is the literal \\N.
    missing_markers = {'\\N', '\\\\N'}
    fields = (a_source_id, a_dest_id, a_source, a_dest)
    # str() so that non-string values (e.g. inferred ints) never match a marker.
    return not any(value is None or str(value) in missing_markers for value in fields)

In [106]:
# Expose finderroneouslines to SQL as IsErroneous(src_id, dst_id, src, dst).
# Despite the name, the UDF returns True for rows to KEEP (no missing field).
spark.udf.register("IsErroneous", finderroneouslines, "boolean")

<function __main__.<lambda>(airport_source_id, airport_destination_id, airport_source, airport_destination)>

In [107]:
# Keep the routes that IsErroneous accepts (it returns True for valid rows).
non_erroneous_routes = routes_df.where(
    "IsErroneous(airport_source_id,airport_destination_id,airport_source,airport_destination)"
)

In [108]:
# Graph vertices: airports, with the "id" key cast to string to match edge ids.
airports_for_nodes = airports_df.withColumn("id", airports_df["id"].cast("string"))

In [109]:
# Graph edges: build them from the *filtered* routes so that rows rejected by
# IsErroneous (missing endpoint fields) do not enter the graph. Previously the
# unfiltered routes_df was used, leaving non_erroneous_routes unused.
# GraphFrames expects the edge endpoint columns to be named src/dst; they are
# cast to string to match the vertex "id" column.
routes_for_edges = (
    non_erroneous_routes
    .withColumn("airport_source_id", non_erroneous_routes["airport_source_id"].cast("string"))
    .withColumn("airport_destination_id", non_erroneous_routes["airport_destination_id"].cast("string"))
    .withColumnRenamed("airport_source_id", "src")
    .withColumnRenamed("airport_destination_id", "dst")
)

In [110]:
from graphframes import GraphFrame

# Vertices carry an "id" column, edges carry "src"/"dst" (prepared above).
nodes_df, edges_df = airports_for_nodes, routes_for_edges
g = GraphFrame(nodes_df, edges_df)

In [111]:
# Lookup table: string airport id (as "airport_id") -> airport name.
airports_id_name = (
    airports_df
    .select("id", "name")
    .withColumn("id", airports_df["id"].cast("string"))
    .withColumnRenamed("id", "airport_id")
)

In [112]:
# Per-airport counts of arriving (in) and departing (out) routes.
gInDeg, gOutDeg = g.inDegrees, g.outDegrees

In [113]:
# Rank airports by in-/out-degree with their names attached. Spark does not
# guarantee that a join preserves the row order of its inputs, so the ranking
# sort is applied to the joined result (sorting before the join, as before,
# does not make show(2) reliably return the top airports).
gInDegSorted = gInDeg.sort("inDegree", ascending=False)
gInDegSorted_And_Name = (
    gInDeg
    .join(airports_id_name, gInDeg.id == airports_id_name.airport_id, "inner")
    .select("airport_id", "name", "inDegree")
    .sort("inDegree", ascending=False)
)
gInDegSorted_And_Name.show(2)

gOutDegSorted = gOutDeg.sort("outDegree", ascending=False)
gOutDegSorted_And_Name = (
    gOutDeg
    .join(airports_id_name, gOutDeg.id == airports_id_name.airport_id, "inner")
    .select("airport_id", "name", "outDegree")
    .sort("outDegree", ascending=False)
)
gOutDegSorted_And_Name.show(2)

+----------+--------------------+--------+
|airport_id|                name|inDegree|
+----------+--------------------+--------+
|      3682|Hartsfield Jackso...|     911|
|      3830|Chicago O'Hare In...|     550|
+----------+--------------------+--------+
only showing top 2 rows

+----------+--------------------+---------+
|airport_id|                name|outDegree|
+----------+--------------------+---------+
|      3682|Hartsfield Jackso...|      915|
|      3830|Chicago O'Hare In...|      558|
+----------+--------------------+---------+
only showing top 2 rows



In [114]:
# Single-flight patterns a -> b, restricted to those leaving Turin (vertex id
# 1526); each distinct (origin, destination) pair is listed once.
motifs = g.find("(a)-[e]->(b)")
turin_motif = motifs.where("a.id = 1526")
turin_motif_distinct = turin_motif.select("a", "b").distinct()
turin_motif_distinct.show(n=200)
turin_motif_distinct.count()

+--------------------+--------------------+
|                   a|                   b|
+--------------------+--------------------+
|[1526, Turin Airp...|[1606, Malta Inte...|
|[1526, Turin Airp...|[1508, Lamezia Te...|
|[1526, Turin Airp...|[1655, Iaşi Airpo...|
|[1526, Turin Airp...|[1515, Vincenzo F...|
|[1526, Turin Airp...|[1561, Naples Int...|
|[1526, Turin Airp...|[340, Frankfurt a...|
|[1526, Turin Airp...|[1218, Barcelona ...|
|[1526, Turin Airp...|[1520, Olbia Cost...|
|[1526, Turin Airp...|[1506, Brindisi –...|
|[1526, Turin Airp...|[502, London Gatw...|
|[1526, Turin Airp...|[548, London Stan...|
|[1526, Turin Airp...|[1555, Leonardo d...|
|[1526, Turin Airp...|[345, Düsseldorf ...|
|[1526, Turin Airp...|[1514, Reggio Cal...|
|[1526, Turin Airp...|[580, Amsterdam A...|
|[1526, Turin Airp...|[1509, Catania-Fo...|
|[1526, Turin Airp...|[1701, Atatürk In...|
|[1526, Turin Airp...|[1074, Mohammed V...|
|[1526, Turin Airp...|[1512, Falcone–Bo...|
|[1526, Turin Airp...|[1382, Cha

29

In [115]:
# Two-flight paths a -> b -> c starting from Turin (vertex id 1526).
# NOTE(review): the pattern does not exclude c == a (round trips), and a
# destination may also be reachable directly; add a filter such as
# "c.id != a.id" if those should not be counted.
motifsTwoFlights = g.find("(a)-[e]->(b); (b)-[e2]->(c)")
turin_motifTwoFlights = motifsTwoFlights.where("a.id = 1526")
turin_motifTwoFlights.show(n=1)

# Distinct (origin, destination) pairs reachable via some two-flight path.
turin_motifTwoFlights_distinct = turin_motifTwoFlights.select("a", "c").distinct()
turin_motifTwoFlights_distinct.show(n=5)
turin_motifTwoFlights_distinct.count()


+--------------------+--------------------+--------------------+--------------------+--------------------+
|                   a|                   e|                   b|                  e2|                   c|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|[1526, Turin Airp...|[4U, 2548, TRN, 1...|[345, Düsseldorf ...|[YM, 3539, DUS, 3...|[1741, Podgorica ...|
+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 1 row

+--------------------+--------------------+
|                   a|                   c|
+--------------------+--------------------+
|[1526, Turin Airp...|[1606, Malta Inte...|
|[1526, Turin Airp...|[286, Monastir Ha...|
|[1526, Turin Airp...|[2121, Esfahan Sh...|
|[1526, Turin Airp...|[3751, Denver Int...|
|[1526, Turin Airp...|[1206, Split Airp...|
+--------------------+--------------------+
only showing top 5 rows



590

In [116]:
# Three-flight paths a -> b -> c -> d starting from Turin (vertex id 1526).
# NOTE(review): as with the two-flight motif, d may equal a or an intermediate
# stop; filter explicitly if such paths should be excluded.
motifsThreeFlights = g.find("(a)-[e]->(b); (b)-[e2]->(c); (c)-[e3]->(d)")
turin_motifThreeFlights = motifsThreeFlights.where("a.id = 1526")
turin_motifThreeFlights.show(n=1)

# Distinct (origin, destination) pairs reachable via some three-flight path.
turin_motifThreeFlights_distinct = turin_motifThreeFlights.select("a", "d").distinct()
turin_motifThreeFlights_distinct.show(n=5)
turin_motifThreeFlights_distinct.count()





+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                   a|                   e|                   b|                  e2|                   c|                  e3|                   d|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|[1526, Turin Airp...|[4U, 2548, TRN, 1...|[345, Düsseldorf ...|[YM, 3539, DUS, 3...|[1741, Podgorica ...|[YM, 3539, TGD, 1...|[1678, Zürich Air...|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 1 row

+--------------------+--------------------+
|                   a|                   d|
+--------------------+--------------------+
|[1526, Turin Airp...|[1606, Malta Inte...|
|[1526, Turin Airp...|[286, Monastir Ha...|
|[1526, Tur

2210

In [117]:
# Hop distances from every airport to the single landmark Turin ("1526").
landmarks = ["1526"]
results = g.shortestPaths(landmarks=landmarks)

In [118]:
# Keep the descriptive columns plus the landmark -> distance map.
res = results.select(["id", "name", "city", "country", "distances"])
res.show(n=1)
res.count()

+----+------------------+----------+---------+-----------+
|  id|              name|      city|  country|  distances|
+----+------------------+----------+---------+-----------+
|6240|Birdsville Airport|Birdsville|Australia|[1526 -> 7]|
+----+------------------+----------+---------+-----------+
only showing top 1 row



7698

In [119]:
def numberOfHops(dist_dict, landmark="1526"):
    """Return the hop count to ``landmark`` stored in a shortestPaths distances map.

    ``dist_dict`` is the ``distances`` value produced by ``g.shortestPaths``,
    received by the UDF as a dict keyed by landmark id. ``landmark`` defaults
    to "1526" (Turin), so one-argument calls behave as before.

    Returns 0 when the map is null/empty or has no entry for ``landmark``
    (presumably: the landmark is unreachable from this vertex). Previously a
    map without "1526" (e.g. from the two-landmark run) raised KeyError, and a
    null map raised TypeError. Note that 0 is also the landmark's distance to
    itself, so callers cannot distinguish the two cases from the value alone.
    """
    if not dist_dict:
        return 0
    return int(dist_dict.get(landmark, 0))

In [120]:
# Expose numberOfHops to SQL as NHops(distances) -> int (hops to Turin).
spark.udf.register("NHops", numberOfHops, "integer")

<function __main__.<lambda>(distances)>

In [121]:
# Add the hop count to Turin as an explicitly aliased column instead of
# renaming Spark's auto-generated UDF column name.
num_hops = res.selectExpr(
    "id", "name", "city", "country", "distances",
    "NHops(distances) AS n_hops",
)

In [122]:
# Airports farthest (in hops) from Turin first.
num_hops.orderBy(num_hops.n_hops.desc()).show(n=10)

+----+--------------------+------------+-------------+-----------+------+
|  id|                name|        city|      country|  distances|n_hops|
+----+--------------------+------------+-------------+-----------+------+
|5522|   Peawanuck Airport|   Peawanuck|       Canada|[1526 -> 8]|     8|
|5482|Attawapiskat Airport|Attawapiskat|       Canada|[1526 -> 7]|     7|
|  10|      Thule Air Base|       Thule|    Greenland|[1526 -> 7]|     7|
|8199|   Nightmute Airport|   Nightmute|United States|[1526 -> 7]|     7|
|6321|    Portland Airport|    Portland|    Australia|[1526 -> 7]|     7|
|6329|Thargomindah Airport|Thargomindah|    Australia|[1526 -> 7]|     7|
|6240|  Birdsville Airport|  Birdsville|    Australia|[1526 -> 7]|     7|
|5535|     Salluit Airport|     Salluit|       Canada|[1526 -> 7]|     7|
|6333|    Windorah Airport|    Windorah|    Australia|[1526 -> 6]|     6|
|5893|   Mota Lava Airport|       Ablow|      Vanuatu|[1526 -> 6]|     6|
+----+--------------------+-----------

In [123]:
# Hop distances to two landmarks: Turin ("1526") and "2537" (presumably the
# "belo" airport referenced by later variable names -- confirm).
landmarks = ["1526", "2537"]
results_2 = g.shortestPaths(landmarks=landmarks)
res_2 = results_2.select(["id", "name", "city", "country", "distances"])

In [124]:
def getdictofdistances(distances_map, landmark="2537"):
    """Return the hop count to ``landmark`` stored in a shortestPaths distances map.

    ``distances_map`` is the ``distances`` value produced by
    ``g.shortestPaths``, received by the UDF as a dict keyed by landmark id.
    ``landmark`` defaults to "2537", so one-argument calls behave as before.

    Returns 0 when the map is null/empty or lacks ``landmark`` (presumably: the
    landmark is unreachable from this vertex) instead of raising KeyError or
    TypeError. 0 is also the landmark's distance to itself.
    """
    if not distances_map:
        return 0
    return int(distances_map.get(landmark, 0))

In [125]:
# Expose getdictofdistances to SQL as getHops(distances) -> int (hops to "2537").
spark.udf.register("getHops", getdictofdistances, "integer")

<function __main__.<lambda>(distances)>

In [126]:
# Hop counts to both landmarks, aliased directly in the SQL expressions rather
# than by renaming Spark's auto-generated UDF column names.
# NOTE: the UDFs return 0 both for "landmark unreachable" and for the landmark
# itself, so the != 0 filter drops both kinds of rows.
turin_belo_n_hops = res_2.selectExpr(
    "id", "name", "city", "country", "distances",
    "getHops(distances) AS n_hops_belo",
    "NHops(distances) AS n_hops_turin",
).filter("n_hops_belo!=0 and  n_hops_turin!=0")

In [127]:
#turin_belo_n_hops.show(5)

In [128]:
# Airports strictly fewer hops from Turin than from the second landmark.
turin_less_belo = turin_belo_n_hops.where(
    turin_belo_n_hops.n_hops_turin < turin_belo_n_hops.n_hops_belo
)
turin_less_belo.count()

1278

In [129]:
# Airports strictly fewer hops from the second landmark than from Turin.
belo_less_turin = turin_belo_n_hops.where(
    turin_belo_n_hops.n_hops_turin > turin_belo_n_hops.n_hops_belo
)
belo_less_turin.count()

281

In [130]:
# Airports equally many hops from both landmarks.
belo_same_turin = turin_belo_n_hops.where(
    turin_belo_n_hops.n_hops_turin == turin_belo_n_hops.n_hops_belo
)
belo_same_turin.count()

1608

In [None]:
# connectedComponents() checkpoints intermediate results, so a checkpoint
# directory must be set first; isolated airports (no routes) are dropped so
# they do not each show up as a singleton component.
sc.setCheckpointDir("tmp_ckpts")
gg = g.dropIsolatedVertices()
connected = gg.connectedComponents()

In [None]:
# NOTE(review): `cc` is not used later in this notebook, and "component" holds a
# component identifier (presumably not a size), so the meaning of the
# `component>2` threshold is unclear -- confirm intent.
cc = (
    connected
    .select(["id", "component"])
    .where("component>2")
    .orderBy("component")
)

In [None]:
# Number of distinct component labels = number of connected components.
nComp = connected.dropDuplicates(["component"]).count()
print("Number of connected components: ", nComp)

In [None]:
# Sub-graph of routes flown by airlines with IATA code EN or H2; airports left
# without any such route are dropped. Print vertex/edge counts of both graphs.
subg = g.filterEdges("airline_iata == 'EN' or airline_iata =='H2'").dropIsolatedVertices()
for graph_to_size in (g, subg):
    print(graph_to_size.vertices.count(), graph_to_size.edges.count())

In [None]:
from graphviz import Digraph
def vizGraph(edge_list,node_list):
    """Build a graphviz Digraph from an edge DataFrame and a vertex DataFrame.

    Both DataFrames are collected to the driver, so they should be small.
    Each edge row adds an arrow ``src -> dst`` labelled with its
    ``airline_iata``; each vertex row then adds a node keyed by ``id`` and
    labelled with its ``city``.

    Returns:
        The populated ``Digraph``.
    """
    edge_rows, node_rows = edge_list.collect(), node_list.collect()
    graph = Digraph()
    for edge in edge_rows:
        graph.edge(edge['src'], edge['dst'], label=edge['airline_iata'])
    for vertex in node_rows:
        graph.node(vertex['id'], label=vertex['city'])
    return graph
# Render-ready graphviz object for the EN/H2 sub-graph.
Gplot = vizGraph(edge_list=subg.edges, node_list=subg.vertices)

In [None]:
# Last expression of the cell: Jupyter displays the Digraph built by vizGraph.
Gplot