# Assignment 3

Import libraries and define common helper functions

In [1]:
import os
import sys
import gzip
import json
from pathlib import Path
import csv

import pandas as pd
import fastavro
import pygeohash
import snappy
import jsonschema
from jsonschema.exceptions import ValidationError

In [2]:
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow.json import read_json

In [3]:
# Anchor every notebook path on the directory the kernel was started in.
current_dir = Path.cwd().absolute()
schema_dir = current_dir / 'schemas'
results_dir = current_dir / 'results'

# Only the results directory is created here; schema_dir is just a path.
results_dir.mkdir(parents=True, exist_ok=True)

In [4]:
def read_jsonl_data(src_data_path):
    """Load a gzip-compressed JSON Lines file into a list of parsed records.

    Args:
        src_data_path: Location (str or path-like) of the ``.jsonl.gz`` file.

    Returns:
        list: One decoded JSON value per non-blank line, in file order.

    Raises:
        OSError: If the file cannot be opened or is not valid gzip data.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    records = []
    # gzip.open accepts the path directly, so no separate open() is needed;
    # iterating the handle streams lines instead of buffering them all with
    # readlines() before parsing.
    with gzip.open(src_data_path, 'rb') as f:
        for line in f:
            # Blank lines (e.g. a trailing newline at end of file) are not
            # records, and json.loads on them would raise.
            if line.strip():
                records.append(json.loads(line))

    return records

In [5]:
# The default below is the original author's local checkout. Set the
# ROUTES_JSONL_PATH environment variable to point the notebook at the
# routes dataset on any other machine without editing this cell.
src_data_path = os.environ.get(
    'ROUTES_JSONL_PATH',
    'C:/Users/bibek/Documents/GitHub/dsc650/data/processed/openflights/routes.jsonl.gz',
)
records = read_jsonl_data(src_data_path)

## 3.1

### 3.1.a JSON Schema

In [6]:
import genson
from genson import SchemaBuilder

# Infer a JSON Schema from the first route record only. Later records whose
# shape differs from records[0] are therefore reported as invalid by any
# validation that uses this schema.
builder = SchemaBuilder()
builder.add_object(records[0])
schema = builder.to_schema()

# Unlike results_dir, schema_dir is never created elsewhere; make sure it
# exists so writing the schema does not fail on a fresh checkout.
schema_dir.mkdir(parents=True, exist_ok=True)
schema_path = schema_dir.joinpath('routes-schema.json')
with open(schema_path, "w") as f:
    json.dump(schema, f, indent=4)

In [7]:
from jsonschema import validate
def validate_jsonl_data(records):
    """Validate each record against ``routes-schema.json`` and log the result.

    Writes ``validation-results.csv`` into ``results_dir`` with a header row
    followed by one row per record: its zero-based position in ``records``
    and either ``valid`` or ``invalid``.

    Args:
        records: Iterable of decoded JSON records to check.

    Raises:
        jsonschema.exceptions.SchemaError: If the stored schema is itself
            not a valid JSON Schema.
    """
    schema_path = schema_dir.joinpath('routes-schema.json')
    with open(schema_path) as f:
        schema = json.load(f)

    # jsonschema.validate() re-checks the schema and builds a fresh validator
    # on every call. Do that work once and reuse the validator for all records.
    validator_cls = jsonschema.validators.validator_for(schema)
    validator_cls.check_schema(schema)
    validator = validator_cls(schema)

    validation_csv_path = results_dir.joinpath('validation-results.csv')
    fields = ["record Number", "Validity"]
    with open(validation_csv_path, 'w', newline='') as f:
        csvwriter = csv.writer(f)
        csvwriter.writerow(fields)
        # Stream one row per record instead of buffering every row in memory.
        for i, record in enumerate(records):
            verdict = "valid" if validator.is_valid(record) else "invalid"
            csvwriter.writerow([i, verdict])



In [8]:
# Validate every loaded route record; results go to validation-results.csv in results_dir.
validate_jsonl_data(records)

### 3.1.b Avro

In [9]:
from fastavro import writer, reader, parse_schema
from fastavro.schema import load_schema
def create_avro_dataset(records):
    """Write ``records`` to ``routes.avro`` in ``results_dir``.

    The Avro schema is loaded from ``routes.avsc`` in ``schema_dir``.

    Args:
        records: Iterable of route records matching that schema.
    """
    avsc_path = schema_dir.joinpath('routes.avsc')
    avro_path = results_dir.joinpath('routes.avro')
    avro_schema = load_schema(avsc_path)

    # fastavro's writer expects a binary file handle.
    with open(avro_path, 'wb') as avro_file:
        writer(avro_file, avro_schema, records)

In [10]:
# Serialise the loaded route records to routes.avro in results_dir.
create_avro_dataset(records)

### 3.1.c Parquet

In [11]:
def create_parquet_dataset(src_data_path):
    """Convert the gzipped JSON Lines source file to ``routes.parquet``.

    Args:
        src_data_path: Location of the ``.jsonl.gz`` file to convert.
    """
    destination = results_dir.joinpath('routes.parquet')
    with open(src_data_path, 'rb') as compressed, gzip.open(compressed, 'rb') as jsonl:
        # pyarrow parses the decompressed line-delimited JSON into a table,
        # which is then written out as a Parquet dataset.
        routes_table = read_json(jsonl)
        pq.write_table(routes_table, destination)

In [12]:
# Convert the source .jsonl.gz file to routes.parquet in results_dir.
create_parquet_dataset(src_data_path)

### 3.1.d Protocol Buffers

In [13]:
# Put the 'routes_pb2' path (resolved against the current working directory)
# at the front of the module search path before importing the module.
# NOTE(review): presumably the protoc-generated routes_pb2.py lives inside a
# directory of that name; confirm against the repository layout.
sys.path.insert(0, os.path.abspath('routes_pb2'))

import routes_pb2

In [14]:
def _airport_to_proto_obj(airport):
    """Convert an airport mapping from a route record to ``routes_pb2.Airport``.

    Args:
        airport: Mapping with the airport attributes, or ``None``.

    Returns:
        A populated ``routes_pb2.Airport``, or ``None`` when ``airport`` is
        ``None`` or carries no ``airport_id``.
    """
    if airport is None or airport.get('airport_id') is None:
        return None

    obj = routes_pb2.Airport()
    obj.airport_id = airport.get('airport_id')

    # These fields are copied only when present and non-empty.
    # NOTE(review): assumes they are text fields in the .proto; confirm.
    for field in ('name', 'city', 'iata', 'icao', 'dst', 'tz_id', 'type', 'source'):
        value = airport.get(field)
        if value:
            setattr(obj, field, value)

    # 0 is a legitimate altitude/timezone/coordinate, so these are tested
    # against None rather than truthiness. A missing coordinate is left
    # unset instead of raising TypeError on assignment.
    for field in ('altitude', 'timezone', 'latitude', 'longitude'):
        value = airport.get(field)
        if value is not None:
            setattr(obj, field, value)

    return obj


def _airline_to_proto_obj(airline):
    """Convert an airline mapping from a route record to ``routes_pb2.Airline``.

    Args:
        airline: Mapping with the airline attributes, or ``None``.

    Returns:
        A ``routes_pb2.Airline``. When ``airline`` is ``None`` or has no
        ``airline_id``, an empty message is returned (never ``None``).
    """
    obj = routes_pb2.Airline()

    if airline is None or airline.get('airline_id') is None:
        return obj

    obj.airline_id = airline.get('airline_id')

    # Optional attributes are copied only when present and non-empty.
    for field in ('name', 'alias', 'iata', 'icao', 'callsign', 'country'):
        value = airline.get(field)
        if value:
            setattr(obj, field, value)

    # Assigning None to a protobuf scalar raises TypeError, so a missing
    # 'active' flag is left at the message default.
    active = airline.get('active')
    if active is not None:
        obj.active = active

    return obj

In [15]:
def create_protobuf_dataset(records):
    """Serialise ``records`` into a ``routes_pb2.Routes`` message on disk.

    Each record becomes one ``routes_pb2.Route``; the combined message is
    written to ``routes.pb`` in ``results_dir``.

    Args:
        records: Iterable of route mappings with ``codeshare``, ``airline``,
            ``src_airport``, ``dst_airport`` and ``equipment`` entries.
    """
    routes = routes_pb2.Routes()
    for record in records:
        route = routes_pb2.Route()
        route.codeshare = record.get('codeshare') #required from .proto file

        # The helpers signal "nothing to copy" with None, so compare against
        # None explicitly rather than relying on message truthiness.
        airline = _airline_to_proto_obj(record.get('airline'))
        if airline is not None:
            route.airline.CopyFrom(airline)

        src_airport = _airport_to_proto_obj(record.get('src_airport'))
        if src_airport is not None:
            route.src_airport.CopyFrom(src_airport)

        dst_airport = _airport_to_proto_obj(record.get('dst_airport'))
        if dst_airport is not None:
            route.dst_airport.CopyFrom(dst_airport)

        # A missing or null equipment list is treated as empty; extend(None)
        # would raise TypeError.
        route.equipment.extend(record.get('equipment') or [])

        routes.route.append(route)

    data_path = results_dir.joinpath('routes.pb')
    with open(data_path, 'wb') as f:
        f.write(routes.SerializeToString())

    #compressed_path = results_dir.joinpath('routes.pb.snappy')
    #with open(compressed_path, 'wb') as f:
        #f.write(snappy.compress(routes.SerializeToString()))

In [16]:
# Serialise the loaded route records to routes.pb in results_dir.
create_protobuf_dataset(records)

### 3.1.e Output Sizes

In [18]:
comparison_csv_path = results_dir.joinpath('comparison.csv')

# Collect the regular files directly inside results_dir (sub-directories are
# skipped). The report itself is excluded by name: dropping "the first entry"
# removes the wrong file when comparison.csv does not exist yet or is not
# listed first, and fails outright on an empty directory.
files = sorted(
    entry.name
    for entry in results_dir.iterdir()
    if entry.is_file() and entry.name != comparison_csv_path.name
)

fields = ['file_name', 'size_MB']
with open(comparison_csv_path, 'w', newline='') as file:
    csvwriter = csv.writer(file)
    csvwriter.writerow(fields)
    for file_name in files:
        # Sizes are reported in units of 1024 * 1024 bytes.
        file_size = os.path.getsize(results_dir.joinpath(file_name))/(1024*1024)
        print("'{}' Size is : {} MB".format(file_name, file_size))
        csvwriter.writerow([file_name, "{} MB".format(file_size)])
    

'routes.avro' Size is : 18.73610210418701 MB
'routes.parquet' Size is : 1.8192329406738281 MB
'routes.pb' Size is : 21.479753494262695 MB
'validation-results.csv' Size is : 0.8299989700317383 MB


## 3.2

### 3.2.a Simple Geohash Index

In [19]:
def create_hash_dirs(records):
    """Bucket route records by source-airport geohash and write them to disk.

    Every record whose ``src_airport`` has both coordinates gets a
    ``geohash`` key added in place. Records are grouped by the first three
    geohash characters and each group is written as gzip-compressed JSON
    Lines to ``geoindex/<c1>/<c1c2>/<c1c2c3>.jsonl.gz`` under ``results_dir``.

    Args:
        records: List of route mappings. The mappings are mutated: a
            ``geohash`` entry is added to each one that can be located.
    """
    geoindex_dir = results_dir.joinpath('geoindex')
    geoindex_dir.mkdir(exist_ok=True, parents=True)

    # Maps a three-character geohash prefix to the records in that cell.
    hash_index = {}
    for record in records:
        src_airport = record.get('src_airport')
        if not src_airport:
            continue

        latitude = src_airport.get('latitude')
        longitude = src_airport.get('longitude')
        # Compare against None: a latitude or longitude of 0 is a real
        # coordinate and must not be discarded by a truthiness test.
        if latitude is None or longitude is None:
            continue

        geo_hash = pygeohash.encode(latitude, longitude)
        # Tag the record directly; looking it up again with
        # records.index(record) costs a linear scan per record.
        record['geohash'] = geo_hash
        hash_index.setdefault(geo_hash[:3], []).append(record)

    for key in sorted(hash_index):
        values = hash_index[key]
        # Nest by first character, then first two characters of the prefix.
        output_dir = geoindex_dir.joinpath(str(key[:1])).joinpath(str(key[:2]))
        output_dir.mkdir(exist_ok=True, parents=True)
        output_path = output_dir.joinpath('{}.jsonl.gz'.format(key))
        with gzip.open(output_path, 'w') as f:
            json_output = '\n'.join([json.dumps(value) for value in values])
            f.write(json_output.encode('utf-8'))
            
# Build the on-disk geohash index under results_dir/geoindex from the loaded records.
create_hash_dirs(records)

### 3.2.b Simple Search Feature

In [39]:
import pygeohash
def airport_search(latitude, longitude):
    """Print the indexed source airport(s) closest to a coordinate.

    The coordinate is geohashed and the matching three-character bucket file
    under ``results_dir/geoindex`` is read. Each distinct airport geohash in
    that bucket is scored with ``pygeohash.geohash_approximate_distance``
    (converted to km) and every airport sharing the smallest score is
    printed. The score is coarse: the outputs recorded in this notebook show
    identical values for different airports, so ties are expected.

    Args:
        latitude: Latitude of the search point.
        longitude: Longitude of the search point.

    Returns:
        None. The result, or ``Airports Not Found`` when no bucket file or no
        candidate exists, is printed.
    """
    geohash = pygeohash.encode(latitude, longitude)
    search_dir = results_dir.joinpath('geoindex').joinpath(geohash[0]).joinpath(geohash[0:2])
    filename = search_dir.joinpath('{}.jsonl.gz'.format(geohash[:3]))

    # Only the file read can legitimately hit a missing bucket.
    try:
        records = read_jsonl_data(filename)
    except FileNotFoundError:
        print("Airports Not Found")
        return

    seen_hashes = set()  # each airport location is scored once
    index_dist = {}      # record position -> approximate distance in km
    for i, record in enumerate(records):
        hash1 = record['geohash']
        if hash1[:3] != geohash[:3] or hash1 in seen_hashes:
            continue
        seen_hashes.add(hash1)
        index_dist[i] = pygeohash.geohash_approximate_distance(hash1, geohash)/1000

    if not index_dist:
        print("Airports Not Found")
        return

    # Compute the minimum once rather than on every loop iteration.
    nearest = min(index_dist.values())
    Airport = {}
    for k, v in index_dist.items():
        if v == nearest:
            name = records[k]['src_airport']['name']
            Airport[name] = '{} km'.format(v)
    print("Nearest Airport: {}".format(Airport))
                    

In [40]:
# Example lookup: nearest indexed airport(s) to latitude 41.1499988, longitude -95.91779.
airport_search(41.1499988, -95.91779)

Nearest Airport: {'Eppley Airfield': '19.545 km'}


In [23]:
# Example lookup: nearest indexed airport(s) to latitude 38.5565, longitude -95.91779.
airport_search(38.5565, -95.91779)

Nearest Airport: {'Manhattan Regional Airport': '123.264 km', 'Topeka Regional Airport - Forbes Field': '123.264 km'}


In [35]:
# Example lookup: nearest indexed airport(s) to latitude 32.6549, longitude -95.91779.
airport_search(32.6549, -95.91779)

Nearest Airport: {'Dallas Love Field': '123.264 km'}


In [38]:
# Example lookup: nearest indexed airport(s) to latitude 36.6549, longitude -97.91779.
airport_search(36.6549, -97.91779)

Nearest Airport: {'Wichita Eisenhower National Airport': '123.264 km'}
