# Assignment 3

Import libraries and define common helper functions

In [1]:
import os
import sys
import gzip
import json
from pathlib import Path
import csv
import pandas as pd
import s3fs
import pyarrow as pa
from pyarrow.json import read_json
import pyarrow.parquet as pq
import fastavro
import pygeohash
import snappy
import jsonschema
from jsonschema.exceptions import ValidationError
import genson #This is for the creation of the schema


# NOTE(review): endpoint_url is no longer used; the S3 reads were replaced by local files.
endpoint_url = 'https://storage.budsc.midwest-datascience.com'

# All input/output locations are resolved relative to the notebook's working directory.
current_dir = Path.cwd().absolute()
schema_dir = current_dir / 'schemas'
results_dir = current_dir / 'results'
results_dir.mkdir(parents=True, exist_ok=True)

#editing the function to use the github resource as the original function wasn't working
def read_jsonl_data():
    src_data_path = '../../../data/processed/openflights/routes.jsonl.gz'
    with gzip.open(src_data_path, 'rb') as f:
        records = [json.loads(line) for line in f.readlines()]
    return records

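Load the records. The dataset is published at https://storage.budsc.midwest-datascience.com/data/processed/openflights/routes.jsonl.gz, but the remote endpoint wasn't working. `read_jsonl_data` therefore reads a local copy at `../../../data/processed/openflights/routes.jsonl.gz` instead.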

In [2]:
# Load all route records once; the validation, protobuf and geohash cells below reuse this list.
records = read_jsonl_data()

In [3]:
# Inspect a single record to see its nested structure
# (airline, src_airport, dst_airport, codeshare, equipment).
records[10]

{'airline': {'airline_id': 410,
  'name': 'Aerocondor',
  'alias': 'ANA All Nippon Airways',
  'iata': '2B',
  'icao': 'ARD',
  'callsign': 'AEROCONDOR',
  'country': 'Portugal',
  'active': True},
 'src_airport': {'airport_id': 6156,
  'name': 'Belgorod International Airport',
  'city': 'Belgorod',
  'country': 'Russia',
  'iata': 'EGO',
  'icao': 'UUOB',
  'latitude': 50.643798828125,
  'longitude': 36.5900993347168,
  'altitude': 735,
  'timezone': 3.0,
  'dst': 'N',
  'tz_id': 'Europe/Moscow',
  'type': 'airport',
  'source': 'OurAirports'},
 'dst_airport': {'airport_id': 2990,
  'name': 'Kazan International Airport',
  'city': 'Kazan',
  'country': 'Russia',
  'iata': 'KZN',
  'icao': 'UWKD',
  'latitude': 55.606201171875,
  'longitude': 49.278701782227,
  'altitude': 411,
  'timezone': 3.0,
  'dst': 'N',
  'tz_id': 'Europe/Moscow',
  'type': 'airport',
  'source': 'OurAirports'},
 'codeshare': False,
 'equipment': ['CR2']}

## 3.1

### 3.1.a JSON Schema

In [4]:
# Install genson (used below to infer the JSON schema) into this kernel's environment.
# %pip (rather than a bare `pip` relying on automagic) always targets the running kernel.
# NOTE(review): genson is already imported in the first cell, so on a fresh
# environment this install has to run before that cell.
%pip install genson

Note: you may need to restart the kernel to use updated packages.


In [5]:
import genson #This is for the creation of the schema

In [6]:
def _infer_json_schema(records):
    """Infer a JSON Schema dict that covers every record, using genson."""
    builder = genson.Schema()
    for record in records:
        builder.add_object(record)
    return builder.to_dict()


def _write_json_schema(schema_path, schema):
    """Write ``schema`` as indented JSON to ``schema_path``, creating its directory if needed."""
    schema_path.parent.mkdir(parents=True, exist_ok=True)
    with open(schema_path, 'w') as file:
        json.dump(schema, file, indent=2)


def validate_jsonl_data(records):
    """Validate each record against ``schemas/routes-schema.json``, then refresh that schema.

    Every failing record produces one line in ``validation.txt`` (written in
    the current working directory). After validation, the schema file is
    overwritten with a schema inferred from ``records`` by genson.

    Args:
        records: the route records (as returned by ``read_jsonl_data``).
    """
    from jsonschema.exceptions import best_match

    schema_path = schema_dir.joinpath('routes-schema.json')
    inferred_schema = _infer_json_schema(records)

    # Bootstrap: without an existing schema file, open() below would raise
    # FileNotFoundError, so seed it from the records first.
    if not schema_path.exists():
        _write_json_schema(schema_path, inferred_schema)

    with open(schema_path) as file:
        schema = json.load(file)

    # Build the validator once. jsonschema.validate() re-checks the schema and
    # constructs a new validator on every call, which is wasteful per record.
    validator_cls = jsonschema.validators.validator_for(schema)
    validator_cls.check_schema(schema)
    validator = validator_cls(schema)

    with open("validation.txt", 'w') as file:
        for i, record in enumerate(records):
            # best_match selects the same error that jsonschema.validate() raises.
            error = best_match(validator.iter_errors(record))
            if error is not None:
                file.write(f"Record {i} has failed. Please review.: {str(error)}\n")

    # Persist the schema inferred from the records.
    _write_json_schema(schema_path, inferred_schema)

validate_jsonl_data(records)

### 3.1.b Avro

In [9]:
def create_avro_dataset(records):
    """Write ``records`` to ``results/routes.avro`` using the schema in ``schemas/routes.avsc``.

    The ``.avsc`` file is loaded as plain JSON and passed to fastavro as-is.

    NOTE(review): this notebook defines the function but never calls it, so
    Run All does not produce routes.avro.
    """
    avsc_file = schema_dir.joinpath('routes.avsc')
    avro_output = results_dir.joinpath('routes.avro')

    with open(avsc_file) as schema_stream:
        avro_schema = json.load(schema_stream)

    with open(avro_output, 'wb') as out_stream:
        fastavro.writer(out_stream, avro_schema, records)

### 3.1.c Parquet

In [10]:
def create_parquet_dataset(src_data_path='../../../data/processed/openflights/routes.jsonl.gz'):
    """Convert the routes JSON Lines dataset into ``results/routes.parquet``.

    Every JSON record becomes one row of a single struct column named
    ``Flights``. This is the same table layout the earlier implementation wrote.

    Args:
        src_data_path: gzip-compressed JSON Lines file to convert. It defaults
            to the local copy used in place of the unreachable S3 source.
    """
    parquet_output_path = results_dir.joinpath('routes.parquet')

    # Parse every non-blank line. The previous loop read line 1, then called
    # f.readlines() inside the loop, so the first record was silently dropped.
    with gzip.open(src_data_path, 'rb') as f:
        flights = [json.loads(line) for line in f if line.strip()]

    # pa.array infers a single (nested) struct type across all records.
    table = pa.table({'Flights': pa.array(flights)})

    pq.write_table(table, str(parquet_output_path))

create_parquet_dataset()

### 3.1.d Protocol Buffers

In [11]:
# Make the generated protobuf module (routes_pb2) importable.
# NOTE(review): this prepends the *directory* ./routes_pb2 to sys.path. If
# routes_pb2.py sits directly in the notebook directory, the import works
# because of the notebook's cwd, not this insert. Confirm where routes_pb2.py lives.
sys.path.insert(0, os.path.abspath('routes_pb2'))

import routes_pb2

def _airport_to_proto_obj(airport):
    """Convert an airport dict into a ``routes_pb2.Airport`` message.

    Returns None when ``airport`` is None or has no ``airport_id``.

    Optional fields are copied only when their value is truthy, so empty
    strings and zeros keep the protobuf default. Latitude and longitude are
    always assigned.
    """
    obj = routes_pb2.Airport()
    if airport is None or airport.get('airport_id') is None:
        return None

    obj.airport_id = airport.get('airport_id')

    optional_fields = (
        'name', 'city', 'iata', 'icao', 'altitude',
        'timezone', 'dst', 'tz_id', 'type', 'source',
    )
    for field_name in optional_fields:
        field_value = airport.get(field_name)
        if field_value:
            setattr(obj, field_name, field_value)

    # NOTE(review): assigned unconditionally; presumably protobuf raises
    # TypeError if a coordinate is missing (None). Confirm the data always has them.
    obj.latitude = airport.get('latitude')
    obj.longitude = airport.get('longitude')

    return obj


def _airline_to_proto_obj(airline):
    """Convert an airline dict into a ``routes_pb2.Airline`` message.

    Returns None when ``airline`` is None/empty or lacks a truthy ``name`` or
    ``airline_id``. This matches ``_airport_to_proto_obj``, which returns None
    for missing input.

    Optional string fields are set only when present and non-empty.
    ``active`` falls back to False when missing or falsy.
    """
    # Guard against a null airline: record.get('airline', {}) yields None when
    # the key exists with a null value, and None.get() would raise AttributeError.
    if not airline:
        return None
    if not airline.get('name'):
        return None
    if not airline.get('airline_id'):
        return None

    obj = routes_pb2.Airline()
    obj.airline_id = airline.get('airline_id')
    obj.name = airline.get('name')

    for field_name in ('alias', 'iata', 'icao', 'callsign', 'country'):
        field_value = airline.get(field_name)
        if field_value:
            setattr(obj, field_name, field_value)

    active = airline.get('active')
    obj.active = active if active else False

    return obj


def create_protobuf_dataset(records):
    """Serialize ``records`` into a ``routes_pb2.Routes`` message and write it to disk.

    Writes the serialized bytes to ``results/routes.pb`` and a
    Snappy-compressed copy to ``results/routes.pb.snappy``.

    Args:
        records: the route records (as returned by ``read_jsonl_data``).
    """
    routes = routes_pb2.Routes()
    for record in records:
        route = routes_pb2.Route()

        airline = _airline_to_proto_obj(record.get('airline', {}))
        if airline is not None:
            route.airline.CopyFrom(airline)

        # The source airport used to be converted but never attached to the route.
        src_airport = _airport_to_proto_obj(record.get('src_airport', {}))
        if src_airport is not None:
            route.src_airport.CopyFrom(src_airport)

        # _airport_to_proto_obj returns None for airports without an id, and
        # CopyFrom(None) would raise, so check the converted message.
        dst_airport = _airport_to_proto_obj(record.get('dst_airport', {}))
        if dst_airport is not None:
            route.dst_airport.CopyFrom(dst_airport)

        codeshare = record.get('codeshare')
        route.codeshare = codeshare if codeshare is not None else False

        if record.get('stops'):
            route.stops = record['stops']

        # Copy the actual equipment codes; previously the literal string
        # "equipment" was appended instead.
        equipment = record.get('equipment')
        if equipment:
            route.equipment.extend(equipment)

        routes.route.append(route)

    # Serialize once and reuse the bytes for both output files.
    serialized = routes.SerializeToString()

    data_path = results_dir.joinpath('routes.pb')
    with open(data_path, 'wb') as f:
        f.write(serialized)

    compressed_path = results_dir.joinpath('routes.pb.snappy')
    with open(compressed_path, 'wb') as f:
        f.write(snappy.compress(serialized))

create_protobuf_dataset(records)

## 3.2

### 3.2.a Simple Geohash Index

In [13]:
def create_hash_dirs(records):
    """Write records into a directory tree keyed by the source airport's geohash.

    For every record whose ``src_airport`` has a latitude and longitude, a
    ``'geohash'`` key is added to the record dict in place. Records are then
    grouped by the first three geohash characters and written, one JSON
    object per line, to ``results/geoindex/<c1>/<c1c2>/<c1c2c3>.jsonl.gz``.

    Records without source-airport coordinates are not written.

    Args:
        records: the route records (as returned by ``read_jsonl_data``).
    """
    geoindex_dir = results_dir.joinpath('geoindex')
    geoindex_dir.mkdir(exist_ok=True, parents=True)

    # Single pass: compute the geohash and bucket the record by its 3-char prefix.
    hash_index = {}
    for record in records:
        src_airport = record.get('src_airport') or {}
        lat = src_airport.get('latitude')
        lon = src_airport.get('longitude')
        # Compare with None rather than relying on truthiness: 0.0 is a valid
        # coordinate (equator / prime meridian) and used to be skipped.
        if lat is None or lon is None:
            continue
        geohash = pygeohash.encode(lat, lon)
        record['geohash'] = geohash
        hash_index.setdefault(geohash[:3], []).append(record)

    # Write the buckets in sorted prefix order.
    for prefix in sorted(hash_index):
        output_dir = geoindex_dir.joinpath(prefix[:1], prefix[:2])
        output_dir.mkdir(exist_ok=True, parents=True)
        output_path = output_dir.joinpath('{}.jsonl.gz'.format(prefix))
        with gzip.open(output_path, 'w') as file:
            json_output = '\n'.join(json.dumps(value) for value in hash_index[prefix])
            file.write(json_output.encode('utf-8'))

create_hash_dirs(records)

### 3.2.b Simple Search Feature

In [16]:
def airport_search(latitude, longitude, records=None):
    """Print the source airport closest to (latitude, longitude).

    Closeness uses ``pygeohash.geohash_approximate_distance`` between the
    geohash of the query point and that of each route's source airport. That
    distance is approximate: it is bucketed by the length of the shared
    geohash prefix. Ties keep the first matching record.

    Args:
        latitude: query latitude in decimal degrees.
        longitude: query longitude in decimal degrees.
        records: optional pre-loaded route records. When omitted, they are
            re-read with ``read_jsonl_data()`` on every call.
    """
    query_hash = pygeohash.encode(latitude, longitude)

    if records is None:
        records = read_jsonl_data()

    # Start at infinity. The old 10,000,000 m start was smaller than the
    # "no shared prefix" distance, which could leave nothing selected and
    # raise KeyError when printing.
    best_dist = float('inf')
    best_record = None

    for record in records:
        src_airport = record.get('src_airport')
        if not src_airport:
            continue
        lat = src_airport.get('latitude')
        lon = src_airport.get('longitude')
        if lat is None or lon is None:
            continue
        candidate_hash = pygeohash.encode(lat, lon)
        dist = pygeohash.geohash_approximate_distance(query_hash, candidate_hash, check_validity=False)
        if dist < best_dist:
            best_dist = dist
            best_record = record

    if best_record is None:
        print("No airport with coordinates found.")
        return

    print(f"Closest airport: {best_record['src_airport']['name']},  Distance: {round(best_dist/1000,2)} Kilometers")

# Test the airport search function.
airport_search(41.1499988, -95.91779)

Closest airport: Eppley Airfield,  Distance: 19.55 Kilometers
