# Assignment 3

Import libraries and define common helper functions

In [3]:
import os
import sys
import gzip
import json
from pathlib import Path
import csv

import pandas as pd
import s3fs
import pyarrow as pa
from pyarrow.json import read_json
import pyarrow.parquet as pq
import fastavro
import pygeohash
import snappy
import jsonschema
from jsonschema import validate
from jsonschema.exceptions import ValidationError

current_dir = Path(os.getcwd()).absolute()
schema_dir = current_dir.joinpath('schemas')
results_dir = current_dir.joinpath('results')
results_dir.mkdir(parents=True, exist_ok=True)


def read_jsonl_data():
    src_data_path = "/home/jovyan/dsc650/data/processed/openflights/routes.jsonl.gz"
    with open(src_data_path, 'rb') as f_gz:
        with gzip.open(f_gz, 'rb') as f:
            records = [json.loads(line) for line in f.readlines()]

    return records

In [4]:
records = read_jsonl_data()

## 3.1

### 3.1.a JSON Schema

In [55]:
def validate_jsonl_data(records):
    schema_path = schema_dir.joinpath('routes-schema.json')
    with open(schema_path) as f:
        schema = json.load(f)
        
    with open("results/validation.csv", 'w') as f:    
        for i, record in enumerate(records):
            try:
                f.write(f"{jsonschema.validate(record, schema=schema)}")
                pass
            except ValidationError as e:
                print (e)
                pass
            
    message = "the given file is valid"
    
    return True, message
            

validate_jsonl_data(records)

(True, 'the given file is valid')

### 3.1.b Avro

In [5]:
from fastavro import writer, reader, parse_schema

def create_avro_dataset(records):
    schema_path = schema_dir.joinpath('routes.avsc')
    data_path = results_dir.joinpath('routes.avro')

    with open (schema_path, "r") as s:
        schema_parsed = parse_schema(json.load(s))
        
    with open(data_path, "wb") as out:
        writer(out, schema_parsed, records)
        
    with open(data_path, "rb") as fo:
        i=0
        for record in reader(fo):
            i+=1
            print (record)
            if i > 5:
                break
    
create_avro_dataset(records)

