In [1]:
#Project: Import csv to bigquery table
#StartDate: 4/06/2022
#EndDate: 
#Developer: Bradley, Hongquy, Khoa

In [2]:
#Note: avro is an encoded file format, so getting the total number of rows in the input file is currently not possible

In [3]:
from flask import Flask,render_template,request,redirect,url_for
from google.cloud import bigquery
from google.oauth2 import service_account
from google.cloud import storage
import os, re, datetime, uuid, time, json
import google.cloud.logging
import apache_beam as beam
from io import StringIO
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from werkzeug.utils import secure_filename
import avro
from avro.datafile import DataFileWriter, DataFileReader
from avro.io import DatumWriter, DatumReader
from apache_beam.io import ReadFromAvro
from apache_beam.io import WriteToAvro

In [4]:
project = 'model-craft-342921'
temp_location = 'gs://data_intake4062022/temp1'
region='us-east1'
service_account_email = 'practice-py@model-craft-342921.iam.gserviceaccount.com'
dataset = 'testing'
data_input = 'gs://data_intake4062022'
message_ouput = 'gs://complete_message_4182022'
bad_rows_input = 'gs://practice_error_logs'
local_file_loc = r"C:\\Users\Brad\temp.txt"
error_message = 'gs://error_messages4142022'

In [5]:
#this is to allow a authenticated user on the project to work on bigquerry
Key_path = r"C:\\Users\Brad\Documents\model-craft-342921-ea36cdb339e7.json"

#this is needed to request a job
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = Key_path
credentials = service_account.Credentials.from_service_account_file(Key_path,scopes=["https://www.googleapis.com/auth/cloud-platform"])
client=bigquery.Client(credentials=credentials,project=credentials.project_id)

#this is needed to allow the request to access gcp cloud storage
storage_client=storage.Client(credentials=credentials,project=credentials.project_id)

#this is where a temporary file is stored to be accessed by the cloud
bucket = storage_client.get_bucket("data_intake4062022")

#this is where the column location and message from bigquery is stored to be accessed later
bucket1 = storage_client.get_bucket("error_messages4142022")

#this is where the rows attempted to be inserted into bigquery is stored to be accessed later
bucket2 = storage_client.get_bucket("practice_error_logs")

bucket3 = storage_client.get_bucket("complete_message_4182022")

#this is needed to access logs from the bigquery job
client1 = google.cloud.logging.Client()
logger = client1.logger(name="dataflow.googleapis.com%2Fworker")

In [6]:
#this is a function created to format the input data into a dictionary so the data can be easily inserted into bigquery
class formating_csv(beam.DoFn):
    #table_sch is a side input that was found for the table we are trying to insert the data into
    def process(self, element, table_sch, job_name):
        import apache_beam as beam
        record = {}
        record['job_id'] = job_name
        for x in range(0, len(element)):
            if x < len(table_sch)-1:
                name = table_sch[x]['name']
                if element[x] == '':
                    record[name] = None
                else:
                    record[name] = element[x]
            else:
                record['unknown{}'.format(x)] = element[x]
        yield beam.pvalue.TaggedOutput('rec', record)
        yield beam.pvalue.TaggedOutput('count', 1)

In [7]:
class formating_avro(beam.DoFn):
    #table_sch is a side input that was found for the table we are trying to insert the data into
    def process(self, element, table_sch, job_name):
        import apache_beam as beam
        record = {}
        record['job_id'] = job_name
        for x in range(0, len(element)):
            if x < len(table_sch)-1:
                name = table_sch[x]['name']
                if element[name] == '':
                    record[name] = None
                else:
                    record[name] = element[name]
            else:
                record['unknown{}'.format(name)] = element[name]
        yield beam.pvalue.TaggedOutput('rec', record)
        yield beam.pvalue.TaggedOutput('count', 1)

In [8]:
class customJsonParse(beam.DoFn):
    def process (self, json_file,job_name):
        import apache_beam as beam
        import json
        data = []
        for i in json_file.splitlines():
            d = i.strip()
            d = d.strip(',')
            try:
                d = json.loads(d)
                d['job_id'] = job_name
                data.append(d)
            except:
                pass
        
        if data:
            for row_data in data:
                yield beam.pvalue.TaggedOutput('rec', row_data)
                yield beam.pvalue.TaggedOutput('count', 1)

