## ASIGNACION VIAJE (INICIO + FIN)

In [33]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms import CoGroupByKey
import json

################config################################

options = PipelineOptions(
    streaming=True,
    runner='DataflowRunner',
    experiments='enable_streaming_engine,use_beam_bq_sink'
)

# Definir suscripciones y otros detalles si es necesario
suscripcion_coche = 'projects/woven-justice-411714/subscriptions/blablacar_car-sub'
suscripcion_usuario = 'projects/woven-justice-411714/subscriptions/blablacar_user-sub'

#########funciones#######################################

# Recibe datos
class DecodeMessage(beam.DoFn):
    def process(self, element):
        output = element.decode('utf-8')
        json_data = json.loads(output)
        return [json_data]

# Función para extraer la clave 'user_geo' de cada elemento para el inicio del viaje
def extract_geo_user(element):
    geo = element.get('user_geo', None)
    return (geo, element)

# Función para extraer la clave 'user_geo_fin' de cada elemento para el fin del viaje
def extract_geo_fin(element):
    geo = element.get('user_geo_fin', None)
    return (geo, element)

# Función para extraer la clave 'coche_geo' de cada elemento
def extract_geo_coche(element):
    geo = element.get('coche_geo', None)
    return (geo, element)

# Función para filtrar casos coincidentes y no coincidentes para el inicio del viaje
class FilterCoincidentCases_inicio(beam.DoFn):
    def process(self, element):
        geo_key, messages = element
        coches = messages['coches']
        usuarios = messages['usuarios']

        if coches and usuarios:
            yield {'geo': geo_key, 'coches': coches, 'usuarios': usuarios, 'inicio_viaje': True}
        else:
            yield {'geo': geo_key, 'coches': coches, 'usuarios': usuarios, 'inicio_viaje': False}

# Función para filtrar casos coincidentes y no coincidentes para el fin del viaje
class FilterCoincidentCases_fin(beam.DoFn):
    def process(self, element):
        geo_key, messages = element
        coches = messages['coches']
        usuarios = messages['usuarios']

        if coches and usuarios:
            yield {'geo': geo_key, 'coches': coches, 'usuarios': usuarios, 'fin_viaje': True}
        else:
            yield {'geo': geo_key, 'coches': coches, 'usuarios': usuarios, 'fin_viaje': False}

##########pipeline#################################

# Crear el pipeline
with beam.Pipeline(options=PipelineOptions(streaming=True, save_main_session=True)) as p:
    # Coches
    coches_data = (
        p
        | "Coche_LeerDesdePubSub" >> beam.io.ReadFromPubSub(subscription=suscripcion_coche)
        | "Coche_decodificar_msg" >> beam.ParDo(DecodeMessage())
        | "Coche_Extraer_Clave_geo" >> beam.Map(extract_geo_coche)
        | "Coche_ventana_5_minutos" >> beam.WindowInto(beam.window.FixedWindows(500))
    )

    # Usuarios
    usuarios_data = (
        p
        | "Usuario_LeerDesdePubSub" >> beam.io.ReadFromPubSub(subscription=suscripcion_usuario)
        | "Usuario_decodificar_msg" >> beam.ParDo(DecodeMessage())
        | "Usuario_ventana_5_minutos" >> beam.WindowInto(beam.window.FixedWindows(500))
    )

    # Derivar dos flujos distintos para inicio y fin del viaje
    usuarios_data_inicio = (
        usuarios_data
        | "Usuario_Extraer_Clave_geo_inicio" >> beam.Map(extract_geo_user)
        | "Etiquetar_inicio_viaje" >> beam.Map(lambda x: (x[0], (x[1], 'inicio')))
    )

    usuarios_data_fin = (
        usuarios_data
        | "Usuario_Extraer_Clave_geo_fin" >> beam.Map(extract_geo_fin)
        | "Etiquetar_fin_viaje" >> beam.Map(lambda x: (x[0], (x[1], 'fin')))
    )

    # Realizar un CoGroupByKey en base al campo 'geo'_inicio
    joined_data_inicio = (
        {'coches': coches_data, 'usuarios': usuarios_data_inicio}
        | "Merge_Mensajes_por_geo" >> CoGroupByKey()
        | "Filtrar_Casos_Coincidentes" >> beam.ParDo(FilterCoincidentCases_inicio())
        | "Filtrar_Solo_Coincidentes" >> beam.Filter(lambda element: element['inicio_viaje'])
        | "Imprimir_Resultados_inic" >> beam.Map(lambda element: print(element))
    )

    # Realizar un CoGroupByKey en base al campo 'geo'_fin
    joined_data_fin = (
        {'coches': coches_data, 'usuarios': usuarios_data_fin}
        | "Merge_Mensajes_por_geo_fin" >> CoGroupByKey()
        | "Filtrar_Casos_Coincidentes_fin" >> beam.ParDo(FilterCoincidentCases_fin())
        | "Filtrar_Solo_Coincidentes_fin" >> beam.Filter(lambda element: element['fin_viaje'])
        | "Imprimir_Resultados_fin" >> beam.Map(lambda element: print(element))
    )




