In [3]:
import csv
import datetime as dt
import json
import os
import re
import time
import uuid
from difflib import SequenceMatcher
from pathlib import Path
from urllib.parse import quote_plus

import numpy as np
import pandas as pd
import psycopg2
import pytz
import requests
import tqdm
from difflib import SequenceMatcher

In [4]:
# Every path is resolved against the working directory the notebook is started from.
DATA_DIR = Path.cwd() / 'data'
SQL_DIR = Path.cwd() / 'sql'

# Per-aircraft and per-route JSON responses, plus the filtered copies derived from them.
AIRCRAFTS_DIR = DATA_DIR / 'aircrafts'
AIRCRAFTS_FILTERED_DIR = DATA_DIR / 'aircrafts_filtered'
ROUTES_DIR = DATA_DIR / 'routes'
ROUTES_FILTERED_DIR = DATA_DIR / 'routes_filtered'

In [5]:
# PostgreSQL connection settings. They are read from the environment so that credentials
# are never committed with the notebook. Non-secret settings fall back to the local
# defaults this notebook was developed against.
# NOTE: a password used to be hard-coded here; it should be considered leaked and rotated.
DB_HOST = os.environ.get('DB_HOST', 'localhost')
DB_PORT = int(os.environ.get('DB_PORT', '5432'))
DB_USER = os.environ.get('DB_USER', 'postgres')
# Deliberately no real default: export DB_PASSWORD before running the ETL cells.
DB_PASSWORD = os.environ.get('DB_PASSWORD', '')
DB_NAME = os.environ.get('DB_NAME', 'data_analysis_lab1')

In [6]:
# Column layout of the raw FR24 CSV dumps. Order matters: the keys double as the
# column names handed to pd.read_csv, and the values are the dtypes to parse them with.
FR24_DTYPES = dict(
    flight_id=object,
    icao24=object,
    latitude=float,
    longitude=float,
    heading=int,
    height=int,
    airspeed=int,
    squawk=float,
    locator=object,
    aircraft=object,
    registration=object,
    unixtime=int,
    departure=object,
    arrival=object,
    ticket_route=object,
    status=int,
    vertical_speed=int,
    transponder_route=object,
    airline=object,
)

In [7]:
# Single-character transliteration table used to normalise aircraft model names to ASCII.
# Letters with diacritics map to their base Latin letter. U+200B (zero-width space) is
# dropped, and U+00A0 (no-break space) becomes a regular space.
SYMBOL_REPLACES = {
    '\u00c0': 'A',
    '\u00c1': 'A',
    '\u00c2': 'A',
    '\u00c3': 'A',
    '\u00c4': 'A',
    '\u00c5': 'A',
    '\u00c6': 'A',
    '\u00c7': 'C',
    '\u00c8': 'E',
    '\u00c9': 'E',
    '\u00ca': 'E',
    '\u00cb': 'E',
    '\u00cc': 'I',
    '\u00cd': 'I',
    '\u00ce': 'I',
    '\u00cf': 'I',
    '\u00d1': 'N',
    '\u00d2': 'O',
    '\u00d3': 'O',
    '\u00d4': 'O',
    '\u00d5': 'O',
    '\u00d6': 'O',
    '\u00d8': 'O',
    '\u00d9': 'U',
    '\u00da': 'U',
    '\u00db': 'U',
    '\u00dc': 'U',
    '\u00dd': 'Y',
    '\u00df': 'S',
    '\u00e0': 'a',
    '\u00e1': 'a',
    '\u00e2': 'a',
    '\u00e3': 'a',
    '\u00e4': 'a',
    '\u00e5': 'a',
    '\u00e6': 'a',
    '\u00e7': 'c',
    '\u00e8': 'e',
    '\u00e9': 'e',
    '\u00ea': 'e',
    '\u00eb': 'e',
    '\u00ec': 'i',
    '\u00ed': 'i',
    '\u00ee': 'i',
    '\u00ef': 'i',
    '\u00f0': 'd',
    '\u00f1': 'n',
    '\u00f2': 'o',
    '\u00f3': 'o',
    '\u00f4': 'o',
    '\u00f5': 'o',
    '\u00f6': 'o',  # previously mapped to 'ö' itself, so ö was never transliterated
    '\u00f8': 'o',
    '\u00f9': 'u',
    '\u00fa': 'u',
    '\u00fb': 'u',
    '\u00fc': 'u',
    '\u00fd': 'y',
    '\u00ff': 'y',
    '\u200b': '',
    '\xa0': ' '
}

