# ASIGNACION INICIO / FIN - BIGQUERY

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.io.gcp.internal.clients import bigquery
import json
from datetime import datetime
from apache_beam.transforms import CoGroupByKey

################config################################

options = PipelineOptions(
    streaming=True,
    runner='DataflowRunner',
    experiments='enable_streaming_engine,use_beam_bq_sink'
)

# Definir suscripciones y otros detalles si es necesario
suscripcion_coche = 'projects/dataflow-1-411618/subscriptions/coches_stream-sub'
suscripcion_usuario = 'projects/dataflow-1-411618/subscriptions/usuarios_stream-sub'

#########funciones#######################################

# Recibe datos
class DecodeMessage(beam.DoFn):
    def process(self, element):
        output = element.decode('utf-8')
        json_data = json.loads(output)
        return [json_data]

# Función para extraer la clave 'user_geo' de cada elemento para el inicio del viaje
def extract_geo_user(element):
    geo = element.get('user_geo', None)
    return (geo, element)

# Función para extraer la clave 'user_geo_fin' de cada elemento para el fin del viaje
def extract_geo_fin(element):
    geo = element.get('user_geo_fin', None)
    return (geo, element)

# Función para extraer la clave 'coche_geo' de cada elemento
def extract_geo_coche(element):
    geo = element.get('coche_geo', None)
    return (geo, element)

# Función para filtrar casos coincidentes y no coincidentes para el inicio del viaje
class FilterCoincidentCases_inicio(beam.DoFn):
    def process(self, element):
        geo_key, messages = element
        coches = messages['coches']
        usuarios = messages['usuarios']

        if coches and usuarios:
            yield {'geo': geo_key, 'coches': coches, 'usuarios': usuarios, 'inicio_viaje': True}
        else:
            yield {'geo': geo_key, 'coches': coches, 'usuarios': usuarios, 'inicio_viaje': False}

# Función para filtrar casos coincidentes y no coincidentes para el fin del viaje
class FilterCoincidentCases_fin(beam.DoFn):
    def process(self, element):
        geo_key, messages = element
        coches = messages['coches']
        usuarios = messages['usuarios']

        if coches and usuarios:
            yield {'geo': geo_key, 'coches': coches, 'usuarios': usuarios, 'fin_viaje': True}
        else:
            yield {'geo': geo_key, 'coches': coches, 'usuarios': usuarios, 'fin_viaje': False}
            
##########fucncion_process_data_INICIO############

class BuildRowFn(beam.DoFn):
    def process(self, element):
        row = {}
        geo = element['geo']
        coche = element['coches'][0]
        user = element['usuarios'][0][0]

        row['geo'] = geo
        row['coche_id'] = coche['coche_id']
        row['coche_index_msg'] = coche['coche_index_msg']
        row['coche_geo'] = coche['coche_geo']
        row['coche_latitud'] = coche['coche_latitud']
        row['coche_longitud'] = coche['coche_longitud']
        row['coche_datetime'] = coche['coche_datetime']
        row['coche_ruta'] = coche['coche_ruta']

        row['user_id'] = user['user_id']
        row['user_datetime'] = user['user_datetime']
        row['user_geo'] = user['user_geo']
        row['user_geo_fin'] = user['user_geo_fin']
        row['user_latitud_inicio'] = user['user_latitud_inicio']
        row['user_longitud_inicio'] = user['user_longitud_inicio']
        row['user_latitud_destino'] = user['user_latitud_destino']
        row['user_longitud_destino'] = user['user_longitud_destino']

        row['inicio_viaje'] = element['inicio_viaje']
        

        print(row)
        return [row]

##########fucncion_process_data_FIN############

