In [1]:

import yaml
import json
import os

from neo4j import GraphDatabase




In [2]:
with open("config.yaml", "r") as stream:
    try:
        PARAM = yaml.safe_load(stream)
    except yaml.YAMLError as exc:
        print(exc)

In [3]:

driver = GraphDatabase.driver(PARAM["neo4j_url"], auth=(PARAM["neo4j_username"], PARAM["neo4j_password"]))

records, summary, keys = driver.execute_query(f"""
    CALL apoc.meta.schema()
    YIELD value RETURN value;
    """,
    database_="neo4j",
)
# Loop through results and do something with them
for record in records:
    schema = record.data()["value"]
    json_schema = json.dumps(record.data()["value"])
    #print (json_schema)


In [4]:
schema.keys()

dict_keys(['Condition', 'IS_A_SITE_OF', 'Site', 'IS_A_CATEGORY_OF', 'IS_CARRIED_OUT_BY', 'FOCUSES_ON', 'IS_FOUND_AT_SITE', 'Institution', 'BELONGS_TO', 'HAS_MORPHOLOGY', 'Category', 'Trial', 'Morphology'])

In [5]:
schema['Condition']['type']

'node'

In [6]:
schema['Condition']

{'count': 23,
 'labels': [],
 'properties': {'SNOMEDCT': {'unique': True,
   'indexed': True,
   'type': 'STRING',
   'existence': False},
  'name': {'unique': False,
   'indexed': False,
   'type': 'STRING',
   'existence': False},
  'UMLS': {'unique': False,
   'indexed': False,
   'type': 'STRING',
   'existence': False}},
 'type': 'node',
 'relationships': {'HAS_MORPHOLOGY': {'count': 0,
   'direction': 'out',
   'labels': ['Morphology'],
   'properties': {}},
  'FOCUSES_ON': {'count': 26,
   'direction': 'in',
   'labels': ['Trial'],
   'properties': {}},
  'IS_FOUND_AT_SITE': {'count': 0,
   'direction': 'out',
   'labels': ['Site'],
   'properties': {}},
  'BELONGS_TO': {'count': 0,
   'direction': 'out',
   'labels': ['Category'],
   'properties': {}}}}

In [7]:
schema['Condition']['properties']

{'SNOMEDCT': {'unique': True,
  'indexed': True,
  'type': 'STRING',
  'existence': False},
 'name': {'unique': False,
  'indexed': False,
  'type': 'STRING',
  'existence': False},
 'UMLS': {'unique': False,
  'indexed': False,
  'type': 'STRING',
  'existence': False}}

In [8]:
for label in schema.keys():
    if schema[label]['type'] == 'node':
        print (label)

Condition
Site
Institution
Category
Trial
Morphology


In [9]:
variable_name = "t"
output_directory = "tsv"

for node_type in schema.keys():
    if schema[node_type]['type'] == 'node':

        records, summary, keys = driver.execute_query(f"""
            MATCH ({variable_name}:{node_type})
            RETURN {variable_name}
            """,
            database_="neo4j",
        )
        
        header = list(schema[node_type]['properties'].keys())
        content = "\t".join(header) + "\n"
        for record in records:
            result = record.data()[f"{variable_name}"]
            
            for h in header:
                if h in result:
                    content += str(result[h]) + "\t"
                else:
                    content += "\t"
            content = content[:-1] + "\n"

        with open(os.path.join(output_directory, f"{node_type}.tsv"), 'w') as f:
            f.write(content)


In [10]:
bucket_name = "neo4j-bigquery-project"

In [11]:
os.system(f'{PARAM["gsutil_path"]}/gsutil cp -r tsv gs://{bucket_name}/')

Copying file://tsv/Morphology.tsv [Content-Type=text/tab-separated-values]...
Copying file://tsv/Trial.tsv [Content-Type=text/tab-separated-values]...        
Copying file://tsv/Institution.tsv [Content-Type=text/tab-separated-values]...  
Copying file://tsv/Site.tsv [Content-Type=text/tab-separated-values]...         
- [4 files][ 84.3 KiB/ 84.3 KiB]                                                
==> NOTE: You are performing a sequence of gsutil operations that may
run significantly faster if you instead use gsutil -m cp ... Please
see the -m section under "gsutil help options" for further information
about when gsutil -m can be advantageous.

Copying file://tsv/Condition.tsv [Content-Type=text/tab-separated-values]...
Copying file://tsv/Category.tsv [Content-Type=text/tab-separated-values]...     
- [6 files][109.6 KiB/109.6 KiB]                                                
Operation completed over 6 objects/109.6 KiB.                                    


