In [1]:
import pandas as pd
import gcsfs
import os
import pytz
import uuid
from datetime import datetime
import numpy as np
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions,SetupOptions
from google.cloud import bigquery,secretmanager
from pandas_gbq import schema

In [None]:

# Full GCS URI of the cleaned hotel-booking CSV: bucket prefix + object path.
gcs_file_path = "gs://" + "input_data_capstone" + "/input_data/cleaned_data.csv"
# BigQuery client constructed with no arguments, so project and credentials
# are taken from the execution environment.
bigquery_client = bigquery.Client()

In [2]:
# Configuration: GCS input location and BigQuery destination identifiers.
BUCKET_NAME = "input_data_capstone"
CSV_FILE_NAME = "input_data/cleaned_data.csv"
BQ_DATASET = "CAPSTONE_PROJECT"
BQ_TABLE = "HOTEL_BOOKING"
BQ_PROJECT = "iitj-capstone-project-group-18"

# Destination table schema, listed as (column name, BigQuery type) pairs and
# expanded into the {"fields": [{"name", "type", "mode"}, ...]} mapping used
# for the BigQuery write. Every column is NULLABLE.
BIGQUERY_SCHEMA = {
    "fields": [
        {"name": column_name, "type": column_type, "mode": "NULLABLE"}
        for column_name, column_type in (
            ("guid", "STRING"),
            ("hotel", "STRING"),
            ("is_canceled", "INTEGER"),
            ("lead_time", "INTEGER"),
            ("arrival_date_year", "INTEGER"),
            ("arrival_date_month", "STRING"),
            ("arrival_date_week_number", "INTEGER"),
            ("arrival_date_day_of_month", "INTEGER"),
            ("stays_in_weekend_nights", "INTEGER"),
            ("stays_in_week_nights", "INTEGER"),
            ("adults", "INTEGER"),
            ("children", "FLOAT"),
            ("babies", "INTEGER"),
            ("meal", "STRING"),
            ("country", "STRING"),
            ("market_segment", "STRING"),
            ("distribution_channel", "STRING"),
            ("is_repeated_guest", "INTEGER"),
            ("previous_cancellations", "INTEGER"),
            ("previous_bookings_not_canceled", "INTEGER"),
            ("reserved_room_type", "STRING"),
            ("assigned_room_type", "STRING"),
            ("booking_changes", "INTEGER"),
            ("deposit_type", "STRING"),
            ("agent", "STRING"),
            ("company", "STRING"),
            ("days_in_waiting_list", "INTEGER"),
            ("customer_type", "STRING"),
            ("adr", "FLOAT"),
            ("required_car_parking_spaces", "INTEGER"),
            ("total_of_special_requests", "INTEGER"),
            ("reservation_status", "STRING"),
            ("reservation_status_date", "STRING"),
            ("insert_timestamp", "TIMESTAMP"),
        )
    ]
}

In [None]:
# Load the cleaned bookings from GCS. The URI is derived from the config cell
# so BUCKET_NAME / CSV_FILE_NAME are the single source of truth for the input.
input_csv_uri = f"gs://{BUCKET_NAME}/{CSV_FILE_NAME}"
input_df = pd.read_csv(input_csv_uri)
# Fail fast: column resampling downstream cannot draw from an empty frame.
assert not input_df.empty, f"No rows read from {input_csv_uri}"

In [3]:
def generate_synthetic_data(input_df, num_synthetic_samples, random_state=None):
    """Build a synthetic booking frame by resampling each column independently.

    Each column of ``input_df`` (except a ``'Unnamed: 0'`` column, if present)
    is sampled with replacement on its own. Per-column value distributions
    are kept, but relationships between columns are not.

    Args:
        input_df: Source DataFrame whose column values are resampled.
        num_synthetic_samples: Number of rows to generate.
        random_state: Optional seed passed to ``np.random.default_rng`` for
            reproducible output. ``None`` (default) samples from NumPy's
            global random state.

    Returns:
        A new DataFrame with ``num_synthetic_samples`` rows: the resampled
        columns, followed by ``guid`` and ``insert_timestamp``.

    Raises:
        ValueError: from NumPy's ``choice`` if ``input_df`` has no rows while
            ``num_synthetic_samples`` > 0.
    """
    # Drop the CSV index artefact before sampling (no point resampling it) and
    # tolerate its absence instead of raising KeyError.
    source = input_df.drop(columns="Unnamed: 0", errors="ignore")

    rng = np.random if random_state is None else np.random.default_rng(random_state)
    synthetic_data = pd.DataFrame(
        {
            column: rng.choice(source[column], size=num_synthetic_samples, replace=True)
            for column in source.columns
        }
    )

    # NOTE(review): one uuid4 is shared by every row of this call, so `guid`
    # identifies the generation batch rather than an individual row — confirm
    # this is intended if `guid` is used as a row key downstream.
    synthetic_data["guid"] = str(uuid.uuid4())
    # Timezone-aware current time in IST; the same instant is stamped on all rows.
    synthetic_data["insert_timestamp"] = pd.Timestamp.now(tz="Asia/Kolkata")
    return synthetic_data

In [None]:
# Rows of synthetic bookings generated per ingestion run.
NUM_SYNTHETIC_SAMPLES = 60


def to_bigquery_records(df):
    """Convert a DataFrame into JSON-safe row dicts for a BigQuery write.

    Missing values (NaN / NaT / None) become ``None`` so they are written as
    NULL. Timestamps become ISO-8601 strings, and numpy scalars become native
    Python values. This keeps pandas objects and float ``NaN`` out of the rows
    handed to ``beam.Create``.

    Args:
        df: Frame to convert; it is not modified.

    Returns:
        A list of dicts, one per row, keyed by column name.
    """
    def json_safe(value):
        # isna first: NaT would otherwise be caught by the Timestamp branch.
        if pd.isna(value):
            return None
        if isinstance(value, pd.Timestamp):
            return value.isoformat()
        if isinstance(value, np.generic):
            return value.item()
        return value

    return [
        {column: json_safe(value) for column, value in row.items()}
        for row in df.to_dict(orient="records")
    ]


synthetic_df = generate_synthetic_data(input_df, NUM_SYNTHETIC_SAMPLES)
records = to_bigquery_records(synthetic_df)

In [None]:
# Dataflow execution options. The temp bucket name embeds the project id, so
# it is derived from BQ_PROJECT rather than repeating the literal.
# NOTE(review): Dataflow refuses a new job while one with the same name is
# still active; add a unique suffix to job_name if runs may overlap.
options = PipelineOptions(
    project=BQ_PROJECT,
    temp_location=f"gs://dataflow-apache-quickstart_{BQ_PROJECT}/temp",
    region="asia-south1",
    runner="DataflowRunner",
    job_name="ingestion-pipeline",
)

In [None]:
# Write the prepared records to BigQuery: append to the table, creating it
# with BIGQUERY_SCHEMA if it does not exist yet. The pipeline runs when the
# `with` block exits.
with beam.Pipeline(options=options) as p:
    (
        p
        | "Create Records" >> beam.Create(records)
        | "Write to BigQuery" >> beam.io.WriteToBigQuery(
            # "project:dataset.table" spec built from the config constants.
            table=f"{BQ_PROJECT}:{BQ_DATASET}.{BQ_TABLE}",
            schema=BIGQUERY_SCHEMA,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        )
    )