# a_2d_image_data.py

In [None]:
import os
import imageio.v2 as imageio  # 이미지 입출력을 위한 라이브러리
import torch  # 텐서 연산을 위한 라이브러리

In [None]:
# 이미지 파일을 읽어들임
img_arr = imageio.imread(os.path.join(os.path.pardir, os.path.pardir, "_00_data", "a_image-dog", "bobby.jpg"))
print(type(img_arr))  # 이미지 데이터 타입 출력
print(img_arr.shape)  # 이미지의 shape 출력
print(img_arr.dtype)  # 이미지의 데이터 타입 출력

# 이미지를 PyTorch 텐서로 변환
img = torch.from_numpy(img_arr)
# 채널 순서를 (높이, 너비, 채널)에서 (채널, 높이, 너비)로 변경
out = img.permute(2, 0, 1)
print(out.shape)  # 변환된 텐서 shape 출력

In [None]:
# 고양이 이미지가 있는 데이터 디렉토리 설정
data_dir = os.path.join(os.path.pardir, os.path.pardir, "_00_data", "b_image-cats")
# PNG 이미지 파일 목록 추출
filenames = [
  name for name in os.listdir(data_dir) if os.path.splitext(name)[-1] == '.png'
]
print(filenames)  # 파일 이름 출력

from PIL import Image  # 이미지를 열기 위한 PIL 라이브러리

# 각 PNG 파일에 대해 이미지를 열고 shape, dtype 출력
for i, filename in enumerate(filenames):
  image = Image.open(os.path.join(data_dir, filename))  # 이미지 열기
  image.show()  # 이미지 표시
  img_arr = imageio.imread(os.path.join(data_dir, filename))  # 이미지 읽기
  print(img_arr.shape)  # 이미지의 shape 출력
  print(img_arr.dtype)  # 이미지의 데이터 타입 출력

# 배치 사이즈 설정 및 빈 텐서 생성 (3장의 이미지, 3채널, 256x256 크기)
batch_size = 3
batch = torch.zeros(batch_size, 3, 256, 256, dtype=torch.uint8)

# 각 이미지를 텐서로 변환 후 배치에 저장
for i, filename in enumerate(filenames):
  img_arr = imageio.imread(os.path.join(data_dir, filename))  # 이미지 읽기
  img_t = torch.from_numpy(img_arr)  # NumPy 배열을 텐서로 변환
  img_t = img_t.permute(2, 0, 1)  # 채널 순서 변경 (채널, 높이, 너비)
  batch[i] = img_t  # 배치에 이미지 저장

print(batch.shape)  # 배치 텐서 shape 출력


In [None]:
# 텐서를 float 타입으로 변환 후 정규화 (0-255 값을 0-1 범위로)
batch = batch.float()  # 텐서 타입을 float로 변환
batch /= 255.0  # 정규화
print(batch.dtype)  # 정규화된 텐서 데이터 타입 출력
print(batch.shape)  # 정규화된 배치 텐서의 shape 출력

# 각 채널의 평균과 표준편차 계산 후 정규화
n_channels = batch.shape[1]  # 채널 수 가져오기

for c in range(n_channels):
  mean = torch.mean(batch[:, c])  # 각 채널의 평균 계산
  std = torch.std(batch[:, c])  # 각 채널의 표준편차 계산
  print(mean, std)  # 평균과 표준편차 출력
  # 정규화: 각 채널의 평균을 빼고 표준편차로 나눔
  batch[:, c] = (batch[:, c] - mean) / std

# b_3d_image_data.py

In [None]:
# DICOM 이미지 파일이 있는 디렉토리 경로 설정
dir_path = os.path.join(os.path.pardir, os.path.pardir, "_00_data", "c_volumetric-dicom", "2-LUNG_3.0_B70f-04083")
# DICOM 볼륨 데이터를 읽어들임
vol_array = imageio.volread(dir_path, format='DICOM')
print(type(vol_array))   # DICOM 데이터를 NumPy 배열로 출력
print(vol_array.shape)   # 볼륨 데이터의 shape 출력 (99, 512, 512): 슬라이스 개수와 이미지 크기
print(vol_array.dtype)   # 볼륨 데이터의 dtype 출력 (int16)
print(vol_array[0])  # 첫 번째 슬라이스 데이터 출력

In [None]:
import matplotlib.pyplot as plt  # 볼륨 데이터를 시각화하기 위한 라이브러리

# 99개의 슬라이스를 10x10 격자형으로 시각화
fig = plt.figure(figsize=(10, 10))
for id in range(0, 99):
  fig.add_subplot(10, 10, id + 1)  # 10x10 subplot 생성
  plt.imshow(vol_array[id])  # 각 슬라이스 이미지 출력
plt.show()  # 이미지를 화면에 표시

import torch  # 텐서 연산을 위한 PyTorch

# 볼륨 데이터를 텐서로 변환하고 float 타입으로 변경
vol = torch.from_numpy(vol_array).float()
vol = torch.unsqueeze(vol, 0)  # 채널 차원을 추가
vol = torch.unsqueeze(vol, 0)  # 배치 차원을 추가

print(vol.shape)  # 텐서의 shape 출력: torch.Size([1, 1, 99, 512, 512])


In [None]:
# 슬라이스 차원(3, 4)을 기준으로 평균과 표준편차 계산
mean = torch.mean(vol, dim=(3, 4), keepdim=True)  # 각 슬라이스의 평균 계산
print(mean.shape)  # 평균 텐서의 shape 출력: torch.Size([1, 1, 99, 1, 1])
std = torch.std(vol, dim=(3, 4), keepdim=True)  # 각 슬라이스의 표준편차 계산
print(std.shape)  # 표준편차 텐서의 shape 출력: torch.Size([1, 1, 99, 1, 1])

# 볼륨 데이터를 정규화 (mean=0, std=1)
vol = (vol - mean) / std
print(vol.shape)  # 정규화된 텐서의 shape 출력: torch.Size([1, 1, 99, 512, 512])

print(vol[0, 0, 0])  # 첫 번째 슬라이스의 정규화된 텐서 값 출력

# c_tabular_wine_data.py

In [None]:
import csv
import os
import numpy as np

In [None]:
# 와인 데이터 경로 설정 및 NumPy로 데이터 로드
wine_path = os.path.join(os.path.pardir, os.path.pardir, "_00_data", "d_tabular-wine", "winequality-white.csv")
wineq_numpy = np.loadtxt(wine_path, dtype=np.float32, delimiter=";", skiprows=1)  # CSV 데이터 읽기
print(wineq_numpy.dtype)  # 데이터 타입 출력: float32
print(wineq_numpy.shape)  # 데이터 shape 출력: (4898, 12)
print(wineq_numpy)  # 데이터 출력
print()

# CSV 파일의 헤더(컬럼 이름) 추출
col_list = next(csv.reader(open(wine_path), delimiter=';'))
print(col_list)  # 컬럼 이름 리스트 출력
print()

In [None]:
import torch

# NumPy 배열을 PyTorch 텐서로 변환
wineq = torch.from_numpy(wineq_numpy)
print(wineq.dtype)  # 텐서 데이터 타입 출력: torch.float32
print(wineq.shape)  # 텐서 shape 출력: (4898, 12)
print()

# 데이터와 타겟 분리 (데이터는 마지막 열 제외, 타겟은 마지막 열)
data = wineq[:, :-1]  # 마지막 열을 제외한 데이터 부분
print(data.dtype)  # 데이터의 dtype 출력: torch.float32
print(data.shape)  # 데이터의 shape 출력: (4898, 11)
print(data)  # 데이터 출력
print()

target = wineq[:, -1]  # 마지막 열인 타겟 부분 선택
print(target.dtype)  # 타겟의 dtype 출력: torch.float32
print(target.shape)  # 타겟의 shape 출력: (4898,)
print(target)  # 타겟 출력
print()

target = target.long()  # 타겟을 정수형으로 변환
print(target.dtype)  # 변환된 타겟의 dtype 출력: torch.int64
print(target.shape)  # 변환된 타겟의 shape 출력: (4898,)
print(target)  # 변환된 타겟 출력
print()


