# Assignment 7.1A

In [1]:
import os
import json
from pathlib import Path
import gzip
import hashlib
import shutil
import pandas as pd
import pygeohash
import s3fs
import uuid
import math

In [2]:
os.getcwd()

'/home/jovyan/dsc650/dsc650/assignments/assignment07'

In [3]:
current_dir = Path(os.getcwd()).absolute()
results_dir = current_dir.joinpath('results')

if results_dir.exists():
    shutil.rmtree(results_dir)
results_dir.mkdir(parents=True, exist_ok=True)

In [11]:
def read_jsonl_data():
    src_data_path = '/home/jovyan/dsc650/data/processed/openflights/routes.jsonl.gz'
    with gzip.open(src_data_path, 'rb') as f:
            records = [json.loads(line) for line in f.readlines()]
    return records

def flatten_record(record):
    flat_record = dict()
    for key, value in record.items():
        if key in ['airline', 'src_airport', 'dst_airport']:
            if isinstance(value, dict):
                for child_key, child_value in value.items():
                    flat_key = '{}_{}'.format(key, child_key)
                    flat_record[flat_key] = child_value
        else:
            flat_record[key] = value
    return flat_record

def create_flattened_dataset():
    records = read_jsonl_data()
    parquet_path = results_dir.joinpath('routes-flattened.parquet')
    return pd.DataFrame.from_records([flatten_record(record) for record in records])

In [12]:
# Create the flattened dataset
df = create_flattened_dataset()
df['key'] = df['src_airport_iata'].astype(str) + df['dst_airport_iata'].astype(str) + df['airline_iata'].astype(str)

In [13]:
# partitions copied from assignment instructions
partitions = (
        ('A', 'A'), ('B', 'B'), ('C', 'D'), ('E', 'F'),
        ('G', 'H'), ('I', 'J'), ('K', 'L'), ('M', 'M'),
        ('N', 'N'), ('O', 'P'), ('Q', 'R'), ('S', 'T'),
        ('U', 'U'), ('V', 'V'), ('W', 'X'), ('Y', 'Z')
)

In [None]:
# Correction proposed by Professor
# def get_partition(key):
#    partitions = (
#        ('A', 'A'), ('B', 'B'), ('C', 'D'), ('E', 'F'),
#        ('G', 'H'), ('I', 'J'), ('K', 'L'), ('M', 'M'),
#        ('N', 'N'), ('O', 'P'), ('Q', 'R'), ('S', 'T'),
#        ('U', 'U'), ('V', 'V'), ('W', 'X'), ('Y', 'Z')
#    )
    
#    for start_partition, end_partition in partitions:
#        if start_partition == end_partition:
#            kv_key = start_partition
#        else:
#            kv_key = '{}-{}'.format(start_partition, end_partition)
            
        
#        if key[0] >= start_partition and key[0] <= end_partition:
#            return kv_key

# df['kv_key'] = df['key'].apply(get_partition)
# df.to_parquet('results/kv', partition_cols=['kv_key'])

In [14]:
# Remove NAN values from the dataset so no errors occur
df = df[df['src_airport_iata'].isna() == False]

In [15]:
# View df
df.head()

Unnamed: 0,airline_airline_id,airline_name,airline_alias,airline_iata,airline_icao,airline_callsign,airline_country,airline_active,src_airport_airport_id,src_airport_name,...,dst_airport_longitude,dst_airport_altitude,dst_airport_timezone,dst_airport_dst,dst_airport_tz_id,dst_airport_type,dst_airport_source,codeshare,equipment,key
0,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2965.0,Sochi International Airport,...,49.278702,411.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],AERKZN2B
1,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2966.0,Astrakhan Airport,...,49.278702,411.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],ASFKZN2B
2,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2966.0,Astrakhan Airport,...,43.081902,1054.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],ASFMRV2B
3,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2968.0,Chelyabinsk Balandino Airport,...,49.278702,411.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],CEKKZN2B
4,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2968.0,Chelyabinsk Balandino Airport,...,82.650703,365.0,7.0,N,Asia/Krasnoyarsk,airport,OurAirports,False,[CR2],CEKOVB2B


In [17]:
# Get appropriate values for the partitions

# Set kv-key equal to the first letter
df['kv_key'] = df['key'].str[0]

# Assign a value from the partitions list of tuples
df['kv_key'] = df['kv_key'].apply(lambda x: [str('-'.join(partition)) for partition in partitions if (str(x) >= partition[0]) & (str(x) <= partition[1])])
df['kv_key'] = [''.join(partition) for partition in df['kv_key']]

# Replace any partitions with the same start & end with a single letter
df['kv_key'] = [partition[0] if partition[0] == partition[2] else partition for partition in df['kv_key']]

In [18]:
df.to_parquet(
    path='results/kv',
    partition_cols=['kv_key'])

# Assignment 7.1B

In [19]:
import hashlib

In [20]:
# Create Hash key function (copied from assignment instructions)
def hash_key(key):
    m = hashlib.sha256()
    m.update(str(key).encode('utf-8'))
    return m.hexdigest()

