# Week3. Schemas

## Gloria P Moore

In [1]:
import os
import sys
import gzip
import json
from pathlib import Path
import csv

import pandas as pd
import s3fs
import pyarrow as pa
from pyarrow.json import read_json
import pyarrow.parquet as pq
import fastavro
import pygeohash
import snappy
import jsonschema
from jsonschema.exceptions import ValidationError


# S3-compatible endpoint that the data-loading functions connect to.
endpoint_url = 'https://storage.budsc.midwest-datascience.com'

# Every path is derived from the directory the notebook is running in.
current_dir = Path.cwd()
schema_dir = current_dir / 'schemas'
results_dir = current_dir / 'results'
# Ensure the output directory exists before any cell tries to write into it.
results_dir.mkdir(exist_ok=True, parents=True)


def read_jsonl_data():
    """Download the gzipped routes JSON Lines file and parse it into a list.

    Connects anonymously to the S3-compatible store at ``endpoint_url``,
    streams ``data/processed/openflights/routes.jsonl.gz`` through gzip and
    decodes one JSON document per line.

    Returns:
        list: One parsed object per non-blank line, in file order.
    """
    s3 = s3fs.S3FileSystem(
        anon=True,
        client_kwargs={
            'endpoint_url': endpoint_url
        }
    )
    src_data_path = 'data/processed/openflights/routes.jsonl.gz'
    with s3.open(src_data_path, 'rb') as f_gz:
        with gzip.open(f_gz, 'rb') as f:
            # Iterate the stream directly instead of readlines() so the raw
            # lines are not all held in memory next to the parsed objects,
            # and skip blank lines, which json.loads would reject.
            records = [json.loads(line) for line in f if line.strip()]

    return records

In [54]:
records = read_jsonl_data()

In [55]:
records[0:1]

[{'airline': {'airline_id': 410,
   'name': 'Aerocondor',
   'alias': 'ANA All Nippon Airways',
   'iata': '2B',
   'icao': 'ARD',
   'callsign': 'AEROCONDOR',
   'country': 'Portugal',
   'active': True},
  'src_airport': {'airport_id': 2965,
   'name': 'Sochi International Airport',
   'city': 'Sochi',
   'country': 'Russia',
   'iata': 'AER',
   'icao': 'URSS',
   'latitude': 43.449902,
   'longitude': 39.9566,
   'altitude': 89,
   'timezone': 3.0,
   'dst': 'N',
   'tz_id': 'Europe/Moscow',
   'type': 'airport',
   'source': 'OurAirports'},
  'dst_airport': {'airport_id': 2990,
   'name': 'Kazan International Airport',
   'city': 'Kazan',
   'country': 'Russia',
   'iata': 'KZN',
   'icao': 'UWKD',
   'latitude': 55.606201171875,
   'longitude': 49.278701782227,
   'altitude': 411,
   'timezone': 3.0,
   'dst': 'N',
   'tz_id': 'Europe/Moscow',
   'type': 'airport',
   'source': 'OurAirports'},
  'codeshare': False,
  'equipment': ['CR2']}]

In [56]:
# Show the path of the CSV file that validate_jsonl_data() opens. It is rebuilt
# from results_dir here because the name `validation_csv_path` only exists as
# a local inside that function, so evaluating it bare raises NameError on a
# freshly restarted kernel.
results_dir.joinpath('validation-results.csv')

PosixPath('/home/jovyan/results/validation-results.csv')

