## Initializing the Chrome driver

Chromedriver has to be installed manually in Google Colab because Colab does not come with it pre-installed. Since Colab's default runtime was changed from Ubuntu 18.04 LTS to Ubuntu 20.04 LTS, chromium-browser is no longer distributed outside of the snap package, so the old method of installing Chromium via APT no longer works. By manually downloading the Chrome and chromedriver binaries, we can get around this issue and still use Selenium with Chrome in Colab notebooks.

Reference: https://dev.classmethod.jp/articles/google-colaboratory-use-selenium/

In [1]:
%%shell

# Installs Google Chrome and a chromedriver binary into the runtime:
# refreshes APT, installs the download tools, installs Chrome's .deb
# packages with dpkg, then downloads chromedriver and moves it to
# /usr/local/bin.

# Refresh the APT package index
sudo apt -y update

# Install the packages needed for downloading
sudo apt install -y wget curl unzip
# The following is a dependency package of Chrome
# (note: dpkg is invoked without sudo here, unlike the apt calls above)
wget http://archive.ubuntu.com/ubuntu/pool/main/libu/libu2f-host/libu2f-udev_1.1.4-1_all.deb
dpkg -i libu2f-udev_1.1.4-1_all.deb

# Install Chrome
wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb
dpkg -i google-chrome-stable_current_amd64.deb

# Install ChromeDriver
# NOTE(review): confirm that this LATEST_RELEASE endpoint still reports a
# chromedriver version matching the Chrome build installed above; the later
# notebook cell obtains its driver through webdriver-manager instead.
CHROME_DRIVER_VERSION=`curl -sS chromedriver.storage.googleapis.com/LATEST_RELEASE`
# -N only re-downloads when the remote file is newer; -P sets the target dir
wget -N https://chromedriver.storage.googleapis.com/$CHROME_DRIVER_VERSION/chromedriver_linux64.zip -P /tmp/
# -o overwrites a previously extracted binary without prompting
unzip -o /tmp/chromedriver_linux64.zip -d /tmp/
chmod +x /tmp/chromedriver
mv /tmp/chromedriver /usr/local/bin/chromedriver

Hit:1 https://cli.github.com/packages stable InRelease
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [2,002 kB]
Hit:6 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:7 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:9 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ Packages [80.4 kB]
Get:10 https://r2u.stat.illinois.edu/ubuntu jammy/main amd64 Packages [2,790 kB]
Get:11 https://r2u.stat.illinois.edu/ubuntu jammy/main all Packages [9,243 kB]
Get:12 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:13 http://archive.ubuntu.com



### Install selenium

In [2]:
!pip install selenium
!pip install webdriver-manager

Collecting selenium
  Downloading selenium-4.35.0-py3-none-any.whl.metadata (7.4 kB)
Collecting trio~=0.30.0 (from selenium)
  Downloading trio-0.30.0-py3-none-any.whl.metadata (8.5 kB)
Collecting trio-websocket~=0.12.2 (from selenium)
  Downloading trio_websocket-0.12.2-py3-none-any.whl.metadata (5.1 kB)
Collecting typing_extensions~=4.14.0 (from selenium)
  Downloading typing_extensions-4.14.1-py3-none-any.whl.metadata (3.0 kB)
Collecting outcome (from trio~=0.30.0->selenium)
  Downloading outcome-1.3.0.post0-py2.py3-none-any.whl.metadata (2.6 kB)
Collecting wsproto>=0.14 (from trio-websocket~=0.12.2->selenium)
  Downloading wsproto-1.2.0-py3-none-any.whl.metadata (5.6 kB)
