### Assignment 3.1

In [11]:
import sys
from pathlib import Path

# Make the generated protobuf module (routes_pb2) importable. The schema
# directory is resolved relative to the notebook's working directory -- the
# same root the configuration cell uses for `schema_dir` -- rather than a
# machine-specific absolute path.
_proto_schema_dir = str(Path.cwd().joinpath('dsc650', 'dsc650', 'assignments', 'assignment03', 'schemas'))
# Guard so re-running the cell does not keep prepending duplicate entries.
if _proto_schema_dir not in sys.path:
    sys.path.insert(0, _proto_schema_dir)

import routes_pb2

In [39]:
import os
import gzip
import json
from pathlib import Path
import csv

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import fastavro
import snappy
import jsonschema
from jsonschema.exceptions import ValidationError

import routes_pb2

# All assignment paths hang off the notebook's working directory.
current_dir = Path.cwd().absolute()
schema_dir = current_dir / 'dsc650' / 'dsc650' / 'assignments' / 'assignment03' / 'schemas'
# 'results' is a sibling of 'schemas' under assignment03.
results_dir = schema_dir.parent / 'results'
results_dir.mkdir(exist_ok=True, parents=True)


def read_jsonl_data(src_data_path='dsc650/data/processed/openflights/routes.jsonl.gz'):
    """Load every record from a gzip-compressed JSON Lines file.

    Parameters
    ----------
    src_data_path : str or os.PathLike, optional
        Path to the ``.jsonl.gz`` file. Defaults to the OpenFlights routes
        file, resolved relative to the current working directory.

    Returns
    -------
    list
        One parsed JSON value per non-blank line, in file order.
    """
    with gzip.open(src_data_path, 'rb') as f:
        # Blank lines (e.g. a trailing newline) are skipped; json.loads would
        # raise on them.
        return [json.loads(line) for line in f if line.strip()]


def validate_jsonl_data(records):
    """Validate each record against ``routes-schema.json`` and write a CSV report.

    The report goes to ``results_dir / 'validation.csv'`` with one row per
    record: ``record_index``, ``is_valid`` and ``validation_errors`` (the error
    message, empty when the record is valid).

    Parameters
    ----------
    records : iterable
        Parsed JSON records, e.g. the output of ``read_jsonl_data()``.
    """
    schema_path = schema_dir.joinpath('routes-schema.json')
    with open(schema_path, encoding='utf-8') as f:
        schema = json.load(f)

    # Check the schema and build the validator once; jsonschema.validate()
    # repeats both steps for every record it is called on.
    validator_cls = jsonschema.validators.validator_for(schema)
    validator_cls.check_schema(schema)
    validator = validator_cls(schema)

    validation_csv_path = results_dir.joinpath('validation.csv')

    # newline='' is required for csv.writer; without it every row is followed
    # by an extra blank line on Windows.
    with open(validation_csv_path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['record_index', 'is_valid', 'validation_errors'])

        for i, record in enumerate(records):
            # best_match selects the same error jsonschema.validate() raises.
            error = jsonschema.exceptions.best_match(validator.iter_errors(record))
            if error is None:
                writer.writerow([i, True, ''])
            else:
                writer.writerow([i, False, str(error)])


def create_avro_dataset(records):
    """Serialize ``records`` to ``results_dir / 'routes.avro'``.

    The Avro schema is read from ``schema_dir / 'routes.avsc'`` and handed to
    ``fastavro.writer`` together with the records.
    """
    avro_schema = json.loads(schema_dir.joinpath('routes.avsc').read_text())
    output_path = results_dir.joinpath('routes.avro')

    with output_path.open('wb') as out:
        fastavro.writer(out, avro_schema, records)


def create_parquet_dataset(records):
    """Write ``records`` to ``results_dir / 'routes.parquet'``.

    The records are loaded into a pandas DataFrame and the Arrow schema is
    inferred from it; no schema file is read.
    """
    parquet_output_path = results_dir.joinpath('routes.parquet')

    df = pd.DataFrame(records)

    # Table.from_pandas infers the Arrow schema itself, so a separate
    # Schema.from_pandas pass over the data is unnecessary.
    table = pa.Table.from_pandas(df)

    pq.write_table(table, parquet_output_path)


