In [None]:
import logging
from os import environ

import duckdb
import pandas as pd
import psycopg
from sqlalchemy import create_engine, make_url

In [None]:
# Root-logger setup for the whole notebook: INFO and above, one line per
# record, timestamps with second precision and a 'T' date/time separator.
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOG_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S'

logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=LOG_DATE_FORMAT)

# Load Data

In [None]:
# Aggregate inside DuckDB so only one row per (bike, calendar month) is pulled
# into pandas; `count` is the number of rides with that bike id in that month.
sql = '''
SELECT
    bikeid AS bike_id, 
    date_trunc('month', start_time) AS month, 
    COUNT(bikeid) AS count 
FROM rides 
GROUP BY 
    bikeid, date_trunc('month', start_time)
;
'''

# The database path can be overridden through the BIKES_DB environment variable.
db_file = environ.get('BIKES_DB', '../../data/bikes.ddb')
logging.info('loading rides from %s', db_file)
duck_engine = create_engine('duckdb:///' + db_file)
try:
    with duck_engine.connect() as conn:
        df = pd.read_sql(sql, conn)
finally:
    # Closing the connection only returns it to the engine's pool; dispose()
    # releases the pooled connection so the kernel stops holding the file.
    duck_engine.dispose()
# Each row is a per-bike monthly aggregate, so len(df) is not a ride count.
logging.info('loaded %d monthly ride counts', len(df))

# Cleanup


In [None]:
logging.info('cleaning')
# Keep only rows that carry a bike id, then store the id as a 64-bit integer.
# astype() returns a new frame, so the result is independent of the loaded data.
has_bike_id = df['bike_id'].notna()
df = df.loc[has_bike_id].astype({'bike_id': 'int64'})
logging.info('size after cleanup: %d', len(df))

# Upload

In [None]:
# Connection settings are read from the environment so the notebook can target
# another PostgreSQL server without being edited.
db_user = environ.get('DB_USER', 'postgres')
db_passwd = environ.get('DB_PASSWD', 's3cr3t')
db_host = environ.get('DB_HOST', 'localhost')
if 'DB_PASSWD' not in environ:
    # The built-in default is kept for backward compatibility, but make the
    # fallback visible instead of silently using a hardcoded credential.
    logging.warning('DB_PASSWD is not set; using the built-in default password')

logging.info('%s uploading to %s', db_user, db_host)
# Parse the host part, then set the credentials on the URL object. Unlike
# string interpolation, this cannot be corrupted by characters such as
# '@', ':' or '/' in the user name or password.
url = make_url(f'postgresql+psycopg://{db_host}').set(
    username=db_user,
    password=db_passwd,
)
pg_engine = create_engine(url)
try:
    # begin() commits when the block exits normally and rolls back on error.
    with pg_engine.begin() as conn:
        # NOTE: if_exists='append' adds the rows again on every run of this cell.
        df.to_sql('monthly_rides', conn, if_exists='append', index=False)
finally:
    # Release pooled connections rather than leaving them open in the kernel.
    pg_engine.dispose()
logging.info('upload done')