In [None]:
import dask.dataframe as dd
import pandas as pd
import numpy as np
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
import os
import re

# Load variables from a local .env file (if present) into the environment,
# then read the database connection URL from it.
load_dotenv()
database_url = os.getenv("DATABASE_URL")
if not database_url:
    # Fail before any CSV is parsed instead of at upload time with an
    # opaque driver/pandas error.
    raise RuntimeError(
        "DATABASE_URL is not set; define it in the environment or in a .env file"
    )

# Destination SQL table and the category folder; together they select the
# input directory below.
table_name = 'landing_pages'
category = 'behavior'

# Columns every CSV export is expected to contain, mapped to the dtype
# pandas should parse them as. 'ga:date' is read as a string because it is
# converted to a datetime explicitly after loading.
data_types = {
    'ga:pagePath': 'object',
    'ga:date': 'object',
    'ga:sessions': 'int64',
    'ga:newUsers': 'int64',
    'ga:bounces': 'int64',
    'ga:transactions': 'int64',
    'ga:transactionRevenue': 'float64',
    'ga:pageviews': 'int64',
    'ga:uniquePageviews': 'int64',
    'ga:entrances': 'int64',
}

# Directory scanned for *.csv files.
directory = f'data/{category}/{table_name}'
# One DataFrame per loaded CSV file; concatenated before the upload.
tables = []



# Compiled once: matches each uppercase letter so camelCase names can be
# rewritten as snake_case.
_upper_case = re.compile(r'([A-Z])')

# os.listdir() returns entries in arbitrary order; sorting makes the load
# order (and therefore the uploaded row order) reproducible.
for filename in sorted(os.listdir(directory)):
    csv_path = os.path.join(directory, filename)
    if not (os.path.isfile(csv_path) and filename.endswith('.csv')):
        continue

    # The part of the file name before the first underscore labels the rows.
    view_name = filename.split('_')[0]

    df = pd.read_csv(csv_path, dtype=data_types)

    # Report every missing column, then stop. A real exception is raised
    # instead of `assert`, which is skipped when Python runs with -O.
    missing = [col for col in data_types if col not in df.columns]
    if missing:
        for col in missing:
            print(f'Column {col} not found in the dataset')
        raise ValueError(f'{csv_path} is missing expected columns: {missing}')

    # e.g. 'ga:pagePath' -> 'pagePath' -> 'page_path'
    # regex=False: 'ga:' is a literal prefix, not a pattern.
    df.columns = df.columns.str.replace('ga:', '', regex=False)
    df.columns = df.columns.map(lambda name: _upper_case.sub(r'_\1', name).lower())

    # Dates arrive as compact YYYYMMDD strings.
    df['date'] = pd.to_datetime(df['date'], format='%Y%m%d')

    df['view_name'] = view_name

    tables.append(df)


# pd.concat() raises a generic "No objects to concatenate" on an empty list;
# raise the same exception type with a message that names the actual problem.
if not tables:
    raise ValueError(f'No CSV files found in {directory}')

# Stack the per-file frames into a single frame for upload.
df = pd.concat(tables)
print(f'{len(df)} records to upload...')

# if_exists='replace' drops and recreates the table on every run; rows are
# written in batches of 50000 and the DataFrame index is not stored.
df.to_sql(table_name, database_url, if_exists='replace', chunksize=50000, index=False)

print('Done!')