In [57]:
def validate_jsonl_data(records):
    """Validate every record against the routes JSON Schema and log failures.

    The schema is read from ``routes-schema.json`` under ``schema_dir``.
    Each record that fails validation becomes one row of
    ``validation-results.csv`` under ``results_dir``; a one-line summary is
    printed when the run finishes.

    Args:
        records: Iterable of decoded route objects to check.

    Returns:
        None. The CSV report is the output.
    """
    schema_path = schema_dir.joinpath('routes-schema.json')
    validation_csv_path = results_dir.joinpath('validation-results.csv')
    with open(schema_path) as f:
        schema = json.load(f)

    total = 0
    invalid = 0
    # newline='' lets the csv module control line endings itself.
    with open(validation_csv_path, 'w', newline='', encoding='utf-8') as f:
        report = csv.writer(f)
        report.writerow(['row_num', 'path', 'message'])
        for i, record in enumerate(records):
            total += 1
            try:
                jsonschema.validate(instance=record, schema=schema)
            except ValidationError as e:
                invalid += 1
                # Record which record failed, where inside it, and why,
                # instead of printing an undifferentiated marker.
                location = '/'.join(str(part) for part in e.absolute_path)
                report.writerow([i, location, e.message])

    print('{} of {} records failed validation; see {}'.format(
        invalid, total, validation_csv_path))
            

validate_jsonl_data(records)

In [58]:
# Avro schema for a single route, written inline as a Python dict and passed
# to fastavro's parse_schema() in the Avro cells of this notebook.
# Layout: Route -> airline (Airline record), src_airport / dst_airport
# (Airport record or null), codeshare, stops, equipment (array of strings).
# NOTE(review): the "NONE" string defaults on the record-typed `airline` field
# and on the [Airport, "null"] unions do not match the declared types of those
# fields; Avro expects a default of the field's own type — confirm this parses
# with the fastavro version in use.
# NOTE(review): Airport declares no `country` or `altitude` field although the
# sample records displayed in this notebook carry both; presumably they are
# left out of the Avro output — verify.
schema1 = {
  "type": "record",
  "name": "Route",
  "namespace": "edu.bellevue.dsc650",
  "fields": [
    {
      "name": "airline",
      "type": {
        "type": "record",
        "name": "Airline",
        "fields": [
          {
            "name": "airline_id",
            "type": "int",
            "default": -1
          },
          {
            "name": "name",
            "type": "string",
            "default": "NONE"
          },
          {
            "name": "alias",
            "type": "string",
            "default": "NONE"
          },
          {
            "name": "iata",
            "type": "string",
            "default": "NONE"
          },
          {
            "name": "icao",
            "type": "string",
            "default": "NONE"
          },
          {
            "name": "callsign",
            "type": "string",
            "default": "NONE"
          },
          {
            "name": "country",
            "type": "string",
            "default": "NONE"
          },
          {
            "name": "active",
            "type": "boolean",
            "default": False
          }
        ]
      },
      "default": "NONE"
    },
    {
      "name": "src_airport",
      "type": [
        {
          "type": "record",
          "name": "Airport",
          "fields": [
            {
              "name": "airport_id",
              "type": "int",
              "default": -1
            },
            {
              "name": "name",
              "type": "string",
              "default": "NONE"
            },
            {
              "name": "city",
              "type": "string",
              "default": "NONE"
            },
            {
              "name": "iata",
              "type": "string",
              "default": "NONE"
            },
            {
              "name": "icao",
              "type": "string",
              "default": "NONE"
            },
            {
              "name": "latitude",
              "type": "double"
            },
            {
              "name": "longitude",
              "type": "double"
            },
            {
              "name": "timezone",
              "type": "double"
            },
            {
              "name": "dst",
              "type": "string",
              "default": "NONE"
            },
            {
              "name": "tz_id",
              "type": "string",
              "default": "NONE"
            },
            {
              "name": "type",
              "type": "string",
              "default": "NONE"
            },
            {
              "name": "source",
              "type": "string",
              "default": "NONE"
            }
          ]
        },
        "null"
      ],
      "default": "NONE"
    },
    {
      "name": "dst_airport",
      "type": [
        "Airport",
        "null"
      ],
      "default": "NONE"
    },
    {
      "name": "codeshare",
      "type": "boolean",
      "default": False
    },
    {
      "name": "stops",
      "type": "int",
      "default": 0
    },
    {
      "name": "equipment",
      "type": {
        "type": "array",
        "items": "string"
      }
    }
  ]
}

