# Desarrollo Apache Beam

Pasos a seguir para desarrollar los scripts. Es redundante, pero así se aprende.

In [None]:
# Upgrade pip in the running kernel's environment (%pip targets the kernel).
%pip install --upgrade pip

Instalamos ipykernel, si es que no está instalado.

In [None]:
# Install ipykernel (the IPython kernel package for Jupyter) if it is missing.
%pip install ipykernel

Instalamos las dependencias de apache beam, en este caso el runner interactivo

In [None]:
%pip install apache-beam[interactive]

Importamos los módulos necesarios para trabajar.

In [1]:
import logging
import json
import csv
import datetime
import timezonefinder
import pytz
from pytz.exceptions import UnknownTimeZoneError
import apache_beam as beam
from apache_beam.io.gcp.internal.clients import bigquery

In [None]:
# Show the working directory: several cells below open data files by relative path.
!pwd

## 1- Airports

Paso 1 para procesar los aeropuertos extraídos

In [None]:
# pyright: reportPrivateImportUsage=false
# pyright: reportAttributeAccessIssue=false
# NOTE(review): absolute, machine-specific path. Other cells open the same file
# relative to the working directory; a shared DATA_DIR constant would keep
# them consistent across machines.
folder = "/home/inspired/data-science-on-gcp/04_streaming/transform/files"
airports_file = f"{folder}/airports_2024.csv.gz"
with beam.Pipeline("DirectRunner") as pipeline:
    # Parse each CSV line and key it by column 0, keeping columns 21 and 26
    # (AIRPORT_SEQ_ID, LATITUDE, LONGITUDE per the header printed in the Test
    # section). No filter is applied, so the header row is kept as well.
    airports = (
        pipeline
        | beam.io.ReadFromText(airports_file)
        | beam.Map(lambda line: next(csv.reader([line])))
        | beam.Map(lambda fields: (fields[0], (fields[21], fields[26])))
    )

    # Write "id,lat,lon" lines to text files prefixed df01_extracted_airports.
    transformed_airports = (
        airports
        | beam.Map(
            lambda airport_data: "{},{}".format(
                airport_data[0], ",".join(airport_data[1])
            )
        )
        | beam.io.WriteToText("df01_extracted_airports")
    )

[Opcional]\
Antes de ejecutar el paso 2 comprobamos la cantidad de aeropuertos
en Estados Unidos.\
Aprox. 7288 cumplen con\
AIRPORT_COUNTRY_NAME=='United States'\
o, por número de columna (contando desde 1),\
columna 8=='United States'

In [None]:
# pyright: reportPrivateImportUsage=false
# pyright: reportAttributeAccessIssue=false
with beam.Pipeline('DirectRunner') as pipeline:
    # Keep lines mentioning "United States" anywhere in the raw text (a
    # substring test, not an exact AIRPORT_COUNTRY_NAME comparison), then key
    # by column 0 with columns 21/26 (AIRPORT_SEQ_ID, LATITUDE, LONGITUDE).
    us_airports = (pipeline
                | beam.io.ReadFromText('airports_2024.csv.gz')
                | beam.Filter(lambda line: "United States" in line)
                | beam.Map(lambda line: next(csv.reader([line])))
                | beam.Map(lambda fields: (fields[0], (fields[21], fields[26])))
                )

    # Write "id,lat,lon" lines to text files prefixed df01_extracted_us_airports.
    transformed_airports = (us_airports
                            | beam.Map(lambda airport_data: '{},{}'.format(
                                airport_data[0], ','.join(airport_data[1])))
                            | beam.io.WriteToText('df01_extracted_us_airports')
                            )

[Opcional]\
Antes de ejecutar el paso 2 comprobamos la cantidad de aeropuertos
en Estados Unidos y los aeropuertos actuales\
Aprox. 2926 cumplen con AIRPORT_COUNTRY_NAME=='United States' y
AIRPORT_IS_LATEST=='1'

In [None]:
# pyright: reportPrivateImportUsage=false
# pyright: reportAttributeAccessIssue=false
# Column positions in the airports CSV (see the header printed in the Test section).
_COUNTRY_COL = 7   # AIRPORT_COUNTRY_NAME
_LATEST_COL = 31   # AIRPORT_IS_LATEST


def _is_latest_us_airport(fields):
    """Return True when a parsed airport row is a current US airport record.

    The check runs on the parsed CSV columns rather than the raw line, so
    quoting or trailing characters cannot break the AIRPORT_IS_LATEST test
    (the old ``line[-1:] == "1"`` relied on the flag being the last,
    unquoted character). Short or malformed rows are rejected.
    """
    return (
        len(fields) > _LATEST_COL
        and fields[_COUNTRY_COL] == "United States"
        and fields[_LATEST_COL] == "1"
    )


with beam.Pipeline("DirectRunner") as pipeline:
    # Parse first, then filter on AIRPORT_COUNTRY_NAME == 'United States' and
    # AIRPORT_IS_LATEST == '1'; key by AIRPORT_SEQ_ID with (LATITUDE, LONGITUDE).
    last_us_airports = (
        pipeline
        | beam.io.ReadFromText("../files/airports_2024.csv.gz")
        | beam.Map(lambda line: next(csv.reader([line])))
        | beam.Filter(_is_latest_us_airport)
        | beam.Map(lambda fields: (fields[0], (fields[21], fields[26])))
    )

    # Write "id,lat,lon" lines to text files prefixed df01_extracted_last_us_airports.
    transformed_airports = (
        last_us_airports
        | beam.Map(
            lambda airport_data: "{},{}".format(
                airport_data[0], ",".join(airport_data[1])
            )
        )
        | beam.io.WriteToText("df01_extracted_last_us_airports")
    )

Eliminamos los archivos procesados.

In [None]:
# Remove the df01_extracted_* output files written by the pipelines above.
!rm df01_extracted_*

Obs.: podríamos considerar en nuestro pipeline las fechas de funcionamiento
del aeropuerto y las fechas del vuelo; eso sería más correcto.

In [None]:
# Source: airports table in BigQuery.
airports_table = bigquery.TableReference(
    projectId="bigquery-manu-407202",
    datasetId="dsongcp",
    tableId="airports",
)


def _airport_tz_csv(row):
    """Format one BigQuery airport row as an 'AIRPORT_SEQ_ID,TZ' CSV line.

    The timezone lookup is done here, because ``addtimezone`` is only defined
    further down the notebook and this cell must also work on a fresh kernel.
    Falls back to 'UTC' when timezonefinder finds no timezone for the point.
    """
    finder = timezonefinder.TimezoneFinder()
    tz = finder.timezone_at(lng=float(row["LONGITUDE"]), lat=float(row["LATITUDE"]))
    return f"{row['AIRPORT_SEQ_ID']},{tz if tz is not None else 'UTC'}"


with beam.Pipeline("DirectRunner") as pipeline:
    airports = (
        pipeline
        | "airports:read"
        >> beam.io.ReadFromBigQuery(
            method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
            table=airports_table,
        )
        # `or ""` guards NULL country names, which would make `in` raise.
        | "airports:onlyUSA"
        >> beam.Filter(
            lambda field: "United States" in (field["AIRPORT_COUNTRY_NAME"] or "")
        )
        # Emit plain CSV lines that match the header below (writing the
        # (id, (lat, lon, tz)) tuple would produce Python reprs instead).
        | "airports:tz" >> beam.Map(_airport_tz_csv)
        | "airports:write"
        >> beam.io.WriteToText(
            file_path_prefix="df01_bq_airports_tz",
            file_name_suffix=".csv",
            header="AIRPORT_SEQ_ID,TZ",
        )
    )

## 2- Airports
Definimos lo necesario para continuar (los imports ya se cargaron en la primera celda de código).

Paso 2 para procesar los aeropuertos\
Obs.: duración local de aprox. 30 min con aprox. 7288 filas.

In [None]:
def addtimezone(lat, lon):
    """Attach the timezone name for a latitude/longitude pair.

    Args:
        lat: Latitude, as a string or number.
        lon: Longitude, as a string or number.

    Returns:
        ``(lat, lon, tz)`` with ``lat`` and ``lon`` passed through unchanged.
        ``tz`` is the timezone found by timezonefinder, ``'UTC'`` when the
        point has none, or ``'TIMEZONE'`` when a ValueError occurs (e.g. the
        non-numeric CSV header row).
    """
    try:
        # Parse the coordinates first, so rows that cannot be looked up (like
        # the header) do not pay for building a TimezoneFinder.
        lat_f = float(lat)
        lon_f = float(lon)
        # NOTE: a new TimezoneFinder is built on every call. It is NOT reused,
        # and this is likely the main cost of this step.
        finder = timezonefinder.TimezoneFinder()
        tz = finder.timezone_at(lng=lon_f, lat=lat_f)
    except ValueError:
        return lat, lon, 'TIMEZONE'  # header
    return lat, lon, tz if tz is not None else 'UTC'

In [None]:
# pyright: reportPrivateImportUsage=false
# pyright: reportAttributeAccessIssue=false
with beam.Pipeline('DirectRunner') as pipeline:
    # Key each line mentioning "United States" by AIRPORT_SEQ_ID (column 0)
    # and attach (lat, lon, tz) computed from LATITUDE (21) and LONGITUDE (26).
    airports = (pipeline
                | beam.io.ReadFromText('airports_2024.csv.gz')
                | beam.Filter(lambda line: "United States" in line)
                | beam.Map(lambda line: next(csv.reader([line])))
                | beam.Map(
                    lambda fields: (
                        fields[0], addtimezone(fields[21], fields[26])
                    )
                )
                )

    # map(str, ...) keeps the join working when addtimezone returns floats or
    # a None timezone (as the later definitions in this notebook can).
    airports_with_tz = (airports
                        | beam.Map(lambda f: f"{f[0]},{','.join(map(str, f[1]))}")
                        | beam.io.textio.WriteToText(
                            'df02_airports_with_tz'
                        )
                        )

Eliminamos los archivos procesados.

In [None]:
# Remove the df02_airports_* output files written by the step-2 pipeline.
!rm df02_airports_*

## 3- Airports
Los imports ya se cargaron en la primera celda de código.

Definimos las funciones a usar en el pipeline

In [None]:
def addtimezone(lat, lon):
    """
    Look up the timezone for a coordinate pair.

    Returns ``(lat, lon, tz)``. On success ``lat`` and ``lon`` are floats and
    ``tz`` is whatever ``TimezoneFinder.timezone_at`` returns (possibly None).
    If a ValueError occurs (e.g. the non-numeric CSV header), the values
    converted so far are returned with 'TIMEZONE' as the header marker.
    """
    try:
        finder = timezonefinder.TimezoneFinder()
        lat = float(lat)
        lon = float(lon)
        zone = finder.timezone_at(lng=lon, lat=lat)
    except (ValueError, UnknownTimeZoneError):
        # Header row or value error: emit the header marker.
        return lat, lon, 'TIMEZONE'
    return lat, lon, zone


def as_utc(date, hhmm, tzone):
    """Convert a local date and HHMM time to a UTC timestamp string.

    Args:
        date: Local date as 'YYYY-MM-DD'.
        hhmm: Local time as 'HHMM'; '2400' rolls over to the next day.
            Empty (or None) means there is no time, e.g. a cancelled flight.
        tzone: Timezone name understood by pytz, or None if unknown.

    Returns:
        The UTC time as 'YYYY-MM-DD HH:MM:SS', or '' when there is no time
        or no timezone.

    Raises:
        ValueError: if ``date`` or ``hhmm`` cannot be parsed. The error is
            logged and then re-raised.
    """
    if not hhmm or tzone is None:
        # An empty string corresponds to cancelled flights.
        return ''
    try:
        # Build the naive local wall-clock time first and localize it once, so
        # the UTC offset is the one in force at that moment. Localizing
        # midnight and then adding hours would keep midnight's offset, which
        # is an hour off for times after a DST change on the same day.
        local_naive = datetime.datetime.strptime(date, '%Y-%m-%d') + datetime.timedelta(
            hours=int(hhmm[:2]),
            minutes=int(hhmm[2:])
        )
        loc_dt = pytz.timezone(tzone).localize(local_naive, is_dst=False)
        return loc_dt.astimezone(pytz.utc).strftime('%Y-%m-%d %H:%M:%S')
    except ValueError as e:
        # Log with context, then re-raise (previously the error was swallowed
        # and the function silently returned None).
        logging.exception("%s %s %s ValueError: %s", date, hhmm, tzone, e)
        raise


def tz_correct(line, airport_timezones):
    """Convert the local HHMM times of one JSON flight record to UTC.

    Args:
        line: One flight as a JSON object string.
        airport_timezones: Mapping AIRPORT_SEQ_ID -> (lat, lon, tz).

    Yields:
        The flight re-serialized as JSON. Departure-side times use the origin
        airport's timezone and arrival-side times the destination's. When a
        required key is missing, nothing is yielded and the error is logged.
    """
    flight = json.loads(line)
    try:
        origin_id = flight["ORIGIN_AIRPORT_SEQ_ID"]
        dest_id = flight["DEST_AIRPORT_SEQ_ID"]
        origin_tz = airport_timezones[origin_id][2]
        dest_tz = airport_timezones[dest_id][2]
        conversions = (
            (("CRS_DEP_TIME", "DEP_TIME", "WHEELS_OFF"), origin_tz),
            (("WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"), dest_tz),
        )
        for names, zone in conversions:
            for name in names:
                flight[name] = as_utc(flight["FL_DATE"], flight[name], zone)
        yield json.dumps(flight)
    except KeyError:
        # Unknown airport (or missing field): log and drop the record.
        logging.exception(
            " Ignorando %s porque el aeropuerto no es conocido",
            line
        )

In [None]:
# pyright: reportPrivateImportUsage=false
# pyright: reportAttributeAccessIssue=false
with beam.Pipeline('DirectRunner') as pipeline:
    # Read the airports CSV and keep lines that mention "United States".
    airports = (pipeline
                | 'airports:read' >> beam.io.ReadFromText(
                    'airports_2024.csv.gz')
                | beam.Filter(lambda line: "United States" in line)
                # Parse each line into its CSV fields.
                | 'airports:fields' >> beam.Map(
                    lambda line: next(
                        csv.reader([line])
                    )
                )
                # Key by AIRPORT_SEQ_ID and attach (lat, lon, tz).
                | 'airports:tz' >> beam.Map(
                    lambda fields: (
                        fields[0], addtimezone(fields[21], fields[26])
                    )
                )
                )

    # Read the flights JSON lines and convert their times to UTC, using the
    # airports as a dict side input.
    flights = (pipeline
               | 'flights:read' >> beam.io.ReadFromText(
                   'flights_sample_2024.json'
               )
               | 'flights:tzcorr' >> beam.FlatMap(
                   tz_correct, beam.pvalue.AsDict(airports)
               )
               )

    # Write the corrected flights to files prefixed df03_all_flights.
    all_flights = (flights
                   | beam.io.textio.WriteToText('df03_all_flights')
                   )

# The pipeline only executes when the `with` block exits. Logging inside the
# block reported "success" before any element had been processed.
logging.info("Éxito en airports:tz")
logging.info("Éxito en flights:tzcorr")
logging.info("Éxito en escribir df03_all_flights")

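## 4- Flights: coordenadas y offset horario

Además de convertir las horas a UTC, agregamos las coordenadas y el offset
UTC de cada aeropuerto, y sumamos 24 h a las horas de llegada que quedan
antes de la hora de salida.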

In [None]:
# pyright: reportOptionalMemberAccess=false


def addtimezone(lat, lon):
    """
    Resolve the timezone of a coordinate pair with timezonefinder.

    Returns ``(lat, lon, tz)`` with ``lat``/``lon`` converted to float and
    ``tz`` as returned by ``TimezoneFinder.timezone_at`` (may be None). When a
    ValueError occurs (e.g. the non-numeric CSV header), the coordinates are
    returned as far as they were converted, with 'TIMEZONE' as the tz.
    """
    try:
        finder = timezonefinder.TimezoneFinder()
        lat = float(lat)
        lon = float(lon)
        result = (lat, lon, finder.timezone_at(lng=lon, lat=lat))
    except (ValueError, UnknownTimeZoneError):
        result = (lat, lon, 'TIMEZONE')  # header marker
    return result


def as_utc(date, hhmm, tzone):
    """Convert a local date and HHMM time to UTC and report the UTC offset.

    Args:
        date: Local date as 'YYYY-MM-DD'.
        hhmm: Local time as 'HHMM'; both '0000' and '2400' are accepted
            ('2400' rolls over to the next day). Empty or None means no time.
        tzone: Timezone name understood by pytz, or None if unknown.

    Returns:
        ``(utc_string, offset_seconds)``, where ``utc_string`` is
        'YYYY-MM-DD HH:MM:SS' in UTC and ``offset_seconds`` is the local UTC
        offset in seconds. Returns ``('', 0)`` when there is no time
        (cancelled flight) or no timezone.

    Raises:
        ValueError: if ``date`` or ``hhmm`` cannot be parsed. The original
            error is logged and re-raised with its message intact.
    """
    if not hhmm or tzone is None:
        # Cancelled flights (no time) or unknown timezone: empty time, 0 offset.
        logging.debug("Devolviendo ('', 0) porque hhmm está vacío or tzone es None")
        return '', 0
    try:
        # Localize the full local wall-clock time once, so the offset is the
        # one in force at that moment. Localizing midnight and adding hours
        # afterwards would keep midnight's offset on DST-change days.
        local_naive = datetime.datetime.strptime(date, '%Y-%m-%d') + datetime.timedelta(
            hours=int(hhmm[:2]), minutes=int(hhmm[2:]))
        loc_dt = pytz.timezone(tzone).localize(local_naive, is_dst=False)
        utc_dt = loc_dt.astimezone(pytz.utc)
        return (
            utc_dt.strftime('%Y-%m-%d %H:%M:%S'),
            loc_dt.utcoffset().total_seconds()
        )
    except ValueError as e:
        logging.exception("%s %s %s ValueError: %s", date, hhmm, tzone, e)
        # Bare raise keeps the original message and traceback.
        raise

def add_24h_if_before(arrtime, deptime):
    """Move an arrival-side timestamp one day later if it precedes departure.

    Both arguments are 'YYYY-MM-DD HH:MM:SS' strings, so plain string
    comparison orders them chronologically. If either string is empty, or
    the arrival is not earlier than the departure, ``arrtime`` is returned
    unchanged.
    """
    if len(arrtime) == 0 or len(deptime) == 0 or not arrtime < deptime:
        return arrtime
    fmt = '%Y-%m-%d %H:%M:%S'
    shifted = datetime.datetime.strptime(arrtime, fmt) + datetime.timedelta(days=1)
    return shifted.strftime(fmt)


def tz_correct(line, airport_timezones):
    """Convert one JSON flight record to UTC and enrich it with airport data.

    Args:
        line: A flight as a JSON object string.
        airport_timezones: Mapping AIRPORT_SEQ_ID -> (lat, lon, tz).

    Yields:
        The updated record as a JSON string, with these changes:
        departure times converted with the origin timezone; arrival times
        converted with the destination timezone; arrival-side times moved
        one day later when earlier than DEP_TIME; and DEP_/ARR_AIRPORT_LAT,
        _LON and _TZOFFSET added. On KeyError nothing is yielded and the
        error is logged.
    """
    flight = json.loads(line)
    try:
        origin = flight["ORIGIN_AIRPORT_SEQ_ID"]
        dest = flight["DEST_AIRPORT_SEQ_ID"]
        origin_info = airport_timezones[origin]
        origin_tz = origin_info[2]
        dest_info = airport_timezones[dest]
        dest_tz = dest_info[2]

        # Convert every time to UTC; the offset kept is that of the last field.
        for name in ("CRS_DEP_TIME", "DEP_TIME", "WHEELS_OFF"):
            flight[name], dep_offset = as_utc(flight["FL_DATE"], flight[name], origin_tz)
        for name in ("WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"):
            flight[name], arr_offset = as_utc(flight["FL_DATE"], flight[name], dest_tz)

        departure = flight["DEP_TIME"]
        for name in ("WHEELS_OFF", "WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"):
            flight[name] = add_24h_if_before(flight[name], departure)

        flight.update({
            "DEP_AIRPORT_LAT": origin_info[0],
            "DEP_AIRPORT_LON": origin_info[1],
            "DEP_AIRPORT_TZOFFSET": dep_offset,
            "ARR_AIRPORT_LAT": dest_info[0],
            "ARR_AIRPORT_LON": dest_info[1],
            "ARR_AIRPORT_TZOFFSET": arr_offset,
        })
        yield json.dumps(flight)
    except KeyError as e:
        # Missing key (typically an unknown airport): log and drop the record.
        logging.exception(
            " Ignorando %s aeropuerto no conocido, KeyError Error: %s",
            line,
            e
        )

In [None]:
# pyright: reportPrivateImportUsage=false
# pyright: reportAttributeAccessIssue=false
with beam.Pipeline('DirectRunner') as pipeline:
    # Airports side input: lines mentioning "United States", keyed by
    # AIRPORT_SEQ_ID (column 0) with (lat, lon, tz) from columns 21/26.
    airports = (pipeline
                | 'airports:read' >> beam.io.ReadFromText(
                    'airports_2024.csv.gz'
                )
                | beam.Filter(lambda line: "United States" in line)
                | 'airports:fields' >> beam.Map(
                    lambda line: next(
                        csv.reader([line])
                    )
                )
                | 'airports:tz' >> beam.Map(
                    lambda fields: (
                        fields[0],
                        addtimezone(fields[21], fields[26])
                    )
                )
                )
    # Flights: JSON lines corrected to UTC and enriched with airport
    # coordinates and offsets; records with unknown airports are dropped.
    flights = (pipeline
                | 'flights:read' >> beam.io.ReadFromText(
                    'flights_sample_2024.json'
                )
                | 'flights:tzcorr' >> beam.FlatMap(
                    tz_correct,
                    beam.pvalue.AsDict(
                        airports)  # type: ignore
                )
                )

    # Write the corrected flights to files prefixed df04_all_flights.
    all_flights = (flights
                    | beam.io.textio.WriteToText('df04_all_flights')
                    )

## 5- Events

Generamos eventos `departed` y `arrived` a partir de los vuelos corregidos.

In [None]:
# pyright: reportGeneralTypeIssues =false
def tz_correct(fields, airport_timezones):
    """
    Convert a parsed flight's local times to UTC and add airport data.

    Args:
        fields: One flight as a dict (parsed JSON). It is NOT modified,
            because Beam transforms must not mutate their input elements.
        airport_timezones: Mapping AIRPORT_SEQ_ID -> (lat, lon, tz).

    Yields:
        A new dict, built as follows:
        departure times (CRS_DEP_TIME, DEP_TIME, WHEELS_OFF) converted with
        the origin timezone; arrival times (WHEELS_ON, CRS_ARR_TIME,
        ARR_TIME) converted with the destination timezone; arrival-side
        times moved a day later when earlier than DEP_TIME; and
        DEP_/ARR_AIRPORT_LAT, _LON and _TZOFFSET added. On KeyError nothing
        is yielded and the error is logged.
    """
    # Work on a shallow copy: the input dict may also feed other consumers
    # of the same PCollection.
    flight = dict(fields)
    try:
        dep_airport_id = flight["ORIGIN_AIRPORT_SEQ_ID"]
        arr_airport_id = flight["DEST_AIRPORT_SEQ_ID"]
        dep_timezone = airport_timezones[dep_airport_id][2]
        arr_timezone = airport_timezones[arr_airport_id][2]

        # Convert to UTC; the offset kept is the one from the last field.
        for f in ["CRS_DEP_TIME", "DEP_TIME", "WHEELS_OFF"]:
            flight[f], deptz = as_utc(flight["FL_DATE"], flight[f], dep_timezone)
        for f in ["WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"]:
            flight[f], arrtz = as_utc(flight["FL_DATE"], flight[f], arr_timezone)

        # Arrival-side times earlier than DEP_TIME are moved one day later
        # (presumably flights that cross midnight).
        for f in ["WHEELS_OFF", "WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"]:
            flight[f] = add_24h_if_before(flight[f], flight["DEP_TIME"])

        flight["DEP_AIRPORT_LAT"] = airport_timezones[dep_airport_id][0]
        flight["DEP_AIRPORT_LON"] = airport_timezones[dep_airport_id][1]
        flight["DEP_AIRPORT_TZOFFSET"] = deptz
        flight["ARR_AIRPORT_LAT"] = airport_timezones[arr_airport_id][0]
        flight["ARR_AIRPORT_LON"] = airport_timezones[arr_airport_id][1]
        flight["ARR_AIRPORT_TZOFFSET"] = arrtz
        yield flight
    except KeyError as e:
        # Missing key (typically an unknown airport): log the untouched input.
        logging.exception(
            " Ignorando %s aeropuerto no conocido, KeyError Error: %s", fields, e
        )


def get_next_event(fields):
    """Yield the events that can be derived from one flight record.

    * 'departed' when DEP_TIME is non-empty: a copy of the record without the
      columns that are not yet known at departure time.
    * 'arrived' when ARR_TIME is non-empty: a full copy of the record.

    Each event carries EVENT_TYPE and EVENT_TIME. ``fields`` is not modified.
    """
    hidden_at_departure = {
        "TAXI_OUT",
        "WHEELS_OFF",
        "WHEELS_ON",
        "TAXI_IN",
        "ARR_TIME",
        "ARR_DELAY",
        "DISTANCE",
    }
    dep_time = fields["DEP_TIME"]
    if len(dep_time) > 0:
        departed = {
            key: value
            for key, value in fields.items()
            if key not in hidden_at_departure
        }
        departed["EVENT_TYPE"] = "departed"
        departed["EVENT_TIME"] = dep_time
        yield departed
    arr_time = fields["ARR_TIME"]
    if len(arr_time) > 0:
        arrived = dict(fields)
        arrived["EVENT_TYPE"] = "arrived"
        arrived["EVENT_TIME"] = arr_time
        yield arrived

In [None]:
# pyright: reportPrivateImportUsage=false
# pyright: reportAttributeAccessIssue=false
with beam.Pipeline("DirectRunner") as pipeline:
    # Airports side input keyed by AIRPORT_SEQ_ID with (lat, lon, tz).
    airports = (
        pipeline
        | "airports:read" >> beam.io.ReadFromText("airports_2024.csv.gz")
        | beam.Filter(lambda line: "United States" in line)
        | "airports:fields" >> beam.Map(lambda line: next(csv.reader([line])))
        | "airports:tz"
        >> beam.Map(lambda fields: (fields[0], addtimezone(fields[21], fields[26])))
    )

    # Flights are parsed to dicts once, then corrected; tz_correct here
    # takes dicts rather than JSON strings.
    flights = (
        pipeline
        | "flights:read" >> beam.io.ReadFromText("flights_sample_2024.json")
        | "flights:parse" >> beam.Map(lambda line: json.loads(line))
        | "flights:tzcorr" >> beam.FlatMap(tz_correct, beam.pvalue.AsDict(airports))
    )

    # Branch 1: corrected flights as JSON lines (df_05_all_flights*).
    write_flights = (
        flights
        | "flights:tostring" >> beam.Map(lambda fields: json.dumps(fields))
        | "flights:out" >> beam.io.textio.WriteToText("df_05_all_flights")
    )

    # Branch 2: departed/arrived events derived from the same flights.
    events = flights | beam.FlatMap(get_next_event)

    # Events as JSON lines (df05_all_events*).
    print_events = (
        events
        | "events:tostring" >> beam.Map(lambda fields: json.dumps(fields))
        | "events:out" >> beam.io.textio.WriteToText("df05_all_events")
    )

## 6- BigQuery

Leer y escribir desde BigQuery necesita un bucket de Cloud Storage (opción
`--temp_location`), ya que el SDK de Python de Apache Beam lanza una exportación
cuando aplicamos una transformación de lectura BigQueryIO (método `EXPORT`, el
predeterminado).

In [None]:
# pyright: reportGeneralTypeIssues =false

def tz_correct(fields, airport_timezones):
    """
    Convert a flight's local times to UTC and add airport data.

    Accepts records coming from BigQuery or from parsed JSON. FL_DATE may be
    a date/datetime object or a 'YYYY-MM-DD' string; either way the output
    holds the string form.

    Args:
        fields: One flight as a dict. It is NOT modified, because Beam
            transforms must not mutate their input elements.
        airport_timezones: Mapping AIRPORT_SEQ_ID -> (lat, lon, tz).

    Yields:
        A new dict with departure/arrival times converted to UTC, arrival-side
        times moved a day later when earlier than DEP_TIME, and
        DEP_/ARR_AIRPORT_LAT, _LON and _TZOFFSET added. On KeyError nothing is
        yielded and the error is logged.
    """
    flight = dict(fields)
    try:
        # Presumably BigQuery returns DATE columns as date objects while JSON
        # input already has strings; normalize to 'YYYY-MM-DD'.
        fl_date = flight["FL_DATE"]
        if hasattr(fl_date, "strftime"):
            flight["FL_DATE"] = fl_date.strftime('%Y-%m-%d')

        dep_airport_id = flight["ORIGIN_AIRPORT_SEQ_ID"]
        arr_airport_id = flight["DEST_AIRPORT_SEQ_ID"]
        dep_timezone = airport_timezones[dep_airport_id][2]
        arr_timezone = airport_timezones[arr_airport_id][2]
        for f in ["CRS_DEP_TIME", "DEP_TIME", "WHEELS_OFF"]:
            flight[f], deptz = as_utc(
                flight["FL_DATE"], flight[f], dep_timezone)
        for f in ["WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"]:
            flight[f], arrtz = as_utc(
                flight["FL_DATE"], flight[f], arr_timezone)
        for f in ["WHEELS_OFF", "WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"]:
            flight[f] = add_24h_if_before(flight[f], flight["DEP_TIME"])

        flight["DEP_AIRPORT_LAT"] = airport_timezones[dep_airport_id][0]
        flight["DEP_AIRPORT_LON"] = airport_timezones[dep_airport_id][1]
        flight["DEP_AIRPORT_TZOFFSET"] = deptz
        flight["ARR_AIRPORT_LAT"] = airport_timezones[arr_airport_id][0]
        flight["ARR_AIRPORT_LON"] = airport_timezones[arr_airport_id][1]
        flight["ARR_AIRPORT_TZOFFSET"] = arrtz
        yield flight
    except KeyError as e:
        # Log the actual exception (the missing key), not the KeyError class.
        logging.exception(
            "Ignoring %s because airport is not known,error: %s",
            fields,
            e
        )


def get_next_event(fields):
    """
    Yield the flight events derivable from one record, in chronological order.

    * 'departed' (DEP_TIME non-empty): drops fields unknown at departure.
    * 'wheelsoff' (WHEELS_OFF non-empty): drops fields unknown at wheels-off.
    * 'arrived' (ARR_TIME non-empty): keeps every field.

    Each event is a copy of ``fields`` plus EVENT_TYPE and EVENT_TIME.
    """
    stages = (
        ("DEP_TIME", "departed",
         ("TAXI_OUT", "WHEELS_OFF", "WHEELS_ON", "TAXI_IN",
          "ARR_TIME", "ARR_DELAY", "DISTANCE")),
        ("WHEELS_OFF", "wheelsoff",
         ("WHEELS_ON", "TAXI_IN", "ARR_TIME", "ARR_DELAY", "DISTANCE")),
        ("ARR_TIME", "arrived", ()),
    )
    for time_key, event_type, not_yet_known in stages:
        if len(fields[time_key]) > 0:
            event = dict(fields)
            event["EVENT_TYPE"] = event_type
            event["EVENT_TIME"] = fields[time_key]
            for key in not_yet_known:
                event.pop(key, None)
            yield event


# Build a BigQuery row
def create_event_row(fields):
    """Return a copy of ``fields`` with an extra 'EVENT_DATA' column.

    'EVENT_DATA' holds the JSON serialization of the original record, so a
    tabular sink can store the full event next to its individual columns.
    """
    row = dict(fields)
    row.update(EVENT_DATA=json.dumps(fields))
    return row

In [None]:
# pyright: reportPrivateImportUsage=false
# pyright: reportAttributeAccessIssue=false
# pyright: reportUnusedExpression =false

def run(project, temp_location=None):
    """
    Run the BigQuery -> flights/events pipeline locally on the DirectRunner.

    Args:
        project: GCP project holding the ``dsongcp.airports`` and
            ``dsongcp.flights_sample`` tables; passed as ``--project``.
        temp_location: Optional GCS path (``gs://bucket/path``) passed as
            ``--temp_location``. ReadFromBigQuery's export-based read stages
            data in a bucket, so this is usually required.

    Outputs:
        ``df06_all_flights*``: time-corrected flights, one JSON object per line.
        ``df06_all_events*``: events derived from those flights, JSON lines.
    """
    argv = [
        f'--project={project}',
        '--runner=DirectRunner'
    ]
    if temp_location:
        argv.append(f'--temp_location={temp_location}')
    airports_query = f'SELECT * FROM {project}.dsongcp.airports'
    sample_query = f'SELECT * FROM {project}.dsongcp.flights_sample'
    flights_output = 'df06_all_flights'

    with beam.Pipeline(argv=argv) as pipeline:
        # ReadFromBigQuery yields one dict per row, so filter and key on column
        # names. (`in` on a dict only checks its keys, and csv.reader does not
        # apply, so treating rows as CSV text dropped every airport.)
        airports = (pipeline
                    | 'airports:read' >> beam.io.ReadFromBigQuery(
                        query=airports_query,
                        use_standard_sql=True
                    )
                    | 'airports:onlyUSA' >> beam.Filter(
                        lambda row: "United States" in (
                            row["AIRPORT_COUNTRY_NAME"] or "")
                    )
                    | 'airports:tz' >> beam.Map(
                        lambda row: (row["AIRPORT_SEQ_ID"],
                                     addtimezone(row["LATITUDE"],
                                                 row["LONGITUDE"]))
                    )
                    )

        # NOTE(review): side-input keys are AIRPORT_SEQ_ID values as typed by
        # BigQuery; they must match the type of the flights'
        # ORIGIN/DEST_AIRPORT_SEQ_ID columns for the lookups to succeed.
        flights = (pipeline
                   | 'flights:read' >> beam.io.ReadFromBigQuery(
                       query=sample_query,
                       use_standard_sql=True
                   )
                   | 'flights:tzcorr' >> beam.FlatMap(
                       tz_correct, beam.pvalue.AsDict(airports)
                   )
                   )

        # default=str: some BigQuery column types (presumably DATE, NUMERIC,
        # ...) are not JSON-serializable; stringify them instead of failing.
        (flights
         | 'flights:tostring' >> beam.Map(
             lambda fields: json.dumps(fields, default=str))
         | 'flights:out' >> beam.io.textio.WriteToText(flights_output)
         )

        events = flights | beam.FlatMap(get_next_event)

        (events
         | 'events:tostring' >> beam.Map(
             lambda fields: json.dumps(fields, default=str))
         | 'events:out' >> beam.io.textio.WriteToText('df06_all_events')
         )

In [None]:
# GCP project that holds the dsongcp dataset; change it to run elsewhere.
pr = 'bigquery-manu-407202'
run(pr)

# Test

In [3]:
import apache_beam as beam

# Experiment: filter a PCollection using another PCollection as a dict side input.
with beam.Pipeline() as pipeline:
    # (duration, keep?) pairs; AsDict below turns them into {duration: keep}.
    keep_duration = pipeline | "Duration filters" >> beam.Create(
        [
            ("annual", False),
            ("biennial", False),
            ("perennial", True),
        ]
    )

    # Keep only the plants whose duration maps to True, then print them.
    perennials = (
        pipeline
        | "Gardening plants"
        >> beam.Create(
            [
                {
                    "icon": "🍓",
                    "name": "Strawberry",
                    "duration": "perennial",
                },
                {"icon": "🥕", "name": "Carrot", "duration": "biennial"},
                {"icon": "🍆", "name": "Eggplant", "duration": "perennial"},
                {"icon": "🍅", "name": "Tomato", "duration": "annual"},
                {"icon": "🥔", "name": "Potato", "duration": "perennial"},
            ]
        )
        | "Filter plants by duration"
        >> beam.Filter(
            lambda plant, keep_duration: keep_duration[plant["duration"]],
            keep_duration=beam.pvalue.AsDict(keep_duration),
        )
        | beam.Map(print)
    )
    # (end of the snippet adapted from Beam's "filter_side_inputs_dict" example)

{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
{'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
{'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}


In [11]:
import csv
import apache_beam as beam

# Sanity check: print each row of a small airports CSV as a list of fields.
# NOTE(review): `icons` holds the output of Map(print) and is unused.
with beam.Pipeline() as pipeline:
    icons = (
        pipeline
        | "read" >> beam.io.ReadFromText("./airports_test.csv")
        # Parse each CSV line into a list of fields
        | "parse" >> beam.Map(lambda line: next(csv.reader([line])))
        | "print" >> beam.Map(print)
    )

['AIRPORT_SEQ_ID', 'AIRPORT_ID', 'AIRPORT', 'DISPLAY_AIRPORT_NAME', 'DISPLAY_AIRPORT_CITY_NAME_FULL', 'AIRPORT_WAC_SEQ_ID2', 'AIRPORT_WAC', 'AIRPORT_COUNTRY_NAME', 'AIRPORT_COUNTRY_CODE_ISO', 'AIRPORT_STATE_NAME', 'AIRPORT_STATE_CODE', 'AIRPORT_STATE_FIPS', 'CITY_MARKET_SEQ_ID', 'CITY_MARKET_ID', 'DISPLAY_CITY_MARKET_NAME_FULL', 'CITY_MARKET_WAC_SEQ_ID2', 'CITY_MARKET_WAC', 'LAT_DEGREES', 'LAT_HEMISPHERE', 'LAT_MINUTES', 'LAT_SECONDS', 'LATITUDE', 'LON_DEGREES', 'LON_HEMISPHERE', 'LON_MINUTES', 'LON_SECONDS', 'LONGITUDE', 'UTC_LOCAL_TIME_VARIATION', 'AIRPORT_START_DATE', 'AIRPORT_THRU_DATE', 'AIRPORT_IS_CLOSED', 'AIRPORT_IS_LATEST']
['1000101', '10001', '01A', 'Afognak Lake Airport', 'Afognak Lake, AK', '101', '1', 'United States', 'US', 'Alaska', 'AK', '02', '3000101', '30001', 'Afognak Lake, AK', '101', '1', '58', 'N', '6', '34', '58.10944444', '152', 'W', '54', '24', '-152.90666667', '', '2007-07-01', '', '0', '1']
['1000301', '10003', '03A', 'Bear Creek Mining Strip', 'Granite Moun

## Raw input BigQuery