# Taylor Imhof
# Bellevue University | DSC 650

# Assignment 3

Import libraries and define common helper functions

In [11]:
import os
import sys
import gzip
import json
from pathlib import Path
import csv

import pandas as pd
#import s3fs
import pyarrow as pa
import pyarrow.parquet as pq
import fastavro
import pygeohash
import snappy
import jsonschema
from jsonschema.exceptions import ValidationError


endpoint_url='https://storage.budsc.midwest-datascience.com'

current_dir = Path(os.getcwd()).absolute()
schema_dir = current_dir.joinpath('schemas')
results_dir = current_dir.joinpath('results')
results_dir.mkdir(parents=True, exist_ok=True)

# reads zipped json data from path to records variable
def get_jsonl_flight_records():
    zipped_json_path = 'C:\\Users\\taylo\\OneDrive\\Documents\\dsc650\\data\\processed\\openflights\\routes.jsonl.gz'

    with gzip.open(zipped_json_path, 'r') as fin:
        records = [json.loads(line) for line in fin.readlines()]

    return records


# def read_jsonl_data():
#     s3 = s3fs.S3FileSystem(
#         anon=True,
#         client_kwargs={
#             'endpoint_url': endpoint_url
#         }
#     )
#     src_data_path = 'C:\\Users\\taylo\\OneDrive\\Documents\\dsc650\\data\\processed\\openflights\\routes.jsonl.gz'
#     with s3.open(src_data_path, 'rb') as f_gz:
#         with gzip.open(f_gz, 'rb') as f:
#             records = [json.loads(line) for line in f.readlines()]
#
#
#     return records

In [12]:
# store flight records data using utility function
records = get_jsonl_flight_records()
print(records[0])
one_json = records[0]

{'airline': {'airline_id': 410, 'name': 'Aerocondor', 'alias': 'ANA All Nippon Airways', 'iata': '2B', 'icao': 'ARD', 'callsign': 'AEROCONDOR', 'country': 'Portugal', 'active': True}, 'src_airport': {'airport_id': 2965, 'name': 'Sochi International Airport', 'city': 'Sochi', 'country': 'Russia', 'iata': 'AER', 'icao': 'URSS', 'latitude': 43.449902, 'longitude': 39.9566, 'altitude': 89, 'timezone': 3.0, 'dst': 'N', 'tz_id': 'Europe/Moscow', 'type': 'airport', 'source': 'OurAirports'}, 'dst_airport': {'airport_id': 2990, 'name': 'Kazan International Airport', 'city': 'Kazan', 'country': 'Russia', 'iata': 'KZN', 'icao': 'UWKD', 'latitude': 55.606201171875, 'longitude': 49.278701782227, 'altitude': 411, 'timezone': 3.0, 'dst': 'N', 'tz_id': 'Europe/Moscow', 'type': 'airport', 'source': 'OurAirports'}, 'codeshare': False, 'equipment': ['CR2']}


## 3.1

### 3.1.a JSON Schema

In [17]:
def validate_jsonl_data(records):
    schema_path = schema_dir.joinpath('routes-schema.json')
    # add validation path
    validation_csv_path = results_dir.joinpath('validation-results.csv')
    with open(schema_path) as f:
        schema = json.load(f)

    with open(validation_csv_path, 'w', encoding='utf-8', newline='') as f:
        # assign column names for results.csv
        cols = ['row_num', 'is_valid', 'msg']

        # create csv writer object
        writer = csv.DictWriter(f, fieldnames=cols)
        writer.writeheader()

        # validate each json record using the json schema
        for i, record in enumerate(records):
            try:
                ## TODO: Validate record
                jsonschema.validate(instance=record, schema=schema)
                # write validation info to results.csv
                result = dict(
                    row_num = i,
                    is_valid = True,
                    msg = record
                )
            except ValidationError as e:
                ## Print message if invalid record
                result = dict(
                    row_num = i,
                    is_valid = False,
                    msg = record
                )
            finally:
                # write the entry to results csv
                writer.writerow(result)

validate_jsonl_data(records)

### 3.1.b Avro

