In [1]:
%load_ext autoreload
%autoreload 2
import sys
import json
import re
# import pandas as pd
from pathlib import Path
from collections import defaultdict, deque
from datetime import datetime

In [3]:
from datetime import datetime, timedelta, date
from time import sleep
from random import randrange, choice
from pathlib import Path
from fast_flights import FlightData, Passengers, create_filter, get_flights_from_filter

# Exclusive upper bound, in weeks from today, for randomly drawn travel dates.
MAX_WEEKS = 20

# Airport / metro codes that random routes are drawn from.
airports = [
    'LAS', 'LAX', 'SAN', 'SNA', 'SEA', 'LGB', 'SFO', 'IAH', 'HOU', 'ORD',
    'HND', 'NRT', 'EZE', 'BOS', 'NYC', 'YYZ', 'AUS', 'DFW', 'IAD', 'ATL',
    'CDG', 'LHR', 'SIN', 'HKG', 'KIX', 'PEK',
]

# Draw 10 candidate (origin, destination) pairs, then drop the ones whose two
# ends coincide -- so fewer than 10 pairs may remain.
candidate_pairs = [(choice(airports), choice(airports)) for _ in range(10)]
search_pairs = [pair for pair in candidate_pairs if pair[0] != pair[1]]
print(f'generated {len(search_pairs)} search pairs')

def random_datestr() -> str:
    """Return a random future date formatted as ``YYYY-MM-DD``.

    The date lies a whole number of weeks after today; the number of weeks is
    drawn with ``randrange(1, MAX_WEEKS)``, i.e. from 1 up to ``MAX_WEEKS - 1``.
    """
    weeks_ahead = randrange(1, MAX_WEEKS)
    travel_day = date.today() + timedelta(weeks=weeks_ahead)
    return travel_day.strftime('%Y-%m-%d')

def random_daterange(days_between: int = 0) -> tuple[date, date]:
    """Return a random ``(start, end)`` pair of dates.

    ``start`` lies a whole number of weeks after today (drawn with
    ``randrange(1, MAX_WEEKS)``); ``end`` is ``days_between`` days after
    ``start``, so the default of 0 yields the same day twice.
    """
    offset = timedelta(weeks=randrange(1, MAX_WEEKS))
    first_day = date.today() + offset
    last_day = first_day + timedelta(days=days_between)
    return first_day, last_day

# Options shared by every search: one-way, economy, a single adult passenger.
common_filter_kwargs = dict(
    trip='one-way',
    seat='economy',
    passengers=Passengers(adults=1, children=0, infants_in_seat=0, infants_on_lap=0),
)

# One filter per airport pair, each with its own randomly drawn travel date.
filters = [
    create_filter(
        flight_data=[
            FlightData(date=random_datestr(), from_airport=origin, to_airport=destination)
        ],
        **common_filter_kwargs,
    )
    for origin, destination in search_pairs
]


generated 8 search pairs


In [17]:
# Responses returned by the flight requests are appended to this list.
results = list()

In [18]:
# Get flights with a filter
from fast_flights import FlightData, Passengers, create_filter, get_flights_from_filter

# Issue one request per filter and keep every response that is not None.
for filter_ in filters:
    print(f'requesting flight: {filter_}')
    result = get_flights_from_filter(filter_, data_source='js')
    if result is not None:
        results.append(result)
    # pause a random 4-9 seconds before the next request
    pause_seconds = randrange(4, 10)
    sleep(pause_seconds)

# Raw payload of every response collected so far.
raw_results = [response.raw for response in results]

# Combine with whatever corpus is already on disk (new payloads first) and
# overwrite the file with the merged list. The write happens unconditionally.
corpus_path = Path('corpus.json')
if corpus_path.exists():
    stored_payloads = json.loads(corpus_path.read_text())
    raw_results = raw_results + stored_payloads
corpus_path.write_text(json.dumps(raw_results))

requesting flight: TFSData(flight_data=[FlightData(date='2025-08-05', from_airport=IAD, to_airport=LHR, max_stops=None)], max_stops=None)
requesting flight: TFSData(flight_data=[FlightData(date='2025-08-12', from_airport=AUS, to_airport=SEA, max_stops=None)], max_stops=None)
requesting flight: TFSData(flight_data=[FlightData(date='2025-07-15', from_airport=EZE, to_airport=SEA, max_stops=None)], max_stops=None)
requesting flight: TFSData(flight_data=[FlightData(date='2025-05-27', from_airport=SFO, to_airport=LHR, max_stops=None)], max_stops=None)
requesting flight: TFSData(flight_data=[FlightData(date='2025-07-29', from_airport=NRT, to_airport=DFW, max_stops=None)], max_stops=None)
requesting flight: TFSData(flight_data=[FlightData(date='2025-06-24', from_airport=CDG, to_airport=YYZ, max_stops=None)], max_stops=None)
requesting flight: TFSData(flight_data=[FlightData(date='2025-08-12', from_airport=HND, to_airport=SIN, max_stops=None)], max_stops=None)
requesting flight: TFSData(flight_

20412774

We want to find real list types in this data schema (as opposed to a packed struct encoded as a list). 

In order to do so, we can scan multiple instances of the data, and track the size of each list we encounter (using the path/indices). When the size of a list is not constant, we know that the data type of that index is a list type rather than a struct packed as a list.

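The length of a struct packed as a list _should_ be constant, except when it has extensions: such a struct varies in length between samples even though it is not a real list type, so its path has to be excluded by hand (see `ignored_lists` below).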

In [37]:
from collections import defaultdict

# wrapper over a list to allow indexing with a path (list of indices)
class Corpus:
    data: list[list]

    def __init__(self, data: list[list]):
        assert isinstance(data, list), 'data is not a list type'
        self.data = data

    def __getitem__(self, i: int | list[int]):
        if isinstance(i, int):
            return self.data[i]

        it = self.data
        for next_index in i:
            assert isinstance(it[next_index], list), f'next index in path is not a list type (path: {i})'
            it = it[next_index]
        return it

    def __len__(self) -> int:
        return len(self.data)

# hashable list to allow adding lists to sets
class HashList(list):
    """A ``list`` that defines ``__hash__`` so instances can be placed in sets.

    The hash is the hash of the JSON text of the contents, so the contents must
    be JSON-serialisable. Equality is still ordinary list equality.
    """

    def __hash__(self):
        serialized = json.dumps(self)
        return hash(serialized)

# paths to lists
lists: set[list[int]] = set()
found_values: dict[tuple[int, ...], set] = defaultdict(lambda: set())
alias_types = [
    # all paths starting with the first element will be mapped to the second element

    # other flights -> best flights
    ((3,), (2,))
]

# some structs have variable length, but aren't actual list types... grrr
ignored_lists = [
    (),
    (1, 0, 1, 0, 2),
    (2, 0, 1j, 0),
    (2, 0, 1j, 0, 2, 1j, 15),
    (2, 0, 1j, 0, 2, 1j, 18),
    (2, 0, 1j, 0, 13),
    (2, 0, 1j, 0, 13, 0, 3),
    (2, 0, 1j, 0, 16),
    (2, 0, 1j, 0, 22, 9),
    (2, 0, 1j, 9),
    (5,),
    (6, 0, 1),
    (15,),
    (17, 1j, 2),
]

# resolve alias type
def get_path_key(path: tuple[int, ...]) -> tuple[int, ...]:
    store_path = path
    for from_, to in alias_types:
        if from_ == path[:len(from_)]:
            store_path = to + path[len(to):]
    return store_path

            
'''
corpus is a list of sample data, where each sample is at the same path

2 possibilities when BFSing:
1. the current path is an actual list type, this can be determined by checking if the length of the current path
is not constant between every sample. an index of 1j in the path is used when an actual list type is encountered.
aggregate all elements of the list from all samples and BFS using those new samples.

2. the current path is not a list type, we will iterate on each element in each sample in the current path. value types will be tracked
and can be used to add additional decoder keys. lists will be BFSed further

'''
def bfs(corpus: list[list], path: tuple[int | complex, ...]):
    """Recursively classify the data found at ``path`` across all samples.

    Results are written to module-level state: list-type paths are added to
    ``lists`` and observed values are added to ``found_values``.

    Args:
        corpus: samples that all live at ``path``; each sample must be a list.
        path: tuple of indices leading to the samples. ``1j`` marks a position
            that is "any element of a list" rather than a fixed field index.

    Behaviour:
        * If the samples differ in length and the (alias-resolved) path is not
          in ``ignored_lists``, the path is a list type. When its elements are
          lists, all elements of all samples are pooled and explored under
          ``path + (1j,)``; otherwise each whole sample is recorded as a value.
        * Otherwise the path is treated as a struct and each field index of the
          first sample is visited: list-valued fields are explored recursively,
          other fields have their values recorded.

    An empty ``corpus`` is a no-op.
    """
    # nothing to inspect (e.g. no results were fetched)
    if not corpus:
        return

    first = corpus[0]

    # make samples path indexable (also asserts that every sample is a list)
    samples = [Corpus(sample) for sample in corpus]
    path_key = get_path_key(path)

    lengths_differ = any(len(sample) != len(first) for sample in samples)
    if path_key not in ignored_lists and lengths_differ:
        # varying length => this path refers to a list type
        lists.add(path_key)

        # Decide whether the elements are structs by looking at the first
        # non-empty sample; one must exist because the lengths differ. (Using
        # the first sample directly would fail when it happens to be empty.)
        probe = next(sample for sample in samples if len(sample) > 0)
        if isinstance(probe[0], list):
            # pool every element of every sample into one corpus and explore it
            # under a 1j index, which stands for "any element of this list"
            pooled = []
            for sample in samples:
                assert all(isinstance(item, list) for item in sample.data)
                pooled.extend(sample.data)
            bfs(pooled, path_key + (1j,))
        else:
            # list of plain values: record each sample as a whole
            for sample in samples:
                found_values[path_key].add(HashList(sample.data))
        return

    # constant length (or explicitly ignored path): treat as a struct and look
    # at each field of the first sample
    for i in range(len(first)):
        next_path = path_key + (i,)
        if isinstance(first[i], list):
            # gather this field from every sample that has it and where it is truthy
            children = [sample[i] for sample in samples if i < len(sample) and sample[i]]
            if children:
                bfs(children, next_path)
        elif next_path not in ignored_lists:
            for j, sample in enumerate(samples):
                if i < len(sample):
                    value = sample[i]
                    # A list here (while the first sample had a non-list) is
                    # unhashable; wrap the field itself, not the whole struct.
                    found_values[next_path].add(HashList(value) if isinstance(value, list) else value)
                else:
                    print(f'Sample {j} is smaller than first sample in corpus ({len(sample)} vs {len(first)})')


# Walk the whole corpus from the root path, then report what was collected.
bfs(raw_results, ())

# every path classified as a real list type
for path in lists:
    print(f'Found list type at path {path}')

# every value path together with the distinct values seen there
for path, values in found_values.items():
    joined = ", ".join(str(value) for value in values)
    print(f'{path} contains: {joined}')



Sample 6 is smaller than first sample in corpus (31 vs 32)
Found list type at path (11,)
Found list type at path (2, 0, 1j, 0, 2, 1j, 12)
Found list type at path (17,)
Found list type at path (26,)
Found list type at path (7, 2, 0)
Found list type at path (2, 0, 1j, 0, 24)
Found list type at path (7, 1, 1)
Found list type at path (7, 4, 0)
Found list type at path (2, 0, 1j, 0, 2, 1j, 8)
Found list type at path (2, 0, 1j, 0, 2)
Found list type at path (2, 4)
Found list type at path (2, 0, 1j, 0, 5)
Found list type at path (2, 0, 1j, 0, 8)
Found list type at path (2, 0, 1j, 0, 23)
Found list type at path (2, 0, 1j, 0, 2, 1j)
Found list type at path (2, 0, 1j, 0, 2, 1j, 10)
Found list type at path (2, 0, 1j, 0, 1)
Found list type at path (2, 0)
Found list type at path (5, 10, 0)
(0, 0) contains: None
(0, 1, 0, 0) contains: 1744099303090466, 1744099324864453, 1744099353554150, 1744099332577927, 1744099342615464, 1744099359783817, 1744099314010955, 1744099347529112
(0, 1, 0, 1) contains: 11

In [215]:
# Serialise the findings. Paths become strings because JSON object keys must be
# strings; value entries are ordered by the first index of their path only.
data = {
    'lists': list(map(str, sorted(lists))),
    'found_values': {
        str(value_path): list(seen)
        for value_path, seen in sorted(found_values.items(), key=lambda entry: entry[0][0])
    },
}

Path('data.json').write_text(json.dumps(data, indent=2))

1210953