Skip to content

Commit

Permalink
Check columns exist when running bq merge update
Browse files Browse the repository at this point in the history
  • Loading branch information
feluelle committed Sep 12, 2022
1 parent b70b044 commit ce33d74
Showing 1 changed file with 26 additions and 1 deletion.
27 changes: 26 additions & 1 deletion python-sdk/src/astro/databases/google/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,23 @@ def load_pandas_dataframe_to_table(
credentials=creds,
)

def columns_exist(self, table: Table, columns: list[str]) -> bool:
"""
Check that a list of columns exist in the given table.
:param table: The table to check in.
:param columns: The columns to check.
:returns: whether the columns exist in the table or not.
"""
statement = (
"SELECT COLUMN_NAME"
f" FROM {table.metadata.schema}.INFORMATION_SCHEMA.COLUMNS WHERE table_name='{table.name}'"
)
response = self.run_sql(statement)
rows = response.fetchall()
return all(any(row[0] == column for row in rows) for column in columns)

def merge_table(
self,
source_table: Table,
Expand Down Expand Up @@ -210,7 +227,15 @@ def merge_table(
f"T.{col}=S.{source_columns[idx]}"
for idx, col in enumerate(target_columns)
)
# FIXME: sql injection issue
if not self.columns_exist(source_table, source_columns):
raise ValueError(
f"Not all the columns provided exist for {source_table_name}!"
)
if not self.columns_exist(target_table, target_columns):
raise ValueError(
f"Not all the columns provided exist for {target_table_name}!"
)
# Note: Ignoring below sql injection warning, as we validate that the table columns exist beforehand.
update_statement = f"UPDATE SET {update_statement_map}" # skipcq BAN-B608
merge_statement += f" WHEN MATCHED THEN {update_statement}"
self.run_sql(sql_statement=merge_statement)
Expand Down

0 comments on commit ce33d74

Please sign in to comment.