In [21]:
df['hashed'] = df['key'].apply(lambda x: hash_key(x))
df['hash_key'] = df['hashed'].str[0]

In [22]:
# view df & view new columns
df.head()

Unnamed: 0,airline_airline_id,airline_name,airline_alias,airline_iata,airline_icao,airline_callsign,airline_country,airline_active,src_airport_airport_id,src_airport_name,...,dst_airport_dst,dst_airport_tz_id,dst_airport_type,dst_airport_source,codeshare,equipment,key,kv_key,hashed,hash_key
0,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2965.0,Sochi International Airport,...,N,Europe/Moscow,airport,OurAirports,False,[CR2],AERKZN2B,A,652cdec02010381f175efe499e070c8cbaac1522bac59a...,6
1,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2966.0,Astrakhan Airport,...,N,Europe/Moscow,airport,OurAirports,False,[CR2],ASFKZN2B,A,9eea5dd88177f8d835b2bb9cb27fb01268122b635b241a...,9
2,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2966.0,Astrakhan Airport,...,N,Europe/Moscow,airport,OurAirports,False,[CR2],ASFMRV2B,A,161143856af25bd4475f62c80c19f68936a139f653c1d3...,1
3,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2968.0,Chelyabinsk Balandino Airport,...,N,Europe/Moscow,airport,OurAirports,False,[CR2],CEKKZN2B,C-D,39aa99e6ae2757341bede9584473906ef1089e30820c90...,3
4,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2968.0,Chelyabinsk Balandino Airport,...,N,Asia/Krasnoyarsk,airport,OurAirports,False,[CR2],CEKOVB2B,C-D,143b3389bce68eea3a13ac26a9c76c1fa583ec2bd26ea8...,1


In [23]:
# create results/hash directory
df.to_parquet(path='results/hash',
              partition_cols = ['hash_key'])

# Assignment 7.1C

In [24]:
# Datacenters - lat/long given in instructions
datacenters = {}
datacenters['west'] = pygeohash.encode(45.5945645, -121.1786823)
datacenters['central'] = pygeohash.encode(41.1544433, -96.0422378)
datacenters['east'] = pygeohash.encode(39.08344, -77.6497145)
print(datacenters)

{'west': 'c21g6s0rs4c7', 'central': '9z7dnebnj8kb', 'east': 'dqby34cjw922'}


In [25]:
# Provide routes for each of the source airports & store routes
# in the data center closest to the source airport

def closest_datacenter(latitude, longitude):
    geohash = pygeohash.encode(latitude, longitude)
    dist_dict = {}
    closest_datacenter = ''
    last_distance = None
    for key, value in datacenters.items():
        dist = pygeohash.geohash_approximate_distance(str(geohash), str(value))
        dist_dict[key] = dist
        if (last_distance == None) or (dist < last_distance):
            closest_datacenter = key
            last_distance = dist
    return closest_datacenter

In [26]:
df['datacenter'] = df[['src_airport_latitude', 'src_airport_longitude']].apply(lambda x: closest_datacenter(x[0], x[1]), axis=1)

In [27]:
# create results/geo directory
df.to_parquet(
    path='results/geo',
    partition_cols = ['datacenter'])

In [28]:
df['datacenter'].value_counts()

west       51684
east       10009
central     5487
Name: datacenter, dtype: int64

# Assignment 7.1D

In [29]:
# Create function (copied outline from assignment instructions)
def balance_partitions(keys, num_partitions):
    partitions = []
    
    # Get the ideal number of records in each partition
    partition_size = len(keys) / num_partitions
    
    # Get the count of records for each key
    key_grp_cnts = []
    for key in set(keys):
        occurences = keys.count(key)
        key_grp_cnts.append(tuple([key, occurences]))
        
    key_grp_cnts.sort(key=lambda v: v[0].lower())
    
    total = 0
    
    partition_list = []
    
    # loop through the group counts until you exceed partition_size
    for grp in key_grp_cnts:
        
        # If total is 0, then it's the first key in the group
        if total == 0:
            min_grp = grp[0]
            last_group = grp[0]
            
        # If the incremented total exceeds the ideal partition size, then this key is the max group & reset the total
        if (total + grp[1]) > partition_size:
            max_grp = last_group
            partition_list.append(tuple([min_grp, max_grp]))
            last_group = grp[0]
            total=0
        else:
            last_group = grp[0]
            total += grp[1]
            
    # Last partition
    partition_list.append(tuple([min_grp, last_group]))
    
    return partition_list

In [30]:
# Start by using a series from the df above as the list of keys
keys = list(df['airline_name'])
num_partitions = 10

In [31]:
# print balance_partitions
print(balance_partitions(keys, num_partitions))

[('40-Mile Air', 'Air Foyle'), ('Air Greenland', 'Amaszonas'), ('Amerijet International', 'China Eastern Airlines'), ('China SSS', 'Eurowings'), ('Excel Airways', 'Jet Airways'), ('JetBlue Airways', 'Omni Air International'), ('Onur Air', 'Shaheen Air International'), ('Shanghai Airlines', 'TransAsia Airways'), ('Transavia Holland', 'UTair-Express'), ('Valuair', 'Zoom Airlines')]
