Assignment 7.1 A: Partition the OpenFlights routes by key range

In [1]:
import os
import json
from pathlib import Path
import gzip
import hashlib
import shutil
import pandas as pd
import pygeohash
import s3fs
# S3-compatible endpoint that read_jsonl_data reads from (anonymous access).
endpoint_url = 'https://storage.budsc.midwest-datascience.com'

# Every output of this notebook lives under ./results. The directory is
# deleted and recreated on each run so outputs never mix with a previous run.
current_dir = Path.cwd().absolute()
results_dir = current_dir / 'results'
if results_dir.exists():
    shutil.rmtree(results_dir)
results_dir.mkdir(parents=True, exist_ok=True)
def read_jsonl_data(src_data_path='data/processed/openflights/routes.jsonl.gz'):
    """Download and parse a gzipped JSON Lines file from the S3 endpoint.

    Parameters
    ----------
    src_data_path : str, optional
        Path of the object on the S3-compatible store at the module-level
        ``endpoint_url`` (accessed anonymously). Defaults to the OpenFlights
        routes file.

    Returns
    -------
    list
        One parsed JSON value per non-blank line, in file order.
    """
    s3 = s3fs.S3FileSystem(
        anon=True,
        client_kwargs={'endpoint_url': endpoint_url},
    )
    with s3.open(src_data_path, 'rb') as f_gz:
        with gzip.open(f_gz, 'rb') as f:
            # Iterate the stream instead of readlines(), and skip blank lines
            # (e.g. a trailing empty line) which json.loads would reject.
            records = [json.loads(line) for line in f if line.strip()]
    return records
def flatten_record(record, nested_keys=('airline', 'src_airport', 'dst_airport')):
    """Flatten one level of nesting in a route record.

    For each key in ``nested_keys`` whose value is a dict, the child fields
    are lifted to the top level as ``'<key>_<child_key>'`` (for example
    ``airline`` -> ``airline_iata``). All other keys are copied unchanged.

    Parameters
    ----------
    record : dict
        A single route record.
    nested_keys : str or iterable of str, optional
        Keys whose dict values should be flattened. Defaults to the three
        nested objects used in the routes data.

    Returns
    -------
    dict
        A new flat dict; ``record`` itself is not modified.

    Notes
    -----
    A nested key whose value is not a dict (e.g. ``None``) is omitted from the
    result entirely, so its prefixed columns come out missing (NaN) when the
    records are loaded into a DataFrame.
    """
    # A bare string would otherwise be split into its characters.
    if isinstance(nested_keys, str):
        nested_keys = (nested_keys,)
    nested_keys = frozenset(nested_keys)

    flat_record = {}
    for key, value in record.items():
        if key not in nested_keys:
            flat_record[key] = value
        elif isinstance(value, dict):
            for child_key, child_value in value.items():
                flat_record['{}_{}'.format(key, child_key)] = child_value
    return flat_record
def create_flattened_dataset():
    """Load the routes records and return them as a flat DataFrame.

    Each record from ``read_jsonl_data`` is passed through ``flatten_record``
    so the nested airline/airport objects become prefixed top-level columns.
    Nothing is written to disk here; the partitioned parquet datasets are
    written by later cells.
    """
    records = read_jsonl_data()
    return pd.DataFrame.from_records([flatten_record(record) for record in records])
df = create_flattened_dataset()

# Route key = source IATA + destination IATA + airline IATA,
# e.g. 'AER' + 'KZN' + '2B' -> 'AERKZN2B'.
# NOTE(review): astype(str) turns missing codes into the text 'nan'/'None',
# so such keys contain that text (the 'n' -> 'N' entry in the kv_key mapping
# presumably exists for keys that start with 'nan').
src_iata, dst_iata, airline_iata = (
    df[column].astype(str)
    for column in ('src_airport_iata', 'dst_airport_iata', 'airline_iata')
)
df['key'] = src_iata.str.cat([dst_iata, airline_iata])

In [2]:
# Preview the first five flattened routes.
df.head(n=5)