def _airline_to_proto_obj(airline):
    """Convert an airline dict into a ``routes_pb2.Airline`` message.

    Returns None when ``airline`` is None or has no ``airline_id``.
    The string fields (name, alias, iata, icao, callsign, country) are copied
    only when truthy, so missing or empty values keep the message default.
    ``active`` is copied when present and not None, otherwise set to False.
    """
    if airline is None or airline.get('airline_id') is None:
        return None

    obj = routes_pb2.Airline()
    obj.airline_id = airline['airline_id']

    for field in ('name', 'alias', 'iata', 'icao', 'callsign', 'country'):
        value = airline.get(field)
        if value:
            setattr(obj, field, value)

    # An explicit JSON null would otherwise be assigned to a protobuf bool,
    # which protobuf rejects; fall back to False as for a missing key.
    active = airline.get('active')
    obj.active = False if active is None else active

    return obj



def _airport_to_proto_obj(airport):
    """Convert an airport dict into a ``routes_pb2.Airport`` message.

    Any field that is missing *or explicitly None* gets a placeholder:
    -1 for ``airport_id``, 'NONE' for strings, 0 for ``altitude``/``timezone``
    and 0.0 for ``latitude``/``longitude``. Returns None when ``airport``
    itself is None.
    """
    if airport is None:
        return None

    def pick(key, default):
        # dict.get(key, default) only covers missing keys; JSON nulls come
        # back as None, which protobuf refuses for scalar fields.
        value = airport.get(key)
        return default if value is None else value

    obj = routes_pb2.Airport()
    obj.airport_id = pick('airport_id', -1)
    obj.name = pick('name', 'NONE')
    obj.city = pick('city', 'NONE')
    obj.iata = pick('iata', 'NONE')
    obj.icao = pick('icao', 'NONE')
    obj.altitude = pick('altitude', 0)
    obj.timezone = pick('timezone', 0)
    obj.dst = pick('dst', 'NONE')
    obj.tz_id = pick('tz_id', 'NONE')
    obj.type = pick('type', 'NONE')
    obj.source = pick('source', 'NONE')
    obj.latitude = pick('latitude', 0.0)
    obj.longitude = pick('longitude', 0.0)

    return obj



def create_protobuf_dataset(records):
    """Serialize ``records`` into a ``routes_pb2.Routes`` message and write it out.

    Two files are written to ``results_dir``: ``routes.pb`` (raw protobuf
    bytes) and ``routes.pb.snappy`` (the same bytes, snappy-compressed).

    A missing/None ``airline``, ``src_airport`` or ``dst_airport`` leaves that
    sub-message unset. A missing/None ``codeshare``, ``stops`` or
    ``equipment`` falls back to False, 0 or an empty list.
    """
    routes = routes_pb2.Routes()
    for record in records:
        # add() creates the element in place, avoiding the copy append() makes.
        route = routes.route.add()

        # The helpers return None for unusable input; MergeFrom(None) would
        # raise TypeError, so only real messages are merged.
        airline = _airline_to_proto_obj(record.get('airline'))
        if airline is not None:
            route.airline.MergeFrom(airline)

        src_airport = _airport_to_proto_obj(record.get('src_airport'))
        if src_airport is not None:
            route.src_airport.MergeFrom(src_airport)

        dst_airport = _airport_to_proto_obj(record.get('dst_airport'))
        if dst_airport is not None:
            route.dst_airport.MergeFrom(dst_airport)

        # Protobuf scalars reject None, so JSON nulls get the same defaults
        # as missing keys.
        codeshare = record.get('codeshare')
        route.codeshare = False if codeshare is None else codeshare
        stops = record.get('stops')
        route.stops = 0 if stops is None else stops
        route.equipment.extend(record.get('equipment') or [])

    # Serialize once and reuse the bytes for both output files.
    serialized = routes.SerializeToString()

    with open(results_dir.joinpath('routes.pb'), 'wb') as f:
        f.write(serialized)

    with open(results_dir.joinpath('routes.pb.snappy'), 'wb') as f:
        f.write(snappy.compress(serialized))