In [24]:
def create_avro_dataset(records):
    schema_path = schema_dir.joinpath('routes.avsc')
    data_path = results_dir.joinpath('routes.avro')
    ## TODO: Use fastavro to create Avro dataset
    # get avro schema into variable
    with open(schema_path, 'r') as f:
        parsed_schema = fastavro.schema.load_schema(schema_path)

    with open(data_path, 'wb') as fout:
        fastavro.writer(fout, parsed_schema, records)

    # used to check if data was written properly
    # with open(data_path, 'rb') as fo:
    #     avro_reader = fastavro.reader(fo)
    #     for record in avro_reader:
    #         print(record)

create_avro_dataset(records)

### 3.1.c Parquet

In [38]:
def create_parquet_dataset():
    src_data_path = 'C:\\Users\\taylo\\OneDrive\\Documents\\dsc650\\data\\processed\\openflights\\routes.jsonl.gz'
    parquet_output_path = results_dir.joinpath('routes.parquet')
    zipped_json_path = 'C:\\Users\\taylo\\OneDrive\\Documents\\dsc650\\data\\processed\\openflights\\routes.jsonl.gz'


    with gzip.open(zipped_json_path, 'r') as fin:
        records = [json.loads(line) for line in fin.readlines()]

    df = pd.DataFrame(records)
    df.head()
    table = pa.Table.from_pandas(df)
    pq.write_table(table, parquet_output_path)

    # with gzip.open(zipped_json_path, 'r') as fin:
    #     for line in fin:
    #         records = [json.loads(line) for line in fin.readlines()]
    #
    # # create pandas df from records data
    # df = pd.DataFrame(records)
    # # create parquet table from pandas df
    # table = pa.Table.from_pandas(df)
    # # write parquet table to output path
    # with open(parquet_output_path, 'rb') as f:
    #     pq.write_table(table, f)

    # write table to parquet output path
    # with open(parquet_output_path, 'rb', encoding='utf-8') as fout:
    #     pq.write_table(table, fout)

create_parquet_dataset()

NotImplementedError: struct<active: bool, airline_id: int64, alias: string, callsign: string, country: string, iata: string, icao: string, name: string>

### 3.1.d Protocol Buffers

In [None]:
sys.path.insert(0, os.path.abspath('routes_pb2'))

import routes_pb2

def _airport_to_proto_obj(airport):
    obj = routes_pb2.Airport()
    if airport is None:
        return None
    if airport.get('airport_id') is None:
        return None

    obj.airport_id = airport.get('airport_id')
    if airport.get('name'):
        obj.name = airport.get('name')
    if airport.get('city'):
        obj.city = airport.get('city')
    if airport.get('iata'):
        obj.iata = airport.get('iata')
    if airport.get('icao'):
        obj.icao = airport.get('icao')
    if airport.get('altitude'):
        obj.altitude = airport.get('altitude')
    if airport.get('timezone'):
        obj.timezone = airport.get('timezone')
    if airport.get('dst'):
        obj.dst = airport.get('dst')
    if airport.get('tz_id'):
        obj.tz_id = airport.get('tz_id')
    if airport.get('type'):
        obj.type = airport.get('type')
    if airport.get('source'):
        obj.source = airport.get('source')

    obj.latitude = airport.get('latitude')
    obj.longitude = airport.get('longitude')

    return obj


def _airline_to_proto_obj(airline):
    obj = routes_pb2.Airline()
    ## TODO: Create an Airline obj using Protocol Buffers API
    return obj


def create_protobuf_dataset(records):
    routes = routes_pb2.Routes()
    for record in records:
        route = routes_pb2.Route()
        ## TODO: Implement the code to create the Protocol Buffers Dataset

        routes.route.append(route)

    data_path = results_dir.joinpath('routes.pb')

    with open(data_path, 'wb') as f:
        f.write(routes.SerializeToString())
        
    compressed_path = results_dir.joinpath('routes.pb.snappy')
    
    with open(compressed_path, 'wb') as f:
        f.write(snappy.compress(routes.SerializeToString()))
        
create_protobuf_dataset(records)

## 3.2

### 3.2.a Simple Geohash Index

In [None]:
def create_hash_dirs(records):
    geoindex_dir = results_dir.joinpath('geoindex')
    geoindex_dir.mkdir(exist_ok=True, parents=True)
    hashes = []
    ## TODO: Create hash index
            
create_hash_dirs(records)

### 3.2.b Simple Search Feature

In [None]:
def airport_search(latitude, longitude):
    ## TODO: Create simple search to return nearest airport
    pass
    
airport_search(41.1499988, -95.91779)