In [1]:
import os
import json
from pathlib import Path
import gzip
import shutil
import pandas as pd
import pygeohash

In [2]:
# set directories
current_dir = Path(os.getcwd()).absolute()
results_dir = current_dir.joinpath('results')

if results_dir.exists():
    shutil.rmtree(results_dir)

results_dir.mkdir(parents = True, exist_ok = True)
src_data_path = '/home/jovyan/dsc650/data/processed/openflights/routes.jsonl.gz'

In [3]:
# read routes file
def read_jsonl_data():
    with gzip.open(src_data_path, 'rb') as f:
        records = [json.loads(line) for line in f.readlines()]
        
    return records

In [4]:
# flatten record
def flatten_record(record):
    flat_record = dict()
    
    for key, value in record.items():
        if key in ['airline', 'src_airport', 'dst_airport']:
            if isinstance(value, dict):
                for child_key, child_value in value.items():
                    flat_key = '{}_{}'.format(key, child_key)
                    flat_record[flat_key] = child_value
        else:
            flat_record[key] = value
    
    return flat_record

In [5]:
def create_flattened_dataset():
    records = read_jsonl_data()
    
    parquet_path = results_dir.joinpath('routes-flattened.parquet')
    
    return pd.DataFrame.from_records([flatten_record(record) for record in records])

## Assignment 7.1

In this part of the assignment, you will partition a dataset using different strategies. You will use the routes.parquet dataset you created in a previous assignment. For this dataset, the key for each route will be the three-letter source airport code concatenated with the three-letter destination airport code and the two-letter airline. For instance, a route from Omaha Eppley Airfield (OMA) to Denver International Airport (DEN) on American Airlines (AA) has a key of OMADENAA.

### a.

We will create 16 partitions so that we can compare it to the partitions we create from hashed keys in the next part of the assignment. The partitions are determined by the first letter of the composite key using the following partitions.

In [6]:
# load parquet dataset
df = create_flattened_dataset()

In [7]:
df[['src_airport_iata', 'dst_airport_iata', 'airline_iata']].head(5)

Unnamed: 0,src_airport_iata,dst_airport_iata,airline_iata
0,AER,KZN,2B
1,ASF,KZN,2B
2,ASF,MRV,2B
3,CEK,KZN,2B
4,CEK,OVB,2B


In [8]:
df[['src_airport_iata', 'dst_airport_iata', 'airline_iata']].tail(5)

Unnamed: 0,src_airport_iata,dst_airport_iata,airline_iata
67658,WYA,ADL,ZL
67659,DME,FRU,ZM
67660,FRU,DME,ZM
67661,FRU,OSS,ZM
67662,OSS,FRU,ZM


In [9]:
# Assign keys
df['key'] = df['src_airport_iata'].astype(str) + df['dst_airport_iata'].astype(str) + df['airline_iata'].astype(str)

In [10]:
df['key'].head(5)

0    AERKZN2B
1    ASFKZN2B
2    ASFMRV2B
3    CEKKZN2B
4    CEKOVB2B
Name: key, dtype: object

In [11]:
df['key'].tail(5)

67658    WYAADLZL
67659    DMEFRUZM
67660    FRUDMEZM
67661    FRUOSSZM
67662    OSSFRUZM
Name: key, dtype: object

In [12]:
# define partitions
partitions = (
    ('A', 'A'), ('B', 'B'), ('C', 'D'), ('E', 'F'),
    ('G', 'H'), ('I', 'J'), ('K', 'L'), ('M', 'M'),
    ('N', 'N'), ('O', 'P'), ('Q', 'R'), ('S', 'T'),
    ('U', 'U'), ('V', 'V'), ('W', 'X'), ('Y', 'Z')
)

Results directions should be set up like this:

kv<br />
├── kv_key=A<br />
├── kv_key=B<br />
├── kv_key=C-D<br />
├── kv_key=E-F<br />
├── kv_key=G-H<br />
├── kv_key=I-J<br />
├── kv_key=K-L<br />
├── kv_key=M<br />
├── kv_key=N<br />
├── kv_key=O-P<br />
├── kv_key=Q-R<br />
├── kv_key=S-T<br />
├── kv_key=U<br />
├── kv_key=V<br />
├── kv_key=W-X<br />
└── kv_key=Y-Z<br />


In [13]:
# Set up dictionary of partitions and kv_keys
partition_dict = {}

for i in partitions:
    if i[0] == i[1]:
        partition_dict[i] = i[0]
    else:
        partition_dict[i] = i[0] + '-' + i[1]
        
print(partition_dict)

