# **Lectura y carga del archivo CatLineasAereas.csv**

In [None]:
# Instalar las dependencias necesarias
!pip install apache-beam[gcp]
!pip install google-cloud-storage google-cloud-bigquery

# Importar librerias necesarias
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io.jdbc import WriteToJdbc

In [None]:
def parse_csv_cat_lineas_aereas(line):
    """Turn one CSV text line into an airline-catalogue record.

    Args:
        line: A single line of CSV text.

    Returns:
        A dict with the keys ``'Code'`` (first field) and ``'Linea_Aerea'``
        (second field), both kept as strings, or ``None`` when the line
        yields no CSV row at all (for example an empty string).

    Raises:
        IndexError: If the parsed row has fewer than two fields.
    """
    # Imports stay local to the function, as in the original; presumably so
    # the function is self-contained when Beam ships it to workers.
    import csv
    from io import StringIO

    fields = next(csv.reader(StringIO(line)), None)
    if fields is None:
        return None

    code = fields[0]
    airline_name = fields[1]
    return {'Code': code, 'Linea_Aerea': airline_name}

import typing


class CatLineaAereaRow(typing.NamedTuple):
    """Row written to the airline catalogue table.

    Field order must match the ``?`` placeholders of the INSERT statement.
    """
    Code: str
    Linea_Aerea: str


# WriteToJdbc is a cross-language transform: it needs a schema-aware
# PCollection, i.e. NamedTuple elements registered with RowCoder. The plain
# dicts returned by the parser carry no schema, so they are converted below.
beam.coders.registry.register_coder(CatLineaAereaRow, beam.coders.RowCoder)

pipeline_options = beam.options.pipeline_options.PipelineOptions()

# NOTE(review): if CatLineasAereas.csv starts with a header row, pass
# skip_header_lines=1 to ReadFromText - confirm against the file.
# NOTE(review): table_name says 'cat_lineas_aereas' but the statement inserts
# into 'catlineasaereas'; both are kept as found - confirm the real table
# name and align them.
# NOTE(review): host and credentials are hard-coded placeholders (root with an
# empty password) - supply real values from configuration before running.
with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'Read from GCS' >> ReadFromText('gs://aerline-storage/data/central/CatLineasAereas.csv')
     | 'Parse CSV' >> beam.Map(parse_csv_cat_lineas_aereas)
     # The parser returns None for lines that contain no CSV row.
     | 'Drop blank lines' >> beam.Filter(lambda record: record is not None)
     | 'To Row' >> beam.Map(
         lambda record: CatLineaAereaRow(**record)
     ).with_output_types(CatLineaAereaRow)
     | 'Write to MySQL' >> WriteToJdbc(
         driver_class_name='com.mysql.cj.jdbc.Driver',
         jdbc_url='jdbc:mysql://host:3306/db_aerolineas',
         username='root',
         password='',
         table_name='cat_lineas_aereas',
         statement='INSERT INTO catlineasaereas (Code, Linea_Aerea) VALUES (?, ?)'
     )
    )


# **Lectura y carga del archivo Pasajeros.csv para la sucursal1**

In [None]:
def parse_csv_pasajeros(line):
    """Turn one CSV text line into a passenger record.

    Args:
        line: A single line of CSV text.

    Returns:
        A dict with ``'ID_Pasajero'`` (first field as int), ``'Pasajero'``
        (second field, string) and ``'Edad'`` (third field as int), or
        ``None`` when the line yields no CSV row (for example an empty
        string).

    Raises:
        ValueError: If the first or third field is not a valid integer.
        IndexError: If the parsed row has fewer than three fields.
    """
    # Imports stay local to the function, as in the original; presumably so
    # the function is self-contained when Beam ships it to workers.
    import csv
    from io import StringIO

    fields = next(csv.reader(StringIO(line)), None)
    if fields is None:
        return None

    # Convert in field order so the first bad field decides which error is raised.
    passenger_id = int(fields[0])
    passenger_name = fields[1]
    age = int(fields[2])
    return {
        'ID_Pasajero': passenger_id,
        'Pasajero': passenger_name,
        'Edad': age,
    }

