In [1]:
import gc
import sys
import polars as pl
from google.cloud import bigquery

sys.path.append("../../")
import create_bq_table as cbq
from upload_parquet_to_bq import upload_parquet_to_bq

# Define BigQuery project and dataset details

In [2]:
# BigQuery target: every table in this notebook lives in the `unified_data`
# dataset of this GCP project.
project_id = "prj-ext-dev-pertcat-437314"
dataset_name = "unified_data"
# Fully-qualified dataset reference of the form "<project>.<dataset>".
dataset_id = ".".join([project_id, dataset_name])

In the cells below, set `parquet_file_path` and `table_name`.

`parquet_file_path` is the path to the parquet file you want to upload to BigQuery.
`table_name` is the name of the table you want to create in BigQuery.

For metadata, 4 tables are created:
- `test_metadata_pc` - with partitioning and clustering
- `test_metadata_p` - partitioning only
- `test_metadata_c` - clustering only
- `test_metadata_none` - no partitioning or clustering

Since these tables already exist in BigQuery, do not re-run the table-creation cells unless you delete the tables first (see the last cell).

To upload data to existing tables, use the `upload_parquet_to_bq` function. It first loads the data into a staging table (the table name with the suffix `_staging`) and then, if the data is unique, merges it into the main table. This prevents the same data from being uploaded multiple times.

# Metadata

In [None]:
# Source file for the metadata tables, and the base table name to which the
# variant suffixes (_pc, _p, _c, _none) are appended when the tables are created.
parquet_file_path = "../Perturbseq/curated/parquet/datlinger_2017_curated_long_metadata.parquet"
table_name = "test_metadata"

# Only the schema is needed to generate the DDL, so scan lazily instead of loading the data.
df = pl.scan_parquet(parquet_file_path)
schema = df.schema

## Create tables in BigQuery

In [None]:
# Create four variants of the metadata table from the same schema:
# partitioned + clustered, partitioned only, clustered only, and neither.

# Integer-range partitioning on the encoded chromosome of the perturbed target.
# NOTE(review): in BigQuery integer-range partitioning the range end is exclusive,
# so (0, 25, 1) yields buckets 0..24 and any other value lands in __UNPARTITIONED__.
# This assumes cbq maps these arguments directly onto the range spec; confirm it
# covers the chromosome encoding.
partition_options = {
    "partition_column": "perturbed_target_chromosome_encoding",
    "partition_range_start": 0,
    "partition_range_end": 25,
    "partition_range_interval": 1,
}
cluster_options = {
    "cluster_columns": ['dataset_id', 'sample_id', 'perturbed_target_symbol'],
}

# Table-name suffix -> extra keyword arguments for generate_create_table_sql.
metadata_table_variants = {
    "_pc": {**partition_options, **cluster_options},  # partitioning and clustering
    "_p": partition_options,                          # partitioning only
    "_c": cluster_options,                            # clustering only
    "_none": {},                                      # no partitioning or clustering
}

ddl_sql_by_suffix = {}
for suffix, table_options in metadata_table_variants.items():
    ddl_sql = cbq.generate_create_table_sql(
        dataset_name=dataset_name,
        table_name=table_name + suffix,
        schema=schema,
        **table_options,
    )
    # Every variant passes the bare dataset name (not the fully-qualified
    # `dataset_id`), consistent with the other create_bq_table calls in this notebook.
    cbq.create_bq_table(
        project_id=project_id,
        dataset_name=dataset_name,
        ddl_sql=ddl_sql,
    )
    ddl_sql_by_suffix[suffix] = ddl_sql

# Per-variant DDL strings, kept under individual names for inspection.
ddl_sql_pc = ddl_sql_by_suffix["_pc"]
ddl_sql_p = ddl_sql_by_suffix["_p"]
ddl_sql_c = ddl_sql_by_suffix["_c"]
ddl_sql_none = ddl_sql_by_suffix["_none"]

## Upload parquet files to BigQuery

This uploads the metadata parquet file into each of the four metadata tables.

In [None]:
# Upload the same metadata parquet file into each of the four table variants.
metadata_tables = ["test_metadata_pc", "test_metadata_p", "test_metadata_c", "test_metadata_none"]