class BuildRowFn_fin(beam.DoFn):
    def process(self, element):
        row = {}
        geo = element['geo']
        coche = element['coches'][0]
        user = element['usuarios'][0][0]

        row['geo'] = geo
        row['coche_id'] = coche['coche_id']
        row['coche_index_msg'] = coche['coche_index_msg']
        row['coche_geo'] = coche['coche_geo']
        row['coche_latitud'] = coche['coche_latitud']
        row['coche_longitud'] = coche['coche_longitud']
        row['coche_datetime'] = coche['coche_datetime']
        row['coche_ruta'] = coche['coche_ruta']

        row['user_id'] = user['user_id']
        row['user_datetime'] = user['user_datetime']
        row['user_geo'] = user['user_geo']
        row['user_geo_fin'] = user['user_geo_fin']
        row['user_latitud_inicio'] = user['user_latitud_inicio']
        row['user_longitud_inicio'] = user['user_longitud_inicio']
        row['user_latitud_destino'] = user['user_latitud_destino']
        row['user_longitud_destino'] = user['user_longitud_destino']

        row['fin_viaje'] = element['fin_viaje']
        

        print(row)
        return [row]
    

##########pipeline#################################

# Crear el pipeline
with beam.Pipeline(options=PipelineOptions(streaming=True, save_main_session=True)) as p:
    # Coches
    coches_data = (
        p
        | "Coche_LeerDesdePubSub" >> beam.io.ReadFromPubSub(subscription=suscripcion_coche)
        | "Coche_decodificar_msg" >> beam.ParDo(DecodeMessage())
        | "Coche_Extraer_Clave_geo" >> beam.Map(extract_geo_coche)
        | "Coche_ventana_5_minutos" >> beam.WindowInto(beam.window.FixedWindows(60))
    )

    # Usuarios
    usuarios_data = (
        p
        | "Usuario_LeerDesdePubSub" >> beam.io.ReadFromPubSub(subscription=suscripcion_usuario)
        | "Usuario_decodificar_msg" >> beam.ParDo(DecodeMessage())
        | "Usuario_ventana_5_minutos" >> beam.WindowInto(beam.window.FixedWindows(500))
    )

    # Derivar dos flujos distintos para inicio y fin del viaje
    usuarios_data_inicio = (
        usuarios_data
        | "Usuario_Extraer_Clave_geo_inicio" >> beam.Map(extract_geo_user)
        | "Etiquetar_inicio_viaje" >> beam.Map(lambda x: (x[0], (x[1], 'inicio')))
    )

    usuarios_data_fin = (
        usuarios_data
        | "Usuario_Extraer_Clave_geo_fin" >> beam.Map(extract_geo_fin)
        | "Etiquetar_fin_viaje" >> beam.Map(lambda x: (x[0], (x[1], 'fin')))
    )

    # Realizar un CoGroupByKey en base al campo 'geo'_inicio
    joined_data_inicio = (
        {'coches': coches_data, 'usuarios': usuarios_data_inicio}
        | "Merge_Mensajes_por_geo" >> CoGroupByKey()
        | "Filtrar_Casos_Coincidentes" >> beam.ParDo(FilterCoincidentCases_inicio())
        | "Filtrar_Solo_Coincidentes" >> beam.Filter(lambda element: element['inicio_viaje'])
        | "TransformToBigQueryFormat_ini" >> beam.ParDo(BuildRowFn())
        | "WriteToBigQuery_ini" >> beam.io.WriteToBigQuery(
            table="dataflow-1-411618.blablacar.asignaciones",
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
        )
    )
    
        # Realizar un CoGroupByKey en base al campo 'geo'_fin
    joined_data_fin = (
        {'coches': coches_data, 'usuarios': usuarios_data_fin}
        | "Merge_Mensajes_por_geo_fin" >> CoGroupByKey()
        | "Filtrar_Casos_Coincidentes_fin" >> beam.ParDo(FilterCoincidentCases_fin())
        | "Filtrar_Solo_Coincidentes_fin" >> beam.Filter(lambda element: element['fin_viaje'])
        | "TransformToBigQueryFormat_fin" >> beam.ParDo(BuildRowFn_fin())
        | "WriteToBigQuery_fin" >> beam.io.WriteToBigQuery(
            table="dataflow-1-411618.blablacar.asignaciones",
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
        )
    )
    