import typing


class PasajeroRow(typing.NamedTuple):
    """Row written to a passengers table.

    Field order must match the ``?`` placeholders of the INSERT statement.
    """
    ID_Pasajero: int
    Pasajero: str
    Edad: int


# WriteToJdbc is a cross-language transform: it needs a schema-aware
# PCollection, i.e. NamedTuple elements registered with RowCoder. The plain
# dicts returned by the parser carry no schema, so they are converted below.
beam.coders.registry.register_coder(PasajeroRow, beam.coders.RowCoder)

pipeline_options = beam.options.pipeline_options.PipelineOptions()

# NOTE(review): if Pasajeros.csv starts with a header row, int() in the parser
# will fail on it; pass skip_header_lines=1 to ReadFromText - confirm against
# the file.
# NOTE(review): host and credentials are hard-coded placeholders (root with an
# empty password) - supply real values from configuration before running.
with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'Read from GCS' >> ReadFromText('gs://aerline-storage/data/sucursal1/Pasajeros.csv')
     | 'Parse CSV' >> beam.Map(parse_csv_pasajeros)
     # The parser returns None for lines that contain no CSV row.
     | 'Drop blank lines' >> beam.Filter(lambda record: record is not None)
     | 'To Row' >> beam.Map(
         lambda record: PasajeroRow(**record)
     ).with_output_types(PasajeroRow)
     | 'Write to MySQL' >> WriteToJdbc(
         driver_class_name='com.mysql.cj.jdbc.Driver',
         jdbc_url='jdbc:mysql://host:3306/db_aerolineas',
         username='root',
         password='',
         table_name='sucursal1_pasajeros',
         statement='INSERT INTO sucursal1_pasajeros (ID_Pasajero, Pasajero, Edad) VALUES (?, ?, ?)'
     )
    )


# **Lectura y carga del archivo vuelos.csv para la sucursal1**

In [None]:
def parse_csv(line):
    """Turn one CSV text line into a flight record.

    Args:
        line: A single line of CSV text.

    Returns:
        A dict built from the first seven fields, in order: ``'Sucursal'``
        (int), ``'Cve_LA'``, ``'Viaje'``, ``'Clase'`` (strings), ``'Precio'``
        (float), ``'Ruta'`` (string) and ``'Cve_Cliente'`` (int). Returns
        ``None`` when the line yields no CSV row (for example an empty
        string).

    Raises:
        ValueError: If a numeric field cannot be converted.
        IndexError: If the parsed row has fewer than seven fields.
    """
    # Imports stay local to the function, as in the original; presumably so
    # the function is self-contained when Beam ships it to workers.
    import csv
    from io import StringIO

    fields = next(csv.reader(StringIO(line)), None)
    if fields is None:
        return None

    # Convert in field order so the first bad field decides which error is raised.
    branch = int(fields[0])
    airline_code = fields[1]
    trip = fields[2]
    travel_class = fields[3]
    price = float(fields[4])
    route = fields[5]
    customer_id = int(fields[6])
    return {
        'Sucursal': branch,
        'Cve_LA': airline_code,
        'Viaje': trip,
        'Clase': travel_class,
        'Precio': price,
        'Ruta': route,
        'Cve_Cliente': customer_id,
    }

import typing


class VueloRow(typing.NamedTuple):
    """Row written to a flights table.

    Field order must match the ``?`` placeholders of the INSERT statement.
    """
    Sucursal: int
    Cve_LA: str
    Viaje: str
    Clase: str
    Precio: float
    Ruta: str
    Cve_Cliente: int


# WriteToJdbc is a cross-language transform: it needs a schema-aware
# PCollection, i.e. NamedTuple elements registered with RowCoder. The plain
# dicts returned by the parser carry no schema, so they are converted below.
beam.coders.registry.register_coder(VueloRow, beam.coders.RowCoder)

pipeline_options = beam.options.pipeline_options.PipelineOptions()

