# Assignment 3

Import libraries and define common helper functions

In [1]:
import os
import sys
import gzip
import json
from pathlib import Path
import csv

import pandas as pd
import s3fs
import pyarrow as pa
from pyarrow.json import read_json
import pyarrow.parquet as pq
from fastavro import writer, reader, parse_schema
from fastavro.schema import load_schema
import pygeohash
import snappy
import jsonschema
from jsonschema.exceptions import ValidationError


endpoint_url='https://storage.budsc.midwest-datascience.com'

current_dir = Path(os.getcwd()).absolute()
schema_dir = current_dir.joinpath('schemas')
results_dir = current_dir.joinpath('results')
results_dir.mkdir(parents=True, exist_ok=True)


def read_jsonl_data():
    s3 = s3fs.S3FileSystem(
        anon=True,
        client_kwargs={
            'endpoint_url': endpoint_url
        }
    )
    src_data_path = 'data/processed/openflights/routes.jsonl.gz'
    with s3.open(src_data_path, 'rb') as f_gz:
        with gzip.open(f_gz, 'rb') as f:
            records = [json.loads(line) for line in f.readlines()]
        

    return records

Load the records from https://storage.budsc.midwest-datascience.com/data/processed/openflights/routes.jsonl.gz 

In [2]:
records = read_jsonl_data()
records[0:1]

[{'airline': {'airline_id': 410,
   'name': 'Aerocondor',
   'alias': 'ANA All Nippon Airways',
   'iata': '2B',
   'icao': 'ARD',
   'callsign': 'AEROCONDOR',
   'country': 'Portugal',
   'active': True},
  'src_airport': {'airport_id': 2965,
   'name': 'Sochi International Airport',
   'city': 'Sochi',
   'country': 'Russia',
   'iata': 'AER',
   'icao': 'URSS',
   'latitude': 43.449902,
   'longitude': 39.9566,
   'altitude': 89,
   'timezone': 3.0,
   'dst': 'N',
   'tz_id': 'Europe/Moscow',
   'type': 'airport',
   'source': 'OurAirports'},
  'dst_airport': {'airport_id': 2990,
   'name': 'Kazan International Airport',
   'city': 'Kazan',
   'country': 'Russia',
   'iata': 'KZN',
   'icao': 'UWKD',
   'latitude': 55.606201171875,
   'longitude': 49.278701782227,
   'altitude': 411,
   'timezone': 3.0,
   'dst': 'N',
   'tz_id': 'Europe/Moscow',
   'type': 'airport',
   'source': 'OurAirports'},
  'codeshare': False,
  'equipment': ['CR2']}]

## 3.1

### 3.1.a JSON Schema

In [4]:
# Created schema based off of record printed above.
# Used https://json-schema.org/understanding-json-schema/reference/numeric.html to figure out number
def validate_jsonl_data(records):
    schema_path = schema_dir.joinpath('routes-schema.json')
    with open(schema_path) as f:
        schema = json.load(f)
    
    validation_csv_path = results_dir.joinpath('validation-results.csv')
    with open(validation_csv_path, 'w') as f:
        for i, record in enumerate(records):
            try:
                jsonschema.validate(instance = records[1], schema = schema)
                pass
            except ValidationError as e:
                print('exception')
                traceback.print_exc()
                print('An exception of type {0} occurred.'.format(type(exception).__name__, exception.args));
            # Used code that I was taught to check for errors. Not sure I'm doing this correctly as my validation file is 
            # empty. Sam was telling me I needed to do something with writer.writerow but I couldn't make it work correctly.

validate_jsonl_data(records)

### 3.1.b Avro

In [13]:
def create_avro_dataset(records):
    schema_path = schema_dir.joinpath('routes.avsc')
    data_path = results_dir.joinpath('routes.avro')
    parsed_schema = load_schema(schema_path)
    #Had a hard time with parse_schema so used load_schema instead https://fastavro.readthedocs.io/en/latest/schema.html
    with open(data_path, 'wb') as out:
        writer(out, parsed_schema, records)
    
        
create_avro_dataset(records)

In [23]:
# Mimicking what Adam Curry did from Teams to check if it loaded correctly
import fastavro
data_path = results_dir.joinpath('routes.avro')
with open(data_path, mode = 'rb') as fp:
    reader = fastavro.reader(fp)
    records = [r for r in reader]
    df = pd.DataFrame.from_records(records)
    print(df.head(1))

                                             airline  \