In [8]:
def get_aircraft_designators(timeout: float = 60):
    """Download ICAO Doc 8643 aircraft type designators and save them to ``designators.csv``.

    The ICAO list is concatenated with the locally maintained ``special_designators.csv``.
    The nine column names assigned below are positional. The code presumably relies on the
    CSV using the same headers as the ICAO JSON, so that ``pd.concat`` aligns them
    (TODO confirm). Non-numeric characters in ``engine_count`` are collapsed to ``'1'``.

    Args:
        timeout: seconds to wait for the ICAO endpoint.

    Raises:
        requests.HTTPError: if the ICAO endpoint answers with an error status.
    """
    response = requests.post(
        url='https://www4.icao.int/doc8643/External/AircraftTypes',
        timeout=timeout,
    )
    response.raise_for_status()
    # response.json() detects the encoding itself; response.encoding may be None,
    # which made bytes.decode(None) raise TypeError.
    designators_data = pd.DataFrame.from_records(response.json())
    # dtype=str: numeric CSV columns would otherwise be ints, and Series.str.replace
    # silently turns non-string elements into NaN.
    special_designators = pd.read_csv(DATA_DIR.joinpath('special_designators.csv'), dtype=str)

    designators = pd.concat([designators_data, special_designators])

    designators.columns = ['name', 'description', 'turbulence_category', 'WTG', 'designator', 'manufacturer', 'type',
                           'engine_count', 'engine_type']
    designators.engine_count = designators.engine_count.str.replace(r'[^\d]+', '1', regex=True)

    designators.to_csv(DATA_DIR.joinpath('designators.csv'), index=False,
                       columns=['name', 'description', 'turbulence_category', 'designator', 'manufacturer', 'type',
                                'engine_count', 'engine_type'])

In [9]:
def get_unique_data():
    """Collect the distinct aircraft registrations and ticket routes seen in the raw FR24 dumps.

    Every file in ``DATA_DIR/fr24`` is parsed with ``FR24_DTYPES``. Files pandas cannot
    coerce to those dtypes (``ValueError``, which includes ``EmptyDataError``) are skipped.

    Returns:
        tuple[np.ndarray, np.ndarray]: sorted unique registrations and sorted unique routes.
        Both are empty arrays when no file could be read.
    """
    registrations = []
    routes = []
    for file in DATA_DIR.joinpath('fr24').iterdir():
        try:
            data = pd.read_csv(file, names=list(FR24_DTYPES), on_bad_lines='skip', dtype=FR24_DTYPES)
        except ValueError:
            # Best-effort: a dump whose values do not fit FR24_DTYPES is ignored entirely.
            continue
        registrations.append(data.registration.dropna().unique())
        routes.append(data.ticket_route.dropna().unique())

    def _merge(chunks):
        # Flatten per-file uniques into one sorted unique array; np.concatenate rejects [].
        if not chunks:
            return np.array([], dtype=object)
        return np.unique(np.concatenate(chunks))

    return _merge(registrations), _merge(routes)

In [10]:
def get_aircrafts_info(filename, delay: float = 0.6, timeout: float = 30):
    """Fetch the FlightRadar24 flight list (limit 1) for every registration in ``filename``.

    Each raw response body is saved as ``AIRCRAFTS_DIR/<registration>.json``, which is the
    directory ``filter_aircrafts`` reads from. Registrations are processed in random order.
    Network and file errors are reported on the progress bar and the crawl continues.

    Args:
        filename: headerless CSV with one registration per line.
        delay: seconds to pause after every request, successful or not (rate limiting).
        timeout: per-request timeout in seconds.
    """
    # dtype=str / keep_default_na=False keep registrations like "0123" or "NA" literal.
    registrations = pd.read_csv(filename, names=['registration'], dtype=str, keep_default_na=False)
    registrations = registrations.sample(frac=1)

    AIRCRAFTS_DIR.mkdir(parents=True, exist_ok=True)

    progress_bar = tqdm.tqdm(registrations.registration)
    for registration in progress_bar:
        progress_bar.set_description(f'Fetching {registration}')
        try:
            response = requests.get(
                url='https://api.flightradar24.com/common/v1/flight/list.json',
                # requests URL-encodes the values, so unusual registrations cannot break the query.
                params={
                    'enc': 'IKQGxn3NR31_n-55iS2uKcuzjmvSFrtJX6mpRJYT7oI',
                    'query': registration,
                    'fetchBy': 'reg',
                    'limit': '1',
                    'timestamp': '0',
                    'page': '1',
                    'filterBy': '',
                    'token': 'IKQGxn3NR31_n-55iS2uKcuzjmvSFrtJX6mpRJYT7oI',
                    'client': 'ios_freemium',
                    'version': '9.2.1',
                },
                headers={
                    'User-Agent': 'FlightradarFree/2023021501 CFNetwork/1404.0.5 Darwin/22.3.0'
                },
                timeout=timeout,
            )
            # The body is stored even for error pages; filter_aircrafts() weeds those out.
            AIRCRAFTS_DIR.joinpath(f'{registration}.json').write_text(response.text, encoding='utf-8')
        except (requests.RequestException, OSError) as error:
            progress_bar.write(f'Failed to fetch {registration}: {error}')
        finally:
            time.sleep(delay)

