In [45]:
!pip install pandas sqlalchemy pyodbc



In [3]:
import getpass
import glob
import os
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine, text

In [5]:
# Connection settings.
# Non-secret values fall back to the defaults below; each can be overridden
# through an environment variable. The password is never stored in the
# notebook: it is read from AZURE_SQL_PASSWORD, or prompted for interactively.
server = os.environ.get('AZURE_SQL_SERVER', 'sqlmsazure.database.windows.net')
database = os.environ.get('AZURE_SQL_DATABASE', 'taxis')
username = os.environ.get('AZURE_SQL_USERNAME', 'sarmadcute83')
password = os.environ.get('AZURE_SQL_PASSWORD') or getpass.getpass('Azure SQL password: ')
driver = 'ODBC Driver 17 for SQL Server'

# Build connection string.
# quote_plus escapes every character that is special inside a URL
# ('@', ':', '/', '+', '%', spaces, ...), not only '@', and turns the spaces
# of the driver name into '+'.
connection_string = (
    f"mssql+pyodbc://{quote_plus(username)}:{quote_plus(password)}@{server}:1433/{database}"
    f"?driver={quote_plus(driver)}"
)

# Create SQLAlchemy engine (fast_executemany speeds up bulk inserts through pyodbc)
engine = create_engine(connection_string, fast_executemany=True)

# Test connection
with engine.connect() as conn:
    result = conn.execute(text("SELECT GETDATE();"))
    print("✅ Connected successfully. Server time:", result.fetchone())


✅ Connected successfully. Server time: (datetime.datetime(2025, 5, 29, 18, 8, 27, 393000),)


In [7]:
# Directory that holds the yellow_tripdata_<year>-*.parquet files.
# Set the NYC_TAXI_DATA_DIR environment variable to run on another machine;
# otherwise the original local path is used.
folder_path = os.environ.get(
    "NYC_TAXI_DATA_DIR",
    "C:/Users/sarma/Power BI/NYC Yellow Taxi/dataset",
)

# glob returns matches in arbitrary order. Sort them so the files are always
# concatenated in the same order: record_id is derived from row position, so
# a different order would assign different ids on a re-run or resumed upload.
files_2024 = sorted(glob.glob(os.path.join(folder_path, 'yellow_tripdata_2024-*.parquet')))

In [9]:
# Sorted so the 2023 files are always concatenated in the same order
# (glob itself returns matches in arbitrary order).
files_2023 = sorted(glob.glob(os.path.join(folder_path, 'yellow_tripdata_2023-*.parquet')))

In [11]:
# Load every 2023 file and stack them into a single frame.
# Fail early with a readable message instead of pandas' generic
# "No objects to concatenate" when the glob matched nothing.
if not files_2023:
    raise FileNotFoundError(
        "No yellow_tripdata_2023-*.parquet files found; check folder_path."
    )

# ignore_index=True gives the result a unique 0..n-1 index. Without it every
# file contributes its own index labels, so labels repeat and index-aligned
# operations on this frame become ambiguous.
df_2023 = pd.concat(
    [pd.read_parquet(file) for file in files_2023],
    ignore_index=True,
)

In [12]:
# Load every 2024 file and stack them into a single frame.
# Fail early with a readable message instead of pandas' generic
# "No objects to concatenate" when the glob matched nothing.
if not files_2024:
    raise FileNotFoundError(
        "No yellow_tripdata_2024-*.parquet files found; check folder_path."
    )

# ignore_index=True gives the result a unique 0..n-1 index instead of
# repeating each file's own index labels.
df_2024 = pd.concat(
    [pd.read_parquet(file) for file in files_2024],
    ignore_index=True,
)

In [13]:
# Merge the two airport-fee columns into 'Airport_fee', taking the value from
# whichever column has one for each row ('airport_fee' wins when both do).
# The column checks make the cell safe to re-run after 'airport_fee' has been
# dropped, and safe when only one of the two spellings is present.
if 'airport_fee' in df_2023.columns:
    if 'Airport_fee' in df_2023.columns:
        df_2023['Airport_fee'] = df_2023['airport_fee'].combine_first(df_2023['Airport_fee'])
    else:
        df_2023['Airport_fee'] = df_2023['airport_fee']

