In [None]:
#! pip install google-cloud-bigquery-datatransfer

In [1]:
from google.cloud import bigquery, storage
import pydata_google_auth
from google.api_core.exceptions import AlreadyExists, NotFound, BadRequest, Forbidden 

from datetime import datetime, timedelta
import pytz
import sys

In [2]:
# OAuth scopes requested during the interactive Google login below.
SCOPES = ['https://www.googleapis.com/auth/cloud-platform',
          'https://www.googleapis.com/auth/bigquery']

# Interactive user login; these credentials are handed to the BigQuery client.
user_credentials = pydata_google_auth.get_user_credentials(SCOPES)

In [3]:
# Project where the BigQuery jobs are executed.
PROJECT_ID = 'peya-data-qlty-stg'

# Project and dataset that hold the temporary export tables.
TEMPORAL_PROJECT_ID = 'peya-data-qlty-stg'
TEMPORAL_DATASET_ID = 'data_export_temporal'

# Byte limit for queries run by this notebook: 5 GiB.
MAX_BYTES_BILLED = 5 * 1024 ** 3

In [4]:
# Single BigQuery client for the whole notebook, authenticated with the interactive user's credentials.
BIGQUERY_CLIENT = bigquery.Client(project=PROJECT_ID, credentials=user_credentials)

In [None]:
# Sanity check: the project the client submits jobs to.
# (Replaces a dangling `BIGQUERY_CLIENT.` autocomplete stub, which was a SyntaxError.)
BIGQUERY_CLIENT.project

### Parámetros

In [None]:
# E-mail of the user requesting the data export.
USER_EMAIL = "carlos.jaime@pedidosya.com"


In [None]:
# Data source: the query whose result will be materialized and exported.
USER_QUERY = """
select order_id, o.business_type.business_type_id, o.business_type.business_type_name, o.amount_no_discount, o.commission, o.addressDescription
from `peya-bi-tools-pro.il_core.fact_orders` o
where registered_date >= '2020-01-01'
limit 100000000
"""

In [None]:
def humanbytes(B):
    """
    Return the given byte count as a human friendly string (Bytes, KB, MB, GB or TB).

    Units are binary (1 KB = 1024 bytes). Sizes of 1 KB and above are shown with two decimals.
    Always returns a plain string; previous versions returned a (str, float) tuple for sizes
    >= 1 KB, which leaked into the formatted messages.

    :param B: number of bytes (anything accepted by float()).
    :return: (str) e.g. '0 Bytes', '1 Byte', '512 Bytes', '2.00 KB', '5.00 GB'.
    """
    B = float(B)
    KB = float(1024)
    MB = KB ** 2  # 1,048,576
    GB = KB ** 3  # 1,073,741,824
    TB = KB ** 4  # 1,099,511,627,776

    if B < KB:
        # Singular only for exactly one byte (the old `0 == B > 1` test was always False).
        return '{0:.0f} {1}'.format(B, 'Byte' if B == 1 else 'Bytes')
    if B < MB:
        return '{0:.2f} KB'.format(B / KB)
    if B < GB:
        return '{0:.2f} MB'.format(B / MB)
    if B < TB:
        return '{0:.2f} GB'.format(B / GB)
    return '{0:.2f} TB'.format(B / TB)

In [None]:
# Custom exceptions used by the dry-run validations
class MaximumBytesExceeded(Exception):
    """The dry run estimates at least as many processed bytes as the allowed maximum."""


class QueryStatementNotAllowed(Exception):
    """The query is not a SELECT statement."""

In [None]:
# Alternative data source for manual tests: every column of a single table.
USER_QUERY_TEST = """
SELECT * FROM `peya-uruguay.user_maria_elola.Reporte_Febrero_2021`
"""

