# Assignment 3

Import libraries and define common helper functions

In [1]:
import os
import sys
import gzip
import json
from pathlib import Path
import csv

import pandas as pd
import s3fs
import pyarrow as pa
from pyarrow.json import read_json
import pyarrow.parquet as pq
import fastavro
import pygeohash
import snappy
import jsonschema
from jsonschema.exceptions import ValidationError


endpoint_url='https://storage.budsc.midwest-datascience.com'

current_dir = Path(os.getcwd()).absolute()
schema_dir = current_dir.joinpath('schemas')
results_dir = current_dir.joinpath('results')
results_dir.mkdir(parents=True, exist_ok=True)


def read_jsonl_data():
    src_data_path = '../../../data/processed/openflights/routes.jsonl.gz'
    with gzip.open(src_data_path, 'rb') as f:
        records = [json.loads(line) for line in f.readlines()]
    return records



'''def read_jsonl_data():
    s3 = s3fs.S3FileSystem(
        anon=True,
        client_kwargs={
            'endpoint_url': endpoint_url
        }
    )
    src_data_path = 'data/processed/openflights/routes.jsonl.gz'
    with s3.open(src_data_path, 'rb') as f_gz:
        with gzip.open(f_gz, 'rb') as f:
            records = [json.loads(line) for line in f.readlines()]
        

    return records'''

"def read_jsonl_data():\n    s3 = s3fs.S3FileSystem(\n        anon=True,\n        client_kwargs={\n            'endpoint_url': endpoint_url\n        }\n    )\n    src_data_path = 'data/processed/openflights/routes.jsonl.gz'\n    with s3.open(src_data_path, 'rb') as f_gz:\n        with gzip.open(f_gz, 'rb') as f:\n            records = [json.loads(line) for line in f.readlines()]\n        \n\n    return records"

Load the records from https://storage.budsc.midwest-datascience.com/data/processed/openflights/routes.jsonl.gz 

In [2]:
records = read_jsonl_data()


## 3.1

### 3.1.a JSON Schema

In [5]:
from genson import SchemaBuilder

schema_path = schema_dir.joinpath('routes-schema.json')
builder = SchemaBuilder()
builder.add_schema({"type": "object", "properties": {}})
builder.add_object(records)
builder.to_schema()

print(builder.to_json(indent=2))

def validate_jsonl_data(records):
    schema_path = schema_dir.joinpath('routes-schema.json')
    
    with open(schema_path) as f:
        schema = json.load(f)
        
    with open(validation_csv_path, 'w') as f:    
        for i, record in enumerate(records):
            try:
                json.loads(records) 
                pass
            except ValidationError as e:
                print("Given JSON string is invalid: ",e)
                pass
            

validate_jsonl_data(records)