In [14]:
# Drop the now-redundant lowercase airport fee column.
# errors='ignore' keeps the cell idempotent: re-running it after the column
# is already gone is a no-op instead of a KeyError.
df_2023 = df_2023.drop(columns=['airport_fee'], errors='ignore')

In [15]:
# Stack both years row-wise; ignore_index=True replaces the per-year indexes
# with one fresh 0..n-1 index for the combined frame.
final_df = pd.concat(
    objs=[df_2023, df_2024],
    axis=0,
    ignore_index=True,
)

In [16]:
# Trip-record fields to keep, in the order they will be selected.
columns = [
    # -- Trip timing --
    'tpep_pickup_datetime',   # date and time the meter was engaged
    'tpep_dropoff_datetime',  # date and time the meter was disengaged

    # -- Trip details --
    'passenger_count',        # number of passengers in the vehicle
    'trip_distance',          # elapsed trip distance in miles, as reported by the taximeter
    'PULocationID',           # TLC Taxi Zone where the taximeter was engaged
    'DOLocationID',           # TLC Taxi Zone where the taximeter was disengaged

    # -- Charges --
    # payment_type codes: 0 = Flex Fare trip, 1 = Credit card, 2 = Cash,
    # 3 = No charge, 4 = Dispute, 5 = Unknown, 6 = Voided trip
    'payment_type',
    'fare_amount',            # time-and-distance fare calculated by the meter
    'extra',                  # miscellaneous extras and surcharges
    'mta_tax',                # tax triggered automatically by the metered rate in use
    'tip_amount',             # credit card tips only (auto-populated); cash tips are not included
    'tolls_amount',           # total of all tolls paid in the trip
    'total_amount',           # total charged to passengers; excludes cash tips
    'congestion_surcharge',   # amount collected in the trip for the NYS congestion surcharge
    'Airport_fee',            # pick-ups at LaGuardia and John F. Kennedy Airports only
]

In [17]:
# Keep all rows, but only the fields listed in `columns` (in that order).
taxis_df = final_df.loc[:, columns]

In [18]:
# Add a 1-based record_id to match our SQL table.
# reset_index(drop=True) returns a new frame with a fresh 0..n-1 index, so
# the column assignment below acts on an independent object rather than on a
# slice of final_df (no copy-of-a-slice warning from pandas).
taxis_df = taxis_df.reset_index(drop=True)
taxis_df['record_id'] = pd.RangeIndex(start=1, stop=len(taxis_df) + 1)

In [19]:
# Sanity check: (rows, columns) of the frame that will be uploaded.
taxis_df.shape

(79479946, 16)

In [None]:
# ---------------------------------------------------------------------------
# Chunked, resumable upload of taxis_df into SQL Server.
#
# For each chunk: empty the staging table, bulk-load the chunk into it, MERGE
# staging into the target table keyed on record_id, then record the row
# offset reached in a checkpoint file so an interrupted run can resume.
# ---------------------------------------------------------------------------
table_name = 'yellow_tripdata_tmp'
staging_table = 'staging_chunk'
chunk_size = 200_000
total_rows = len(taxis_df)
checkpoint_file = 'upload_checkpoint.txt'

# Resume from the last committed offset, if a checkpoint exists.
start = 0
if os.path.exists(checkpoint_file):
    with open(checkpoint_file, 'r') as f:
        saved_offset = f.read().strip()
    try:
        start = int(saved_offset)
    except ValueError:
        raise ValueError(
            f"Checkpoint file {checkpoint_file!r} does not contain a row offset "
            f"(found {saved_offset!r}); fix or delete it before resuming."
        ) from None
    if not 0 <= start <= total_rows:
        raise ValueError(
            f"Checkpoint offset {start} is outside 0..{total_rows}; "
            f"{checkpoint_file!r} does not match the data currently loaded."
        )

