Skip to content

Commit

Permalink
trying to bypass jenkins AttributeError
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedabu98 committed Apr 25, 2022
1 parent 88810ff commit 942b94e
Showing 1 changed file with 53 additions and 49 deletions.
102 changes: 53 additions & 49 deletions sdks/python/apache_beam/io/gcp/bigquery_json_it_test.py
Expand Up @@ -30,7 +30,8 @@

import apache_beam as beam
from apache_beam.io.gcp.bigquery import ReadFromBigQuery
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.io.gcp.internal.clients.bigquery import TableFieldSchema
from apache_beam.io.gcp.internal.clients.bigquery import TableSchema
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
Expand All @@ -43,35 +44,6 @@

JSON_TABLE_DESTINATION = f"{PROJECT}:{DATASET_ID}.{JSON_TABLE_NAME}"

# Members of the nullable `stats` struct; both are JSON columns.
_STATS_SUBFIELDS = [
    bigquery.TableFieldSchema(
        name="gdp_per_capita", type='JSON', mode='NULLABLE'),
    bigquery.TableFieldSchema(
        name="co2_emissions", type='JSON', mode='NULLABLE'),
]

# Members of the repeated `cities` struct: a plain string plus a JSON column.
_CITIES_SUBFIELDS = [
    bigquery.TableFieldSchema(
        name="city_name", type='STRING', mode='NULLABLE'),
    bigquery.TableFieldSchema(name="city", type='JSON', mode='NULLABLE'),
]

# Top-level columns of the JSON test table, in declaration order.
JSON_FIELDS = [
    bigquery.TableFieldSchema(
        name='country_code', type='STRING', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='country', type='JSON', mode='NULLABLE'),
    bigquery.TableFieldSchema(
        name='stats', type='STRUCT', mode='NULLABLE',
        fields=_STATS_SUBFIELDS),
    bigquery.TableFieldSchema(
        name='cities', type='STRUCT', mode='REPEATED',
        fields=_CITIES_SUBFIELDS),
    bigquery.TableFieldSchema(name='landmarks', type='JSON', mode='REPEATED'),
]

# Schema object wrapping the column list above.
JSON_TABLE_SCHEMA = bigquery.TableSchema(fields=JSON_FIELDS)

# Table name suffixed with a microsecond timestamp and a small random integer,
# both evaluated once when the module is imported.
STREAMING_TEST_TABLE = (
    "py_streaming_test"
    f"{time.time_ns() // 1000}_{randint(0,32)}")

Expand All @@ -82,6 +54,7 @@ def setUpClass(cls):
cls.test_pipeline = TestPipeline(is_integration_test=True)

def run_test_write(self, options):
json_table_schema = self.generate_schema()
rows_to_write = []
json_data = self.generate_data()
for country_code, country in json_data.items():
Expand Down Expand Up @@ -110,7 +83,7 @@ def run_test_write(self, options):
| "Write to BigQuery" >> beam.io.WriteToBigQuery(
method=known_args.write_method,
table=known_args.output,
schema=JSON_TABLE_SCHEMA,
schema=json_table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
))

Expand Down Expand Up @@ -223,30 +196,61 @@ def test_streaming_inserts(self):

self.run_test_write(options)

# Schema for writing to BigQuery
def generate_schema(self):
  """Build the table schema used when this test writes to BigQuery.

  Returns:
    A ``TableSchema`` with five top-level columns: ``country_code``
    (STRING), ``country`` (JSON), ``stats`` (nullable STRUCT holding two
    JSON members), ``cities`` (repeated STRUCT holding a STRING and a JSON
    member) and ``landmarks`` (repeated JSON).
  """
  # Members of the nullable `stats` struct.
  stats_members = [
      TableFieldSchema(name="gdp_per_capita", type='JSON', mode='NULLABLE'),
      TableFieldSchema(name="co2_emissions", type='JSON', mode='NULLABLE'),
  ]

  # Members of the repeated `cities` struct.
  city_members = [
      TableFieldSchema(name="city_name", type='STRING', mode='NULLABLE'),
      TableFieldSchema(name="city", type='JSON', mode='NULLABLE'),
  ]

  return TableSchema(
      fields=[
          TableFieldSchema(
              name='country_code', type='STRING', mode='NULLABLE'),
          TableFieldSchema(name='country', type='JSON', mode='NULLABLE'),
          TableFieldSchema(
              name='stats',
              type='STRUCT',
              mode='NULLABLE',
              fields=stats_members),
          TableFieldSchema(
              name='cities',
              type='STRUCT',
              mode='REPEATED',
              fields=city_members),
          TableFieldSchema(name='landmarks', type='JSON', mode='REPEATED'),
      ])

# Expected data for query test
def generate_query_data(self):
JSON_QUERY_DATA = [{
query_data = [{
'country_code': 'usa',
'past_leader': '\"George W. Bush\"',
'gdp': '58559.675',
'city_name': '\"Los Angeles\"',
'landmark_name': '\"Golden Gate Bridge\"'
},
{
'country_code': 'aus',
'past_leader': '\"Kevin Rudd\"',
'gdp': '58043.581',
'city_name': '\"Melbourne\"',
'landmark_name': '\"Great Barrier Reef\"'
},
{
'country_code': 'special',
'past_leader': '\"!@#$%^&*()_+\"',
'gdp': '421.7',
'city_name': '\"Bikini Bottom\"',
'landmark_name': "\"Willy Wonka's Factory\""
}]
return JSON_QUERY_DATA
{
'country_code': 'aus',
'past_leader': '\"Kevin Rudd\"',
'gdp': '58043.581',
'city_name': '\"Melbourne\"',
'landmark_name': '\"Great Barrier Reef\"'
},
{
'country_code': 'special',
'past_leader': '\"!@#$%^&*()_+\"',
'gdp': '421.7',
'city_name': '\"Bikini Bottom\"',
'landmark_name': "\"Willy Wonka's Factory\""
}]
return query_data

def generate_data(self):
# Raw country data
Expand Down Expand Up @@ -374,7 +378,7 @@ def generate_data(self):
}
}

JSON_DATA = {
data = {
"usa": {
"country": json.dumps(usa),
"cities": {
Expand Down Expand Up @@ -424,7 +428,7 @@ def generate_data(self):
}
}
}
return JSON_DATA
return data


if __name__ == '__main__':
Expand Down

0 comments on commit 942b94e

Please sign in to comment.