# NOTE(review): if vuelos.csv starts with a header row, int() in the parser
# will fail on it; pass skip_header_lines=1 to ReadFromText - confirm against
# the file.
# NOTE(review): host and credentials are hard-coded placeholders (root with an
# empty password) - supply real values from configuration before running.
with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'Read from GCS' >> ReadFromText('gs://aerline-storage/data/sucursal1/vuelos.csv')
     | 'Parse CSV' >> beam.Map(parse_csv)
     # The parser returns None for lines that contain no CSV row.
     | 'Drop blank lines' >> beam.Filter(lambda record: record is not None)
     | 'To Row' >> beam.Map(
         lambda record: VueloRow(**record)
     ).with_output_types(VueloRow)
     | 'Write to MySQL' >> WriteToJdbc(
         driver_class_name='com.mysql.cj.jdbc.Driver',
         jdbc_url='jdbc:mysql://host:3306/db_aerolineas',
         username='root',
         password='',
         table_name='sucursal1_vuelos',
         statement='INSERT INTO sucursal1_vuelos (Sucursal, Cve_LA, Viaje, Clase, Precio, Ruta, Cve_Cliente) VALUES (?, ?, ?, ?, ?, ?, ?)'
     )
    )


# **Lectura y carga del archivo Pasajeros.csv para la sucursal2**

In [None]:
def parse_csv_pasajeros(line):
    """Parse a single CSV line describing a passenger.

    Args:
        line: One line of CSV text.

    Returns:
        ``{'ID_Pasajero': int, 'Pasajero': str, 'Edad': int}`` built from the
        first three fields, or ``None`` if the line produces no CSV row
        (for example an empty string).

    Raises:
        ValueError: If the first or third field is not a valid integer.
        IndexError: If the parsed row has fewer than three fields.
    """
    # Imports stay local to the function, as in the original; presumably so
    # the function is self-contained when Beam ships it to workers.
    import csv
    from io import StringIO

    parsed_rows = csv.reader(StringIO(line))
    first_row = next(parsed_rows, None)
    if first_row is None:
        return None

    # Same evaluation order as the dict literal it replaces: id, name, age.
    record = {}
    record['ID_Pasajero'] = int(first_row[0])
    record['Pasajero'] = first_row[1]
    record['Edad'] = int(first_row[2])
    return record

import typing


class PasajeroRow(typing.NamedTuple):
    """Row written to a passengers table.

    Field order must match the ``?`` placeholders of the INSERT statement.
    """
    ID_Pasajero: int
    Pasajero: str
    Edad: int


# WriteToJdbc is a cross-language transform: it needs a schema-aware
# PCollection, i.e. NamedTuple elements registered with RowCoder. The plain
# dicts returned by the parser carry no schema, so they are converted below.
beam.coders.registry.register_coder(PasajeroRow, beam.coders.RowCoder)

pipeline_options = beam.options.pipeline_options.PipelineOptions()

# Fix: this cell loads sucursal2, but the source path and the INSERT statement
# had been copied from the sucursal1 cell, so sucursal1 passengers were being
# inserted into sucursal1_pasajeros a second time. Path, table_name and
# statement now all target sucursal2.
# NOTE(review): assumes the object is named Pasajeros.csv under sucursal2/,
# mirroring sucursal1 - confirm against the bucket.
# NOTE(review): if Pasajeros.csv starts with a header row, int() in the parser
# will fail on it; pass skip_header_lines=1 to ReadFromText - confirm against
# the file.
# NOTE(review): host and credentials are hard-coded placeholders (root with an
# empty password) - supply real values from configuration before running.
with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'Read from GCS' >> ReadFromText('gs://aerline-storage/data/sucursal2/Pasajeros.csv')
     | 'Parse CSV' >> beam.Map(parse_csv_pasajeros)
     # The parser returns None for lines that contain no CSV row.
     | 'Drop blank lines' >> beam.Filter(lambda record: record is not None)
     | 'To Row' >> beam.Map(
         lambda record: PasajeroRow(**record)
     ).with_output_types(PasajeroRow)
     | 'Write to MySQL' >> WriteToJdbc(
         driver_class_name='com.mysql.cj.jdbc.Driver',
         jdbc_url='jdbc:mysql://host:3306/db_aerolineas',
         username='root',
         password='',
         table_name='sucursal2_pasajeros',
         statement='INSERT INTO sucursal2_pasajeros (ID_Pasajero, Pasajero, Edad) VALUES (?, ?, ?)'
     )
    )


