In [1]:
import ast
import os
import pickle

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
from tqdm import tqdm

from sklearn.decomposition import PCA
from sklearn.decomposition import KernelPCA
from sklearn.manifold import TSNE
from sklearn.preprocessing import StandardScaler

from sklearn.linear_model import LogisticRegression
from sklearn.cluster import KMeans
from sklearn import svm
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import AdaBoostRegressor
from xgboost import XGBRegressor
from xgboost import XGBClassifier
from xgboost import plot_importance

from sklearn.metrics import mean_squared_error
from sklearn.model_selection import RepeatedKFold
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import cross_val_predict

In [2]:
# Switch the working directory to the data folder; every relative path used below is resolved from here.
os.chdir("taitra_all")

<h1 style="text-align:center"><b> Data processing </b></h1>
<h2 style="text-align:center"><b> If the output files already exist, skip this block </b></h2>

**data x**
- thumbnail contrast

**data y**
- views
- watchtimes
- impressions

**feature processing**
- clean：NaN to 0, drop duplicates, feature to array
- PCA
- T-SNE
- binary

In [None]:
# Load the raw table, replace missing values with 0 and discard the first data row.
df = pd.read_csv('表格資料_510.csv', encoding='utf-8-sig').fillna(0)
df = df.drop(df.index[0])

# Rows that agree on all of these columns count as duplicates; only the first one is kept.
columns = ["影片標題", "straight", "horizontal"]
df = df.drop_duplicates(subset=columns, keep='first')

df.to_csv("表格資料_510_edit.csv", encoding='utf-8-sig', index=None)

<h1 style="text-align:center"><b>Download thumbnail & cal</b></h1>

計算
- 明暗對比度
- 色彩豐富度
- 前三大面積顏色
- WCAG contrast

In [None]:
def Download_thumbnail(file):
    """
    Download the max-resolution YouTube thumbnail of every video id listed in a CSV.

    Parameters
    ----------
    file : str
        Path of a CSV file that has an "id" column holding YouTube video ids.

    Each response body is written to thumbnails/<id>.jpg; the folder is created
    when it does not exist yet. Returns None.
    """
    df = pd.read_csv(file)
    os.makedirs("thumbnails", exist_ok=True)

    for video_id in tqdm(df["id"]):
        url = f"https://img.youtube.com/vi/{video_id}/maxresdefault.jpg"
        # A timeout keeps one stalled request from hanging the whole run.
        response = requests.get(url, timeout=30)

        # NOTE(review): the body is written whatever the HTTP status is, exactly
        # as before. Later cells appear to filter unusable thumbnails through a
        # NaN contrast value, so confirm that flow before skipping failed downloads.
        with open(f"thumbnails/{video_id}.jpg", "wb") as image_file:
            image_file.write(response.content)
    return

#     Download_thumbnail("表格資料_edit.csv")

## 灰階對比

In [None]:
def Thumbnail_contrast(file):
    """
    Compute the light/dark contrast of every thumbnail listed in a CSV.

    Contrast is the standard deviation of the grey-scale pixel values inside
    columns 435-844 of the image.

    Parameters
    ----------
    file : str
        Path of a CSV file with an "id" column; images are read from
        thumbnails/<id>.jpg.

    Returns
    -------
    pandas.DataFrame
        The CSV contents with an added "contrast" column. Rows whose thumbnail
        cannot be read get NaN.
    """
    df = pd.read_csv(file)
    path = "thumbnails/"

    contrasts = []
    for video_id in tqdm(df["id"]):
        img = cv2.imread(f"{path}{video_id}.jpg")
        if img is None:
            # cv2.imread returns None for a missing or undecodable file; record
            # NaN instead of letting cvtColor raise and abort the whole run.
            contrasts.append(np.nan)
            continue
        img_grey = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)[:, 435:845]
        contrasts.append(img_grey.std())

    df["contrast"] = contrasts
    return df

# Add the contrast column and overwrite the table in place.
# NOTE(review): the cleaning cell above writes "表格資料_510_edit.csv", while this cell
# reads "表格資料_edit.csv" — confirm which file is the intended input.
df = Thumbnail_contrast("表格資料_edit.csv")
df.to_csv("表格資料_edit.csv",encoding = 'utf-8-sig',index = None)

In [None]:
# Drop the rows that have no usable thumbnail (contrast is NaN) and renumber the index.
df = (
    pd.read_csv("表格資料_edit.csv")
    .dropna(subset=['contrast'])
    .reset_index(drop=True)
)
df.to_csv("表格資料_edit_thumbnails.csv", encoding='utf-8-sig', index=None)

