In [36]:
import os
import gdown
import duckdb as db
import pandas as pd 
from sqlalchemy import create_engine
from dotenv import load_dotenv

from duckdb import DuckDBPyRelation

# Source Google Drive folder to mirror, and the local directory its files land in
url_files = "https://drive.google.com/drive/u/0/folders/1sHWsb1_Dnu8xTzOznWOqFGBZCs5aLNA9"
local_dir = "./pasta_down"

# Copy the key/value pairs from the local .env file into os.environ
# (presumably DATABASE_URL and the pg* variables read by later cells).
load_dotenv()

In [5]:
#------------------------------
# GET FILES FROM GOOGLE DRIVE
#------------------------------
def download_files(url_files, local_dir, quiet=False):
    """Download every file of a shared Google Drive folder into ``local_dir``.

    Parameters
    ----------
    url_files : str
        URL of the Google Drive folder to mirror.
    local_dir : str
        Destination directory; created (with parents) if it does not exist.
    quiet : bool, default False
        Forwarded to ``gdown.download_folder`` to silence its progress output.

    Returns
    -------
    list
        The value returned by ``gdown.download_folder`` (the downloaded file
        paths in current gdown releases).

    Raises
    ------
    RuntimeError
        If ``gdown.download_folder`` reports failure by returning ``None``.
    """
    os.makedirs(local_dir, exist_ok=True)
    # use_cookies=False: do not reuse gdown's cached browser cookies.
    downloaded = gdown.download_folder(
        url_files, output=local_dir, quiet=quiet, use_cookies=False
    )
    if downloaded is None:
        # Per its docstring, gdown returns None on failure instead of raising.
        # Fail loudly so later cells don't work on missing or stale files.
        raise RuntimeError(f"Failed to download Google Drive folder: {url_files}")
    return downloaded

# Mirror the configured Drive folder into local_dir (see the progress log below).
download_files(url_files=url_files, local_dir=local_dir)

Retrieving folder contents


Processing file 1l_fT39kg5oKoA4SMdKwubUyGXDtIIATX orders_20240817.csv
Processing file 1Ojx4-T89VepW3FHBYNsX0Z1YjmjAyPHy orders_20240818.csv
Processing file 1QQ2yV3RGV89ERIadThUE6Fd1v4ug91iz orders_20240819.csv


Retrieving folder contents completed
Building directory structure
Building directory structure completed
Downloading...
From: https://drive.google.com/uc?id=1l_fT39kg5oKoA4SMdKwubUyGXDtIIATX
To: c:\Users\Henrique Hashimoto\Desktop\repos\duckdb-docker-etl\pasta_down\orders_20240817.csv
100%|██████████| 974/974 [00:00<?, ?B/s] 
Downloading...
From: https://drive.google.com/uc?id=1Ojx4-T89VepW3FHBYNsX0Z1YjmjAyPHy
To: c:\Users\Henrique Hashimoto\Desktop\repos\duckdb-docker-etl\pasta_down\orders_20240818.csv
100%|██████████| 983/983 [00:00<00:00, 984kB/s]
Downloading...
From: https://drive.google.com/uc?id=1QQ2yV3RGV89ERIadThUE6Fd1v4ug91iz
To: c:\Users\Henrique Hashimoto\Desktop\repos\duckdb-docker-etl\pasta_down\orders_20240819.csv
100%|██████████| 983/983 [00:00<00:00, 983kB/s]
Download completed


In [6]:
def list_files_csv(dir):
    """Return the paths of the CSV files directly inside ``dir``.

    Parameters
    ----------
    dir : str or os.PathLike
        Directory to scan (not recursive).

    Returns
    -------
    list[str]
        ``os.path.join(dir, name)`` for every regular file whose name ends in
        ``.csv`` (case-insensitive). The list is sorted by file name, so it does
        not depend on the arbitrary order of ``os.listdir``.
    """
    # NOTE: `dir` shadows the builtin, but renaming it would break keyword callers.
    return [
        os.path.join(dir, name)
        for name in sorted(os.listdir(dir))
        # Require the real ".csv" extension (a bare "csv" suffix also matched
        # names like "notes_csv") and skip sub-directories named *.csv.
        if name.lower().endswith(".csv") and os.path.isfile(os.path.join(dir, name))
    ]

# Inspect what is in the download folder. Any other .csv already present there
# is listed too (e.g. titanic_clean.csv in the output below).
list_files_csv(local_dir)

['./pasta_down\\orders_20240817.csv',
 './pasta_down\\orders_20240818.csv',
 './pasta_down\\orders_20240819.csv',
 './pasta_down\\titanic_clean.csv']

In [50]:
def read_csv(file_path):
    """Read a CSV file with DuckDB and return it as a pandas DataFrame.

    DuckDB's ``read_csv`` auto-detects the delimiter, header and column types.
    Converting with ``.df()`` hands the result to pandas-based cells.
    """
    relation = db.read_csv(file_path)
    return relation.df()

In [44]:
def save_on_psql(df_db, table):
    """Append ``df_db`` to the PostgreSQL table ``table``.

    The connection string comes from the ``DATABASE_URL`` environment variable
    (populated from ``.env`` by ``load_dotenv``). Rows are appended as-is,
    without the DataFrame index. To avoid duplicates, filter out already-stored
    keys first (e.g. with ``verify_data``).

    Parameters
    ----------
    df_db : pandas.DataFrame
        Rows to write.
    table : str
        Target table name. With ``if_exists="append"``, pandas creates it if it
        does not exist.

    Returns
    -------
    int or None
        Whatever ``DataFrame.to_sql`` reports (the number of rows written in
        recent pandas versions).

    Raises
    ------
    RuntimeError
        If ``DATABASE_URL`` is not set.
    """
    db_url = os.getenv("DATABASE_URL")
    if not db_url:
        raise RuntimeError("DATABASE_URL is not set; add it to the .env file")
    engine = create_engine(db_url)
    try:
        # Write exactly the frame the caller passed in. It must not be replaced
        # by a query over notebook globals, which may not exist on a fresh kernel.
        return df_db.to_sql(table, con=engine, if_exists="append", index=False)
    finally:
        # A new engine is built on every call; release its pooled connections.
        engine.dispose()