def create_hash_dirs(records):
    """Placeholder (Assignment 3.1): only ensures ``results_dir/geoindex`` exists.

    ``records`` is accepted but not used yet; the function returns None.
    The Assignment 3.2 cell redefines this function with a real index.
    """
    # TODO: Create hash index
    results_dir.joinpath('geoindex').mkdir(parents=True, exist_ok=True)


def airport_search(latitude, longitude):
    """Placeholder for a nearest-airport lookup; always returns None.

    TODO: Create simple search to return nearest airport.
    """
    return None


# Load the source routes once; every step below reads the same list.
records = read_jsonl_data()

# In order: validate against the JSON Schema, write the Avro, Parquet and
# Protocol Buffers datasets, then set up the geoindex directory.
for build_step in (
    validate_jsonl_data,
    create_avro_dataset,
    create_parquet_dataset,
    create_protobuf_dataset,
    create_hash_dirs,
):
    build_step(records)

# Nearest-airport lookup (still a TODO placeholder in this assignment).
airport_search(41.1499988, -95.91779)


### Assignment 3.2

In [31]:
pip install pygeohash

Note: you may need to restart the kernel to use updated packages.


In [40]:
import os
import sys
import gzip
import json
from pathlib import Path
import pygeohash

# Function to create the hash directories
def create_hash_dirs(records):
    """Geohash each route's source airport and group routes by 3-character prefix.

    For every record whose ``src_airport`` has both ``latitude`` and
    ``longitude``, a precision-6 geohash is computed and stored on the record
    under ``'geohash'`` (the input dicts are mutated). Also ensures
    ``results_dir/geoindex`` exists.

    Returns
    -------
    dict
        Maps each 3-character geohash prefix (keys in sorted order) to the
        list of records in that cell, in input order.
    """
    geoindex_dir = results_dir.joinpath('geoindex')
    geoindex_dir.mkdir(exist_ok=True, parents=True)

    buckets = {}
    for record in records:
        src_airport = record.get('src_airport') or {}
        latitude = src_airport.get('latitude')
        longitude = src_airport.get('longitude')

        # Compare against None: 0.0 is a valid coordinate (equator / prime
        # meridian) but is falsy, so a truthiness test would drop it.
        if latitude is None or longitude is None:
            continue

        geohash = pygeohash.encode(latitude, longitude, precision=6)
        record['geohash'] = geohash
        buckets.setdefault(geohash[:3], []).append(record)

    return dict(sorted(buckets.items()))

# Function to get neighboring geohashes
def get_neighbors(geohash, precision=6):
    """Return geohashes for the 8 cells adjacent to ``geohash``.

    Each neighbour is found by stepping exactly one cell of ``geohash``'s size
    from its centre, then encoding that point at ``precision`` characters
    (default 6, the precision used elsewhere in this notebook). Results are
    ordered n, ne, e, se, s, sw, w, nw. Neighbours beyond a pole are omitted,
    and longitude wraps across the antimeridian.
    """
    # decode_exactly gives the cell centre plus half its height and width.
    # Stepping by the full height/width lands in the middle of the adjacent
    # cell. (decode() may round the centre to few decimals.)
    lat, lon, lat_err, lon_err = pygeohash.decode_exactly(geohash)
    cell_height = 2 * lat_err
    cell_width = 2 * lon_err

    # (cells north, cells east) for each compass direction.
    directions = {
        'n': (1, 0),
        'ne': (1, 1),
        'e': (0, 1),
        'se': (-1, 1),
        's': (-1, 0),
        'sw': (-1, -1),
        'w': (0, -1),
        'nw': (1, -1),
    }

    neighbors = []
    for d_lat, d_lon in directions.values():
        n_lat = lat + d_lat * cell_height
        if not -90.0 <= n_lat <= 90.0:
            continue
        n_lon = (lon + d_lon * cell_width + 180.0) % 360.0 - 180.0
        neighbors.append(pygeohash.encode(n_lat, n_lon, precision=precision))

    return neighbors