In [None]:
def query_dry_run(**kwargs):
    """
    Validate a query with a BigQuery dry run before actually executing it.

    :param
        query (str): SQL to validate. Required.
        client (bigquery.Client): client used to submit the dry-run job.
        max_bytes_billed (int): the query must process strictly fewer bytes than this.
            Default 26843545600 (25GB).
    :return
        (dict) {'success': bool, 'message': str}. 'success' is True only when the statement
        is a SELECT and its estimated processed bytes are below max_bytes_billed; otherwise
        'message' explains the failure.
    :raises
        AssertionError: if query is empty or None.
    """
    # custom result dict
    query_dry_run_results = {'success': False,
                             'message': None}

    MAX_BYTES_PROCESSED = kwargs.get('max_bytes_billed', 26843545600)  # Default 25GB
    QUERY = kwargs.get('query')
    BQ_CLIENT = kwargs.get('client')

    assert (QUERY), 'Query parameter cannot be null'

    # dry_run: BigQuery validates the query and estimates its cost without running it.
    # use_query_cache=False: a cache hit would otherwise report 0 processed bytes.
    job_test_config = bigquery.QueryJobConfig(dry_run=True,
                                              use_query_cache=False)

    try:
        job_query_test = BQ_CLIENT.query(query=QUERY,
                                         job_id_prefix='DataTech_ExportDataService_DryRun_Query',
                                         job_config=job_test_config)

        # Only "SELECT" statements may be executed (no DML/DDL/scripts)
        if job_query_test.statement_type != 'SELECT':
            raise QueryStatementNotAllowed

        # Processed-bytes limit
        if job_query_test.total_bytes_processed >= MAX_BYTES_PROCESSED:
            raise MaximumBytesExceeded

        # All validations passed
        query_dry_run_results['message'] = "This query will process {} ".format(humanbytes(job_query_test.total_bytes_processed))
        query_dry_run_results['success'] = True

    except MaximumBytesExceeded:
        query_dry_run_results['message'] = 'ERROR: Query Exceeded the maximum number of bytes allowed (It will be processed: {} | Max. allowed: {})'.format(humanbytes(job_query_test.total_bytes_processed),
                                                                                                                                                    humanbytes(MAX_BYTES_PROCESSED))
    except QueryStatementNotAllowed:
        query_dry_run_results['message'] = 'ERROR: Query Statement {} NOT allowed, only SELECT statements are allowed.'.format(job_query_test.statement_type)

    except Forbidden as f:
        # e.g. the query reads columns protected by policy tags
        query_dry_run_results['message'] = 'ERROR: BigQuery Client Forbidden: {}'.format(f.message)

    except BadRequest as e:
        # e.g. SQL syntax errors, unknown tables/columns
        query_dry_run_results['message'] = 'ERROR: BigQuery Client BadRequest: {}'.format(e.message)

    except Exception as exc:
        # Anything else (e.g. a missing client): report the type AND the detail. `Exception`
        # instead of a bare except so KeyboardInterrupt/SystemExit still propagate.
        query_dry_run_results['message'] = 'ERROR: {}: {}'.format(type(exc).__name__, exc)

    return query_dry_run_results


In [None]:
# Validate USER_QUERY against the notebook's configured byte limit (MAX_BYTES_BILLED),
# which was previously defined but never applied (the function's 25GB default was used).
query_dry_run(query=USER_QUERY, client=BIGQUERY_CLIENT, max_bytes_billed=MAX_BYTES_BILLED)

### Creación de tabla temporal

