Skip to content

Commit

Permalink
checkpoint
Browse files (browse the repository at this point in the history)
  • Loading branch information
ohnorobo committed Jun 23, 2023
1 parent c70a60c commit 03227a6
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
8 changes: 7 additions & 1 deletion pipeline/beam_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,12 @@ def _get_pipeline_options(self, scan_type: str,
],
dataflow_service_options=['enable_prime'],
setup_file='./pipeline/setup.py')
pipeline_options.set_planner_name('org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner')

from pprint import pprint
pprint("printing pipeline options")
pprint(pipeline_options)

pipeline_options.view_as(SetupOptions).save_main_session = True
return pipeline_options

Expand All @@ -590,7 +596,7 @@ def derive_dashboard_rows(self, rows: beam.PCollection[schema.HyperquackRow]) ->
pprint(dir(coder_rows))
pprint(coder_rows.element_type)

dash_rows = (coder_rows | 'derive dashboard rows' >> SqlTransform(sql_query))
dash_rows = (coder_rows | 'derive dashboard rows' >> SqlTransform(sql_query) )
#.with_output_types(BigqueryOutputRow))

pprint("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
Expand Down
8 changes: 6 additions & 2 deletions pipeline/manual_e2e_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,15 @@ def get_gs_formatted_gcs_folder(scan_type: str) -> str:
def get_local_pipeline_options(*_: List[Any]) -> PipelineOptions:
# This method is used to monkey patch the get_pipeline_options method in
# beam_tables in order to run a local pipeline.
return PipelineOptions(
pipeline_options = PipelineOptions(
runner='DirectRunner',
job_name=JOB_NAME,
project=firehook_resources.DEV_PROJECT_NAME,
temp_location=firehook_resources.DEV_BEAM_TEMP_LOCATION)
temp_location=firehook_resources.DEV_BEAM_TEMP_LOCATION,
planner_name='org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner'
)
#pipeline_options.set_planner_name('org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner')
return pipeline_options


def run_local_pipeline(scan_type: str, incremental: bool) -> None:
Expand Down
2 changes: 1 addition & 1 deletion pipeline/metadata/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def convert_byperquack_row_to_bq_row_format(row: HyperquackRow) -> BigqueryInput
('outcome', str),
('subnetwork', str),
('category', str),
('c', int),
('count', int),
('unexpected_count', int)
]
)
Expand Down

0 comments on commit 03227a6

Please sign in to comment.