In [None]:
# Authenticate with Google Cloud: starts the gcloud login flow for
# Application Default Credentials. This cell does not install anything.
!gcloud auth application-default login

In [None]:
!!pip install --upgrade -r requirements.txt

In [None]:
# Create an isolated environment named "beam-env" in the current directory.
# NOTE(review): the environment is built with the standard-library `venv`
# module (second line), so the `virtualenv` package installed on the first
# line is not what creates it — confirm whether that install is still needed.
!pip install virtualenv
!python -m venv beam-env

In [None]:
# Activate the virtual environment (Windows-style path to the activate script).
# NOTE(review): each `!` command runs in its own shell process, so this
# activation presumably does not carry over to the notebook kernel or to
# later cells — verify which interpreter the following cells actually use.
!beam-env\Scripts\activate

In [None]:
# Check which Apache Beam version the kernel imports.
import apache_beam as beam
# Bare last expression: its value is what the cell displays.
beam.__version__

In [None]:
# Check which default credentials and project the Google auth library resolves.
from google.auth import default

# default() yields a (credentials, project) pair, unpacked into notebook globals.
credentials, project = default()
print(f"Active project: {project}")
# Prints the credentials object's default string form.
print(f"Credentials: {credentials}")

In [None]:
import json
import logging
from datetime import datetime, timezone

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, WorkerOptions
from apache_beam.transforms.sql import SqlTransform

# Configuration constants
# Google Cloud project ID; used in the pipeline options, the subscription
# path, and the BigQuery table reference.
PROJECT_ID = "coastal-throne-433510-a5"
# BigQuery dataset and table that receive the pipeline's output rows.
DATASET_ID = "pubsub_data"
TABLE_NAME = "employee_salaries_with_bonus_tbl"
# Cloud Storage bucket that holds the temp and staging paths below.
BUCKET_NAME = "coastal-throne-433510-a5-dataflow-bucket"
# Region passed to the pipeline options.
REGION = "us-central1"
# Full resource path of the Pub/Sub subscription the pipeline reads from.
SUBSCRIPTION = f"projects/{PROJECT_ID}/subscriptions/employee_sal_data_sub"
# Cloud Storage locations passed as temp_location / staging_location.
TEMP_LOCATION = f"gs://{BUCKET_NAME}/temp"
STAGING_LOCATION = f"gs://{BUCKET_NAME}/staging"

# Text spellings that mean "false" when a boolean field arrives as a string.
_FALSE_STRINGS = frozenset({"", "false", "f", "no", "n", "0"})


def _parse_bool(value):
    """Interpret ``value`` as a boolean without treating "false" text as True.

    Strings are compared case-insensitively (after stripping whitespace)
    against a small set of "false" spellings; any other string is True, which
    matches plain ``bool()`` for every non-empty string outside that set.
    Non-string values fall back to ``bool(value)``.
    """
    if isinstance(value, str):
        return value.strip().lower() not in _FALSE_STRINGS
    return bool(value)


# Add schema using beam.Row
def add_schema(record):
    """Convert one parsed message into a ``beam.Row`` with typed fields.

    Args:
        record: Mapping keyed by the source column names ('First Name',
            'Gender', 'Start Date', 'Last Login Time', 'Salary', 'Bonus %',
            'Senior Management', 'Team').

    Returns:
        A ``beam.Row`` with snake_case field names: text fields as ``str``,
        ``salary`` and ``bonus_percent`` as ``float``, and
        ``senior_management`` as ``bool``.

    Raises:
        KeyError: If an expected column is missing from ``record``.
        ValueError: If 'Salary' or 'Bonus %' cannot be converted to float.
    """
    return beam.Row(
        first_name=str(record['First Name']),
        gender=str(record['Gender']),
        start_date=str(record['Start Date']),
        last_login_time=str(record['Last Login Time']),
        salary=float(record['Salary']),
        bonus_percent=float(record['Bonus %']),
        # bool("false") is True, so string flags need explicit parsing.
        senior_management=_parse_bool(record['Senior Management']),
        team=str(record['Team'])
    )