In [None]:
# 타겟을 이용해 원-핫 인코딩 수행
eye_matrix = torch.eye(10)  # 크기가 10인 단위행렬 생성 (one-hot 인코딩에 사용)
onehot_target = eye_matrix[target]  # 타겟 값을 인덱스로 원-핫 벡터 생성

print(onehot_target.shape)  # 원-핫 인코딩된 타겟의 shape 출력: torch.Size([4898, 10])
print(onehot_target[0])  # 첫 번째 원-핫 벡터 출력
print(onehot_target[1])  # 두 번째 원-핫 벡터 출력
print(onehot_target[-2])  # 뒤에서 두 번째 원-핫 벡터 출력
print(onehot_target)  # 전체 원-핫 인코딩된 타겟 출력
print()

In [None]:
# 데이터의 평균과 분산을 구하고 정규화 수행
data_mean = torch.mean(data, dim=0)  # 각 피처의 평균 계산
data_var = torch.var(data, dim=0)  # 각 피처의 분산 계산
data = (data - data_mean) / torch.sqrt(data_var)  # 정규화 수행
print(data)  # 정규화된 데이터 출력
print()

In [None]:
from sklearn.model_selection import train_test_split

# 데이터셋을 훈련용/검증용으로 나누기
X_train, X_test, y_train, y_test = train_test_split(data, onehot_target, test_size=0.2)  # 데이터 분할

print(X_train.shape)  # 훈련용 데이터 shape 출력: torch.Size([3918, 11])
print(y_train.shape)  # 훈련용 타겟 shape 출력: torch.Size([3918, 10])
print(X_test.shape)  # 검증용 데이터 shape 출력: torch.Size([980, 11])
print(y_test.shape)  # 검증용 타겟 shape 출력: torch.Size([980, 10])
print()

# 와인 데이터를 로드하고 전처리하여 훈련/검증 데이터셋으로 분할하는 함수 정의
def get_wine_data():
  wine_path = os.path.join(os.path.pardir, os.path.pardir, "_00_data", "d_tabular-wine", "winequality-white.csv")
  wineq_numpy = np.loadtxt(wine_path, dtype=np.float32, delimiter=";", skiprows=1)

  wineq = torch.from_numpy(wineq_numpy)

  data = wineq[:, :-1]  # 마지막 열을 제외한 데이터 부분
  target = wineq[:, -1].long()  # 타겟을 정수형으로 변환

  eye_matrix = torch.eye(10)
  onehot_target = eye_matrix[target]

  data_mean = torch.mean(data, dim=0)
  data_var = torch.var(data, dim=0)
  data = (data - data_mean) / torch.sqrt(data_var)

  X_train, X_valid, y_train, y_valid = train_test_split(data, onehot_target, test_size=0.2)

  return X_train, X_valid, y_train, y_valid  # 훈련용 및 검증용 데이터 반환

# d_tabular_california_housing.py

In [None]:
# https://medium.com/analytics-vidhya/implement-linear-regression-on-boston-housing-dataset-by-pytorch-c5d29546f938
# https://scikit-learn.org/stable/datasets/real_world.html#california-housing-dataset
# 캘리포니아 주택 가격 데이터셋을 로드하고 전처리하는 코드입니다.
import torch
from sklearn.datasets import fetch_california_housing

In [None]:
# 캘리포니아 주택 데이터셋 로드
housing = fetch_california_housing()
print(housing.keys())  # 데이터셋의 키 정보 출력

print(type(housing.data))  # 데이터 타입 출력: <class 'numpy.ndarray'>
print(housing.data.dtype)  # 데이터 타입 출력: float64
print(housing.data.shape)  # 데이터의 shape 출력: (20640, 8)
print(housing.feature_names)  # 특성 이름 출력

print(housing.target.shape)  # 타겟의 shape 출력: (20640,)
print(housing.target_names)  # 타겟 이름 출력: None (타겟 이름 없음)

In [None]:
import numpy as np

# 데이터의 최소값과 최대값 출력
print(housing.data.min(), housing.data.max())  # 데이터의 범위

# 데이터 평균 및 분산 계산 후 정규화
data_mean = np.mean(housing.data, axis=0)  # 각 특성의 평균
data_var = np.var(housing.data, axis=0)  # 각 특성의 분산
data = (housing.data - data_mean) / np.sqrt(data_var)  # 정규화
target = housing.target  # 타겟 값

# 정규화된 데이터의 최소값과 최대값 출력
print(data.min(), data.max())  # 정규화된 데이터의 범위

In [None]:
from sklearn.model_selection import train_test_split

# 데이터셋을 훈련용 및 테스트용으로 분할
X_train, X_test, y_train, y_test = train_test_split(data, target, test_size=0.2)

# NumPy 배열을 PyTorch 텐서로 변환
X_train = torch.from_numpy(X_train).float()  # 훈련 데이터
X_test = torch.from_numpy(X_test).float()  # 테스트 데이터
y_train = torch.from_numpy(y_train).float()  # 훈련 타겟
y_test = torch.from_numpy(y_test).float()  # 테스트 타겟

# 훈련 데이터와 타겟의 shape 출력
print(X_train.shape)  # 훈련 데이터의 shape: torch.Size([16512, 8])
print(y_train.shape)  # 훈련 타겟의 shape: torch.Size([16512])

# 테스트 데이터와 타겟의 shape 출력
print(X_test.shape)  # 테스트 데이터의 shape: torch.Size([4128, 8])
print(y_test.shape)  # 테스트 타겟의 shape: torch.Size([4128])

# e_bikes_sharing_data.py

In [None]:
import os
import numpy as np
import torch

In [None]:
torch.set_printoptions(edgeitems=2, threshold=50, linewidth=75)  # 출력 설정

# 자전거 공유 데이터셋 경로 설정
bikes_path = os.path.join(os.path.pardir, os.path.pardir, "_00_data", "e_time-series-bike-sharing-dataset", "hour-fixed.csv")

# CSV 파일에서 데이터를 로드
bikes_numpy = np.loadtxt(
  fname=bikes_path, dtype=np.float32, delimiter=",", skiprows=1,
  converters={
    1: lambda x: float(x[8:10])  # 날짜에서 시간을 추출하여 변환
  }
)
bikes = torch.from_numpy(bikes_numpy)  # NumPy 배열을 PyTorch 텐서로 변환
print(bikes.shape)  # 전체 데이터의 shape 출력

# 하루 단위로 데이터를 변환
daily_bikes = bikes.view(-1, 24, bikes.shape[1])
print(daily_bikes.shape)  # 하루 단위 데이터의 shape: torch.Size([730, 24, 17])

# 데이터와 타겟 분리
daily_bikes_data = daily_bikes[:, :, :-1]  # 마지막 열 제외 (타겟)
daily_bikes_target = daily_bikes[:, :, -1].unsqueeze(dim=-1)  # 타겟 열 추가 차원 추가

print(daily_bikes_data.shape)  # 데이터의 shape 출력
print(daily_bikes_target.shape)  # 타겟의 shape 출력

In [None]:
first_day_data = daily_bikes_data[0]  # 첫 날 데이터
print(first_day_data.shape)  # 첫 날 데이터의 shape 출력

# 날씨 상황을 long 타입으로 변환하여 출력
# 1: 맑음, 2: 안개, 3: 가벼운 비/눈, 4: 강한 비/눈
print(first_day_data[:, 9].long())  # 날씨 열 출력
eye_matrix = torch.eye(4)  # 단위 행렬 생성
print(eye_matrix)  # 단위 행렬 출력

# 날씨 정보를 one-hot 인코딩
weather_onehot = eye_matrix[first_day_data[:, 9].long() - 1]
print(weather_onehot.shape)  # 원-핫 인코딩된 날씨 정보의 shape 출력
print(weather_onehot)  # 원-핫 인코딩된 날씨 정보 출력

# 첫 날 데이터에 날씨 정보를 추가
first_day_data_torch = torch.cat(tensors=(first_day_data, weather_onehot), dim=1)
print(first_day_data_torch.shape)  # 결합된 데이터의 shape 출력
print(first_day_data_torch)  # 결합된 데이터 출력