In [52]:
# Build the sample path from local_dir so it follows the configured download folder.
df_test = read_csv(os.path.join(local_dir, "orders_20240817.csv"))
df_test.head()

Unnamed: 0,order_id,sell_date,client_id,product_id,order_value
0,1,2024-01-01,1001,2001,29.99
1,2,2024-01-02,1002,2002,59.99
2,3,2024-01-03,1003,2003,39.99
3,4,2024-01-04,1004,2004,89.99
4,5,2024-01-05,1005,2005,49.99


In [103]:
import psycopg2

def verify_data(query, df_new, key, dbname='dbduck'):
    """Return the rows of ``df_new`` whose ``key`` is not yet stored in PostgreSQL.

    Parameters
    ----------
    query : str
        SQL run against PostgreSQL to fetch the keys already stored. Its result
        must contain a column named ``key``
        (e.g. ``select distinct order_id from orders``).
    df_new : pandas.DataFrame
        Candidate rows to load.
    key : str
        Column name used to match ``df_new`` against the query result.
    dbname : str, default 'dbduck'
        Database to connect to. User, password, host and port come from the
        ``pguser``/``pgpass``/``pghost``/``pgport`` environment variables.

    Returns
    -------
    pandas.DataFrame
        The rows of ``df_new`` with no matching ``key`` in the query result
        (an anti-join), as produced by DuckDB.
    """
    conn = psycopg2.connect(
        dbname=dbname,
        user=os.getenv("pguser"),
        password=os.getenv("pgpass"),
        host=os.getenv("pghost"),
        port=os.getenv("pgport")
    )
    try:
        # pandas only officially supports SQLAlchemy connectables here and warns
        # about a raw psycopg2 connection (see the cell output); the query still runs.
        df_original = pd.read_sql_query(query, conn)
    finally:
        # Close the connection even when the query fails.
        conn.close()

    if df_original.empty:
        # Nothing is stored yet, so every row is new. An empty result may also be
        # typed differently from df_new[key], which DuckDB would refuse to compare.
        return db.sql("select * from df_new").df()

    # `key` is spliced into the SQL as a quoted identifier; double any embedded quotes.
    quoted_key = '"' + key.replace('"', '""') + '"'

    # DuckDB resolves df_new / df_original to the local DataFrames of the same name.
    return db.sql(f"""
        select dn.*
        from df_new as dn
        left join df_original as dfo on dn.{quoted_key} = dfo.{quoted_key}
        where dfo.{quoted_key} is null
    """).df()

In [104]:
# Compare against *all* stored order_ids. Checking only an arbitrary
# `limit 10` sample of keys lets already-loaded orders through as "new".
test_verify = verify_data('select distinct order_id from orders', df_test, 'order_id')
test_verify.head()

  df_original = pd.DataFrame(pd.read_sql_query(query, conn))


Unnamed: 0,order_id,sell_date,client_id,product_id,order_value
0,11,2024-01-11,1011,2011,39.99
1,12,2024-01-12,1012,2012,29.99
2,13,2024-01-13,1013,2013,49.99
3,14,2024-01-14,1014,2014,69.99
4,15,2024-01-15,1015,2015,79.99


In [116]:
def transform(df):
    """Aggregate order rows into one total per calendar month.

    Parameters
    ----------
    df : pandas.DataFrame
        Must have a ``sell_date`` column (DuckDB's ``year()``/``month()`` need a
        date/timestamp) and an ``order_value`` column. DuckDB resolves the table
        name ``df`` in the SQL below to this argument.

    Returns
    -------
    pandas.DataFrame
        Columns ``sell_month`` (text ``'<year>-<month>'`` with an unpadded
        month, e.g. ``'2024-1'``) and ``total_value`` (sum of ``order_value``).
    """
    # NOTE(review): months are not zero-padded, so '2024-10' sorts before
    # '2024-2'. Changing the format would presumably stop matching the
    # sell_month keys already stored in orders_calc, so it is left as-is.
    monthly_sql = """
                    select
                        concat(year(sell_date), '-', month(sell_date)) as sell_month,
                        sum(order_value) as total_value
                    from 
                        df
                    group by 
                        concat(year(sell_date), '-', month(sell_date))
                    """
    monthly = db.sql(monthly_sql)
    return monthly.df()

# `df_verified` only exists inside verify_data(); the filtered frame produced
# by the verification cell is `test_verify`.
df_t = transform(test_verify)
df_t.head()

Unnamed: 0,sell_month,total_value
0,2024-1,1229.8


In [118]:
# verify_data is already defined in the earlier cell. A second copy here would
# silently shadow that definition and let the two drift apart.
df_transform_v = verify_data("select distinct sell_month from orders_calc", df_t, "sell_month")
df_transform_v.head()

  df_original = pd.DataFrame(pd.read_sql_query(query, conn))


Unnamed: 0,sell_month,total_value
0,2024-1,1229.8


In [114]:
# transform is already defined in the earlier cell with identical SQL. Keep a
# single definition so the two copies cannot drift apart.
transform(test_verify)

Unnamed: 0,sell_month,total_value
0,2024-1,1229.8
