<a href="https://colab.research.google.com/github/erikwidman/DuckDB/blob/main/DuckDB_Playground.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#DuckDB Playground

A notebook to get familiar with DuckDB, train models directly from a database without using pandas, and using Dask to parallelize the training.

*Duck DB runs inside your python environment.*

**Introduction to DuckDB:**

DuckDB is an in-process SQL OLAP (Online Analytical Processing) database designed for fast query performance, ease of use, and seamless integration with Python, R, and other programming environments. It offers several key benefits:

- **Fast Query Execution**: DuckDB uses vectorized query execution (fast joins), enabling high-speed analytics on large datasets.
- **No Server Overhead**: As an in-process database, it runs directly within your application, eliminating the need for a separate database server.
- **Zero Configuration**: It is easy to set up and requires no complex configurations, making it ideal for quick prototyping and analysis.
- **Efficient Storage**: DuckDB supports efficient storage formats like Parquet, allowing you to query large datasets directly from these files without the need for preprocessing.
- **Columnar Storage**: DuckDB uses a columnar storage format similar to Parquet, optimizing both memory usage and query performance, especially for analytical workloads.
- **Parallel Execution**: DuckDB performs parallel query execution across multiple CPU cores by default, making it highly efficient for running complex analytical queries.
- **Lightweight & Portable**: Its lightweight nature allows it to be embedded within applications, making it highly portable for different environments.
- **Python UDFs (User Defined Functions)**: You can write Python functions and use them directly in SQL queries, allowing for custom operations that SQL does not support natively.

This makes DuckDB an excellent choice for data exploration, ad-hoc analysis, and integrating into data science workflows.

https://duckdb.org/


##Load Data

In [1]:
import pandas as pd
from google.colab import drive
from tabulate import tabulate
import duckdb

drive.mount('/content/drive')

#Load data into data frame
df = pd.read_csv('/content/drive/My Drive/Learning/Datasets/Airbnb_Open_Data.csv')

# Make all headers lowercase and use underscores in spaces
df.columns = df.columns.str.lower().str.replace(' ', '_')

#View all columns
pd.set_option('display.max_columns', None)
df.head()

Mounted at /content/drive


  df = pd.read_csv('/content/drive/My Drive/Learning/Datasets/Airbnb_Open_Data.csv')


Unnamed: 0,id,name,host_id,host_identity_verified,host_name,neighbourhood_group,neighbourhood,lat,long,country,country_code,instant_bookable,cancellation_policy,room_type,construction_year,price,service_fee,minimum_nights,number_of_reviews,last_review,reviews_per_month,review_rate_number,calculated_host_listings_count,availability_365,house_rules,license
0,1001254,Clean & quiet apt home by the park,80014485718,unconfirmed,Madaline,Brooklyn,Kensington,40.64749,-73.97237,United States,US,False,strict,Private room,2020.0,$966,$193,10.0,9.0,10/19/2021,0.21,4.0,6.0,286.0,Clean up and treat the home the way you'd like...,
1,1002102,Skylit Midtown Castle,52335172823,verified,Jenna,Manhattan,Midtown,40.75362,-73.98377,United States,US,False,moderate,Entire home/apt,2007.0,$142,$28,30.0,45.0,5/21/2022,0.38,4.0,2.0,228.0,Pet friendly but please confirm with me if the...,
2,1002403,THE VILLAGE OF HARLEM....NEW YORK !,78829239556,,Elise,Manhattan,Harlem,40.80902,-73.9419,United States,US,True,flexible,Private room,2005.0,$620,$124,3.0,0.0,,,5.0,1.0,352.0,"I encourage you to use my kitchen, cooking and...",
3,1002755,,85098326012,unconfirmed,Garry,Brooklyn,Clinton Hill,40.68514,-73.95976,United States,US,True,moderate,Entire home/apt,2005.0,$368,$74,30.0,270.0,7/5/2019,4.64,4.0,1.0,322.0,,
4,1003689,Entire Apt: Spacious Studio/Loft by central park,92037596077,verified,Lyndon,Manhattan,East Harlem,40.79851,-73.94399,United States,US,False,moderate,Entire home/apt,2009.0,$204,$41,10.0,9.0,11/19/2018,0.1,3.0,1.0,289.0,"Please no smoking in the house, porch or on th...",