# Use a dedicated loop variable. Reusing `table_name` here would overwrite the base
# name ("test_metadata") set earlier, so re-running the table-creation cell afterwards
# would target names such as "test_metadata_none_pc".
for metadata_table in metadata_tables:
    upload_parquet_to_bq(
        parquet_path=parquet_file_path,
        dataset_id=dataset_id,
        table_name=metadata_table,
        key_columns=['dataset_id', 'sample_id'],
    )

# Data

## Create a table in BigQuery

In [None]:
# Source file and target table for the (long-format) data table.
parquet_file_path = "../Perturbseq/curated/parquet/datlinger_2017_curated_long_data.parquet"
table_name = "test_data"

# Lazy scan: only the schema is read here, to build the CREATE TABLE statement.
df = pl.scan_parquet(parquet_file_path)
schema = df.schema


In [None]:

# Build and run the CREATE TABLE DDL for the data table.
# It is clustered on dataset_id; no partitioning arguments are passed.
ddl_sql_data = cbq.generate_create_table_sql(
    dataset_name=dataset_name,
    table_name=table_name,
    schema=schema,
    cluster_columns=["dataset_id"],
)
cbq.create_bq_table(project_id=project_id, dataset_name=dataset_name, ddl_sql=ddl_sql_data)

## Upload parquet files to BigQuery

In [None]:
# Upload the data parquet file into the table named by `table_name`.
# NOTE(review): these are the same key columns used for the metadata tables. If the
# data table holds several rows per (dataset_id, sample_id), confirm these are the
# keys upload_parquet_to_bq should use for its uniqueness check.
data_key_columns = ["dataset_id", "sample_id"]
upload_parquet_to_bq(
    parquet_path=parquet_file_path,
    dataset_id=dataset_id,
    table_name=table_name,
    key_columns=data_key_columns,
)

# Unified

## Create a table in BigQuery

In [None]:
# Source file and target table for the unified table.
# NOTE(review): this section reads the adamson_2016 dataset, whereas the metadata
# and data sections use datlinger_2017. Confirm this is intended.
parquet_file_path = "../Perturbseq/curated/parquet/adamson_2016_upr_epistasis_curated_long_unified.parquet"
table_name = "test_unified"

# Only the schema is needed for the DDL, so scan lazily instead of loading the data.
df = pl.scan_parquet(parquet_file_path)
schema = df.schema

# Clustered on dataset_id; no partitioning arguments are passed.
ddl_sql_unified = cbq.generate_create_table_sql(
    dataset_name=dataset_name,
    table_name=table_name,
    schema=schema,
    cluster_columns=["dataset_id"],
)
cbq.create_bq_table(project_id=project_id, dataset_name=dataset_name, ddl_sql=ddl_sql_unified)

## Upload parquet files to BigQuery

In [None]:
# Upload the unified parquet file into the table named by `table_name`,
# keyed on (dataset_id, sample_id).
unified_key_columns = ["dataset_id", "sample_id"]
upload_parquet_to_bq(
    parquet_path=parquet_file_path,
    dataset_id=dataset_id,
    table_name=table_name,
    key_columns=unified_key_columns,
)

# Delete tables in BigQuery

In [None]:
# Tables to drop from `dataset_id`. Every entry is commented out by default so that
# running the whole notebook deletes nothing; uncomment entries deliberately.
tables_to_delete = [
    # "test_metadata_pc",
    # "test_metadata_p",
    # "test_metadata_c",
    # "test_metadata_none",
    # "test_metadata_pc_staging",
    # "test_metadata_p_staging",
    # "test_metadata_c_staging",
    # "test_metadata_none_staging"
]

# Bind the client to the configured project rather than relying on the
# environment's default project.
client = bigquery.Client(project=project_id)

# Use a dedicated loop variable so the notebook-level `table_name` is not overwritten.
for table_to_delete in tables_to_delete:
    table_id = f"{dataset_id}.{table_to_delete}"
    try:
        client.delete_table(table_id)
        print(f"Deleted table {table_id}.")
    except Exception as e:
        # Best-effort: report the failure (e.g. the table does not exist) and
        # continue with the remaining tables.
        print(f"Failed to delete table {table_id}: {e}")