In [None]:
day_data_torch_list = []  # 일일 데이터 리스트 초기화

# 각 일별로 데이터를 처리
for daily_idx in range(daily_bikes_data.shape[0]):  # 730일 반복
  day = daily_bikes_data[daily_idx]  # 하루 데이터
  weather_onehot = eye_matrix[day[:, 9].long() - 1]  # 날씨 정보 원-핫 인코딩
  day_data_torch = torch.cat(tensors=(day, weather_onehot), dim=1)  # 데이터 결합
  day_data_torch_list.append(day_data_torch)  # 리스트에 추가

print(len(day_data_torch_list))  # 리스트 길이 출력
daily_bikes_data = torch.stack(day_data_torch_list, dim=0)  # 리스트를 텐서로 변환
print(daily_bikes_data.shape)  # 최종 데이터의 shape 출력

In [None]:
# 특정 열들을 선택하여 shape 출력
print(daily_bikes_data[:, :, :9].shape, daily_bikes_data[:, :, 10:].shape)
# 'instant'와 'whethersit' 열을 제거하고 결합
daily_bikes_data = torch.cat(
  [daily_bikes_data[:, :, 1:9], daily_bikes_data[:, :, 10:]], dim=2
)  
print(daily_bikes_data.shape)  # 최종 데이터의 shape 출력

# 온도 데이터 정규화
temperatures = daily_bikes_data[:, :, 8]  # 온도 열 선택
daily_bikes_data[:, :, 8] = (daily_bikes_data[:, :, 8] - torch.mean(temperatures)) / torch.std(temperatures)  # 정규화

# f_hourly_bikes_sharing_data.py

In [None]:
import os
import numpy as np
import torch
from pathlib import Path

In [None]:
BASE_PATH = str(Path(os.getcwd()).resolve().parent.parent)
import sys
sys.path.append(BASE_PATH)

torch.set_printoptions(edgeitems=2, threshold=50, linewidth=75)

# 데이터 파일 경로 설정
bikes_path = os.path.join(BASE_PATH, "_00_data", "e_time-series-bike-sharing-dataset", "hour-fixed.csv")

# CSV 파일에서 데이터를 읽어 numpy 배열로 변환
bikes_numpy = np.loadtxt(
    fname=bikes_path, dtype=np.float32, delimiter=",", skiprows=1,
    converters={
        1: lambda x: float(x[8:10])  # 2011-01-07 --> 07 --> 7
    }
)

# numpy 배열을 PyTorch 텐서로 변환
bikes_data = torch.from_numpy(bikes_numpy).to(torch.float)
print(bikes_data.shape)    # >>> torch.Size([17520, 17])

# 타겟과 피쳐 분리
bikes_target = bikes_data[:, -1].unsqueeze(dim=-1)  # 'cnt'
bikes_data = bikes_data[:, :-1]   # >>> torch.Size([17520, 16])

# 날씨 데이터를 위한 단위 행렬 생성
eye_matrix = torch.eye(4)

data_torch_list = []
for idx in range(bikes_data.shape[0]):  # range(730)
    hour_data = bikes_data[idx]  # hour_data.shape: [17]
    weather_onehot = eye_matrix[hour_data[9].long() - 1]  # 날씨에 대한 원-핫 인코딩
    concat_data_torch = torch.cat(tensors=(hour_data, weather_onehot), dim=-1)
    # concat_data_torch.shape: [20]
    data_torch_list.append(concat_data_torch)

# 데이터를 스택하여 최종 데이터셋 생성
bikes_data = torch.stack(data_torch_list, dim=0)
bikes_data = torch.cat([bikes_data[:, 1:9], bikes_data[:, 10:]], dim=-1)  # 'instant' 및 'whethersit' 열 삭제

print(bikes_data.shape)
print(bikes_data[0])


In [None]:
# 시퀀스 크기 및 데이터 크기 설정
sequence_size = 24
validation_size = 96
test_size = 24
y_normalizer = 100

data_size = len(bikes_data) - sequence_size + 1
print("data_size: {0}".format(data_size))
train_size = data_size - (validation_size + test_size)
print("train_size: {0}, validation_size: {1}, test_size: {2}".format(train_size, validation_size, test_size))

In [None]:
# 훈련 데이터 생성
row_cursor = 0

X_train_list = []
y_train_regression_list = []
for idx in range(0, train_size):
  sequence_data = bikes_data[idx: idx + sequence_size]  # 시퀀스 데이터
  sequence_target = bikes_target[idx + sequence_size - 1]  # 시퀀스 타겟
  X_train_list.append(sequence_data)
  y_train_regression_list.append(sequence_target)
  row_cursor += 1

X_train = torch.stack(X_train_list, dim=0).to(torch.float)  # 훈련 데이터 텐서로 변환
y_train_regression = torch.tensor(y_train_regression_list, dtype=torch.float32) / y_normalizer  # 타겟 정규화

# 데이터 정규화
m = X_train.mean(dim=0, keepdim=True)
s = X_train.std(dim=0, keepdim=True)
X_train = (X_train - m) / s

print(X_train.shape, y_train_regression.shape)
# >>> torch.Size([17376, 24, 19]) torch.Size([17376])

In [None]:
# 검증 데이터셋 생성
X_validation_list = []
y_validation_regression_list = []
for idx in range(row_cursor, row_cursor + validation_size):
    sequence_data = bikes_data[idx: idx + sequence_size]  # 시퀀스 데이터
    sequence_target = bikes_target[idx + sequence_size - 1]  # 해당 시퀀스의 타겟
    X_validation_list.append(sequence_data)
    y_validation_regression_list.append(sequence_target)
    row_cursor += 1

# 검증 데이터 텐서로 변환
X_validation = torch.stack(X_validation_list, dim=0).to(torch.float)
y_validation_regression = torch.tensor(y_validation_regression_list, dtype=torch.float32) / y_normalizer

# 데이터 정규화
X_validation = (X_validation - m) / s

print(X_validation.shape, y_validation_regression.shape)
# >>> torch.Size([96, 24, 19]) torch.Size([96])

In [None]:
# 테스트 데이터셋 생성
X_test_list = []
y_test_regression_list = []
for idx in range(row_cursor, row_cursor + test_size):
    sequence_data = bikes_data[idx: idx + sequence_size]  # 시퀀스 데이터
    sequence_target = bikes_target[idx + sequence_size - 1]  # 해당 시퀀스의 타겟
    X_test_list.append(sequence_data)
    y_test_regression_list.append(sequence_target)
    row_cursor += 1

# 테스트 데이터 텐서로 변환
X_test = torch.stack(X_test_list, dim=0).to(torch.float)
y_test_regression = torch.tensor(y_test_regression_list, dtype=torch.float32) / y_normalizer

# 테스트 데이터 정규화
X_test = (X_test - m) / s  # 정규화를 올바르게 적용

print(X_test.shape, y_test_regression.shape)
# >>> torch.Size([24, 24, 18]) torch.Size([24])

# g_cryptocurrency_data.py

In [None]:
import pandas as pd
from pathlib import Path
import os
import torch
import matplotlib.pyplot as plt

In [None]:
# 기본 경로 설정
BASE_PATH = str(Path(os.getcwd()).resolve().parent.parent)
import sys
sys.path.append(BASE_PATH)

# 데이터 경로 설정
btc_krw_path = os.path.join(BASE_PATH, "_00_data", "k_cryptocurrency", "BTC_KRW.csv")
df = pd.read_csv(btc_krw_path)  # CSV 파일에서 데이터 읽기
print(df)

# 데이터 크기 및 열 정보 출력
row_size = len(df)
print("row_size:", row_size)
columns = df.columns  # ['Date', 'Open', 'High', 'Low', 'Close', 'Volume']
print([column for column in columns])
date_list = df['Date']  # 날짜 정보 저장
df = df.drop(columns=['Date'])  # 날짜 열 제거

print(df)

