The purpose of this file is to construct a DAG (directed acyclic graph) from a user-written Spark program.

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

sc = spark.sparkContext

In [140]:
# Insert user-written code in this cell

SEP = ","

def remove_header(rdd):
    header = rdd.take(1)[0]
    return rdd.filter(lambda x: x != header)

# Get a table containing the latitude and longitude coordinates of each airport
airports_data = sc.textFile("/data/airports_data.csv")
airports_data = remove_header(airports_data)
# Remember that latitudes and longitudes are swapped
lat_long_timezone = airports_data.map(lambda x: (float(x.split(SEP)[4]), float(x.split(SEP)[3]), x.split(SEP)[5]))

lat_long_timezone_doubled = lat_long_timezone.map(lambda x: (x[0] * 2, x[1] * 2, x[2]))
timezone_lat_long_doubled = lat_long_timezone_doubled.map(lambda x: (x[2], (x[0], x[1])))
total_by_timezone = timezone_lat_long_doubled.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

In [141]:
# Use this cell to assign the last RDD in the user-written program to the variable final_rdd for the DAG generator to use
final_rdd = total_by_timezone

In [154]:
# Generate a DAG of the user program
previous = final_rdd._jrdd.dependencies.container
previous.count()

8

In [152]:
total_by_timezone.collect()

[('Asia/Sakhalin', (93.7773971557618, 285.436004638672)),
 ('Europe/Moscow', (4763.7318579295625, 3758.1768314548453)),
 ('Asia/Yekaterinburg', (2617.237235239701, 2911.494415856768)),
 ('Europe/Samara', (530.5711975097352, 497.3150024414366)),
 ('Asia/Anadyr', (129.4698028564454, 355.481994628906)),
 ('Asia/Novosibirsk', (110.025199890136, 165.301406860352)),
 ('Asia/Magadan', (119.8219985961914, 301.440002441406)),
 ('Asia/Chita', (104.052598, 226.612)),
 ('Asia/Irkutsk', (550.8784103393543, 1043.640014648428)),
 ('Asia/Krasnoyarsk', (906.6271972656834, 1395.7023925781223)),
 ('Asia/Omsk', (109.9339981079102, 146.6210021972656)),
 ('Europe/Volgograd', (200.6949996948242, 180.7844009399414)),
 ('Asia/Yakutsk', (596.7353897094844, 1216.332000731822)),
 ('Asia/Vladivostok', (284.67199707031295, 808.5400085449239)),
 ('Asia/Kamchatka', (106.3358001708984, 316.907989501954)),
 ('Europe/Kaliningrad', (109.7799987792968, 41.1851997375488)),
 ('Asia/Novokuznetsk', (218.1630020141602, 345.968