## 色彩豐富度計算
>"Measuring Colourfulness in Natural Images" by Hasler and Süsstrunk

In [None]:
def image_colorfulness(image):
    """
    Colourfulness score of columns 435-844 of a BGR image.

    Follows the opponent-colour formulation (rg, yb) of Hasler and Süsstrunk,
    except that the absolute values of both opponent channels are used:
    score = sqrt(std_rg^2 + std_yb^2) + 0.3 * sqrt(mean_rg^2 + mean_yb^2).
    """
    blue, green, red = cv2.split(image[:, 435:845].astype("float"))

    # Opponent colour channels (absolute values).
    rg = np.absolute(red - green)
    yb = np.absolute(0.5 * (red + green) - blue)

    std_root = np.sqrt((np.std(rg) ** 2) + (np.std(yb) ** 2))
    mean_root = np.sqrt((np.mean(rg) ** 2) + (np.mean(yb) ** 2))

    return std_root + (0.3 * mean_root)

# Score every thumbnail and store the result as a new "colorfulness" column.
df = pd.read_csv("表格資料_edit_thumbnails.csv")
colorfulness = [
    image_colorfulness(cv2.imread(f"thumbnails/{video_id}.jpg"))
    for video_id in tqdm(df["id"])
]

df["colorfulness"] = colorfulness
df.to_csv("表格資料_edit_thumbnails.csv", index=False)

## cv 取色

In [None]:
def kMeans_colors(file, n_colors=3, random_state=None):
    """
    Extract the dominant colours of every thumbnail with K-Means.

    For each id in the CSV, columns 435-844 of thumbnails/<id>.jpg are converted
    to RGB, the pixels are clustered, and the cluster centres plus the share of
    pixels assigned to each cluster are collected.

    Parameters
    ----------
    file : str
        Path of a CSV file with an "id" column.
    n_colors : int, default 3
        Number of colour clusters per image.
    random_state : int or None, default None
        Passed to KMeans; set it to get the same palette on every run.

    Returns
    -------
    (colors_all, proportion_all) : tuple of lists
        colors_all[i] is a list of n_colors [R, G, B] centres and
        proportion_all[i][k] is the pixel share of centre k of image i.

    Raises
    ------
    FileNotFoundError
        If a thumbnail cannot be read.
    """
    df = pd.read_csv(file)
    path = "thumbnails/"
    colors_all, proportion_all = [], []
    for video_id in tqdm(df["id"]):
        image = cv2.imread(f"{path}{video_id}.jpg")
        if image is None:
            # cv2.imread signals failure by returning None; fail with a clear message.
            raise FileNotFoundError(f"cannot read thumbnail {path}{video_id}.jpg")
        image = cv2.cvtColor(image[:, 435:845], cv2.COLOR_BGR2RGB)
        pixels = image.reshape((-1, 3))

        model = KMeans(n_clusters=n_colors, random_state=random_state)
        model.fit(pixels)

        # One vectorised count per cluster replaces the per-pixel Python loop;
        # minlength keeps position k aligned with cluster_centers_[k] even if a
        # cluster ends up with no pixels.
        counts = np.bincount(model.labels_, minlength=n_colors)
        proportion_all.append((counts / len(pixels)).tolist())
        colors_all.append(model.cluster_centers_.tolist())

    return colors_all, proportion_all
        
if __name__ == "__main__":
    # Cluster the colours of every thumbnail, then store the palettes and the
    # per-cluster pixel shares as two new columns of the table.
    colors, proportion = kMeans_colors("表格資料_edit_thumbnails.csv")
    df = pd.read_csv("表格資料_edit_thumbnails.csv")
    df["colors"], df["proportion"] = colors, proportion
    df.to_csv("表格資料_edit_thumbnails.csv",index=False)

## 顏色比例視覺化

In [None]:
def plot_colors(proportion, colors):
    """
    Draw a 50x300 colour bar in which each colour fills a horizontal segment
    whose width is its share of the 300 pixels.

    `proportion` and `colors` are iterated in parallel; every colour must offer
    `.astype` (e.g. a NumPy array). Returns the bar as a uint8 array.
    """
    bar = np.zeros((50, 300, 3), dtype="uint8")
    left = 0

    for share, rgb in zip(proportion, colors):
        right = left + (share * 300)
        top_left = (int(left), 0)
        bottom_right = (int(right), 50)
        cv2.rectangle(bar, top_left, bottom_right, rgb.astype("uint8").tolist(), -1)
        left = right
    return bar

