In [43]:
import time
from datetime import datetime
from time import sleep

import numpy as np
import pandas as pd
import psycopg2
import pytz
from sqlalchemy import create_engine

# Batch data

In [37]:
# batch

def convert_date_to_recent_data(df:pd.DataFrame, date_column:str='lpep_pickup_datetime') -> pd.Series:
    """Shift a date column forward so that its most recent day becomes today.

    This simulates a real-world scenario where the data is always recent:
    every value is truncated to midnight (day precision) and the whole column
    is moved by the same offset, so the gaps between days are preserved.

    Args:
        df: Frame holding the column to shift.
        date_column: Name of the column; datetime values or strings that
            ``pd.to_datetime`` can parse.

    Returns:
        A Series of timezone-naive, day-precision timestamps aligned with
        ``df``'s index, whose maximum equals today's (local) date.
    """
    dates = pd.to_datetime(df[date_column])
    if dates.dt.tz is not None:
        # Keep the wall-clock date but drop the zone, so the subtraction from
        # the naive "today" below is valid.
        dates = dates.dt.tz_localize(None)

    # Vectorised truncation to midnight (replaces a slow strftime/parse round-trip).
    pickup_days = dates.dt.normalize()
    today = pd.Timestamp.now().normalize()

    # The smallest distance to today belongs to the latest day in the data;
    # adding it to every row lands that latest day exactly on today.
    smallest_gap = (today - pickup_days).min()
    return pickup_days + smallest_gap

def recreate_empty_table(conn, table_name:str='trips') -> None:
    """Drop ``table_name`` if it exists and create it again, empty.

    The new table has three columns: ``time`` (TIMESTAMPTZ),
    ``trip_distance`` (FLOAT) and ``payment_type`` (VARCHAR(10)).

    Args:
        conn: An open DB-API connection (psycopg2 in this notebook). It is
            committed on success and rolled back on failure; it is NOT closed.
        table_name: Name of the table to recreate. It is interpolated into the
            SQL text as-is, so it must come from trusted code, never user input.

    Raises:
        Whatever the driver raises when the statement fails (re-raised after
        the rollback).
    """
    # Create a table with the desired columns
    create_table_query = f"""

        DROP TABLE IF EXISTS {table_name};

        CREATE TABLE {table_name} (
            time TIMESTAMPTZ,
            trip_distance FLOAT,
            payment_type VARCHAR(10)
        )
        """
    cursor = conn.cursor()
    try:
        cursor.execute(create_table_query)
        conn.commit()
    except Exception:
        # Leave the connection usable instead of stuck in an aborted transaction.
        conn.rollback()
        raise
    finally:
        # Always release the cursor, whether or not the statement succeeded.
        cursor.close()

def insert_sample_data(df:pd.DataFrame, engine, table_name:str='trips') -> None:
    """Append every row of ``df`` to ``table_name`` and print ``FINISHED``.

    Args:
        df: Rows to insert; column names must match the target table.
        engine: SQLAlchemy engine/connection handed to ``DataFrame.to_sql``.
            Passing ``None`` falls back to the local demo database.
        table_name: Target table; rows are appended, the table is not replaced.
    """
    if engine is None:
        # Backward-compatible fallback: the local demo database this function
        # used unconditionally before it honoured the ``engine`` argument.
        engine = create_engine('postgresql://postgres:example@localhost:5432/postgres')

    # Insert data from the DataFrame into the table
    df.to_sql(table_name, engine, if_exists='append', index=False)
    print('FINISHED')


####################################
####################################
####################################

# Load one month of NYC green-taxi trips (downloaded from the network on every run)
df = pd.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2024-01.parquet")

# Shift the pickup dates so the newest trip lands on today. The result is
# deliberately NOT named `time`: that would rebind the `time` module imported
# at the top of the notebook and break any later `time.sleep(...)` call.
recent_pickup_dates = convert_date_to_recent_data(df)
cols = ['trip_distance', 'payment_type']
df2 = df[cols].copy() # do a smaller copy of the dataframe
df2['time'] = recent_pickup_dates

