This script loads the synapses table into the CAVE database.
The script performs the following steps:
- (1) Load csv tables from the Cloud Storage bucket of export-project
- (2) Reformat the table to fit synapses schema
- (3) Write the table to the Cloud Storage bucket of import-project

This script does not perform the following steps:
- (0) Export required columns to the Cloud Storage bucket of export-project from the BigQuery synapses table in export-project
- (4) Import tables from the Cloud Storage bucket of import-project to Cloud SQL of import-project

In [None]:
import datetime
from types import SimpleNamespace

import gcsfs
import pandas as pd
from geoalchemy2 import WKBElement
from google.cloud import storage
from shapely.geometry import Point

In [None]:
# Source side: Cloud Storage location and service-account key of the export project.
export_bucket_name = 'jinhan-test'
export_folder_path = 'synapse-export'
export_key_path = 'lcht-goog-connectomics-xxxxx.json'
export_project_id = "lcht-goog-connectomics"

# Destination side: Cloud Storage location and service-account key of the import project.
import_bucket_name = "jinhan-synapse-import"
import_folder_path = "synapse-import-july-2023"
import_key_path = 'lcht-goog-cave-temp-xxxxx.json'
import_project_id = "lcht-goog-cave-temp"

# The functions in this notebook read their settings as ``param.<name>``.
# Expose the constants above under that namespace so those lookups resolve
# instead of raising NameError.
param = SimpleNamespace(
    export_bucket_name=export_bucket_name,
    export_folder_path=export_folder_path,
    export_key_path=export_key_path,
    export_project_id=export_project_id,
    import_bucket_name=import_bucket_name,
    import_folder_path=import_folder_path,
    import_key_path=import_key_path,
    import_project_id=import_project_id,
)

In [None]:
# Google Cloud Storage Interface
def get_bucket_file_names():
    """Return the names of all CSV objects under the export folder.

    Lists the objects of ``param.export_bucket_name`` whose name starts with
    ``param.export_folder_path`` and keeps those ending in ``.csv``
    (case-insensitive). Authentication uses the service-account key at
    ``param.export_key_path``.

    Returns:
        A list of object names, in the order the listing yields them.
    """
    gcs_client = storage.Client.from_service_account_json(param.export_key_path)
    source_bucket = storage.Bucket(gcs_client, param.export_bucket_name)

    matching_names = []
    for blob in gcs_client.list_blobs(source_bucket, prefix=param.export_folder_path):
        object_name = blob.name
        if object_name.lower().endswith('.csv'):
            matching_names.append(object_name)

    return matching_names

def write_files_to_bucket(df):
    """Upload *df* as a single header-less CSV object to the import bucket.

    The object is written to
    ``{param.import_bucket_name}/{param.import_folder_path}/formatted_synapse_annotations_<timestamp>.csv``
    where ``<timestamp>`` is the UTC time of the call.

    Args:
        df: DataFrame to upload. Neither the index nor a header row is written.
    """
    # create Google Cloud Storage file system instance
    fs = gcsfs.GCSFileSystem(project=param.import_project_id, token=param.import_key_path)

    # Without a target path ``to_csv`` returns the CSV text, so no file
    # ``mode`` argument is needed.
    csv_data = df.to_csv(header=False, index=False)

    # Naive UTC timestamp: renders exactly like the deprecated
    # ``datetime.utcnow()`` so the object name format is unchanged.
    created_time_stamp = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    path = f'formatted_synapse_annotations_{created_time_stamp}.csv'

    with fs.open(f'{param.import_bucket_name}/{param.import_folder_path}/{path}', 'wb') as file:
        file.write(csv_data.encode())


In [None]:
# Reformat
def create_wkt_element(geom):
    """Wrap the WKB representation of *geom* in a ``WKBElement``.

    NOTE: despite the ``wkt`` in the name, the element is built from
    ``geom.wkb`` (well-known binary), not from WKT text.

    Args:
        geom: Object exposing a ``wkb`` attribute.

    Returns:
        A ``WKBElement`` constructed from ``geom.wkb``.
    """
    wkb_payload = geom.wkb
    return WKBElement(wkb_payload)


def format_points(data):
    """Parse a bracketed, whitespace-separated string of integers.

    Leading and trailing ``[`` / ``]`` characters are stripped, the rest is
    split on whitespace and each token is converted with ``int``.

    Args:
        data: String such as ``"[1 2 3]"``.

    Returns:
        The parsed integers as a list (empty when there are no tokens).
    """
    tokens = data.strip('[]').split()
    return list(map(int, tokens))