def csvcolors_to_list(file):
    """
    Read the "colors" column of a CSV back into NumPy arrays.

    Every cell is expected to hold the text form of a list of [R, G, B] lists.

    Parameters
    ----------
    file : str
        Path of the CSV file.

    Returns
    -------
    list[list[numpy.ndarray]]
        One list per row, holding one length-3 array per colour.

    Raises
    ------
    ValueError, SyntaxError
        If a cell is not a plain Python literal.
    """
    df = pd.read_csv(file)
    rgb_all = []
    for cell in df["colors"]:
        # literal_eval only accepts literals, so a crafted CSV cell cannot run
        # arbitrary code the way eval() would.
        palette = ast.literal_eval(cell)
        rgb_all.append([np.array((rgb[0], rgb[1], rgb[2])) for rgb in palette])
    return rgb_all

# For every row, show the thumbnail and then the colour-share bar below it.
colors = csvcolors_to_list("表格資料_edit_thumbnails.csv")
df = pd.read_csv("表格資料_edit_thumbnails.csv")
for i  in range(len(colors)): 
    # NOTE(review): eval() executes whatever text the CSV cell contains; this is
    # only safe while the file is produced by this notebook. ast.literal_eval
    # would parse the same list text without executing code.
    bar = plot_colors(eval(df.iloc[i]["proportion"]), colors[i])
    plt.figure()
    plt.axis("off")
    image = f"thumbnails/{df.iloc[i]['id']}.jpg"
    # [:,:,::-1] reverses the channel axis (BGR as loaded by OpenCV -> RGB for matplotlib).
    img = cv2.imread(image)[:,:,::-1]
    plt.imshow(img)
    plt.show()
    plt.imshow(bar)
    plt.show()

## 色相對比計算

In [None]:
import colorsys

def csvrgb_to_hsv(file):
    """
    Convert the RGB palettes stored in the "colors" column of a CSV to HSV.

    Every cell is expected to hold the text form of a list of [R, G, B] lists
    with channel values on a 0-255 scale.

    Parameters
    ----------
    file : str
        Path of the CSV file.

    Returns
    -------
    list[list[tuple[float, float, float]]]
        One list per row; each colour is (hue in degrees 0-360,
        saturation 0-100, value 0-100) as plain Python floats.

    Raises
    ------
    ValueError, SyntaxError
        If a cell is not a plain Python literal.
    """
    df = pd.read_csv(file)
    hsv_all = []
    for cell in df["colors"]:
        hsv_list = []
        # literal_eval parses the stored list text without executing code,
        # unlike eval().
        for rgb in ast.literal_eval(cell):
            hue, sat, val = colorsys.rgb_to_hsv(rgb[0] / 255, rgb[1] / 255, rgb[2] / 255)
            # colorsys works on a 0-1 scale; rescale to degrees and percent.
            hsv_list.append((hue * 360, sat * 100, val * 100))
        hsv_all.append(hsv_list)
    return hsv_all

# Store the HSV version of every palette as a new "hsv_colors" column.
df = pd.read_csv("表格資料_edit_thumbnails.csv")
hsv = csvrgb_to_hsv("表格資料_edit_thumbnails.csv")
df['hsv_colors'] = hsv
df.to_csv("表格資料_edit_thumbnails.csv",index=False)

In [None]:
def hue_contrast(hsv1, hsv2):
    """
    Smallest angle, in degrees, between the hues of two HSV colours.

    Parameters
    ----------
    hsv1, hsv2 : object with an ``h`` attribute, or a sequence
        Hue is read from ``.h`` when present, otherwise from index 0 — which is
        the layout of the plain (h, s, v) tuples built by csvrgb_to_hsv.

    Returns
    -------
    Hue difference in the range 0-180.
    """
    def _hue(hsv):
        return hsv.h if hasattr(hsv, "h") else hsv[0]

    # The modulo keeps the result inside 0-180 even for hues outside 0-360.
    angle1 = np.absolute(_hue(hsv1) - _hue(hsv2)) % 360
    angle2 = 360 - angle1
    hue_diff = min(angle1, angle2)
    return hue_diff

df = pd.read_csv("表格資料_edit_thumbnails.csv")