In [11]:
def get_routes_info(filename: 'str | Path', delay: float = 0.6, timeout: float = 30):
    """Page through the FlightRadar24 flight list of every route (flight number) in ``filename``.

    Each accepted page is stored as ``ROUTES_DIR/<route>_<page>.json``. Every raw response is
    also kept under ``DATA_DIR/debug``. Paging for a route stops on:
    * a paywall/error page or an unparseable body;
    * an ``errors`` key, or a missing ``result.response`` or ``page`` block;
    * an empty page or a page identical to the previous one (that file is deleted);
    * the API reporting no more pages.

    Args:
        filename: headerless CSV with one route per line.
        delay: seconds to pause before every request (rate limiting).
        timeout: per-request timeout in seconds.
    """
    routes = pd.read_csv(filename, names=['route'], dtype=str, keep_default_na=False)
    routes = routes.sample(frac=1)

    debug_dir = DATA_DIR.joinpath('debug')
    debug_dir.mkdir(parents=True, exist_ok=True)
    ROUTES_DIR.mkdir(parents=True, exist_ok=True)

    progress_bar = tqdm.tqdm(routes.route)

    for route in progress_bar:
        progress_bar.set_description(f'Fetching {route}')
        progress_bar.refresh()
        page = 1
        last = None
        while True:
            time.sleep(delay)

            response = requests.get(
                url='https://api.flightradar24.com/common/v1/flight/list.json',
                params={
                    'enc': '6pDfb2KZPxots_3kFVasmNL1WJ7rQXvJ5yJb4NhegjA',
                    'query': route,
                    'fetchBy': 'flight',
                    'limit': '100',
                    'page': str(page),
                    'token': '6pDfb2KZPxots_3kFVasmNL1WJ7rQXvJ5yJb4NhegjA',
                },
                headers={
                    'User-Agent': 'FlightradarFree/2023021501 CFNetwork/1404.0.5 Darwin/22.3.0'
                },
                timeout=timeout,
            )
            content = response.text

            # Keep every raw response for troubleshooting (portable path, not 'debug\\…').
            debug_dir.joinpath(f'{uuid.uuid4()}.json').write_text(content, encoding='utf-8')

            if '402 Payment Required' in content or 'Error reference number' in content:
                break

            try:
                json_data = json.loads(content)
            except json.JSONDecodeError:
                break
            if not isinstance(json_data, dict) or 'errors' in json_data:
                break
            result_block = json_data.get('result')
            if not isinstance(result_block, dict) or 'response' not in result_block:
                break

            result = result_block['response']

            respath = ROUTES_DIR.joinpath(f'{route}_{page}.json')

            with open(respath, 'w') as file:
                json.dump(json_data, file)

            if not isinstance(result, dict) or 'page' not in result:
                break

            data = result.get('data')

            # An empty page, or the API repeating the previous page, ends paging for this route.
            if not data or (last is not None and data[0] == last):
                respath.unlink()
                break

            if not result['page']['more']:
                break

            page += 1
            last = data[0]