0

In [12]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from google.cloud import bigquery
import argparse
from apache_beam.runners.runner import PipelineState
from google.cloud import storage

In [13]:
parser = argparse.ArgumentParser()

args, beam_args = parser.parse_known_args()


In [14]:
beam_options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project="715484494373",
    job_name='import neo4j to bigquery',
    temp_location='gs://neo4j-bigquery-project',
    region='asia-northeast1')


p = beam.Pipeline(options=beam_options)

usage: ipykernel_launcher.py [-h] [--dataflow_endpoint DATAFLOW_ENDPOINT]
                             [--project PROJECT] [--job_name JOB_NAME]
                             [--staging_location STAGING_LOCATION]
                             [--temp_location TEMP_LOCATION] [--region REGION]
                             [--service_account_email SERVICE_ACCOUNT_EMAIL]
                             [--no_auth]
                             [--template_location TEMPLATE_LOCATION]
                             [--label LABELS] [--update]
                             [--transform_name_mapping TRANSFORM_NAME_MAPPING]
                             [--enable_streaming_engine]
                             [--dataflow_kms_key DATAFLOW_KMS_KEY]
                             [--create_from_snapshot CREATE_FROM_SNAPSHOT]
                             [--flexrs_goal {COST_OPTIMIZED,SPEED_OPTIMIZED}]
                             [--dataflow_service_option DATAFLOW_SERVICE_OPTIONS]
                           

AttributeError: 'tuple' object has no attribute 'tb_frame'

In [None]:
os.environ["GOOGLE_CLOUD_PROJECT"] = PARAM["gcp_project_id"]
client = bigquery.Client()

In [None]:
dataset_id = f"{PARAM['gcp_project_name']}.{PARAM['bigquery_dataset']}"

In [None]:
print (client.project)

715484494373


In [None]:
try:
	client.get_dataset(dataset_id)
	
except:
	dataset = bigquery.Dataset(dataset_id)  #

	dataset.location = "asia-northeast1"
	dataset.description = "dataset for clinical trial data"

	dataset_ref = client.create_dataset(dataset, timeout=30)  # Make an API request.

In [None]:
bucket_client = storage.Client()
bucket = bucket_client.bucket(bucket_name)
for blob in bucket.list_blobs(prefix='tsv'):
  print (blob.name)

tsv/Category.tsv
tsv/Condition.tsv
tsv/Institution.tsv
tsv/Morphology.tsv
tsv/Site.tsv
tsv/Trial.tsv


In [None]:

for f in bucket.list_blobs(prefix='tsv'):
    
    full_path = f"gs://{bucket_name}/{f.name}"

    filename = f.name.split("/")[1]
    print (full_path)
    import_data = (
        p
        | f'Read {full_path} files' >> beam.io.ReadFromText(full_path)
        | f'Write to {filename} output table' >> beam.io.WriteToBigQuery(
                ignore_unknown_columns=True,
                project=PARAM['gcp_project_name'],
                dataset=PARAM['bigquery_dataset'],
                table=filename.replace(".tsv", ""),
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )

    ret = p.run()
    if ret.state == PipelineState.DONE:
        print('Success!!!')
    else:
        print('Error Running beam pipeline')

gs://neo4j-bigquery-project/tsv/Category.tsv


ERROR:apache_beam.runners.common:Invalid GCS location: None.
Writing to BigQuery with FILE_LOADS method requires a GCS location to be provided to write files to be loaded into BigQuery. Please provide a GCS bucket through custom_gcs_temp_location in the constructor of WriteToBigQuery or the fallback option --temp_location, or pass method="STREAMING_INSERTS" to WriteToBigQuery. [while running '[60]: Write to Category.tsv output table/BigQueryBatchFileLoads/GenerateFilePrefix']
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1434, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 637, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/home/dgg32/anaconda3/envs/neo4j_bigquery/lib/python3.11/site-packages/apache_beam/transforms/core.py", line 1963, in <lambda>
    wrapper = lambda x: [fn(x)]
                         ^^^^^
  File "/home/dgg32/anaconda3/envs/neo4j_bigquery/lib/python3.11/site-package

ValueError: Invalid GCS location: None.
Writing to BigQuery with FILE_LOADS method requires a GCS location to be provided to write files to be loaded into BigQuery. Please provide a GCS bucket through custom_gcs_temp_location in the constructor of WriteToBigQuery or the fallback option --temp_location, or pass method="STREAMING_INSERTS" to WriteToBigQuery. [while running '[60]: Write to Category.tsv output table/BigQueryBatchFileLoads/GenerateFilePrefix']