In [None]:
def create_temporal_table(**kwargs):
    """
    Materialize the result of a user query into a temporary BigQuery table.

    The query is first validated with ``query_dry_run``. Only if that succeeds is it executed
    with the temporary table as destination. The table is then given an expiration, a
    description and labels.

    :param
        bigquery_client (bigquery.Client): client used to run the jobs. Required.
        user_email (str): requester e-mail, used in the table name, description and labels.
            Default 'DefaultUser'.
        user_query (str): SQL whose result is materialized. Required.
        temporal_project_id (str): project of the temporary table. Required.
        temporal_dataset_id (str): dataset of the temporary table. Required.
        expiration (int): minutes until the temporary table expires. Default 120.
        max_bytes_billed_allowed (int): byte limit for the dry run and the real job.
            Default 26843545600 (25GB).
    :return
        (dict) {'success': bool, 'message': str, 'temporal_table_id': str | None}
    :raises
        AssertionError: if a required argument is missing.
    """
    # custom result dict
    create_temporal_table_result = {'success': False,
                                    'message': None,
                                    'temporal_table_id': None}

    # method arguments
    BQ_CLIENT = kwargs.get('bigquery_client')
    USER_EMAIL = kwargs.get('user_email', 'DefaultUser')
    USER_QUERY = kwargs.get('user_query')
    TEMPORAL_PROJECT_ID = kwargs.get('temporal_project_id')
    TEMPORAL_DATASET_ID = kwargs.get('temporal_dataset_id')
    TEMPORAL_EXPIRATION_TABLE = kwargs.get('expiration', 120)
    MAX_BYTES_BILLED_ALLOWED = kwargs.get('max_bytes_billed_allowed', 26843545600)  # Default 25GB

    assert (USER_EMAIL and USER_QUERY and TEMPORAL_PROJECT_ID and TEMPORAL_DATASET_ID and BQ_CLIENT), 'Arguments cannot be null'

    # 'carlos.jaime@example.com' -> 'CarlosJaime'
    names = USER_EMAIL.split('@')[0].split('.')
    user_name = ''.join([name.capitalize() for name in names])

    # First, a dry run checks the SQL and the number of bytes it would process
    dry_run_result = query_dry_run(query=USER_QUERY,
                                   client=BQ_CLIENT,
                                   max_bytes_billed=MAX_BYTES_BILLED_ALLOWED)

    if not dry_run_result.get('success', False):
        print('User query has one or more execution warnings\n{}'.format(dry_run_result.get('message')))
        create_temporal_table_result['message'] = dry_run_result.get('message')
        return create_temporal_table_result

    # Table name: data_export_<UserName>_<YYYYmmdd_HHMM00>
    current_timestamp = datetime.strftime(datetime.now(), "%Y%m%d_%H%M00")
    temporal_table_name = 'data_export_{user}_{timestamp}'.format(user=user_name,
                                                                  timestamp=current_timestamp)
    temporal_table_id = '{project}.{dataset}.{table_name}'.format(project=TEMPORAL_PROJECT_ID,
                                                                  dataset=TEMPORAL_DATASET_ID,
                                                                  table_name=temporal_table_name)

    # Build the destination from TEMPORAL_PROJECT_ID explicitly. The old BQ_CLIENT.dataset()
    # (deprecated) used the *client's* project, which need not match temporal_table_id.
    destination_table = bigquery.DatasetReference(TEMPORAL_PROJECT_ID, TEMPORAL_DATASET_ID).table(temporal_table_name)

    try:
        # A table with the same name (same user, same minute) is replaced.
        BQ_CLIENT.delete_table(table=destination_table, not_found_ok=True)
        print('Creating temporal table {}'.format(temporal_table_name))

        # https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.QueryJobConfig.html
        job_config = bigquery.QueryJobConfig()
        job_config.destination = destination_table
        job_config.create_disposition = 'CREATE_IF_NEEDED'
        # Hard cap enforced by BigQuery itself, in case billing differs from the dry-run estimate.
        job_config.maximum_bytes_billed = MAX_BYTES_BILLED_ALLOWED

        # Run the query into the temporary table and wait for it to finish
        job = BQ_CLIENT.query(query=USER_QUERY,
                              job_id_prefix='DataTech_ExportDataService_Table',
                              job_config=job_config)
        job.result()

        assert job.state == 'DONE', 'Error when try to create the temporal table. job status result: {}'.format(job.state)

        # Table metadata: expiration, description and labels
        table = bigquery.Table(destination_table)
        table.expires = datetime.now(pytz.utc) + timedelta(minutes=TEMPORAL_EXPIRATION_TABLE)
        table.description = 'Temporal Table generate to export data into GCS.\nUser: {u}\nData Tech | Export Data Service'.format(u=USER_EMAIL)
        table.labels = {"user": user_name.lower(),
                        "service": "export_data"}
        BQ_CLIENT.update_table(table, ["expires", "description", "labels"])  # API request

        create_temporal_table_result['success'] = True
        create_temporal_table_result['message'] = 'Temporal table was created'
        create_temporal_table_result['temporal_table_id'] = temporal_table_id

    except BadRequest as e:
        create_temporal_table_result['message'] = 'ERROR: BigQuery Client BadRequest: {}'.format(e.message)

    except Exception as exc:
        # Report type and detail; `Exception` so KeyboardInterrupt still propagates.
        create_temporal_table_result['message'] = 'Errors: {}: {}'.format(type(exc).__name__, exc)

    return create_temporal_table_result
    