In [12]:
def filter_aircrafts():
    """Clean up raw aircraft responses and extract their ``aircraftInfo`` payloads.

    For every entry in ``AIRCRAFTS_DIR``:
    * non-``.json`` files and paywall / Cloudflare error pages are deleted;
    * unparseable JSON, non-object JSON and responses with ``errors`` are left in place and skipped;
    * otherwise a non-empty ``result.response.aircraftInfo`` is written (alone) to
      ``AIRCRAFTS_FILTERED_DIR`` under the same file name.
    """
    AIRCRAFTS_FILTERED_DIR.mkdir(parents=True, exist_ok=True)
    # Materialise the listing first: files are deleted while we iterate.
    progress_bar = tqdm.tqdm(list(AIRCRAFTS_DIR.iterdir()))
    for filename in progress_bar:
        progress_bar.set_description(f'Checking {filename.name}')
        if filename.suffix != '.json':
            filename.unlink()
            continue

        content = filename.read_text()
        if '402 Payment Required' in content or 'Cloudflare Location' in content:
            filename.unlink()
            continue

        try:
            json_data = json.loads(content)
        except json.JSONDecodeError:
            continue
        if not isinstance(json_data, dict) or 'errors' in json_data:
            continue

        response = (json_data.get('result') or {}).get('response') or {}
        aircraft_info = response.get('aircraftInfo')
        if aircraft_info:
            with open(AIRCRAFTS_FILTERED_DIR.joinpath(filename.name), 'w') as file:
                json.dump(aircraft_info, file)

In [13]:
def filter_routes():
    """Keep only the flight list of each saved route page.

    For each ``.json`` file in ``ROUTES_DIR`` without an ``errors`` key, the
    ``result.response.data`` value is re-written, if present and non-empty, to
    ``ROUTES_FILTERED_DIR`` under the same file name. Other entries are ignored.
    """
    entries = tqdm.tqdm(list(ROUTES_DIR.iterdir()))
    for path in entries:
        entries.set_description(f'Checking {path.name}')
        if path.suffix != '.json':
            continue

        with open(path) as source:
            payload = json.loads(source.read())
        if 'errors' in payload:
            continue

        response = payload['result']['response']
        if 'data' not in response or not response['data']:
            continue

        with open(ROUTES_FILTERED_DIR.joinpath(path.name), 'w') as target:
            json.dump(response['data'], target)

In [47]:
def _dig(mapping, *keys):
    """Follow ``keys`` through nested dicts; None as soon as a level is missing, falsy or not a dict."""
    current = mapping
    for key in keys:
        if not current or not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def _field(row, *keys):
    """Nested lookup for a CSV cell: missing or null values become an empty string."""
    value = _dig(row, *keys)
    return '' if value is None else value


def _has_flight_id(row) -> bool:
    """True when the record carries a non-null ``identification.id``."""
    return _dig(row, 'identification', 'id') is not None


def _flight_record(row, route_number, airline_icao):
    """Build one flights.csv row (column order matches ``routes_to_csv``'s ``columns``)."""
    return [
        int(row['identification']['id'], 16),  # FR24 flight ids are hex strings
        route_number,
        _field(row, 'aircraft', 'registration'),
        airline_icao,
        _field(row, 'airport', 'origin', 'code', 'icao'),
        _field(row, 'airport', 'destination', 'code', 'icao'),
        _field(row, 'time', 'scheduled', 'departure'),
        _field(row, 'time', 'scheduled', 'arrival'),
        _field(row, 'time', 'real', 'departure'),
        _field(row, 'time', 'real', 'arrival'),
    ]


def routes_to_csv():
    """Flatten the filtered route pages into ``DATA_DIR/flights.csv``.

    Each file in ``ROUTES_FILTERED_DIR`` holds a list of flight records and is named
    ``<route_number>_<page>.json``. Every record with an ``identification.id`` that is not
    live becomes one row. Within a file, the first airline ICAO code found is applied to all
    rows. Rows are de-duplicated on ``flight_id`` (first occurrence wins), and the file is
    rewritten with a header row. When no rows are produced, a header-only file is written.
    """
    columns = [
        'flight_id',
        'route_number',
        'aircraft_registration',
        'airline_icao',
        'airport_origin',
        'airport_destination',
        'scheduled_departure',
        'scheduled_arrival',
        'real_departure',
        'real_arrival',
    ]
    csv_path = DATA_DIR.joinpath('flights.csv')

    progress_bar = tqdm.tqdm(list(ROUTES_FILTERED_DIR.iterdir()))

    # First pass: headerless rows, written once per input file.
    with open(csv_path, 'w', newline='') as csv_file:
        csv_writer = csv.writer(csv_file)
        for filename in progress_bar:
            progress_bar.set_description(f'Adding {filename.name}')
            progress_bar.refresh()
            with open(filename) as json_file:
                json_data = json.load(json_file)

            # The route number is everything before the last '_' (the page suffix).
            route_number, separator, _ = filename.stem.rpartition('_')
            if not separator:
                route_number = filename.stem

            airline_icao = ''
            flights = []
            for row in filter(_has_flight_id, json_data):
                if _dig(row, 'status', 'live'):
                    continue  # in-flight records are incomplete
                if not airline_icao:
                    airline_icao = _dig(row, 'airline', 'code', 'icao') or ''
                flights.append(_flight_record(row, route_number, airline_icao))

            # Rows seen before the airline code was found get it retroactively.
            if airline_icao:
                for flight in flights:
                    flight[3] = airline_icao
            csv_writer.writerows(flights)

    if csv_path.stat().st_size == 0:
        # read_csv cannot parse an empty file; emit just the header instead.
        pd.DataFrame(columns=columns).to_csv(csv_path, index=False)
        return

    # Second pass: round-trip through pandas to drop duplicate flights and add the header.
    data = pd.read_csv(csv_path, names=columns)
    data = data.drop_duplicates(subset='flight_id')
    data.to_csv(csv_path, index=False)