{'airline': {'airline_id': 410, 'name': 'Aerocondor', 'alias': 'ANA All Nippon Airways', 'iata': '2B', 'icao': 'ARD', 'callsign': 'AEROCONDOR', 'country': 'Portugal', 'active': True}, 'src_airport': {'airport_id': 2965, 'name': 'Sochi International Airport', 'city': 'Sochi', 'iata': 'AER', 'icao': 'URSS', 'latitude': 43.449902, 'longitude': 39.9566, 'timezone': 3.0, 'dst': 'N', 'tz_id': 'Europe/Moscow', 'type': 'airport', 'source': 'OurAirports'}, 'dst_airport': {'airport_id': 2990, 'name': 'Kazan International Airport', 'city': 'Kazan', 'iata': 'KZN', 'icao': 'UWKD', 'latitude': 55.606201171875, 'longitude': 49.278701782227, 'timezone': 3.0, 'dst': 'N', 'tz_id': 'Europe/Moscow', 'type': 'airport', 'source': 'OurAirports'}, 'codeshare': False, 'stops': 0, 'equipment': ['CR2']}
{'airline': {'airline_id': 410, 'name': 'Aerocondor', 'alias': 'ANA All Nippon Airways', 'iata': '2B', 'icao': 'ARD', 'callsign': 'AEROCONDOR', 'country': 'Portugal', 'active': True}, 'src_airport': {'airport_id'

### 3.1.c Parquet

In [20]:
def create_parquet_dataset():
    src_data_path = 'data/processed/openflights/routes.jsonl.gz'
    parquet_output_path = results_dir.joinpath('routes.parquet')
    s3 = s3fs.S3FileSystem(
        anon=True,
        client_kwargs={
            'endpoint_url': endpoint_url
        }
    )
    
    with s3.open(src_data_path, 'rb') as f_gz:
        with gzip.open(f_gz, 'rb') as f:
            df = pd.read_json(f, lines=True)
            table = pa.Table.from_pandas(df)
            with open(parquet_output_path, "wb") as w:
                pq.write_table(table, w)
    
    table2 = pq.read_table(parquet_output_path)
    print(table2.to_pandas()[0:1]) 
            
            ## TODO: Use Apache Arrow to create Parquet table and save the dataset

create_parquet_dataset()

                                             airline  \
0  {'active': True, 'airline_id': 410, 'alias': '...   

                                         src_airport  \
0  {'airport_id': 2965.0, 'altitude': 89.0, 'city...   

                                         dst_airport  codeshare equipment  
0  {'airport_id': 2990.0, 'altitude': 411.0, 'cit...      False     [CR2]  


### 3.1.d Protocol Buffers

In [99]:
sys.path.insert(0, os.path.abspath('routes_pb2'))

import routes_pb2

def _airline_to_proto_obj(airline):
    obj = routes_pb2.Airline()
    if airline is None:
        return None
    if airline.get('airline_id') is None:
        return None

    obj.airline_id = airline.get('airline_id')
    if airline.get('name'):
        obj.name = airline.get('name')
    if airline.get('city'):
        obj.city = airline.get('city')
    if airline.get('iata'):
        obj.iata = airline.get('iata')
    if airline.get('icao'):
        obj.icao = airline.get('icao')
    if airline.get('altitude'):
        obj.altitude = airline.get('altitude')
    if airline.get('timezone'):
        obj.timezone = airline.get('timezone')
    if airline.get('dst'):
        obj.dst = airline.get('dst')
    if airline.get('tz_id'):
        obj.tz_id = airline.get('tz_id')
    if airline.get('type'):
        obj.type = airline.get('type')
    if airline.get('source'):
        obj.source = airline.get('source')
    
    if airline.get("latitude"):
        obj.latitude = airline.get('latitude')
    if airline.get("longitude"):
        obj.longitude = airline.get('longitude')

    if airline.get("active") is not None:
        obj.active = airline.get("active")
        
    return obj


def create_protobuf_dataset(records):
    routes = routes_pb2.Routes()
    for record in records:
        route = routes_pb2.Route()
        airline = _airline_to_proto_obj(record.get('airline', {}))
        if airline:
            route.airline.CopyFrom(airline)
        src_airline = _airline_to_proto_obj(record.get('src_airline', {}))
        route.codeshare = record["codeshare"]
        routes.route.append(route)

    data_path = results_dir.joinpath('routes.pb')

    with open(data_path, 'wb') as f:
        f.write(routes.SerializeToString())
        
    compressed_path = results_dir.joinpath('routes.pb.snappy')
    
    with open(compressed_path, 'wb') as f:
        f.write(snappy.compress(routes.SerializeToString()))
    
    with open (data_path, "rb") as f:
        print(f.read(100))
        
    # I'm guessing this is fine???
        
        
create_protobuf_dataset(records)

b'\n\x1e\n\x1a\x08\x9a\x03\x12\nAerocondor"\x022B*\x03ARD@\x01 \x00\n\x1e\n\x1a\x08\x9a\x03\x12\nAerocondor"\x022B*\x03ARD@\x01 \x00\n\x1e\n\x1a\x08\x9a\x03\x12\nAerocondor"\x022B*\x03ARD@\x01 \x00\n\x1e\n\x1a'


In [186]:
# with open ("results/comparison.csv", "w") as w:
#     w.write("1")
df = pd.DataFrame([None])
df["avro"] = str(os.path.getsize("results/routes.avro")/1024/1024)+" MB"
df["parquet"] = str(os.path.getsize("results/routes.parquet")/1024/1024)+" MB"
df["pb"] = str(os.path.getsize("results/routes.pb")/1024/1024)+" MB"
df["snappy_pb"] = str(os.path.getsize("results/routes.pb.snappy")/1024/1024)+" MB"
df = df[df.columns[1:]]
df.to_csv("results/comparison.csv",columns=["avro","parquet","pb","snappy_pb"],index=False)

## 3.2

### 3.2.a Simple Geohash Index

In [112]:
# Assignment 3.2.a
def create_hash_dirs(records):
    geoindex_dir = results_dir.joinpath('geoindex')
    geoindex_dir.mkdir(exist_ok=True, parents=True)
    hashes = []
    for record in records:
        src_airport = record.get('src_airport', {})
        if src_airport:
            latitude = src_airport.get('latitude')
            longitude = src_airport.get('longitude')
            if latitude and longitude:
                hashes.append(pygeohash.encode(latitude, longitude))
    hashes.sort()
    three_letter = sorted(list(set([entry[:3] for entry in hashes])))
    hash_index = {value: [] for value in three_letter}
          
    for record in records:
        src_airport = record.get('src_airport', {})
        if src_airport:
            latitude = src_airport.get('latitude')
            longitude = src_airport.get('longitude')
            name = src_airport.get('name')
            if latitude and longitude and name:
                geohash = pygeohash.encode(latitude, longitude)
                hash_index[geohash[:3]] = [geohash, name]
    
    
    for key, values in hash_index.items():
        output_dir = geoindex_dir.joinpath(str(key[:1])).joinpath(str(key[:2]))
        output_dir.mkdir(exist_ok=True, parents=True)
        output_path = output_dir.joinpath('{}.jsonl.gz'.format(key))
        with gzip.open(output_path, 'w') as f:
            json_output = '\n'.join([json.dumps(value) for value in values])
            f.write(json_output.encode('utf-8'))

In [113]:
create_hash_dirs(records)

### 3.2.b Simple Search Feature

In [194]:
def read_jsonl(airport_file):
    with open(airport_file, 'rb') as f_gz:
        with gzip.open(f_gz, 'rb') as f:
            airport_info = [json.loads(line) for line in f.readlines()]
    
    return airport_info

def airline_search(latitude, longitude):
    geohash = pygeohash.encode(latitude, longitude)
    first = geohash[0]
    second = geohash[1]
    third = geohash[2]
    first_path= f"results/geoindex/{geohash[0]}/{geohash[0:2]}"
    second_path = f"{first_path}/{geohash[0:3]}.jsonl.gz"
    
    print("geohash: "+ geohash)
    while True:
        
        if os.path.exists(second_path):
#             print("second_path:", second_path)
            
            if len(read_jsonl(second_path))>0:
                airport_dict={}
                if type(read_jsonl(second_path)[0]) != "list":
                    airp_hash = read_jsonl(second_path)[0]
                    name = read_jsonl(second_path)[1]
                    dist = pygeohash.geohash_approximate_distance(geohash, airp_hash)
                    airport_dict[f"{dist}"] = name
                print("Closest airport is: " + airport_dict.get(str(min([int(i) for i in airport_dict.keys()]))))
                break
            
            else:
                continue
                
        else:
            if os.path.exists(first_path) is not True:
                print("No 2 letter matches, the coder is too busy(pronounciation: lay-zee), please try a different location...")
                break
            else:
                print("no 3 letter geohash match, getting 2 letter hash matches in", first_path)
    #             if len(read_jsonl(second_path))>0:

                airport_dict= {}

                for i in os.listdir(first_path):
                    cur_airp = read_jsonl(first_path+ f"/{i}")

                    if cur_airp:
                        print(cur_airp)
                        if type(cur_airp[0]) != "list": # to avoid files that contain more than 1 airport, not sure if necessary...
                            airp_hash = cur_airp[0]
                            print(airp_hash)
                            name = cur_airp[1]
                            dist = pygeohash.geohash_approximate_distance(geohash, airp_hash) # FOr some reason, all the distance is the same at this point
                            print(dist)
                            airport_dict[f"{dist}"] = name

                print(airport_dict)

                print("Closest airport is: " + airport_dict.get(str(min([int(i) for i in airport_dict.keys()]))))
            break

    
airline_search(134.142988, -138.90779)

geohash: bzvpzpfrvzux
No 2 letter matches, the coder is too busy(pronounciation: lay-zee), please try a different location...