{
  "$schema": "http://json-schema.org/schema#",
  "anyOf": [
    {
      "type": "object"
    },
    {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "airline": {
            "type": "object",
            "properties": {
              "airline_id": {
                "type": "integer"
              },
              "name": {
                "type": "string"
              },
              "alias": {
                "type": "string"
              },
              "iata": {
                "type": "string"
              },
              "icao": {
                "type": "string"
              },
              "callsign": {
                "type": "string"
              },
              "country": {
                "type": "string"
              },
              "active": {
                "type": "boolean"
              }
            },
            "required": [
              "active",
              "airline_id",
              "alias",


NameError: name 'validation_csv_path' is not defined

### 3.1.b Avro

In [7]:
def create_avro_dataset(records):
    schema_path = schema_dir.joinpath('routes.avsc')
    data_path = results_dir.joinpath('routes.avro')

from fastavro import writer, reader, parse_schema

for schema in schema_path:
    parsed_schema = parse_schema(schema_path)

# Writing
with open('avro_data', 'wb') as out:
    writer(out, parsed_schema, data_path)

create_avro_dataset(records)

TypeError: 'PosixPath' object is not iterable

### 3.1.c Parquet

In [13]:
def create_parquet_dataset():
    src_data_path = 'data/processed/openflights/routes.jsonl.gz'
    parquet_output_path = results_dir.joinpath('routes.parquet')
    s3 = s3fs.S3FileSystem(
        anon=True,
        client_kwargs={
            'endpoint_url': endpoint_url
        }
    )
    
    with s3.open(src_data_path, 'rb') as f_gz:
        with gzip.open(f_gz, 'rb') as f:
            pass
        
pq.write_table(f_gz, 'example.parquet')

import pyarrow.parquet as pq

create_parquet_dataset()

NameError: name 'f_gz' is not defined

### 3.1.d Protocol Buffers

In [14]:
sys.path.insert(0, os.path.abspath('routes_pb2'))

import routes_pb2

def _airport_to_proto_obj(airport):
    obj = routes_pb2.Airport()
    if airport is None:
        return None
    if airport.get('airport_id') is None:
        return None

    obj.airport_id = airport.get('airport_id')
    if airport.get('name'):
        obj.name = airport.get('name')
    if airport.get('city'):
        obj.city = airport.get('city')
    if airport.get('iata'):
        obj.iata = airport.get('iata')
    if airport.get('icao'):
        obj.icao = airport.get('icao')
    if airport.get('altitude'):
        obj.altitude = airport.get('altitude')
    if airport.get('timezone'):
        obj.timezone = airport.get('timezone')
    if airport.get('dst'):
        obj.dst = airport.get('dst')
    if airport.get('tz_id'):
        obj.tz_id = airport.get('tz_id')
    if airport.get('type'):
        obj.type = airport.get('type')
    if airport.get('source'):
        obj.source = airport.get('source')

    obj.latitude = airport.get('latitude')
    obj.longitude = airport.get('longitude')

    return obj


def _airline_to_proto_obj(airline):
    obj = routes_pb2.Airline()
    if not airline.get('name'):
        return None
    if not airline.get('airline_id'):
        return None
    
    obj.airline_id = airline.get('airline_id')
    obj.name = airline.get('name')
    
    if airline.get('alias'):
        obj.alias = airline.get('alias')
 
    if airline.get('iata'):
        obj.alias = airline.get('iata')

    if airline.get('icao'):
        obj.alias = airline.get('icao')
        
    if airline.get('callsign'):
        obj.alias = airline.get('callsign')
        
    if airline.get('country'):
        obj.alias = airline.get('country')        
        
    if airline.get('active'):
        obj.alias = airline.get('active')
        
    return obj


def create_protobuf_dataset(records):
    routes = routes_pb2.Routes()
    for record in records:
        route = routes_pb2.Route()
        airline = _airline_to_proto_obj(record.get('airline', {}))
        if airline:
            route.airline.CopyFrom(airline)
        dst_airport = _airport_to_proto_obj(record.get('dst_airport', {}))
        if dst_airport:
            route.dst_airport.CopyFrom(airdst_airportline)
        src_airport = _airport_to_proto_obj(record.get('src_airport', {}))
        if src_airport:
            route.src_airport.CopyFrom(src_airport)
        codeshare = _airport_to_proto_obj(record.get('codeshare', {}))
        if codeshare:
            route.codeshare.CopyFrom(codeshare)
        stops = _airport_to_proto_obj(record.get('stops', {}))
        if stops:
            route.stops.CopyFrom(stops)        
        equipment = _airport_to_proto_obj(record.get('equipment', {}))
        if equipment:
            route.equipment.CopyFrom(equipment)
    
        routes.route.append(route)

    data_path = results_dir.joinpath('routes.pb')

    with open(data_path, 'wb') as f:
        f.write(routes.SerializeToString())
        
    compressed_path = results_dir.joinpath('routes.pb.snappy')
    
    with open(compressed_path, 'wb') as f:
        f.write(snappy.compress(routes.SerializeToString()))
        
create_protobuf_dataset(records)

TypeError: True has type bool, but expected one of: bytes, unicode

## 3.2

### 3.2.a Simple Geohash Index

In [15]:
import pygeohash as pgh

def create_hash_dirs(records):
    geoindex_dir = results_dir.joinpath('geoindex')
    geoindex_dir.mkdir(exist_ok=True, parents=True)
    hashes = []
    for record in records:
        src_airport = record.get('src_airport', {})
        if src_airport:
            latitude = src_airport.get('latitude')
            longitude = src_airport.get('longitude')
            if latitude and longitude:
                
                pgh.encode(latitude, longitude)
    
    hashes.sort()
    three_letter = sorted(list(set([entry[:3] for entry in hashes])))
    hash_index = {value: [] for value in three_letter}
    for record in records:
        geohash = record.get('geohash')
        if geohash:
            hash_index[geohash[:3]].append(record)
    for key, values in hash_index.items():
        output_dir = geoindex_dir.joinpath(str(key[:1])).joinpath(str(key[:2]))
        output_dir.mkdir(exist_ok=True, parents=True)
        output_path = output_dir.joinpath('{}.jsonl.gz'.format(key))
        with gzip.open(output_path, 'w') as f:
            json_output = '\n'.join([json.dumps(value) for value in values])
            f.write(json_output.encode('utf-8'))

### 3.2.b Simple Search Feature

In [20]:
def airport_search(latitude, longitude):
    ## TODO: Create simple search to return nearest airport
    a=pgh.encode(latitude, longitude)
    for hash in a:
        pgh.geohash_approximate_distance(a, hash)
        src_airport = hash.get('src_airport', {})
    return src_airport
    pass
    
airport_search(41.1499988, -95.91779)

AttributeError: 'str' object has no attribute 'get'