### Configuración del job de exportación

In [None]:
# Maximum number of source objects accepted by a single GCS compose request.
_GCS_MAX_COMPOSE_SOURCES = 32


def _compose_in_batches(destination, sources):
    """
    Compose ``sources`` (list of storage.Blob, in order) into ``destination`` so that no single
    compose request exceeds _GCS_MAX_COMPOSE_SOURCES objects.
    """
    destination.compose(sources[:_GCS_MAX_COMPOSE_SOURCES])
    step = _GCS_MAX_COMPOSE_SOURCES - 1
    for start in range(_GCS_MAX_COMPOSE_SOURCES, len(sources), step):
        # Append the next batch by re-using the partial result as the first component.
        destination.compose([destination] + sources[start:start + step])


def export_data_to_gcs(**kwargs):
    """
        Export a table from google bigquery to google storage, merge the exported shards into a
        single object and grant the requesting user read access to it.
        :param
            bigquery_client (bigquery.client): client used to read the table and run the extract job. Required.
            user_email (str): requester e-mail; used in folder/file names, job labels and the read ACL. Required.
            source (str): id of the table to export, e.g. 'project.dataset.table'. Required.
            bucket (str): destination GCS bucket. Required.
            prefix (str): folder in the bucket; files go under <prefix>/<UserName>/<YYYYmmdd_HH0000>/.
            destination_format (str):
                Specifies the type of file to be generated. The schema of the table that you want to export to GCS depends on this.
                Tables with nested or repeated fields cannot be exported as CSV.
                    'CSV' Specifies CSV format.
                    'NEWLINE_DELIMITED_JSON' Specifies newline delimited JSON format.
                    'PARQUET' Specifies Parquet format.
                    'AVRO' Specifies Avro format.

            compression (str):
                The compression type to use for exported files. The default value is NONE.
                Valid values depend on the format (BigQuery rejects e.g. GZIP for AVRO).
                    'DEFLATE' Specifies DEFLATE format.
                    'GZIP' Specifies GZIP format.
                    'NONE' Specifies no compression.
                    'SNAPPY' Specifies SNAPPY format.
            location (str): location where the extract job runs. Default 'US'.
            storage_client (storage.Client): optional GCS client; storage.Client() is created if omitted.
        :return
            (dict) {'success': bool, 'message': str, 'authenticated_url': str | None}
        :raises
            AssertionError: if a required parameter is missing (checked after the source table is fetched).
    """

    # custom result dict
    export_data_to_gcs_result = {'success': False,
                                 'message': None,
                                 'authenticated_url': None}

    # method arguments
    BQ_CLIENT = kwargs.get('bigquery_client')
    USER_EMAIL = kwargs.get('user_email')
    SOURCE = kwargs.get('source')
    BUCKET = kwargs.get('bucket')
    PREFIX = kwargs.get('prefix')
    DESTINATION_FORMAT = kwargs.get('destination_format', 'CSV').upper()
    COMPRESSION = kwargs.get('compression')
    EXTRACT_JOB_LOCATION = kwargs.get('location', 'US')
    storage_client = kwargs.get('storage_client')

    # Fetch the source table: validates that it exists and gives us its resolved reference.
    try:
        table = BQ_CLIENT.get_table(SOURCE)
        print('table size is {}'.format(table.num_bytes))
    except NotFound as n:
        export_data_to_gcs_result['message'] = 'Errors: Table {} does not exists \n{}'.format(SOURCE, n.message)
        return export_data_to_gcs_result
    except Exception as exc:
        export_data_to_gcs_result['message'] = 'Errors: {}: {}'.format(type(exc).__name__, exc)
        return export_data_to_gcs_result

    assert (BQ_CLIENT and USER_EMAIL and SOURCE and BUCKET), 'One or more parameters are null'

    # Use the reference BigQuery resolved instead of splitting SOURCE on '.' by hand
    # (which mis-parses ids that omit the project).
    full_table_id = table.reference
    source_dataset = table.dataset_id
    source_table = table.table_id

    # 'carlos.jaime@example.com' -> 'CarlosJaime'
    names = USER_EMAIL.split('@')[0].split('.')
    user_name = ''.join([name.capitalize() for name in names])

    # Extract job labels
    job_export_labels = {
        "user": user_name.lower(),
        "source_dataset_id": source_dataset.lower(),
        "source_table_id": source_table.lower()
    }

    extract_job_prefix = 'dq-test-job-export'

    # Take "now" once so the minute and hour stamps can never disagree.
    now = datetime.now()
    current_timestamp = now.strftime("%Y%m%d_%H%M00")
    current_timestamp_by_hour = now.strftime("%Y%m%d_%H0000")

    # Folder of this export: <prefix>/<UserName>/<YYYYmmdd_HH0000>
    NEW_PREFIX = '{prefix}/{userfolder}/{hour}'.format(prefix=PREFIX,
                                                       userfolder=user_name,
                                                       hour=current_timestamp_by_hour)
    user_destination_path = 'gs://{bucket}/{new_prefix}'.format(bucket=BUCKET, new_prefix=NEW_PREFIX)

    # Shard names: export_result_<user>_<timestamp>_part_<n>.<format>[.<compression>]
    file_stem = 'export_result_{user}_{timestamp}_part_'.format(user=user_name,
                                                                timestamp=current_timestamp)

    try:
        # https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.ExtractJobConfig.html
        extract_job_config = bigquery.job.ExtractJobConfig()
        extract_job_config.destination_format = DESTINATION_FORMAT
        extract_job_config.labels = job_export_labels

        # Exact comparison: `in ('CSV')` was a substring test on a plain string.
        if DESTINATION_FORMAT == 'CSV':
            extract_job_config.field_delimiter = ';'
            extract_job_config.print_header = True

        extension = DESTINATION_FORMAT.lower()
        if COMPRESSION:
            extract_job_config.compression = COMPRESSION.upper()
            extension = '{}.{}'.format(extension, COMPRESSION.lower())
        filename_wildcard = '{}*.{}'.format(file_stem, extension)
    except Exception as exc:
        export_data_to_gcs_result['message'] = 'Error when try to create the ExtractJobConfig: {}'.format(exc)
        return export_data_to_gcs_result

    print('generating file into GCS --> {}'.format(filename_wildcard))

    try:
        # Extract the table into (possibly several) Cloud Storage shards and wait for completion.
        extract_job = BQ_CLIENT.extract_table(source=full_table_id,
                                              destination_uris='{}/{}'.format(user_destination_path, filename_wildcard),
                                              location=EXTRACT_JOB_LOCATION,
                                              job_config=extract_job_config,
                                              job_id_prefix=extract_job_prefix)  # API request
        extract_job.result()
    except BadRequest as e:
        export_data_to_gcs_result['message'] = 'ERROR: BigQuery Client BadRequest: {}'.format(e.message)
        return export_data_to_gcs_result
    except Exception as exc:
        export_data_to_gcs_result['message'] = 'Errors: {}: {}'.format(type(exc).__name__, exc)
        return export_data_to_gcs_result

    ### Generate Compose File
    if storage_client is None:
        try:
            storage_client = storage.Client()
            print('Storage Client OK')
        except Exception as exc:
            export_data_to_gcs_result['message'] = 'Errors: Unable to connect with Google Store API {}'.format(exc)
            return export_data_to_gcs_result

    try:
        # Only THIS export's shards: the hour folder is shared by every export the user runs in
        # that hour (and also holds earlier compose files).
        shards = list(storage_client.list_blobs(bucket_or_name=BUCKET,
                                                prefix='{}/{}'.format(NEW_PREFIX, file_stem)))
        if not shards:
            export_data_to_gcs_result['message'] = 'Errors: no exported files found in {}/{}'.format(user_destination_path, filename_wildcard)
            return export_data_to_gcs_result

        compose_filename = filename_wildcard.replace('part_*', 'compose')
        destination_prefix = '{}/{}'.format(NEW_PREFIX, compose_filename)

        bucket = storage_client.bucket(BUCKET)
        destination = bucket.blob(destination_prefix)
        destination.content_type = "text/plain"
        # NOTE(review): compose concatenates bytes. With several shards this repeats CSV headers
        # and does not yield a single valid AVRO/PARQUET container — confirm intended use.
        _compose_in_batches(destination, shards)

        # Grant read permission to the user (object ACL).
        # NOTE(review): object ACLs are rejected on buckets with uniform bucket-level access.
        destination.acl.user(USER_EMAIL).grant_read()
        destination.acl.save()

        url_result_file = 'https://storage.cloud.google.com/{bucket}/{prefix}'.format(bucket=BUCKET, prefix=destination_prefix)

        export_data_to_gcs_result['success'] = True
        export_data_to_gcs_result['message'] = 'file was generated successfully in GCS'
        export_data_to_gcs_result['authenticated_url'] = url_result_file

    except Exception as exc:
        export_data_to_gcs_result['message'] = 'Errors: {}: {}'.format(type(exc).__name__, exc)

    return export_data_to_gcs_result

