Skip to content

Commit

Permalink
draft 3
Browse files Browse the repository at this point in the history
  • Loading branch information
ohnorobo committed Jun 13, 2023
1 parent 8185ceb commit e7a6faa
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pipeline/beam_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
from apache_beam.io.fileio import WriteToFiles
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
# SQL transform must be imported here since it is run through a java JAR expansion
from apache_beam.transforms.sql import SqlTransform
from google.cloud import bigquery as cloud_bigquery # type: ignore

from pipeline.metadata.schema import BigqueryRow, DashboardRow, dict_to_gcs_json_string
Expand Down Expand Up @@ -581,7 +583,7 @@ def derive_dashboard_rows(self, rows: beam.PCollection[schema.HyperquackRow]) ->

pprint(sql_query)

dash_rows = (rows | 'derive dashboard rows' >> beam.transforms.sql.SqlTransform(sql_query))
dash_rows = (rows | 'derive dashboard rows' >> SqlTransform(sql_query))

pprint("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
pprint(dash_rows)
Expand Down
99 changes: 99 additions & 0 deletions sql_taxi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""An example that aggregates ride data in 15s windows with SqlTransform.

Adapted from the Beam NYC Taxi example
(https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon). This
draft replaces the PubSub source with in-memory sample data and prints the
aggregated results locally instead of publishing to the user-defined
PubSub topic.
A Java version supported by Beam must be installed locally to run this
pipeline. Additionally, Docker must also be available to run this pipeline
locally.
"""

# pytype: skip-file

import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.sql import SqlTransform


def run(output_topic, pipeline_args):
    """Build and run a pipeline that aggregates ride data with SqlTransform.

    Args:
        output_topic: Cloud PubSub topic path. NOTE(review): currently
            unused -- this draft prints results locally instead of
            publishing; confirm whether PubSub output should be restored.
        pipeline_args: extra command-line arguments forwarded to Beam's
            PipelineOptions (runner, project, etc.).
    """
    pipeline_options = PipelineOptions(
        pipeline_args, save_main_session=True, streaming=True)

    # Sample payloads standing in for the PubSub taxi stream. These must be
    # valid JSON documents with the keys read below: the previous draft
    # created bare emoji strings, which made the "Parse JSON payload" step
    # fail with json.JSONDecodeError on the first element.
    sample_payloads = [
        json.dumps({'ride_status': status, 'passenger_count': count})
        for status, count in [
            ('pickup', 1), ('enroute', 2), ('dropoff', 3),
            ('pickup', 2), ('dropoff', 1), ('enroute', 4),
            ('pickup', 5), ('dropoff', 2), ('pickup', 1), ('dropoff', 4),
        ]
    ]

    with beam.Pipeline(options=pipeline_options) as pipeline:
        _ = (
            pipeline
            | beam.Create(sample_payloads)
            | "Parse JSON payload" >> beam.Map(json.loads)
            # Use beam.Row to create a schema-aware PCollection
            | "Create beam Row" >> beam.Map(
                lambda x: beam.Row(
                    ride_status=str(x['ride_status']),
                    passenger_count=int(x['passenger_count'])))
            # SqlTransform computes its result within an existing window
            | "15s fixed windows" >> beam.WindowInto(
                beam.window.FixedWindows(15))
            # Aggregate drop offs and pick ups that occur within each
            # 15s window
            | SqlTransform(
                """
                 SELECT
                   ride_status,
                   COUNT(*) AS num_rides,
                   SUM(passenger_count) AS total_passengers
                 FROM PCOLLECTION
                 WHERE NOT ride_status = 'enroute'
                 GROUP BY ride_status""")
            # SqlTransform yields python objects with attributes
            # corresponding to the outputs of the query.
            # Collect those attributes, as well as window information,
            # into a dict.
            | "Assemble Dictionary" >> beam.Map(
                lambda row,
                window=beam.DoFn.WindowParam: {
                    "ride_status": row.ride_status,
                    "num_rides": row.num_rides,
                    "total_passengers": row.total_passengers,
                    "window_start": window.start.to_rfc3339(),
                    "window_end": window.end.to_rfc3339()
                })
            | "Convert to JSON" >> beam.Map(json.dumps)
            | "UTF-8 encode" >> beam.Map(lambda s: s.encode("utf-8"))
            # Draft debugging tail: counts encoded records per window and
            # prints the count instead of publishing to output_topic.
            | beam.combiners.Count.Globally().without_defaults()
            | beam.Map(print))


if __name__ == '__main__':
    # argparse is only needed for direct script execution, so it is
    # imported here rather than at module level.
    import argparse

    logging.getLogger().setLevel(logging.INFO)

    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '--output_topic',
        dest='output_topic',
        required=True,
        help=(
            'Cloud PubSub topic to write to (e.g. '
            'projects/my-project/topics/my-topic), must be created prior to '
            'running the pipeline.'))

    # Unrecognized flags are passed through to Beam as pipeline options.
    parsed_args, beam_args = arg_parser.parse_known_args()
    run(parsed_args.output_topic, beam_args)

0 comments on commit e7a6faa

Please sign in to comment.