In [9]:
class formating_parquet(beam.DoFn):
    #table_sch is a side input that was found for the table we are trying to insert the data into
    def process(self, element, job_name):
        import apache_beam as beam
        element['job_id'] = job_name
        yield beam.pvalue.TaggedOutput('rec', element)
        yield beam.pvalue.TaggedOutput('count', 1)

In [10]:
class formating2(beam.DoFn):
    def process(self, element, table_id):
        import apache_beam as beam
        element = element[1]
        element.pop('job_id', None)
        yield element

In [11]:
def beam_setup(job_name):
    return PipelineOptions(
                            #runner = 'DataflowRunner',
                            #runner='DirectRunner',
                            project = project,
                            job_name = '{}'.format(job_name),
                            temp_location = temp_location,
                            region=region,
                            service_account_email = service_account_email
                            )

In [12]:
def beam_setup_wGCP(job_name):
    GoogleCloudOptions.temp_location = temp_location
    GoogleCloudOptions.staging_location = '{}/stage'.format(temp_location)
    GoogleCloudOptions.project = project
    GoogleCloudOptions.region = region
    GoogleCloudOptions.job_name = job_name

In [13]:
def schema_fetch(table_id):
    SCHEMA = {}
    table_sch = []
    #in this try block it is attempting to get the table schema. its a side input
    try:
        SchemaJob = client.get_table('{}.{}.{}'.format(project,dataset,table_id))
        for s in SchemaJob.schema:
            new_dict = {}
            new_dict['name'] = s.name
            new_dict['type'] = s.field_type
            new_dict['mode'] = s.mode
            table_sch.append(new_dict)
        SCHEMA['fields'] = table_sch
        return SCHEMA
    except Exception as e:
        print(e)

In [14]:
def main_pipeline_csv(runner, beam_options,SCHEMA, job_name,filename,table_id):
    table_sch = SCHEMA['fields']
    p = beam.Pipeline(runner=runner,options=beam_options)              
    rec, count  = (p | 'ReadData' >> beam.io.ReadFromText('{}/data.csv'.format(data_input), skip_header_lines =1)
           | 'Split' >> beam.Map(lambda x: x.split(','))
            | 'format to dict2' >> beam.ParDo(formating_csv(),table_sch, job_name).with_outputs("rec", "count")
                  )
           
    (count | 'Count total records' >> beam.combiners.Count.Globally()
                     | 'Store count' >> beam.io.WriteToText('{}/count_temp.txt'.format(data_input), shard_name_template = "")
            )
    events  = (rec  | 'WriteToBigQuery2' >>  beam.io.gcp.bigquery.WriteToBigQuery(
               '{}:{}.{}'.format(project,dataset,table_id),
               schema=SCHEMA,
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                batch_size = 100000,
               insert_retry_strategy=beam.io.gcp.bigquery_tools.RetryStrategy.RETRY_NEVER,
               method='STREAMING_INSERTS')
            )
    (events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
             | "remove tuple" >> beam.ParDo(formating2(),table_id)
            | "Bad lines" >> beam.io.WriteToText('{}/{}.txt'.format(bad_rows_input,filename), shard_name_template = "")
    )
    result = p.run()
    result.wait_until_finish()