# AVRO

In [59]:
# fastavro helpers used below: parse_schema() prepares the schema dict and
# writer() serialises records with it.
from fastavro import parse_schema
from fastavro import writer

# Serialise the full `records` list through schema1 into a file in the
# current working directory.
# NOTE(review): the file is named routes.avsc, an extension conventionally
# used for Avro schema JSON, yet writer() is given record data here — confirm
# the intended file name.
parsed_schema = parse_schema(schema1)
with open('routes.avsc', 'wb') as out:
    writer(out, parsed_schema, records)

In [60]:
from fastavro import writer, reader, parse_schema

In [61]:
def create_avro_dataset(records):
    """Write all route records to ``routes.avro`` under ``results_dir``.

    The records are encoded with fastavro using the inline ``schema1``
    definition.

    Args:
        records: Iterable of route dicts to serialise.

    Returns:
        None. The Avro file is the output.
    """
    data_path = results_dir.joinpath('routes.avro')

    parsed_schema = parse_schema(schema1)
    # Write to the results directory that data_path points at (not the
    # working directory) and include every record, not just the first two.
    with open(data_path, 'wb') as out:
        writer(out, parsed_schema, records)
        
create_avro_dataset(records)

In [62]:
records[1]

{'airline': {'airline_id': 410,
  'name': 'Aerocondor',
  'alias': 'ANA All Nippon Airways',
  'iata': '2B',
  'icao': 'ARD',
  'callsign': 'AEROCONDOR',
  'country': 'Portugal',
  'active': True},
 'src_airport': {'airport_id': 2966,
  'name': 'Astrakhan Airport',
  'city': 'Astrakhan',
  'country': 'Russia',
  'iata': 'ASF',
  'icao': 'URWA',
  'latitude': 46.2832984924,
  'longitude': 48.0063018799,
  'altitude': -65,
  'timezone': 4.0,
  'dst': 'N',
  'tz_id': 'Europe/Samara',
  'type': 'airport',
  'source': 'OurAirports'},
 'dst_airport': {'airport_id': 2990,
  'name': 'Kazan International Airport',
  'city': 'Kazan',
  'country': 'Russia',
  'iata': 'KZN',
  'icao': 'UWKD',
  'latitude': 55.606201171875,
  'longitude': 49.278701782227,
  'altitude': 411,
  'timezone': 3.0,
  'dst': 'N',
  'tz_id': 'Europe/Moscow',
  'type': 'airport',
  'source': 'OurAirports'},
 'codeshare': False,
 'equipment': ['CR2']}

# PARQUET

In [63]:
def create_parquet_dataset():
    """Convert the remote routes JSON Lines file to ``routes.parquet``.

    Streams ``data/processed/openflights/routes.jsonl.gz`` from the store at
    ``endpoint_url``, decodes one JSON document per line, and writes the
    resulting table to ``routes.parquet`` under ``results_dir``.

    Returns:
        None. The Parquet file is the output.
    """
    src_data_path = 'data/processed/openflights/routes.jsonl.gz'
    parquet_output_path = results_dir.joinpath('routes.parquet')
    s3 = s3fs.S3FileSystem(
        anon=True,
        client_kwargs={
            'endpoint_url': endpoint_url
        }
    )

    with s3.open(src_data_path, 'rb') as f_gz:
        with gzip.open(f_gz, 'rb') as f:
            # Parse the stream this function opens instead of reaching for a
            # notebook-global `records` left behind by an earlier cell.
            rows = [json.loads(line) for line in f if line.strip()]

    df = pd.DataFrame(rows)
    table = pa.Table.from_pandas(df)
    # Write to the results directory rather than the working directory.
    pq.write_table(table, str(parquet_output_path))

create_parquet_dataset()

# Protocol Buffers