# NOTE(review): `hsv_diff` is not defined anywhere in the visible notebook, so this
# line raises NameError on a fresh kernel. The per-row hue contrast (presumably
# derived from the "hsv_colors" column via hue_contrast) still has to be computed
# here — confirm the intended definition.
df['hsv_diff'] = hsv_diff

## WCAG 對比計算

In [None]:
from typing import NamedTuple

def rgb_to_l(rgb):
    """
    Relative luminance of an (R, G, B) colour with channels on a 0-255 scale.

    The result is 0 for the darkest black and 1 for the lightest white.
    """
    linear = []
    for value in rgb:
        scaled = value / 255
        # Piecewise linearisation: a power curve above the threshold, a straight line below.
        if scaled > 0.03928:
            linear.append(((scaled + 0.055) / 1.055) ** 2.4)
        else:
            linear.append(scaled / 12.92)
    red, green, blue = linear[0], linear[1], linear[2]
    return 0.2126 * red + 0.7152 * green + 0.0722 * blue

def contrast_ratio(l1,l2):
    """
    Contrast ratio of two relative luminances: (l1 + 0.05) / (l2 + 0.05).

    With l1 the lighter and l2 the darker luminance, both within 0-1, the ratio
    ranges from 1 to 21.
    """
    lighter = l1 + 0.05
    darker = l2 + 0.05
    return lighter / darker


if __name__ == "__main__":
    # For every palette: contrast ratio between its lightest and its darkest colour.
    rgb_list = csvcolors_to_list("表格資料_edit_thumbnails.csv")
    luminances = ([rgb_to_l(color) for color in palette] for palette in rgb_list)
    contrast_all = [contrast_ratio(max(l_list), min(l_list)) for l_list in luminances]

    df = pd.read_csv("表格資料_edit_thumbnails.csv")
    df['wcag_contrast'] = contrast_all
    df.to_csv("表格資料_edit_thumbnails.csv", index=False)

## 資料分區

In [None]:
df = pd.read_csv("表格資料_edit_thumbnails.csv")

# Upper contrast bound of each output row. The same list drives both the label
# column and the mask, so the two can no longer drift apart (the thresholds used
# to run 0..90 against labels 10..100, which made the first row all-NaN).
upper_bounds = list(range(10, 101, 10))

# Output column name -> source column of the table.
metric_columns = {
    "ctr": "CTR",
    "views": "views",
    "watchtimes": "watchtimes(hr)",
    "impressions": "impressions",
}

data = {"interval_contrast": upper_bounds}
data.update({name: [] for name in metric_columns})

for upper in upper_bounds:
    # NOTE(review): the mask is cumulative (contrast <= upper), as before. If
    # disjoint bins were intended, add a lower bound (contrast > upper - 10).
    subset = df[df["contrast"] <= upper]
    for name, column in metric_columns.items():
        data[name].append(subset[column].mean())

data = pd.DataFrame(data)
data.to_csv("interval/interval_contrast.csv", index=False)

## Feature to array

In [3]:
df = pd.read_csv('表格資料_edit_thumbnails.csv', encoding='utf-8-sig')
n_rows = len(df)

# Targets (y): flat arrays; watch time is converted from hours to seconds.
impressions = df["impressions"].values
views = df["views"].values
watchtimes = df["watchtimes(hr)"].values * 3600

# Single features (x): each one reshaped to an (n_rows, 1) column vector.
contrast = df["contrast"].values.reshape((n_rows, 1))
wcag_contrast = df["wcag_contrast"].values.reshape((n_rows, 1))
colorful = df["colorfulness"].values.reshape((n_rows, 1))
bertscore = df["f1-score"].values.reshape((n_rows, 1))
product_encode = df["product_encode"].values.reshape((n_rows, 1))
form_encode = df["form_encode"].values.reshape((n_rows, 1))
thumbnail_text_len = df["thumbnail_text_len"].values.reshape((n_rows, 1))

# Feature table: every column from "contrast" onwards, minus the listed ones.
dropped_columns = ["colors", "proportion", "hsv_colors", "form", "p-score", "r-score", "product"]
features = df.loc[:, "contrast":].drop(columns=dropped_columns)

shaped = (contrast, wcag_contrast, colorful, features, impressions, watchtimes, views)
print(*(item.shape for item in shaped))

(344, 1) (344, 1) (344, 1) (344, 7) (344,) (344,) (344,)


<h1 style="text-align:center"><b>ML</b></h1>

## PCA

In [None]:
def pca(features, n):
    """Project `features` onto its first `n` principal components (random_state fixed at 42)."""
    reducer = PCA(n_components=n, random_state=42)
    return reducer.fit_transform(features)

