# Python Machine Learning from Zero to One

> Class prediction tasks

[數據交點](https://www.datainpoint.com) | 郭耀仁 <yaojenkuo@datainpoint.com>

In [1]:
import unittest
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

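## Exercise guidelines

- An exercise session disconnects automatically after 15 minutes of inactivity; click the exercise link again to restart it.
- The first code cell loads the modules that may be needed.
- If an exercise needs to load files, the files are stored in the exercise's working directory.
- Each exercise already provides the function or class name, the expected inputs, and the parameter names; we only need to write the body. Type hints are also provided to describe the expected input and output types.
- The docstring describes how the test is run; reading it clarifies the relationship between expected input and expected output and helps us solve the exercise faster.
- Write the body of the function or class between the `### BEGIN SOLUTION` and `### END SOLUTION` comments.
- Place the expected output after the `return` keyword; merely printing it with `print()` will not pass the test.
- A `SyntaxError`, an `IndentationError`, or a similar error causes the tests to fail; before testing, call the function in the notebook to check that it behaves as the docstring describes.
- Run the tests: from the top menu choose Kernel -> Restart & Run All -> Restart and Run All Cells.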
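
The difference between `return` and `print()` mentioned in the guidelines can be sketched as follows. The two function names are hypothetical and exist only for this illustration; they are not part of the exercises.

```python
def add_with_print(a: int, b: int) -> None:
    # Only displays the sum; the caller receives None.
    print(a + b)


def add_with_return(a: int, b: int) -> int:
    # Hands the sum back to the caller, so a test can compare it.
    return a + b


printed = add_with_print(1, 2)    # displays 3 on screen
returned = add_with_return(1, 2)

print(printed is None)  # True
print(returned == 3)    # True
```

A test such as `assertEqual(result, 3)` can only succeed for the second function, because the first one gives the test nothing to compare.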

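## 01. Load `train.csv` and `test.csv` from `titanic`

Define the function `import_titanic()` to load `train.csv` and `test.csv` located in the `titanic` path.

Source: <https://www.kaggle.com/c/titanic>

- Use a file path relative to the working directory, such as `titanic/train.csv`.
- Use the `pd.read_csv()` function.
- Place the expected output after `return`.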

In [2]:
def import_titanic() -> tuple:
    """
    >>> train, test = import_titanic()
    >>> type(train)
    pandas.core.frame.DataFrame
    >>> type(test)
    pandas.core.frame.DataFrame
    >>> train.shape
    (891, 12)
    >>> test.shape
    (418, 11)
    """
    ### BEGIN SOLUTION
    train_csv = pd.read_csv("titanic/train.csv")
    test_csv = pd.read_csv("titanic/test.csv")
    return train_csv, test_csv
    ### END SOLUTION

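## 02. Select the `titanic` target array and feature matrix

Define the function `extract_target_array_feature_matrix_titanic()` to use `Survived` in `train.csv` as the target array $y$ and `Sex` and `Age` as the feature matrix $X$.

- Use the `import_titanic()` function.
- Select columns from the DataFrame.
- Pay attention to the shape of the feature matrix.
- Place the expected output after `return`.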

In [3]:
def extract_target_array_feature_matrix_titanic() -> tuple:
    """
    >>> y, X = extract_target_array_feature_matrix_titanic()
    >>> type(y)
    numpy.ndarray
    >>> type(X)
    numpy.ndarray
    >>> y.shape
    (891,)
    >>> X.shape
    (891, 2)
    """
    ### BEGIN SOLUTION
    train, test = import_titanic()
    y = train["Survived"].values
    X = train[["Sex", "Age"]].values
    return y, X
    ### END SOLUTION

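## 03. Wrangle the `titanic` feature matrix

Define the function `wrangle_feature_matrix_titanic()` to convert column 0 of the `X` returned by `extract_target_array_feature_matrix_titanic()` to integers and to fill the missing values in column 1. The conversion and filling rules are as follows:

- `{'female': 0, 'male': 1}`
- Use `Series.map()`.
- Use `Series.mean()`.
- Use `Series.fillna()` with the mean as the fill value.
- Place the expected output after `return`.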

In [4]:
def wrangle_feature_matrix_titanic(X: np.ndarray) -> np.ndarray:
    """
    >>> y, X = extract_target_array_feature_matrix_titanic()
    >>> X_wrangled = wrangle_feature_matrix_titanic(X)
    >>> type(X_wrangled)
    numpy.ndarray
    >>> np.unique(X_wrangled[:, 0])
    array([0, 1])
    >>> np.sum(np.isnan(X_wrangled[:, 1]))
    0
    """
    ### BEGIN SOLUTION
    X_dataframe = pd.DataFrame(X)
    X_dataframe[0] = X_dataframe[0].map({'female': 0, 'male': 1})
    mean_age = X_dataframe[1].mean()
    X_dataframe[1] = X_dataframe[1].fillna(mean_age)
    X_dataframe[1] = X_dataframe[1].astype(int)
    return X_dataframe.values
    ### END SOLUTION
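
To see what `Series.map()`, `Series.mean()`, and `Series.fillna()` do in isolation, here is a minimal sketch on a toy DataFrame. The four rows are made up for illustration and are not taken from the Titanic files.

```python
import numpy as np
import pandas as pd

# Toy data, not taken from the Titanic files.
toy = pd.DataFrame({
    "Sex": ["male", "female", "female", "male"],
    "Age": [22.0, np.nan, 26.0, 36.0],
})

# Series.map() looks up each value in the dictionary.
sex_encoded = toy["Sex"].map({"female": 0, "male": 1})

# Series.mean() skips NaN by default, so the mean is (22 + 26 + 36) / 3 = 28.
mean_age = toy["Age"].mean()
age_filled = toy["Age"].fillna(mean_age)

print(sex_encoded.tolist())  # [1, 0, 0, 1]
print(age_filled.tolist())   # [22.0, 28.0, 26.0, 36.0]
```

Because the mean is computed from the non-missing values only, filling with it leaves the column mean unchanged.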

## 04. Add intercepts to the `titanic` feature matrix

Define the function `add_intercepts_for_feature_matrix_titanic()` to add the intercept $x_0 = 1$ to the feature matrix.

- Place the expected output after `return`.

In [5]:
def add_intercepts_for_feature_matrix_titanic(X: np.ndarray) -> np.ndarray:
    """
    >>> y, X = extract_target_array_feature_matrix_titanic()
    >>> X_wrangled = wrangle_feature_matrix_titanic(X)
    >>> X_wrangled = add_intercepts_for_feature_matrix_titanic(X_wrangled)
    >>> type(X_wrangled)
    numpy.ndarray
    >>> X_wrangled.shape
    (891, 3)
    >>> X_wrangled
    array([[ 1,  1, 22],
           [ 1,  0, 38],
           [ 1,  0, 26],
           ...,
           [ 1,  0, 29],
           [ 1,  1, 26],
           [ 1,  1, 32]])
    """
    ### BEGIN SOLUTION
    m = X.shape[0]
    intercepts = np.ones((m, 1), dtype=int)
    return np.concatenate((intercepts, X), axis=1)
    ### END SOLUTION
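
A small sketch may help show why the column of ones is useful: with it, a single dot product also covers the bias term $w_0$. The two-row matrix and the weight values below are hypothetical and chosen only for illustration.

```python
import numpy as np

# Toy feature matrix: two rows of (Sex, Age); the values are illustrative.
X = np.array([[1, 22],
              [0, 38]])
w = np.array([0.5, -2.0, -0.01])  # hypothetical weights w_0, w_1, w_2

intercepts = np.ones((X.shape[0], 1), dtype=int)
X_with_intercepts = np.concatenate((intercepts, X), axis=1)

# With the column of ones, one dot product includes the bias term w_0.
with_column = np.dot(X_with_intercepts, w)
without_column = w[0] + np.dot(X, w[1:])

print(X_with_intercepts.tolist())                # [[1, 1, 22], [1, 0, 38]]
print(np.allclose(with_column, without_column))  # True
```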

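## 04. Split `titanic` into training and validation data

Define the function `split_train_valid_titanic()` to split the $y$ returned by `extract_target_array_feature_matrix_titanic()` and the `X_wrangled` returned by `wrangle_feature_matrix_titanic()`, after intercepts have been added with `add_intercepts_for_feature_matrix_titanic()`, into training and validation data.

- Use the `train_test_split(test_size=0.3, random_state=42)` function.
- Place the expected output after `return`.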

In [6]:
def split_train_valid_titanic(X: np.ndarray, y: np.ndarray) -> tuple:
    """
    >>> y, X = extract_target_array_feature_matrix_titanic()
    >>> X_wrangled = wrangle_feature_matrix_titanic(X)
    >>> X_wrangled = add_intercepts_for_feature_matrix_titanic(X_wrangled)
    >>> X_train, X_valid, y_train, y_valid = split_train_valid_titanic(X_wrangled, y)
    >>> X_train.shape
    (623, 3)
    >>> X_valid.shape
    (268, 3)
    >>> y_train.shape
    (623,)
    >>> y_valid.shape
    (268,)
    """
    ### BEGIN SOLUTION
    return train_test_split(X, y, test_size=0.3, random_state=42)
    ### END SOLUTION
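
The expected shapes in the docstring can be reproduced without the Titanic files. The sketch below uses placeholder arrays of zeros that only share the row count (891) with `train.csv`; the array contents are an assumption made for illustration.

```python
import numpy as np
from sklearn.model_selection import train_test_split

# Placeholder arrays with the same number of rows as train.csv (891).
X = np.zeros((891, 3))
y = np.zeros(891)

X_train, X_valid, y_train, y_valid = train_test_split(
    X, y, test_size=0.3, random_state=42
)

# scikit-learn rounds the validation size up: ceil(0.3 * 891) = 268.
print(X_train.shape, X_valid.shape)  # (623, 3) (268, 3)
print(y_train.shape, y_valid.shape)  # (623,) (268,)
```

Fixing `random_state` makes the shuffle repeatable, so the same rows land in the training and validation sets on every run.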

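## 05. Find the rule $w$ for `titanic` with gradient descent

Define the function `find_w_with_gradient_descent_titanic()` to find the rule $w$ that relates the target array $y^{(train)}$ to the feature matrix $X^{(train)}$.

- Use gradient descent.
- Place the expected output after `return`.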
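
As a hedged summary of what gradient descent minimizes here, the equations below assume the cross-entropy cost for logistic regression. Write $X$ and $y$ for $X^{(train)}$ and $y^{(train)}$, let $\hat{p} = \sigma(Xw)$ where $\sigma$ is the Sigmoid function, let $m$ be the number of training rows, and let $\alpha$ be the learning rate. The symbols $J$, $\hat{p}$, and $\alpha$ are notation introduced for this sketch, not taken from the exercise text.

\begin{equation}
J(w) = -\frac{1}{m} \left[ y^{T} \log(\hat{p}) + (1 - y)^{T} \log(1 - \hat{p}) \right]
\end{equation}

\begin{equation}
\nabla_w J(w) = \frac{1}{m} X^{T} (\hat{p} - y)
\end{equation}

\begin{equation}
w \leftarrow w - \alpha \nabla_w J(w)
\end{equation}

Each epoch applies the last update once, so the cost should shrink from epoch to epoch as long as $\alpha$ is small enough.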

In [7]:
def find_w_with_gradient_descent_titanic(X_train: np.ndarray, y_train: np.ndarray) -> np.ndarray:
    """
    >>> y, X = extract_target_array_feature_matrix_titanic()
    >>> X_wrangled = wrangle_feature_matrix_titanic(X)
    >>> X_wrangled = add_intercepts_for_feature_matrix_titanic(X_wrangled)
    >>> X_train, X_valid, y_train, y_valid = split_train_valid_titanic(X_wrangled, y)
    >>> w = find_w_with_gradient_descent_titanic(X_train, y_train)
    >>> w[1] < 0
    True
    >>> w[2] < 0
    True
    """
    ### BEGIN SOLUTION
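    m, n = X_train.shape
    # Start from random weights; no seed is set, so w differs slightly between runs
    w = np.random.rand(n)
    epochs = 50000
    learning_rate = 0.01
    epsilon = 1e-6  # keeps np.log() away from log(0)
    for i in range(epochs):
        # Predicted probability of Survived = 1 via the Sigmoid function
        X_w = np.dot(X_train, w)
        p_hat = 1 / (1 + np.exp(-X_w))
        # Cross-entropy loss, computed for progress reporting
        cost_y1 = -np.dot(y_train, np.log(p_hat + epsilon))
        cost_y0 = -np.dot(1 - y_train, np.log(1 - p_hat + epsilon))
        cross_entropy = (cost_y1 + cost_y0) / m
        # Gradient of the cross-entropy with respect to w
        X_T = np.transpose(X_train)
        gradient = (1/m) * np.dot(X_T, p_hat - y_train)
        if i % 5000 == 0:
            print("epoch: {:6} - loss: {:.6f}".format(i, cross_entropy))
        # Step against the gradient
        w -= learning_rate*gradient
    return w.ravel()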
    ### END SOLUTION
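
One way to gain confidence in the gradient expression used in the solution, `(1/m) * np.dot(X_T, p_hat - y_train)`, is to compare it with a finite-difference estimate. The sketch below does this on small synthetic data. The helper names `sigmoid`, `cross_entropy`, and `analytic_gradient`, the random seed, and the step size `h` are assumptions made for illustration, and the epsilon term from the solution is left out.

```python
import numpy as np


def sigmoid(z: np.ndarray) -> np.ndarray:
    return 1 / (1 + np.exp(-z))


def cross_entropy(w: np.ndarray, X: np.ndarray, y: np.ndarray) -> float:
    p_hat = sigmoid(np.dot(X, w))
    return -np.mean(y * np.log(p_hat) + (1 - y) * np.log(1 - p_hat))


def analytic_gradient(w: np.ndarray, X: np.ndarray, y: np.ndarray) -> np.ndarray:
    m = X.shape[0]
    return np.dot(X.T, sigmoid(np.dot(X, w)) - y) / m


# Synthetic data: an intercept column plus two random features.
rng = np.random.default_rng(0)
X = np.column_stack([np.ones(20), rng.normal(size=(20, 2))])
y = rng.integers(0, 2, size=20)
w = rng.normal(size=3)

# Central finite differences, one coordinate of w at a time.
h = 1e-6
numeric = np.zeros_like(w)
for j in range(w.size):
    step = np.zeros_like(w)
    step[j] = h
    numeric[j] = (cross_entropy(w + step, X, y) - cross_entropy(w - step, X, y)) / (2 * h)

print(np.allclose(numeric, analytic_gradient(w, X, y), atol=1e-6))  # True
```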

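## 06. Define the Sigmoid function to predict probabilities

Define the function `predict_proba_titanic()` to predict the probabilities that `Survived` is 0 or 1, using the rule $w$ found in the previous exercise.

\begin{equation}
\sigma(x) = \frac{1}{1 + e^{-x}}
\end{equation}

- Use the Sigmoid function.
- Place the expected output after `return`.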

In [8]:
def predict_proba_titanic(X: np.ndarray, w: np.ndarray) -> np.ndarray:
    """
    >>> y, X = extract_target_array_feature_matrix_titanic()
    >>> X_wrangled = wrangle_feature_matrix_titanic(X)
    >>> X_wrangled = add_intercepts_for_feature_matrix_titanic(X_wrangled)
    >>> X_train, X_valid, y_train, y_valid = split_train_valid_titanic(X_wrangled, y)
    >>> w = find_w_with_gradient_descent_titanic(X_train, y_train)
    >>> predict_proba_titanic(X_valid, w)[:5]
    array([[0.8140447 , 0.1859553 ],
           [0.815614  , 0.184386  ],
           [0.80685537, 0.19314463],
           [0.25045221, 0.74954779],
           [0.25834303, 0.74165697]])
    """
    ### BEGIN SOLUTION
    X_w = np.dot(X, w)
    p_hat_1 = (1 / (1 + np.exp(-X_w))).reshape(-1, 1)
    p_hat_0 = 1 - p_hat_1
    proba = np.concatenate([p_hat_0, p_hat_1], axis=1)
    return proba
    ### END SOLUTION
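
The shape of the returned array can be checked on a few hand-picked inputs. This minimal sketch applies the same steps as the solution to three illustrative values of $Xw$ (named `z` here, an assumption for this example) instead of the Titanic data.

```python
import numpy as np

# Three illustrative values of X·w; zero sits exactly on the decision boundary.
z = np.array([-2.0, 0.0, 2.0])

p_hat_1 = (1 / (1 + np.exp(-z))).reshape(-1, 1)  # P(Survived = 1)
p_hat_0 = 1 - p_hat_1                            # P(Survived = 0)
proba = np.concatenate([p_hat_0, p_hat_1], axis=1)

print(proba.shape)                          # (3, 2)
print(proba[1].tolist())                    # [0.5, 0.5]
print(np.allclose(proba.sum(axis=1), 1.0))  # True
```

Each row holds the two class probabilities, so every row sums to 1, and an input of 0 gives equal probability to both classes.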

## 07. Predict `test.csv` in the `titanic` path with the $w$ found by gradient descent

Define the function `predict_survived()` to predict `Survived` for `test.csv` from `Sex` and `Age` with the machine-learning model.

- Use the `extract_target_array_feature_matrix_titanic()` function.
- Use the `wrangle_feature_matrix_titanic()` function.
- Use the `add_intercepts_for_feature_matrix_titanic()` function.
- Use the `split_train_valid_titanic()` function.
- Use the `find_w_with_gradient_descent_titanic()` function.
- Use the `predict_proba_titanic()` function.
- Use the `np.argmax()` function.
- Place the expected output after `return`.

In [9]:
def predict_survived(X_test: pd.core.frame.DataFrame) -> pd.core.frame.DataFrame:
    """
    >>> train, test = import_titanic()
    >>> X_test = test[["PassengerId", "Sex", "Age"]]
    >>> predict_survived(X_test)[:5]
       PassengerId  Survived
    0          892         0
    1          893         1
    2          894         0
    3          895         0
    4          896         1
    """
    ### BEGIN SOLUTION
    y, X = extract_target_array_feature_matrix_titanic()
    X_wrangled = wrangle_feature_matrix_titanic(X)
    X_wrangled = add_intercepts_for_feature_matrix_titanic(X_wrangled)
    X_train, X_valid, y_train, y_valid = split_train_valid_titanic(X_wrangled, y)
    w = find_w_with_gradient_descent_titanic(X_train, y_train)
    X_test_ndarray = X_test[["Sex", "Age"]].values
    X_test_wrangled = wrangle_feature_matrix_titanic(X_test_ndarray)
    X_test_wrangled = add_intercepts_for_feature_matrix_titanic(X_test_wrangled)
    y_hat_proba = predict_proba_titanic(X_test_wrangled, w)
    y_hat = np.argmax(y_hat_proba, axis=1)
    out_df = pd.DataFrame()
    out_df["PassengerId"] = X_test["PassengerId"].values
    out_df["Survived"] = y_hat
    return out_df
    ### END SOLUTION
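
The step from probabilities to labels relies on `np.argmax()`. The sketch below uses a hypothetical three-row probability array to show that taking the column index of the larger probability matches thresholding the second column at 0.5.

```python
import numpy as np

# Hypothetical probabilities: column 0 is P(Survived = 0), column 1 is P(Survived = 1).
proba = np.array([[0.81, 0.19],
                  [0.25, 0.75],
                  [0.60, 0.40]])

# The index of the larger probability in each row is the predicted label.
y_hat = np.argmax(proba, axis=1)
thresholded = (proba[:, 1] > 0.5).astype(int)

print(y_hat.tolist())                      # [0, 1, 0]
print(np.array_equal(y_hat, thresholded))  # True
```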

In [10]:
class TestClassification(unittest.TestCase):
    def test_01_import_titanic(self):
        train, test = import_titanic()
        self.assertIsInstance(train, pd.core.frame.DataFrame)
        self.assertIsInstance(test, pd.core.frame.DataFrame)
        self.assertEqual(train.shape, (891, 12))
        self.assertEqual(test.shape, (418, 11))
    def test_02_extract_target_array_feature_matrix_titanic(self):
        y, X = extract_target_array_feature_matrix_titanic()
        self.assertEqual(y.shape, (891,))
        self.assertEqual(X.shape, (891, 2))
    def test_03_wrangle_feature_matrix_titanic(self):
        y, X = extract_target_array_feature_matrix_titanic()
        X_wrangled = wrangle_feature_matrix_titanic(X)
        self.assertEqual(np.unique(X_wrangled[:, 0]).size, 2)
        self.assertEqual(np.sum(np.isnan(X_wrangled[:, 1])), 0)
    def test_04_add_intercepts_for_feature_matrix_titanic(self):
        y, X = extract_target_array_feature_matrix_titanic()
        X_wrangled = wrangle_feature_matrix_titanic(X)
        X_wrangled = add_intercepts_for_feature_matrix_titanic(X_wrangled)
        self.assertEqual(X_wrangled.shape, (891, 3))
    def test_05_split_train_valid_titanic(self):
        y, X = extract_target_array_feature_matrix_titanic()
        X_wrangled = wrangle_feature_matrix_titanic(X)
        X_wrangled = add_intercepts_for_feature_matrix_titanic(X_wrangled)
        X_train, X_valid, y_train, y_valid = split_train_valid_titanic(X_wrangled, y)
        self.assertEqual(X_train.shape, (623, 3))
        self.assertEqual(X_valid.shape, (268, 3))
        self.assertEqual(y_train.shape, (623,))
        self.assertEqual(y_valid.shape, (268,))
    def test_06_find_w_with_gradient_descent_titanic(self):
        y, X = extract_target_array_feature_matrix_titanic()
        X_wrangled = wrangle_feature_matrix_titanic(X)
        X_wrangled = add_intercepts_for_feature_matrix_titanic(X_wrangled)
        X_train, X_valid, y_train, y_valid = split_train_valid_titanic(X_wrangled, y)
        w = find_w_with_gradient_descent_titanic(X_train, y_train)
        self.assertTrue(w[1] < 0)
        self.assertTrue(w[2] < 0)
    def test_07_predict_survived(self):
        train, test = import_titanic()
        X_test = test[["PassengerId", "Sex", "Age"]]
        survived = predict_survived(X_test)
        self.assertIsInstance(survived, pd.core.frame.DataFrame)
        self.assertEqual(survived.shape, (418, 2))
        self.assertEqual(survived["Survived"].nunique(), 2)
        self.assertEqual(survived["Survived"].max(), 1)
        self.assertEqual(survived["Survived"].min(), 0)
        
suite = unittest.TestLoader().loadTestsFromTestCase(TestClassification)
runner = unittest.TextTestRunner(verbosity=2)
test_results = runner.run(suite)
number_of_failures = len(test_results.failures)
number_of_errors = len(test_results.errors)
number_of_test_runs = test_results.testsRun
number_of_successes = number_of_test_runs - (number_of_failures + number_of_errors)

test_01_import_titanic (__main__.TestClassification) ... ok
test_02_extract_target_array_feature_matrix_titanic (__main__.TestClassification) ... ok
test_03_wrangle_feature_matrix_titanic (__main__.TestClassification) ... ok
test_04_add_intercepts_for_feature_matrix_titanic (__main__.TestClassification) ... ok
test_05_split_train_valid_titanic (__main__.TestClassification) ... ok
test_06_find_w_with_gradient_descent_titanic (__main__.TestClassification) ... 

epoch:      0 - loss: 4.676135
epoch:   5000 - loss: 0.519019
epoch:  10000 - loss: 0.515746
epoch:  15000 - loss: 0.515384
epoch:  20000 - loss: 0.515338
epoch:  25000 - loss: 0.515331
epoch:  30000 - loss: 0.515330
epoch:  35000 - loss: 0.515330
epoch:  40000 - loss: 0.515330
epoch:  45000 - loss: 0.515330


ok
test_07_predict_survived (__main__.TestClassification) ... 

epoch:      0 - loss: 5.903722
epoch:   5000 - loss: 0.520811
epoch:  10000 - loss: 0.515943
epoch:  15000 - loss: 0.515409
epoch:  20000 - loss: 0.515341
epoch:  25000 - loss: 0.515332
epoch:  30000 - loss: 0.515331
epoch:  35000 - loss: 0.515330
epoch:  40000 - loss: 0.515330
epoch:  45000 - loss: 0.515330


ok

----------------------------------------------------------------------
Ran 7 tests in 6.912s

OK


In [11]:
print("Of the {} questions in the class prediction task, you answered {} correctly.".format(number_of_test_runs, number_of_successes))

Of the 7 questions in the class prediction task, you answered 7 correctly.