In [64]:
# Put the absolute path of an entry named 'routes_pb2' (relative to the
# working directory) at the front of the import search path, then import the
# module of the same name — presumably the generated protobuf bindings.
# NOTE(review): the recorded run of this cell ended with
# "ModuleNotFoundError: No module named 'routes_pb2'", so routes_pb2.py was
# apparently not found at that location — verify where it is generated.
sys.path.insert(0, os.path.abspath('routes_pb2'))

import routes_pb2

def _airport_to_proto_obj(airport):
    """Build a ``routes_pb2.Airport`` message from an airport dict.

    Returns ``None`` when ``airport`` is ``None`` or carries no
    ``airport_id``. Optional attributes are copied only when their value is
    truthy; ``latitude`` and ``longitude`` are always assigned.
    """
    message = routes_pb2.Airport()
    if airport is None or airport.get('airport_id') is None:
        return None

    message.airport_id = airport.get('airport_id')

    # Attributes that are set only when the source value is truthy.
    optional_keys = (
        'name', 'city', 'iata', 'icao', 'altitude',
        'timezone', 'dst', 'tz_id', 'type', 'source',
    )
    for key in optional_keys:
        value = airport.get(key)
        if value:
            setattr(message, key, value)

    message.latitude = airport.get('latitude')
    message.longitude = airport.get('longitude')

    return message


def create_protobuf_dataset(records):
    """Serialise the routes as Protocol Buffers, plain and Snappy-compressed.

    Builds one ``routes_pb2.Route`` per record, collects them in a
    ``routes_pb2.Routes`` message, and writes the serialised bytes to
    ``routes.pb`` and a Snappy-compressed copy to ``routes.pb.snappy``, both
    under ``results_dir``.

    Args:
        records: Iterable of route dicts.

    Returns:
        None. The two files are the output.
    """
    routes = routes_pb2.Routes()
    for record in records:
        route = routes_pb2.Route()
        # NOTE(review): _airline_to_proto_obj is not defined in the visible
        # part of this notebook — confirm it exists before running this cell.
        airline = _airline_to_proto_obj(record.get('airline', {}))
        if airline:
            route.airline.CopyFrom(airline)
        src_airport = _airport_to_proto_obj(record.get('src_airport', {}))
        if src_airport:
            route.src_airport.CopyFrom(src_airport)
        dst_airport = _airport_to_proto_obj(record.get('dst_airport', {}))
        if dst_airport:
            route.dst_airport.CopyFrom(dst_airport)

        route.codeshare = bool(record.get('codeshare'))

        if record.get('stops'):
            route.stops = record.get('stops')

        # Copy every equipment code. The previous `len(equipment) > 1` test
        # dropped routes with exactly one code and raised on a missing list.
        for code in record.get('equipment') or []:
            route.equipment.append(code)

        routes.route.append(route)

    # Serialise once and reuse the bytes for both output files.
    payload = routes.SerializeToString()

    data_path = results_dir.joinpath('routes.pb')
    with open(data_path, 'wb') as f:
        f.write(payload)

    compressed_path = results_dir.joinpath('routes.pb.snappy')
    with open(compressed_path, 'wb') as f:
        f.write(snappy.compress(payload))
        
create_protobuf_dataset(records)

ModuleNotFoundError: No module named 'routes_pb2'

# OUTPUT SIZES

**TODO:** This section is not completed yet; I don't know how to do this part.

# 3.2 Geohash