# Convert beam.Row to a dictionary for BigQuery serialization, adding insert_timestamp
def row_to_dict(row):
    """Turn a result row into a plain dict and stamp it with the insert time.

    Args:
        row: Object exposing the attributes ``first_name``, ``gender``,
            ``start_date``, ``last_login_time``, ``salary``, ``bonus_percent``,
            ``bonus_actual``, ``senior_management`` and ``team``.

    Returns:
        A dict with those nine values plus ``insert_timestamp``: the current
        UTC time as an ISO 8601 string with an explicit ``+00:00`` offset.

    Raises:
        AttributeError: If ``row`` lacks one of the expected attributes.
    """
    return {
        # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
        # naive value whose UTC meaning is only implied.
        'insert_timestamp': datetime.now(timezone.utc).isoformat(),
        'first_name': row.first_name,
        'gender': row.gender,
        'start_date': row.start_date,
        'last_login_time': row.last_login_time,
        'salary': row.salary,
        'bonus_percent': row.bonus_percent,
        'bonus_actual': row.bonus_actual,
        'senior_management': row.senior_management,
        'team': row.team
    }

def run():
    """Build the streaming pipeline and run it on Dataflow.

    Messages are read from the Pub/Sub subscription, parsed as JSON, converted
    to schema'd rows, placed into 20-second fixed windows, extended with a
    computed ``bonus_actual`` column through a SQL transform, converted to
    dicts, and appended to the BigQuery table.
    """
    # Pipeline settings, collected first and then handed to PipelineOptions.
    dataflow_settings = dict(
        runner='DataflowRunner',
        project=PROJECT_ID,
        temp_location=TEMP_LOCATION,
        staging_location=STAGING_LOCATION,
        region=REGION,
        streaming=True,
        save_main_session=True,
    )
    options = PipelineOptions(**dataflow_settings)
    # Cap autoscaling at two workers.
    options.view_as(WorkerOptions).max_num_workers = 2

    # Output table columns as "name:TYPE" entries, joined into one schema string.
    output_columns = [
        'insert_timestamp:TIMESTAMP',
        'first_name:STRING',
        'gender:STRING',
        'start_date:STRING',
        'last_login_time:STRING',
        'salary:FLOAT',
        'bonus_percent:FLOAT',
        'bonus_actual:FLOAT',
        'senior_management:BOOLEAN',
        'team:STRING',
    ]
    output_schema = ', '.join(output_columns)

    with beam.Pipeline(options=options) as pipeline:
        # SQL that passes every column through and derives bonus_actual
        # from bonus_percent and salary.
        bonus_calculation_query = """
        SELECT 
            first_name, 
            gender, 
            start_date, 
            last_login_time, 
            salary, 
            bonus_percent, 
            (bonus_percent / 100) * salary AS bonus_actual, 
            senior_management, 
            team
        FROM PCOLLECTION
        """

        # Single chain: ingest -> parse -> schema -> window -> SQL -> dict -> sink.
        (
            pipeline
            | "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION)
            | "Parse JSON" >> beam.Map(lambda message: json.loads(message))
            | "Add Schema" >> beam.Map(add_schema)
            | "Apply 20-second Window" >> beam.WindowInto(beam.window.FixedWindows(20))
            | "Calculate Bonus" >> SqlTransform(bonus_calculation_query)
            | "Row to Dict" >> beam.Map(row_to_dict)
            | "Write to BigQuery - Employee Salaries" >> beam.io.WriteToBigQuery(
                table=f"{PROJECT_ID}:{DATASET_ID}.{TABLE_NAME}",
                schema=output_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            )
        )

# Execute the pipeline
if __name__ == "__main__":
    # Set the root logger to INFO so informational messages are emitted,
    # then build and launch the pipeline.
    logging.getLogger().setLevel(logging.INFO)
    run()

In [None]:
# Leave the virtual environment.
# NOTE(review): `!` commands run in a separate shell process, so `deactivate`
# is presumably not defined there (it normally exists only in a shell where
# the environment was activated) — verify that this cell has any effect.
!deactivate