# 2-D PCA projection of the feature table, coloured by impressions.
# Note: this rebinds `df` to the small plotting frame.
X_pca = pca(features, 2)
df = pd.DataFrame({"Feature_1":X_pca[:,0],"Feature_2":X_pca[:,1], "label":impressions})
df.plot(x="Feature_1", y="Feature_2", kind='scatter', c='label', colormap='viridis')

In [None]:
# 3-D RBF Kernel-PCA projection of the feature table, coloured by impressions.
dw_features = KernelPCA(n_components=3, kernel='rbf').fit_transform(features)
df = pd.DataFrame({
            "Feature_1":dw_features[:,0],
            "Feature_2":dw_features[:,1],
            "Feature_3":dw_features[:,2],
            "label":impressions
            })
fig = plt.figure(figsize=(10,6))
# Figure.gca() no longer accepts keyword arguments in current Matplotlib;
# add_subplot is the supported way to create 3-D axes.
ax = fig.add_subplot(projection='3d')
x = df["Feature_1"]
y = df["Feature_2"]
z = df["Feature_3"]
points = ax.scatter(x, y, z, c = df["label"])
# The colour encodes a numeric value, so a colorbar is the matching key;
# legend() had no labelled artists to show.
fig.colorbar(points, ax=ax, label="impressions")
plt.show()

## T-SNE

In [None]:
def tsne_2d(features, perplexity=50,plot=False):
    """
    Embed `features` in 2-D with t-SNE (random_state fixed at 42).

    When `plot` is True, the embedding is also drawn as a scatter plot coloured
    by the notebook-level `impressions` array. Returns the embedding.
    """
    embedder = TSNE(n_components=2, perplexity=perplexity, random_state=42)
    X_tsne = embedder.fit_transform(features)
    if plot == True:
        frame = pd.DataFrame({
            "Feature_1": X_tsne[:, 0],
            "Feature_2": X_tsne[:, 1],
            "label": impressions,
        })
        frame.plot(x="Feature_1", y="Feature_2", kind='scatter', c='label', colormap='viridis')
    return X_tsne

def tsne_3d(features, perplexity=50,plot=False):
    """
    Embed `features` in 3-D with t-SNE (random_state fixed at 42).

    When `plot` is True, the embedding is also drawn as a 3-D scatter plot
    coloured by the notebook-level `impressions` array. Returns the embedding.
    """
    X_tsne = TSNE(n_components=3, perplexity=perplexity,random_state=42).fit_transform(features)
    if plot == True:
        fig = plt.figure(figsize=(10,6))
        # Figure.gca() no longer accepts keyword arguments in current Matplotlib;
        # add_subplot is the supported way to create 3-D axes.
        ax = fig.add_subplot(projection='3d')
        points = ax.scatter(X_tsne[:,0], X_tsne[:,1], X_tsne[:,2], c = impressions)
        # A colorbar is the matching key for a numeric colour; legend() had no
        # labelled artists to show.
        fig.colorbar(points, ax=ax, label="impressions")
        plt.show()
    return X_tsne
# Draw both embeddings once; the returned arrays are not kept.
tsne_2d(features, perplexity=50,plot = True)
tsne_3d(features, perplexity=50,plot = True)

## binary

In [None]:
def label_binary(label):
    """
    Binarise a numeric label array against its own mean.

    Parameters
    ----------
    label : array-like
        Numeric values; the input is not modified.

    Returns
    -------
    numpy.ndarray
        Array with the dtype of the input holding 1 where the value is >= the
        mean and 0 where it is below.
    """
    label = np.asarray(label)
    label_avg = label.mean()
    # A single comparison against the ORIGINAL values. Writing the zeros first and
    # comparing afterwards turned them into 1 whenever the mean was <= 0.
    return (label >= label_avg).astype(label.dtype)

# Binary versions of the three targets, each thresholded at its own mean.
impressions_bi = label_binary(impressions)
views_bi = label_binary(views)
watchtimes_bi = label_binary(watchtimes)

## plot func

In [None]:
def plt3d(dw_feats,color):
    """
    3-D scatter plot of the first three columns of `dw_feats`.

    `color` is passed straight to scatter's `c` argument.
    """
    fig = plt.figure(figsize=(10,6))
    # Figure.gca() no longer accepts keyword arguments in current Matplotlib;
    # add_subplot is the supported way to create 3-D axes.
    ax = fig.add_subplot(projection='3d')
    x = dw_feats[:,0]
    y = dw_feats[:,1]
    z = dw_feats[:,2]
    # The legend() call was dropped: no artist carries a label, so it only
    # produced a warning.
    ax.scatter(x, y, z, c = color)
    plt.show()