In [None]:
# 하이퍼파라미터 설정
sequence_size = 10  # 시퀀스 크기
validation_size = 100  # 검증 데이터 크기
test_size = 50  # 테스트 데이터 크기

# 데이터 크기 계산
data_size = row_size - sequence_size + 1
print("data_size: {0}".format(data_size))
train_size = data_size - (validation_size + test_size)  # 훈련 데이터 크기 계산
print("train_size: {0}, validation_size: {1}, test_size: {2}".format(train_size, validation_size, test_size))

In [None]:
# 훈련 데이터 준비
row_cursor = 0
y_normalizer = 1.0e7  # 가격 정규화 값

X_train_list = []
y_train_regression_list = []
y_train_classification_list = []
y_train_date = []

for idx in range(0, train_size):
    # 시퀀스 데이터 생성
    sequence_data = df.iloc[idx: idx + sequence_size].values  # 시퀀스 데이터
    X_train_list.append(torch.from_numpy(sequence_data))  # 텐서로 변환
    y_train_regression_list.append(df.iloc[idx + sequence_size - 1]["Close"])  # 회귀 레이블
    y_train_classification_list.append(
        1 if df.iloc[idx + sequence_size - 1]["Close"] >= df.iloc[idx + sequence_size - 2]["Close"] else 0  # 분류 레이블
    )
    y_train_date.append(date_list[idx + sequence_size - 1])  # 날짜 저장
    row_cursor += 1

# 텐서로 변환 및 정규화
X_train = torch.stack(X_train_list, dim=0).to(torch.float)
y_train_regression = torch.tensor(y_train_regression_list, dtype=torch.float32) / y_normalizer
y_train_classification = torch.tensor(y_train_classification_list, dtype=torch.int64)

m = X_train.mean(dim=0, keepdim=True)  # 평균
s = X_train.std(dim=0, keepdim=True)  # 표준편차
X_train -= m  # 평균으로 정규화
X_train /= s  # 표준편차로 정규화

print(X_train.shape, y_train_regression.shape, y_train_classification.shape)
print("Label - Start Date: {0} ~ End Date: {1}".format(y_train_date[0], y_train_date[-1]))

In [None]:
# 검증 데이터 준비
X_validation_list = []
y_validation_regression_list = []
y_validation_classification_list = []
y_validation_date = []

for idx in range(row_cursor, row_cursor + validation_size):
    sequence_data = df.iloc[idx: idx + sequence_size].values  # 시퀀스 데이터
    X_validation_list.append(torch.from_numpy(sequence_data))  # 텐서로 변환
    y_validation_regression_list.append(df.iloc[idx + sequence_size - 1]["Close"])  # 회귀 레이블
    y_validation_classification_list.append(
        1 if df.iloc[idx + sequence_size - 1]["Close"] >= df.iloc[idx + sequence_size - 2]["Close"] else 0  # 분류 레이블
    )
    y_validation_date.append(date_list[idx + sequence_size - 1])  # 날짜 저장
    row_cursor += 1

# 텐서로 변환 및 정규화
X_validation = torch.stack(X_validation_list, dim=0).to(torch.float)
y_validation_regression = torch.tensor(y_validation_regression_list, dtype=torch.float32) / y_normalizer
y_validation_classification = torch.tensor(y_validation_classification_list, dtype=torch.int64)

X_validation = (X_validation - m) / s  # 정규화

print(X_validation.shape, y_validation_regression.shape, y_validation_classification.shape)
print("Label - Start Date: {0} ~ End Date: {1}".format(y_validation_date[0], y_validation_date[-1]))

In [None]:
# 테스트 데이터 준비
X_test_list = []
y_test_regression_list = []
y_test_classification_list = []
y_test_date = []

for idx in range(row_cursor, row_cursor + test_size):
    sequence_data = df.iloc[idx: idx + sequence_size].values  # 시퀀스 데이터
    X_test_list.append(torch.from_numpy(sequence_data))  # 텐서로 변환
    y_test_regression_list.append(df.iloc[idx + sequence_size - 1]["Close"])  # 회귀 레이블
    y_test_classification_list.append(
        1 if df.iloc[idx + sequence_size - 1]["Close"] > df.iloc[idx + sequence_size - 2]["Close"] else 0  # 분류 레이블
    )
    y_test_date.append(date_list[idx + sequence_size - 1])  # 날짜 저장
    row_cursor += 1

# 텐서로 변환 및 정규화
X_test = torch.stack(X_test_list, dim=0).to(torch.float)
y_test_regression = torch.tensor(y_test_regression_list, dtype=torch.float32) / y_normalizer
y_test_classification = torch.tensor(y_test_classification_list, dtype=torch.int64)

X_test = (X_test - m) / s  # 정규화

print(X_test.shape, y_test_regression.shape, y_test_classification.shape)
print("Label - Start Date: {0} ~ End Date: {1}".format(y_test_date[0], y_test_date[-1]))

In [None]:
# 데이터 시각화
fig, ax = plt.subplots(1, figsize=(13, 7))
ax.plot(y_train_date, y_train_regression * y_normalizer, label="y_train_regression", linewidth=2)
ax.plot(y_validation_date, y_validation_regression * y_normalizer, label="y_validation", linewidth=2)
ax.plot(y_test_date, y_test_regression * y_normalizer, label="y_test", linewidth=2)
ax.set_ylabel('Bitcoin [KRW]', fontsize=14)
ax.set_xticks(ax.get_xticks()[::200])  # X축 레이블 조정
plt.ticklabel_format(style='plain', axis='y')
plt

# h_audio_data.py

In [None]:
import torch
import os
import scipy.io.wavfile as wavfile

In [None]:
# 오디오 파일 경로 설정
audio_1_path = os.path.join(os.path.pardir, os.path.pardir, "_00_data", "f_audio-chirp", "1-100038-A-14.wav")
audio_2_path = os.path.join(os.path.pardir, os.path.pardir, "_00_data", "f_audio-chirp", "1-100210-A-36.wav")

# WAV 파일을 읽어 샘플링 주파수와 파형 배열을 반환
freq_1, waveform_arr_1 = wavfile.read(audio_1_path)
print(freq_1)
print(type(waveform_arr_1))
print(len(waveform_arr_1))
print(waveform_arr_1)

# 두 번째 WAV 파일을 읽어 샘플링 주파수와 파형 배열을 반환
freq_2, waveform_arr_2 = wavfile.read(audio_2_path)

# 2개의 오디오 파일의 파형을 텐서 형태로 저장
waveform = torch.empty(2, 1, 220_500)  # 텐서 초기화
waveform[0, 0] = torch.from_numpy(waveform_arr_1).float()  # 첫 번째 오디오 파형
waveform[1, 0] = torch.from_numpy(waveform_arr_2).float()  # 두 번째 오디오 파형
print(waveform.shape)

In [None]:
from scipy import signal

# 입력 신호의 스펙트로그램을 계산
_, _, sp_arr_1 = signal.spectrogram(waveform_arr_1, freq_1)
_, _, sp_arr_2 = signal.spectrogram(waveform_arr_2, freq_2)

# NumPy 배열을 PyTorch 텐서로 변환
sp_1 = torch.from_numpy(sp_arr_1)
sp_2 = torch.from_numpy(sp_arr_2)
print(sp_1.shape)
print(sp_2.shape)

# 스펙트로그램을 텐서로 변환
sp_left_t = torch.from_numpy(sp_arr_1)
sp_right_t = torch.from_numpy(sp_arr_2)
print(sp_left_t.shape)
print(sp_right_t.shape)

# 두 개의 스펙트로그램을 스택하여 3차원 텐서 생성
sp_t = torch.stack((sp_left_t, sp_right_t), dim=0).unsqueeze(dim=0)  # 새로운 차원 추가
print(sp_t.shape)

# i_video_data.py

In [None]:
import torch
import os
import imageio

# 비디오 파일 경로 설정 (상위 디렉터리로부터 "_00_data/g_video-cockatoo/cockatoo.mp4" 경로로 설정)
video_path = os.path.join(os.path.pardir, os.path.pardir, "_00_data", "g_video-cockatoo", "cockatoo.mp4")