In [None]:
# test: materialize USER_QUERY into a temporary table, enforcing the notebook's configured
# byte limit (MAX_BYTES_BILLED was defined but never passed before).
create_temporal_table_result = create_temporal_table(user_email=USER_EMAIL,
                                                     user_query=USER_QUERY,
                                                     bigquery_client=BIGQUERY_CLIENT,
                                                     temporal_project_id=TEMPORAL_PROJECT_ID,
                                                     temporal_dataset_id=TEMPORAL_DATASET_ID,
                                                     max_bytes_billed_allowed=MAX_BYTES_BILLED)

print(create_temporal_table_result)

In [None]:
# test: export the temporary table to GCS.
# NOTE(review): this rebinds USER_EMAIL to a different user, who receives the read ACL on the file.
USER_EMAIL = 'guillermo.hernandez@pedidosya.com'
export_data_to_gcs_results = export_data_to_gcs(bigquery_client=BIGQUERY_CLIENT,
                                                user_email=USER_EMAIL,
                                                source=create_temporal_table_result.get('temporal_table_id'),
                                                bucket='peya-anonymization-libraries',
                                                prefix='peya-data-export-repo',
                                                destination_format='AVRO',
                                                # BigQuery rejects GZIP for AVRO exports; DEFLATE/SNAPPY are the Avro codecs.
                                                compression='DEFLATE')

