In [4]:
import pandas as pd
import os
from google.cloud import bigquery
from io import StringIO
from google.cloud import storage

# Pipeline overview:
#   1. Download the NY taxi sample CSV from Google Cloud Storage.
#   2. Keep only trips with a positive trip distance.
#   3. Join each trip to the zone lookup table (read from BigQuery) on the
#      drop-off location id.
#   4. Write the enriched result to a BigQuery table, replacing prior contents.

# Path to the service-account key; defined once so both clients agree on it.
credentials_path = "credentials.json"

# The BigQuery client below picks its credentials up from this variable.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path
project_id = 'lbg-labs-project2'
bucket_name = 'lbg-labs-project2'
file_name = 'ny-taxi-2018-sample.csv'
dataset_id = 'trips'
source_table_id = 'zone_id_mappings'
destination_table_id = 'ny_taxi_with_zone_data'

# Initialize Google Cloud Storage client
gcsclient = storage.Client.from_service_account_json(credentials_path)

# Get the GCS bucket
bucket = gcsclient.get_bucket(bucket_name)

# Specify the GCS blob for the source data
blob = bucket.blob(file_name)

# Initialize BigQuery client
bqclient = bigquery.Client(project=project_id)

# Download CSV data from GCS and read it into a Pandas DataFrame.
# download_as_bytes() is the non-deprecated replacement for
# download_as_string(); both return the raw bytes of the object.
file_as_string = blob.download_as_bytes()
taxi_data = pd.read_csv(StringIO(file_as_string.decode('utf-8')))

# Keep only rows with trip_distance > 0 (zero/negative distances are dropped).
# .copy() gives an independent frame so the column assignment further down
# does not write into a view of taxi_data.
taxi_data_filtered = taxi_data[taxi_data['trip_distance'] > 0.0].copy()

# Explicit dataset reference, shared by the source and destination tables
# (replaces the deprecated Client.dataset() helper).
dataset_ref = bigquery.DatasetReference(project_id, dataset_id)

# Define the BigQuery table reference for the source table
table_ref_source = dataset_ref.table(source_table_id)

# Construct a SQL query to select all rows from the source table
query = f"SELECT * FROM `{table_ref_source.project}.{table_ref_source.dataset_id}.{table_ref_source.table_id}`"

# Execute the query and retrieve the result into a DataFrame
query_job = bqclient.query(query)
zone_data = query_job.to_dataframe()

# Cast both join keys to int so the merge compares like with like
zone_data['zone_id'] = zone_data['zone_id'].astype(int)
taxi_data_filtered['dropoff_location_id'] = taxi_data_filtered['dropoff_location_id'].astype(int)

# Inner join: trips whose drop-off id has no matching zone are discarded
merged_data = taxi_data_filtered.merge(zone_data, how='inner', left_on='dropoff_location_id', right_on='zone_id')

# Drop the raw id columns now that the zone attributes have been joined in
merged_data = merged_data.drop(columns=['pickup_location_id', 'dropoff_location_id', 'zone_id'])

# Rename the zone columns to make clear they describe the drop-off location
merged_data = merged_data.rename(columns={'zone_name': 'dropoff_zone', 'borough': 'dropoff_borough'})

# Define the BigQuery destination table
table_ref_sink = dataset_ref.table(destination_table_id)

# Create the destination table if it does not exist yet. exists_ok=True keeps
# this cell re-runnable: without it a second run raises Conflict because the
# table was already created by the first run.
table = bigquery.Table(table_ref_sink)
bqclient.create_table(table, exists_ok=True)

# Configure the job for loading data into BigQuery
job_config = bigquery.LoadJobConfig(
    autodetect=True,
    write_disposition="WRITE_TRUNCATE",  # Overwrite existing data
)

# Load the merged data into the destination table; .result() blocks until the
# load job finishes and raises if it failed.
bqclient.load_table_from_dataframe(merged_data, table_ref_sink, job_config=job_config).result()

# Print the first few rows of the merged data
print(merged_data.head())


       pickup_datetime     dropoff_datetime  passenger_count  trip_distance  \
0  2018-03-27T13:17:01  2018-03-27T13:45:15                2           7.45   
1  2018-08-18T22:48:08  2018-08-18T23:03:14                1           9.10   
2  2018-01-07T15:03:56  2018-01-07T15:41:36                5          13.39   
3  2018-08-29T16:46:15  2018-08-29T16:58:10                1           1.30   
4  2018-11-07T15:41:10  2018-11-07T15:49:23                1           0.88   

   payment_type  fare_amount  extra  tip_amount  total_amount dropoff_zone  \
0             1         25.5    0.0        0.00         26.30  Parkchester   
1             1         25.5    0.5        6.50         39.06  Parkchester   
2             1         41.5    0.0        9.61         57.67     Flatiron   
3             2          9.0    1.0        0.00         10.80     Flatiron   
4             2          6.5    0.0        0.00          7.30     Flatiron   

  dropoff_borough  
0           Bronx  
1           Bron