# 비디오 파일 리더(reader) 생성
reader = imageio.get_reader(video_path)

# reader의 타입 출력
print(type(reader))

# 비디오 메타데이터 가져오기 (프레임 수, 해상도 등)
meta = reader.get_meta_data()
print(meta)

# 비디오의 각 프레임을 하나씩 가져와 torch 텐서로 변환
for i, frame in enumerate(reader):
  # 각 프레임을 NumPy 배열에서 torch 텐서로 변환하고 float 타입으로 캐스팅
  frame = torch.from_numpy(frame).float()  # frame.shape: [360, 480, 3] (360x480 해상도, 3개의 채널)
  print(i, frame.shape)   # 현재 프레임 번호와 텐서의 크기 출력

# 비디오의 채널 수와 프레임 수를 설정 (3채널, 총 529프레임)
n_channels = 3
n_frames = 529

# 빈 텐서 생성: (1, 529, 3, 480, 360) 형태로 비디오 데이터를 저장할 공간 할당
video = torch.empty(1, n_frames, n_channels, *meta['size'])  # (1, 529, 3, 480, 360)
print(video.shape)

# 다시 프레임을 하나씩 읽어와서 비디오 텐서에 저장
for i, frame in enumerate(reader):
  # NumPy 배열을 torch 텐서로 변환하고 float 타입으로 변경
  frame = torch.from_numpy(frame).float()       # frame.shape: [360, 480, 3]
  # 차원 순서를 변경 (기본적으로 [높이, 너비, 채널] 순서인 배열을 [채널, 너비, 높이]로 변환)
  frame = torch.permute(frame, dims=(2, 1, 0))  # frame.shape: [3, 480, 360]
  # 변환한 프레임을 비디오 텐서의 해당 위치에 저장
  video[0, i] = frame

# 비디오 텐서의 차원 순서를 변경: (1, 3, 529, 480, 360) -> (1, 3채널, 529프레임, 480, 360) 형태
video = video.permute(dims=(0, 2, 1, 3, 4))
print(video.shape)


# j_linear_regression_dataset_dataloader.py

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader, random_split


class LinearRegressionDataset(Dataset):
    def __init__(self, N=50, m=-3, b=2, *args, **kwargs):
        # N: 샘플 수 (예: 50)
        # m: 기울기
        # b: 절편
        super().__init__(*args, **kwargs)

        # N개의 랜덤 입력값 생성 (2차원)
        self.x = torch.rand(N, 2)
        # 잡음 추가 (0.2까지의 랜덤 값)
        self.noise = torch.rand(N) * 0.2
        self.m = m
        self.b = b
        # y 값 계산: y = mx + b + 잡음
        self.y = (torch.sum(self.x * self.m) + self.b + self.noise).unsqueeze(-1)

    def __len__(self):
        # 데이터셋의 샘플 수 반환
        return len(self.x)

    def __getitem__(self, idx):
        # 주어진 인덱스의 샘플 (x, y) 반환
        return self.x[idx], self.y[idx]

    def __str__(self):
        # 데이터셋의 정보 문자열 반환
        str = "Data Size: {0}, Input Shape: {1}, Target Shape: {2}".format(
            len(self.x), self.x.shape, self.y.shape
        )
        return str


if __name__ == "__main__":
    # LinearRegressionDataset 인스턴스 생성
    linear_regression_dataset = LinearRegressionDataset()

    # 데이터셋 정보 출력
    print(linear_regression_dataset)

    print("#" * 50, 1)

    # 데이터셋의 각 샘플 출력
    for idx, sample in enumerate(linear_regression_dataset):
        input, target = sample
        print("{0} - {1}: {2}".format(idx, input, target))

    # 데이터셋을 훈련, 검증, 테스트 세트로 분할
    train_dataset, validation_dataset, test_dataset = random_split(linear_regression_dataset, [0.7, 0.2, 0.1])

    print("#" * 50, 2)

    # 각 데이터셋의 샘플 수 출력
    print(len(train_dataset), len(validation_dataset), len(test_dataset))

    print("#" * 50, 3)

    # 훈련 데이터 로더 생성 (배치 사이즈 4, 섞기 활성화)
    train_data_loader = DataLoader(
        dataset=train_dataset,
        batch_size=4,
        shuffle=True
    )

    # 데이터 로더의 각 배치 출력
    for idx, batch in enumerate(train_data_loader):
        input, target = batch
        print("{0} - {1}: {2}".format(idx, input, target))


# k_2d_image_dataset_dataloader.py

In [None]:
import os
import torch
from PIL import Image
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms


class DogCat2DImageDataset(Dataset):
    def __init__(self):
        # 이미지 전처리 변환: 크기 조정 및 텐서 변환
        self.image_transforms = transforms.Compose([
            transforms.Resize(size=(256, 256)),  # 이미지 크기를 256x256으로 조정
            transforms.ToTensor()  # 이미지를 텐서로 변환
        ])

        # 개와 고양이 이미지가 저장된 디렉토리 경로 설정
        dogs_dir = os.path.join(os.path.pardir, os.path.pardir, "_00_data", "a_image-dog")
        cats_dir = os.path.join(os.path.pardir, os.path.pardir, "_00_data", "b_image-cats")

        # 이미지 파일 리스트 생성
        image_lst = [
            Image.open(os.path.join(dogs_dir, "bobby.jpg")),  # (1280, 720, 3)
            Image.open(os.path.join(cats_dir, "cat1.png")),  # (256, 256, 3)
            Image.open(os.path.join(cats_dir, "cat2.png")),  # (256, 256, 3)
            Image.open(os.path.join(cats_dir, "cat3.png"))   # (256, 256, 3)
        ]

        # 이미지를 전처리 및 텐서로 변환
        image_lst = [self.image_transforms(img) for img in image_lst]
        self.images = torch.stack(image_lst, dim=0)  # 변환된 이미지를 하나의 텐서로 쌓음

        # 0: "dog", 1: "cat"
        self.image_labels = torch.tensor([[0], [1], [1], [1]])  # 각 이미지에 대한 레이블 설정

    def __len__(self):
        # 데이터셋의 총 샘플 수 반환
        return len(self.images)

    def __getitem__(self, idx):
        # 주어진 인덱스의 이미지와 레이블 반환
        return self.images[idx], self.image_labels[idx]

    def __str__(self):
        # 데이터셋의 정보 문자열 반환
        str = "Data Size: {0}, Input Shape: {1}, Target Shape: {2}".format(
            len(self.images), self.images.shape, self.image_labels.shape
        )
        return str


if __name__ == "__main__":
    # DogCat2DImageDataset 인스턴스 생성
    dog_cat_2d_image_dataset = DogCat2DImageDataset()

    # 데이터셋 정보 출력
    print(dog_cat_2d_image_dataset)

    print("#" * 50, 1)

    # 데이터셋의 각 샘플 출력
    for idx, sample in enumerate(dog_cat_2d_image_dataset):
        input, target = sample
        print("{0} - {1}: {2}".format(idx, input.shape, target))

    # 데이터셋을 훈련과 테스트 세트로 분할
    train_dataset, test_dataset = random_split(dog_cat_2d_image_dataset, [0.7, 0.3])

    print("#" * 50, 2)

    # 각 데이터셋의 샘플 수 출력
    print(len(train_dataset), len(test_dataset))

    print("#" * 50, 3)

    # 훈련 데이터 로더 생성 (배치 사이즈 2, 섞기 활성화)
    train_data_loader = DataLoader(
        dataset=train_dataset,
        batch_size=2,
        shuffle=True
    )

    # 데이터 로더의 각 배치 출력
    for idx, batch in enumerate(train_data_loader):
        input, target = batch
        print("{0} - {1}: {2}".format(idx, input.shape, target))


# l_wine_dataset_dataloader.py

In [None]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, random_split


