Commit
draft
ohnorobo committed Jun 7, 2023
1 parent 670714f commit dc8bbf5
Showing 2 changed files with 34 additions and 2 deletions.
21 changes: 19 additions & 2 deletions pipeline/beam_tables.py
@@ -29,7 +29,7 @@
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.transforms.sql import SqlTransform
from google.cloud import bigquery as cloud_bigquery # type: ignore

-from pipeline.metadata.schema import BigqueryRow, dict_to_gcs_json_string
+from pipeline.metadata.schema import BigqueryRow, DashboardRow, dict_to_gcs_json_string
from pipeline.metadata import hyperquack
from pipeline.metadata import schema
from pipeline.metadata import flatten_base
@@ -473,7 +473,7 @@ def _data_to_load(self,
  return filtered_filenames

def _write_to_bigquery(self, scan_type: str,
-                       rows: beam.pvalue.PCollection[BigqueryRow],
+                       rows: beam.pvalue.PCollection[Union[BigqueryRow, DashboardRow]],
                       table_name: str, incremental_load: bool) -> None:
  """Write out rows to a BigQuery table.
@@ -574,6 +574,14 @@ def _get_pipeline_options(self, scan_type: str,
  pipeline_options.view_as(SetupOptions).save_main_session = True
  return pipeline_options

def derive_dashboard_rows(
    self, rows: beam.pvalue.PCollection[schema.HyperquackRow]
) -> beam.pvalue.PCollection[DashboardRow]:
  """Derive aggregated dashboard rows in-pipeline via a SQL transform."""
  with open('table/queries/merge_hyperquack.sql') as sql_file:
    sql_query = sql_file.read()

  dash_rows = (rows | 'derive dashboard rows' >> SqlTransform(sql_query))

  return dash_rows


def run_beam_pipeline(self, scan_type: str, incremental_load: bool,
                      job_name: str, table_name: Optional[str],
                      gcs_folder: Optional[str],
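One caveat with the SqlTransform call in derive_dashboard_rows above: Beam's SqlTransform only accepts schema-aware PCollections, so plain dataclass rows would likely need converting to beam.Row before the query runs. A minimal, self-contained sketch of that pattern; the two-field row and the query here are illustrative, not this repo's actual schema:

import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([('example.com', True), ('example.com', False)])
      # Building beam.Row values inside a lambda lets Beam infer the
      # schema that SqlTransform requires.
      | 'to schema rows' >> beam.Map(
          lambda kv: beam.Row(domain=kv[0], anomaly=kv[1]))
      # A single unnamed input is addressed as PCOLLECTION in the query.
      # SqlTransform runs through Beam's Java expansion service, so a
      # Java runtime must be available.
      | 'aggregate' >> SqlTransform(
          'SELECT domain, COUNT(*) AS scan_count '
          'FROM PCOLLECTION GROUP BY domain')
      | beam.Map(print))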
@@ -645,3 +653,12 @@ def run_beam_pipeline(self, scan_type: str, incremental_load: bool,
  self._write_to_gcs(scan_type, rows, gcs_folder)
elif table_name is not None:
  self._write_to_bigquery(scan_type, rows, table_name, incremental_load)

# Also calculate the derived dashboard rows in-pipeline
# and write them to their own table.
dashboard_rows = self.derive_dashboard_rows(rows)

self._write_to_bigquery(
    'dashboard', dashboard_rows,
    'firehook-censoredplanet.derived.merged_reduced_scans_v3', False)
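The file table/queries/merge_hyperquack.sql itself is not included in this diff. Judging only from the DashboardRow fields defined below, the query plausibly has an aggregating shape along these lines; every detail here is a guess for illustration, not the repo's actual query:

# Hypothetical shape only -- the real merge_hyperquack.sql is not shown
# in this commit. Aliases mirror the DashboardRow fields; 'count' may
# need quoting as a reserved word depending on the SQL dialect.
GUESSED_MERGE_QUERY = """
SELECT
  date, source, country_name, network, subnetwork,
  domain, domain_category, outcome,
  SUM(CASE WHEN outcome NOT LIKE 'expected%' THEN 1 ELSE 0 END)
    AS unexpected_count,
  COUNT(*) AS count
FROM PCOLLECTION
GROUP BY date, source, country_name, network, subnetwork,
  domain, domain_category, outcome
"""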



15 changes: 15 additions & 0 deletions pipeline/metadata/schema.py
@@ -248,6 +248,21 @@ class PageFetchRow():
  error: Optional[str] = None


@dataclass
class DashboardRow():
  """A row in the derived dashboard table."""
  date: Optional[str] = None
  source: Optional[str] = None
  country_name: Optional[str] = None
  network: Optional[str] = None
  subnetwork: Optional[str] = None
  domain: Optional[str] = None
  domain_category: Optional[str] = None
  outcome: Optional[str] = None
  unexpected_count: Optional[int] = None
  count: Optional[int] = None


def flatten_to_dict(row: Union[BigqueryRow, PageFetchRow]) -> Dict[str, Any]:
  if isinstance(row, HyperquackRow):
    return flatten_to_dict_hyperquack(row)
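If DashboardRow takes the same write path as the other row types, it would presumably need its own flattening branch here. Since it is a flat dataclass of scalars, dataclasses.asdict is sufficient; a sketch using a hypothetical helper (flatten_to_dict_dashboard is not part of this diff):

import dataclasses

def flatten_to_dict_dashboard(row: DashboardRow) -> Dict[str, Any]:
  # DashboardRow contains only scalar Optional fields, so asdict()
  # already yields a JSON/BigQuery-ready dict.
  return dataclasses.asdict(row)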