###Clean data

Correct datatypes and remove $

In [2]:
# Remove $ from price and service_fee and convert to integers
df['price'] = df['price'].str.replace('$', '').str.replace(',', '').astype(float).fillna(0).astype(int)
df['service_fee'] = df['service_fee'].str.replace('$', '').str.replace(',', '').astype(float).fillna(0).astype(int)

# Convert construction_year, minimum_nights, number_of_reviews, availability_365 to integer
df['construction_year'] = df['construction_year'].fillna(0).astype(int)
df['minimum_nights'] = df['minimum_nights'].fillna(0).astype(int)
df['number_of_reviews'] = df['number_of_reviews'].fillna(0).astype(int)
df['availability_365'] = df['availability_365'].fillna(0).astype(int)
df['calculated_host_listings_count'] = df['calculated_host_listings_count'].fillna(0).astype(int)

# Convert review_rate_number to object
df['review_rate_number'] = df['review_rate_number'].fillna(0).astype(object)


In [3]:
df.head()

Unnamed: 0,id,name,host_id,host_identity_verified,host_name,neighbourhood_group,neighbourhood,lat,long,country,country_code,instant_bookable,cancellation_policy,room_type,construction_year,price,service_fee,minimum_nights,number_of_reviews,last_review,reviews_per_month,review_rate_number,calculated_host_listings_count,availability_365,house_rules,license
0,1001254,Clean & quiet apt home by the park,80014485718,unconfirmed,Madaline,Brooklyn,Kensington,40.64749,-73.97237,United States,US,False,strict,Private room,2020,966,193,10,9,10/19/2021,0.21,4.0,6,286,Clean up and treat the home the way you'd like...,
1,1002102,Skylit Midtown Castle,52335172823,verified,Jenna,Manhattan,Midtown,40.75362,-73.98377,United States,US,False,moderate,Entire home/apt,2007,142,28,30,45,5/21/2022,0.38,4.0,2,228,Pet friendly but please confirm with me if the...,
2,1002403,THE VILLAGE OF HARLEM....NEW YORK !,78829239556,,Elise,Manhattan,Harlem,40.80902,-73.9419,United States,US,True,flexible,Private room,2005,620,124,3,0,,,5.0,1,352,"I encourage you to use my kitchen, cooking and...",
3,1002755,,85098326012,unconfirmed,Garry,Brooklyn,Clinton Hill,40.68514,-73.95976,United States,US,True,moderate,Entire home/apt,2005,368,74,30,270,7/5/2019,4.64,4.0,1,322,,
4,1003689,Entire Apt: Spacious Studio/Loft by central park,92037596077,verified,Lyndon,Manhattan,East Harlem,40.79851,-73.94399,United States,US,False,moderate,Entire home/apt,2009,204,41,10,9,11/19/2018,0.1,3.0,1,289,"Please no smoking in the house, porch or on th...",


##Install DuckDB & ingest data

In [4]:
#!pip install duckdb

In [5]:
# Create a DuckDB database
con = duckdb.connect('airbnb.db')

# Create a table in the database from the pandas DataFrame - Super cool!
con.execute("CREATE OR REPLACE TABLE airbnb_data AS SELECT * FROM df")

<duckdb.duckdb.DuckDBPyConnection at 0x7e9c83dd23b0>

In [6]:
# View the first five rows of the table
result = con.execute("SELECT * FROM airbnb_data LIMIT 5").df()
print(tabulate(result, headers='keys', tablefmt='psql'))


+----+---------+--------------------------------------------------+-------------+--------------------------+-------------+-----------------------+-----------------+---------+----------+---------------+----------------+--------------------+-----------------------+-----------------+---------------------+---------+---------------+------------------+---------------------+---------------+---------------------+----------------------+----------------------------------+--------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|    |      id | name                                             |     host_id | host_identit

Let's describe the data in the table.

