In [1]:
# https://docs.kedro.org/en/0.18.1/tools_integration/ipython.html
# Load the Kedro IPython extension. Later cells call `catalog.load(...)` without
# defining `catalog` anywhere in this notebook — presumably the extension injects
# it into the namespace (see the linked docs; TODO confirm for the installed version).
%load_ext kedro.extras.extensions.ipython
# NOTE(review): reloading right after loading looks redundant on a fresh kernel;
# it appears to be here so that re-running this cell refreshes the extension.
%reload_ext kedro.extras.extensions.ipython

In [2]:
import polars as pl
import requests

from typing import List

# Notebook-wide polars display settings: table row limit and string length limit
# used when frames are rendered.
pl.Config.set_tbl_rows(40)
# This is the last expression of the cell, so its return value
# (the polars Config class) is echoed as the cell output.
pl.Config.set_fmt_str_lengths(100)


polars.config.Config

In [151]:
def format_column_names(cols: List[str]) -> List[str]:
    """Normalise column names.

    Each name is stripped of surrounding whitespace, lower-cased, and has its
    remaining spaces replaced by underscores. A new list is returned; the input
    is not modified.
    """
    return [name.strip().lower().replace(" ", "_") for name in cols]


In [79]:
# load airport data as a polars df
# `catalog` is not defined in this notebook; it is expected to already be in the
# namespace (presumably provided by the Kedro IPython extension loaded in the first cell).
airports: pl.DataFrame = catalog.load("airports")
# Report the estimated in-memory size of the raw frame in kilobytes.
print(f"Raw airports dataset size: {airports.estimated_size(unit='kb'):.2f} KB")
# Last expression of the cell: renders the first rows as the cell output.
airports.head()


Raw airports dataset size: 62.38 KB


