In [2]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions
import json
import logging
from apache_beam.io.gcp.bigquery import WriteToBigQuery

# BigQuery table schema: one "column:TYPE" spec per entry, joined into the
# single comma-separated string that is handed to WriteToBigQuery below.
schema = ', '.join([
    'id:INTEGER',
    'name:STRING',
    'age:INTEGER',
    'gender:STRING',
    'position:STRING',
    'department:STRING',
    'location:STRING',
    'salary:FLOAT',
    'start_date:DATE',
    'email:STRING',
    'phone:STRING',
    'address:STRING',
    'manager:STRING',
    'team:STRING',
    'skills:STRING',
    'projects:STRING',
    'status:STRING',
    'performance_rating:FLOAT',
    'last_promotion:DATE',
    'comments:STRING',
])

class ParseJson(beam.DoFn):
    """Parse one JSON-encoded text line into a row dict for BigQuery.

    The `skills` and `projects` values are flattened from JSON arrays into
    comma-separated strings, because the table schema declares both columns
    as STRING. Lines that cannot be turned into a row are logged and
    skipped instead of raising, so one bad line does not fail the bundle.
    """

    # Fields expected to arrive as JSON arrays but stored as STRING columns.
    _LIST_FIELDS = ('skills', 'projects')

    def process(self, element):
        """Yield the parsed record for `element`, or nothing if it is unusable.

        Args:
            element: One line of input text, expected to hold a JSON object.

        Yields:
            dict: The decoded record with each list-valued field in
            `_LIST_FIELDS` replaced by a comma-separated string.
        """
        try:
            record = json.loads(element)
        except json.JSONDecodeError as e:
            logging.error("Error decoding JSON: %s - Element: %s", e, element)
            return

        # Valid JSON that is not an object (e.g. a bare list or number)
        # cannot be mapped onto table columns.
        if not isinstance(record, dict):
            logging.error(
                "Expected a JSON object but got %s - Element: %s",
                type(record).__name__, element)
            return

        for field in self._LIST_FIELDS:
            value = record.get(field)
            # Only sequences are joined. A missing key, None, or a value that
            # is already a string is left untouched: joining a plain string
            # would split it into single characters.
            if isinstance(value, (list, tuple)):
                # str() keeps non-string items (numbers, nested values) from
                # raising TypeError inside join.
                record[field] = ','.join(str(item) for item in value)

        yield record

def run(
    *,
    project_id='techlanders-internal',
    input_path='gs://sandeep-apache/employee.json',
    dataset_id='employee',
    table_id='employee_table',
    region='us-east1',
    job_name='employee-dataflow-job',
    staging_location='gs://sandeep-apache/staging',
    temp_location='gs://sandeep-apache/temp',
    runner='DataflowRunner',
):
    """Build and run the employee JSON-to-BigQuery pipeline.

    The pipeline reads text lines from `input_path`, parses each line with
    `ParseJson`, and writes the resulting rows to the BigQuery table
    `project_id:dataset_id.table_id` using the module-level `schema`.
    The table is created if needed and its contents are truncated on write.

    Every argument is keyword-only and defaults to the value that was
    previously hard-coded, so calling `run()` with no arguments keeps the
    original behavior.

    Args:
        project_id: Google Cloud project ID for the job and the target table.
        input_path: File pattern passed to `ReadFromText`. ParseJson decodes
            each line separately, so the input must hold one JSON object per
            line.
        dataset_id: BigQuery dataset of the destination table.
        table_id: BigQuery table name of the destination table.
        region: Region set on the Google Cloud pipeline options.
        job_name: Job name set on the Google Cloud pipeline options.
        staging_location: GCS path used as the staging location.
        temp_location: GCS path used as the temp location.
        runner: Name of the Beam runner to execute the pipeline with.
    """
    # Beam pipeline options
    options = PipelineOptions()
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = project_id
    google_cloud_options.job_name = job_name
    google_cloud_options.staging_location = staging_location
    google_cloud_options.temp_location = temp_location
    google_cloud_options.region = region
    options.view_as(StandardOptions).runner = runner

    # Fully qualified destination in "project:dataset.table" form.
    table_spec = f'{project_id}:{dataset_id}.{table_id}'

    with beam.Pipeline(options=options) as p:
        (p
         | 'Read from GCS' >> beam.io.ReadFromText(input_path)  # One element per text line
         | 'Parse JSON' >> beam.ParDo(ParseJson())  # Parse JSON and format for BigQuery
         | 'Write to BigQuery' >> WriteToBigQuery(  # Write to BigQuery table
                table=table_spec,
                schema=schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            ))

if __name__ == '__main__':
    # Raise the root logger to INFO before starting the pipeline.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()


INFO:root:Using provided Python SDK container image: gcr.io/cloud-dataflow/v1beta3/beam_python3.10_sdk:2.54.0
INFO:root:Python SDK container image set to "gcr.io/cloud-dataflow/v1beta3/beam_python3.10_sdk:2.54.0" for Docker environment
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://sandeep-apache/staging/employee-dataflow-job.1719578254.291491/pipeline.pb...
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://sandeep-apache/staging/employee-dataflow-job.1719578254.291491/pipeline.pb in 0 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Create job: <Job
 clientRequestId: '20240628123734292431-2331'
 createTime: '2024-06-28T12:37:34.906673Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2024-06-28_05_37_34-15191894352054664916'
 location: 'us-east1'
 name: 'employee-dataflow-job'
 projectId: 'techlanders-internal'
 stageStates: []
 startTime: '2024-06-28T12:37:34.906673Z'
 steps: []
 tempFiles: []
 type: TypeV