def airport_search(latitude, longitude, distance_km, hash_index, max_results=10):
    """Return routes whose source airport lies within ``distance_km`` of a point.

    Candidates come from the query point's 3-character geohash cell plus the
    neighbouring cells reported by ``get_neighbors``, looked up in
    ``hash_index``. The reachable radius is therefore bounded by roughly one
    such cell (about 1.4 degrees). Matches are ordered nearest-first by
    great-circle distance to their ``src_airport`` and truncated to
    ``max_results``.

    Parameters
    ----------
    latitude, longitude : float
        Query point in decimal degrees.
    distance_km : float
        Maximum great-circle distance, in kilometres.
    hash_index : dict
        Maps 3-character geohash prefixes to lists of route records.
    max_results : int, optional
        Maximum number of records to return (default 10).

    Returns
    -------
    list
        Route records, nearest source airport first.
    """
    import math

    def haversine_km(lat1, lon1, lat2, lon2):
        # Great-circle distance using a mean Earth radius of 6371 km.
        phi1, phi2 = math.radians(lat1), math.radians(lat2)
        d_phi = phi2 - phi1
        d_lambda = math.radians(lon2 - lon1)
        a = (math.sin(d_phi / 2) ** 2
             + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2)
        return 2 * 6371.0 * math.asin(math.sqrt(min(1.0, a)))

    center = pygeohash.encode(latitude, longitude, precision=3)

    # The query's own cell comes first; prefixes are de-duplicated so no
    # bucket (and therefore no record) is scanned twice.
    prefixes = [center]
    for neighbor in get_neighbors(center):
        prefix = neighbor[:3]
        if prefix not in prefixes:
            prefixes.append(prefix)

    matches = []
    for prefix in prefixes:
        for record in hash_index.get(prefix, []):
            src_airport = record.get('src_airport') or {}
            lat = src_airport.get('latitude')
            lon = src_airport.get('longitude')
            if lat is None or lon is None:
                continue
            dist = haversine_km(latitude, longitude, lat, lon)
            if dist <= distance_km:
                matches.append((dist, record))

    # Stable sort keeps index order among routes from the same airport.
    matches.sort(key=lambda item: item[0])
    return [record for _, record in matches[:max_results]]


# Perform the airport search
# Query point (Seattle) and search radius in kilometres.
latitude, longitude = 47.6062, -122.3321
distance_km = 100

# Build the prefix -> routes index from the records loaded in Assignment 3.1.
hash_index = create_hash_dirs(records)
nearby_airports = airport_search(latitude, longitude, distance_km, hash_index)

# Show each matching route record.
for airport in nearby_airports:
    print(airport)



{'airline': {'airline_id': 24, 'name': 'American Airlines', 'alias': '\\N', 'iata': 'AA', 'icao': 'AAL', 'callsign': 'AMERICAN', 'country': 'United States', 'active': True}, 'src_airport': {'airport_id': 3577, 'name': 'Seattle Tacoma International Airport', 'city': 'Seattle', 'country': 'United States', 'iata': 'SEA', 'icao': 'KSEA', 'latitude': 47.449001, 'longitude': -122.308998, 'altitude': 433, 'timezone': -8.0, 'dst': 'A', 'tz_id': 'America/Los_Angeles', 'type': 'airport', 'source': 'OurAirports'}, 'dst_airport': {'airport_id': 3448, 'name': 'General Edward Lawrence Logan International Airport', 'city': 'Boston', 'country': 'United States', 'iata': 'BOS', 'icao': 'KBOS', 'latitude': 42.36429977, 'longitude': -71.00520325, 'altitude': 20, 'timezone': -5.0, 'dst': 'A', 'tz_id': 'America/New_York', 'type': 'airport', 'source': 'OurAirports'}, 'codeshare': True, 'equipment': ['737'], 'geohash': 'c23jbv'}
{'airline': {'airline_id': 24, 'name': 'American Airlines', 'alias': '\\N', 'iata