Unnamed: 0,airline_airline_id,airline_name,airline_alias,airline_iata,airline_icao,airline_callsign,airline_country,airline_active,src_airport_airport_id,src_airport_name,...,dst_airport_longitude,dst_airport_altitude,dst_airport_timezone,dst_airport_dst,dst_airport_tz_id,dst_airport_type,dst_airport_source,codeshare,equipment,key
0,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2965.0,Sochi International Airport,...,49.278702,411.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],AERKZN2B
1,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2966.0,Astrakhan Airport,...,49.278702,411.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],ASFKZN2B
2,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2966.0,Astrakhan Airport,...,43.081902,1054.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],ASFMRV2B
3,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2968.0,Chelyabinsk Balandino Airport,...,49.278702,411.0,3.0,N,Europe/Moscow,airport,OurAirports,False,[CR2],CEKKZN2B
4,410,Aerocondor,ANA All Nippon Airways,2B,ARD,AEROCONDOR,Portugal,True,2968.0,Chelyabinsk Balandino Airport,...,82.650703,365.0,7.0,N,Asia/Krasnoyarsk,airport,OurAirports,False,[CR2],CEKOVB2B


In [3]:
# (rows, columns) of the flattened dataset.
len(df.index), len(df.columns)

(67663, 39)

In [4]:
# First character of the route key (the first letter of the source IATA code);
# the next cell folds these letters into the key-range partitions.
df['kv_key'] = df['key'].str.slice(0, 1)


In [5]:
# Fold first letters into the key-range partitions (C-D, E-F, ...).
# Letters not listed here (A, B, M, N, U, V, and any other first character)
# keep their own partition. 'n' presumably comes from the text 'nan' that
# astype(str) produces for a missing source IATA code; it is folded into N.
# The result is assigned back rather than using replace(..., inplace=True)
# on df['kv_key']: that is chained assignment, which under pandas
# Copy-on-Write modifies a temporary copy and leaves df unchanged.
kv_partitions = {
    'C': 'C-D', 'D': 'C-D',
    'E': 'E-F', 'F': 'E-F',
    'G': 'G-H', 'H': 'G-H',
    'I': 'I-J', 'J': 'I-J',
    'K': 'K-L', 'L': 'K-L',
    'n': 'N',
    'O': 'O-P', 'P': 'O-P',
    'Q': 'Q-R', 'R': 'Q-R',
    'S': 'S-T', 'T': 'S-T',
    'W': 'W-X', 'X': 'W-X',
    'Y': 'Y-Z', 'Z': 'Y-Z',
}
df['kv_key'] = df['kv_key'].replace(kv_partitions)

In [6]:
# Inspect the resulting key-range partition labels.
df.loc[:, 'kv_key']

0          A
1          A
2          A
3        C-D
4        C-D
        ... 
67658    W-X
67659    C-D
67660    E-F
67661    E-F
67662    O-P
Name: kv_key, Length: 67663, dtype: object

In [7]:
# Write the key-range partitioning into its own dataset directory. Part B also
# writes a partitioned copy of df; sharing results_dir as the root would put
# kv_key=... and hashed_key=... partition directories under one dataset.
df.to_parquet(results_dir.joinpath('kv'), partition_cols=['kv_key'])

Assignment 7.1 B: Partition the routes by hashed key

In [8]:
import hashlib

def hash_key(key):
    """Return the SHA-256 hex digest of ``str(key)`` encoded as UTF-8.

    The result is a 64-character lowercase hexadecimal string and is the same
    for equal ``str(key)`` values, so a key always maps to the same partition.
    """
    payload = str(key).encode('utf-8')
    return hashlib.sha256(payload).hexdigest()

In [9]:
# SHA-256 of each route key; hash_key is passed directly (no wrapper lambda).
df['hashed'] = df['key'].apply(hash_key)

In [10]:
# Inspect the hashed route keys.
df.loc[:, 'hashed']

0        652cdec02010381f175efe499e070c8cbaac1522bac59a...
1        9eea5dd88177f8d835b2bb9cb27fb01268122b635b241a...
2        161143856af25bd4475f62c80c19f68936a139f653c1d3...
3        39aa99e6ae2757341bede9584473906ef1089e30820c90...
4        143b3389bce68eea3a13ac26a9c76c1fa583ec2bd26ea8...
                               ...                        
67658    f31527be84c36208c05cac57dfac8a46b48a87dda151f8...
67659    880fc35ca283ad034c90becc4e331b72ee894b9eb69f76...
67660    e976939986fbf947bb9318018cef717c0b34dff91e5e67...
67661    8b0c0b835a58a4250e020d51ec2a896e4ef3f5c3543b8e...
67662    629f14f3fb6f94ebd1522d33a3c50675942e3148d028b4...
Name: hashed, Length: 67663, dtype: object

In [11]:
# Partition label = first hex digit of the hash ('0'-'9' or 'a'-'f').
df['hashed_key'] = df['hashed'].str.slice(0, 1)

In [12]:
# Write the hash-based partitioning into its own dataset directory, so it does
# not share results_dir as a root with the kv_key=... partitions from part A.
df.to_parquet(results_dir.joinpath('hash'), partition_cols=['hashed_key'])