# Raw psycopg2 connection, needed only for the DDL that resets the table
conn = psycopg2.connect(
    host="localhost",
    database="postgres",
    user="postgres",
    password="example"
)
try:
    recreate_empty_table(conn)
finally:
    # Release the connection even if the table reset fails
    conn.close()

# SQLAlchemy engine that pandas uses for the bulk insert
engine = create_engine('postgresql://postgres:example@localhost:5432/postgres')
insert_sample_data(df2, engine)


FINISHED


# Simulate realtime data

In [45]:
df = pd.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2024-01.parquet")

# Only these columns are streamed. The `time` column is stamped per batch inside
# the loop, so no per-row timestamps are computed up front.
cols = ['trip_distance', 'payment_type']
trips = df[cols].copy() # do a smaller copy of the dataframe

# Raw psycopg2 connection, needed only for the DDL that resets the table
conn = psycopg2.connect(
    host="localhost",
    database="postgres",
    user="postgres",
    password="example"
)
try:
    recreate_empty_table(conn)
finally:
    conn.close()

# Build the engine in this cell instead of relying on one left over from an
# earlier cell, so the cell also works on a fresh kernel.
engine = create_engine('postgresql://postgres:example@localhost:5432/postgres')

tz = pytz.timezone('Japan')                              # loop-invariant, looked up once
next_row = 0
while next_row < trips.shape[0]:                         # until every row has been sent
    # Draw a random batch size. normal(100, 200) is frequently negative, which
    # would move the index backwards and produce negative slice stops, so the
    # draw is clamped at 0 (an empty batch simulates a quiet interval).
    batch_size = max(0, int(np.random.normal(100, 200)))
    batch = trips.iloc[next_row:next_row + batch_size].copy()
    # Stamp the whole batch with the current time.
    # NOTE(review): this labels the machine's local wall-clock time as Japan
    # time; if the true current instant is wanted, use datetime.now(tz) - confirm.
    batch['time'] = tz.localize(datetime.now())
    next_row += batch_size
    batch.to_sql('trips', engine, if_exists='append', index=False)
    print(f'{batch.shape[0]} inserted into the database')
    # Pause 1-4 seconds, drawn uniformly (randint's upper bound is exclusive)
    wait_time = np.random.randint(1, 5)
    sleep(wait_time)



115 inserted into the database
119 inserted into the database
0 inserted into the database
272 inserted into the database
35 inserted into the database
85 inserted into the database
25 inserted into the database
349 inserted into the database
48 inserted into the database
172 inserted into the database
336 inserted into the database
80 inserted into the database
188 inserted into the database
18 inserted into the database
106 inserted into the database
5 inserted into the database
0 inserted into the database
126 inserted into the database
319 inserted into the database
175 inserted into the database
0 inserted into the database
0 inserted into the database
142 inserted into the database
118 inserted into the database
0 inserted into the database
251 inserted into the database
88 inserted into the database
18 inserted into the database
0 inserted into the database
0 inserted into the database
0 inserted into the database
251 inserted into the database
144 inserted into the database
221

KeyboardInterrupt: 

In [5]:
# Preview the first rows of `df` to inspect the available columns
df.head()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2,2024-02-01 00:15:53,2024-02-01 00:24:20,N,1.0,75,161,1.0,2.77,13.5,1.0,0.5,3.75,0.0,,1.0,22.5,1.0,1.0,2.75
1,2,2024-01-31 22:59:22,2024-01-31 23:27:14,N,1.0,43,242,1.0,7.76,36.6,1.0,0.5,0.0,0.0,,1.0,39.1,2.0,1.0,0.0
2,2,2024-02-01 00:30:29,2024-02-01 00:35:32,N,1.0,75,238,1.0,1.03,7.9,1.0,0.5,2.6,0.0,,1.0,13.0,1.0,1.0,0.0
3,2,2024-01-31 23:56:42,2024-02-01 00:06:53,N,1.0,80,17,2.0,1.36,11.4,1.0,0.5,4.17,0.0,,1.0,18.07,1.0,1.0,0.0
4,2,2024-02-01 00:31:14,2024-02-01 00:31:16,N,5.0,95,264,1.0,0.0,50.0,0.0,0.0,1.0,0.0,,1.0,52.0,1.0,2.0,0.0