0  {'airline_id': 410, 'name': 'Aerocondor', 'ali...   

                                         src_airport  \
0  {'airport_id': 2965, 'name': 'Sochi Internatio...   

                                         dst_airport  codeshare  stops  \
0  {'airport_id': 2990, 'name': 'Kazan Internatio...      False      0   

  equipment  
0     [CR2]  


### 3.1.c Parquet

In [6]:
def create_parquet_dataset():
    src_data_path = 'data/processed/openflights/routes.jsonl.gz'
    parquet_output_path = results_dir.joinpath('routes.parquet')
    s3 = s3fs.S3FileSystem(
        anon=True,
        client_kwargs={
            'endpoint_url': endpoint_url
        }
    )
    
    with s3.open(src_data_path, 'rb') as f_gz:
        with gzip.open(f_gz, 'rb') as f:
            pass
            ## TODO: Use Apache Arrow to create Parquet table and save the dataset
            table = read_json(f)
    pq.write_table(table, parquet_output_path)
    
create_parquet_dataset()

# Jolene helped me with this part. I had the above, but no good way to check that it worked without opening the file itself.
# Which wasn't possible on here. I downloaded it, and opened in notepad but it looked like a bunch of characters.
parquet_output_path = results_dir.joinpath('routes.parquet')

pq_f = pq.ParquetFile(parquet_output_path)
pq_f.metadata

<pyarrow._parquet.FileMetaData object at 0x7f96019154a0>
  created_by: parquet-cpp version 1.5.1-SNAPSHOT
  num_columns: 38
  num_rows: 67663
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 7571

In [7]:
# Also provided by Jolene. This was nice that I could see everything here. I definitely need to work on being able to add
# this to my own code on 
pq_f.schema

<pyarrow._parquet.ParquetSchema object at 0x7f96019daa00>
required group field_id=0 schema {
  optional group field_id=1 airline {
    optional int64 field_id=2 airline_id;
    optional binary field_id=3 name (String);
    optional binary field_id=4 alias (String);
    optional binary field_id=5 iata (String);
    optional binary field_id=6 icao (String);
    optional binary field_id=7 callsign (String);
    optional binary field_id=8 country (String);
    optional boolean field_id=9 active;
  }
  optional group field_id=10 src_airport {
    optional int64 field_id=11 airport_id;
    optional binary field_id=12 name (String);
    optional binary field_id=13 city (String);
    optional binary field_id=14 country (String);
    optional binary field_id=15 iata (String);
    optional binary field_id=16 icao (String);
    optional double field_id=17 latitude;
    optional double field_id=18 longitude;
    optional int64 field_id=19 altitude;
    optional double field_id=20 timezone;
    opt

### 3.1.d Protocol Buffers

In [23]:
sys.path.insert(0, os.path.abspath('routes_pb2'))

import routes_pb2

def _airport_to_proto_obj(airport):
    obj = routes_pb2.Airport()
    if airport is None:
        return None
    if airport.get('airport_id') is None:
        return None

    obj.airport_id = airport.get('airport_id')
    if airport.get('name'):
        obj.name = airport.get('name')
    if airport.get('city'):
        obj.city = airport.get('city')
    if airport.get('iata'):
        obj.iata = airport.get('iata')
    if airport.get('icao'):
        obj.icao = airport.get('icao')
    if airport.get('altitude'):
        obj.altitude = airport.get('altitude')
    if airport.get('timezone'):
        obj.timezone = airport.get('timezone')
    if airport.get('dst'):
        obj.dst = airport.get('dst')
    if airport.get('tz_id'):
        obj.tz_id = airport.get('tz_id')
    if airport.get('type'):
        obj.type = airport.get('type')
    if airport.get('source'):
        obj.source = airport.get('source')

    obj.latitude = airport.get('latitude')
    obj.longitude = airport.get('longitude')

    return obj


def _airline_to_proto_obj(airline):
    obj = routes_pb2.Airline()
    if not airline.get('name'):
        return None
    if not airline.get('airline_id'):
        return None
    if not airline.get('active'):
        return None
    
    obj.airline_id = airline.get('airline_id')
    obj.name = airline.get('name')
    # Got the code from professor hints, but changed name to mimic the alias and then expanded for the other parts. Going back 
    # to the schema and record I printed above it breaks it down like this under airline.
    if airline.get('name'):
        obj.name = airline.get('name')
    if airline.get('alias'):
        obj.alias = airline.get('alias')
    if airline.get('iata'):
        obj.iata = airline.get('iata')
    if airline.get('icao'):
        obj.icao = airline.get('icao')
    if airline.get('callsign'):
        obj.callsign = airline.get('callsign')
    if airline.get('country'):
        obj.country = airline.get('country')
    if airline.get('active'):
        obj.active = airline.get('active')
        
    ## TODO: Create an Airline obj using Protocol Buffers API
    return obj


def create_protobuf_dataset(records):
    routes = routes_pb2.Routes()
    for record in records:
        route = routes_pb2.Route()
        airline = _airline_to_proto_obj(record.get('airline', {}))
        if airline:
            route.airline.CopyFrom(airline)
        src_airport = _airport_to_proto_obj(record.get('src_airport', {}))
        if src_airport:
            route.src_airport.CopyFrom(src_airport)
        dst_airport = _airport_to_proto_obj(record.get('dst_airport', {}))
        if dst_airport:
            route.dst_airport.CopyFrom(dst_airport)
        route.codeshare = record.get('codeshare') # Sam provided this tidbit in the chat
        routes.route.append(route) 
    data_path = results_dir.joinpath('routes.pb')
    with open(data_path, 'wb') as f:
        f.write(routes.SerializeToString())
    compressed_path = results_dir.joinpath('routes.pb.snappy')
    with open(compressed_path, 'wb') as f:
        f.write(snappy.compress(routes.SerializeToString()))   

create_protobuf_dataset(records)

## 3.1.e

In [31]:
# Compare the output sizes of the different formats

# Defining path to files
j_schema_path = schema_dir.joinpath('routes-schema.json')
a_schema_path = schema_dir.joinpath('routes.avsc')
par_path = results_dir.joinpath('routes.parquet')
p_schema_path = schema_dir.joinpath('routes.proto')

# Getting size for the paths defined above
j_schema_size = os.path.getsize(j_schema_path)
a_schema_size = os.path.getsize(a_schema_path)
par_size = os.path.getsize(par_path)
p_schema_size = os.path.getsize(p_schema_path)

# Create a dataframe out of the sizes
df = pd.DataFrame({'JSON Schema': [j_schema_size], 'Avro': [a_schema_size], 'Parquet': [par_size], 'Protocol Buffer': [p_schema_size]})
df

Unnamed: 0,JSON Schema,Avro,Parquet,Protocol Buffer
0,3385,3191,1975469,1073


In [34]:
# Append comparison file with dataframe
compare = results_dir.joinpath('comparison.csv')
with open (compare, 'w') as f:
    df.to_csv(f, header = True)

## 3.2

### 3.2.a Simple Geohash Index

In [6]:
def create_hash_dirs(records):
    geoindex_dir = results_dir.joinpath('geoindex')
    geoindex_dir.mkdir(exist_ok=True, parents=True)
    hashes = []
    for record in records:
        src_airport = record.get('src_airport', {})
        if src_airport:
            latitude = src_airport.get('latitude')
            longitude = src_airport.get('longitude')
            if latitude and longitude:
                h = pygeohash.encode(latitude,longitude)
                record['geohash'] = h
                hashes.append(h)
                #todo: use pygeohash.encode() to assign geohashes to the rcords and complete the hashes list
    # Provided by professor on 3/31
    hashes.sort()
    three_letter = sorted(list(set([entry[:3] for entry in hashes])))
    hash_index = {value: [] for value in three_letter}
    
    for record in records:
        geohash = record.get('geohash')
        if geohash:
            hash_index[geohash[:3]].append(record)
    
    for key, values in hash_index.items():
        output_dir = geoindex_dir.joinpath(str(key[:1])).joinpath(str(key[:2]))
        output_dir.mkdir(exist_ok=True, parents=True)
        output_path = output_dir.joinpath('{}.jsonl.gz'.format(key))
        with gzip.open(output_path, 'w') as f:
            json_output = '\n'.join([json.dumps(value) for value in values])
            f.write(json_output.encode('utf-8'))
    
            
create_hash_dirs(records)

### 3.2.b Simple Search Feature

In [7]:
def airport_search(latitude, longitude):
    h = pygeohash.encode(latitude,longitude)
    dist = 0
    name = ''
    for i, record in enumerate(records):
        src_airport = record.get('src_airport', {})
        if src_airport:
            latitude2 = src_airport.get('latitude')
            longitude2 = src_airport.get('longitude')
            air_name = src_airport.get('name')
            if latitude2 and longitude2:
                h2 = pygeohash.encode(latitude2, longitude2)
                dist_n = pygeohash.geohash_approximate_distance(h, h2)
                if i == 0:
                    dist = dist_n
                else:
                    if dist>dist_n:
                        dist = dist_n
                        name = air_name
    ## TODO: Create simple search to return nearest airport
    print(name)
    
airport_search(41.1499988, -95.91779)

Eppley Airfield