Downloading selenium-4.35.0-py3-none-any.whl (9.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.6/9.6 MB[0m [31m62.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading trio-0.30.0-py3-none-any.whl (499 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m499.2/499.2 kB[0m [31m25.

In [3]:
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

# Command-line switches passed to Chrome when it is launched by Selenium.
chrome_flags = ("--headless", '--disable-dev-shm-usage', "--no-sandbox")

options = Options()
for flag in chrome_flags:
    options.add_argument(flag)

# webdriver-manager returns the path of the chromedriver binary it installed;
# that path is handed to Selenium through a Service object.
chromedriver_path = ChromeDriverManager().install()
driver = webdriver.Chrome(service=Service(chromedriver_path), options=options)


## Scrape the data and save it to CSV

In [4]:
import pandas as pd
from selenium.webdriver.common.by import By


# Column names assigned to the cells of every scraped table row.
columns = ["YEAR", "TIMES", "N1", "N2", "N3", "N4", "N5", "N6", "S1"]

# Raw rows from all years are gathered in one list and turned into a
# DataFrame once at the end. Concatenating onto a growing frame inside the
# loop copies all previous rows on every iteration, and concatenating onto an
# initially empty frame triggers a pandas deprecation warning.
data = []

try:
    # Iterate over the years
    for year in range(2003, 2025):
        # Open the webpage
        driver.get(f"http://www.nfd.com.tw/house/year/{year}.htm")

        # Scrape the first table on the page, one list of cell texts per row
        table = driver.find_element(By.XPATH, "//table")
        for row in table.find_elements(By.XPATH, ".//tr"):
            cols = row.find_elements(By.XPATH, ".//td")
            # Rows without any <td> cell carry no data and are skipped
            if cols:
                data.append([col.text for col in cols])
finally:
    # Always shut the browser down, even when a page fails to load or parse,
    # so no headless Chrome process is left running in the runtime.
    driver.quit()

df = pd.DataFrame(data, columns=columns)

# Save the DataFrame as CSV
df.to_csv("data.csv", index=False)

## Import modules

In [5]:
import gc
import os
import math
import numpy as np
import pandas as pd
from numpy import sqrt, sin, cos, pi, zeros
from numpy.random import randn, rand, uniform, normal
from scipy.linalg import hadamard
import tensorflow as tf
from tensorflow.keras.utils import plot_model
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Input, Dense, Activation, LSTM, Dropout, RepeatVector, TimeDistributed, Embedding, Reshape, Dot, Concatenate
from tensorflow.keras.layers import GRU, SpatialDropout1D, Conv1D, GlobalMaxPooling1D,Multiply, Lambda, Softmax, Flatten, BatchNormalization, Bidirectional, dot, concatenate
from tensorflow.keras.layers import AdditiveAttention, Attention
from tensorflow.keras.activations import relu
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import callbacks
from tensorflow.keras import backend
from tensorflow.keras.utils import plot_model
from tensorflow.keras.metrics import MeanSquaredError
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm

# Show which TensorFlow build the notebook is running against.
print(f"Tensorflow version {tf.__version__}")

# Short alias for the tf.data autotune constant.
AUTO = tf.data.experimental.AUTOTUNE

Tensorflow version 2.19.0


## Load Mark6 Lottery history


In [6]:
# Values of the YEAR column that mark rows which are not actual draws:
# repeated table headers and the notice row for the 2020 suspension.
non_draw_markers = ["YEAR", "新冠疫情.七個月未開  2020 / 02 / 01~2020 / 09 / 24"]

df = pd.read_csv('data.csv')

# Data cleaning: keep only the rows whose YEAR is not one of the markers,
# then write the cleaned table back over the same file.
df = df[~df['YEAR'].isin(non_draw_markers)]
df.to_csv('data.csv', index=False)
df.tail()

Unnamed: 0,YEAR,TIMES,N1,N2,N3,N4,N5,N6,S1
3083,2024,136,9,11,23,26,30,48,28
3084,2024,137,1,2,3,36,37,41,10
3085,2024,138,14,20,21,22,23,30,18
3086,2024,139,8,11,18,23,25,41,45
3087,2024,140,1,3,5,18,26,35,14


In [7]:
# Remove the two identifier columns so only the drawn numbers remain.
df = df.drop(["YEAR", "TIMES"], axis=1)

In [8]:
# Convert every remaining column to integers.
df = df.astype(dtype=int)

## Split the time series data

In [9]:
# Shift every value down by one (the prediction cell adds the 1 back).
data = df.values - 1

# The last 50 rows are held out; everything before them is training data.
train, test = data[:-50], data[-50:]

# Window length: number of consecutive rows fed to the model per sample.
w = 15

# Each training sample is the w rows preceding row `end`; its target is row `end`.
window_ends = range(w, len(train))
X_train = np.array([train[end - w: end, :] for end in window_ends])
y_train = np.array([train[end] for end in window_ends])

# The held-out rows preceded by the last w training rows, so that the first
# held-out row also has a full window of history.
inputs = data[data.shape[0] - test.shape[0] - w:]
X_test = np.array([inputs[end - w: end, :] for end in range(w, inputs.shape[0])])
y_test = test

In [10]:
# Print the shape of the full data set and of each train/test array, in order.
for arr in (data, X_train, y_train, X_test, y_test):
    print(arr.shape)

(3065, 7)
(3000, 15, 7)
(3000, 7)
(50, 15, 7)
(50, 7)


## Modeling setup

In [37]:
# Width of each per-column embedding vector (evaluates to 30).
embed_dim = 59 // 2 + 1
# Dropout rate for the LSTM / attention layers and for SpatialDropout1D.
dropout_rate = spatial_dropout_rate = 0.5
# Input window length; not referenced again in the visible cells.
steps_before = w
# Length of the decoder output sequence (fed to RepeatVector).
steps_after = 7
# Not referenced again in the visible cells.
feature_count = 7 * embed_dim
# Units of each stacked encoder LSTM layer, in order.
hidden_neurons = [64, 32]
# True wraps every encoder LSTM in Bidirectional.
bidirectional = True
# 'Bahdanau' selects AdditiveAttention, 'Luong' selects Attention.
attention_style = 'Bahdanau'

In [38]:
import tensorflow as tf
from tensorflow.keras import regularizers

# Create an instance of MirroredStrategy.
strategy = tf.distribute.MirroredStrategy()

# Everything that creates variables (layers, metric, optimizer, compile) is
# done inside the strategy scope.
with strategy.scope():

    # Number of value columns per draw (last axis of X_train).
    n_columns = X_train.shape[2]

    inp0 = Input(shape = (w, n_columns))

    # Every column gets its own Lambda -> Embedding -> SpatialDropout1D branch.
    # The layers are created in the same order as when the branches were
    # written out one by one, so automatically generated layer names match.
    embedded_columns = []

    for column in range(n_columns):

        # `arguments` binds the column index for this branch explicitly.
        branch = Lambda(lambda x, column: x[:, :, column],
                        arguments = {'column': column})(inp0)
        branch = Embedding(49, embed_dim)(branch)
        branch = SpatialDropout1D(spatial_dropout_rate)(branch)

        embedded_columns.append(branch)

    inp = Concatenate()(embedded_columns)

    # Seq2Seq model with attention or bidirectional encoder

    num_layers = len(hidden_neurons)

    # sh_list: sequence outputs per encoder layer (seeded with the embedded
    # input); h_list / c_list: final hidden / cell state per encoder layer.
    sh_list, h_list, c_list = [inp], [], []

    if bidirectional:

        for i in range(num_layers):

            sh, fh, fc, bh, bc = Bidirectional(LSTM(hidden_neurons[i],
                                                    dropout = dropout_rate,
                                                    return_state = True,
                                                    return_sequences = True,
                                                    kernel_regularizer=tf.keras.regularizers.l2(1e-4),
                                                    recurrent_regularizer=tf.keras.regularizers.l2(1e-4))

                                                    )(sh_list[-1])

            # Join forward and backward states so each state has
            # 2 * hidden_neurons[i] units.
            h = Concatenate()([fh, bh])
            c = Concatenate()([fc, bc])

            sh_list.append(sh)
            h_list.append(h)
            c_list.append(c)

    else:

        for i in range(num_layers):

            sh, h, c = LSTM(hidden_neurons[i],
                            dropout = dropout_rate,
                            return_state = True,
                            return_sequences = True,
                            kernel_regularizer=tf.keras.regularizers.l2(1e-4),
                            recurrent_regularizer=tf.keras.regularizers.l2(1e-4)
                            )(sh_list[-1])

            sh_list.append(sh)
            h_list.append(h)
            c_list.append(c)

    # The decoder input is the last encoder hidden state repeated once per
    # output step.
    decoder = RepeatVector(steps_after)(h_list[-1])

    if bidirectional:

        # Decoder widths are doubled to match the concatenated encoder states.
        decoder_hidden_neurons = [hn * 2 for hn in hidden_neurons]

    else:

        decoder_hidden_neurons = hidden_neurons

    for i in range(num_layers):

        # Decoder layer i starts from the final states of encoder layer i.
        decoder = LSTM(decoder_hidden_neurons[i],
                       dropout = dropout_rate,
                       return_sequences = True)(decoder, initial_state = [h_list[i], c_list[i]])

    # Optional attention of the decoder sequence over the last encoder
    # sequence output; any other attention_style leaves the decoder as is.
    if attention_style == 'Bahdanau':

        context = AdditiveAttention(dropout = dropout_rate)([decoder, sh_list[-1]])

        decoder = concatenate([context, decoder])

    elif attention_style == 'Luong':

        context = Attention(dropout = dropout_rate)([decoder, sh_list[-1]])

        decoder = concatenate([context, decoder])

    # One 49-way softmax per output step.
    out = Dense(49, activation = 'softmax')(decoder)

    model = Model(inputs = inp0, outputs = out)

    sparse_top_k = tf.keras.metrics.SparseTopKCategoricalAccuracy(k = 5, name = 'sparse_top_k')

    # Literal initial learning rate so this cell does not depend on constants
    # that are only defined in later cells (a fresh "Run All" would otherwise
    # fail with a NameError). The CosineAnnealingScheduler callback overwrites
    # the rate at the start of every epoch.
    optimizer = tf.keras.optimizers.Adam(learning_rate = 1e-3)

    # Pass the optimizer object itself so the instance created above is the
    # one that is actually used for training.
    model.compile(optimizer = optimizer, loss = 'sparse_categorical_crossentropy', metrics = [sparse_top_k])

In [39]:
# Print a layer-by-layer overview of the model built in the previous cell.
model.summary()

In [40]:
import math, tensorflow as tf
from keras.callbacks import Callback

class CosineAnnealingScheduler(Callback):
    """Callback that sets the optimizer learning rate from a cosine curve.

    At the start of every epoch the learning rate is set to::

        eta_min + (eta_max - eta_min) * (1 + cos(pi * epoch / T_max)) / 2

    The epoch is not clamped or wrapped, so the rate goes from ``eta_max``
    (epoch 0) down to ``eta_min`` (epoch ``T_max``) and then back up again,
    repeating with a period of ``2 * T_max`` epochs.

    Args:
        T_max: Number of epochs for half a cosine period. Must be non-zero.
        eta_max: Learning rate at the top of the curve.
        eta_min: Learning rate at the bottom of the curve.
        verbose: When truthy, print the learning rate at each epoch start.

    Raises:
        ValueError: If ``T_max`` is zero.
    """

    def __init__(self, T_max, eta_max, eta_min=0.0, verbose=0):
        super().__init__()
        self.T_max = float(T_max)
        # Fail at construction time instead of with a ZeroDivisionError at
        # the first epoch of training.
        if self.T_max == 0.0:
            raise ValueError("T_max must be non-zero.")
        self.eta_max = float(eta_max)
        self.eta_min = float(eta_min)
        self.verbose = verbose

    def _cosine_lr(self, epoch) -> float:
        """Return the scheduled learning rate for the given 0-based epoch."""
        amplitude = self.eta_max - self.eta_min
        return self.eta_min + amplitude * (1.0 + math.cos(math.pi * (epoch / self.T_max))) / 2.0

    def _set_lr(self, value: float):
        """Write ``value`` into the optimizer's learning rate.

        Raises:
            RuntimeError: If the optimizer has no ``learning_rate`` attribute.
        """
        opt = self.model.optimizer
        lr_attr = getattr(opt, "learning_rate", None)
        if lr_attr is None:
            raise RuntimeError("Optimizer must expose `learning_rate` in Keras 3.")
        if hasattr(lr_attr, "assign"):
            lr_attr.assign(value)          # tf.Variable / HyperParameter path
        else:
            opt.learning_rate = float(value)  # property set path

    def _get_lr(self) -> "float | None":
        """Return the optimizer's current learning rate as a Python float.

        Returns None when the optimizer has no ``learning_rate`` attribute.
        """
        opt = self.model.optimizer
        lr_attr = getattr(opt, "learning_rate", None)
        if lr_attr is None:
            return None
        # try variable -> value; if the backend helper is unavailable or
        # rejects the object, fall back to a plain float conversion
        try:
            return float(tf.keras.backend.get_value(lr_attr))
        except Exception:
            return float(lr_attr)

    def on_epoch_begin(self, epoch, logs=None):
        """Apply the scheduled learning rate before the epoch runs."""
        self._set_lr(self._cosine_lr(epoch))
        if self.verbose:
            # Report the value read back from the optimizer, not the target.
            print(f"[CosAnneal] epoch {epoch+1}: lr set to {self._get_lr():.6f}")

    def on_epoch_end(self, epoch, logs=None):
        """Record the current learning rate in the epoch logs."""
        if logs is None: logs = {}
        logs["learning_rate"] = self._get_lr()  # <-- use a Keras-3 friendly key


## Model Fit

In [None]:
from tensorflow.keras import callbacks

# Training configuration.
EPOCHS = 1000
BATCH_SIZE = 64
LR_MAX = 1e-3   # top of the cosine learning-rate curve
LR_MIN = 1e-4   # bottom of the cosine learning-rate curve

# Use the LR_MIN constant for the lower bound rather than re-deriving the
# same value from LR_MAX, so both bounds are tuned in one place.
cas = CosineAnnealingScheduler(T_max=10, eta_max=LR_MAX, eta_min=LR_MIN, verbose=1)

# Keep only the model with the highest validation top-k accuracy seen so far.
ckp = callbacks.ModelCheckpoint('best_model.keras', monitor = 'val_sparse_top_k', verbose = 0,
                                save_best_only = True, save_weights_only = False, mode = 'max')

# NOTE(review): the held-out test split doubles as validation data here, so it
# also drives the checkpoint selection above.
history = model.fit(X_train, y_train,
                    validation_data = (X_test, y_test),
                    callbacks = [ckp, cas],
                    epochs = EPOCHS,
                    batch_size = BATCH_SIZE,
                    verbose = 0)

# Per-epoch metrics as a DataFrame.
hist = pd.DataFrame(history.history)

[CosAnneal] epoch 1: lr set to 0.001000
[CosAnneal] epoch 2: lr set to 0.000978
[CosAnneal] epoch 3: lr set to 0.000914
[CosAnneal] epoch 4: lr set to 0.000815
[CosAnneal] epoch 5: lr set to 0.000689
[CosAnneal] epoch 6: lr set to 0.000550
[CosAnneal] epoch 7: lr set to 0.000411
[CosAnneal] epoch 8: lr set to 0.000285
[CosAnneal] epoch 9: lr set to 0.000186
[CosAnneal] epoch 10: lr set to 0.000122
[CosAnneal] epoch 11: lr set to 0.000100
[CosAnneal] epoch 12: lr set to 0.000122
[CosAnneal] epoch 13: lr set to 0.000186
[CosAnneal] epoch 14: lr set to 0.000285
[CosAnneal] epoch 15: lr set to 0.000411
[CosAnneal] epoch 16: lr set to 0.000550
[CosAnneal] epoch 17: lr set to 0.000689
[CosAnneal] epoch 18: lr set to 0.000815
[CosAnneal] epoch 19: lr set to 0.000914
[CosAnneal] epoch 20: lr set to 0.000978
[CosAnneal] epoch 21: lr set to 0.001000
[CosAnneal] epoch 22: lr set to 0.000978
[CosAnneal] epoch 23: lr set to 0.000914
[CosAnneal] epoch 24: lr set to 0.000815
[CosAnneal] epoch 25: lr 

In [None]:
# Restore the weights from the checkpoint file written during training.
model.load_weights('best_model.keras')

# Most probable class index at every output step of every test sample.
pred = model.predict(X_test).argmax(axis = 2)

In [None]:
# Loss and compiled metrics on the held-out rows; shown as the cell output.
loss_and_metrics = model.evaluate(x = X_test, y = y_test)
loss_and_metrics

In [None]:
import matplotlib.pyplot as plt

# Training curves, one panel per quantity:
# (subplot position, panel title, [(history key, legend label), ...])
panels = [
    (1, 'Loss Over Epochs',
     [('loss', 'Training Loss'), ('val_loss', 'Validation Loss')]),
    (2, 'Sparse Top K Over Epochs',
     [('sparse_top_k', 'Training Sparse Top K'), ('val_sparse_top_k', 'Validation Sparse Top K')]),
]

plt.figure(figsize=(12, 4))

for position, title, curves in panels:
    plt.subplot(1, 2, position)
    for key, label in curves:
        plt.plot(history.history[key], label=label)
    plt.legend()
    plt.title(title)

plt.show()

## Predict the next draw

In [None]:
# The most recent w rows, with a leading batch axis of size 1.
X_latest = data[np.newaxis, -w:, :]

# Drop the batch axis again so one row of class probabilities remains per
# output step.
pred_latest = np.squeeze(model.predict(X_latest))

# Greedy decoding: the most probable class at each step, shifted back up by
# one to undo the "- 1" applied when the data array was built.
pred_latest_greedy = pred_latest.argmax(axis = 1)
print(pred_latest_greedy + 1)