In [None]:
#QUERY PARA GENERAR TABLA EN BIGQUERY
CREATE OR REPLACE TABLE `dataflow-1-411618.blablacar.asignaciones` (
    geo STRING,
    coche_id INT64,
    coche_index_msg INT64,
    coche_geo STRING,
    coche_latitud FLOAT64,
    coche_longitud FLOAT64,
    coche_datetime TIMESTAMP,
    coche_ruta STRING,
    user_id INT64,
    user_datetime TIMESTAMP,
    user_geo STRING,
    user_geo_fin STRING,
    user_latitud_inicio FLOAT64,
    user_longitud_inicio FLOAT64,
    user_latitud_destino FLOAT64,
    user_longitud_destino FLOAT64,
    inicio_viaje BOOL,
    fin_viaje BOOL
);

## ASIGNACION VIAJE (INICIO + FIN)

In [1]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms import CoGroupByKey
import json

################config################################

options = PipelineOptions(
    streaming=True,
    runner='DataflowRunner',
    experiments='enable_streaming_engine,use_beam_bq_sink'
)

# Definir suscripciones y otros detalles si es necesario
suscripcion_coche = 'projects/dataflow-1-411618/subscriptions/coches_stream-sub'
suscripcion_usuario = 'projects/dataflow-1-411618/subscriptions/usuarios_stream-sub'

#########funciones#######################################

# Recibe datos
class DecodeMessage(beam.DoFn):
    def process(self, element):
        output = element.decode('utf-8')
        json_data = json.loads(output)
        return [json_data]

# Función para extraer la clave 'user_geo' de cada elemento para el inicio del viaje
def extract_geo_user(element):
    geo = element.get('user_geo', None)
    return (geo, element)

# Función para extraer la clave 'user_geo_fin' de cada elemento para el fin del viaje
def extract_geo_fin(element):
    geo = element.get('user_geo_fin', None)
    return (geo, element)

# Función para extraer la clave 'coche_geo' de cada elemento
def extract_geo_coche(element):
    geo = element.get('coche_geo', None)
    return (geo, element)

# Función para filtrar casos coincidentes y no coincidentes para el inicio del viaje
class FilterCoincidentCases_inicio(beam.DoFn):
    def process(self, element):
        geo_key, messages = element
        coches = messages['coches']
        usuarios = messages['usuarios']

        if coches and usuarios:
            yield {'geo': geo_key, 'coches': coches, 'usuarios': usuarios, 'inicio_viaje': True}
        else:
            yield {'geo': geo_key, 'coches': coches, 'usuarios': usuarios, 'inicio_viaje': False}

# Función para filtrar casos coincidentes y no coincidentes para el fin del viaje
class FilterCoincidentCases_fin(beam.DoFn):
    def process(self, element):
        geo_key, messages = element
        coches = messages['coches']
        usuarios = messages['usuarios']

        if coches and usuarios:
            yield {'geo': geo_key, 'coches': coches, 'usuarios': usuarios, 'fin_viaje': True}
        else:
            yield {'geo': geo_key, 'coches': coches, 'usuarios': usuarios, 'fin_viaje': False}

##########pipeline#################################