def plt2d(dw_feats,color):
    """2-D scatter plot of the first two columns of `dw_feats`; `color` goes to scatter's `c`."""
    xs, ys = dw_feats[:, 0], dw_feats[:, 1]
    plt.scatter(xs, ys, c=color)
    plt.show()

## **Model**
- K-Means
- Logistic Regression
- Decision tree
- SVM
- XGBoost

## K-Means
- <font color=white> 將無 target 的純特徵向量做 unsupervised learning, 觀察不同 k 值 clustering 結果 </font>

In [None]:
def cluster_label(n_cluster, label, pred=None):
    """
    Print the mean label value of every cluster.

    Parameters
    ----------
    n_cluster : int
        Number of clusters; cluster ids 0 .. n_cluster-1 are reported.
    label : array-like
        One value per sample.
    pred : array-like, optional
        Cluster id of every sample. Defaults to the notebook-level `y_pred`, so
        existing two-argument calls keep working.
    """
    if pred is None:
        pred = y_pred  # fall back to the notebook-level clustering result
    label = np.asarray(label)
    pred = np.asarray(pred)
    # Iterate over the argument; the loop used to read the global `n_clusters`
    # and silently ignore `n_cluster`.
    for n in range(n_cluster):
        print(f"cluster {n} average: {np.mean(label[pred == n])}")
    return
    
def draw_cluster_label(y_pred, label):
    """Scatter the label values against their cluster id, coloured by cluster."""
    plt.scatter(y_pred, label, c=y_pred)
    # A CJK-capable font is needed for the Chinese axis title below.
    plt.rcParams.update({
        'font.sans-serif': ['Microsoft JhengHei'],
        'axes.unicode_minus': False,
    })
    plt.xlabel("分群編號")
    plt.ylabel("label")
    plt.show()
    return
    
# Cluster the raw feature table for k = 2..7 and, for each k, print the cluster
# sizes and the mean impressions per cluster, then plot impressions per cluster.
# Note: `n_clusters`, `y_pred` and `dw_feats` stay bound after the loop (to the
# k = 7 run); cluster_label reads the globals and the next cell reuses them.
for n_clusters in range(2,8):
#     dw_feats = tsne_2d(features)
    dw_feats = features
    kmeans = KMeans(n_clusters = n_clusters,  random_state=42)
    y_pred = kmeans.fit_predict(dw_feats)
    # Number of samples assigned to each cluster id.
    unique, counts = np.unique(y_pred[:], return_counts=True)
    print(dict(zip(unique, counts)))

    cluster_label(n_clusters, impressions)   
    draw_cluster_label(y_pred, impressions)
    # plt3d(dw_feats,y_pred)
    # plt2d(dw_feats,y_pred)

In [None]:
# import sys
# np.set_printoptions(threshold=sys.maxsize)
# Print the feature row and the impressions of every sample in cluster 2
# (uses `y_pred` / `dw_feats` left over from the last K-Means run above).
for i in np.flatnonzero(y_pred == 2):
    print(dw_feats.iloc[i], "impression:", impressions[i])
#     print((sum(watchtimes[index])-watchtimes[index[36]])/55)

## Logistic Regression

In [None]:
# X = pca(features, n = 3)
# X = tsne_3d(features)
X = features
scaler = StandardScaler()
scaler.fit(X)
# NOTE(review): the scaler is fitted on all rows before cross-validation, so the
# test folds influence the scaling; wrap scaler + model in a Pipeline if that matters.
X_std = scaler.transform(X)

model = LogisticRegression()
# define model evaluation method
cv = RepeatedKFold(n_splits=5, n_repeats=3, random_state=1)
# evaluate model on the standardised features (X_std used to be computed and then ignored)
scores = cross_val_score(model, X_std, views_bi,  scoring='accuracy', cv=cv, n_jobs=-1)
print('Accuracy: %.3f (%.3f)' % (np.mean(scores), np.std(scores)))
for train_index, test_index in cv.split(X_std):
    # Index the NumPy array: X[train_index] on the DataFrame would look the
    # integers up as column labels and raise KeyError.
    X_train, X_test = X_std[train_index], X_std[test_index]
    y_train, y_test = views_bi[train_index], views_bi[test_index]
    model.fit(X_train, y_train)
    print('訓練集: ',model.score(X_train,y_train))
    print('測試集: ',model.score(X_test,y_test))

