In [1]:
import duckdb
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import os
import urllib.request
import zipfile
import io

# ETL

In [353]:
class ETLPipeline:
    """Class untuk mengotomatisasi proses ETL"""
    
    def __init__(self, db_path='brazil_stock_market.db'):
        """Inisialisasi pipeline"""
        self.conn = duckdb.connect(db_path)
        self.initialized = False
        self.log_table_setup()
    
    def log_table_setup(self):
        """Membuat tabel log untuk melacak proses ETL"""
        self.conn.execute("""
        CREATE TABLE IF NOT EXISTS "etlLog" (
            "logId" INTEGER PRIMARY KEY,
            "processName" VARCHAR,
            "startTime" TIMESTAMP,
            "endTime" TIMESTAMP,
            "recordsProcessed" INTEGER,
            "status" VARCHAR,
            "message" VARCHAR
        )
        """)
    
    def log_process_start(self, process_name):
        """Mencatat mulainya proses ETL"""
        log_id = self.conn.execute('SELECT COALESCE(MAX(logId), 0) + 1 FROM "etlLog"').fetchone()[0]
        self.conn.execute("""
        INSERT INTO "etlLog" (logId, processName, startTime, status)
        VALUES (?, ?, CURRENT_TIMESTAMP, 'RUNNING')
        """, [log_id, process_name])
        return log_id
    
    def log_process_end(self, log_id, records=0, status='SUCCESS', message=None):
        """Mencatat selesainya proses ETL"""
        self.conn.execute("""
        UPDATE "etlLog"
        SET endTime = CURRENT_TIMESTAMP,
            recordsProcessed = ?,
            status = ?,
            message = ?
        WHERE logId = ?
        """, [records, status, message, log_id])

    def ensure_common_columns(self):
        """
        Ensure all target tables have 'effectiveDate', 'expirationDate', and 'currentFlag' columns.
        """
        columns_to_add = [
            ("effectiveDate", "DATE"),
            ("expirationDate", "DATE"),
            ("currentFlag", "BOOLEAN")
        ]

        target_tables = ["dimCoin", "dimCompany"]

        for table in target_tables:
            for column_name, column_type in columns_to_add:
                alter_sql = f"""
                ALTER TABLE {table}
                ADD COLUMN IF NOT EXISTS {column_name} {column_type}
                """
                self.conn.execute(alter_sql)
                
    def initialize_warehouse(self):
        """Inisialisasi skema data warehouse jika belum ada"""
        if self.initialized:
            return
        
        log_id = self.log_process_start("initialize_warehouse")
        
        try:
            # Dimensi Company
            self.conn.execute("""
            CREATE TABLE IF NOT EXISTS "dimCompany" (
                "keyCompany" INT NOT NULL,
                "stockCodeCompany" VARCHAR(32) NOT NULL,
                "nameCompany" VARCHAR(64) NOT NULL,
                "sectorCodeCompany" VARCHAR(32) NOT NULL,
                "sectorCompany" VARCHAR(256) NOT NULL,
                "segmentCompany" VARCHAR(256) NOT NULL,
                "startedAt" TIMESTAMP NOT NULL DEFAULT NOW(),
                "endedAt" TIMESTAMP NULL,
                "isActive" BOOLEAN NOT NULL DEFAULT TRUE,
                
                CONSTRAINT companyPK PRIMARY KEY ("keyCompany")
            );
            """)
            
            # Dimensi Coin
            self.conn.execute("""
            CREATE TABLE IF NOT EXISTS "dimCoin" (
                "keyCoin" INT NOT NULL,
                "abbrevCoin" VARCHAR(32) NOT NULL,
                "nameCoin" VARCHAR(32) NOT NULL,
                "symbolCoin" VARCHAR(8) NOT NULL,
                
                CONSTRAINT coinPK PRIMARY KEY (keyCoin)
            );
            """)
            
            # Dimensi Time
            self.conn.execute("""
            CREATE TABLE IF NOT EXISTS "dimTime" (
                "keyTime" INT NOT NULL,
                "datetime" VARCHAR(32) NOT NULL,
                "dayTime" SMALLINT NOT NULL,
                "dayWeekTime" SMALLINT NOT NULL,
                "dayWeekAbbrevTime" VARCHAR(32) NOT NULL,
                "dayWeekCompleteTime" VARCHAR(32) NOT NULL,
                "monthTime" SMALLINT NOT NULL,
                "monthAbbrevTime" VARCHAR(32) NOT NULL,
                "monthCompleteTime" VARCHAR(32) NOT NULL,
                "bimonthTime" SMALLINT NOT NULL,
                "quarterTime" SMALLINT NOT NULL,
                "semesterTime" SMALLINT NOT NULL,
                "yearTime" INT NOT NULL,
                
                CONSTRAINT timePK PRIMARY KEY ("keyTime")
            );
            """)
            
            # Tabel fakta Coins
            self.conn.execute("""
            CREATE TABLE IF NOT EXISTS "factCoins" (
                "keyTime" INT NOT NULL,
                "keyCoin" INT NOT NULL,
                "valueCoin" FLOAT NOT NULL,
                
                FOREIGN KEY (keyTime) REFERENCES dimTime(keyTime),
                FOREIGN KEY (keyCoin) REFERENCES dimCoin(keyCoin),
                CONSTRAINT coinsPK PRIMARY KEY(keyTime, keyCoin)
            );
            """)
            
            # Tabel fakta Stocks
            self.conn.execute("""
            CREATE TABLE IF NOT EXISTS "factStocks" (
                "keyTime" INT NOT NULL,
                "keyCompany" INT NOT NULL,
                "openValueStock" FLOAT NOT NULL,
                "closeValueStock" FLOAT NOT NULL,
                "highValueStock" FLOAT NOT NULL,
                "lowValueStock" FLOAT NOT NULL,
                "quantityStock" FLOAT NOT NULL,
                
                FOREIGN KEY (keyTime) REFERENCES dimTime(keyTime),
                FOREIGN KEY (keyCompany) REFERENCES dimCompany(keyCompany),
                CONSTRAINT stocksPK PRIMARY KEY(keyTime, keyCompany)
            );
            """)
            
            # Menambahkan column SCD
            self.ensure_common_columns()
            
            # Berhasil inisialisasi
            self.initialized = True
            self.log_process_end(log_id, status='SUCCESS', message='Data warehouse schema initialized')
            
        except Exception as e:
            self.log_process_end(log_id, status='ERROR', message=str(e))
            raise
    
    def extract_all_sources(self):
        """Ekstrak data dari semua sumber"""
        log_id = self.log_process_start("extract_all_sources")
        
        try:
            # Bersihkan tabel staging
            self.conn.execute("DROP TABLE IF EXISTS stagingCoin")
            self.conn.execute("DROP TABLE IF EXISTS stagingCompany")
            self.conn.execute("DROP TABLE IF EXISTS stagingTime")
            self.conn.execute("DROP TABLE IF EXISTS stagingCoinValue")
            self.conn.execute("DROP TABLE IF EXISTS stagingStockValue")
            
            # Ekstrak data dari file CSV
            # Create staging tables from CSVs
            self.conn.execute("CREATE TABLE stagingCoin AS SELECT * FROM read_csv_auto('data/dimCoin.csv')")
            self.conn.execute("CREATE TABLE stagingCompany AS SELECT * FROM read_csv_auto('data/dimCompany.csv')")
            self.conn.execute("CREATE TABLE stagingTime AS SELECT * FROM read_csv_auto('data/dimTime.csv')")
            self.conn.execute("CREATE TABLE stagingCoinValue AS SELECT * FROM read_csv_auto('data/factCoins.csv')")
            self.conn.execute("CREATE TABLE stagingStockValue AS SELECT * FROM read_csv_auto('data/factStocks.csv')")
            
            # Hitung total jumlah catatan
            total_records = 0
            total_records += self.conn.execute("SELECT COUNT(*) FROM stagingCoin").fetchone()[0]
            total_records += self.conn.execute("SELECT COUNT(*) FROM stagingCompany").fetchone()[0]
            total_records += self.conn.execute("SELECT COUNT(*) FROM stagingTime").fetchone()[0]
            total_records += self.conn.execute("SELECT COUNT(*) FROM stagingCoinValue").fetchone()[0]
            total_records += self.conn.execute("SELECT COUNT(*) FROM stagingStockValue").fetchone()[0]
            
            self.log_process_end(log_id, records=total_records, status='SUCCESS')
            return True
            
        except Exception as e:
            self.log_process_end(log_id, status='ERROR', message=str(e))
            raise
    
    def transform_data(self):
        """Transformasi data dari tabel staging"""
        log_id = self.log_process_start("transform_data")
        
        try:
            # 1. Transformasi dimensi Coin
            self.conn.execute("""
            CREATE OR REPLACE TABLE stagingDimCoin AS
            SELECT
                keyCoin,
                abbrevCoin,
                nameCoin,
                symbolCoin,
                '2022-01-01'::DATE AS effectiveDate,
                NULL::DATE AS expirationDate,
                TRUE AS currentFlag
            FROM stagingCoin
            """)

            # 2. Transformasi dimensi Company
            self.conn.execute("""
            CREATE OR REPLACE TABLE stagingDimCompany AS
            SELECT
                keyCompany,
                stockCodeCompany,
                nameCompany,
                sectorCodeCompany,
                sectorCompany,
                segmentCompany,
                NOW() startedAt,
                NULL endedAt,
                TRUE isActive,
                '2022-01-01'::DATE effectiveDate,
                NULL::DATE expirationDate,
                TRUE currentFlag
            FROM stagingCompany
            """)

            # 3. Transformasi dimensi Time dengan translasi nama hari & bulan ke Bahasa Inggris
            self.conn.execute("""
            CREATE OR REPLACE TABLE stagingDimTime AS
            SELECT
                keyTime,
                datetime,
                dayTime,
                dayWeekTime,

                CASE dayWeekAbbrevTime
                    WHEN 'SEG' THEN 'MON'
                    WHEN 'TER' THEN 'TUE'
                    WHEN 'QUA' THEN 'WED'
                    WHEN 'QUI' THEN 'THU'
                    WHEN 'SEX' THEN 'FRI'
                    WHEN 'SAB' THEN 'SAT'
                    WHEN 'DOM' THEN 'SUN'
                    ELSE dayWeekAbbrevTime
                END dayWeekAbbrevTime,

                CASE dayWeekCompleteTime
                    WHEN 'SEGUNDA' THEN 'MONDAY'
                    WHEN 'TERCA' THEN 'TUESDAY'
                    WHEN 'QUARTA' THEN 'WEDNESDAY'
                    WHEN 'QUINTA' THEN 'THURSDAY'
                    WHEN 'SEXTA' THEN 'FRIDAY'
                    WHEN 'SABADO' THEN 'SATURDAY'
                    WHEN 'DOMINGO' THEN 'SUNDAY'
                    ELSE dayWeekCompleteTime
                END dayWeekCompleteTime,
                
                monthTime,

                CASE monthAbbrevTime
                    WHEN 'JAN' THEN 'JAN'
                    WHEN 'FEV' THEN 'FEB'
                    WHEN 'MAR' THEN 'MAR'
                    WHEN 'ABR' THEN 'APR'
                    WHEN 'MAI' THEN 'MAY'
                    WHEN 'JUN' THEN 'JUN'
                    WHEN 'JUL' THEN 'JUL'
                    WHEN 'AGO' THEN 'AUG'
                    WHEN 'SET' THEN 'SEP'
                    WHEN 'OUT' THEN 'OCT'
                    WHEN 'NOV' THEN 'NOV'
                    WHEN 'DEZ' THEN 'DEC'
                    ELSE monthAbbrevTime
                END monthAbbrevTime,
                
                CASE monthCompleteTime
                    WHEN 'JANEIRO' THEN 'JANUARY'
                    WHEN 'FEVEREIRO' THEN 'FEBRUARY'
                    WHEN 'MARCO' THEN 'MARCH'
                    WHEN 'ABRIL' THEN 'APRIL'
                    WHEN 'MAIO' THEN 'MAY'
                    WHEN 'JUNHO' THEN 'JUNE'
                    WHEN 'JULHO' THEN 'JULY'
                    WHEN 'AGOSTO' THEN 'AUGUST'
                    WHEN 'SETEMBRO' THEN 'SEPTEMBER'
                    WHEN 'OUTUBRO' THEN 'OCTOBER'
                    WHEN 'NOVEMBRO' THEN 'NOVEMBER'
                    WHEN 'DEZEMBRO' THEN 'DECEMBER'
                    ELSE monthCompleteTime
                END monthCompleteTime,

                bimonthTime,
                quarterTime,
                semesterTime,
                yearTime
            FROM stagingTime
            """)

            # 4. Transformasi fakta Coins
            self.conn.execute("""
            CREATE OR REPLACE TABLE stagingFactCoins AS
            SELECT
                keyTime,
                keyCoin,
                valueCoin
            FROM stagingCoinValue
            """)

            # 5. Transformasi fakta Stocks
            self.conn.execute("""
            CREATE OR REPLACE TABLE stagingFactStocks AS
            SELECT
                keyTime,
                keyCompany,
                openValueStock,
                closeValueStock,
                highValueStock,
                lowValueStock,
                quantityStock
            FROM stagingStockValue
            """)
            
            # Hitung jumlah catatan
            total_records = 0
            total_records += self.conn.execute("SELECT COUNT(*) FROM stagingDimCoin").fetchone()[0]
            total_records += self.conn.execute("SELECT COUNT(*) FROM stagingDimCompany").fetchone()[0]
            total_records += self.conn.execute("SELECT COUNT(*) FROM stagingDimTime").fetchone()[0]
            total_records += self.conn.execute("SELECT COUNT(*) FROM stagingFactCoins").fetchone()[0]
            total_records += self.conn.execute("SELECT COUNT(*) FROM stagingFactStocks").fetchone()[0]

            self.log_process_end(log_id, records=total_records, status='SUCCESS')
            return True
            
        except Exception as e:
            self.log_process_end(log_id, status='ERROR', message=str(e))
            raise
    
    def delete_tables_data(self):
        # Hapus data tabel
        self.conn.execute("TRUNCATE factCoins")
        self.conn.execute("TRUNCATE factStocks")
        self.conn.execute("TRUNCATE dimCoin")
        self.conn.execute("TRUNCATE dimCompany")
        self.conn.execute("TRUNCATE dimTime")
    
    def load_data(self):
        """Load data ke data warehouse"""
        log_id = self.log_process_start("load_data")
        
        try:            
            # 1. Muat dimensi Coin
            self.conn.execute("INSERT INTO dimCoin SELECT * FROM stagingDimCoin")
            
            # 2. Muat dimensi Company
            self.conn.execute("INSERT INTO dimCompany SELECT * FROM stagingDimCompany")
            
            # 3. Muat dimensi Time
            self.conn.execute("INSERT INTO dimTime SELECT * FROM stagingDimTime")
            
            # 4. Muat fakta Coins dengan pemetaan kunci yang benar
            self.conn.execute("""
            INSERT INTO factCoins
            SELECT 
                t.keyTime,
                c.keyCoin,
                f.valueCoin
            FROM stagingFactCoins f
            JOIN dimTime t ON f.keyTime = t.keyTime
            JOIN dimCoin c ON f.keyCoin = c.keyCoin
            """)
            
            # 5. Muat fakta Stocks dengan pemetaan kunci yang benar
            self.conn.execute("""
            INSERT INTO factStocks
            SELECT 
                t.keyTime,
                c.keyCompany,
                f.openValueStock,
                f.closeValueStock,
                f.highValueStock,
                f.lowValueStock,
                f.quantityStock
            FROM stagingFactStocks f
            JOIN dimTime t ON f.keyTime = t.keyTime
            JOIN dimCompany c ON f.keyCompany = c.keyCompany
            """)
            
            # Menghitung jumlah baris yang dimuat
            coin_count = self.conn.execute("SELECT COUNT(*) FROM dimCoin").fetchone()[0]
            company_count = self.conn.execute("SELECT COUNT(*) FROM dimCompany").fetchone()[0]
            time_count = self.conn.execute("SELECT COUNT(*) FROM dimTime").fetchone()[0]
            fact_coin_count = self.conn.execute("SELECT COUNT(*) FROM factCoins").fetchone()[0]
            fact_stock_count = self.conn.execute("SELECT COUNT(*) FROM factStocks").fetchone()[0]

            total_records = coin_count + company_count + time_count + fact_coin_count + fact_stock_count

            self.log_process_end(log_id, records=total_records, status='SUCCESS', 
                message=f'Loaded {coin_count} coins, {company_count} companies, '
                        f'{time_count} time records, {fact_coin_count} coin facts, '
                        f'{fact_stock_count} stock facts')
            return True
            
        except Exception as e:
            self.log_process_end(log_id, status='ERROR', message=str(e))
            raise
    
    def delete_staging_table_value(self):
        self.conn.execute("DELETE FROM stagingFactCoins")
        self.conn.execute("DELETE FROM stagingFactStocks")
        self.conn.execute("DELETE FROM stagingDimTime")
        self.conn.execute("DELETE FROM stagingDimCoin")
        self.conn.execute("DELETE FROM stagingDimCompany")
        

    def insert_coin_data(self, csv_path):
        self.delete_staging_table_value()
        # Load CSV
        df = pd.read_csv(csv_path, parse_dates=['time'])
        key_time = con.execute("SELECT MAX(keyTime) FROM dimTime").fetchone()[0]

        for _, row in df.iterrows():
            time = row['time']
            coin_abbrev = row['coin']
            value_coin = row['valueCoin']

            # 1. Check if datetime exists in dimTime
            time_row = self.conn.execute("SELECT keyTime FROM dimTime WHERE datetime = ?", (time.strftime('%Y-%m-%d'),)).fetchone()

            if time_row:
                key_time = time_row[0]
            else:
                # Prepare datetime parts
                day = time.day
                day_of_week = time.weekday() + 1  # Monday=0 → SQL: Monday=1
                day_abbrev = time.strftime('%a').upper()
                day_full = time.strftime('%A').upper()
                month = time.month
                month_abbrev = time.strftime('%b').upper()
                month_full = time.strftime('%B').upper()
                bimonth = ((month - 1) // 2) + 1
                quarter = ((month - 1) // 3) + 1
                semester = 1 if month <= 6 else 2
                year = time.year

                # Insert into stagingDimTime
                self.conn.execute("""
                    INSERT INTO stagingDimTime (
                        keyTime, datetime, dayTime, dayWeekTime, dayWeekAbbrevTime,
                        dayWeekCompleteTime, monthTime, monthAbbrevTime,
                        monthCompleteTime, bimonthTime, quarterTime,
                        semesterTime, yearTime
                    ) VALUES (
                        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
                    )
                """, (
                    key_time + 1 , time, day, day_of_week, day_abbrev,
                    day_full, month, month_abbrev, month_full,
                    bimonth, quarter, semester, year
                ))

                # Get the new keyTime (assumes auto-increment or computed keyTime)
                key_time_id = self.conn.execute("SELECT keyTime FROM stagingDimTime WHERE datetime = ?", (time,)).fetchone()[0]

            # 2. Get keyCoin from dimCoin
            key_coin_row = self.conn.execute("SELECT keyCoin FROM dimCoin WHERE abbrevCoin = ?", (coin_abbrev,)).fetchone()
            if not key_coin_row:
                print(f"⚠️ Coin '{coin_abbrev}' not found in dimCoin. Skipping row.")
                continue
            key_coin = key_coin_row[0]

            # 3. Insert into factCoins
            self.conn.execute("""
                INSERT INTO stagingFactCoins (
                    keyTime, keyCoin, valueCoin
                ) VALUES (
                    ?, ?, ?
                )
            """, (key_time_id, key_coin, value_coin))
            
            key_time = key_time + 1

        self.load_data()
        print("✅ Data successfully inserted into factCoins.")
    
    def run_full_pipeline(self):
        """Jalankan seluruh pipeline ETL"""
        print("Menjalankan ETL Pipeline lengkap...")
        
        start_time = datetime.now()
        
        try:                
            # 1. Inisialisasi warehouse
            self.initialize_warehouse()
            print("✓“ Data warehouse diinisialisasi")
            
            # 2. Ekstrak data
            self.extract_all_sources()
            print("✓“ Data diekstrak dari semua sumber")
            
            # 3. Transform data
            self.transform_data()
            print("✓“ Data ditransformasi")
            
            # 4. Load data
            self.delete_tables_data()
            self.load_data()
            print("✓“ Data dimuat ke data warehouse")
            
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            
            print(f"\nPipeline ETL selesai dalam {duration:.2f} detik")
            
            # Menampilkan statistik
            stats = self.conn.execute("""
                SELECT 
                    (SELECT COUNT(*) FROM dimCoin) as coin_count,
                    (SELECT COUNT(*) FROM dimCompany) as company_count,
                    (SELECT COUNT(*) FROM dimTime) as time_count,
                    (SELECT COUNT(*) FROM factCoins) as fact_coin_count,
                    (SELECT COUNT(*) FROM factStocks) as fact_stock_count
            """).fetchone()

            print("\nStatistik Data Warehouse:")
            print(f"✓ Dimensi Coin: {stats[0]} baris")
            print(f"✓ Dimensi Company: {stats[1]} baris")
            print(f"✓ Dimensi Time: {stats[2]} baris")
            print(f"✓ Fakta Coin: {stats[3]} baris")
            print(f"✓ Fakta Stock: {stats[4]} baris")
            
            return True
            
        except Exception as e:
            print(f"ERROR: Pipeline ETL gagal: {str(e)}")
            return False
    
    def get_etl_log(self, limit=10):
        """Menampilkan log ETL terakhir"""
        return self.conn.execute(f"""
            SELECT 
                logId,
                processName,
                startTime,
                endTime,
                EXTRACT(EPOCH FROM (endTime - startTime)) as duration_seconds,
                recordsProcessed,
                status,
                message
            FROM etlLog
            ORDER BY logId DESC
            LIMIT {limit}
        """).fetchdf()
    
    def __del__(self):
        """Menutup koneksi saat objek dihapus"""
        try:
            if hasattr(self, 'conn'):
                self.conn.close()
                print("Koneksi DuckDB ditutup.")
        except:
            pass

In [386]:
# Build the pipeline on the default database file and run every ETL stage.
# run_full_pipeline() returns True on success and False when a stage failed.
pipeline = ETLPipeline()
pipeline.run_full_pipeline()

Menjalankan ETL Pipeline lengkap...
✓“ Data warehouse diinisialisasi
✓“ Data diekstrak dari semua sumber
✓“ Data ditransformasi
✓“ Data dimuat ke data warehouse

Pipeline ETL selesai dalam 7.01 detik

Statistik Data Warehouse:
✓ Dimensi Coin: 2 baris
✓ Dimensi Company: 607 baris
✓ Dimensi Time: 9680 baris
✓ Fakta Coin: 13300 baris
✓ Fakta Stock: 680150 baris


True

In [387]:
# Incremental load of the generated coin values. The CSV path is the default
# output of generate_random_coin_csv(), which is defined further down in this
# notebook and has to be run first.
pipeline.insert_coin_data(csv_path="data/generated/coin_values.csv")

✅ Data successfully inserted into factCoins.


In [388]:
# Dispose of the ETLPipeline object; its __del__ closes the DuckDB connection.
del pipeline

Koneksi DuckDB ditutup.


In [43]:
# NOTE(review): the cell above deletes `pipeline`, so in top-to-bottom order
# this call raises NameError. The stored output comes from an earlier,
# out-of-order execution (In [43]); run this before `del pipeline`.
pipeline.get_etl_log()

Unnamed: 0,logId,processName,startTime,endTime,duration_seconds,recordsProcessed,status,message
0,9,transform_data,2025-03-20 22:26:13.063,2025-03-20 22:26:13.416,0.353,703741,SUCCESS,
1,8,extract_all_sources,2025-03-20 22:26:12.219,2025-03-20 22:26:13.058,0.839,703741,SUCCESS,
2,7,initialize_warehouse,2025-03-20 22:26:12.202,2025-03-20 22:26:12.216,0.014,0,SUCCESS,Data warehouse schema initialized
3,6,transform_data,2025-03-20 22:25:18.547,2025-03-20 22:25:18.888,0.341,703741,SUCCESS,
4,5,extract_all_sources,2025-03-20 22:25:17.688,2025-03-20 22:25:18.543,0.855,703741,SUCCESS,
5,4,initialize_warehouse,2025-03-20 22:25:17.671,2025-03-20 22:25:17.685,0.014,0,SUCCESS,Data warehouse schema initialized
6,3,transform_data,2025-03-20 22:20:10.415,2025-03-20 22:20:10.420,0.005,0,ERROR,"Binder Error: Referenced column ""coin_id"" not ..."
7,2,extract_all_sources,2025-03-20 22:20:09.548,2025-03-20 22:20:10.411,0.863,703741,SUCCESS,
8,1,initialize_warehouse,2025-03-20 22:20:09.534,2025-03-20 22:20:09.545,0.011,0,SUCCESS,Data warehouse schema initialized


# Generate Random Data

In [383]:
def generate_random_coin_csv(filepath='data/generated/coin_values.csv', num_rows=20, start_date='2021-01-01', seed=None):
    """Write a CSV of random coin values for testing the incremental load.

    The file has the columns ``time``, ``coin`` and ``valueCoin``.

    Args:
        filepath: Destination CSV path. Missing parent directories are
            created; a bare file name (no directory part) is also accepted.
        num_rows: Number of rows to generate.
        start_date: Earliest possible date; each row's date is this date plus
            a random offset of 0 to 1463 days.
        seed: Optional seed for a private ``numpy.random.RandomState`` so the
            file is reproducible. With the default ``None`` the global NumPy
            random state is used, as before.
    """
    # os.path.dirname() returns '' for a bare file name, and os.makedirs('')
    # raises, so only create the directory when there is one.
    dir_path = os.path.dirname(filepath)
    if dir_path:
        os.makedirs(dir_path, exist_ok=True)

    # np.random and RandomState expose the same randint/choice/uniform API.
    rng = np.random if seed is None else np.random.RandomState(seed)

    start_dt = pd.to_datetime(start_date)

    # Random dates: start date plus a random number of whole days
    random_days = rng.randint(0, 366 * 4, size=num_rows)
    time = start_dt + pd.to_timedelta(random_days, unit='D')

    # Random coins
    coins = rng.choice(['USD', 'EUR'], size=num_rows)

    # Random values in [5.0, 10.0), rounded to two decimals
    valueCoin = np.round(rng.uniform(5.0, 10.0, size=num_rows), 2)

    df = pd.DataFrame({
        'time': time,
        'coin': coins,
        'valueCoin': valueCoin
    })

    df.to_csv(filepath, index=False)
    print(f"✅ CSV generated using NumPy + Pandas: {filepath}")

In [384]:
# Write the sample CSV using the default path, row count and start date.
generate_random_coin_csv()

✅ CSV generated using NumPy + Pandas: data/generated/coin_values.csv


# Debug

In [253]:
# Connect to DuckDB (in-memory or file)
# Open a separate debug connection to the database file that ETLPipeline uses by default.
con = duckdb.connect('brazil_stock_market.db')  # or ':memory:' for in-memory DB

In [251]:
# Close the debug connection.
con.close()

In [273]:
# Empty dimCoin. Both statements remove every row, so the second one is redundant.
con.execute("DELETE FROM DimCoin")
con.execute("TRUNCATE dimCoin")

<duckdb.duckdb.DuckDBPyConnection at 0x2ca10ef11f0>

In [327]:
# Inspect the coin fact rows currently held in the staging table.
con.execute("SELECT * FROM stagingfactCoins").df()

Unnamed: 0,keyTime,keyCoin,valueCoin
0,9681,1,9.64
1,9682,1,8.67
2,9683,1,6.86
3,9684,2,9.43
4,9685,2,7.85
...,...,...,...
95,9710,2,5.95
96,9711,1,6.60
97,9712,2,9.12
98,9713,1,8.08


## Debug Table

In [373]:
# Look up one staged time row by its key.
con.execute("SELECT * FROM stagingDimTime WHERE keyTime = 9683").df()

Unnamed: 0,keyTime,datetime,dayTime,dayWeekTime,dayWeekAbbrevTime,dayWeekCompleteTime,monthTime,monthAbbrevTime,monthCompleteTime,bimonthTime,quarterTime,semesterTime,yearTime
0,9683,2024-10-21,21,1,MON,MONDAY,10,OCT,OCTOBER,5,4,2,2024


In [247]:
# List every table together with its column names and column types.
con.execute("PRAGMA show_tables_expanded").df()

Unnamed: 0,database,schema,name,column_names,column_types,temporary
0,brazil_stock_market,main,dimCoin,"[keyCoin, abbrevCoin, nameCoin, symbolCoin, ef...","[INTEGER, VARCHAR, VARCHAR, VARCHAR, DATE, DAT...",False
1,brazil_stock_market,main,dimCompany,"[keyCompany, stockCodeCompany, nameCompany, se...","[INTEGER, VARCHAR, VARCHAR, VARCHAR, VARCHAR, ...",False
2,brazil_stock_market,main,dimTime,"[keyTime, datetime, dayTime, dayWeekTime, dayW...","[INTEGER, VARCHAR, SMALLINT, SMALLINT, VARCHAR...",False
3,brazil_stock_market,main,etlLog,"[logId, processName, startTime, endTime, recor...","[INTEGER, VARCHAR, TIMESTAMP, TIMESTAMP, INTEG...",False
4,brazil_stock_market,main,factCoins,"[keyTime, keyCoin, valueCoin]","[INTEGER, INTEGER, FLOAT]",False
5,brazil_stock_market,main,factStocks,"[keyTime, keyCompany, openValueStock, closeVal...","[INTEGER, INTEGER, FLOAT, FLOAT, FLOAT, FLOAT,...",False
6,brazil_stock_market,main,stagingCoin,"[keyCoin, abbrevCoin, nameCoin, symbolCoin]","[BIGINT, VARCHAR, VARCHAR, VARCHAR]",False
7,brazil_stock_market,main,stagingCoinValue,"[keyTime, keyCoin, valueCoin]","[BIGINT, BIGINT, DOUBLE]",False
8,brazil_stock_market,main,stagingCompany,"[keyCompany, stockCodeCompany, nameCompany, se...","[BIGINT, VARCHAR, VARCHAR, VARCHAR, VARCHAR, V...",False
9,brazil_stock_market,main,stagingDimCoin,"[keyCoin, abbrevCoin, nameCoin, symbolCoin, ef...","[BIGINT, VARCHAR, VARCHAR, VARCHAR, DATE, DATE...",False


## Debug show tables

In [183]:
# Show all tables
tables = con.execute("SHOW TABLES").fetchall()

# Print the list of tables
for table in tables:
    print(table[0])

dimCoin
dimCompany
dimTime
etlLog
factCoins
factStocks
stagingCoin
stagingCoinValue
stagingCompany
stagingDimCoin
stagingDimCompany
stagingDimTime
stagingFactCoins
stagingFactStocks
stagingStockValue
stagingTime


## Debug delete all tables

In [385]:
# WARNING: destructive debug cell - drops every table in the 'main' schema,
# including the ETL log. Do not include it in a normal top-to-bottom run.

# The fact tables reference the dimensions, so they are dropped first.
con.execute("DROP TABLE IF EXISTS factCoins")
con.execute("DROP TABLE IF EXISTS factStocks")

# Get list of all tables in the 'main' schema
tables = con.execute("""
    SELECT table_name 
    FROM information_schema.tables 
    WHERE table_schema = 'main'
""").fetchall()

# Drop each table
for table in tables:
    table_name = table[0]
    print(table_name)
    # Quote the identifier (doubling embedded quotes) so names with special
    # characters or reserved words cannot break or alter the statement.
    quoted_name = '"' + table_name.replace('"', '""') + '"'
    con.execute(f"DROP TABLE IF EXISTS {quoted_name}")

print("All tables deleted.")


dimCoin
dimCompany
dimTime
etlLog
stagingCoin
stagingCoinValue
stagingCompany
stagingDimCoin
stagingDimCompany
stagingDimTime
stagingFactCoins
stagingFactStocks
stagingStockValue
stagingTime
All tables deleted.
