To build this pipeline we need to:
1. Connect to the databases: the source (ClickHouse) and the destination (PostgreSQL).
2. Extract the data from the source.
3. Load it into the staging schema.
4. Transform the staged data (with stored procedures).

In [1]:
## import libraries
!pip install clickhouse_connect



In [2]:
## source connection (ClickHouse)
import clickhouse_connect
import os
from dotenv import load_dotenv

# override=True: values from .env take precedence over variables already set in the process environment.
load_dotenv(override=True)

# Fail fast with a readable message rather than letting the driver fall back to defaults
# or fail later with an opaque connection error.
_ch_missing = [name for name in ('ch_host', 'ch_user', 'ch_password') if os.getenv(name) is None]
if _ch_missing:
    raise RuntimeError(f"Missing ClickHouse settings in environment/.env: {', '.join(_ch_missing)}")

# ch_-prefixed names so these credentials are not overwritten when later cells
# reuse generic names such as `host` / `password` / `port` for the destination database.
ch_host = os.getenv('ch_host')
ch_user = os.getenv('ch_user')
ch_password = os.getenv('ch_password')
# Environment values are strings; cast to int when set. Unset -> None, as before (driver chooses the port).
_ch_port_raw = os.getenv('ch_port')
ch_port = int(_ch_port_raw) if _ch_port_raw else None

client = clickhouse_connect.get_client(
    host=ch_host,
    port=ch_port,
    username=ch_user,
    password=ch_password,
    secure=True,  # TLS connection to the source
)
  

In [3]:
## extract: pull rows from the source `tripdata` table into a DataFrame
import pandas as pd

# Row cap for the extract query; raise it (or drop the LIMIT) to pull more data.
EXTRACT_LIMIT = 10

try:
    result = client.query(f'select * from tripdata limit {int(EXTRACT_LIMIT)}')
finally:
    # Close even when the query raises, so the connection is not leaked.
    # NOTE: once closed, re-running this cell requires re-running the connection cell first.
    client.close()

rows = result.result_rows
cols = result.column_names

df = pd.DataFrame(rows, columns=cols)
df.head()

Unnamed: 0,pickup_date,id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code_id,...,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,pickup_location_id,dropoff_location_id,junk1,junk2
0,2009-01-01,0,CMT,2009-01-01 00:00:00,2009-01-01 00:05:03,2,0.9,-73.997482,40.725956,,...,0,0.0,0.0,0.0,0.0,5.4,0,0,,
1,2009-01-01,0,CMT,2009-01-01 00:00:00,2009-01-01 00:04:12,1,1.3,-73.965912,40.77124,,...,0,0.0,0.0,0.0,0.0,5.8,0,0,,
2,2009-01-01,0,CMT,2009-01-01 00:00:02,2009-01-01 00:05:40,1,1.0,-73.964798,40.767399,,...,0,0.0,0.0,0.0,0.0,5.8,0,0,,
3,2009-01-01,0,CMT,2009-01-01 00:00:04,2009-01-01 00:03:08,1,0.8,-74.011604,40.708832,,...,0,0.0,0.0,0.0,0.0,4.6,0,0,,
4,2009-01-01,0,CMT,2009-01-01 00:00:07,2009-01-01 00:19:01,1,5.5,-74.000648,40.718575,,...,0,0.0,0.0,0.0,0.0,27.799999,0,0,,


In [4]:
## Loading to database: SQLAlchemy engine for the PostgreSQL destination
import os
from sqlalchemy import create_engine
from sqlalchemy.engine import URL

# pg_-prefixed names so these do not overwrite the source (ClickHouse) credential variables.
pg_user = os.getenv('pg_user')
pg_password = os.getenv('pg_password')
pg_host = os.getenv('pg_host')
pg_port = os.getenv('pg_port')
pg_dbname = os.getenv('pg_dbname')

# Previously an unset variable became the literal text "None" inside the URL; fail fast instead.
_pg_missing = [
    name for name, value in (
        ('pg_user', pg_user),
        ('pg_password', pg_password),
        ('pg_host', pg_host),
        ('pg_dbname', pg_dbname),
    )
    if value is None
]
if _pg_missing:
    raise RuntimeError(f"Missing Postgres settings in environment/.env: {', '.join(_pg_missing)}")

# URL.create escapes special characters (@, :, /, %) in the credentials. A hand-built
# f-string URL breaks or mis-parses when the password contains any of them.
url = URL.create(
    drivername='postgresql+psycopg2',
    username=pg_user,
    password=pg_password,
    host=pg_host,
    port=int(pg_port) if pg_port else None,  # URL.create expects an int; None -> driver default
    database=pg_dbname,
)
engine = create_engine(url)
                           



In [5]:
## load: write the extracted rows into the staging table src_tripdata.
# if_exists='replace' drops and recreates the table each run, so re-running does not append duplicates.
# NOTE(review): SQLAlchemy quotes names containing upper-case letters, so this presumably targets a
# schema literally named "STG" (case-sensitive in Postgres); confirm it exists.
# The cell's displayed value is the row count returned by to_sql.
df.to_sql(
    name='src_tripdata',
    con=engine,
    schema='STG',
    if_exists='replace',
    index=False,
)

10