diff --git a/Dockerfile b/Dockerfile
index f692b753..d2b36c33 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 # 3.8 (not 3.9) is required for apache beam
-FROM python:3.8-buster
+FROM python:3.9-buster
 
 # Allow statements and log messages to immediately appear in the Knative logs
 ENV PYTHONUNBUFFERED True
@@ -28,4 +28,4 @@ COPY . ./
 
 RUN pip install -r requirements.txt
 
-ENTRYPOINT python3 schedule_pipeline.py --env=${PIPELINE_ENV}
+ENTRYPOINT python3 -m pipeline.run_beam_tables --env=prod --scan_type=echo --export_bq
diff --git a/pipeline/beam_tables.py b/pipeline/beam_tables.py
index f5832ec8..e2a9fbac 100644
--- a/pipeline/beam_tables.py
+++ b/pipeline/beam_tables.py
@@ -441,6 +441,12 @@ def _data_to_load(self,
       full_table_name = self._get_full_table_name(table_name)
       existing_sources = _get_existing_bq_datasources(full_table_name,
                                                       self.project)
+      from pprint import pprint
+      pprint("------------ existing bq datasources ---------------")
+      existing_sources.sort()
+      pprint(existing_sources)
+      pprint(full_table_name)
+
     elif gcs_folder:
       existing_sources = _get_existing_gcs_datasources(
           gcs_folder, self.project)
@@ -617,6 +623,10 @@ def run_beam_pipeline(self, scan_type: str, incremental_load: bool,
       if not new_filenames:
         logging.info('No new files to load')
         return
+
+      from pprint import pprint
+      pprint("------------- new filenames --------------")
+      pprint(new_filenames)
 
     with beam.Pipeline(options=pipeline_options) as p:
       # PCollection[Tuple[filename,line]]
@@ -624,9 +634,9 @@ def run_beam_pipeline(self, scan_type: str, incremental_load: bool,
 
       # Reshuffle to break fusion on long files
       # PCollection[Tuple[filename,line]]
-      lines = (
-          lines |
-          'reshuffle' >> beam.Reshuffle().with_output_types(Tuple[str, str]))
+      #lines = (
+      #    lines |
+      #    'reshuffle' >> beam.Reshuffle().with_output_types(Tuple[str, str]))
 
       if scan_type == schema.SCAN_TYPE_SATELLITE:
         # PCollection[SatelliteRow]
diff --git a/table/run_queries.py b/table/run_queries.py
index 8ed47c9e..7b7616e3 100644
--- a/table/run_queries.py
+++ b/table/run_queries.py
@@ -60,7 +60,7 @@ def rebuild_all_tables(project_name: str,
   """
   client = cloud_bigquery.Client(project=project_name)
 
-  for filepath in glob.glob('table/queries/*.sql'):
+  for filepath in glob.glob('table/queries/merged_reduced_scans.sql'):
     try:
       _run_query(client, filepath, project_name, base_dataset, derived_dataset)
     except Exception as ex: