## BigQuery Write API
In this notebook we will try the BigQuery Storage Write API, so that we are ready to send new data to our tables.

For that, we created a .proto file, located at api/proto_message.proto, and ran the following command:

```
protoc -I=. --python_out=. ./proto_message.proto
```

This generated a Python module, proto_message_pb2.py, which can be found in the repo. We will now use that generated module to create and serialize the messages we send to BigQuery.

In [1]:
import datetime, os, json, uuid

In [2]:
import proto_message_pb2 as bqm

In [15]:
job = bqm.Job()
job.id = -1
job.job = "Not registered job"

In [57]:
department = bqm.Department()
department.id = -1
department.department = "Not registered department"

In [6]:
hired_employee = bqm.Hired_employee()
hired_employee.id = 2000
hired_employee.name = "Alejandro Rojas"
hired_employee.datetime = datetime.datetime(2023,2,20,19).strftime("%Y-%m-%d %H:%M:%S")
hired_employee.department_id = 5
hired_employee.job_id = 179

In [14]:
datetime.datetime(2023,2,20,19,5,12,34).timestamp()

1676937912.000034
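`.timestamp()` treats a naive `datetime` as local time, so the value printed above depends on the machine's time zone: 1676937912 is 2023-02-20 19:05:12 at UTC-5. A minimal sketch that pins the time zone explicitly (the UTC-5 offset is only chosen to reproduce the output above, not taken from the project):

```python
import datetime

# Same wall-clock time as the cell above, but with explicit time zones.
naive = datetime.datetime(2023, 2, 20, 19, 5, 12, 34)
as_utc = naive.replace(tzinfo=datetime.timezone.utc)
as_utc_minus_5 = naive.replace(
    tzinfo=datetime.timezone(datetime.timedelta(hours=-5))
)

ts_utc = as_utc.timestamp()                  # 19:05:12 interpreted as UTC
ts_utc_minus_5 = as_utc_minus_5.timestamp()  # 19:05:12 interpreted as UTC-5

print(ts_utc)
print(ts_utc_minus_5)
```

With aware datetimes the result no longer changes with the machine running the notebook, which matters once the same code runs on a server.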

In [9]:
hired_employee.SerializeToString()

b'\x08\xd0\x0f\x12\x0fAlejandro Rojas\x1a\x132023-02-20 19:00:00 \x05(\xb3\x01'
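To see what `SerializeToString()` produced, here is a minimal stdlib-only decoder for the protobuf wire format. It is a sketch that only handles varint (wire type 0) and length-delimited (wire type 2) fields, keeps the last value for a repeated field number, and returns strings as raw bytes. Applied to the bytes above, it shows field numbers 1 to 5 carrying `id`, `name`, `datetime`, `department_id` and `job_id`:

```python
from __future__ import annotations


def read_varint(data: bytes, pos: int) -> tuple[int, int]:
    """Decode a base-128 varint starting at pos; return (value, next_pos)."""
    result, shift = 0, 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift  # low 7 bits carry the payload
        if not byte & 0x80:               # high bit clear -> last byte
            return result, pos
        shift += 7


def decode_fields(data: bytes) -> dict[int, int | bytes]:
    """Decode varint and length-delimited fields into {field_number: value}."""
    fields = {}
    pos = 0
    while pos < len(data):
        key, pos = read_varint(data, pos)
        field_number, wire_type = key >> 3, key & 0x07
        if wire_type == 0:
            fields[field_number], pos = read_varint(data, pos)
        elif wire_type == 2:
            length, pos = read_varint(data, pos)
            fields[field_number] = data[pos:pos + length]
            pos += length
        else:
            raise ValueError(f"wire type {wire_type} not handled in this sketch")
    return fields


serialized = b'\x08\xd0\x0f\x12\x0fAlejandro Rojas\x1a\x132023-02-20 19:00:00 \x05(\xb3\x01'
print(decode_fields(serialized))
# {1: 2000, 2: b'Alejandro Rojas', 3: b'2023-02-20 19:00:00', 4: 5, 5: 179}
```

Negative integers such as the `id = -1` used above are encoded as 10-byte varints, which this sketch would return as large unsigned numbers.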

Great! We've created our protobuf messages. 

Now, following a helpful guide written by matthieucham from Stack Labs, we will use a wrapper around our BigQuery calls. We will first try the wrapper as he suggests it, and tweak it if needed.

https://dev.to/stack-labs/13-tricks-for-the-new-bigquery-storage-write-api-in-python-296e

In [32]:
from google.protobuf import descriptor_pb2
help(descriptor_pb2.DescriptorProto)

Help on class DescriptorProto in module google.protobuf.descriptor_pb2:

class DescriptorProto(google.protobuf.pyext._message.CMessage, google.protobuf.message.Message)
 |  A ProtocolMessage
 |  
 |  Method resolution order:
 |      DescriptorProto
 |      google.protobuf.pyext._message.CMessage
 |      google.protobuf.message.Message
 |      builtins.object
 |  
 |  Data descriptors defined here:
 |  
 |  enum_type
 |      Field google.protobuf.DescriptorProto.enum_type
 |  
 |  extension
 |      Field google.protobuf.DescriptorProto.extension
 |  
 |  extension_range
 |      Field google.protobuf.DescriptorProto.extension_range
 |  
 |  field
 |      Field google.protobuf.DescriptorProto.field
 |  
 |  name
 |      Field google.protobuf.DescriptorProto.name
 |  
 |  nested_type
 |      Field google.protobuf.DescriptorProto.nested_type
 |  
 |  oneof_decl
 |      Field google.protobuf.DescriptorProto.oneof_decl
 |  
 |  options
 |      Field google.protobuf.DescriptorProto.options
 | 

In [36]:
help(bqm.DESCRIPTOR.CopyToProto)

Help on built-in function CopyToProto:

CopyToProto(...) method of google.protobuf.pyext._message.FileDescriptor instance



In [3]:
"""Wrapper around BigQuery call."""
from __future__ import annotations
from typing import Any, Iterable
import logging
from google.cloud import bigquery_storage
from google.cloud.bigquery_storage_v1 import exceptions as bqstorage_exceptions

from google.cloud.bigquery_storage_v1 import types, writer
from google.protobuf import descriptor_pb2
from google.protobuf.descriptor import Descriptor



class DefaultStreamManager:  # pragma: no cover
    """Manage access to the _default stream write streams."""

    def __init__(
        self,
        table_path: str,
        message_protobuf_descriptor: Descriptor,
        bigquery_storage_write_client: bigquery_storage.BigQueryWriteClient,
    ):
        """Init."""
        self.stream_name = f"{table_path}/_default"
        self.message_protobuf_descriptor = message_protobuf_descriptor
        self.write_client = bigquery_storage_write_client
        self.append_rows_stream = None

    def _init_stream(self):
        """Init the underlying stream manager."""
        # Create a template with fields needed for the first request.
        request_template = types.AppendRowsRequest()
        # The initial request must contain the stream name.
        request_template.write_stream = self.stream_name
        # So that BigQuery knows how to parse the serialized_rows, generate a
        # protocol buffer representation of our message descriptor.
        proto_schema = types.ProtoSchema()
        proto_descriptor = descriptor_pb2.DescriptorProto()  # pylint: disable=no-member
        self.message_protobuf_descriptor.CopyToProto(proto_descriptor)
        proto_schema.proto_descriptor = proto_descriptor
        proto_data = types.AppendRowsRequest.ProtoData()
        proto_data.writer_schema = proto_schema
        request_template.proto_rows = proto_data
        # Create an AppendRowsStream using the request template created above.
        self.append_rows_stream = writer.AppendRowsStream(
            self.write_client, request_template
        )

    def send_appendrowsrequest(
        self, request: types.AppendRowsRequest
    ) -> writer.AppendRowsFuture:
        """Send request to the stream manager. Init the stream manager if needed."""
        try:
            if self.append_rows_stream is None:
                self._init_stream()
            return self.append_rows_stream.send(request)
        except bqstorage_exceptions.StreamClosedError:
            # The stream needs to be reinitialized on the next call.
            if self.append_rows_stream is not None:
                self.append_rows_stream.close()
            self.append_rows_stream = None
            raise

    # Use as a context manager

    def __enter__(self) -> DefaultStreamManager:
        """Enter the context manager: open the stream and return this manager."""
        self._init_stream()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit the context manager: close the stream."""
        if self.append_rows_stream is not None:
            # Shut down background threads and close the streaming connection.
            self.append_rows_stream.close()
            self.append_rows_stream = None


class BigqueryWriteManager:
    """Encapsulation for bigquery client."""

    def __init__(
        self,
        project_id: str,
        dataset_id: str,
        table_id: str,
        bigquery_storage_write_client: bigquery_storage.BigQueryWriteClient,
        pb2_descriptor: Descriptor,
    ):  # pragma: no cover
        """Create a BigQueryManager."""
        self.bigquery_storage_write_client = bigquery_storage_write_client

        self.table_path = self.bigquery_storage_write_client.table_path(
            project_id, dataset_id, table_id
        )
        self.pb2_descriptor = pb2_descriptor

    def write_rows(self, pb_rows: Iterable[Any]) -> None:
        """Write data rows."""
        with DefaultStreamManager(
            self.table_path, self.pb2_descriptor, self.bigquery_storage_write_client
        ) as target_stream_manager:
            proto_rows = types.ProtoRows()
            # Create a batch of row data by appending proto2 serialized bytes to the
            # serialized_rows repeated field.
            for row in pb_rows:
                proto_rows.serialized_rows.append(row.SerializeToString())
            # Create an append row request containing the rows
            request = types.AppendRowsRequest()
            proto_data = types.AppendRowsRequest.ProtoData()
            proto_data.rows = proto_rows
            request.proto_rows = proto_data

            future = target_stream_manager.send_appendrowsrequest(request)

            # Wait for the append row requests to finish.
            print(future.result())
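The wrapper opens the stream lazily and, on `StreamClosedError`, closes and forgets it before re-raising, so the next call opens a fresh stream. A stdlib-only sketch of that pattern, with hypothetical `FakeStream` and `LazyStreamManager` classes standing in for `writer.AppendRowsStream` and the wrapper (no BigQuery calls are made):

```python
class StreamClosedError(Exception):
    """Stand-in for bqstorage_exceptions.StreamClosedError."""


class FakeStream:
    """Hypothetical stream that fails once it has been closed server-side."""

    def __init__(self):
        self.closed_by_server = False
        self.closed = False

    def send(self, request):
        if self.closed_by_server:
            raise StreamClosedError("stream closed")
        return f"ok:{request}"

    def close(self):
        self.closed = True


class LazyStreamManager:
    """Same open/reset logic as DefaultStreamManager.send_appendrowsrequest."""

    def __init__(self, stream_factory):
        self.stream_factory = stream_factory
        self.stream = None
        self.opened = 0

    def send(self, request):
        try:
            if self.stream is None:          # open lazily on first use
                self.stream = self.stream_factory()
                self.opened += 1
            return self.stream.send(request)
        except StreamClosedError:
            if self.stream is not None:      # drop the dead stream...
                self.stream.close()
            self.stream = None               # ...so the next send reopens one
            raise


manager = LazyStreamManager(FakeStream)
print(manager.send("r1"))                    # opens the first stream
manager.stream.closed_by_server = True
try:
    manager.send("r2")
except StreamClosedError:
    print("stream closed; caller may retry")
print(manager.send("r2"), manager.opened)    # a fresh stream is opened
```

As in the wrapper, retrying is left to the caller; the manager only guarantees it will not reuse a dead stream.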


In [35]:
test = (
    BigqueryWriteManager(
        project_id = os.environ['GCP_PROJECT'],
        dataset_id = 'globant',
        table_id = 'departments',
        bigquery_storage_write_client = bigquery_storage.BigQueryWriteClient(),
        pb2_descriptor = bqm.Job.DESCRIPTOR
    )
)

In [59]:
test.write_rows([department])

Unknown: None There was a problem opening the stream. Try turning on DEBUG level logs to see the error.

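The cell above failed to open the stream. A likely cause is a descriptor mismatch: a `Department` message is sent to the `departments` table, but the manager was built with `bqm.Job.DESCRIPTOR`, so `pb2_descriptor` should be `bqm.Department.DESCRIPTOR`. With matching descriptors the writes succeed, as the `jobs` and `error_logs` cells at the end of this notebook show.

Now that we can write data into BigQuery, we need a function that validates incoming data before actually writing it into the database.

For that, we will use our new protobuf knowledge and let the generated message classes raise errors for malformed records.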


One more thing before I forget!
We will create a logs table that stores every error, so we also have to add a fourth schema to our .proto file.

Let's write functions that parse an incoming request for our jobs table into a list of protobuf messages.

In [4]:
def parse_message(req, proto_class):
    """Build a proto_class message from a dict of field names and values."""
    proto = proto_class()
    for attribute, value in req.items():
        # Unknown field -> AttributeError; wrongly typed value -> TypeError.
        setattr(proto, attribute, value)
    return proto

def parse_request(request, proto_class):
    """Parse every record, stopping at the first invalid one."""
    json_request = request.get_json()
    parsed_request = []
    for current_record, row in enumerate(json_request, start=1):
        try:
            parsed_request.append(parse_message(row, proto_class))
        except (AttributeError, TypeError) as e:
            # Return an error body and HTTP 400 for the first bad record.
            return {"msg": f"{e} on record number {current_record}"}, 400
    return parsed_request

class Dummy_request():
    """Minimal stand-in for a web request object exposing get_json()."""
    def __init__(self, json_request):
        self.json_request = json_request

    def get_json(self):
        return self.json_request

We are handling two types of errors here:
- AttributeError, when users send a key that does not belong to the table schema
- TypeError, when a value's type does not match the type declared in the .proto file
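Both errors come from `setattr` on the generated message class. To exercise this logic without `protoc` or BigQuery, here is a sketch with a hypothetical `FakeJob` class that mimics the two errors `bqm.Job` raised in this notebook (same message wording), plus a hypothetical `first_error` helper that follows the same first-error-wins loop as `parse_request`:

```python
class FakeJob:
    """Hypothetical stand-in for bqm.Job with fields id (int) and job (str).

    Mimics the two errors the generated class raised in this notebook:
    AttributeError for unknown fields, TypeError for wrongly typed values.
    """

    _FIELDS = {"id": int, "job": str}

    def __setattr__(self, name, value):
        expected = self._FIELDS.get(name)
        if expected is None:
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{name}'"
            )
        if not isinstance(value, expected):
            raise TypeError(
                f"{value!r} has type {type(value).__name__}, "
                f"but expected one of: {expected.__name__}"
            )
        object.__setattr__(self, name, value)


def first_error(rows, message_class):
    """Return the first parsing error (same wording as parse_request) or None."""
    for record_number, row in enumerate(rows, start=1):
        message = message_class()
        try:
            for key, value in row.items():
                setattr(message, key, value)
        except (AttributeError, TypeError) as e:
            return f"{e} on record number {record_number}"
    return None


print(first_error([{"id": 184, "job": "Test Job"}, {"idi": 187, "job": "Test Job"}], FakeJob))
print(first_error([{"id": "186", "job": "Test Job"}], FakeJob))
```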

Let's try a correct request and see if everything works.

In [82]:
good_job_request = Dummy_request([
    {'id': 184, 'job': 'Test Job'},
    {'id': 185, 'job': 'Test Job'},
    {'id': 186, 'job': 'Test Job'},
    {'id': 187, 'job': 'Test Job'},
    {'id': 188, 'job': 'Test Job'}
])

In [93]:
parsed_request = parse_request(good_job_request, bqm.Job)

In [96]:
parsed_request

[id: 184
 job: "Test Job",
 id: 185
 job: "Test Job",
 id: 186
 job: "Test Job",
 id: 187
 job: "Test Job",
 id: 188
 job: "Test Job"]

In [97]:
[type(x) for x in parsed_request]

[proto_message_pb2.Job,
 proto_message_pb2.Job,
 proto_message_pb2.Job,
 proto_message_pb2.Job,
 proto_message_pb2.Job]

Great! A list of dictionaries parses just fine.
Now let's try a couple of bad requests and see what happens.

Of course, if a request contains two different errors, the function only returns the first one it encounters.

In [125]:
bad_job_request_attr_1 = Dummy_request([
    {'id': 184, 'job': 'Test Job'},
    {'id': 185, 'job': 'Test Job'},
    {'id': 186, 'job': 'Test Job'},
    {'idi': 187, 'job': 'Test Job'},
    {'id': 188, 'job': 'Test Job'}
])

In [129]:
bad_parsed_request_1 = parse_request(bad_job_request_attr_1, bqm.Job)

In [130]:
bad_parsed_request_1

({'msg': "'Job' object has no attribute 'idi' on record number 4"}, 400)

Now one with a value of the wrong type:

In [131]:
bad_job_request_attr_2 = Dummy_request([
    {'id': 184, 'job': 'Test Job'},
    {'id': 185, 'job': 'Test Job'},
    {'id': '186', 'job': 'Test Job'},
    {'id': 187, 'job': 'Test Job'},
    {'id': 188, 'job': 'Test Job'}
])

In [134]:
bad_parsed_request_2 = parse_request(bad_job_request_attr_2, bqm.Job)

In [135]:
bad_parsed_request_2

({'msg': "'186' has type str, but expected one of: int on record number 3"},
 400)

We are back!
This time we don't want to return an error message and stop the process when we find a bad record. Instead, we will save it into a list of bad records, which will later be uploaded into the error-logs table in BigQuery.

In [10]:
def parse_message(req, proto_class):
    """Build a proto_class message from a dict of field names and values."""
    proto = proto_class()
    for attribute, value in req.items():
        # Unknown field -> AttributeError; wrongly typed value -> TypeError.
        setattr(proto, attribute, value)
    return proto

def parse_wrong_record(req, error_class, table_name, error, datetime_utc):
    """Wrap a rejected record and its error in an error_class message."""
    err = error_class()
    err.uuid = str(uuid.uuid4())
    err.content = json.dumps(req)  # keep the original record as JSON
    err.table = table_name
    err.datetime = datetime_utc
    err.error = error
    return err

def parse_request(request, proto_class, table_name):
    """Split a request into (valid messages, Wrong_record messages)."""
    json_request = request.get_json()
    parsed_request = []
    bad_requests = []
    # One UTC timestamp shared by every rejected record in this request.
    current_time = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    for current_record, row in enumerate(json_request, start=1):
        try:
            parsed_request.append(parse_message(row, proto_class))
        except (AttributeError, TypeError) as e:
            # Keep going: store the bad record instead of returning a 400.
            bad_requests.append(
                parse_wrong_record(
                    row,
                    bqm.Wrong_record,
                    table_name,
                    f"{e} on record number {current_record}",
                    current_time,
                )
            )
    return parsed_request, bad_requests

class Dummy_request():
    """Minimal stand-in for a web request object exposing get_json()."""
    def __init__(self, json_request):
        self.json_request = json_request

    def get_json(self):
        return self.json_request

Let's try the following list with two bad records:

In [6]:
bad_job_request_2_rec = Dummy_request([
    {'id': 184, 'job': 'Test Job'},
    {'id': '185', 'job': 'Test Job'},
    {'id': 186, 'job': 'Test Job'},
    {'idi': 187, 'job': 'Test Job'},
    {'id': 188, 'job': 'Test Job'}
])

In [11]:
mixed_parse_request = parse_request(bad_job_request_2_rec, bqm.Job, 'Jobs')

In [12]:
mixed_parse_request

([id: 184
  job: "Test Job",
  id: 186
  job: "Test Job",
  id: 188
  job: "Test Job"],
 [uuid: "de04f339-bd2c-4d98-aea9-10c1f13bede5"
  content: "{\"id\": \"185\", \"job\": \"Test Job\"}"
  table: "Jobs"
  error: "\'185\' has type str, but expected one of: int on record number 2"
  datetime: "2023-02-15 01:47:50",
  uuid: "fda633f3-d6a7-4603-b8a1-d342da56c75e"
  content: "{\"idi\": 187, \"job\": \"Test Job\"}"
  table: "Jobs"
  error: "\'Job\' object has no attribute \'idi\' on record number 4"
  datetime: "2023-02-15 01:47:50"])

In [9]:
str(uuid.uuid4())

'95545431-d156-41a3-9176-6b95a47946c0'

In [23]:
mixed_parse_request[1][0].uuid

UUID('723709d2-6f4f-452b-a833-a12b66f7fe39')

In [24]:
mixed_parse_request[1][0].content

'{"idi": 187, "job": "Test Job"}'

In [25]:
mixed_parse_request[1][0].table

'Jobs'

In [26]:
mixed_parse_request[1][0].error

"'Job' object has no attribute 'idi' on record number 4"

In [27]:
mixed_parse_request[1][0].datetime

'2023-02-14 23:56:17'

This looks good. I think we are ready to set up this function inside our API.

In [30]:
print(json.dumps(bad_job_request_2_rec.get_json()))

[{"id": 184, "job": "Test Job"}, {"id": "185", "job": "Test Job"}, {"id": 186, "job": "Test Job"}, {"idi": 187, "job": "Test Job"}, {"id": 188, "job": "Test Job"}]
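Assuming the API is built with Flask (the `request.get_json()` call and the `(body, 400)` return value follow its conventions), it would receive exactly this JSON as the request body. Flask's `request.get_json()` expects an `application/json` content type by default, so a client has to set that header. A sketch of such a client request with the standard library; the URL and route are hypothetical:

```python
import json
import urllib.request

payload = [
    {"id": 184, "job": "Test Job"},
    {"id": "185", "job": "Test Job"},
    {"id": 186, "job": "Test Job"},
    {"idi": 187, "job": "Test Job"},
    {"id": 188, "job": "Test Job"},
]

# Hypothetical endpoint; the real API route is not defined in this notebook.
request = urllib.request.Request(
    "http://localhost:8080/jobs",
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
# urllib.request.urlopen(request) would send it once the API is running.
print(request.get_method(), request.full_url)
```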


Before we start testing our API, let's run the entire flow of functions and check that it works.
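The cells that follow run the full flow by hand: valid rows go to `jobs` and rejected ones to `error_logs`. A minimal sketch of wrapping that into one function, assuming writers that expose `write_rows` the way `BigqueryWriteManager` does; `handle_request`, `RecordingWriter` and `toy_parse` are hypothetical names, and no BigQuery call is made:

```python
from dataclasses import dataclass, field


@dataclass
class RecordingWriter:
    """Hypothetical stand-in for BigqueryWriteManager that records its input."""
    batches: list = field(default_factory=list)

    def write_rows(self, rows):
        self.batches.append(list(rows))


def handle_request(request, parse, rows_writer, errors_writer):
    """Hypothetical end-to-end flow: parse, write good rows, log bad ones."""
    good_rows, bad_rows = parse(request)
    if good_rows:  # skip empty batches instead of opening a stream for nothing
        rows_writer.write_rows(good_rows)
    if bad_rows:
        errors_writer.write_rows(bad_rows)
    return {"written": len(good_rows), "rejected": len(bad_rows)}


def toy_parse(rows):
    """Toy parser standing in for parse_request(request, bqm.Job, 'Jobs')."""
    good = [r for r in rows if isinstance(r.get("id"), int) and set(r) == {"id", "job"}]
    bad = [r for r in rows if r not in good]
    return good, bad


jobs, errors = RecordingWriter(), RecordingWriter()
summary = handle_request(
    [{"id": 184, "job": "Test Job"}, {"id": "185", "job": "Test Job"}, {"idi": 187, "job": "Test Job"}],
    toy_parse,
    jobs,
    errors,
)
print(summary)  # {'written': 1, 'rejected': 2}
```

Injecting the writers keeps the flow testable locally; in the notebook they would be the `final_test` and `final_test_bad` managers.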

In [36]:
final_test = (
    BigqueryWriteManager(
        project_id = os.environ['GCP_PROJECT'],
        dataset_id = 'globant',
        table_id = 'jobs',
        bigquery_storage_write_client = bigquery_storage.BigQueryWriteClient(),
        pb2_descriptor = bqm.Job.DESCRIPTOR
    )
)

In [40]:
final_test.write_rows(
    mixed_parse_request[0]
)

append_result {
}



In [13]:
final_test_bad = (
    BigqueryWriteManager(
        project_id = os.environ['GCP_PROJECT'],
        dataset_id = 'globant',
        table_id = 'error_logs',
        bigquery_storage_write_client = bigquery_storage.BigQueryWriteClient(),
        pb2_descriptor = bqm.Wrong_record.DESCRIPTOR
    )
)

In [14]:
final_test_bad.write_rows(
    mixed_parse_request[1]
)

append_result {
}