AIRPORT_ID,AIRPORT,DISPLAY_AIRPORT_NAME,DISPLAY_AIRPORT_CITY_NAME_FULL,AIRPORT_STATE_NAME,AIRPORT_STATE_CODE,LATITUDE,LONGITUDE,ELEVATION,ICAO,IATA,FAA,MESONET_STATION
i64,str,str,str,str,str,f64,f64,i64,str,str,str,str
14256,"""PSG""","""Petersburg Jam…","""Petersburg, AK…","""Alaska""","""AK""",56.801388,-132.9461,111,"""PAPG""","""PSG""","""PSG""","""PAPG"""
10299,"""ANC""","""Ted Stevens An…","""Anchorage, AK""","""Alaska""","""AK""",61.174168,-149.99806,152,"""PANC""","""ANC""","""ANC""","""PANC"""
12819,"""KTN""","""Ketchikan Inte…","""Ketchikan, AK""","""Alaska""","""AK""",55.354168,-131.7111,89,"""PAKT""","""KTN""","""KTN""","""PAKT"""
13873,"""OME""","""Nome Airport""","""Nome, AK""","""Alaska""","""AK""",64.51056,-165.44472,37,"""PAOM""","""OME""","""OME""","""PAOM"""
10551,"""BET""","""Bethel Airport…","""Bethel, AK""","""Alaska""","""AK""",60.77861,-161.83722,126,"""PABE""","""BET""","""BET""","""PABE"""


In [133]:
def transform_airports(raw_airports: pl.DataFrame) -> pl.DataFrame:
    """Drop unused airport columns and normalise the remaining column names.

    Removes ``DISPLAY_AIRPORT_CITY_NAME_FULL``, ``AIRPORT_STATE_NAME`` and ``FAA``,
    then passes the surviving column names through ``format_column_names``.
    The input frame is left untouched; a new frame is returned.
    """
    unused_columns = (
        "DISPLAY_AIRPORT_CITY_NAME_FULL",
        "AIRPORT_STATE_NAME",
        "FAA",
    )
    trimmed = raw_airports.drop(*unused_columns)
    # `drop` returned a new frame, so renaming its columns does not affect the caller's frame.
    trimmed.columns = format_column_names(trimmed.columns)
    return trimmed


def dq_airports(airports: pl.DataFrame) -> None:
    """Run data quality checks on the airports data.

    The checks run in this order and stop at the first failure:

    1. the frame is not empty,
    2. no column contains null values,
    3. every ``latitude`` lies within [-90, 90],
    4. every ``longitude`` lies within [-180, 180],
    5. no row is an exact duplicate of another row.

    Args:
        airports: Airports frame that has ``latitude`` and ``longitude`` columns
            (the lower-case names, i.e. after column-name formatting).

    Raises:
        ValueError: If any of the checks above fails. For the null check, the
            per-column null counts are printed before the error is raised.
    """

    # check that we have at least one value
    if airports.is_empty():
        raise ValueError("Empty airports dataframe!")

    # check for any null values
    null_counts = airports.null_count()
    # `null_count()` yields a single-row frame; row(0) is the tuple of per-column counts
    if sum(null_counts.row(0)) != 0:
        print(null_counts.transpose(include_header=True, column_names=["null_count"]))
        raise ValueError(
            "Detected null values. See their occurrence per column before this raised error message"
        )

    # check for valid latitudes
    if not airports["latitude"].is_between(-90, 90).all():
        raise ValueError("Latitudes outside [-90, 90] degrees range found!")

    # check for valid longitudes
    if not airports["longitude"].is_between(-180, 180).all():
        raise ValueError("Longitudes outside [-180, 180] degrees range found!")

    # check that we don't have any duplicates (whole-row comparison)
    if airports.is_duplicated().any():
        raise ValueError("Duplicate airport entries found!")

    print("All DQ checks on airports data passed!")


# Apply the transformation to the raw frame loaded in the previous cell, then
# validate the result; dq_airports raises ValueError on the first failed check.
transformed_airports = transform_airports(airports)
dq_airports(transformed_airports)


All DQ checks on airports data passed!


In [109]:
# load flights
# `catalog` is expected to already be in the namespace (not defined in this notebook).
flights: pl.DataFrame = catalog.load("flights")
# Kept in a variable because the post-conversion stats cell compares against it.
raw_flights_size = flights.estimated_size(unit='gb')
print(f"Raw flights dataset size: {raw_flights_size:.2f} GB")
# NOTE(review): despite the label, this prints the full shape tuple (rows, columns).
print(f"Record count: {flights.shape}")
# Last expression of the cell: renders the first rows as the cell output.
flights.head()

Raw flights dataset size: 2.74 GB
Record count: (6954636, 40)


FL_DATE,DEP_HOUR,MKT_UNIQUE_CARRIER,MKT_CARRIER_FL_NUM,OP_UNIQUE_CARRIER,OP_CARRIER_FL_NUM,TAIL_NUM,ORIGIN,DEST,DEP_TIME,CRS_DEP_TIME,TAXI_OUT,DEP_DELAY,AIR_TIME,DISTANCE,CANCELLED,LATITUDE,LONGITUDE,ELEVATION,MESONET_STATION,YEAR OF MANUFACTURE,MANUFACTURER,ICAO TYPE,RANGE,WIDTH,WIND_DIR,WIND_SPD,WIND_GUST,VISIBILITY,TEMPERATURE,DEW_POINT,REL_HUMIDITY,ALTIMETER,LOWEST_CLOUD_LAYER,N_CLOUD_LAYER,LOW_LEVEL_CLOUD,MID_LEVEL_CLOUD,HIGH_LEVEL_CLOUD,CLOUD_COVER,ACTIVE_WEATHER
str,i64,str,i64,str,i64,str,str,str,str,str,i64,i64,i64,i64,i64,f64,f64,i64,str,i64,str,str,str,str,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64
"""2022-01-01""",6,"""AA""",1,"""AA""",1,"""N106NN""","""JFK""","""LAX""","""2022-01-01 06:…","""2022-01-01 07:…",19,-3,347,2475,0,40.64,-73.77861,13,"""JFK""",2014,"""Airbus""","""A321""","""Medium Range""","""Narrow-body""",180.0,7.0,0.0,8.0,9.39,8.89,96.69,29.92,300.0,2.0,1.0,0.0,0.0,4.0,0.0
"""2022-01-01""",8,"""AA""",10,"""AA""",10,"""N101NN""","""LAX""","""JFK""","""2022-01-01 08:…","""2022-01-01 09:…",20,-3,270,2475,0,33.9425,-118.40806,125,"""LAX""",2013,"""Airbus""","""A321""","""Medium Range""","""Narrow-body""",20.0,6.0,0.0,10.0,12.22,0.61,44.92,29.87,4800.0,1.0,1.0,0.0,0.0,2.0,0.0
"""2022-01-01""",0,"""AA""",1003,"""AA""",1003,"""N830NN""","""STL""","""ORD""","""2022-01-01 00:…","""2022-01-01 07:…",0,0,0,258,2,38.74861,-90.37,618,"""STL""",2010,"""Boeing""","""B738""","""Medium Range""","""Narrow-body""",,,,,,,,,,,,,,,
"""2022-01-01""",9,"""AA""",1004,"""AA""",1004,"""N805NN""","""ORD""","""FLL""","""2022-01-01 09:…","""2022-01-01 18:…",23,907,166,1182,0,41.976944,-87.90806,672,"""ORD""",2009,"""Boeing""","""B738""","""Medium Range""","""Narrow-body""",360.0,9.0,0.0,5.0,2.22,0.61,89.09,29.75,500.0,1.0,1.0,0.0,0.0,4.0,1.0
"""2022-01-01""",23,"""AA""",1007,"""AA""",1007,"""N870NN""","""CLT""","""STL""","""2022-01-01 23:…","""2022-01-01 22:…",16,20,104,575,0,35.21361,-80.949165,748,"""CLT""",2011,"""Boeing""","""B738""","""Medium Range""","""Narrow-body""",220.0,8.0,0.0,10.0,20.0,18.28,89.81,29.79,1500.0,3.0,1.0,0.0,1.0,3.0,0.0


In [158]:
# perform type conversion on flights
def parse_flights(raw_flights: pl.DataFrame) -> pl.DataFrame:
    """Cast the raw flights columns to tighter dtypes and normalise column names.

    Conversions applied:

    * ``FL_DATE`` is parsed as a date; ``DEP_TIME`` / ``CRS_DEP_TIME`` as datetimes.
    * Carrier, origin, manufacturer, range and width columns become categoricals.
    * Flight numbers become strings left-padded with zeros to 4 characters.
    * The three cloud-level columns become booleans.
    * ``DEST`` and ``ICAO TYPE`` are cast to categoricals under the new names
      ``DESTINATION`` and ``ICAO_TYPE``; the original columns are dropped, so the
      new ones end up as the last two columns.

    Finally all column names go through ``format_column_names``.
    """
    categorical_cols = (
        "MKT_UNIQUE_CARRIER",
        "OP_UNIQUE_CARRIER",
        "ORIGIN",
        "MANUFACTURER",
        "RANGE",
        "WIDTH",
    )
    flight_number_cols = ("MKT_CARRIER_FL_NUM", "OP_CARRIER_FL_NUM")
    timestamp_cols = ("DEP_TIME", "CRS_DEP_TIME")
    cloud_flag_cols = ("LOW_LEVEL_CLOUD", "MID_LEVEL_CLOUD", "HIGH_LEVEL_CLOUD")
    # old column name -> new column name; insertion order fixes the order of the appended columns
    renamed_categoricals = {"DEST": "DESTINATION", "ICAO TYPE": "ICAO_TYPE"}

    conversions = [
        pl.col("FL_DATE").str.to_date(),
        *(pl.col(name).cast(pl.Categorical) for name in categorical_cols),
        *(pl.col(name).cast(str).str.zfill(4) for name in flight_number_cols),
        *(pl.col(name).str.to_datetime() for name in timestamp_cols),
        *(pl.col(name).cast(pl.Boolean) for name in cloud_flag_cols),
        *(
            pl.col(old_name).cast(pl.Categorical).alias(new_name)
            for old_name, new_name in renamed_categoricals.items()
        ),
    ]

    typed_flights = (
        raw_flights.lazy()
        .with_columns(*conversions)
        .drop(*renamed_categoricals)
        .collect()
    )
    typed_flights.columns = format_column_names(typed_flights.columns)
    return typed_flights


# compute and print post conversion stats
parsed_flights = parse_flights(flights)

# NOTE(review): `raw_flights_size` was computed in the load-flights cell with a
# literal 'gb' unit; this constant must stay in sync with it for the ratio to be valid.
SIZE_UNIT_FLIGHTS = "gb"
parsed_flights_size = parsed_flights.estimated_size(unit=SIZE_UNIT_FLIGHTS)
# Signed relative change: negative when the parsed frame is smaller than the raw one.
size_diff_pct = (parsed_flights_size - raw_flights_size) / raw_flights_size

print(
    f"Parsed flights dataset size: {parsed_flights_size:.2f} {SIZE_UNIT_FLIGHTS.upper()}"
)
# NOTE(review): the label says "reduction" but the value printed is the signed
# change, so an actual reduction shows up as a negative percentage.
print(f"Relative size reduction after proper dtype conversions: {size_diff_pct:.2%}")

# Last expression of the cell: renders the first rows as the cell output.
parsed_flights.head()


Parsed flights dataset size: 1.81 GB
Relative size reduction after proper dtype conversions: -33.85%


fl_date,dep_hour,mkt_unique_carrier,mkt_carrier_fl_num,op_unique_carrier,op_carrier_fl_num,tail_num,origin,dep_time,crs_dep_time,taxi_out,dep_delay,air_time,distance,cancelled,latitude,longitude,elevation,mesonet_station,year_of_manufacture,manufacturer,range,width,wind_dir,wind_spd,wind_gust,visibility,temperature,dew_point,rel_humidity,altimeter,lowest_cloud_layer,n_cloud_layer,low_level_cloud,mid_level_cloud,high_level_cloud,cloud_cover,active_weather,destination,icao_type
date,i64,cat,str,cat,str,str,cat,datetime[μs],datetime[μs],i64,i64,i64,i64,i64,f64,f64,i64,str,i64,cat,cat,cat,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,bool,bool,bool,f64,f64,cat,cat
2022-01-01,6,"""AA""","""0001""","""AA""","""0001""","""N106NN""","""JFK""",2022-01-01 06:57:00,2022-01-01 07:00:00,19,-3,347,2475,0,40.64,-73.77861,13,"""JFK""",2014,"""Airbus""","""Medium Range""","""Narrow-body""",180.0,7.0,0.0,8.0,9.39,8.89,96.69,29.92,300.0,2.0,True,False,False,4.0,0.0,"""LAX""","""A321"""
2022-01-01,8,"""AA""","""0010""","""AA""","""0010""","""N101NN""","""LAX""",2022-01-01 08:57:00,2022-01-01 09:00:00,20,-3,270,2475,0,33.9425,-118.40806,125,"""LAX""",2013,"""Airbus""","""Medium Range""","""Narrow-body""",20.0,6.0,0.0,10.0,12.22,0.61,44.92,29.87,4800.0,1.0,True,False,False,2.0,0.0,"""JFK""","""A321"""
2022-01-01,0,"""AA""","""1003""","""AA""","""1003""","""N830NN""","""STL""",2022-01-01 00:00:00,2022-01-01 07:05:00,0,0,0,258,2,38.74861,-90.37,618,"""STL""",2010,"""Boeing""","""Medium Range""","""Narrow-body""",,,,,,,,,,,,,,,,"""ORD""","""B738"""
2022-01-01,9,"""AA""","""1004""","""AA""","""1004""","""N805NN""","""ORD""",2022-01-01 09:37:00,2022-01-01 18:30:00,23,907,166,1182,0,41.976944,-87.90806,672,"""ORD""",2009,"""Boeing""","""Medium Range""","""Narrow-body""",360.0,9.0,0.0,5.0,2.22,0.61,89.09,29.75,500.0,1.0,True,False,False,4.0,1.0,"""FLL""","""B738"""
2022-01-01,23,"""AA""","""1007""","""AA""","""1007""","""N870NN""","""CLT""",2022-01-01 23:00:00,2022-01-01 22:40:00,16,20,104,575,0,35.21361,-80.949165,748,"""CLT""",2011,"""Boeing""","""Medium Range""","""Narrow-body""",220.0,8.0,0.0,10.0,20.0,18.28,89.81,29.79,1500.0,3.0,True,False,True,3.0,0.0,"""STL""","""B738"""


In [127]:
# parse_flights lower-cases every column name via format_column_names, so the
# column must be selected by its formatted name.
parsed_flights.select("icao_type").unique()

ICAO_TYPE
cat
"""A321"""
"""B738"""
"""A319"""
"""B789"""
"""B772"""
"""A320"""
"""B788"""
"""A21N"""
"""B38M"""
"""B77W"""


## Testing requests to Census API

In [4]:
# load credentials for accessing the US Census API
from kedro.config import ConfigLoader, MissingConfigException
from kedro.framework.project import settings
from pathlib import Path

# Resolve the configuration directory relative to the parent of the current
# working directory (assumes the notebook is run from a sub-folder of the
# project root — TODO confirm).
conf_path = str(Path.cwd().parent / settings.CONF_SOURCE)
conf_loader = ConfigLoader(conf_source=conf_path)

# Fall back to an empty dict when no credentials config is found, so the cell
# does not fail on a checkout without credentials.
# NOTE(review): `credentials` is not referenced again in the cells visible here.
try:
    credentials = conf_loader["credentials"]
except MissingConfigException:
    credentials = {}


In [3]:
# load census data
# The catalog entry is annotated as returning the raw HTTP response object,
# so the status has to be checked here before the body is used.
census_pop_response: requests.Response = catalog.load("raw_census_population")
# Treat any 4xx/5xx (or higher) status as a failure and surface the response body.
if census_pop_response.status_code >= 400:
    raise ValueError(f"Census API error: {census_pop_response.text}")

# Decoded JSON body; parse_population below treats its first record as the header row.
census_pop_data = census_pop_response.json()


In [148]:
def parse_population(raw_data: List[List[str]]) -> pl.DataFrame:
    """
    Parse population data received from the API into a polars DataFrame.

    The first record of the response is the header.
    All other records are the data rows.

    Args:
        raw_data: Row-oriented records; ``raw_data[0]`` holds the column names.

    Returns:
        A DataFrame with one column per header entry and one row per remaining
        record (zero rows when only the header is present).

    Raises:
        ValueError: If ``raw_data`` is empty, i.e. there is no header row.
    """
    if not raw_data:
        raise ValueError("Empty population payload: expected at least a header row!")

    header, *rows = raw_data
    # State the orientation explicitly instead of letting polars infer it from
    # the data, so the records are always interpreted as rows.
    return pl.from_records(rows, schema=header, orient="row")


def transform_population(population: pl.DataFrame) -> pl.DataFrame:
    """Clean up the parsed population frame.

    Renames ``POP_2021`` to ``POPULATION``, parses ``LASTUPDATE`` as a date,
    casts ``POPULATION`` to a 64-bit integer, drops the ``state`` column, sorts
    by ``NAME`` and finally normalises the column names.
    """
    typed = population.rename({"POP_2021": "POPULATION"}).with_columns(
        pl.col("LASTUPDATE").str.to_date(r"%B. %d, %Y"),
        pl.col("POPULATION").cast(pl.Int64),
    )
    ordered = typed.drop("state").sort(by="NAME")
    ordered.columns = format_column_names(ordered.columns)
    return ordered


def dq_population(population: pl.DataFrame) -> None:
    """Validate the transformed population data.

    Raises:
        ValueError: If any row has a negative ``population`` value.
    """
    # keep only the offending rows; any survivor means the check failed
    negative_rows = population.filter(pl.col("population") < 0)
    if negative_rows.height > 0:
        raise ValueError("Negative population values found")

    print("All DQ checks on population data passed!")


# Parse the decoded census payload, clean it up, then validate it;
# dq_population raises ValueError if a negative population value is present.
pop = parse_population(census_pop_data)
tp = transform_population(pop)
dq_population(tp)

# Last expression of the cell: renders the transformed frame as the cell output.
tp


All DQ checks on population data passed!


name,lastupdate,state,population
str,date,str,i64
"""Alabama""",2021-12-21,"""01""",5039877
"""Alaska""",2021-12-21,"""02""",732673
"""Arizona""",2021-12-21,"""04""",7276316
"""Arkansas""",2021-12-21,"""05""",3025891
"""California""",2021-12-21,"""06""",39237836
"""Colorado""",2021-12-21,"""08""",5812069
"""Connecticut""",2021-12-21,"""09""",3605597
"""Delaware""",2021-12-21,"""10""",1003384
"""District of Columbia""",2021-12-21,"""11""",670050
"""Florida""",2021-12-21,"""12""",21781128