print(export_data_to_gcs_results)

### Permisos de descarga del archivo al usuario

In [None]:
# Dry run an exploratory query to inspect its output schema without executing it.
# query_user is defined here: it was previously only defined in a later cell, which raised
# NameError on a fresh kernel.
query_user = 'SELECT * FROM `peya-bi-tools-pro.il_core.dim_partner` where 1=1'

job_test_config = bigquery.QueryJobConfig(dry_run=True,
                                          use_query_cache=False)

job_query_test = BIGQUERY_CLIENT.query(query=query_user,
                                       job_id_prefix='DataTech_ExportDataService_DryRun_Query',
                                       job_config=job_test_config)

In [None]:
# Output schema of the dry-run job, read from the job's raw API resource (a private attribute).
# NOTE(review): raises KeyError if the resource has no schema entry — confirm for non-SELECT queries.
schema = job_query_test._properties['statistics']['query']['schema']

In [None]:
# Collect top-level columns that are not "plain" (nested STRUCT/RECORD or repeated/ARRAY)
# as [name, type, mode] triples.
fields = schema['fields']
not_plain_columns = []
for field in fields:
    field_name = field['name']
    field_type = field['type']
    # 'mode' is optional in the API resource; BigQuery's default is NULLABLE.
    field_mode = field.get('mode', 'NULLABLE')
    # Exact comparison: `in ('REPEATED')` was a substring test on a plain string.
    if field_type in ('RECORD', 'STRUCT', 'ARRAY') or field_mode == 'REPEATED':
        not_plain_columns.append([field_name, field_type, field_mode])


