In [5]:
import duckdb
import pandas as pd
import os
from sklearn.preprocessing import MinMaxScaler
from fuzzywuzzy import process

In [7]:
def read_and_clean_database(db_path, output_db_path):
    # Connect to the DuckDB database
    conn = duckdb.connect(db_path)
    
    # Get list of all tables in the database
    tables = conn.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'main';").fetchall()
    
    # Iterate over each table and load into a DataFrame
    for table in tables:
        table_name = table[0]
        print(f"Reading and cleaning table: {table_name}")
        try:
            # Read the table into a DataFrame
            df = conn.execute(f"SELECT * FROM {table_name}").fetchdf()
            
            # Deduplication: Remove duplicate rows
            df = df.drop_duplicates()
            
            # Consistent formatting: Ensure consistent datetime format
            for col in df.select_dtypes(include=['object']).columns:
                try:
                    df[col] = pd.to_datetime(df[col], errors='coerce', dayfirst=True).dt.strftime('%d/%m/%Y')
                except ValueError:
                    pass
            
            # Quality checks: Handle missing values by dropping or imputing
            missing_value_threshold = 0.5
            for col in df.columns:
                if df[col].isnull().mean() > missing_value_threshold:
                    # Drop columns with more than 50% missing values
                    df = df.drop(columns=[col])
                else:
                    # Fill missing values with the median for numeric columns or mode for categorical columns
                    if df[col].dtype in ['int64', 'float64']:
                        df[col].fillna(df[col].median(), inplace=True)
                    else:
                        df[col].fillna(df[col].mode()[0], inplace=True)
            
            # Data normalization: Normalize numeric columns
            numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns
            scaler = MinMaxScaler()
            df[numeric_cols] = scaler.fit_transform(df[numeric_cols])
            
            # Save the cleaned DataFrame to a new DuckDB database
            if not os.path.exists(output_db_path):
                output_conn = duckdb.connect(output_db_path)
                output_conn.register('cleaned_df', df)
                output_conn.execute(f"CREATE TABLE {table_name} AS SELECT * FROM cleaned_df")
                output_conn.close()
            else:
                output_conn = duckdb.connect(output_db_path)
                try:
                    output_conn.execute(f"INSERT INTO {table_name} SELECT * FROM cleaned_df")
                except duckdb.CatalogException:
                    output_conn.register('cleaned_df', df)
                    output_conn.execute(f"CREATE TABLE {table_name} AS SELECT * FROM cleaned_df")
                output_conn.close()
        except Exception as e:
            print(f"Error while cleaning table '{table_name}': {e}")
    
    # Close the connection
    conn.close()

In [8]:
read_and_clean_database('../trusted_zone/trusted.db', 'path/to/treated_trusted.db')

Reading and cleaning table: fotocasa
Error while cleaning table 'fotocasa': unhashable type: 'numpy.ndarray'
Reading and cleaning table: idealista


  df[col] = pd.to_datetime(df[col], errors='coerce', dayfirst=True).dt.strftime('%d/%m/%Y')
  df[col] = pd.to_datetime(df[col], errors='coerce', dayfirst=True).dt.strftime('%d/%m/%Y')
  df[col] = pd.to_datetime(df[col], errors='coerce', dayfirst=True).dt.strftime('%d/%m/%Y')
  df[col] = pd.to_datetime(df[col], errors='coerce', dayfirst=True).dt.strftime('%d/%m/%Y')
  df[col] = pd.to_datetime(df[col], errors='coerce', dayfirst=True).dt.strftime('%d/%m/%Y')
  df[col] = pd.to_datetime(df[col], errors='coerce', dayfirst=True).dt.strftime('%d/%m/%Y')
  df[col] = pd.to_datetime(df[col], errors='coerce', dayfirst=True).dt.strftime('%d/%m/%Y')
  df[col] = pd.to_datetime(df[col], errors='coerce', dayfirst=True).dt.strftime('%d/%m/%Y')
  df[col] = pd.to_datetime(df[col], errors='coerce', dayfirst=True).dt.strftime('%d/%m/%Y')
  df[col] = pd.to_datetime(df[col], errors='coerce', dayfirst=True).dt.strftime('%d/%m/%Y')


Error while cleaning table 'idealista': 'utf-8' codec can't decode byte 0xd5 in position 134: invalid continuation byte
Reading and cleaning table: income
Error while cleaning table 'income': 'utf-8' codec can't decode byte 0xd5 in position 134: invalid continuation byte


  df[col] = pd.to_datetime(df[col], errors='coerce', dayfirst=True).dt.strftime('%d/%m/%Y')
  df[col] = pd.to_datetime(df[col], errors='coerce', dayfirst=True).dt.strftime('%d/%m/%Y')
  df[col] = pd.to_datetime(df[col], errors='coerce', dayfirst=True).dt.strftime('%d/%m/%Y')
  df[col] = pd.to_datetime(df[col], errors='coerce', dayfirst=True).dt.strftime('%d/%m/%Y')
