In [1]:
#
# File: Assignment07_1c.py
# Name: Christopher M. Anderson
# Date: 10/18/2020
# Course: DSC650 Big Data
# Week: 7
# Assignment Number: 7.1c


# In this part of the assignment, you will
# partition a dataset using different strategies.
# You will use the routes.parquet dataset you
# created in a previous assignment. For this
# dataset, the key for each route will be the
# three-letter source airport code concatenated
# with the three-letter destination airport code
# and the two-letter airline. For instance, a
# route from Omaha Eppley Airfield (OMA) to Denver
# International Airport (DEN) on American Airlines
# (AA) has a key of OMADENAA.
#
# Finally, we will simulate multiple geographically
# distributed data centers. For this example, we will
# assume we have three data centers located in the
# western, central, and eastern United States. Google
# lists the locations of their data centers and we will
# use the following locations for our three data centers.


import os
import shutil
import json
from pathlib import Path
import gzip
import pandas as pd
import pygeohash as pgh

In [3]:
current_dir = Path(os.getcwd()).absolute()
results_dir = current_dir.joinpath('results')
geo_dir = results_dir.joinpath('geo')
results_dir.mkdir(parents=True, exist_ok=True)
if os.path.exists(geo_dir):
    shutil.rmtree(geo_dir)
os.makedirs(geo_dir)

In [4]:
# 1): Load the dataset
def read_jsonl_data():
    src_data_path = 'data/routes.jsonl.gz'
    with gzip.open(src_data_path, 'rb') as f:
        records = [json.loads(line) for line in f.readlines()]

    return records

In [5]:
# 2): Flatten the records:
def flatten_record(record):
    flat_record = dict()
    for key, value in record.items():
        if key in ['airline', 'src_airport', 'dst_airport']:
            if isinstance(value, dict):
                for child_key, child_value in value.items():
                    flat_key = '{}_{}'.format(key, child_key)
                    flat_record[flat_key] = child_value
        else:
            flat_record[key] = value
    return flat_record


def create_flattened_dataset():
    records = read_jsonl_data()
    return pd.DataFrame.from_records([flatten_record(record) for record in records])


df = create_flattened_dataset()

In [6]:
# 3): Add key column:
df['key'] = df['src_airport_iata'].map(str) + \
            df['dst_airport_iata'].map(str) + \
            df['airline_iata'].map(str)

In [7]:
# 4): GeoHash Source Lat and Long:
df['geoHash'] = df.apply(lambda x: pgh.encode(x['src_airport_latitude'], x['src_airport_longitude']), axis=1)

In [8]:
# 5): Geohash Locations:
df['geoHashWest'] = pgh.encode(45.5945645, -121.1786823)
df['geoHashCentral'] = pgh.encode(41.1544433, -96.0422378)
df['geoHashEast'] = pgh.encode(39.08344, -77.6497145)

geoHashWest = pgh.encode(45.5945645, -121.1786823)
geoHashCentral = pgh.encode(41.1544433, -96.0422378)
geoHashEast = pgh.encode(39.08344, -77.6497145)

In [9]:
# 6): Create Distance Comparison Columns:
dfLocation = pd.DataFrame(columns=[])

dfLocation['distance_to_West'] = df.apply(lambda x: pgh.geohash_haversine_distance(x['geoHash'], x['geoHashWest']), axis=1)
dfLocation['distance_to_Central'] = df.apply(lambda x: pgh.geohash_haversine_distance(x['geoHash'], x['geoHashCentral']), axis=1)
dfLocation['distance_to_East'] = df.apply(lambda x: pgh.geohash_haversine_distance(x['geoHash'], x['geoHashEast']), axis=1)

df['distance_to_West'] = dfLocation['distance_to_West']
df['distance_to_Central'] = dfLocation['distance_to_Central']
df['distance_to_East'] = dfLocation['distance_to_East']

In [10]:
# 7): Find Min Values in Locations:
minValueIndexObj = dfLocation.min(axis=1)
df['location'] = minValueIndexObj

In [11]:
# 8): Update location column for partitioning:
df['location'].mask(df['location'] == dfLocation['distance_to_West'], 'west', inplace=True)
df['location'].mask(df['location'] == dfLocation['distance_to_Central'], 'central', inplace=True)
df['location'].mask(df['location'] == dfLocation['distance_to_East'], 'east', inplace=True)

In [12]:
# 9): Verify all new location data is as expected:
print(df)
pd.set_option('display.max_columns', None)
print(df.head())

       airline_airline_id      airline_name           airline_alias  \
0                     410        Aerocondor  ANA All Nippon Airways   
1                     410        Aerocondor  ANA All Nippon Airways   
2                     410        Aerocondor  ANA All Nippon Airways   
3                     410        Aerocondor  ANA All Nippon Airways   
4                     410        Aerocondor  ANA All Nippon Airways   
...                   ...               ...                     ...   
67658                4178  Regional Express          Qantas Airways   
67659               19016        Apache Air                  Apache   
67660               19016        Apache Air                  Apache   
67661               19016        Apache Air                  Apache   
67662               19016        Apache Air                  Apache   

      airline_iata airline_icao airline_callsign airline_country  \
0               2B          ARD       AEROCONDOR        Portugal   
1          

In [13]:
# 10): Write flattened df to parquet:
def write_parquet_file():
    pq_flat_path = results_dir.joinpath('routes-flattened.parquet')
    df.to_parquet(pq_flat_path)


write_parquet_file()

In [14]:
# 11): Write flattened df to partitioned parquet:
def write_partitioned_parquet_files():
    geo_path = geo_dir.joinpath()
    df.to_parquet(geo_path, partition_cols=['location'])


write_partitioned_parquet_files()