In [15]:
def get_missing_aircrafts():
    """Fetch aircraft info for registrations seen in routes but absent from AIRCRAFTS_FILTERED_DIR.

    Reads the headerless ``DATA_DIR/routes_filtered.csv``. The sorted list of missing
    registrations is written to ``DATA_DIR/missed_aircrafts.csv`` and then passed to
    ``get_aircrafts_info``; nothing is fetched when the list is empty.
    """
    dtypes = {
        'flight_id': str,
        'route_number': str,
        'aircraft_registration': str,
        'airline_icao': str,
        'airport_origin': str,
        'airport_destination': str,
        'scheduled_departure': str,
        'scheduled_arrival': str,
        'real_departure': float,
        'real_arrival': float
    }
    # NOTE(review): routes_to_csv() writes DATA_DIR/flights.csv *with* a header row, while this
    # reads a headerless routes_filtered.csv — confirm which file is intended.
    data = pd.read_csv(
        DATA_DIR.joinpath('routes_filtered.csv'),
        names=dtypes.keys(),
        dtype=dtypes,
        index_col='flight_id'
    )

    known_registrations = {path.stem for path in AIRCRAFTS_FILTERED_DIR.iterdir()}
    # Flights without a registration come back as NaN; never query FR24 for "nan".
    route_registrations = set(data.aircraft_registration.dropna().unique())
    missing = sorted(map(str, route_registrations - known_registrations))

    missing_path = DATA_DIR.joinpath('missed_aircrafts.csv')
    missing_path.write_text('\n'.join(missing))

    if not missing:
        # get_aircrafts_info's read_csv would fail on the empty file.
        return

    get_aircrafts_info(missing_path)

In [16]:
def aircrafts_to_csv():
    """Flatten every ``aircraftInfo`` JSON in AIRCRAFTS_FILTERED_DIR into ``aircrafts_filtered.csv``.

    One row per file:
    * registration (the file stem);
    * model code and model text (text transliterated with SYMBOL_REPLACES);
    * country ``alpha2`` code;
    * production date (``age.date`` parsed as ``"%b %Y"``, i.e. the first day of that month);
    * owner ICAO code.

    Missing or unparseable values become empty CSV fields. Files that are not valid JSON
    objects are skipped.
    """
    csv_path = DATA_DIR.joinpath('aircrafts_filtered.csv')
    # SYMBOL_REPLACES maps single characters, so it compiles into one translate table.
    transliteration = str.maketrans(SYMBOL_REPLACES)

    progress_bar = tqdm.tqdm(list(AIRCRAFTS_FILTERED_DIR.iterdir()))

    # Open the output once instead of re-opening it in append mode for every aircraft.
    with open(csv_path, 'w', newline='') as csv_file:
        csv_writer = csv.writer(csv_file)
        csv_writer.writerow([
            'registration',
            'model_code',
            'model_text',
            'country_code',
            'production_date',
            'owner_icao'
        ])

        for filename in progress_bar:
            progress_bar.set_description(f'Adding {filename.name}')
            progress_bar.refresh()
            try:
                with open(filename) as json_file:
                    json_data = json.load(json_file)
            except ValueError:
                # Truncated or undecodable JSON (JSONDecodeError/UnicodeDecodeError subclass ValueError).
                continue
            if not isinstance(json_data, dict):
                continue

            model = json_data.get('model') or {}
            model_code = model.get('code')
            model_text = model.get('text')
            if model_text:
                model_text = model_text.translate(transliteration)

            country_code = (json_data.get('country') or {}).get('alpha2')

            production_date = None
            age_date = (json_data.get('age') or {}).get('date')
            if age_date:
                try:
                    production_date = dt.datetime.strptime(age_date, '%b %Y').date()
                except ValueError:
                    production_date = None  # unexpected format: leave the field empty

            # owner.code may be null, which previously raised TypeError on "'icao' in code".
            owner_code = (json_data.get('owner') or {}).get('code') or {}
            owner_icao = owner_code.get('icao')

            csv_writer.writerow([
                filename.stem,
                model_code,
                model_text,
                country_code,
                production_date,
                owner_icao
            ])