{'geo': '39.49597-0.38838', 'coches': [{'coche_id_message': '15039f38-ef74-4707-a92e-e829042cf035', 'coche_id': 1, 'coche_index_msg': 1, 'coche_geo': '39.49597-0.38838', 'coche_latitud': 39.49597, 'coche_longitud': -0.38838, 'coche_datetime': '2024-02-02T19:09:21.205534', 'coche_ruta': 'benicalap-alboraya.kml'}, {'coche_id_message': 'ab6cd6ee-c38f-4b72-86f0-477097b55045', 'coche_id': 1, 'coche_index_msg': 1, 'coche_geo': '39.49597-0.38838', 'coche_latitud': 39.49597, 'coche_longitud': -0.38838, 'coche_datetime': '2024-02-02T19:09:51.821540', 'coche_ruta': 'benicalap-alboraya.kml'}], 'usuarios': [({'user_id_message': 'c764b991-edc3-49b7-b53c-53a93f51f161', 'user_id': 9990, 'user_datetime': '2024-02-02T19:09:34.203266', 'user_geo': '39.49597-0.38838', 'user_geo_fin': '39.49693-0.38688', 'user_latitud_inicio': 39.49597, 'user_longitud_inicio': -0.38838, 'user_latitud_destino': 39.49693, 'user_longitud_destino': -0.38688}, 'inicio'), ({'user_id_message': '3f1d8757-bf1e-4cbc-a42c-78c770774b

KeyboardInterrupt: 

## ASIGNACION VIAJE (INICIO + FIN) BQ

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms import CoGroupByKey
import json
from apache_beam.io.gcp.internal.clients import bigquery


################config################################

options = PipelineOptions(
    streaming=True,
    runner='DataflowRunner',
    experiments='enable_streaming_engine,use_beam_bq_sink'
)

##################################### Adri ##################################################
suscripcion_coche = 'projects/woven-justice-411714/subscriptions/blablacar_car1-sub'
suscripcion_usuario = 'projects/woven-justice-411714/subscriptions/blablacar_user1-sub'
project_id = 'woven-justice-411714'
bucket_name = "woven-justice-411714"
table_id= 'asignaciones'
dataset_id= 'ejemplo'


################################## Funciones #######################################
   
# Recibe datos
class DecodeMessage(beam.DoFn):
    def process(self, element):
        output = element.decode('utf-8')
        json_data = json.loads(output)
        return [json_data]

# Función para extraer la clave 'user_geo' de cada elemento para el inicio del viaje
def extract_geo_user(element):
    geo = element.get('user_geo', None)
    return (geo, element)

# Función para extraer la clave 'user_geo_fin' de cada elemento para el fin del viaje
def extract_geo_fin(element):
    geo = element.get('user_geo_fin', None)
    return (geo, element)

# Función para extraer la clave 'coche_geo' de cada elemento
def extract_geo_coche(element):
    geo = element.get('coche_geo', None)
    return (geo, element)

# Función para filtrar casos coincidentes y no coincidentes para el inicio del viaje
class FilterCoincidentCases_inicio(beam.DoFn):
    def process(self, element):
        geo_key, messages = element
        coches = messages['coches']
        usuarios = messages['usuarios']

        if coches and usuarios:
            yield {'geo': geo_key, 'coches': coches, 'usuarios': usuarios, 'inicio_viaje': True}
        else:
            yield {'geo': geo_key, 'coches': coches, 'usuarios': usuarios, 'inicio_viaje': False}

# Función para filtrar casos coincidentes y no coincidentes para el fin del viaje
class FilterCoincidentCases_fin(beam.DoFn):
    def process(self, element):
        geo_key, messages = element
        coches = messages['coches']
        usuarios = messages['usuarios']

        if coches and usuarios:
            yield {'geo': geo_key, 'coches': coches, 'usuarios': usuarios, 'fin_viaje': True}
        else:
            yield {'geo': geo_key, 'coches': coches, 'usuarios': usuarios, 'fin_viaje': False}







################################## Funciones para BQ #################################################
######################################################################################################

# Define una función para convertir el diccionario a una cadena JSON
def convert_to_json(element):
    return json.dumps(element)


############### Pipeline #################################
# Define el esquema de la tabla
new_table_schema = bigquery.TableSchema()
new_table_fields = [
    bigquery.TableFieldSchema(name='geo', type='STRING', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='coche_id_message', type='STRING', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='coche_id', type='INTEGER', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='coche_index_msg', type='INTEGER', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='coche_geo', type='STRING', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='coche_latitud', type='FLOAT', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='coche_longitud', type='FLOAT', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='coche_datetime', type='DATETIME', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='coche_ruta', type='STRING', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='user_id_message', type='STRING', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='user_id', type='INTEGER', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='user_datetime', type='DATETIME', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='user_geo', type='STRING', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='user_geo_fin', type='STRING', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='user_latitud_inicio', type='FLOAT', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='user_longitud_inicio', type='FLOAT', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='user_latitud_destino', type='FLOAT', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='user_longitud_destino', type='FLOAT', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='inicio_viaje', type='BOOLEAN', mode='NULLABLE')
]
new_table_schema.fields.extend(new_table_fields)



# Crear el pipeline
with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    
    # Coches
    coches_data = (
        p
        | "Coche_LeerDesdePubSub" >> beam.io.ReadFromPubSub(subscription=suscripcion_coche)
        | "Coche_decodificar_msg" >> beam.ParDo(DecodeMessage())
        | "Coche_Extraer_Clave_geo" >> beam.Map(extract_geo_coche)
        | "Coche_ventana_5_minutos" >> beam.WindowInto(beam.window.FixedWindows(500))
    )

    # Usuarios
    usuarios_data = (
        p
        | "Usuario_LeerDesdePubSub" >> beam.io.ReadFromPubSub(subscription=suscripcion_usuario)
        | "Usuario_decodificar_msg" >> beam.ParDo(DecodeMessage())
        | "Usuario_ventana_5_minutos" >> beam.WindowInto(beam.window.FixedWindows(500))
    )

    # Derivar dos flujos distintos para inicio y fin del viaje
    usuarios_data_inicio = (
        usuarios_data
        | "Usuario_Extraer_Clave_geo_inicio" >> beam.Map(extract_geo_user)
        | "Etiquetar_inicio_viaje" >> beam.Map(lambda x: (x[0], (x[1], 'inicio')))
    )

    usuarios_data_fin = (
        usuarios_data
        | "Usuario_Extraer_Clave_geo_fin" >> beam.Map(extract_geo_fin)
        | "Etiquetar_fin_viaje" >> beam.Map(lambda x: (x[0], (x[1], 'fin')))
    )

    # Realizar un CoGroupByKey en base al campo 'geo'_inicio
    joined_data_inicio = (
        {'coches': coches_data, 'usuarios': usuarios_data_inicio}
        | "Merge_Mensajes_por_geo" >> CoGroupByKey()
        | "Filtrar_Casos_Coincidentes" >> beam.ParDo(FilterCoincidentCases_inicio())
        | "Filtrar_Solo_Coincidentes" >> beam.Filter(lambda element: element['inicio_viaje'])
        | "Imprimir_Resultados_inic" >> beam.Map(lambda element: print(element))
    )

    # Realizar un CoGroupByKey en base al campo 'geo'_fin
    joined_data_fin = (
        {'coches': coches_data, 'usuarios': usuarios_data_fin}
        | "Merge_Mensajes_por_geo_fin" >> CoGroupByKey()
        | "Filtrar_Casos_Coincidentes_fin" >> beam.ParDo(FilterCoincidentCases_fin())
        | "Filtrar_Solo_Coincidentes_fin" >> beam.Filter(lambda element: element['fin_viaje'])
        | "Imprimir_Resultados_fin" >> beam.Map(lambda element: print(element))
    )

print ('hello BG')

    # Seleccionar campos específicos de joined_data_inicio
    selected_fields_inicio = (
        joined_data_inicio
        | "Seleccionar_Campos_inicio" >> beam.Map(lambda element: {
            'geo': element['geo'],
            'coche_id_message': element['coches'][0]['coche_id_message'],
            'coche_id': element['coches'][0]['coche_id'],
            'coche_index_msg': element['coches'][0]['coche_index_msg'],
            'coche_geo': element['coches'][0]['coche_geo'],
            'coche_latitud': element['coches'][0]['coche_latitud'],
            'coche_longitud': element['coches'][0]['coche_longitud'],
            'coche_datetime': element['coches'][0]['coche_datetime'],
            'coche_ruta': element['coches'][0]['coche_ruta'],
            'user_id_message': element['usuarios'][0]['user_id_message'],
            'user_id': element['usuarios'][0]['user_id'],
            'user_datetime': element['usuarios'][0]['user_datetime'],
            'user_geo': element['usuarios'][0]['user_geo'],
            'user_geo_fin': element['usuarios'][0]['user_geo_fin'],
            'user_latitud_inicio': element['usuarios'][0]['user_latitud_inicio'],
            'user_longitud_inicio': element['usuarios'][0]['user_longitud_inicio'],
            'user_latitud_destino': element['usuarios'][0]['user_latitud_destino'],
            'user_longitud_destino': element['usuarios'][0]['user_longitud_destino'],
            'inicio_viaje': element['inicio_viaje']})
        | "Imprimir_select_fields" >> beam.Map(lambda element: print(element))
    )
    
print ('hello BG')
    # Escribir los resultados en BigQuery
        Escribir = (
        selected_fields_inicio 
         | "Convertir_a_JSON_inicio" >> beam.Map(convert_to_json) 
         | "Escribir_en_BigQuery_inicio" >> beam.io.WriteToBigQuery(
        table=table_id,
        dataset=dataset_id,
        project=project_id,
        schema=new_table_schema,  # Utiliza el esquema definido anteriormente
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
           )
    
 



{'geo': '39.49693-0.38688', 'coches': [{'coche_id_message': '910a7a5b-b4c8-4bec-a970-a7e805ec21a4', 'coche_id': 1, 'coche_index_msg': 5, 'coche_geo': '39.49693-0.38688', 'coche_latitud': 39.49693, 'coche_longitud': -0.38688, 'coche_datetime': '2024-02-02T19:14:04.089689', 'coche_ruta': 'benicalap-alboraya.kml'}], 'usuarios': [({'user_id_message': '5d9b6183-deac-4c5c-b6b5-712636713e22', 'user_id': 9990, 'user_datetime': '2024-02-02T19:13:54.135670', 'user_geo': '39.49597-0.38838', 'user_geo_fin': '39.49693-0.38688', 'user_latitud_inicio': 39.49597, 'user_longitud_inicio': -0.38838, 'user_latitud_destino': 39.49693, 'user_longitud_destino': -0.38688}, 'fin')], 'fin_viaje': True}