## svm
<font color=white>將特徵向量與 target 做 svm regression</font>

In [None]:
# dw_feats = pca(features, n = 2)
# dw_feats = tsne_2d(features,50)
# dw_features = KernelPCA(n_components=2, kernel='rbf').fit_transform(features)
dw_feats = features
label = impressions

models = (svm.LinearSVR(C=15),
          svm.SVR(kernel='linear', C=15),
          svm.SVR(kernel='rbf', gamma=0.7, C=15),
          svm.SVR(kernel='poly', C=15))

# define model evaluation method
cv = RepeatedKFold(n_splits=5, n_repeats=3, random_state=1)
# Positional indexing works the same for a DataFrame and for an array input.
X_all = np.asarray(dw_feats)
# evaluate model
for model in models:
    print(model)
    # The default score of a regressor is R^2, not accuracy.
    scores = cross_val_score(model, dw_feats, label, cv=cv)
    print("%0.2f R^2 with a standard deviation of %0.2f" % (scores.mean(), scores.std()))

    predicted, measured = [], []
    for train_index, test_index in cv.split(X_all):
        X_train, X_test = X_all[train_index], X_all[test_index]
        y_train, y_test = label[train_index], label[test_index]
        # Refit on the training fold so every prediction is out-of-sample; the
        # model used to be fitted once on all data and was never refitted here.
        model.fit(X_train, y_train)
        predicted.extend(model.predict(X_test))
        measured.extend(y_test)
    # Plot the predictions of every fold, not only those of the last one.
    plt.scatter(predicted, measured, c='r', s=2)
    plt.xlabel('Predicted')
    plt.ylabel('Measured')
    plt.show()

In [None]:
# dw_feats = pca(features, n = 3)
# dw_feats = tsne_2d(features,50)
dw_feats = contrast
models = [
    svm.LinearSVC(C=10),
    svm.SVC(kernel='rbf', gamma=0.7, C=10),
    svm.SVC(kernel='poly', C=10),
]

# Repeated 5-fold cross-validation, 3 repeats.
cv = RepeatedKFold(n_splits=5, n_repeats=3, random_state=1)

for model in models:
    # Fit on the full data first: this fitted instance is the one that gets pickled.
    model.fit(dw_feats, views_bi)
    print(model)
    scores = cross_val_score(model, dw_feats, views_bi, cv=cv)
    print("Accuracy(std)%0.2f(%0.2f)" % (scores.mean(), scores.std()))
    with open(f"model/{model}.pkl", "wb") as file:
        pickle.dump(model, file)

#     for train_index, test_index in cv.split(dw_feats):
#         X_train, X_test = dw_feats[train_index], dw_feats[test_index]
#         y_train, y_test = views_bi[train_index], views_bi[test_index]
#         pred = model.predict(X_test)
#         plt.scatter(pred,y_test,c='r', s=2)
#         plt.xlabel('Predicted')
#         plt.ylabel('Measured')
#         plt.show()

## Decision tree

In [None]:
import os 
# Raw string: in a normal literal "\U..." starts a unicode escape (SyntaxError)
# and "\x09" / "\t" would silently turn into tab characters.
# NOTE(review): PATH entries are directories, but this value ends in a .py file
# and is an absolute path of one machine — confirm the Graphviz folder that was
# meant and consider reading it from configuration.
os.environ["PATH"] = os.environ.get("PATH", "") + os.pathsep + r"C:\Users\x0933\taitraYT\Lib\site-packages\pygments\lexers\graphviz.py"

In [None]:
X = pca(features,3)
# X = tsne_2d(features,50)
# X = KernelPCA(n_components=3, kernel='rbf').fit_transform(features)
# X = contrast
dtr = DecisionTreeRegressor(max_features=1,random_state=42)

# define model evaluation method
cv = RepeatedKFold(n_splits=5, n_repeats=3, random_state=1)
# evaluate model (the default score of a regressor is R^2, not accuracy)
scores = cross_val_score(dtr, X, watchtimes, cv=cv)
print(scores)
print("%0.2f R^2 with a standard deviation of %0.2f" % (scores.mean(), scores.std()))