In [15]:
def main_pipeline_avro(runner, beam_options,SCHEMA, job_name,filename,table_id):
    table_sch = SCHEMA['fields']
    p = beam.Pipeline(runner = runner,options=beam_options)              
    rec, count  = (p | ReadFromAvro('{}/data.avro'.format(data_input))
            | 'format to dict2' >> beam.ParDo(formating_avro(),table_sch, job_name).with_outputs("rec", "count")
              )
    (count | 'Count total records' >> beam.combiners.Count.Globally()
                     | 'Store count' >> beam.io.WriteToText('{}/count_temp.txt'.format(data_input), shard_name_template = "")
            )
    events= (rec| 'WriteToBigQuery2' >>  beam.io.gcp.bigquery.WriteToBigQuery(
               '{}:{}.{}'.format(project,dataset,table_id),
               schema=SCHEMA,
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                batch_size = 100000,
               insert_retry_strategy=beam.io.gcp.bigquery_tools.RetryStrategy.RETRY_NEVER,
               method='STREAMING_INSERTS')
            )
    (events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
             | "remove tuple" >> beam.ParDo(formating2(),table_id)
            | "Bad lines" >> beam.io.WriteToText('{}/{}.txt'.format(bad_rows_input,filename), shard_name_template = "")
    )
    result = p.run()
    result.wait_until_finish()

In [16]:
def main_pipeline_json(runner, beam_options,SCHEMA, job_name,filename,table_id):
    table_sch = SCHEMA['fields']
    p = beam.Pipeline(runner = runner,options=beam_options)
    rec, count  = (p | 'Read Data 1' >> beam.io.ReadFromText('{}/data.json'.format(data_input))
             | 'Custom Parser 1' >> beam.ParDo(customJsonParse(),job_name).with_outputs("rec", "count")
              )
    (count | 'Count total records' >> beam.combiners.Count.Globally()
                     | 'Store count' >> beam.io.WriteToText('{}/count_temp.txt'.format(data_input), shard_name_template = "")
            )
    events = (rec| 'WriteToBigQuery2' >>  beam.io.gcp.bigquery.WriteToBigQuery(
               '{}:{}.{}'.format(project,dataset,table_id),
                schema=SCHEMA,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                batch_size = 100000,
               insert_retry_strategy=beam.io.gcp.bigquery_tools.RetryStrategy.RETRY_NEVER,
               method='STREAMING_INSERTS')
          )
    (events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
             | "remove tuple" >> beam.ParDo(formating2(),table_id)
            | "Bad lines" >> beam.io.WriteToText('{}/{}.txt'.format(bad_rows_input,filename), shard_name_template = "")
    )
    result = p.run()
    result.wait_until_finish()

In [17]:
def main_pipeline_parquet(runner,beam_options,SCHEMA, job_name,filename,table_id):
    table_sch = SCHEMA['fields']
    p = beam.Pipeline(runner = runner,options=beam_options)              
    rec, count  = (p | 'ReadData' >> beam.io.ReadFromParquet('{}/data.parquet'.format(data_input))
                | 'format to dict2' >> beam.ParDo(formating_parquet(),job_name).with_outputs("rec", "count")
              )
    (count | 'Count total records' >> beam.combiners.Count.Globally()
                     | 'Store count' >> beam.io.WriteToText('{}/count_temp.txt'.format(data_input), shard_name_template = "")
            )
    events = (rec| 'WriteToBigQuery2' >>  beam.io.gcp.bigquery.WriteToBigQuery(
               '{}:{}.{}'.format(project,dataset,table_id),
               schema=SCHEMA,
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                batch_size = 100000,
               insert_retry_strategy=beam.io.gcp.bigquery_tools.RetryStrategy.RETRY_NEVER,
               method='STREAMING_INSERTS')
            )
    (events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
             | "remove tuple" >> beam.ParDo(formating2(),table_id)
            | "Bad lines" >> beam.io.WriteToText('{}/{}.txt'.format(bad_rows_input,filename), shard_name_template = "")
    )
    result = p.run()
    result.wait_until_finish()

In [18]:
def get_errors(filename):
    blob = bucket2.blob('{}.txt'.format(filename))
    downloaded_blob = blob.download_as_string()
    log_message = downloaded_blob.decode("utf-8", "ignore")
    log_message = log_message.replace("'",'"')
    log_message = log_message.replace("None", "null")
    log_message = log_message.replace('b"', "b'")
    bad_list =log_message.split('\n')
    bad_json = []
    bad_input_data = {}
    for row in bad_list:
        if row != '':
            j = json.loads(row)
            bad_json.append(j)
    bad_input_data['elements'] = bad_json
    return bad_input_data

In [19]:
#this is needed to run the website html
app = Flask(__name__)