In [None]:
# Display the nested/repeated columns collected in the previous cell.
not_plain_columns

In [None]:
# NOTE(review): this rebinds the TEMPORAL_* config globals (here to what looks like an
# anonymous query-results dataset); re-running the temporary-table test cell afterwards
# would write there.
TEMPORAL_PROJECT_ID, TEMPORAL_DATASET_ID = (
    'peya-data-qlty-stg',
    '_eca9662032d34636a2856a6fa78e751bd17ac6a1',
)

temporal_table_name = 'anonc32e728581a24957c6c949c458067be6c408f7cb'

In [None]:
# Exploratory query for schema inspection.
query_user = 'SELECT * FROM `peya-bi-tools-pro.il_core.dim_partner` where 1=1'

# Local (not fetched) Table object for the table named above; these constructors make no API request.
dataset_ref = bigquery.DatasetReference(project=TEMPORAL_PROJECT_ID, dataset_id=TEMPORAL_DATASET_ID)
table_ref = dataset_ref.table(temporal_table_name)
table = bigquery.Table(table_ref)

In [None]:
# A locally constructed bigquery.Table carries no server metadata (`created` is always None),
# so fetch the table from BigQuery to read its creation time.
BIGQUERY_CLIENT.get_table(table).created

In [None]:
# Fetch a previously generated export table (schema included).
# NOTE(review): on NotFound only a message is printed and `table_ref` keeps its previous value.
try:
    table_ref = BIGQUERY_CLIENT.get_table(
        'peya-data-qlty-stg.data_export_temporal.data_export_CarlosJaime_20210513_120400'
    )
except NotFound as not_found_error:
    print('ERROR:  {}'.format(not_found_error.message))

In [None]:
# Top-level columns of the fetched table that are nested (RECORD/STRUCT) or repeated (arrays),
# as [name, type, mode] triples.
not_plain_columns = []
for field in table_ref.schema:
    # Arrays are reported through mode 'REPEATED', not through field_type, so check the mode too.
    if field.field_type in ('RECORD', 'STRUCT', 'ARRAY') or field.mode == 'REPEATED':
        not_plain_columns.append([field.name, field.field_type, field.mode])

print(not_plain_columns)

In [None]:
import uuid

# Print a short random id: the first 8 hex digits of a random UUID4.
print(uuid.uuid4().hex[:8])