In [20]:
from pyspark import SparkContext
from math import sin, cos, sqrt, atan2, radians
from geopy.distance import geodesic, great_circle

In [21]:
# Reuse the running SparkContext if there is one, otherwise start a new one,
# so this cell can be re-run without error.
sc = SparkContext.getOrCreate()

In [22]:
# One RDD element per line of the capitals file (path is relative to the
# notebook's working directory).
capitals_rdd = sc.textFile('datasets/Capitals.txt')

In [23]:
# Peek at the raw lines: tab-separated fields with decimal commas.
print(capitals_rdd.take(10))

['Afghanistan Flag Icon \tAfghanistan \tKabul \t34,53 \t69,17\t', 'Albania Flag Icon \tAlbania \tTirana \t41,33 \t19,82\t', 'Algeria Flag Icon \tAlgeria \tAlgiers \t36,75 \t3,04\t', 'American Samoa Flag Icon \tAmerican Samoa \tPago Pago \t-14,28 \t-170,70\t', 'Andorra Flag Icon \tAndorra \tAndorra la Vella \t42,51 \t1,52\t', 'Angola Flag Icon \tAngola \tLuanda \t-8,84 \t13,23\t', 'Anguilla Flag Icon \tAnguilla \tThe Valley \t18,22 \t-63,06\t', "Antigua and Barbuda Flag Icon \tAntigua and Barbuda \tSt. John's \t17,12 \t-61,85\t", 'Argentina Flag Icon \tArgentina \tBuenos Aires \t-34,61 \t-58,38\t', 'Armenia Flag Icon \tArmenia \tYerevan \t40,18 \t44,51\t']


# Parsing Country Information

In [24]:
def split_capitals_to_tuple(satir):
    """Parse one tab-separated line of the capitals file into a flat tuple.

    Args:
        satir: A raw line such as
            ``'Albania Flag Icon \\tAlbania \\tTirana \\t41,33 \\t19,82\\t'``.

    Returns:
        ``(country, capital, lat, lon)``: the stripped text of columns 1 and 2
        followed by columns 3 and 4 as floats. Column 3 is the value that
        ``cal_dist`` later treats as the latitude.

    Raises:
        IndexError: if the line has fewer than five tab-separated columns.
        ValueError: if a coordinate column is not a number.
    """
    fields = [field.strip() for field in satir.split("\t")]
    # Coordinates are written with a decimal comma ("41,33"); float() needs a dot.
    lat, lon = (float(fields[i].replace(',', '.')) for i in (3, 4))
    return (fields[1], fields[2], lat, lon)

In [25]:
def split_capitals_to_tuple_nc(satir):
    """Parse one line of the capitals file, leaving the country out ("nc").

    Args:
        satir: A raw tab-separated line from the capitals file.

    Returns:
        ``(capital, (lat, lon))``: the stripped capital name keyed to its
        coordinate pair, where the pair holds columns 3 and 4 as floats in
        file order.

    Raises:
        IndexError: if the line has fewer than five tab-separated columns.
        ValueError: if a coordinate column is not a number.
    """
    columns = satir.split("\t")

    def as_number(text):
        # The file uses a decimal comma ("41,33"); float() needs a dot.
        return float(text.replace(',', '.').strip())

    city_name = columns[2].strip()
    return (city_name, (as_number(columns[3]), as_number(columns[4])))

In [26]:
# Turn every raw line into a (country, capital, coord, coord) tuple.
new_capitals_rdd = capitals_rdd.map(split_capitals_to_tuple)
print(new_capitals_rdd.take(10))

[('Afghanistan', 'Kabul', 34.53, 69.17), ('Albania', 'Tirana', 41.33, 19.82), ('Algeria', 'Algiers', 36.75, 3.04), ('American Samoa', 'Pago Pago', -14.28, -170.7), ('Andorra', 'Andorra la Vella', 42.51, 1.52), ('Angola', 'Luanda', -8.84, 13.23), ('Anguilla', 'The Valley', 18.22, -63.06), ('Antigua and Barbuda', "St. John's", 17.12, -61.85), ('Argentina', 'Buenos Aires', -34.61, -58.38), ('Armenia', 'Yerevan', 40.18, 44.51)]


## Testing to find a capital and its location

