In [1]:
import pandas as pd
# connection to Postgres
from sqlalchemy import create_engine

## taxi_zone_lookup

In [3]:
# taxi_zone_lookup.csv data table

engine = create_engine('postgresql://postgres:postgres@localhost:5432/ny_taxi')
engine.connect()

<sqlalchemy.engine.base.Connection at 0x1f701634880>

In [5]:
# !wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

In [6]:
df_zones = pd.read_csv("taxi_zone_lookup.csv")

In [8]:
schema = pd.io.sql.get_schema(df_zones,name='zones',con=engine)
print(schema)


CREATE TABLE zones (
	"LocationID" BIGINT, 
	"Borough" TEXT, 
	"Zone" TEXT, 
	service_zone TEXT
)




In [9]:
df_zones.to_sql(name='zones', con=engine, if_exists='replace')

265

## yellow_taxi_data

In [2]:
df = pd.read_csv('yellow_tripdata_2021-01.csv', nrows = 100)

In [3]:
# convert text to TIMESTAMP
# 错误处理：无法转换的值变 NaT
# SQL不识别NaT格式 ，需转换为None（SQL NULL）或者NaN
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime,errors='coerce')
df['tpep_pickup_datetime'] = df['tpep_pickup_datetime'].where(df['tpep_pickup_datetime'].notna(), None)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime,errors='coerce')
df['tpep_dropoff_datetime'] = df['tpep_dropoff_datetime'].where(df['tpep_dropoff_datetime'].notna(), None)

In [6]:
# engine
engine = create_engine('postgresql://postgres:postgres@localhost:5432/ny_taxi')

In [7]:
# check the engine
engine.connect()

<sqlalchemy.engine.base.Connection at 0x1eca53438e0>

In [5]:
# pip install sqlalchemy

In [8]:
# Get the SQL schema of the DataFrame --> generate SQL statement
# ddl --> data definition language
# 这个语句只是用来查看 DataFrame 的结构在 SQL 中会被如何映射，不会实际创建表。
# schema = pd.io.sql.get_schema(df,name='yellow_taxi_data')
# 确保 get_schema() 生成符合 PostgreSQL 语法的 SQL
schema = pd.io.sql.get_schema(df,name='yellow_taxi_data',con=engine)
print(schema)


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




<span style="color:darkblue">As we can see, the "tpep_pickup_datetime" TEXT, "tpep_dropoff_datetime" TEXT,is text format, but we need Timestamp format</span>


## Load the data into PostgreSQL with Pandas

In [9]:
# read the data into chunk
df_iter = pd.read_csv('yellow_tripdata_2021-01.csv',iterator=True,chunksize=100000)
df = next(df_iter)

In [10]:
len(df)

100000

In [11]:
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime,errors='coerce')# 注意处理error情况！！！
df['tpep_pickup_datetime'] = df['tpep_pickup_datetime'].where(df['tpep_pickup_datetime'].notna(), None)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime,errors='coerce')
df['tpep_dropoff_datetime'] = df['tpep_dropoff_datetime'].where(df['tpep_dropoff_datetime'].notna(), None)

In [12]:
df.head(n=0)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge


In [13]:
# 在数据库中创建一个空表，该表结构完全基于 DataFrame 的列名和类型
df.head(n=0).to_sql(name='yellow_taxi_data',con=engine,if_exists='replace')

0

### Insert the data in chunk

In [14]:
%time df.to_sql(name='yellow_taxi_data',con=engine,if_exists='append')

CPU times: total: 3.23 s
Wall time: 9.05 s


1000

In [17]:
from time import time

In [18]:
i = 0
while i <= 5 :
    t_start = time()
    df = next(df_iter)
    
    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime,errors='coerce')
    df['tpep_pickup_datetime'] = df['tpep_pickup_datetime'].where(df['tpep_pickup_datetime'].notna(), None)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime,errors='coerce')
    df['tpep_dropoff_datetime'] = df['tpep_dropoff_datetime'].where(df['tpep_dropoff_datetime'].notna(), None)
    
    df.to_sql(name='yellow_taxi_data',con=engine,if_exists='append')
    i += 1
    
    t_end = time()
    print('inserted another chunk...%.3f second' %(t_end - t_start))

inserted another chunk...8.458 second
inserted another chunk...8.248 second
inserted another chunk...8.303 second
inserted another chunk...8.537 second
inserted another chunk...8.139 second
inserted another chunk...8.184 second