class WineDataset(Dataset):
    def __init__(self):
        # CSV 파일 경로 설정
        wine_path = os.path.join(os.path.pardir, os.path.pardir, "_00_data", "d_tabular-wine", "winequality-white.csv")
        
        # CSV 파일을 NumPy 배열로 로드 (첫 번째 행은 헤더이므로 건너뜀)
        wineq_numpy = np.loadtxt(wine_path, dtype=np.float32, delimiter=";", skiprows=1)
        wineq = torch.from_numpy(wineq_numpy)  # NumPy 배열을 PyTorch 텐서로 변환

        # 데이터와 레이블 분리
        data = wineq[:, :-1]  # 모든 행과 마지막 열을 제외한 모든 열 선택
        data_mean = torch.mean(data, dim=0)  # 각 특성의 평균 계산
        data_var = torch.var(data, dim=0)  # 각 특성의 분산 계산
        self.data = (data - data_mean) / torch.sqrt(data_var)  # 정규화

        target = wineq[:, -1].long()  # 레이블을 정수형으로 변환
        eye_matrix = torch.eye(10)  # 10개의 클래스에 대한 단위 행렬 생성
        self.target = eye_matrix[target]  # 레이블을 원-핫 인코딩으로 변환

        # 데이터와 레이블의 길이가 같은지 확인
        assert len(self.data) == len(self.target)

    def __len__(self):
        # 데이터셋의 총 샘플 수 반환
        return len(self.data)

    def __getitem__(self, idx):
        # 주어진 인덱스의 데이터와 레이블 반환
        wine_feature = self.data[idx]
        wine_target = self.target[idx]
        return wine_feature, wine_target

    def __str__(self):
        # 데이터셋의 정보 문자열 반환
        str = "Data Size: {0}, Input Shape: {1}, Target Shape: {2}".format(
            len(self.data), self.data.shape, self.target.shape
        )
        return str


if __name__ == "__main__":
    # WineDataset 인스턴스 생성
    wine_dataset = WineDataset()

    # 데이터셋 정보 출력
    print(wine_dataset)

    print("#" * 50, 1)

    # 데이터셋의 각 샘플 출력
    for idx, sample in enumerate(wine_dataset):
        input, target = sample
        print("{0} - {1}: {2}".format(idx, input.shape, target.shape))

    # 데이터셋을 훈련, 검증, 테스트 세트로 분할
    train_dataset, validation_dataset, test_dataset = random_split(wine_dataset, [0.7, 0.2, 0.1])

    print("#" * 50, 2)

    # 각 데이터셋의 샘플 수 출력
    print(len(train_dataset), len(validation_dataset), len(test_dataset))

    print("#" * 50, 3)

    # 훈련 데이터 로더 생성 (배치 사이즈 32, 섞기 활성화, 마지막 배치 버리기)
    train_data_loader = DataLoader(
        dataset=train_dataset,
        batch_size=32,
        shuffle=True,
        drop_last=True  # 마지막 배치가 배치 크기보다 작으면 버림
    )

    # 데이터 로더의 각 배치 출력
    for idx, batch in enumerate(train_data_loader):
        input, target = batch
        print("{0} - {1}: {2}".format(idx, input.shape, target.shape))


# m_california_housing_dataset_dataloader.py

In [None]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, random_split


class CaliforniaHousingDataset(Dataset):
    def __init__(self):
        # 사이킷런에서 캘리포니아 주택 데이터셋 로드
        from sklearn.datasets import fetch_california_housing
        housing = fetch_california_housing()
        
        # 데이터 평균과 분산 계산
        data_mean = np.mean(housing.data, axis=0)
        data_var = np.var(housing.data, axis=0)
        
        # 데이터 정규화 및 PyTorch 텐서로 변환
        self.data = torch.tensor((housing.data - data_mean) / np.sqrt(data_var), dtype=torch.float32)
        
        # 타겟 값도 PyTorch 텐서로 변환 (차원 추가)
        self.target = torch.tensor(housing.target, dtype=torch.float32).unsqueeze(dim=-1)

    def __len__(self):
        # 데이터셋의 총 샘플 수 반환
        return len(self.data)

    def __getitem__(self, idx):
        # 주어진 인덱스의 데이터와 타겟 반환
        sample_data = self.data[idx]
        sample_target = self.target[idx]
        return sample_data, sample_target

    def __str__(self):
        # 데이터셋의 정보 문자열 반환
        str = "Data Size: {0}, Input Shape: {1}, Target Shape: {2}".format(
            len(self.data), self.data.shape, self.target.shape
        )
        return str


if __name__ == "__main__":
    # CaliforniaHousingDataset 인스턴스 생성
    california_housing_dataset = CaliforniaHousingDataset()

    # 데이터셋 정보 출력
    print(california_housing_dataset)

    print("#" * 50, 1)

    # 데이터셋의 각 샘플 출력
    for idx, sample in enumerate(california_housing_dataset):
        input, target = sample
        print("{0} - {1}: {2}".format(idx, input.shape, target.shape))

    # 데이터셋을 훈련, 검증, 테스트 세트로 분할
    train_dataset, validation_dataset, test_dataset = random_split(california_housing_dataset, [0.7, 0.2, 0.1])

    print("#" * 50, 2)

    # 각 데이터셋의 샘플 수 출력
    print(len(train_dataset), len(validation_dataset), len(test_dataset))

    print("#" * 50, 3)

    # 훈련 데이터 로더 생성 (배치 사이즈 32, 섞기 활성화, 마지막 배치 버리기)
    train_data_loader = DataLoader(
        dataset=train_dataset,
        batch_size=32,
        shuffle=True,
        drop_last=True  # 마지막 배치가 배치 크기보다 작으면 버림
    )

    # 데이터 로더의 각 배치 출력
    for idx, batch in enumerate(train_data_loader):
        input, target = batch
        print("{0} - {1}: {2}".format(idx, input.shape, target.shape))


# n_time_series_dataset_dataloader.py

In [None]:
import os
from pathlib import Path

import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, random_split

# BASE_PATH 설정: 데이터 파일을 찾기 위한 기준 경로
BASE_PATH = str(Path(os.getcwd()).resolve().parent.parent)
import sys
sys.path.append(BASE_PATH)


class BikesDataset(Dataset):
    def __init__(self, train=True, test_days=1):
        self.train = train  # 훈련 세트 여부
        self.test_days = test_days  # 테스트에 사용할 일수

        # 자전거 대여 데이터 파일 경로
        bikes_path = os.path.join(BASE_PATH, "_00_data", "e_time-series-bike-sharing-dataset", "hour-fixed.csv")

        # CSV 파일에서 데이터를 읽어옴
        bikes_numpy = np.loadtxt(
            fname=bikes_path, dtype=np.float32, delimiter=",", skiprows=1,
            converters={
                1: lambda x: float(x[8:10])  # 날짜에서 '07'을 가져와서 float으로 변환
            }
        )
        bikes = torch.from_numpy(bikes_numpy)

        # 데이터를 일별로 변환
        daily_bikes = bikes.view(-1, 24, bikes.shape[1])  # daily_bikes.shape: [730, 24, 17]
        self.daily_bikes_target = daily_bikes[:, :, -1].unsqueeze(dim=-1)  # 마지막 열을 타겟으로 설정

        self.daily_bikes_data = daily_bikes[:, :, :-1]  # 나머지 데이터를 피처로 설정
        eye_matrix = torch.eye(4)  # 날씨를 원-핫 인코딩하기 위한 단위 행렬

        # 데이터 전처리: 일별 데이터에 날씨 원-핫 인코딩 추가
        day_data_torch_list = []
        for daily_idx in range(self.daily_bikes_data.shape[0]):  # range(730)
            day = self.daily_bikes_data[daily_idx]  # 하루 데이터
            weather_onehot = eye_matrix[day[:, 9].long() - 1]  # 날씨에 대한 원-핫 인코딩
            day_data_torch = torch.cat(tensors=(day, weather_onehot), dim=1)  # 피처 결합
            day_data_torch_list.append(day_data_torch)

        self.daily_bikes_data = torch.stack(day_data_torch_list, dim=0)

        # 9번째 열과 10번째 열 사이의 데이터를 결합
        self.daily_bikes_data = torch.cat(
            [self.daily_bikes_data[:, :, :9], self.daily_bikes_data[:, :, 10:]], dim=2
        )

        # 훈련 데이터와 타겟을 설정
        total_length = len(self.daily_bikes_data)
        self.train_bikes_data = self.daily_bikes_data[:total_length - test_days]
        self.train_bikes_targets = self.daily_bikes_target[:total_length - test_days]

        # 온도 정규화
        train_temperatures = self.train_bikes_data[:, :, 9]
        train_temperatures_mean = torch.mean(train_temperatures)
        train_temperatures_std = torch.std(train_temperatures)
        self.train_bikes_data[:, :, 9] = \
            (self.train_bikes_data[:, :, 9] - train_temperatures_mean) / train_temperatures_std

        assert len(self.train_bikes_data) == len(self.train_bikes_targets)

        # 테스트 데이터 설정
        self.test_bikes_data = self.daily_bikes_data[-test_days:]
        self.test_bikes_targets = self.daily_bikes_target[-test_days:]

        # 테스트 데이터의 온도 정규화
        self.test_bikes_data[:, :, 9] = \
            (self.test_bikes_data[:, :, 9] - train_temperatures_mean) / train_temperatures_std

        assert len(self.test_bikes_data) == len(self.test_bikes_targets)

    def __len__(self):
        # 데이터셋 길이 반환
        return len(self.train_bikes_data) if self.train else len(self.test_bikes_data)

    def __getitem__(self, idx):
        # 인덱스에 따라 데이터와 타겟 반환
        bike_feature = self.train_bikes_data[idx] if self.train else self.test_bikes_data[idx]
        bike_target = self.train_bikes_targets[idx] if self.train else self.test_bikes_targets[idx]
        return bike_feature, bike_target

    def __str__(self):
        # 데이터셋의 정보 문자열 반환
        if self.train:
            str = "Data Size: {0}, Input Shape: {1}, Target Shape: {2}".format(
                len(self.train_bikes_data), self.train_bikes_data.shape, self.train_bikes_targets.shape
            )
        else:
            str = "Data Size: {0}, Input Shape: {1}, Target Shape: {2}".format(
                len(self.test_bikes_data), self.test_bikes_data.shape, self.test_bikes_targets.shape
            )
        return str