{('A', 'A'): 'A', ('B', 'B'): 'B', ('C', 'D'): 'C-D', ('E', 'F'): 'E-F', ('G', 'H'): 'G-H', ('I', 'J'): 'I-J', ('K', 'L'): 'K-L', ('M', 'M'): 'M', ('N', 'N'): 'N', ('O', 'P'): 'O-P', ('Q', 'R'): 'Q-R', ('S', 'T'): 'S-T', ('U', 'U'): 'U', ('V', 'V'): 'V', ('W', 'X'): 'W-X', ('Y', 'Z'): 'Y-Z'}


In [14]:
# generate kv_key from key
def kv_key_gen(data_key):
    for key, val in partition_dict.items():
        if data_key[0] == key[0] or data_key[0] == key[1]:
            return val
        
    return ''

In [15]:
# add kv_key column to df
df['kv_key'] = df['key'].apply(kv_key_gen)

In [16]:
df[['kv_key', 'key']].head(5)

Unnamed: 0,kv_key,key
0,A,AERKZN2B
1,A,ASFKZN2B
2,A,ASFMRV2B
3,C-D,CEKKZN2B
4,C-D,CEKOVB2B


In [17]:
df[['kv_key', 'key']].tail(5)

Unnamed: 0,kv_key,key
67658,W-X,WYAADLZL
67659,C-D,DMEFRUZM
67660,E-F,FRUDMEZM
67661,E-F,FRUOSSZM
67662,O-P,OSSFRUZM


In [18]:
# partition dataset
df.to_parquet(results_dir.joinpath('kv'), partition_cols = ['kv_key'])

### b.

We are going to partition the dataset again, but this time we will partition by the hash value of the key.

In [19]:
# Function that will create a SHA256 hash of the input key and return a hexadecimal string representation of the hash.
import hashlib

def hash_key(key):
    m = hashlib.sha256()
    m.update(str(key).encode('utf-8'))
    
    return m.hexdigest()

We will partition the data using the first character of the hexadecimal hash. As such, there are 16 possible partitions. Create a new column called hashed that is a hashed value of the key column. Next, create a partitioned dataset based on the first character of the hashed key and save the results to results/hash. The directory should contain the following folders.

hash<br />
├── hash_key=0<br />
├── hash_key=1<br />
├── hash_key=2<br />
├── hash_key=3<br />
├── hash_key=4<br />
├── hash_key=5<br />
├── hash_key=6<br />
├── hash_key=7<br />
├── hash_key=8<br />
├── hash_key=9<br />
├── hash_key=A<br />
├── hash_key=B<br />
├── hash_key=C<br />
├── hash_key=D<br />
├── hash_key=E<br />


In [20]:
# Add hash column to df
df['hashed'] = df['key'].apply(hash_key)

df[['key', 'hashed']].head(5)

Unnamed: 0,key,hashed
0,AERKZN2B,652cdec02010381f175efe499e070c8cbaac1522bac59a...
1,ASFKZN2B,9eea5dd88177f8d835b2bb9cb27fb01268122b635b241a...
2,ASFMRV2B,161143856af25bd4475f62c80c19f68936a139f653c1d3...
3,CEKKZN2B,39aa99e6ae2757341bede9584473906ef1089e30820c90...
4,CEKOVB2B,143b3389bce68eea3a13ac26a9c76c1fa583ec2bd26ea8...


In [21]:
# Add hash key column to df
df['hash_key'] = df['hashed'].str[0]

df[['key', 'hash_key', 'hashed']].head(5)

Unnamed: 0,key,hash_key,hashed
0,AERKZN2B,6,652cdec02010381f175efe499e070c8cbaac1522bac59a...
1,ASFKZN2B,9,9eea5dd88177f8d835b2bb9cb27fb01268122b635b241a...
2,ASFMRV2B,1,161143856af25bd4475f62c80c19f68936a139f653c1d3...
3,CEKKZN2B,3,39aa99e6ae2757341bede9584473906ef1089e30820c90...
4,CEKOVB2B,1,143b3389bce68eea3a13ac26a9c76c1fa583ec2bd26ea8...


In [22]:
# Partition dataset using hk_hash
df.to_parquet(results_dir.joinpath('hash'), partition_cols=['hash_key'])

### c.

We will simulate multiple geographically distributed data centers. For this example, we will assume we have three data centers located in the western, central, and eastern United States. Google lists the locations of their data centers and we will use the following locations for our three data centers.

##### West
- The Dalles, Oregon
- Latitude: 45.5945645
- Longitude: -121.1786823

##### Central
- Papillion, NE
- Latitude: 41.1544433
- Longitude: -96.0422378

##### East
- Loudoun County, Virginia
- Latitude: 39.08344
- Longitude: -77.6497145

