# Install JoinBoost and the Database Package
To run this demo, you need to install JoinBoost and PySpark. This demo uses Spark as the database.



In [1]:
# Install the pinned JoinBoost release this demo was written against.
%pip install joinboost==0.0.1523
# Install PySpark, which provides the Spark database used throughout the demo.
# NOTE(review): pyspark is unpinned; consider pinning a version known to work
# with joinboost 0.0.1523 so the demo stays reproducible.
%pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


# Download and Load Data into a Database

This step is only necessary for the demo, because no pre-existing database is available. In general, JoinBoost trains models directly over your existing database, without downloading or loading the data.

In [2]:
import os
import urllib.request
from pyspark.sql import SparkSession

# Demo data files: Spark view name -> (local file name, download URL).
# Note that sales_small.csv and train_small.csv are registered as the views
# `sales` and `train`, respectively.
DATA_FILES = {
    "holidays": ("holidays.csv", "https://www.dropbox.com/s/kaovdndtevcvt83/holidays.csv?dl=1"),
    "items": ("items.csv", "https://www.dropbox.com/s/wh6amz4um7ieyqz/items.csv?dl=1"),
    "oil": ("oil.csv", "https://www.dropbox.com/s/ze6of1xqwslt8jb/oil.csv?dl=1"),
    "sales": ("sales_small.csv", "https://www.dropbox.com/s/uiojfrc5c20gyrl/sales_small.csv?dl=1"),
    "stores": ("stores.csv", "https://www.dropbox.com/s/cwy6z0b7rhsnrxb/stores.csv?dl=1"),
    "train": ("train_small.csv", "https://www.dropbox.com/s/wwaoga17z70jb6l/train_small.csv?dl=1"),
    "transactions": ("transactions.csv", "https://www.dropbox.com/s/2bxto9wnetwnvqd/transactions.csv?dl=1"),
}

# Set to True to re-download files even when a local copy already exists.
FORCE_DOWNLOAD = False

# Download the files. Existing local copies are reused, so re-running the
# notebook does not re-fetch every file. Each download is written to a
# temporary ".part" file and renamed into place only after urlretrieve
# returns, so an interrupted download is re-fetched on the next run instead of
# being treated as complete.
for filename, url in DATA_FILES.values():
    if FORCE_DOWNLOAD or not os.path.exists(filename):
        partial_path = filename + ".part"
        urllib.request.urlretrieve(url, partial_path)
        os.replace(partial_path, filename)

# Initialize SparkSession
spark = SparkSession.builder.appName("demo").getOrCreate()

# Read each CSV file into a Spark DataFrame and register it as a temporary view.
for view_name, (filename, _url) in DATA_FILES.items():
    spark.read.csv(filename, header=True, inferSchema=True).createOrReplaceTempView(view_name)


# Data exploration and Data Transformation
You can use Spark's catalog API and Spark SQL queries to list the available tables, inspect their schemas, and preview their contents. You can also perform data transformations over these tables.



In [3]:
# Fetch every table known to the Spark catalog (this includes the temporary
# views registered from the CSV files) and print one name per line.
tables = spark.catalog.listTables()
for tbl in tables:
    print(tbl.name)

# Walk the same tables again, printing a header line followed by the schema.
for tbl in tables:
    print("Schema for " + tbl.name + ":")
    spark.table(tbl.name).printSchema()

# Preview up to ten rows of the `sales` view.
sample_sales = spark.sql("SELECT * FROM sales LIMIT 10")
sample_sales.show()

holidays
items
oil
sales
stores
train
transactions
Schema for holidays:
root
 |-- date: integer (nullable = true)
 |-- htype: integer (nullable = true)
 |-- locale: integer (nullable = true)
 |-- locale_name: integer (nullable = true)
 |-- transferred: integer (nullable = true)
 |-- f2: integer (nullable = true)

Schema for items:
root
 |-- item_nbr: integer (nullable = true)
 |-- family: integer (nullable = true)
 |-- class: integer (nullable = true)
 |-- perishable: integer (nullable = true)
 |-- f1: integer (nullable = true)

Schema for oil:
root
 |-- date: integer (nullable = true)
 |-- dcoilwtico: integer (nullable = true)
 |-- f3: integer (nullable = true)

Schema for sales:
root
 |-- item_nbr: integer (nullable = true)
 |-- unit_sales: double (nullable = true)
 |-- onpromotion: integer (nullable = true)
 |-- tid: integer (nullable = true)
 |-- Y: double (nullable = true)

Schema for stores:
root
 |-- store_nbr: integer (nullable = true)
 |-- city: integer (nullable = true)
 |-- 


# Building a Join Graph

First, you need to specify the join graph, which includes the relations and features you want to use, as well as how these relations should be joined.
For example,


```
dataset.add_join("sales", "items", ["item_nbr"], ["item_nbr"])
```

indicates that the join condition between the "sales" and "items" tables is based on the item_nbr attribute, where sales.item_nbr = items.item_nbr. By specifying the join conditions in this manner, you can define how the tables in your dataset are related and should be combined.

In [4]:
from joinboost.executor import SparkExecutor
from joinboost.joingraph import JoinGraph

# Relations in the join graph, mapped to the feature columns each contributes.
# Dict insertion order is preserved, so relations are added in the order listed.
RELATION_FEATURES = {
    "sales": [],
    "holidays": ["htype", "locale", "locale_name", "transferred", "f2"],
    "oil": ["dcoilwtico", "f3"],
    "transactions": ["transactions", "f5"],
    "stores": ["city", "state", "stype", "cluster", "f4"],
    "items": ["family", "class", "perishable", "f1"],
}

# The relation carrying the prediction target, and the target column name.
TARGET_RELATION = "sales"
TARGET_COLUMN = "Y"

# Join edges as (left, right, left_keys, right_keys).
JOIN_EDGES = [
    ("sales", "items", ["item_nbr"], ["item_nbr"]),            # sales.item_nbr = items.item_nbr
    ("sales", "transactions", ["tid"], ["tid"]),               # sales.tid = transactions.tid
    ("transactions", "stores", ["store_nbr"], ["store_nbr"]),  # transactions.store_nbr = stores.store_nbr
    ("transactions", "holidays", ["date"], ["date"]),          # transactions.date = holidays.date
    ("holidays", "oil", ["date"], ["date"]),                   # holidays.date = oil.date
]


def build_join_graph(exe):
    """Build the demo's join graph on top of a JoinBoost executor.

    Args:
        exe: Executor handed to ``JoinGraph`` (e.g. a ``SparkExecutor``).

    Returns:
        A ``JoinGraph`` with every relation in ``RELATION_FEATURES`` and every
        edge in ``JOIN_EDGES`` added, with ``TARGET_COLUMN`` as the target of
        ``TARGET_RELATION``.
    """
    graph = JoinGraph(exe=exe)
    for relation, features in RELATION_FEATURES.items():
        # Pass a fresh copy of each list so repeated builds never share list
        # objects with the module-level spec or with each other.
        if relation == TARGET_RELATION:
            graph.add_relation(relation, list(features), y=TARGET_COLUMN)
        else:
            graph.add_relation(relation, list(features))
    for left, right, left_keys, right_keys in JOIN_EDGES:
        graph.add_join(left, right, list(left_keys), list(right_keys))
    return graph


# Create a SparkExecutor instance and build the join graph over it.
exe = SparkExecutor(spark)
dataset = build_join_graph(exe)

dataset


# Training a Decision Tree with JoinBoost

Once you have defined the join graph, you can train a decision tree over it using JoinBoost.

In [6]:
from joinboost.app import DecisionTree

# Decision-tree hyper-parameters: a depth-3 tree allowed up to 2**depth leaves.
depth = 3
reg = DecisionTree(
    max_depth=depth,
    max_leaves=2 ** depth,
    learning_rate=1,
)

# Fit the tree over the join graph defined above.
reg.fit(dataset)

# Evaluate RMSE against the `train` view. compute_rmse returns an indexable
# result; its first entry is the value reported here.
train_rmse = reg.compute_rmse('train')[0]
print("Training RMSE:", train_rmse)


Training RMSE: 2535.060839158587


# Training Gradient Boosting with JoinBoost

After training a decision tree, you can train a Gradient Boosting model with JoinBoost for further improvement. In this example, we train a Gradient Boosting model with 10 iterations.

As the outputs show, Gradient Boosting further reduces the training RMSE: in this run, from about 2535 for the single decision tree to about 787.

In [8]:
from joinboost.app import GradientBoosting

# Boosting hyper-parameters: 10 rounds of trees, each using the same `depth`
# as the decision tree above and capped at 2**depth leaves.
reg = GradientBoosting(
    iteration=10,
    max_depth=depth,
    max_leaves=2 ** depth,
    learning_rate=1,
)

# Fit the boosted ensemble over the join graph.
reg.fit(dataset)

# Evaluate RMSE against the `train` view and report the first entry of the
# (indexable) result returned by compute_rmse.
rmse_result = reg.compute_rmse('train')
train_rmse = rmse_result[0]
print("Training RMSE for Gradient Boosting:", train_rmse)

Training RMSE for Gradient Boosting: 786.8773440907672


# Exploring JoinBoost Internals

Internally, JoinBoost translates machine learning logic into SQL and executes SQL queries directly in your database without the need for data movement.

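To see the SQL queries generated by JoinBoost, enable debug mode when creating the executor instance. Be aware that debug mode prints the generated queries together with intermediate results, so the output is very long: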

In [7]:
# Build an executor with debug=True so the SQL JoinBoost generates is printed,
# then construct a fresh join graph on top of it.
exe = SparkExecutor(spark, debug=True)
dataset = JoinGraph(exe=exe)

# Relations: `sales` carries the target column Y and contributes no features;
# every other relation is listed with its feature columns.
dataset.add_relation("sales", [], y='Y')
for rel_name, rel_features in [
    ("holidays", ["htype", "locale", "locale_name", "transferred", "f2"]),
    ("oil", ["dcoilwtico", "f3"]),
    ("transactions", ["transactions", "f5"]),
    ("stores", ["city", "state", "stype", "cluster", "f4"]),
    ("items", ["family", "class", "perishable", "f1"]),
]:
    dataset.add_relation(rel_name, rel_features)

# Join edges as (left, right, left keys, right keys), e.g. the first one is
# sales.item_nbr = items.item_nbr.
for left_rel, right_rel, left_keys, right_keys in [
    ("sales", "items", ["item_nbr"], ["item_nbr"]),
    ("sales", "transactions", ["tid"], ["tid"]),
    ("transactions", "stores", ["store_nbr"], ["store_nbr"]),
    ("transactions", "holidays", ["date"], ["date"]),
    ("holidays", "oil", ["date"], ["date"]),
]:
    dataset.add_join(left_rel, right_rel, left_keys, right_keys)

# Same decision-tree settings: depth 3, up to 2**depth leaves.
depth = 3
reg = DecisionTree(
    max_depth=depth,
    max_leaves=2 ** depth,
    learning_rate=1,
)
reg.fit(dataset)

# Evaluate RMSE against the `train` view and report the first entry of the result.
train_rmse = reg.compute_rmse('train')[0]
print("Training RMSE:", train_rmse)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
+-------------------+---+--------+
|-2523.9350228107196|  1|     148|
| -5114.945259512356|  2|    2142|
| -17303.40289066734|  3|     471|
|  -3982.77144377889|  1|    1591|
| -4670.405934014017|  4|     463|
| 2540.3161527900907|  1|    2659|
|-13903.023418589346|  2|     496|
| 2084.7686805128587|  1|     833|
| -102.3144433702937|  1|    1829|
|   3812.20308713511|  1|    1580|
| -6953.031007789631|  1|    1645|
| -2515.973519282561|  1|    2866|
| 2478.3953426740836|  1|    3175|
| -9856.377730253382|  3|    1088|
| -7192.629214109124|  1|    2122|
|-3778.5422158933607|  1|    1342|
|-15662.887643042312|  3|    1507|
|  2167.326165316411|  1|    2999|
| -5952.213483393995|  3|     623|
| -3415.399713834943|  1|    3179|
+-------------------+---+--------+
only showing top 20 rows

SELECT htype AS htype, CASE WHEN 4626 > c THEN ((s/c)*s + (13439060.470843479-s)/(4626-c)*(13439060.470843479-s)) ELSE 0 END AS criteria, s