if __name__ == "__main__":
    # 훈련 데이터셋 인스턴스 생성
    train_bikes_dataset = BikesDataset(train=True, test_days=1)
    print(train_bikes_dataset)

    print("#" * 50, 1)

    # 훈련 데이터셋과 검증 데이터셋으로 분할
    train_dataset, validation_dataset = random_split(train_bikes_dataset, [0.8, 0.2])

    print("[TRAIN]")
    for idx, sample in enumerate(train_dataset):
        input, target = sample
        print("{0} - {1}: {2}".format(idx, input.shape, target.shape))

    # 훈련 데이터 로더 생성
    train_data_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True, drop_last=True)

    for idx, batch in enumerate(train_data_loader):
        input, target = batch
        print("{0} - {1}: {2}".format(idx, input.shape, target.shape))

    print("#" * 50, 2)

    print("[VALIDATION]")
    for idx, sample in enumerate(validation_dataset):
        input, target = sample
        print("{0} - {1}: {2}".format(idx, input.shape, target.shape))

    # 검증 데이터 로더 생성
    validation_data_loader = DataLoader(dataset=validation_dataset, batch_size=32)

    for idx, batch in enumerate(validation_data_loader):
        input, target = batch
        print("{0} - {1}: {2}".format(idx, input.shape, target.shape))

    print("#" * 50, 3)

    # 테스트 데이터셋 인스턴스 생성
    test_dataset = BikesDataset(train=False, test_days=1)
    print(test_dataset)

    print("[TEST]")
    for idx, sample in enumerate(test_dataset):
        input, target = sample
        print("{0} - {1}: {2}".format(idx, input.shape, target.shape))

    # 테스트 데이터 로더 생성
    test_data_loader = DataLoader(dataset=test_dataset, batch_size=len(test_dataset))

    for idx, batch in enumerate(test_data_loader):
        input, target = batch
        print("{0} - {1}: {2}".format(idx, input.shape, target.shape))


# o_hourly_bikes_sharing_dataset_dataloader.py

In [None]:
import os
from pathlib import Path

import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, random_split

BASE_PATH = str(Path(os.getcwd()).resolve().parent.parent)
import sys
sys.path.append(BASE_PATH)


class HourlyBikesDataset(Dataset):
  def __init__(self, X, y):
    self.X = X
    self.y = y

    assert len(self.X) == len(self.y)

  def __len__(self):
    return len(self.X)

  def __getitem__(self, idx):
    X = self.X[idx]
    y = self.y[idx]
    return X, y

  def __str__(self):
    str = "Data Size: {0}, Input Shape: {1}, Target Shape: {2}".format(
      len(self.X), self.X.shape, self.y.shape
    )
    return str


def get_hourly_bikes_data(sequence_size=24, validation_size=96, test_size=24, y_normalizer=100):
  bikes_path = os.path.join(BASE_PATH, "_00_data", "e_time-series-bike-sharing-dataset", "hour-fixed.csv")

  bikes_numpy = np.loadtxt(
    fname=bikes_path, dtype=np.float32, delimiter=",", skiprows=1,
    converters={
      1: lambda x: float(x[8:10])  # 2011-01-07 --> 07 --> 7
    }
  )
  bikes_data = torch.from_numpy(bikes_numpy).to(torch.float) # >>> torch.Size([17520, 17])
  bikes_target = bikes_data[:, -1].unsqueeze(dim=-1)  # 'cnt'
  bikes_data = bikes_data[:, :-1]  # >>> torch.Size([17520, 16])

  eye_matrix = torch.eye(4)

  data_torch_list = []
  for idx in range(bikes_data.shape[0]):  # range(730)
    hour_data = bikes_data[idx]  # day.shape: [24, 17]
    weather_onehot = eye_matrix[hour_data[9].long() - 1]
    concat_data_torch = torch.cat(tensors=(hour_data, weather_onehot), dim=-1)  # day_torch.shape: [24, 21]
    data_torch_list.append(concat_data_torch)

  bikes_data = torch.stack(data_torch_list, dim=0)
  bikes_data = torch.cat([bikes_data[:, 1:9], bikes_data[:, 10:]], dim=-1)
  print(bikes_data.shape, "!!!")  # >>> torch.Size([17520, 18])

  data_size = len(bikes_data) - sequence_size
  train_size = data_size - (validation_size + test_size)

  #################################################################################################

  row_cursor = 0

  X_train_list = []
  y_train_regression_list = []
  for idx in range(0, train_size):
    sequence_data = bikes_data[idx: idx + sequence_size]
    sequence_target = bikes_target[idx + sequence_size - 1]
    X_train_list.append(sequence_data)
    y_train_regression_list.append(sequence_target)
    row_cursor += 1

  X_train = torch.stack(X_train_list, dim=0).to(torch.float)
  y_train_regression = torch.tensor(y_train_regression_list, dtype=torch.float32) / y_normalizer

  m = X_train.mean(dim=0, keepdim=True)
  s = X_train.std(dim=0, keepdim=True)
  X_train = (X_train - m) / s

  #################################################################################################

  X_validation_list = []
  y_validation_regression_list = []
  for idx in range(row_cursor, row_cursor + validation_size):
    sequence_data = bikes_data[idx: idx + sequence_size]
    sequence_target = bikes_target[idx + sequence_size - 1]
    X_validation_list.append(sequence_data)
    y_validation_regression_list.append(sequence_target)
    row_cursor += 1

  X_validation = torch.stack(X_validation_list, dim=0).to(torch.float)
  y_validation_regression = torch.tensor(y_validation_regression_list, dtype=torch.float32) / y_normalizer

  X_validation -= m
  X_validation /= s
  #################################################################################################

  X_test_list = []
  y_test_regression_list = []
  for idx in range(row_cursor, row_cursor + test_size):
    sequence_data = bikes_data[idx: idx + sequence_size]
    sequence_target = bikes_target[idx + sequence_size - 1]
    X_test_list.append(sequence_data)
    y_test_regression_list.append(sequence_target)
    row_cursor += 1

  X_test = torch.stack(X_test_list, dim=0).to(torch.float)
  y_test_regression = torch.tensor(y_test_regression_list, dtype=torch.float32) / y_normalizer

  X_test -= m
  X_test /= s

  return (
    X_train, X_validation, X_test,
    y_train_regression, y_validation_regression, y_test_regression
  )


