## 機械学習へ

In [18]:
import numpy as np
import pandas as pd
import json
import os
import lightgbm as lgb
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler

global counter
counter = 0

In [19]:
json_filepath = './feat_data_json_dir/20201126_1546_0_y_feat_data.json'
with open(json_filepath, "r") as f:
  feats = json.load(f)
keys_list = list(feats.keys())

# リストのキーを表示
print(keys_list)


['20201126_155300_018765.jpg', '20201126_155218_033996.jpg', '20201126_164100_032500.jpg', '20201126_160240_037988.jpg', '20201126_162919_036648.jpg', '20201126_161758_027066.jpg', '20201126_161740_011806.jpg', '20201126_160839_018956.jpg', '20201126_155019_025173.jpg', '20201126_163839_018358.jpg', '20201126_163259_019057.jpg', '20201126_161238_040682.jpg', '20201126_160219_062845.jpg', '20201126_161718_009737.jpg', '20201126_160718_040489.jpg', '20201126_164139_018871.jpg', '20201126_155038_034146.jpg', '20201126_163519_034434.jpg', '20201126_160259_015641.jpg', '20201126_164159_018595.jpg', '20201126_164218_010918.jpg', '20201126_162038_010647.jpg', '20201126_160139_014125.jpg', '20201126_155718_036002.jpg', '20201126_155040_034004.jpg', '20201126_163920_028528.jpg', '20201126_161200_027719.jpg', '20201126_163120_026592.jpg', '20201126_163719_037656.jpg', '20201126_155338_034212.jpg', '20201126_162940_012989.jpg', '20201126_154800_017375.jpg', '20201126_164518_010675.jpg', '20201126

In [20]:
# def load_and_preprocess_data(file_path):
#   """CSVを読み込んで前処理する関数。"""
#   df = pd.read_csv(file_path)
#   df = df.drop(["timestamp"], axis=1)
#   return df.dropna()

# def get_data_from_directory(directory_path):
#   """指定ディレクトリのCSV全部を読み込んで結合する関数。"""
#   files = [f for f in os.listdir(directory_path) if f.endswith(".csv")]
#   data_frames = [
#       load_and_preprocess_data(os.path.join(directory_path, file)) for file in files
#   ]
#   return pd.concat(data_frames, ignore_index=True)

def evaluate_model(model, X, y):
  """モデルの評価。RMSE"""
  predictions = model.predict(X)
  return np.sqrt(mean_squared_error(y, predictions))


def printResults(rmse_results, title, if_print=True):
  mean_rmse = np.mean(rmse_results)
  median_rmse = np.median(rmse_results)
  variance_rmse = np.var(rmse_results)
  std_rmse = np.std(rmse_results)
  min_rmse = np.min(rmse_results)
  max_rmse = np.max(rmse_results)

  out = ""
  out += f"[{title}]RMSEの平均値: {mean_rmse}\n"
  # out += f"[{title}]RMSEの中央値: {median_rmse}\n"
  # out += f"[{title}]RMSEの分散: {variance_rmse}\n"
  # out += f"[{title}]RMSEの標準偏差: {std_rmse}\n"
  out += f"[{title}]RMSEの最小値: {min_rmse}\n"
  out += f"[{title}]RMSEの最大値: {max_rmse}\n"

  if if_print:
    print(out)
  return out

  # print(f"[{title}]RMSEの平均値: {mean_rmse}")
  # # print(f"[{title}]RMSEの中央値: {median_rmse}")
  # # print(f"[{title}]RMSEの分散: {variance_rmse}")
  # # print(f"[{title}]RMSEの標準偏差: {std_rmse}")
  # print(f"[{title}]RMSEの最小値: {min_rmse}")
  # print(f"[{title}]RMSEの最大値: {max_rmse}")

In [21]:
from sklearn.model_selection import KFold


def load_and_preprocess_data_save_timestamp(file_path):
  """CSVを読み込んで前処理する関数。"""
  df = pd.read_csv(file_path)
  return df.dropna()


def extract_image_name(timestamp):
  return (
      timestamp.replace("-", "").replace(" ", "_").replace(":", "").replace(".", "_")
      + ".jpg"
  )


def train_lightgbm_full(X, y):
  """Train LightGBM model on the full training data."""
  train_data = lgb.Dataset(X, label=y)
  params = {"objective": "regression", "metric": "rmse", "verbose": -1}
  model = lgb.train(params, train_data)
  return model


def evaluate_on_test(model, X_test, y_test):
  """Evaluate model on the test data."""
  predictions = model.predict(X_test)
  return np.sqrt(mean_squared_error(y_test, predictions))


def shuffle_dataframe(df):
  return df.sample(frac=1.0).reset_index(drop=True)


def evaluate_with_kfold(X, y, n_splits=5):
  kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
  rmses = []

  for train_index, test_index in kf.split(X):
    X_train, X_test = X.iloc[train_index], X.iloc[test_index]
    y_train, y_test = y.iloc[train_index], y.iloc[test_index]

    model = train_lightgbm_full(X_train, y_train)
    rmse = evaluate_on_test(model, X_test, y_test)
    rmses.append(rmse)

  return rmses


def add_eye_area_to_df(df, eye_areas):
  eye_areas_sum = []

  for _, row in df.iterrows():
    image_name = extract_image_name(row["timestamp"])
    eye_area = eye_areas.get(image_name, {})
    left_eye_area = eye_area.get("left_eye_area")
    right_eye_area = eye_area.get("right_eye_area")
    if left_eye_area is not None and right_eye_area is not None:
      try:
        eye_areas_sum.append(float(left_eye_area) + float(right_eye_area))
      except ValueError:
        eye_areas_sum.append(np.nan)
    else:
      eye_areas_sum.append(np.nan)
  df["eye_area_sum"] = eye_areas_sum
  return df


def add_feat_data_to_df(df, feat_data):
  au01_list = []
  au02_list = []
  happiness_list = []

  for _, row in df.iterrows():
    image_name = extract_image_name(row["timestamp"])
    feat = feat_data.get(image_name, {})
    au01 = feat.get('aus', {}).get('AU01', {}).get('0')
    au02 = feat.get('aus', {}).get('AU02', {}).get('0')
    happiness = feat.get('emotions', {}).get('happiness', {}).get('0')
    au01_list.append(au01 if au01 is not None else np.nan)
    au02_list.append(au02 if au02 is not None else np.nan)
    happiness_list.append(happiness if happiness is not None else np.nan)
  df['AU01'] = au01_list
  df['AU02'] = au02_list
  df['Happiness'] = happiness_list
  return df

In [22]:
def print_dropped_rows_column_values(df, column_name):
  rows_to_drop = df[df.isna().any(axis=1)]
  column_values = rows_to_drop[column_name]
  # print(f"Values in '{column_name}' column of the rows that would be dropped:")
  # print(column_values)
  print(f"Rows to drop: {len(rows_to_drop)}")
  print(f"image_file_names")
  for i in column_values:
    print(extract_image_name(i))

def solve_one_subject_feat_ver(
    csv_filepath, json_filepath, target, descrive=False, data_name=""
):
  # 特徴量を定義
  global counter
  """ 
  df['AU01'] = au01_list
  df['AU02'] = au02_list
  df['Happiness'] = happiness_list
  """
  features = [
      "m_speed",
      "m_speed_var_480",
      "m_speed_stddev_480",
      "m_acceleration",
      "m_acceleration_var_480",
      "m_acceleration_stddev_480",
      "m_jerk",
      "m_jerk_var_480",
      "m_jerk_stddev_480",
      "oss",
      "perclos",
      "Sleepiness",
      "AU01",
      "AU02",
      "Happiness"
  ]
  outStr = "train,test同時に正規化\n"
  data = load_and_preprocess_data_save_timestamp(csv_filepath)
  with open(json_filepath, "r") as f:
    feats = json.load(f)
  #data = add_eye_area_to_df(data, eye_areas)#TODO
  data = add_feat_data_to_df(data, feats)
  # 正規化
  scaler = MinMaxScaler()
  data[features] = scaler.fit_transform(data[features])

  # print_dropped_rows_column_values(data, "timestamp")
  prevLen = len(data)
  data = data.dropna()
  data = shuffle_dataframe(data)
  newLen = len(data)
  print(f"dropped {prevLen - newLen} rows")
  counter += newLen

  # X = data[list(filter(lambda x: x != target, features))]
  X = data[features]
  Y_target = data[target]
  if descrive:
    print(data.describe())

  target_kfold_rmses_norm = evaluate_with_kfold(X, Y_target)
  outStr += printResults(
      target_kfold_rmses_norm, f"lgbm-{target}-normalized-kfold-single-sbj", False
  )
  return {"out": outStr, "t": target_kfold_rmses_norm}

In [23]:
DATA_DIR = "dms_data_single"
EYE_DATA_DIR = "eye_data_json_dir"
FEAT_DATA_DIR = "feat_data_json_dir"

file_names = [
    "20201126_1546_0_y",
    "20201127_1432_7_y",
    "20201127_1548_2_y",
    "20201127_1701_7_y",
    "20201127_1840_5_y",
    "20201130_1122_5_y",
    "20201130_1808_6_y",
    "20201201_1230_0_y",
    "20201201_1429_5_y",
    "20201201_1555_0_y",
    "20201203_1022_7_y",
    "20201203_1244_5_y",
    "20201203_1404_6_y",
    "20201210_1112_2_y",
    "20201210_1354_2_y",
    "20201210_1610_6_y",
]

VERBOSE = False

In [24]:
target = "Sleepiness"

target_rmses = []
for data_name in file_names:
  if VERBOSE:
    print(data_name)
  csv_filename = f"{data_name}.csv"
  json_filename = f"{data_name}_feat_data.json"
  csv_filepath = os.path.join(DATA_DIR, csv_filename)
  json_filepath = os.path.join(FEAT_DATA_DIR, json_filename)
  # two file is required
  if not os.path.exists(csv_filepath) or not os.path.exists(json_filepath):
    continue
  result = solve_one_subject_feat_ver(csv_filepath, json_filepath, target, data_name=data_name)
  if VERBOSE:
    print(result["out"])
  target_rmses.append(result["t"])

print(printResults(target_rmses, f"lgbm-target-normalized-kfold-all-sbj"))

print(counter)

dropped 0 rows
dropped 0 rows
dropped 0 rows
dropped 0 rows
dropped 0 rows
dropped 0 rows
dropped 7 rows
dropped 0 rows
dropped 0 rows
dropped 0 rows
dropped 1 rows
dropped 0 rows
dropped 10 rows
[lgbm-target-normalized-kfold-all-sbj]RMSEの平均値: 0.05893485737146603
[lgbm-target-normalized-kfold-all-sbj]RMSEの最小値: 0.014124064270361718
[lgbm-target-normalized-kfold-all-sbj]RMSEの最大値: 0.17602900328144047

[lgbm-target-normalized-kfold-all-sbj]RMSEの平均値: 0.05893485737146603
[lgbm-target-normalized-kfold-all-sbj]RMSEの最小値: 0.014124064270361718
[lgbm-target-normalized-kfold-all-sbj]RMSEの最大値: 0.17602900328144047

1816


## AUなどの情報をJSONに書き出せるようにする

In [25]:
# import os
# import json
# import numpy as np
# from pathlib import Path
# from feat import Detector


# def calc_feat_in_images(images_directory):
#   feats = {}
#   face_model = "retinaface"
#   landmark_model = "mobilefacenet"
#   au_model = "svm"
#   au_model = 'xgb'
#   emotion_model = "resmasknet"
#   facepose_model = "img2pose"
#   detector = Detector(
#       face_model=face_model,
#       landmark_model=landmark_model,
#       au_model=au_model,
#       emotion_model=emotion_model,
#       facepose_model=facepose_model,
#   )
#   for image_name in os.listdir(images_directory):
#     if image_name.endswith('.jpg'):
#       image_path = os.path.join(images_directory, image_name)
#       try:
#         result = detector.detect_image(image_path)
#         feat = {
#             "aus": result.aus.to_dict(),
#             "emotions": result.emotions.to_dict(),
#             "facepose": result.facepose.to_dict(),
#             "facebox": result.facebox.to_dict(),
#             "landmarks": result.landmarks.to_dict(),
#         }
#         feats[image_name] = feat
#       except Exception as e:
#         feats[image_name] = str(e)
#   return feats

# def calc_feat_in_dir(for_ml_directory):
#   results = {}
#   for dir_name in os.listdir(for_ml_directory):
#     images_directory = os.path.join(for_ml_directory, dir_name, 'images')
#     if os.path.isdir(images_directory):
#       feat_data = calc_feat_in_images(images_directory)
#       results[dir_name] = feat_data
#   return results

# def save_feats_to_json(feats, filepath):
#   with open(filepath, 'w') as fp:
#     json.dump(feats, fp)

In [26]:
# from pathlib import Path
# import json



# def save_data_to_json(eye_areas, filepath):
#   with open(filepath, 'w') as fp:
#     json.dump(eye_areas, fp)


# def load_data_from_json(filepath):
#   try:
#     with open(filepath, 'r') as fp:
#       return json.load(fp)
#   except FileNotFoundError:
#     return None

# def process_images_in_directory(directory_path):
#   subdir_name = Path(directory_path).name
#   json_filename = f"{subdir_name}_feat_data.json"
#   results = calc_feat_in_images(Path.joinpath(Path(directory_path), 'images'))
#   save_data_to_json(results, json_filename)


# def process_all_directories(base_directory):
#   for subdir in Path(base_directory).iterdir():
#     if subdir.is_dir():
#       print(f"Processing directory: {subdir}")
#       process_images_in_directory(str(subdir))

# base_directory = 'ForMachineLearning'
# process_all_directories(base_directory)