Assignment 7.1 C: Partition the routes by nearest location (central, east, west) using geohashes

In [13]:
# Geohash each route's source airport from its latitude/longitude columns.
# NOTE(review): if any source coordinates are missing (NaN), confirm how
# pygeohash.encode handles them before trusting the geo partitions.
df['src_airport_geohash'] = [
    pygeohash.encode(lat, lon)
    for lat, lon in zip(df['src_airport_latitude'], df['src_airport_longitude'])
]

In [14]:
# Geohashes of the three candidate locations, computed once here rather than
# on every call (determine_location is applied once per route below).
LOCATION_GEOHASHES = {
    'central': pygeohash.encode(41.1544433, -96.0422378),
    'east': pygeohash.encode(39.08344, -77.6497145),
    'west': pygeohash.encode(45.5945645, -121.1786823),
}


def determine_location(src_airport_geohash, locations=None):
    """Return the name of the location nearest to a source-airport geohash.

    Distance is ``pygeohash.geohash_haversine_distance`` between geohashes.

    Parameters
    ----------
    src_airport_geohash : str
        Geohash of the route's source airport.
    locations : dict of str -> str, optional
        Mapping of location name to geohash. Defaults to
        ``LOCATION_GEOHASHES`` (central / east / west).

    Returns
    -------
    str
        Name of the nearest location. Exact distance ties go to the name that
        sorts first.

    Raises
    ------
    ValueError
        If ``locations`` is empty.
    """
    if locations is None:
        locations = LOCATION_GEOHASHES
    # Compare (distance, name) tuples so min() breaks exact ties by name.
    return min(
        (pygeohash.geohash_haversine_distance(src_airport_geohash, geohash), name)
        for name, geohash in locations.items()
    )[1]


df['location'] = df['src_airport_geohash'].apply(determine_location)
# Write under results_dir like the other outputs. This is the same place as the
# previous relative 'results/geo', but does not depend on the current directory.
df.to_parquet(results_dir.joinpath('geo'), partition_cols=['location'])

Assignment 7.1 D: Split a list of keys into equally sized partitions

In [15]:
# Hashed route keys as a plain Python list. Note that a later cell overwrites
# `keys` with a small hand-made sample before balance_partitions is called.
keys = list(df['hashed'])

In [16]:
import numpy as np

In [17]:
import math

def balance_partitions(keys, num_partitions, verbose=True):
    """Split ``keys`` into consecutive partitions of equal length.

    Each partition holds ``ceil(len(keys) / num_partitions)`` keys, and the
    last one is padded with ``None`` so that all partitions have the same
    length.

    NOTE(review): when the keys do not divide evenly this can yield fewer than
    ``num_partitions`` partitions (e.g. 11 keys / 10 partitions -> 6
    partitions of 2). Confirm whether that is the intended notion of "balanced".

    Parameters
    ----------
    keys : iterable
        Keys to partition, in order.
    num_partitions : int
        Requested number of partitions; must be positive.
    verbose : bool, optional
        If True (the default), print each partition as it is built.

    Returns
    -------
    list of list
        The partitions in order. An empty list is returned for empty ``keys``.

    Raises
    ------
    ValueError
        If ``num_partitions`` is not positive.
    """
    if num_partitions <= 0:
        raise ValueError('num_partitions must be positive')
    keys = list(keys)  # allow tuples/iterators; list slices pad with +=
    if not keys:
        # ceil(0 / n) == 0 would otherwise become a zero range() step.
        return []

    size = math.ceil(len(keys) / num_partitions)
    partitions = []
    for start in range(0, len(keys), size):
        chunk = keys[start:start + size]
        chunk += [None] * (size - len(chunk))
        partitions.append(chunk)
        if verbose:
            print(chunk)
    return partitions

In [23]:
# Small hand-made sample (20 mixed int/str keys) used to demonstrate
# balance_partitions; it replaces the hashed keys collected earlier.
keys = [
    1, 2, 3, 'b', 'c', 'a', 'd', 'm', 'z', 'p',
    'q', 6, 7, 8, 9, 10, 11, 12, 13, 14,
]
num_partitions = 10

In [24]:
# Print each partition of the sample keys; the trailing ';' keeps any return
# value from also being echoed as cell output.
balance_partitions(keys=keys, num_partitions=num_partitions);

[1, 2]
[3, 'b']
['c', 'a']
['d', 'm']
['z', 'p']
['q', 6]
[7, 8]
[9, 10]
[11, 12]
[13, 14]