# Crear el pipeline
with beam.Pipeline(options=PipelineOptions(streaming=True, save_main_session=True)) as p:
    # Coches
    coches_data = (
        p
        | "Coche_LeerDesdePubSub" >> beam.io.ReadFromPubSub(subscription=suscripcion_coche)
        | "Coche_decodificar_msg" >> beam.ParDo(DecodeMessage())
        | "Coche_Extraer_Clave_geo" >> beam.Map(extract_geo_coche)
        | "Coche_ventana_5_minutos" >> beam.WindowInto(beam.window.FixedWindows(500))
    )

    # Usuarios
    usuarios_data = (
        p
        | "Usuario_LeerDesdePubSub" >> beam.io.ReadFromPubSub(subscription=suscripcion_usuario)
        | "Usuario_decodificar_msg" >> beam.ParDo(DecodeMessage())
        | "Usuario_ventana_5_minutos" >> beam.WindowInto(beam.window.FixedWindows(500))
    )

    # Derivar dos flujos distintos para inicio y fin del viaje
    usuarios_data_inicio = (
        usuarios_data
        | "Usuario_Extraer_Clave_geo_inicio" >> beam.Map(extract_geo_user)
        | "Etiquetar_inicio_viaje" >> beam.Map(lambda x: (x[0], (x[1], 'inicio')))
    )

    usuarios_data_fin = (
        usuarios_data
        | "Usuario_Extraer_Clave_geo_fin" >> beam.Map(extract_geo_fin)
        | "Etiquetar_fin_viaje" >> beam.Map(lambda x: (x[0], (x[1], 'fin')))
    )

    # Realizar un CoGroupByKey en base al campo 'geo'_inicio
    joined_data_inicio = (
        {'coches': coches_data, 'usuarios': usuarios_data_inicio}
        | "Merge_Mensajes_por_geo" >> CoGroupByKey()
        | "Filtrar_Casos_Coincidentes" >> beam.ParDo(FilterCoincidentCases_inicio())
        | "Filtrar_Solo_Coincidentes" >> beam.Filter(lambda element: element['inicio_viaje'])
        | "Imprimir_Resultados_inic" >> beam.Map(lambda element: print(element))
    )

    # Realizar un CoGroupByKey en base al campo 'geo'_fin
    joined_data_fin = (
        {'coches': coches_data, 'usuarios': usuarios_data_fin}
        | "Merge_Mensajes_por_geo_fin" >> CoGroupByKey()
        | "Filtrar_Casos_Coincidentes_fin" >> beam.ParDo(FilterCoincidentCases_fin())
        | "Filtrar_Solo_Coincidentes_fin" >> beam.Filter(lambda element: element['fin_viaje'])
        | "Imprimir_Resultados_fin" >> beam.Map(lambda element: print(element))
    )




