In [1]:
# I could not install Spark in my local PyCharm environment,
# so I am using Google Colab, as suggested in the lecture.
# Mount Google Drive so that the files stored under /content/drive are reachable from this notebook.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Install specific Java and Spark for Python.
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
!java -version
!pip install pyspark

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Wait                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [2 In0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Conn                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Conn0% [2 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com] [Connecting to                                                                               Hit:4 https://developer.download.nvidia.com/comp

In [3]:
# Check that PySpark was installed correctly: obtain a SparkContext via getOrCreate()
# and display it as the cell output.
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
sc

In [4]:
# Inspect the capitals dataset.
capitalRDD = sc.textFile("/content/drive/MyDrive/Colab Notebooks/BDP/Datasets-20210410/Capitals.txt")
print(capitalRDD.collect()) # Since I am not working on a Hadoop cluster, I can safely use collect() instead of take() to see every line.

# Findings from the inspection: the fields in each line are separated by a tab (\t) and
# the first field in each line is not necessary.

# The first float value is the latitude and the second float value is the longitude of each capital.
# Both are written with a decimal comma (e.g. 34,53).

# There is no header and there are no blank lines in the dataset, so there is no need to filter these.

# The numbers must be in decimal degrees and range from -90 to 90 for latitude and from -180 to 180 for longitude.
# So, wrong values (outside these intervals) have to be filtered out for latitude and longitude.

# Let's preprocess the data based on the inspection above.
def split_and_take_important_columns(line):
    """Parse one tab-separated dataset line into a (city, latitude, longitude) tuple.

    The line is split on tab characters. Field 2 is the city name (returned with
    surrounding whitespace stripped); fields 3 and 4 are the latitude and the
    longitude, written with a decimal comma, and are returned as floats.

    Raises:
        IndexError: if the line has fewer than five tab-separated fields.
        ValueError: if a coordinate field cannot be converted to a float.
    """
    fields = line.split("\t")

    def to_number(text):
        # The dataset uses a decimal comma ("34,53"); float() needs a dot.
        return float(text.replace(",", "."))

    latitude = to_number(fields[3])
    longitude = to_number(fields[4])
    return (fields[2].strip(), latitude, longitude)

# Cleaning pipeline, written as one chain:
#   1. drop empty lines, if any exist,
#   2. parse every remaining line into a (city, latitude, longitude) tuple,
#   3. keep only rows whose latitude is within [-90, 90] and whose longitude is within [-180, 180].
capitalRDD = (
    capitalRDD
    .filter(lambda raw_line: len(raw_line) > 1)
    .map(split_and_take_important_columns)
    .filter(lambda row: -90 <= row[1] <= 90 and -180 <= row[2] <= 180)
)
print(capitalRDD.take(10))

['Afghanistan Flag Icon \tAfghanistan \tKabul \t34,53 \t69,17\t', 'Albania Flag Icon \tAlbania \tTirana \t41,33 \t19,82\t', 'Algeria Flag Icon \tAlgeria \tAlgiers \t36,75 \t3,04\t', 'American Samoa Flag Icon \tAmerican Samoa \tPago Pago \t-14,28 \t-170,70\t', 'Andorra Flag Icon \tAndorra \tAndorra la Vella \t42,51 \t1,52\t', 'Angola Flag Icon \tAngola \tLuanda \t-8,84 \t13,23\t', 'Anguilla Flag Icon \tAnguilla \tThe Valley \t18,22 \t-63,06\t', "Antigua and Barbuda Flag Icon \tAntigua and Barbuda \tSt. John's \t17,12 \t-61,85\t", 'Argentina Flag Icon \tArgentina \tBuenos Aires \t-34,61 \t-58,38\t', 'Armenia Flag Icon \tArmenia \tYerevan \t40,18 \t44,51\t', 'Aruba Flag Icon \tAruba \tOranjestad \t12,52 \t-70,03\t', 'Australia Flag Icon \tAustralia \tCanberra \t-35,28 \t149,13\t', 'Austria Flag Icon \tAustria \tVienna \t48,21 \t16,37\t', 'Azerbaijan Flag Icon \tAzerbaijan \tBaku \t40,38 \t49,89\t', 'Bahamas Flag Icon \tBahamas \tNassau \t25,06 \t-77,34\t', 'Bahrain Flag Icon \tBahrain \tM

In [13]:
from geopy import distance
# Take the Cartesian product of the capital RDD with itself, so that every
# (city, latitude, longitude) tuple is paired with every tuple (including itself).
capitalDistRDD = capitalRDD.cartesian(capitalRDD)
print(capitalDistRDD.take(10))

def calculate_dist(sample):
    """Turn a pair of capital tuples into (city_a, city_b, distance_km).

    `sample` is a pair of (city, latitude, longitude) tuples. The (latitude,
    longitude) part of each tuple is handed to geopy's `distance.distance`, and
    the result in kilometres is rounded to two decimal places.
    """
    first, second = sample[0], sample[1]
    kilometres = distance.distance(first[1:3], second[1:3]).km
    return (first[0], second[0], round(kilometres, 2))

# Distance pipeline, written as one chain:
#   1. drop pairs whose two city names are equal (same capital city pairs),
#   2. turn every remaining pair into (city_a, city_b, distance_km),
#   3. drop pairs whose distance is smaller than 3 km, i.e. the same capital city
#      listed for two different countries (e.g. the Italy and Israel cases).
capitalDistRDD = (
    capitalDistRDD
    .filter(lambda pair: pair[0][0] != pair[1][0])
    .map(calculate_dist)
    .filter(lambda triple: int(triple[2]) >= 3)
)
print(capitalDistRDD.take(10))

[(('Kabul', 34.53, 69.17), ('Kabul', 34.53, 69.17)), (('Kabul', 34.53, 69.17), ('Tirana', 41.33, 19.82)), (('Kabul', 34.53, 69.17), ('Algiers', 36.75, 3.04)), (('Kabul', 34.53, 69.17), ('Pago Pago', -14.28, -170.7)), (('Kabul', 34.53, 69.17), ('Andorra la Vella', 42.51, 1.52)), (('Kabul', 34.53, 69.17), ('Luanda', -8.84, 13.23)), (('Kabul', 34.53, 69.17), ('The Valley', 18.22, -63.06)), (('Kabul', 34.53, 69.17), ("St. John's", 17.12, -61.85)), (('Kabul', 34.53, 69.17), ('Buenos Aires', -34.61, -58.38)), (('Kabul', 34.53, 69.17), ('Yerevan', 40.18, 44.51))]
[('Kabul', 'Tirana', 4344.48), ('Kabul', 'Algiers', 5870.86), ('Kabul', 'Pago Pago', 13651.19), ('Kabul', 'Andorra la Vella', 5809.34), ('Kabul', 'Luanda', 7593.19), ('Kabul', 'The Valley', 12295.01), ('Kabul', "St. John's", 12302.53), ('Kabul', 'Buenos Aires', 15269.26), ('Kabul', 'Yerevan', 2264.6), ('Kabul', 'Oranjestad', 13256.55)]


In [14]:
# Note to the instructor: since you said in the lecture that duplicates are not important,
# both the (A, B) and the (B, A) pairs are kept; either ordering of a tied pair may be reported.
# Since you requested a different notebook for each question, the furthest capital cities are
# also answered in a separate notebook, keeping everything else the same.

# Both actions below evaluate the same lineage (one geodesic distance per pair), so keep the
# computed distances cached instead of recomputing them for every action.
capitalDistRDD = capitalDistRDD.cache()

# Only one extreme is needed on each side, so pick it directly with takeOrdered()/top()
# rather than fully sorting the RDD twice and taking the first element.
print("Two closest capital cities (distance is in km) in the dataset:")
print(capitalDistRDD.takeOrdered(1, key=lambda pair: pair[2]))
print("Two furthest capital cities (distance is in km) in the dataset:")
print(capitalDistRDD.top(1, key=lambda pair: pair[2]))

Two closest capital cities (distance is in km) in the dataset:
[('Kinshasa', 'Brazzaville', 6.45)]
Two furthest capital cities (distance is in km) in the dataset:
[('Asunción', 'Taipei', 19937.98)]