#secret key is to allow this python code to move variable from the frontend to the backend
app.secret_key = "35367565"

@app.route('/')
def form():
    return redirect(url_for('upload'))

#This will render the frontend of the website to get the json file
@app.route('/upload', methods=['GET','POST'])
def upload():

    #resets the message so that previous messages dont confuse the user
    message = None
    
    #this will find all the tables being used in this project and display them as options to be used in bigquery
    tables = ['Task3'] #change--------------------------
    
    #this creates a unique job name for dataflow
    job_name = 'pythontobigquerry-{}'.format(uuid.uuid4())

    beam_options = beam_setup(job_name)
    #beam_options = beam_setup_wGCP(job_name)
    runner = 'DataflowRunner'
    #runner='DirectRunner'
    
    #this is needed to get the error logs from the bigquery in dataflow job
    filter_str = (
        f'resource.labels.job_name={job_name}'
        f' resource.type="dataflow_step"'
        f' AND severity >= ERROR'
    )
    
    
    #This will only run if the user attempts to submit a file
    if request.method == 'POST':

        #this will only run if what the user submitted is a file
        if 'file' in request.files:
            #this gets the file data
            file = request.files['file']
            #this aquires the name of the file
            filename = secure_filename(file.filename)
            
            #this will check it the file has data to be processed
            if len(file.readlines()) == 0:
                message = "File has no data to process"
            else:
                file.seek(0)
                try:
                    #this will only run if the file is a csv
                    if filename.endswith('.csv') or filename.endswith('.json') or filename.endswith('.avro') or filename.endswith('.parquet'):
                        #these two lines are to upload the data fetched from the front-end to gcp cloud storage
                        if filename.endswith('.csv'):
                            blob = bucket.blob('data.csv')
                            blob.upload_from_file(file, timeout=3600)
                        elif filename.endswith('.avro'):
                            blob = bucket.blob('data.avro')
                            blob.upload_from_file(file, timeout=3600)
                        elif filename.endswith('.json'):
                            blob = bucket.blob('data.json')
                            blob.upload_from_file(file, timeout=3600)
                        elif filename.endswith('.parquet'):
                            blob = bucket.blob('data.parquet')
                            blob.upload_from_file(file, timeout=3600)
                        
                        #file.seek(0)
                        #this gets the total number of records to be inserted to bigquery. its a side input
                        #total_records = len(file.readlines())
                        #total_records = 10
                        
                        #this gets the table wanting to be used from the front end
                        table_id = request.form.getlist('checks')[0]
                        SCHEMA = schema_fetch(table_id)
                        
                        #these two lines create a unique name for the files being saved to gcp cloud storage. files with the same name but in differnt buckets are tied together
                        filename_helper = str(datetime.datetime.now())
                        u_filename = "LOG: "+ filename_helper
                        
                        #in this try block, it will attempted to create the first pipeline that will read the input data and try to write it to bigquery. all failed rows with be stored into the gcp cloud storage
                        try:
                            start = time.time()
                            if filename.endswith('.csv'):
                                main_pipeline_csv(runner,beam_options,SCHEMA, job_name,u_filename,table_id)
                            elif filename.endswith('.avro'):
                                main_pipeline_avro(runner,beam_options,SCHEMA, job_name,u_filename,table_id)
                            elif filename.endswith('.json'):
                                main_pipeline_json(runner,beam_options,SCHEMA, job_name,u_filename,table_id)
                            elif filename.endswith('.parquet'):
                                main_pipeline_parquet(runner,beam_options,SCHEMA, job_name,u_filename,table_id)
                                
                        except Exception as error:
                            print('This was the error: ', error)
                            message = "Error setting up the data ingestion pipeline"
                            
                        
                        #this try block is creating a side input with all the bad rows attempted to be inserted into bigquery
                        try:
                            bad_input_data = get_errors(u_filename)
                            errors = client.insert_rows_json('{}.{}.{}'.format(project,dataset,table_id), bad_input_data['elements'])
                            error_message = ''
                            with open('temp.txt', 'w') as m:
                                for i in errors:
                                    error_message ="""Error #{}, Message - {}, Location - {}, Inputted Data -  {}\n\n""".format(i['index'], i['errors'][0]['message'],i['errors'][0]['location'],bad_input_data['elements'][i['index']])
                                    m.write(error_message)
                            blob = bucket3.blob('{}.txt'.format(u_filename))
                            blob.upload_from_filename(local_file_loc, timeout=3600)
                            os.remove("temp.txt")
                            message = "Data uploaded to the Bigquery"
                        except Exception as error:
                            print('This was the error: ', error)
                            message = "Error setting up messaging"
                        
                        
                        blob = bucket.blob('count_temp.txt')
                        total_records = blob.download_as_string()
                        total_records = total_records.decode("utf-8", "ignore")
                        end = time.time()
                        Total_time = end - start
                        try:
                            # Good elements - read number of elements in BigQuery table
                            query = """SELECT count(*) FROM `{}.{}.{}` WHERE job_id = '{}'""".format(str(project),str(dataset),str(table_id),job_name)
                            results =  client.query(query)
                        except Exception as e:
                                message = "Error getting the query"
                                print("ERROR: could not query")
                        try:
                            num_good = ""
                            for row in results:
                                num_good = row["f0_"]
                            #Write to GCP bucket
                            msg = '''{}, {} seconds, {}, {}, {}\n'''.format(str(job_name),str(Total_time), str(total_records.strip()), str(num_good), str(len(bad_input_data['elements'])))
                            if os.path.exists('metrics.csv'):
                                with open('metrics.csv', 'a') as f:
                                    f.write(msg)
                            else:
                                with open('metrics.csv', 'w') as f:
                                    f.write('Project Id, Execution Time, Total Rows, Total good Rows, Total Bad Rows\n')
                                    f.write(msg)
                        except Exception as e:
                                message = "Error writting results"
                                print(e)
                                
                    #If the file is not a json or the csv this will run
                    else:
                        message = "File type is not excepted"
                    #endif
                except Exception as error:
                    print('This was the error: ', error)
                    message = "There was an error in creating the request"
            #endif
        #This will run if the submition is not a file type
        elif 'file' not in request.files:
            message = "There was no file to upload"
        #endif
    #endif

    #this will render the template on the website
    return render_template("front.html", message = message, tables = tables)