In [17]:
def get_airports():
    """Download the OurAirports airport list and save a reduced copy to ``DATA_DIR/airports.csv``.

    The source header row is replaced by our own column names. ``keep_default_na=False`` keeps
    literal strings such as ``"NA"`` as text instead of turning them into NaN.
    """
    source_columns = [
        'airport_id', 'code', 'type', 'name', 'latitude', 'longitude', 'elevation',
        'continent', 'country', 'region', 'municipality', 'scheduled_service',
        'gps_code', 'iata_code', 'local_code', 'home_link', 'wikipedia_link', 'keywords',
    ]
    kept_columns = ['code', 'type', 'name', 'latitude', 'longitude', 'elevation', 'region', 'gps_code']

    source_url = 'https://davidmegginson.github.io/ourairports-data/airports.csv'
    airports = pd.read_csv(source_url, header=0, names=source_columns, keep_default_na=False)

    target = DATA_DIR.joinpath('airports.csv')
    airports.to_csv(target, columns=kept_columns, index=False)

In [18]:
def get_countries():
    """Download the OurAirports country list and save code, name and continent to ``countries.csv``.

    ``keep_default_na=False`` keeps literal values like the continent code ``"NA"`` as text.
    """
    source_columns = ['country_id', 'code', 'name', 'continent', 'wikipedia_link', 'keywords']
    kept_columns = ['code', 'name', 'continent']

    source_url = 'https://davidmegginson.github.io/ourairports-data/countries.csv'
    countries = pd.read_csv(source_url, header=0, names=source_columns, keep_default_na=False)

    target = DATA_DIR.joinpath('countries.csv')
    countries.to_csv(target, columns=kept_columns, index=False)

In [19]:
def get_regions():
    """Download the OurAirports region list and save code, name and country to ``regions.csv``.

    The source header row is replaced by the names below (only their position matters for
    reading). ``keep_default_na=False`` keeps literal strings such as ``"NA"`` as text.
    """
    regions = pd.read_csv('https://davidmegginson.github.io/ourairports-data/regions.csv', header=0, names=[
        'region_id',
        'code',
        'local_code',  # was misspelled 'local_coe'; the column is not exported either way
        'name',
        'continent',
        'country',
        'wikipedia_link',
        'keywords'
    ], keep_default_na=False)
    regions.to_csv(DATA_DIR.joinpath('regions.csv'), columns=[
        'code',
        'name',
        'country'
    ], index=False)

In [20]:
def get_airlines():
    """Download the OpenFlights airline list and save it (minus the IATA code) to ``airlines.csv``.

    The ``active`` flag is upper-cased and mapped ``'Y'`` -> True, ``'N'`` -> False; any other
    value becomes NaN.

    NOTE(review): with ``keep_default_na=False`` OpenFlights' ``\\N`` null markers are kept as
    literal text and reach the stage table as ``\\N`` (CSV-mode COPY only treats empty fields
    as NULL). Confirm the ETL SQL handles that.
    """
    source_columns = ['airline_id', 'name', 'alias', 'iata_code', 'code', 'callsign', 'country', 'active']
    kept_columns = ['airline_id', 'name', 'alias', 'code', 'callsign', 'country', 'active']
    active_flags = {'Y': True, 'N': False}

    source_url = 'https://raw.githubusercontent.com/jpatokal/openflights/master/data/airlines.dat'
    airlines = pd.read_csv(source_url, header=None, names=source_columns, keep_default_na=False)
    airlines['active'] = airlines['active'].str.upper().map(active_flags)

    target = DATA_DIR.joinpath('airlines.csv')
    airlines.to_csv(target, columns=kept_columns, index=False)