In [65]:
def create_hash_dirs(records):
    """Partition routes into gzip files keyed by source-airport geohash.

    Each record whose ``src_airport`` has both coordinates gets a ``geohash``
    key added in place. Records are then grouped by the first three geohash
    characters and written, one JSON document per line, to
    ``geoindex/<1 char>/<2 chars>/<3 chars>.jsonl.gz`` under ``results_dir``.

    Args:
        records: List of route dicts; mutated by adding ``geohash``.

    Returns:
        None. The files under ``geoindex`` are the output.
    """
    geoindex_dir = results_dir.joinpath('geoindex')
    geoindex_dir.mkdir(exist_ok=True, parents=True)

    hash_index = {}
    for record in records:
        src_airport = record.get('src_airport') or {}
        latitude = src_airport.get('latitude')
        longitude = src_airport.get('longitude')
        # Compare with None rather than truthiness: 0.0 is a legitimate
        # latitude or longitude and must not be skipped.
        if latitude is None or longitude is None:
            continue
        geohash = pygeohash.encode(latitude, longitude)
        record['geohash'] = geohash
        hash_index.setdefault(geohash[:3], []).append(record)

    # Sorted only so the files are written in a stable order.
    for key in sorted(hash_index):
        output_dir = geoindex_dir.joinpath(key[:1]).joinpath(key[:2])
        output_dir.mkdir(exist_ok=True, parents=True)
        output_path = output_dir.joinpath('{}.jsonl.gz'.format(key))
        with gzip.open(output_path, 'w') as f:
            json_output = '\n'.join(json.dumps(value) for value in hash_index[key])
            f.write(json_output.encode('utf-8'))
            
create_hash_dirs(records)

In [66]:
def airport_search(latitude, longitude):
    """Print the name of the source airport closest to the given point.

    Scans the notebook-level ``records`` list, geohashes each source airport
    and compares it with the geohash of the query point using
    ``pygeohash.geohash_approximate_distance``. On equal distances the
    airport seen first is kept.

    Args:
        latitude: Latitude of the query point.
        longitude: Longitude of the query point.

    Returns:
        None. The airport name is printed (empty string if nothing matched).
    """
    target_hash = pygeohash.encode(latitude, longitude)
    # None means "no candidate yet". Starting from 0, or seeding the distance
    # from the first record without its name, lost the answer whenever the
    # first record was the closest or had no coordinates.
    best_dist = None
    best_name = ''
    for record in records:
        src_airport = record.get('src_airport') or {}
        lat = src_airport.get('latitude')
        lon = src_airport.get('longitude')
        # 0.0 is a valid coordinate, so test for None explicitly.
        if lat is None or lon is None:
            continue
        candidate_hash = pygeohash.encode(lat, lon)
        dist = pygeohash.geohash_approximate_distance(target_hash, candidate_hash)
        if best_dist is None or dist < best_dist:
            best_dist = dist
            best_name = src_airport.get('name')
    print(best_name)
    
airport_search(41.1499988, -95.91779)

Eppley Airfield


NameError: name 'h' is not defined

In [68]:
pip install nbzip

Collecting nbzip
  Downloading nbzip-0.1.0-py3-none-any.whl (4.5 kB)
Collecting pytest
  Downloading pytest-6.2.3-py3-none-any.whl (280 kB)
[K     |████████████████████████████████| 280 kB 4.7 MB/s eta 0:00:01
Collecting toml
  Downloading toml-0.10.2-py2.py3-none-any.whl (16 kB)
Collecting iniconfig
  Downloading iniconfig-1.1.1-py2.py3-none-any.whl (5.0 kB)
Collecting py>=1.8.2
  Downloading py-1.10.0-py2.py3-none-any.whl (97 kB)
[K     |████████████████████████████████| 97 kB 447 kB/s eta 0:00:01
[?25hCollecting pluggy<1.0.0a1,>=0.12
  Downloading pluggy-0.13.1-py2.py3-none-any.whl (18 kB)
Installing collected packages: toml, py, pluggy, iniconfig, pytest, nbzip
Successfully installed iniconfig-1.1.1 nbzip-0.1.0 pluggy-0.13.1 py-1.10.0 pytest-6.2.3 toml-0.10.2
Note: you may need to restart the kernel to use updated packages.


In [73]:
%%bash
# Pack the data/enron directory into a gzip-compressed tarball named
# archive.tar.gz in the current working directory.
tar -czf archive.tar.gz data/enron