In [7]:
result = con.execute("DESCRIBE airbnb_data").df()
print(tabulate(result, headers='keys', tablefmt='psql'))

+----+--------------------------------+---------------+--------+-------+-----------+---------+
|    | column_name                    | column_type   | null   | key   | default   | extra   |
|----+--------------------------------+---------------+--------+-------+-----------+---------|
|  0 | id                             | BIGINT        | YES    |       |           |         |
|  1 | name                           | VARCHAR       | YES    |       |           |         |
|  2 | host_id                        | BIGINT        | YES    |       |           |         |
|  3 | host_identity_verified         | VARCHAR       | YES    |       |           |         |
|  4 | host_name                      | VARCHAR       | YES    |       |           |         |
|  5 | neighbourhood_group            | VARCHAR       | YES    |       |           |         |
|  6 | neighbourhood                  | VARCHAR       | YES    |       |           |         |
|  7 | lat                            | DOUBLE    

##SQL Querries

How many rows are in the table?

In [8]:
# This line executes an SQL query on the DuckDB connection con
result = con.execute("SELECT count(*) FROM airbnb_data").df()
print(tabulate(result, headers='keys', tablefmt='psql'))


+----+----------------+
|    |   count_star() |
|----+----------------|
|  0 |         102599 |
+----+----------------+


How many listings in new york are priced over $500?

In [9]:
result = con.execute("SELECT count(*) FROM airbnb_data WHERE price > 500 AND neighbourhood_group = 'Manhattan'").df()
print(tabulate(result, headers='keys', tablefmt='psql'))

+----+----------------+
|    |   count_star() |
|----+----------------|
|  0 |          26442 |
+----+----------------+


Count the number of nulls in the `name` field.

In [10]:

result = con.execute("SELECT COUNT(*) FROM airbnb_data WHERE name IS NULL").df()
print(tabulate(result, headers='keys', tablefmt='psql'))

+----+----------------+
|    |   count_star() |
|----+----------------|
|  0 |            250 |
+----+----------------+


Show the tables in the database.

In [11]:
result = con.execute("SHOW TABLES").df()
print(tabulate(result, headers='keys', tablefmt='psql'))

+----+-------------+
|    | name        |
|----+-------------|
|  0 | airbnb_data |
+----+-------------+


## UDF in SQL

Create a UDF and run it in the SQL querry. Note that DuckDB requires you to define the data type.

In [12]:
from duckdb.typing import DOUBLE

def total_price(price: DOUBLE, minimum_nights: DOUBLE) -> DOUBLE:
  """Calculates the total price of an Airbnb listing.

  Args:
    price: The price per night for the listing.
    minimum_nights: The minimum number of nights required to book.

  Returns:
    The total price of the Airbnb listing.
  """
  return price * minimum_nights

In [13]:
# Register the UDF with DuckDB, explicitly setting the return type
con.create_function("total_price", total_price, return_type=DOUBLE)

<duckdb.duckdb.DuckDBPyConnection at 0x7e9c83dd23b0>

In [14]:
# Use the UDF in a SQL query!!! Limit to 10 rows
result = con.execute("SELECT total_price(price, minimum_nights) AS total_price FROM airbnb_data LIMIT 10").df()
print(tabulate(result, headers='keys', tablefmt='psql'))

+----+---------------+
|    |   total_price |
|----+---------------|
|  0 |          9660 |
|  1 |          4260 |
|  2 |          1860 |
|  3 |         11040 |
|  4 |          2040 |
|  5 |          1731 |
|  6 |          3195 |
|  7 |         47700 |
|  8 |          2036 |
|  9 |           582 |
+----+---------------+


## Build a ML model from DuckDB

### Load data
Let's use the California houseing dataset to build a simple XGboost model.

In [15]:
# For this example, let's use a built-in dataset and simulate a binary target.
df2 = pd.read_csv('sample_data/california_housing_train.csv')

In [16]:
# Create a table in the database from the pandas DataFrame - Super cool!
con.execute("CREATE OR REPLACE TABLE california_housing_train AS SELECT * FROM df2")

<duckdb.duckdb.DuckDBPyConnection at 0x7e9c83dd23b0>