In [21]:
def generate_dates(
        start_date=dt.datetime(year=1900, month=1, day=1, tzinfo=pytz.UTC),
        last_date=None,
        output_path=None,
):
    """Write a calendar dimension with one row per day to a CSV file.

    Args:
        start_date: first day (inclusive); must carry ``pytz.UTC``.
        last_date: end bound (exclusive); must carry ``pytz.UTC``. Defaults to the current UTC
            time, evaluated on every call.
        output_path: destination file; defaults to ``DATA_DIR/dates.csv``.

    Raises:
        ValueError: if either bound is not in ``pytz.UTC``.

    Columns:
        the_date: the calendar date.
        weekday: 0 = Monday … 6 = Sunday (``datetime.weekday``).
        quarter: 0-based (0–3).
        weekend: True for Saturday and Sunday.
        week_of_year: the ISO week number.
    """
    if last_date is None:
        # A datetime.now() default in the signature is frozen when the cell is run, and
        # would go stale in a long-lived kernel.
        last_date = dt.datetime.now(tz=pytz.UTC)
    if start_date.tzinfo != pytz.UTC or last_date.tzinfo != pytz.UTC:
        raise ValueError('Time zone must be UTC')
    if output_path is None:
        output_path = DATA_DIR.joinpath('dates.csv')

    rows = [
        ['the_date', 'weekday', 'month', 'year', 'quarter', 'day_of_year', 'weekend', 'week_of_year']
    ]

    one_day = dt.timedelta(days=1)
    current_date = start_date
    while current_date < last_date:
        weekday = current_date.weekday()
        rows.append([
            current_date.date(),
            weekday,
            current_date.month,
            current_date.year,
            # NOTE(review): 0-based quarter (0..3); confirm the warehouse doesn't expect 1..4.
            (current_date.month - 1) // 3,
            current_date.timetuple().tm_yday,
            weekday >= 5,
            current_date.isocalendar()[1],
        ])
        current_date += one_day

    with open(output_path, 'w', newline='') as csv_file:
        csv.writer(csv_file).writerows(rows)

In [22]:
def generate_data():
    """Produce the CSV files under DATA_DIR that load_to_stage() imports.

    The steps run in a fixed order. continents.csv, which load_to_stage() also reads, is not
    produced by any of them.
    """
    steps = (
        aircrafts_to_csv,
        get_airlines,
        get_airports,
        get_countries,
        generate_dates,
        get_aircraft_designators,
        routes_to_csv,
        get_regions,
    )
    for step in steps:
        step()

In [29]:
def load_to_stage():
    """Truncate the ``stage`` tables and bulk-load them from the CSV files in DATA_DIR.

    All truncates and loads run in one transaction. A failing COPY therefore rolls back, and
    the previous stage contents stay intact. The files are streamed from the client with
    ``COPY ... FROM STDIN``. The server no longer needs file-read privileges, or access to the
    files on its own filesystem, and paths are not spliced into SQL.
    """
    # (table, column list, source file in DATA_DIR) in load order.
    stage_tables = [
        ('aircrafts', 'registration, model_code, model_text, country_code, production_date, owner_icao',
         'aircrafts_filtered.csv'),
        ('airlines', 'airline_id, name, alias, code, callsign, country, active', 'airlines.csv'),
        ('airports', 'code, type, name, latitude, longitude, elevation, region, gps_code', 'airports.csv'),
        ('continents', 'code, name', 'continents.csv'),
        ('countries', 'code, name, continent', 'countries.csv'),
        ('designators', 'name, description, turbulence_category, designator, manufacturer, type, '
                        'engine_count, engine_type', 'designators.csv'),
        ('flights', 'flight_id, route_number, aircraft_registration, airline_icao, airport_origin, '
                    'airport_destination, scheduled_departure, scheduled_arrival, real_departure, '
                    'real_arrival', 'flights.csv'),
        ('regions', 'code, name, country', 'regions.csv'),
        ('dates', 'the_date, weekday, month, year, quarter, day_of_year, weekend, week_of_year', 'dates.csv'),
    ]

    conn = psycopg2.connect(host=DB_HOST, port=DB_PORT, database=DB_NAME, user=DB_USER, password=DB_PASSWORD)
    try:
        with conn.cursor() as cursor:
            for table, _columns, _filename in stage_tables:
                cursor.execute(f'TRUNCATE stage.{table};')
            for table, columns, filename in stage_tables:
                # Binary mode: the server decodes the bytes with the client encoding, just as
                # it did when it read the files itself.
                with open(DATA_DIR.joinpath(filename), 'rb') as csv_file:
                    cursor.copy_expert(
                        f"COPY stage.{table} ({columns}) FROM STDIN DELIMITER ',' CSV HEADER;",
                        csv_file,
                    )
        conn.commit()
    finally:
        # Closing without commit discards the open transaction.
        conn.close()

