In [20]:
import logging
import json
from datetime import datetime
import pandas as pd 
import time
import traceback

from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options import pipeline_options
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.io import WriteToText

from apache_beam import (
    DoFn, 
    io,
    ParDo, 
    PTransform,
    WindowInto, 
    WithKeys,
)

from apache_beam.runners import DataflowRunner

import google.auth


In [21]:
# Resolve the active GCP project from Application Default Credentials.
# google.auth.default() returns (credentials, project_id); project_id may be
# None if ADC has no project configured — TODO confirm in the target env.
project_auth = google.auth.default()[1]

GCP_REGION = "us-central1"
STAGING_BUCKET = "data604-project-g3-data"

# Build with the PipelineOptions class imported in the first cell rather than
# via the `pipeline_options` module: the variable assigned here shadows that
# module name, so `pipeline_options.PipelineOptions(...)` breaks on a re-run.
pipeline_options = PipelineOptions(
    flags=[],  # no command-line flags; options are passed as keywords below
    streaming=True,
    project=project_auth,
    region=GCP_REGION,
    # Dataflow expects a GCS URI here; a bare "bucket/path" without the gs://
    # scheme does not pass Beam's GCS-path validation (Beam may then fall back
    # to a default bucket — NOTE(review): verify on the job's options page).
    staging_location=f"gs://{STAGING_BUCKET}/staging",
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [24]:
# BigQuery schema for the locations table. Every column is NULLABLE, so only
# the (name, type) pair differs between fields.
table_schema = {
    "fields": [
        {"name": column_name, "type": column_type, "mode": "NULLABLE"}
        for column_name, column_type in (
            ("time", "TIMESTAMP"),
            ("lat", "FLOAT"),
            ("long", "FLOAT"),
            ("speed", "FLOAT"),
            ("label", "STRING"),
            ("place_type", "STRING"),
            ("user_id", "STRING"),
        )
    ]
}

# Destination table, written as "project:dataset.table".
table = "data604-project-g3:Footprint_data.locations"

In [22]:
class tranformations(beam.DoFn):
    """Clean one decoded location message into a row for the BigQuery table.

    Each ``element`` is the result of ``json.loads`` on a Pub/Sub message.
    It is assumed to be a dict with a ``time`` field in epoch milliseconds plus
    ``acc``/``alt``/``bearing``/``postime`` fields (TODO confirm against the
    producer of ``locations_fake_streaming``).

    For each element the DoFn:
      * converts ``time`` from epoch milliseconds to a timestamp,
      * sets the timestamp's year to 2024 (presumably to line replayed fake
        data up with the current year — confirm),
      * removes ``acc``/``alt``/``bearing``/``postime`` when present.

    Records that fail any step are logged and dropped. They are not yielded
    half-processed, because a row that still has a raw millisecond ``time`` or
    extra columns would be rejected by the BigQuery write.
    """

    def process(self, element):
        # Imports live inside the method so they resolve on Dataflow workers:
        # this notebook does not set save_main_session, so module-level names
        # from the notebook (such as its `logger`) are not available there.
        import logging

        import pandas as pd

        log = logging.getLogger(__name__)

        try:
            user_location = pd.DataFrame([element])
            user_location["time"] = pd.to_datetime(user_location["time"], unit="ms")
            user_location["time"] = user_location["time"].apply(lambda x: x.replace(year=2024))
            # errors="ignore": a message that merely lacks one of these fields
            # is still a valid row once the others are removed.
            user_location = user_location.drop(
                columns=["acc", "alt", "bearing", "postime"], errors="ignore"
            )
        except Exception as e:
            log.error(f"Error processing record {element}: {e}")
            # Drop the malformed record instead of emitting a partial row.
            return

        yield user_location.to_dict(orient="records")[0]

In [25]:
# Streaming Dataflow job: Pub/Sub -> 1-hour fixed windows -> JSON decode ->
# row cleanup -> append to BigQuery.
user_locations_p = beam.Pipeline(
    DataflowRunner(),
    options=pipeline_options,
)

user_locations_topic = "projects/data604-project-g3/topics/locations_fake_streaming"

user_locations_pubsub = (
    user_locations_p
    | "Read Topic" >> ReadFromPubSub(topic=user_locations_topic)
    # FixedWindows size is in seconds: 3600 s = 1 hour.
    | "Window" >> beam.WindowInto(beam.window.FixedWindows(size=3600))
    | "To Dict" >> beam.Map(json.loads)
    | "Transformations" >> beam.ParDo(tranformations())
)

user_locations_pubsub | "Write To BigQuery" >> WriteToBigQuery(
    table=table,
    schema=table_schema,
    create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=BigQueryDisposition.WRITE_APPEND,
    # Beam's default for streaming inserts is to retry failed rows forever,
    # so one permanently bad row (e.g. a schema mismatch) stalls the job.
    # Retry only transient errors; permanent failures go to the failed-rows
    # output instead.
    insert_retry_strategy="RETRY_ON_TRANSIENT_ERROR",
)

user_locations_p.run()



[0m

<DataflowPipelineResult <Job
 clientRequestId: '20241128000524994514-5200'
 createTime: '2024-11-28T00:05:25.994633Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2024-11-27_16_05_25-15823640320879560523'
 location: 'us-central1'
 name: 'beamapp-root-1128000524-993435-2p8484pr'
 projectId: 'data604-project-g3'
 stageStates: []
 startTime: '2024-11-28T00:05:25.994633Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fad679fd1b0>