Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
ohnorobo committed Aug 25, 2023
1 parent caeb338 commit ebf81e6
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 6 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

# 3.8 (not 3.9) is required for apache beam
FROM python:3.8-buster
FROM python:3.9-buster

# Allow statements and log messages to immediately appear in the Knative logs
ENV PYTHONUNBUFFERED True
Expand All @@ -28,4 +28,4 @@ COPY . ./

RUN pip install -r requirements.txt

ENTRYPOINT python3 schedule_pipeline.py --env=${PIPELINE_ENV}
ENTRYPOINT python3 -m pipeline.run_beam_tables --env=prod --scan_type=echo --export_bq
16 changes: 13 additions & 3 deletions pipeline/beam_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,12 @@ def _data_to_load(self,
full_table_name = self._get_full_table_name(table_name)
existing_sources = _get_existing_bq_datasources(full_table_name,
self.project)
from pprint import pprint
pprint("------------ existing bq datasources ---------------")
existing_sources.sort()
pprint(existing_sources)
pprint(full_table_name)

elif gcs_folder:
existing_sources = _get_existing_gcs_datasources(
gcs_folder, self.project)
Expand Down Expand Up @@ -617,16 +623,20 @@ def run_beam_pipeline(self, scan_type: str, incremental_load: bool,
if not new_filenames:
logging.info('No new files to load')
return

from pprint import pprint
pprint("------------- new filenames --------------")
pprint(new_filenames)

with beam.Pipeline(options=pipeline_options) as p:
# PCollection[Tuple[filename,line]]
lines = _read_scan_text(p, new_filenames)

# Reshuffle to break fusion on long files
# PCollection[Tuple[filename,line]]
lines = (
lines |
'reshuffle' >> beam.Reshuffle().with_output_types(Tuple[str, str]))
#lines = (
# lines |
# 'reshuffle' >> beam.Reshuffle().with_output_types(Tuple[str, str]))

if scan_type == schema.SCAN_TYPE_SATELLITE:
# PCollection[SatelliteRow]
Expand Down
2 changes: 1 addition & 1 deletion table/run_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def rebuild_all_tables(project_name: str,
"""
client = cloud_bigquery.Client(project=project_name)

for filepath in glob.glob('table/queries/*.sql'):
for filepath in glob.glob('table/queries/merged_reduced_scans.sql'):
try:
_run_query(client, filepath, project_name, base_dataset, derived_dataset)
except Exception as ex:
Expand Down

0 comments on commit ebf81e6

Please sign in to comment.