In [27]:
# Each element is a whole (country, capital, coord, coord) row, so the
# membership test matches 'Turkey' against any field of the row.
g_city = new_capitals_rdd.filter(lambda row: 'Turkey' in row)
filtered = g_city.collect()
# Field 1 of the first matching row is the capital's name.
print(filtered[0][1])

Ankara


In [28]:
def city_location(city, pipeline):
    """Look up the coordinate pair stored for a place name.

    Args:
        city: Value to search for. A row matches when ``city`` equals any of
            its fields, so a country name works as well as a capital name.
        pipeline: RDD-like object exposing ``filter`` and ``take`` whose
            elements are indexable rows with the coordinates at positions
            2 and 3.

    Returns:
        ``(row[2], row[3])`` of the first matching row.

    Raises:
        IndexError: if no row contains ``city``.
    """
    # take(1) fetches only the first match instead of collecting every
    # matching row to the driver.
    matches = pipeline.filter(lambda row: city in row).take(1)
    if not matches:
        # Same exception type the bare ``[0]`` lookup used to raise, but with
        # a message that names the missing value.
        raise IndexError("no row contains {!r}".format(city))
    first = matches[0]
    return (first[2], first[3])

In [29]:
# Fetch the coordinate pairs of two capitals; both are reused by the
# distance checks in the cells below.
loc_ankara = city_location('Ankara', new_capitals_rdd)
loc_paris = city_location('Paris', new_capitals_rdd)
print(loc_ankara)
print(loc_paris)

(39.92, 32.85)
(48.85, 2.35)


# Distance Calculator Function

In [30]:
def cal_dist(loc_1, loc_2, radius=6373.0):
    """Great-circle distance between two points via the haversine formula.

    Args:
        loc_1: ``(latitude, longitude)`` of the first point, in degrees.
        loc_2: ``(latitude, longitude)`` of the second point, in degrees.
        radius: Sphere radius; the result is in the same unit. The default
            6373.0 is an approximate Earth radius in kilometres.

    Returns:
        The distance along the sphere's surface as a float.
    """
    lat1, lon1 = radians(loc_1[0]), radians(loc_1[1])
    lat2, lon2 = radians(loc_2[0]), radians(loc_2[1])

    dlat = lat2 - lat1
    dlon = lon2 - lon1

    a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
    # Rounding can push `a` marginally outside [0, 1] for (near-)antipodal
    # points, which would make sqrt(1 - a) raise a math domain error.
    a = min(1.0, max(0.0, a))
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    return radius * c

In [31]:
# Reference value from geopy's geodesic distance, in km, to compare with cal_dist.
round(geodesic(loc_ankara, loc_paris).km, 2)

2603.13

In [32]:
# Same pair through the hand-written haversine implementation.
round(cal_dist(loc_ankara, loc_paris),2)

2598.09

# Making a Cartesian RDD

In [33]:
# Two identically built (capital, (coord, coord)) RDDs; they form the two
# sides of the cartesian product below.
capital_loc_rdd_1 = capitals_rdd.map(split_capitals_to_tuple_nc)
print(capital_loc_rdd_1.take(3))
capital_loc_rdd_2 = capitals_rdd.map(split_capitals_to_tuple_nc)
print(capital_loc_rdd_2.take(3))

[('Kabul', (34.53, 69.17)), ('Tirana', (41.33, 19.82)), ('Algiers', (36.75, 3.04))]
[('Kabul', (34.53, 69.17)), ('Tirana', (41.33, 19.82)), ('Algiers', (36.75, 3.04))]


In [34]:
# Every element of the product is a pair of (capital, (coord, coord)) records,
# including each capital paired with itself.
print(capital_loc_rdd_1.cartesian(capital_loc_rdd_2).take(3))

[(('Kabul', (34.53, 69.17)), ('Kabul', (34.53, 69.17))), (('Kabul', (34.53, 69.17)), ('Tirana', (41.33, 19.82))), (('Kabul', (34.53, 69.17)), ('Algiers', (36.75, 3.04)))]


## Calculating the Distance Between Every Pair of Capitals

In [35]:
def _labelled_distance(pair):
    """Map a pair of (capital, location) records to ('A-B', whole-number distance)."""
    (name_a, loc_a), (name_b, loc_b) = pair
    return (name_a + '-' + name_b, int(round(cal_dist(loc_a, loc_b))))