# The MERGE statement does not depend on the chunk, so build it once.
merge_sql = f"""
MERGE {table_name} AS target
USING {staging_table} AS source
ON target.record_id = source.record_id
WHEN MATCHED THEN
    UPDATE SET
        target.tpep_pickup_datetime = source.tpep_pickup_datetime,
        target.tpep_dropoff_datetime = source.tpep_dropoff_datetime,
        target.passenger_count = source.passenger_count,
        target.trip_distance = source.trip_distance,
        target.PULocationID = source.PULocationID,
        target.DOLocationID = source.DOLocationID,
        target.payment_type = source.payment_type,
        target.fare_amount = source.fare_amount,
        target.extra = source.extra,
        target.mta_tax = source.mta_tax,
        target.tip_amount = source.tip_amount,
        target.tolls_amount = source.tolls_amount,
        target.total_amount = source.total_amount,
        target.congestion_surcharge = source.congestion_surcharge,
        target.Airport_fee = source.Airport_fee,
        target.updated_at = GETDATE()
WHEN NOT MATCHED THEN
    INSERT (
        record_id,
        tpep_pickup_datetime,
        tpep_dropoff_datetime,
        passenger_count,
        trip_distance,
        PULocationID,
        DOLocationID,
        payment_type,
        fare_amount,
        extra,
        mta_tax,
        tip_amount,
        tolls_amount,
        total_amount,
        congestion_surcharge,
        Airport_fee,
        created_at
    )
    VALUES (
        source.record_id,
        source.tpep_pickup_datetime,
        source.tpep_dropoff_datetime,
        source.passenger_count,
        source.trip_distance,
        source.PULocationID,
        source.DOLocationID,
        source.payment_type,
        source.fare_amount,
        source.extra,
        source.mta_tax,
        source.tip_amount,
        source.tolls_amount,
        source.total_amount,
        source.congestion_surcharge,
        source.Airport_fee,
        GETDATE()
    );
"""
merge_stmt = text(merge_sql)

# Empties staging before a chunk is loaded. The OBJECT_ID guard covers the
# first run, when the staging table has not been created by to_sql yet.
clear_staging_stmt = text(
    f"IF OBJECT_ID(N'{staging_table}', N'U') IS NOT NULL "
    f"TRUNCATE TABLE {staging_table};"
)
truncate_staging_stmt = text(f"TRUNCATE TABLE {staging_table};")

while start < total_rows:
    end = min(start + chunk_size, total_rows)
    chunk = taxis_df.iloc[start:end]
    try:
        # A previous run that was interrupted or failed between the staging
        # load and the post-merge TRUNCATE leaves its rows behind. Appending
        # this chunk on top of them would feed duplicate record_ids to the
        # MERGE, so always start from an empty staging table.
        with engine.begin() as conn:
            conn.execute(clear_staging_stmt)

        chunk.to_sql(
            staging_table,
            con=engine,
            if_exists='append',
            index=False,
            method=None
        )
        print(f"Uploaded chunk {start}-{end} to staging table")

        # MERGE and staging cleanup commit (or roll back) together.
        with engine.begin() as conn:
            conn.execute(merge_stmt)  # Merge records into tmp table
            conn.execute(truncate_staging_stmt)  # Clear the staging table for the next chunk

        # Write the checkpoint to a temp file and swap it into place, so an
        # interruption mid-write cannot leave an empty or partial checkpoint.
        checkpoint_tmp = checkpoint_file + '.tmp'
        with open(checkpoint_tmp, 'w') as f:
            f.write(str(end))
        os.replace(checkpoint_tmp, checkpoint_file)

        print(f"MERGE successful for rows {start}–{end}\n")
        start = end
    except Exception as e:
        # Stop at the first failing chunk; the checkpoint still points at the
        # last committed offset, so re-running this cell resumes from there.
        print(f"Error in chunk {start}–{end}: {e}")
        print("Stopping upload to avoid duplicate or partial inserts.")
        break

Uploaded chunk 24095000-24295000 to staging table
MERGE successful for rows 24095000–24295000

Uploaded chunk 24295000-24495000 to staging table
MERGE successful for rows 24295000–24495000

Uploaded chunk 24495000-24695000 to staging table
MERGE successful for rows 24495000–24695000

Uploaded chunk 24695000-24895000 to staging table
MERGE successful for rows 24695000–24895000

Uploaded chunk 24895000-25095000 to staging table
MERGE successful for rows 24895000–25095000

Uploaded chunk 25095000-25295000 to staging table
MERGE successful for rows 25095000–25295000

Uploaded chunk 25295000-25495000 to staging table
MERGE successful for rows 25295000–25495000

Uploaded chunk 25495000-25695000 to staging table
MERGE successful for rows 25495000–25695000

Uploaded chunk 25695000-25895000 to staging table
MERGE successful for rows 25695000–25895000

Uploaded chunk 25895000-26095000 to staging table
MERGE successful for rows 25895000–26095000

Uploaded chunk 26095000-26295000 to staging table