Let's verify both of our tables are in our database.

In [17]:
#View tables in database
result = con.execute("SHOW TABLES").df()
print(tabulate(result, headers='keys', tablefmt='psql'))

+----+--------------------------+
|    | name                     |
|----+--------------------------|
|  0 | airbnb_data              |
|  1 | california_housing_train |
+----+--------------------------+


### Build Model

Let's build a model without using pandas (reading data directly from the database using SQL) to show that we can go beyond the memory contraints of pandas.

In [18]:
import numpy as np
import xgboost as xgb
from sklearn.model_selection import cross_val_score, KFold
from sklearn.metrics import mean_absolute_error

In [19]:
#Print the first 5 values from california_housing_train table

result = con.execute("SELECT * FROM california_housing_train LIMIT 5").df()
print(tabulate(result, headers='keys', tablefmt='psql'))

+----+-------------+------------+----------------------+---------------+------------------+--------------+--------------+-----------------+----------------------+
|    |   longitude |   latitude |   housing_median_age |   total_rooms |   total_bedrooms |   population |   households |   median_income |   median_house_value |
|----+-------------+------------+----------------------+---------------+------------------+--------------+--------------+-----------------+----------------------|
|  0 |     -114.31 |      34.19 |                   15 |          5612 |             1283 |         1015 |          472 |          1.4936 |                66900 |
|  1 |     -114.47 |      34.4  |                   19 |          7650 |             1901 |         1129 |          463 |          1.82   |                80100 |
|  2 |     -114.56 |      33.69 |                   17 |           720 |              174 |          333 |          117 |          1.6509 |                85700 |
|  3 |     -114.57 |  

####Fetch Data

In [20]:
query = """
    SELECT
        housing_median_age,
        total_rooms,
        total_bedrooms,
        population,
        households,
        median_income,
        median_house_value
    FROM
        california_housing_train;
    """

    # Execute the query and fetch the result as a NumPy array
result = con.execute(query)
data = result.fetchnumpy()

In [21]:
type(data)

dict

In [22]:
data.keys()

dict_keys(['housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income', 'median_house_value'])

####Prepare the data

In [23]:
# Assuming 'data' has a consistent structure across all keys. Convert values from dict keys into a list and construct a 2D array
data_array = np.array(list(data.values())).transpose()

# Now you can perform slicing accross 2D data Array
X = data_array[:, :-1]  # Features, all fields except median_house_value
y = data_array[:, -1]   # Target variable

In [24]:
# Check the shape of your 2D array
X.shape

(17000, 6)

The shape looks good, 17k rows, 6 columns for the features as we would expect.

####XGBoost model

In [76]:
import time

#Strat a timer
start_time = time.time()

model = xgb.XGBRegressor(objective='reg:squarederror', n_estimators=100, random_state=42, eval_metric='mae')

# Perform 5-fold cross-validation with MAE
kfold = KFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(model, X, y, cv=kfold, scoring='neg_mean_absolute_error')

# Calculate the average MAE
mae = -scores.mean()
print(f"Mean Absolute Error from cross-validation: {mae}")

# Fit the model on the entire dataset
model.fit(X, y)

# End timer
end_time = time.time()
elapsed_time = end_time - start_time

print(f"Elapsed time: {elapsed_time:.2f} seconds")

Mean Absolute Error from cross-validation: 48724.84331198299
Elapsed time: 3.13 seconds


####Load test data and predict

Load test data into duck db, Check MAE on the TEST set, and predict the first 10 values.

In [26]:
#Load test data into a data frame
df3 = pd.read_csv('sample_data/california_housing_test.csv')

#Create a table in the database from the pandas DataFrame
con.execute("CREATE OR REPLACE TABLE california_housing_test AS SELECT * FROM df3")

<duckdb.duckdb.DuckDBPyConnection at 0x7e9c83dd23b0>

In [27]:
# Fetch test data using a SQL query
query = """
SELECT
    housing_median_age,
    total_rooms,
    total_bedrooms,
    population,
    households,
    median_income,
    median_house_value
FROM
    california_housing_test;
"""
result = con.execute(query)
data_test = result.fetchnumpy()