In [None]:
if __name__ == '__main__':
    app.run()

 * Serving Flask app "__main__" (lazy loading)
 * Environment: production
   Use a production WSGI server instead.
 * Debug mode: off


 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
127.0.0.1 - - [27/Apr/2022 15:41:27] "[32mGET / HTTP/1.1[0m" 302 -
127.0.0.1 - - [27/Apr/2022 15:41:27] "[37mGET /upload HTTP/1.1[0m" 200 -
INFO:werkzeug:127.0.0.1 - - [27/Apr/2022 15:41:27] "[37mGET /upload HTTP/1.1[0m" 200 -


  is_streaming_pipeline = p.options.view_as(StandardOptions).streaming
ERROR:apache_beam.io.gcp.bigquery:There were errors inserting to BigQuery. Will not retry. Errors were [{'index': 3, 'errors': [{'reason': 'invalid', 'location': 'num', 'debugInfo': '', 'message': 'NUMERIC(10) has precision 10 and scale 0 but got a value that is not in range of [-9999999999, 9999999999]'}]}, {'index': 4, 'errors': [{'reason': 'invalid', 'location': 'limit', 'debugInfo': '', 'message': 'Field limit: STRING(5) has maximum length 5 but got a value with length 12'}]}, {'index': 5, 'errors': [{'reason': 'invalid', 'location': 'num', 'debugInfo': '', 'message': 'Invalid NUMERIC value: 1e+41'}]}, {'index': 6, 'errors': [{'reason': 'invalid', 'location': 'bytes', 'debugInfo': '', 'message': 'Failed to decode bytes input. Byte fields must be base64 encoded, value: @@@@#####)(*&%$.'}]}, {'index': 7, 'errors': [{'reason': 'invalid', 'location': 'time', 'debugInfo': '', 'message': "Could not parse 'nowtime' as 