Assume that you have an application that provides routes for each of the source airports and you want to store routes in the data center closest to the source airport. The output folders should look as follows.

geo<br />
├── location=central<br />
├── location=east<br />
└── location=west<br />


In [23]:
# Add geohash key to df
df['airport_geohash'] = df.apply(lambda x: pygeohash.encode(x['src_airport_latitude'], 
                                                            x['src_airport_longitude']), 
                                 axis = 1)

df[['src_airport_latitude', 'src_airport_longitude', 'airport_geohash']].head(5)

Unnamed: 0,src_airport_latitude,src_airport_longitude,airport_geohash
0,43.449902,39.9566,szsrjjzd02b3
1,46.283298,48.006302,v04pk3t5gbjj
2,46.283298,48.006302,v04pk3t5gbjj
3,55.305801,61.5033,v3gdxs17du83
4,55.305801,61.5033,v3gdxs17du83


In [24]:
# Get geohash location info for data centers
data_centers = dict(
    west = pygeohash.encode(45.5945645, -121.1786823),
    central = pygeohash.encode(41.1544433, -96.0422378),
    east = pygeohash.encode(39.08344, -77.6497145)
)

print(data_centers)

{'west': 'c21g6s0rs4c7', 'central': '9z7dnebnj8kb', 'east': 'dqby34cjw922'}


In [25]:
# Get the closest datacenter from the source airport
def get_datacenter_location(geohash): 
    distance_dict = {}
    
    for key in data_centers.keys():
        distance_dict[key] = pygeohash.geohash_haversine_distance(data_centers.get(key), geohash)
        
    sorted_distance = sorted(distance_dict.items(), key = lambda x: x[1])[0][0]
        
    return sorted_distance

In [26]:
# Add new column for closest data center
df['data_center'] = df['airport_geohash'].apply(lambda x: get_datacenter_location(x))

In [27]:
df[['src_airport_latitude', 'src_airport_longitude', 'airport_geohash', 'data_center']].head(5)

Unnamed: 0,src_airport_latitude,src_airport_longitude,airport_geohash,data_center
0,43.449902,39.9566,szsrjjzd02b3,east
1,46.283298,48.006302,v04pk3t5gbjj,east
2,46.283298,48.006302,v04pk3t5gbjj,east
3,55.305801,61.5033,v3gdxs17du83,west
4,55.305801,61.5033,v3gdxs17du83,west


In [28]:
df[['src_airport_latitude', 'src_airport_longitude', 'airport_geohash', 'data_center']].tail(5)

Unnamed: 0,src_airport_latitude,src_airport_longitude,airport_geohash,data_center
67658,-33.058899,137.514008,r41gcjy9uwef,west
67659,55.408798,37.9063,ucfgnwfe8u9e,east
67660,43.061298,74.4776,txsuyz0fjzgd,west
67661,43.061298,74.4776,txsuyz0fjzgd,west
67662,40.609001,72.793297,tx5z02wkwf2p,west


In [29]:
# Partition dataset using closest data center location
df.to_parquet(results_dir.joinpath('geo'), partition_cols=['data_center'])

### d.

Create a Python function that takes as input a list of keys and the number of partitions and returns a list of keys sorted into the specified number of partitions. The partitions should be roughly equal in size. Furthermore, the partitions should have the property that each partition contains all the keys between the least key in the partition and the greatest key in the partition. In other words, the partitions should be ordered.

In [30]:
import itertools

def balance_partitions(keys, num_partitions):
    partition_size = round(len(keys) / num_partitions)
    iterations = iter(keys)
    
    partitions_iterations = iter(lambda: tuple(itertools.islice(iterations, partition_size)), ())
    
    partitions = [sorted(part) for part in partitions_iterations]
    
    return partitions

In [31]:
# Test with sample of 50 airline rows into 5 partitions
airline_names = df.airline_iata.sample(50).to_list()

partitions = balance_partitions(airline_names, 5)

partitions

[['4U', '7H', 'AA', 'AC', 'AY', 'DL', 'DL', 'ET', 'FL', 'QF'],
 ['AA', 'AS', 'AZ', 'JQ', 'JT', 'KY', 'O6', 'SN', 'Y4', 'nan'],
 ['3R', 'AC', 'AF', 'DL', 'GS', 'HU', 'LO', 'MU', 'U6', 'US'],
 ['8L', 'AY', 'BT', 'FR', 'NF', 'P9', 'SI', 'SN', 'VS', 'WS'],
 ['AB', 'DL', 'DL', 'FR', 'GE', 'KL', 'RJ', 'SC', 'SG', 'TK']]