def process_chunk(chunk, last_index):
    """Convert one chunk of exported rows into the synapses-schema layout.

    Args:
        chunk: DataFrame containing the coordinate columns
            ``pre_pt_x/y/z``, ``post_pt_x/y/z`` and ``x/y/z``. The caller's
            object is left unmodified.
        last_index: Offset used to build ids: ``id = index + last_index + 1``.

    Returns:
        A new DataFrame with exactly these columns, in this order: ``id``,
        ``created``, ``deleted``, ``superceded_id``, ``valid``,
        ``pre_pt_position``, ``post_pt_position``, ``ctr_pt_position``,
        ``size``. The three position columns hold ``WKBElement`` objects.

    Raises:
        KeyError: If one of the coordinate columns is missing from *chunk*.
    """
    # Target position column -> the three source coordinate columns.
    position_sources = {
        'pre_pt_position': ['pre_pt_x', 'pre_pt_y', 'pre_pt_z'],
        'post_pt_position': ['post_pt_x', 'post_pt_y', 'post_pt_z'],
        'ctr_pt_position': ['x', 'y', 'z'],
    }

    # Work on a copy so the column assignments below never leak into the
    # DataFrame owned by the caller.
    chunk = chunk.copy()

    # keys in chunk dictionary are for synapses schema
    chunk['id'] = chunk.index + last_index + 1
    chunk['id'] = chunk['id'].astype(int)
    # Naive UTC timestamp; same value/format as the deprecated utcnow().
    chunk['created'] = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    chunk['deleted'] = ''
    chunk["superceded_id"] = ''
    chunk['valid'] = 1
    chunk['size'] = None

    for target_column, source_columns in position_sources.items():
        # [x, y, z] per row -> shapely Point -> WKBElement
        chunk[target_column] = chunk[source_columns].values.tolist()
        chunk[target_column] = chunk[target_column].apply(Point).apply(create_wkt_element)

    # reindex both orders the schema columns and drops every other column
    # (including the raw coordinate columns).
    return chunk.reindex(
        ['id', 'created', 'deleted', 'superceded_id', 'valid',
         'pre_pt_position', 'post_pt_position', 'ctr_pt_position', 'size'],
        axis=1,
    )

def reformat_write(file_path, last_index):
    """Reformat one exported CSV and upload the result to the import bucket.

    The file is read from ``param.export_bucket_name`` in chunks, each chunk
    is converted with ``process_chunk``, and the converted chunks are
    concatenated and uploaded as one object via ``write_files_to_bucket``.

    Args:
        file_path: Object path of the CSV inside the export bucket.
        last_index: Id offset accumulated from previously processed files.

    Returns:
        ``last_index`` increased by the number of rows uploaded, to be passed
        to the next call so ids do not repeat across files.
    """
    fs = gcsfs.GCSFileSystem(project=param.export_project_id, token=param.export_key_path)

    with fs.open(f'{param.export_bucket_name}/{file_path}', 'rb') as file:
        chunks = pd.read_csv(file, chunksize=10000) # max size for uploading to CAVE is 10k
        # NOTE(review): the chunks are concatenated again below, so the
        # uploaded object is NOT limited to 10k rows; the chunk size only
        # bounds how many rows are converted at a time.
        # The reader is lazy, so it must be fully consumed while the file is open.
        formatted_chunk_list = [process_chunk(chunk, last_index) for chunk in chunks]

    # pd.concat raises on an empty list; with nothing to upload the offset
    # simply stays where it was.
    if not formatted_chunk_list:
        return last_index

    df_concat = pd.concat(formatted_chunk_list)
    write_files_to_bucket(df_concat)

    # last_index should be cumulative to prevent duplicate ids.
    return last_index + df_concat.shape[0]

In [None]:
def start(skip_first=2):
    """Reformat and upload the exported CSV files, one after another.

    Every file returned by ``get_bucket_file_names`` is printed; files at
    position ``skip_first`` and later are passed to ``reformat_write``, and
    the id offset it returns is carried over to the next file.

    Args:
        skip_first: Number of leading files that are listed but not
            processed. The default of 2 reproduces the previously hard-coded
            ``if i > 1`` condition.
            NOTE(review): this skip looks like a leftover from resuming an
            interrupted run -- confirm it is intended. The id offset starts
            at 0 regardless of the skipped files; pass ``skip_first=0`` to
            process every file.

    Returns:
        An empty string.
    """
    file_paths = get_bucket_file_names()

    last_index = 0
    for i, file_path in enumerate(file_paths):
        print(f"# {str(i)} / len({len(file_paths)}): {file_path}, {last_index}")
        if i >= skip_first:
            last_index = reformat_write(file_path, last_index)
    return ''

In [None]:
# Run the pipeline: list the exported CSVs, reformat them and upload the results.
start()