#Create 2D data array
data_test_array = np.array(list(data_test.values())).transpose()

# Prepare test data
X_test = data_test_array[:, :-1]
y_test = data_test_array[:, -1]

In [28]:
X_test.shape

(3000, 6)

In [29]:
# Make predictions on the test set
y_pred = model.predict(X_test)

# Calculate the MAE
mae = mean_absolute_error(y_test, y_pred)
print(f"Mean Absolute Error on Test Data: {mae}")

Mean Absolute Error on Test Data: 48475.23351692708


In [30]:
# Print the first 10 predictions and their MAE
for i in range(10):
  print(f"Prediction {i+1}: {round(y_pred[i])}, Actual: {round(y_test[i])}, MAE: {round(abs(y_pred[i] - y_test[i]))}")


Prediction 1: 410082, Actual: 344700, MAE: 65382
Prediction 2: 206192, Actual: 176500, MAE: 29692
Prediction 3: 286720, Actual: 270500, MAE: 16220
Prediction 4: 187867, Actual: 330000, MAE: 142133
Prediction 5: 108805, Actual: 81700, MAE: 27105
Prediction 6: 85838, Actual: 67000, MAE: 18838
Prediction 7: 78256, Actual: 67000, MAE: 11256
Prediction 8: 119331, Actual: 166900, MAE: 47569
Prediction 9: 194991, Actual: 194400, MAE: 591
Prediction 10: 124903, Actual: 164200, MAE: 39297


### Parallelize model training



Let's pretend our dataset was very varge and we wanted to parallelize it accros multiple cores.

You can train models that don't fit in memory using **Dask**.


Dask is a parallel computing library that allows you to scale your computations beyond a single machine's memory. It can handle large datasets by distributing them across multiple cores or machines.

For model training, Dask offers several strategies:

1. **Dask-ML**: This library provides scalable machine learning algorithms that can operate on Dask arrays and dataframes. These algorithms are designed to handle datasets larger than memory.
2. **Incremental Learning**: For models that support incremental learning (like SGDClassifier), Dask can train on chunks of data sequentially, updating the model with each chunk.
3. **Joblib with Dask backend**: You can parallelize the training process of existing scikit-learn models using joblib with the Dask backend. This allows you to distribute the training tasks across a Dask cluster.

In [31]:
#!pip install dask distributed scikit-learn==1.2.2 xgboost

In [None]:
from dask.distributed import Client
client = Client(processes=False)

In [33]:
import joblib
import xgboost as xgb
from sklearn.model_selection import cross_val_score, KFold

#Strat a timer
start_time = time.time()

with joblib.parallel_backend('dask'):
  #n_jobs=-1 will make model use all available cores
  model = xgb.XGBRegressor(objective='reg:squarederror', n_estimators=100, random_state=42, n_jobs=-1)

  kfold = KFold(n_splits=5, shuffle=True, random_state=42)
  scores = cross_val_score(model, X, y, cv=kfold, scoring='neg_mean_absolute_error')

  mae = -scores.mean()
  print(f"Mean Absolute Error from cross-validation: {mae}")

  model.fit(X, y)

  # End timer
end_time = time.time()
elapsed_time = end_time - start_time

print(f"Elapsed time: {elapsed_time:.2f} seconds")

Mean Absolute Error from cross-validation: 48724.84331198299
Elapsed time: 3.15 seconds


The Original code tool 11.07 seconds while the parallelized code took 2.41 seconds to run. A 4x improvement!

### Save results to Parquet

When compared to formats like CSV, Parquet brings the following advantages:
1. It’s faster to read and write, often by 4-10x

2. It’s more compact to store, often by 2-5x

3. It has a schema, and so there’s no ambiguity about what types the columns are. This avoids confusing errors.

4. It supports more advanced data types, like categoricals, proper datetimes, and more

5. It’s more portable, and can be used with other systems like databases or Apache Spark

6. Depending on how the data is partitioned Dask can identify sorted columns, and sometimes pick out subsets of data more efficiently



#### Slow implementation
Add a new column to existing table to store predictions.