for train_index, test_index in cv.split(X):
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = watchtimes[train_index], watchtimes[test_index]
    dtr.fit(X_train, y_train)
    pridicted_te = dtr.predict(X_test)
    print('訓練集: ', dtr.score(X_train,y_train))
    print('測試集: ', dtr.score(X_test,y_test))
    plt.scatter(pridicted_te,y_test,c='r')
    plt.xlabel('Predicted')
    plt.ylabel('Measured')
    # Call show(): the bare attribute `plt.show` did nothing, so the points of
    # every fold piled up in a single figure.
    plt.show()

In [None]:
from sklearn.tree import DecisionTreeClassifier
# Decision-tree classifier on the first five principal components.
dw_feats = pca(features, 5)
model = DecisionTreeClassifier()

# Repeated 7-fold cross-validation, 3 repeats, scored by accuracy.
cv = RepeatedKFold(n_splits=7, n_repeats=3, random_state=1)
scores = cross_val_score(model, dw_feats, views_bi, scoring="accuracy", cv=cv)
print(scores)
print("%0.2f accuracy with a standard deviation of %0.2f" % (scores.mean(), scores.std()))

# Manual pass over the same splits; after the loop `pred` holds the last fold's predictions.
for train_index, test_index in cv.split(dw_feats):
    X_train, y_train = dw_feats[train_index], views_bi[train_index]
    X_test, y_test = dw_feats[test_index], views_bi[test_index]
    model.fit(X_train, y_train)
    pred = model.predict(X_test)

In [None]:
from sklearn.ensemble import BaggingClassifier
# Bagging ensemble (500 estimators) on the first five principal components.
dw_feats = pca(features, 5)
model = BaggingClassifier(n_estimators=500)

# Repeated 5-fold cross-validation, 3 repeats, scored by accuracy.
cv = RepeatedKFold(n_splits=5, n_repeats=3, random_state=1)
scores = cross_val_score(model, dw_feats, views_bi, scoring="accuracy", cv=cv)
print(scores)
print("%0.2f accuracy with a standard deviation of %0.2f" % (scores.mean(), scores.std()))

# Manual pass over the same splits; after the loop `pred` holds the last fold's predictions.
for train_index, test_index in cv.split(dw_feats):
    X_train, y_train = dw_feats[train_index], views_bi[train_index]
    X_test, y_test = dw_feats[test_index], views_bi[test_index]
    model.fit(X_train, y_train)
    pred = model.predict(X_test)

## XGBoost
- <font color=white>xgbregression</font>
    - n_estimators: The number of trees in the ensemble, often increased until no further improvements are seen.
    - max_depth: The maximum depth of each tree, often values are between 1 and 10.
    - eta: The learning rate used to weight each model, often set to small values such as 0.3, 0.1, 0.01, or smaller.
    - subsample: The number of samples (rows) used in each tree, set to a value between 0 and 1, often 1.0 to use all samples.
    - colsample_bytree: Number of features (columns) used in each tree, set to a value between 0 and 1, often 1.0 to use all features.

In [None]:
# Data: the raw feature table (alternative projections kept for experiments).
# X = KernelPCA(n_components=5, kernel='rbf').fit_transform(features)
# X = tsne_3d(features,50)
X = features
# scaler = StandardScaler()
# X = scaler.fit_transform(dw_feats)

# Model
# NOTE(review): eval_metric is a regression metric on a classifier; it only
# matters when an eval_set is passed to fit() — confirm it is intended.
xgb = XGBClassifier(
    booster = "gbtree",
    max_depth = 50,
    eta = 0.1,
    subsample = 1,
    colsample_bytree = 1,
    eval_metric=mean_squared_error
)


# define model evaluation method
cv = RepeatedKFold(n_splits=5, n_repeats=3, random_state=1)
# evaluate model
scores = cross_val_score(xgb, X, views_bi, cv=cv)
print("%0.2f accuracy with a standard deviation of %0.2f" % (scores.mean(), scores.std()))

for train_index, test_index in cv.split(X):
    # X is a DataFrame: rows are selected by position with .iloc, whereas
    # X[train_index] looks the integers up as column labels and raises KeyError.
    X_train, X_test = X.iloc[train_index], X.iloc[test_index]
    y_train, y_test = views_bi[train_index], views_bi[test_index]
    xgb.fit(X_train, y_train)
    pridicted = xgb.predict(X_test)
    print('訓練集: ',xgb.score(X_train,y_train))
    print('測試集: ',xgb.score(X_test,y_test))
    plt.scatter(pridicted,y_test,c='r')
    plt.xlabel('Predicted')
    plt.ylabel('Measured')
    plt.show()
    plot_importance(xgb)
    plt.show()