{'geo': '39.49597-0.38838', 'coches': [{'coche_id_message': 'd50f953c-220c-476b-a6a3-689fa55f74e7', 'coche_id': 1, 'coche_index_msg': 1, 'coche_geo': '39.49597-0.38838', 'coche_latitud': 39.49597, 'coche_longitud': -0.38838, 'coche_datetime': '2024-01-31T17:04:40.413173Z', 'coche_ruta': 'benicalap-alboraya.kml'}], 'usuarios': [({'user_id_message': 'd4f17633-4820-4e22-902b-3e220494e48c', 'user_id': 9990, 'user_datetime': '2024-01-31T17:04:29.894954Z', 'user_geo': '39.49597-0.38838', 'user_geo_fin': '39.49693-0.38688', 'user_latitud_inicio': 39.49597, 'user_longitud_inicio': -0.38838, 'user_latitud_destino': 39.49693, 'user_longitud_destino': -0.38688}, 'inicio')], 'inicio_viaje': True}
{'geo': '39.49693-0.38688', 'coches': [{'coche_id_message': 'a313fb9b-3a35-4b98-96c8-7d957126e944', 'coche_id': 1, 'coche_index_msg': 5, 'coche_geo': '39.49693-0.38688', 'coche_latitud': 39.49693, 'coche_longitud': -0.38688, 'coche_datetime': '2024-01-31T17:04:52.423392Z', 'coche_ruta': 'benicalap-alboray

KeyboardInterrupt: 

## ASIGNACION DE VIAJE (SOLO INICIO)

In [None]:
# Import Python Libraries
import logging
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

In [1]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms import CoGroupByKey
import json

# Definir suscripciones y otros detalles si es necesario
suscripcion_coche = 'projects/dataflow-1-411618/subscriptions/coches_stream-sub'
suscripcion_usuario = 'projects/dataflow-1-411618/subscriptions/usuarios_stream-sub'

# Recibe datos
class DecodeMessage(beam.DoFn):
    def process(self, element):
        output = element.decode('utf-8')
        json_data = json.loads(output)
        return [json_data]

# Función para extraer la clave 'geo' de cada elemento en coche
def extract_geo_coche(element):
    geo = element.get('coche_geo', None)
    return (geo, element)

# Función para extraer la clave 'geo' de cada elemento en user
def extract_geo_user(element):
    geo = element.get('user_geo', None)
    return (geo, element)


# Función para filtrar casos coincidentes y no coincidentes
class FilterCoincidentCases(beam.DoFn):
    def process(self, element):
        geo_key, messages = element
        coches = messages['coches']
        usuarios = messages['usuarios']

        if coches and usuarios:
            yield {'geo': geo_key, 'coches': coches, 'usuarios': usuarios, 'inicio_viaje': True}
        else:
            yield {'geo': geo_key, 'coches': coches, 'usuarios': usuarios, 'inicio_viaje': False}

# Crear el pipeline
with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    # Coches
    coches_data = (
        p
        | "Coche_LeerDesdePubSub" >> beam.io.ReadFromPubSub(subscription=suscripcion_coche)
        | "Coche_decodificar_msg" >> beam.ParDo(DecodeMessage())
        | "Coche_Extraer_Clave_geo" >> beam.Map(extract_geo_coche)
        | "Coche_ventana_5_minutos" >> beam.WindowInto(beam.window.FixedWindows(60))
    )

    # Usuarios
    usuarios_data = (
        p
        | "Usuario_LeerDesdePubSub" >> beam.io.ReadFromPubSub(subscription=suscripcion_usuario)
        | "Usuario_decodificar_msg" >> beam.ParDo(DecodeMessage())
        | "Usuario_Extraer_Clave_geo" >> beam.Map(extract_geo_user)
        | "Usuario_ventana_5_minutos" >> beam.WindowInto(beam.window.FixedWindows(60))
    )

    # Realizar un CoGroupByKey en base al campo 'geo'
    joined_data = (
        {'coches': coches_data, 'usuarios': usuarios_data}
        | "Merge_Mensajes_por_geo" >> CoGroupByKey()
        | "Filtrar_Casos_Coincidentes" >> beam.ParDo(FilterCoincidentCases())
        | "Filtrar_Solo_Coincidentes" >> beam.Filter(lambda element: element['inicio_viaje'])
        | "Imprimir_Resultados" >> beam.Map(lambda element: print(element))
    )



{'geo': '39.49597-0.38838', 'coches': [{'coche_id_message': 'b5fbe915-88cc-4c80-b860-06004d60287c', 'coche_id': 1, 'coche_index_msg': 1, 'coche_geo': '39.49597-0.38838', 'coche_latitud': 39.49597, 'coche_longitud': -0.38838, 'coche_datetime': '2024-01-31T14:53:11.456009Z', 'coche_ruta': 'benicalap-alboraya.kml'}], 'usuarios': [{'user_id_message': '99e8225f-2395-452a-996e-b64b88265c01', 'user_id': 9990, 'user_datetime': '2024-01-31T14:53:32.364157Z', 'user_geo': '39.49597-0.38838', 'user_geo_fin': '39.49693-0.38688', 'user_latitud_inicio': 39.49597, 'user_longitud_inicio': -0.38838, 'user_latitud_destino': 39.49693, 'user_longitud_destino': -0.38688}], 'inicio_viaje': True}


KeyboardInterrupt: 

## ASIGNACION VIAJE (SOLO FIN)

In [2]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms import CoGroupByKey
import json

################config################################

options = PipelineOptions(
    streaming=True,
    runner='DataflowRunner',
    experiments='enable_streaming_engine,use_beam_bq_sink'
)

# Definir suscripciones y otros detalles si es necesario
suscripcion_coche = 'projects/dataflow-1-411618/subscriptions/coches_stream-sub'
suscripcion_usuario = 'projects/dataflow-1-411618/subscriptions/usuarios_stream-sub'

#########funciones#######################################

# Recibe datos
class DecodeMessage(beam.DoFn):
    def process(self, element):
        output = element.decode('utf-8')
        json_data = json.loads(output)
        return [json_data]

# Función para extraer la clave 'user_geo_fin' de cada elemento
def extract_geo_fin(element):
    geo = element.get('user_geo_fin', None)
 #   print(geo)
    return (geo, element)

# Función para extraer la clave 'geo' de cada elemento
def extract_geo(element):
    geo = element.get('coche_geo', None)
 #   print(geo)
    return (geo, element)


# Función para filtrar casos coincidentes y no coincidentes
class FilterCoincidentCases(beam.DoFn):
    def process(self, element):
        geo_key, messages = element
        coches = messages['coches']
        usuarios = messages['usuarios']

        if coches and usuarios:
            yield {'geo': geo_key, 'coches': coches, 'usuarios': usuarios, 'fin_viaje': True}
        else:
            yield {'geo': geo_key, 'coches': coches, 'usuarios': usuarios, 'fin_viaje': False}

            
##########pipeline#################################            
            
# Crear el pipeline
with beam.Pipeline(options=PipelineOptions(streaming=True, save_main_session=True)) as p:
    # Coches
    coches_data = (
        p
        | "Coche_LeerDesdePubSub" >> beam.io.ReadFromPubSub(subscription=suscripcion_coche)
        | "Coche_decodificar_msg" >> beam.ParDo(DecodeMessage())
        | "Coche_Extraer_Clave_geo" >> beam.Map(extract_geo)
        | "Coche_ventana_5_minutos" >> beam.WindowInto(beam.window.FixedWindows(60))
    )
    
    # Usuarios_fin
    usuarios_data_fin = (
        p
        | "Usuario_LeerDesdePubSub_inic" >> beam.io.ReadFromPubSub(subscription=suscripcion_usuario)
        | "Usuario_decodificar_msg_inic" >> beam.ParDo(DecodeMessage())
        | "Usuario_Extraer_Clave_geo_inic" >> beam.Map(extract_geo_fin)
        | "Usuario_ventana_5_minutos_inic" >> beam.WindowInto(beam.window.FixedWindows(60))
    )
    
    
    # Realizar un CoGroupByKey en base al campo 'geo'_fin
    joined_data_fin = (
        {'coches': coches_data, 'usuarios': usuarios_data_fin}
        | "Merge_Mensajes_por_geo_fin" >> CoGroupByKey()
        | "Filtrar_Casos_Coincidentes_fin" >> beam.ParDo(FilterCoincidentCases())
        | "Filtrar_Solo_Coincidentes_fin" >> beam.Filter(lambda element: element['fin_viaje'])
        | "Imprimir_Resultados_fin" >> beam.Map(lambda element: print(element))
    )



{'geo': '39.49693-0.38688', 'coches': [{'coche_id_message': 'f33e614c-ee5f-4856-b10f-ac7785a0f622', 'coche_id': 1, 'coche_index_msg': 5, 'coche_geo': '39.49693-0.38688', 'coche_latitud': 39.49693, 'coche_longitud': -0.38688, 'coche_datetime': '2024-01-31T15:05:34.092013Z', 'coche_ruta': 'benicalap-alboraya.kml'}], 'usuarios': [{'user_id_message': '1b5ae6d6-aa9a-4fb8-b201-699f274722d9', 'user_id': 9990, 'user_datetime': '2024-01-31T15:05:16.817070Z', 'user_geo': '39.49597-0.38838', 'user_geo_fin': '39.49693-0.38688', 'user_latitud_inicio': 39.49597, 'user_longitud_inicio': -0.38838, 'user_latitud_destino': 39.49693, 'user_longitud_destino': -0.38688}], 'fin_viaje': True}


KeyboardInterrupt: 

### Read PubSub - Write Bigquery

In [6]:

################################ Script para escribir en Big Query la información de los coches ####################################

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.io.gcp.internal.clients import bigquery
import json
from datetime import datetime

#################################################### Cris ######################################################
project_id = 'dataflow-1-411618'
topic_name= 'clase_test'
table_name = 'dataflow-1-411618:blablacar.coches'
suscripcion = 'projects/dataflow-1-411618/subscriptions/coches_stream-sub'
################################################################################################################



# Recibe datos
def decode_message(msg):
    # Lógica para decodificar el mensaje y cargarlo como JSON
    output = msg.decode('utf-8')
    json_data = json.loads(output)
    logging.info("New message in PubSub: %s", json_data)
    return json_data

# Obtiene la hora actual en formato UTC
current_time_utc = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'

class DecodeMessage(beam.DoFn):
    def process(self, element):
        output = element.decode('utf-8')
        json_data = json.loads(output)
        print(f"JSON guardado en BigQuery: {json_data}")
        return [json_data]

# Nueva definición del esquema para BigQuery
new_table_schema = bigquery.TableSchema()
new_table_fields = [
    bigquery.TableFieldSchema(name='id_message', type='STRING', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='coche_id', type='INT', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='index_msg', type='STRING', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='latitud', type='FLOAT', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='longitud', type='FLOAT', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='datetime', type='DATETIME', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='ruta', type='STRING', mode='NULLABLE')
]
new_table_schema.fields.extend(new_table_fields)





with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    #coches:
    data = (
        p
        | "LeerDesdePubSub" >> beam.io.ReadFromPubSub(subscription=suscripcion)
        | "decodificar_msg" >> beam.ParDo(DecodeMessage())
        | "escribir" >> beam.io.WriteToBigQuery(
            table= table_name,
            schema=new_table_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
        )
    )





JSON guardado en BigQuery: {'coche_id_message': '0058b5ce-322c-4b51-955c-ad06c2698ce9', 'coche_id': 1, 'coche_index_msg': 2, 'coche_geo': '39.49658-0.38742', 'coche_latitud': 39.49658, 'coche_longitud': -0.38742, 'coche_datetime': '2024-02-04T13:23:38.236532', 'coche_ruta': 'benicalap-alboraya.kml'}
JSON guardado en BigQuery: {'coche_id_message': '130c871f-0056-4b72-bbd1-c9a495abbf45', 'coche_id': 1, 'coche_index_msg': 5, 'coche_geo': '39.49693-0.38688', 'coche_latitud': 39.49693, 'coche_longitud': -0.38688, 'coche_datetime': '2024-02-04T13:23:47.245067', 'coche_ruta': 'benicalap-alboraya.kml'}




KeyboardInterrupt: 

In [1]:
from google.cloud import bigquery
import json

# Configura tu proyecto y tu ID de conjunto de datos
project_id = 'dataflow-1-411618'
dataset_id = 'blablacar'
table_id = 'asignaciones'

# Configura el cliente de BigQuery
client = bigquery.Client(project=project_id)

# Define un JSON de ejemplo con el esquema esperado
json_data = {
    'geo': '39.49597-0.38838',
    'coche_id': 1,
    'coche_datetime': '2024-02-03T16:16:30.200604',
    'coche_ruta': 'benicalap-alboraya.kml',
    'user_id': 9990,
    'user_datetime': '2024-02-03T16:16:19.790657',
    'user_geo_inicio': '39.49597-0.38838',
    'user_geo_fin': '39.49693-0.38688',
    'user_latitud_inicio': 39.49597,
    'user_longitud_inicio': -0.38838,
    'user_latitud_destino': 39.49693,
    'user_longitud_destino': -0.38688,
    'inicio_viaje': True,
    'fin_viaje': True
}

# Inserta el JSON en BigQuery
table_ref = client.dataset(dataset_id).table(table_id)
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)
job = client.load_table_from_json([json_data], table_ref, job_config=job_config)

# Espera a que el job termine
job.result()

print(f'Datos insertados en BigQuery: {json_data}')


Datos insertados en BigQuery: {'geo': '39.49597-0.38838', 'coche_id': 1, 'coche_datetime': '2024-02-03T16:16:30.200604', 'coche_ruta': 'benicalap-alboraya.kml', 'user_id': 9990, 'user_datetime': '2024-02-03T16:16:19.790657', 'user_geo_inicio': '39.49597-0.38838', 'user_geo_fin': '39.49693-0.38688', 'user_latitud_inicio': 39.49597, 'user_longitud_inicio': -0.38838, 'user_latitud_destino': 39.49693, 'user_longitud_destino': -0.38688, 'inicio_viaje': True, 'fin_viaje': True}