In [34]:
# Add predictions column to california_housing_test
con.execute("ALTER TABLE california_housing_test ADD COLUMN predictions DOUBLE")

<duckdb.duckdb.DuckDBPyConnection at 0x7e9c83dd23b0>

Predict in minibatches of 1000. We're purposely not using Pandas to show you can do this without dataframes. This is a slow way to do it.

In [35]:
# Fetch data in batches of 1000
batch_size = 1000
for i in range(0, len(X_test), batch_size):
  X_batch = X_test[i:i+batch_size]
  y_pred_batch = model.predict(X_batch)

  # Update predictions column for the current batch
  for j, prediction in enumerate(y_pred_batch):
    con.execute(f"UPDATE california_housing_test SET predictions = {prediction} WHERE rowid = {i+j+1}")

In [36]:
query = """
SELECT
    housing_median_age,
    total_rooms,
    total_bedrooms,
    population,
    households,
    median_income,
    median_house_value,
    predictions
FROM
    california_housing_test
LIMIT 10;
"""
result = con.execute(query).df()
print(tabulate(result, headers='keys', tablefmt='psql'))

+----+----------------------+---------------+------------------+--------------+--------------+-----------------+----------------------+---------------+
|    |   housing_median_age |   total_rooms |   total_bedrooms |   population |   households |   median_income |   median_house_value |   predictions |
|----+----------------------+---------------+------------------+--------------+--------------+-----------------+----------------------+---------------|
|  0 |                   27 |          3885 |              661 |         1537 |          606 |          6.6085 |               344700 |         nan   |
|  1 |                   43 |          1510 |              310 |          809 |          277 |          3.599  |               176500 |      410082   |
|  2 |                   27 |          3589 |              507 |         1484 |          495 |          5.7934 |               270500 |      206192   |
|  3 |                   28 |            67 |               15 |           49 |         

In [37]:
query = """
COPY (SELECT * FROM california_housing_test)
TO 'california_housing_test.parquet' (FORMAT PARQUET);
"""

#Export to parquet
con.execute(query)

<duckdb.duckdb.DuckDBPyConnection at 0x7e9c83dd23b0>

#### Parallelized implementation

This is the proper way to parallelize the predictions using Dask and saving to parquet.

Let's start by deleting the prdiction column from our test data.

In [39]:
query = """
ALTER TABLE
    california_housing_test
DROP COLUMN
    predictions;
"""

#Export to parquet
con.execute(query)

<duckdb.duckdb.DuckDBPyConnection at 0x7e9c83dd23b0>

Validate the field was dropped.

In [41]:
result = con.execute("SELECT * FROM california_housing_test LIMIT 5").df()
print(tabulate(result, headers='keys', tablefmt='psql'))

+----+-------------+------------+----------------------+---------------+------------------+--------------+--------------+-----------------+----------------------+
|    |   longitude |   latitude |   housing_median_age |   total_rooms |   total_bedrooms |   population |   households |   median_income |   median_house_value |
|----+-------------+------------+----------------------+---------------+------------------+--------------+--------------+-----------------+----------------------|
|  0 |     -122.05 |      37.37 |                   27 |          3885 |              661 |         1537 |          606 |          6.6085 |               344700 |
|  1 |     -118.3  |      34.26 |                   43 |          1510 |              310 |          809 |          277 |          3.599  |               176500 |
|  2 |     -117.81 |      33.78 |                   27 |          3589 |              507 |         1484 |          495 |          5.7934 |               270500 |
|  3 |     -118.36 |  

Great, the prediction field was dropped. Let's add a id field as an index to make it easier to recreate the table after distributed computing.

In [59]:
query = """
ALTER TABLE california_housing_test ADD COLUMN id INTEGER;
UPDATE california_housing_test SET id = rowid;
"""
con.execute(query)

<duckdb.duckdb.DuckDBPyConnection at 0x7e9c83dd23b0>

In [60]:
# Validate the id field was added.
result = con.execute("SELECT * FROM california_housing_test LIMIT 5").df()
print(tabulate(result, headers='keys', tablefmt='psql'))