# One record for every ordered pair of capitals, self-pairs included.
all_pairs = capital_loc_rdd_1.cartesian(capital_loc_rdd_2)
distance_rdd = all_pairs.map(_labelled_distance)
distance_rdd.take(5)

[('Kabul-Kabul', 0),
 ('Kabul-Tirana', 4336),
 ('Kabul-Algiers', 5860),
 ('Kabul-Pago Pago', 13650),
 ('Kabul-Andorra la Vella', 5798)]

### Removing Zero-Distance (Self) Pairs

In [36]:
# Keep only pairs with a positive distance. This removes each capital paired
# with itself; because distances were rounded to whole numbers it also
# removes any two places that round to 0.
distance_rdd = distance_rdd.filter(lambda labelled: labelled[1] > 0)
distance_rdd.take(5)

[('Kabul-Tirana', 4336),
 ('Kabul-Algiers', 5860),
 ('Kabul-Pago Pago', 13650),
 ('Kabul-Andorra la Vella', 5798),
 ('Kabul-Luanda', 7603)]

# Closest Capital Cities and the Distance Between Them

In [37]:
# Order ascending by the distance field so the closest pairs come first.
distance_rdd = distance_rdd.sortBy(lambda labelled: labelled[1])
distance_rdd.take(5)

[('Rome-Vatican', 3),
 ('Vatican-Rome', 3),
 ('Kinshasa-Brazzaville', 6),
 ('Brazzaville-Kinshasa', 6),
 ('The Valley-Marigot', 17)]

# Removing the Duplicate (Mirrored) Pairs

In [38]:
# Attach each record's position in the sorted RDD and move it to the front:
# (record, index) -> (index, record).
indexed_records = distance_rdd.zipWithIndex()
distance_rdd2 = indexed_records.map(lambda rec_idx: (rec_idx[1], rec_idx[0]))
distance_rdd2.take(2)


[(0, ('Rome-Vatican', 3)), (1, ('Vatican-Rome', 3))]

In [39]:
# Shift the index so that numbering starts at 1 instead of 0.
distance_rdd3 = distance_rdd2.map(lambda entry: (entry[0] + 1, entry[1]))
distance_rdd3.take(3)


[(1, ('Rome-Vatican', 3)),
 (2, ('Vatican-Rome', 3)),
 (3, ('Kinshasa-Brazzaville', 6))]

In [40]:
def _mirror_key(indexed):
    """Return a key shared by the 'A-B' and 'B-A' records of one pair.

    `indexed` is an (index, (label, distance)) record. A name may itself
    contain '-', so the label cannot be split back into its two names.
    The sorted bag of '-'-separated tokens is nevertheless identical for a
    label and its mirror, and the distance is added to keep apart pairs that
    merely share a name.
    """
    label, dist = indexed[1]
    return (tuple(sorted(label.split('-'))), dist)


# Keeping every second record only removes mirrored pairs when 'A-B' and 'B-A'
# sit next to each other. Distances are rounded to whole numbers, so unrelated
# pairs tie and can interleave, which would keep both copies of one pair and
# drop another entirely. Deduplicate on an order-independent key instead.
distance_rdd4 = (distance_rdd3
                 .keyBy(_mirror_key)
                 # Of the two mirrored records keep the later-indexed one,
                 # the same choice the even-index rule made for adjacent pairs.
                 .reduceByKey(lambda a, b: a if a[0] > b[0] else b)
                 .values()
                 # The shuffle above loses the ordering; the index restores
                 # ascending-distance order.
                 .sortByKey())
distance_rdd4.take(4)


[(2, ('Vatican-Rome', 3)),
 (4, ('Brazzaville-Kinshasa', 6)),
 (6, ('Marigot-The Valley', 17)),
 (8, ('Marigot-Gustavia', 31))]

In [41]:
# Drop the helper index, leaving plain ('A-B', distance) records.
distance_rdd5 = distance_rdd4.values()
distance_rdd5.take(5)

[('Vatican-Rome', 3),
 ('Brazzaville-Kinshasa', 6),
 ('Marigot-The Valley', 17),
 ('Marigot-Gustavia', 31),
 ('Charlotte Amalie-Road Town', 34)]