if __name__ == "__main__":
  X_train, X_validation, X_test, y_train, y_validation, y_test = get_hourly_bikes_data(
    sequence_size=24, validation_size=96, test_size=24, y_normalizer=100
  )

  print("Train: {0}, Validation: {1}, Test: {2}".format(len(X_train), len(X_validation), len(X_test)))

  train_hourly_bikes_dataset = HourlyBikesDataset(X=X_train, y=y_train)
  validation_hourly_bikes_dataset = HourlyBikesDataset(X=X_validation, y=y_validation)
  test_houly_bikes_dataset = HourlyBikesDataset(X=X_test, y=y_test)

  train_data_loader = DataLoader(
    dataset=train_hourly_bikes_dataset, batch_size=32, shuffle=True, drop_last=True
  )

  # for idx, batch in enumerate(train_data_loader):
  #   input, target = batch
  #   print("{0} - {1}: {2}, {3}".format(idx, input.shape, target.shape, target))

# p_cryptocurrency_dataset_dataloader.py

In [None]:
# https://towardsdatascience.com/cryptocurrency-price-prediction-using-deep-learning-70cfca50dd3a
from pathlib import Path
from torch.utils.data import Dataset, DataLoader
import os
import torch
import pandas as pd
import numpy as np

BASE_PATH = str(Path(os.getcwd()).resolve().parent.parent)
import sys
sys.path.append(BASE_PATH)


class CryptoCurrencyDataset(Dataset):
  def __init__(self, X, y, is_regression=True):
    self.X = X
    self.y = y

    assert len(self.X) == len(self.y)

  def __len__(self):
    return len(self.X)

  def __getitem__(self, idx):
    X = self.X[idx]
    y = self.y[idx]
    return X, y

  def __str__(self):
    str = "Data Size: {0}, Input Shape: {1}, Target Shape: {2}".format(
      len(self.X), self.X.shape, self.y.shape
    )
    return str


def get_cryptocurrency_data(
    sequence_size=10, validation_size=100, test_size=10, target_column='Close', y_normalizer=1.0e7, is_regression=True
):
  btc_krw_path = os.path.join(BASE_PATH, "_00_data", "k_cryptocurrency", "BTC_KRW.csv")
  df = pd.read_csv(btc_krw_path)
  row_size = len(df)
  # ['Date', 'Open', 'High', 'Low', 'Close', 'Volume']
  date_list = df['Date']

  df = df.drop(columns=['Date'])

  data_size = row_size - sequence_size
  train_size = data_size - (validation_size + test_size)
  #################################################################################################

  row_cursor = 0

  X_train_list = []
  y_train_regression_list = []
  y_train_classification_list = []
  y_train_date = []
  for idx in range(0, train_size):
    sequence_data = df.iloc[idx: idx + sequence_size].values  # sequence_data.shape: (sequence_size, 5)
    X_train_list.append(torch.from_numpy(sequence_data))
    y_train_regression_list.append(df.iloc[idx + sequence_size][target_column])
    y_train_classification_list.append(
      1 if df.iloc[idx + sequence_size][target_column] >= df.iloc[idx + sequence_size - 1][target_column] else 0
    )
    y_train_date.append(date_list[idx + sequence_size])
    row_cursor += 1

  X_train = torch.stack(X_train_list, dim=0).to(torch.float)
  y_train_regression = torch.tensor(y_train_regression_list, dtype=torch.float32) / y_normalizer
  y_train_classification = torch.tensor(y_train_classification_list, dtype=torch.int64)

  m = X_train.mean(dim=0, keepdim=True)
  s = X_train.std(dim=0, keepdim=True)
  X_train = (X_train - m) / s

  #################################################################################################

  X_validation_list = []
  y_validation_regression_list = []
  y_validation_classification_list = []
  y_validation_date = []
  for idx in range(row_cursor, row_cursor + validation_size):
    sequence_data = df.iloc[idx: idx + sequence_size].values  # sequence_data.shape: (sequence_size, 5)
    X_validation_list.append(torch.from_numpy(sequence_data))
    y_validation_regression_list.append(df.iloc[idx + sequence_size][target_column])
    y_validation_classification_list.append(
      1 if df.iloc[idx + sequence_size][target_column] >= df.iloc[idx + sequence_size - 1][target_column] else 0
    )
    y_validation_date.append(date_list[idx + sequence_size])
    row_cursor += 1

  X_validation = torch.stack(X_validation_list, dim=0).to(torch.float)
  y_validation_regression = torch.tensor(y_validation_regression_list, dtype=torch.float32) / y_normalizer
  y_validation_classification = torch.tensor(y_validation_classification_list, dtype=torch.int64)

  X_validation = (X_validation - m) / s
  #################################################################################################

  X_test_list = []
  y_test_regression_list = []
  y_test_classification_list = []
  y_test_date = []
  for idx in range(row_cursor, row_cursor + test_size):
    sequence_data = df.iloc[idx: idx + sequence_size].values  # sequence_data.shape: (sequence_size, 5)
    X_test_list.append(torch.from_numpy(sequence_data))
    y_test_regression_list.append(df.iloc[idx + sequence_size][target_column])
    y_test_classification_list.append(
      1 if df.iloc[idx + sequence_size][target_column] > df.iloc[idx + sequence_size - 1][target_column] else 0
    )
    y_test_date.append(date_list[idx + sequence_size])
    row_cursor += 1

  X_test = torch.stack(X_test_list, dim=0).to(torch.float)
  y_test_regression = torch.tensor(y_test_regression_list, dtype=torch.float32) / y_normalizer
  y_test_classification = torch.tensor(y_test_classification_list, dtype=torch.int64)

  X_test = (X_test - m) / s

  if is_regression:
    return (
      X_train, X_validation, X_test,
      y_train_regression, y_validation_regression, y_test_regression,
      y_train_date, y_validation_date, y_test_date
    )
  else:
    return (
      X_train, X_validation, X_test,
      y_train_classification, y_validation_classification, y_test_classification,
      y_train_date, y_validation_date, y_test_date
    )


if __name__ == "__main__":
  is_regression = False

  X_train, X_validation, X_test, y_train, y_validation, y_test, y_train_date, y_validation_date, y_test_date \
    = get_cryptocurrency_data(
    sequence_size=10, validation_size=100, test_size=10,
    target_column='Close', y_normalizer=1.0e7, is_regression=is_regression
  )

  train_crypto_currency_dataset = CryptoCurrencyDataset(X=X_train, y=y_train, is_regression=is_regression)
  validation_crypto_currency_dataset = CryptoCurrencyDataset(X=X_validation, y=y_validation, is_regression=is_regression)
  test_crypto_currency_dataset = CryptoCurrencyDataset(X=X_test, y=y_test, is_regression=is_regression)

  train_data_loader = DataLoader(
    dataset=train_crypto_currency_dataset,
    batch_size=32,
    shuffle=True,
    drop_last=True
  )

  for idx, batch in enumerate(train_data_loader):
    input, target = batch
    print("{0} - {1}: {2}, {3}".format(idx, input.shape, target.shape, target))

# 숙제 후기

이번 "_03_real_world_data_to_tensors" 과제를 통해 실제 데이터를 텐서로 변환시키는 것을 경험했습니다. 다양한 데이터셋을 다루며, 데이터 전처리와 정규화의 중요성을 깊이 이해할 수 있었습니다. PyTorch의 Dataset과 DataLoader 클래스를 활용하여 데이터 로딩과 배치 처리를 효과적으로 수행해보았고. 실습을 통해 데이터의 형태와 구조를 변경하는 기술도 익히고, 이를 통해 머신러닝 모델의 학습 데이터 준비 과정의 핵심을 깨달았습니다.