+----+-------------+------------+----------------------+---------------+------------------+--------------+--------------+-----------------+----------------------+------+
|    |   longitude |   latitude |   housing_median_age |   total_rooms |   total_bedrooms |   population |   households |   median_income |   median_house_value |   id |
|----+-------------+------------+----------------------+---------------+------------------+--------------+--------------+-----------------+----------------------+------|
|  0 |     -122.05 |      37.37 |                   27 |          3885 |              661 |         1537 |          606 |          6.6085 |               344700 |    0 |
|  1 |     -118.3  |      34.26 |                   43 |          1510 |              310 |          809 |          277 |          3.599  |               176500 |    1 |
|  2 |     -117.81 |      33.78 |                   27 |          3589 |              507 |         1484 |          495 |          5.7934 |           



Now let's load the test data into a dask partitioned dataframe with 8 partitions since my lapptop has 8 cores. Data dataframes work on up to 100 GB on laptops and are just pandas under the hood.

https://docs.dask.org/en/stable/dataframe.html

SQLAlchemy, which is used by dask.dataframe.read_sql_table, does not have a registered dialect for DuckDB. Dask's read_sql_table function relies on SQLAlchemy for database connections, but as of now, DuckDB does not have native support for SQLAlchemy dialects.

To load data from DuckDB into Dask, you can use an alternative approach by:

1. Loading the data into a Pandas DataFrame first
2. converting it into a Dask DataFrame.


In [62]:
import dask.dataframe as dd
import pandas as pd

# Load DuckDB table into a Pandas DataFrame
result = con.execute("SELECT * FROM california_housing_test").df()

# Convert Pandas DataFrame to Dask DataFrame
ddf = dd.from_pandas(df, npartitions=8)

In the real world, you would probably export the duckdb data as a parquet file then import it to a dask dataframe.

In [63]:
# Query to export as parquet
query = """
COPY (SELECT * FROM california_housing_test)
TO 'california_housing_test.parquet' (FORMAT PARQUET);
"""

#Export to parquet
con.execute(query)

<duckdb.duckdb.DuckDBPyConnection at 0x7e9c83dd23b0>

In [64]:
# Load the exported Parquet files into a Dask DataFrame
ddf = dd.read_parquet('california_housing_test.parquet', npartitions=8, index_col='id')

In [68]:
ddf.head()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,id
0,-122.05,37.37,27.0,3885.0,661.0,1537.0,606.0,6.6085,344700.0,0
1,-118.3,34.26,43.0,1510.0,310.0,809.0,277.0,3.599,176500.0,1
2,-117.81,33.78,27.0,3589.0,507.0,1484.0,495.0,5.7934,270500.0,2
3,-118.36,33.82,28.0,67.0,15.0,49.0,11.0,6.1359,330000.0,3
4,-119.67,36.33,19.0,1241.0,244.0,850.0,237.0,2.9375,81700.0,4


Let’s load and cache our dataset into our cluster’s distributed memory using df.persist(). This allows us to avoid repeated expensive data loading steps when doing multiple computations on the same data.

In [73]:
ddf = ddf.persist()

The next step is to make predictions using XGBoost and dask.

 **There is conflicting documentation and I'm running into errors. This step is not so trivial.**

In [None]:
import xgboost as xgb
from dask import dataframe as dd

cluster = LocalCluster(n_workers=8, threads_per_worker=1)
client = Client(cluster)

# Convert your Dask DataFrame to DaskDMatrix (a special format for XGBoost with Dask)
dmatrix = xgb.dask.DaskDMatrix(ddf)

# Predict using the trained model
predictions = xgb.dask.predict(your_dask_client, model, dmatrix)

# Compute the predictions if needed (Dask operations are lazy by default)
final_predictions = predictions.compute()

# Display or use the predictions
print(final_predictions)


### Delete Table and Database

In [77]:
# Delete the table
con.execute("DROP TABLE IF EXISTS airbnb_data")
con.execute("DROP TABLE IF EXISTS california_housing_train")
con.execute("DROP TABLE IF EXISTS california_housing_test")

# Close the connection to the database
con.close()

# Delete the database file (optional)
!rm airbnb.db