# **Lectura y carga del archivo vuelos.csv para la sucursal2**

In [None]:
def parse_csv(line):
    """Parse a single CSV line describing a flight.

    Args:
        line: One line of CSV text.

    Returns:
        A dict with the keys ``'Sucursal'`` (int), ``'Cve_LA'``, ``'Viaje'``,
        ``'Clase'`` (strings), ``'Precio'`` (float), ``'Ruta'`` (string) and
        ``'Cve_Cliente'`` (int), taken from the first seven fields in that
        order. Returns ``None`` if the line produces no CSV row (for example
        an empty string).

    Raises:
        ValueError: If a numeric field cannot be converted.
        IndexError: If the parsed row has fewer than seven fields.
    """
    # Imports stay local to the function, as in the original; presumably so
    # the function is self-contained when Beam ships it to workers.
    import csv
    from io import StringIO

    parsed_rows = csv.reader(StringIO(line))
    first_row = next(parsed_rows, None)
    if first_row is None:
        return None

    # Filled in field order, matching the evaluation order of the dict
    # literal this replaces.
    record = {}
    record['Sucursal'] = int(first_row[0])
    record['Cve_LA'] = first_row[1]
    record['Viaje'] = first_row[2]
    record['Clase'] = first_row[3]
    record['Precio'] = float(first_row[4])
    record['Ruta'] = first_row[5]
    record['Cve_Cliente'] = int(first_row[6])
    return record

import typing


class VueloRow(typing.NamedTuple):
    """Row written to a flights table.

    Field order must match the ``?`` placeholders of the INSERT statement.
    """
    Sucursal: int
    Cve_LA: str
    Viaje: str
    Clase: str
    Precio: float
    Ruta: str
    Cve_Cliente: int


# WriteToJdbc is a cross-language transform: it needs a schema-aware
# PCollection, i.e. NamedTuple elements registered with RowCoder. The plain
# dicts returned by the parser carry no schema, so they are converted below.
beam.coders.registry.register_coder(VueloRow, beam.coders.RowCoder)

pipeline_options = beam.options.pipeline_options.PipelineOptions()

# Fix: this cell loads sucursal2, but it was a verbatim copy of the sucursal1
# flights cell (same source path, table_name and INSERT target), so sucursal1
# flights were loaded twice and sucursal2 never was. All three now target
# sucursal2.
# NOTE(review): assumes gs://aerline-storage/data/sucursal2/vuelos.csv and a
# sucursal2_vuelos table exist, mirroring sucursal1 - confirm.
# NOTE(review): if vuelos.csv starts with a header row, int() in the parser
# will fail on it; pass skip_header_lines=1 to ReadFromText - confirm against
# the file.
# NOTE(review): host and credentials are hard-coded placeholders (root with an
# empty password) - supply real values from configuration before running.
with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'Read from GCS' >> ReadFromText('gs://aerline-storage/data/sucursal2/vuelos.csv')
     | 'Parse CSV' >> beam.Map(parse_csv)
     # The parser returns None for lines that contain no CSV row.
     | 'Drop blank lines' >> beam.Filter(lambda record: record is not None)
     | 'To Row' >> beam.Map(
         lambda record: VueloRow(**record)
     ).with_output_types(VueloRow)
     | 'Write to MySQL' >> WriteToJdbc(
         driver_class_name='com.mysql.cj.jdbc.Driver',
         jdbc_url='jdbc:mysql://host:3306/db_aerolineas',
         username='root',
         password='',
         table_name='sucursal2_vuelos',
         statement='INSERT INTO sucursal2_vuelos (Sucursal, Cve_LA, Viaje, Clase, Precio, Ruta, Cve_Cliente) VALUES (?, ?, ?, ?, ?, ?, ?)'
     )
    )