In [24]:
def link_models(conn: str):
    """Match each staged aircraft's free-text model to the closest ICAO designator name.

    Args:
        conn: database URI (or SQLAlchemy connectable) accepted by ``pd.read_sql_table``.

    Returns:
        pd.DataFrame: ``stage.aircrafts`` indexed by ``aircraft_id``, with an extra column
        ``linked_model_text``. It holds the ``designators_dim.name`` whose ``designator``
        equals the aircraft's ``model_code`` and whose text has the highest difflib ratio to
        ``model_text`` (first candidate wins ties). It is None when there is no model text or
        no candidate.
    """
    aircrafts = pd.read_sql_table('aircrafts', conn, schema='stage', index_col='aircraft_id')
    designators = pd.read_sql_table('designators_dim', conn, index_col='model_id')

    # Group candidate names once, instead of a string-built DataFrame.query per aircraft
    # (which broke on codes containing quotes and searched for the text "None"/"nan").
    names_by_designator = {
        designator: names.tolist()
        for designator, names in designators.groupby('designator')['name']
    }

    results = []
    progress_bar = tqdm.tqdm(list(aircrafts.iterrows()))
    for _, aircraft in progress_bar:
        model_code = aircraft['model_code']
        model_text = aircraft['model_text']
        progress_bar.set_description(f'{model_code} | {model_text}')
        progress_bar.refresh()

        candidates = [name for name in names_by_designator.get(model_code, []) if isinstance(name, str)]
        result = None
        # NaN is truthy, so test the type explicitly before handing text to SequenceMatcher.
        if isinstance(model_text, str) and model_text and candidates:
            result = max(candidates, key=lambda name: SequenceMatcher(None, model_text, name).ratio())
        results.append(result)

    aircrafts['linked_model_text'] = results
    return aircrafts

In [26]:
def stage_to_fact():
    """Run the stage -> fact ETL: ``etl1.sql``, Python model linking, then ``etl2.sql``.

    The output of ``link_models`` is written to the ``temp_aircrafts`` table between the two
    SQL scripts, replacing that table if it already exists. The psycopg2 connection is always
    closed, even when a step fails.
    """
    conn = psycopg2.connect(host=DB_HOST, port=DB_PORT, database=DB_NAME, user=DB_USER, password=DB_PASSWORD)
    try:
        conn.autocommit = True
        with conn.cursor() as cursor:
            cursor.execute(SQL_DIR.joinpath('etl1.sql').read_text())

            # A separate name: rebinding `conn` to this string made the final conn.close() fail
            # and leaked the psycopg2 connection. Credentials are percent-encoded, because
            # characters such as '%', '@' or ':' would otherwise corrupt the URL.
            db_url = (f'postgresql+psycopg2://{quote_plus(DB_USER)}:{quote_plus(DB_PASSWORD)}'
                      f'@{DB_HOST}:{DB_PORT}/{DB_NAME}')
            linked_aircrafts = link_models(db_url)
            # 'replace' keeps the ETL re-runnable once temp_aircrafts exists.
            linked_aircrafts.to_sql('temp_aircrafts', db_url, if_exists='replace')

            cursor.execute(SQL_DIR.joinpath('etl2.sql').read_text())
    finally:
        conn.close()

In [50]:
# Reload every stage table from the CSV files in DATA_DIR.
# NOTE(review): this cell was executed (In [50]) after the routes_to_csv() cell further down
# (In [49]); on Restart & Run All it would load a flights.csv that has not been rebuilt yet.
load_to_stage()

In [None]:
# Build the fact tables from stage (etl1.sql, model linking, etl2.sql).
stage_to_fact()

B77L | Boeing 777-FS2:  70%|██████▉   | 19094/27410 [01:45<00:46, 177.98it/s]                            

In [49]:
# Rebuild DATA_DIR/flights.csv from ROUTES_FILTERED_DIR.
# NOTE(review): it must run before load_to_stage() for the fresh file to be loaded; move this
# cell above the load cell so a top-to-bottom run reproduces the executed order.
routes_to_csv()

Adding ZW3624_1.json: 100%|██████████| 3190/3190 [00:18<00:00, 169.08it/s] 


In [1]:
def etl():
    """Load the generated CSV files into ``stage``, then build the fact tables from it."""
    for step in (load_to_stage, stage_to_fact):
        step()