In [None]:
import pandas as pd
import glob
import os

# Find the .gz.parquet files in the current working directory.
# glob returns matches in arbitrary, OS-dependent order, so sort them to keep
# the displayed listing stable across runs and platforms.
files = sorted(glob.glob('*.gz.parquet'))
files

In [3]:
import pandas as pd
import glob
import os
import gc  
from tqdm import tqdm

def get_file_size(file):
    """Return the size of ``file`` in bytes.

    Args:
        file: Path to an existing file.

    Returns:
        int: Size in bytes as reported by ``os.stat``.

    Raises:
        OSError: If the path does not exist or is inaccessible.
    """
    file_stats = os.stat(file)
    return file_stats.st_size

# Source folders; one .gz.parquet file is picked from each of them.
folders = ['company', 'estimates', 'exchangerate', 'financials', 'keydev',
           'market', 'ownership', 'professional', 'transcript']

# Maximum file size to convert (1 GiB here); larger files are skipped.
# Adjust this value to change the limit.
MAX_FILE_SIZE = 1 * 1024 * 1024 * 1024

# Collect (folder, file, size) for the first matching file of every folder.
files_with_size = []
for folder in folders:
    # glob order is arbitrary; sorting makes "the first match" deterministic.
    matches = sorted(glob.glob(f'{folder}/*.gz.parquet'))
    if not matches:
        # Indexing an empty match list would raise IndexError and abort the
        # whole cell, so report the folder and move on instead.
        print(f"No .gz.parquet file found in {folder}. Skipping.")
        continue
    if len(matches) > 1:
        # Only one file per folder is converted; make that visible.
        print(f"{folder} contains {len(matches)} .gz.parquet files; only {matches[0]} is converted.")
    file = matches[0]
    size = get_file_size(file)
    files_with_size.append((folder, file, size))

# Process the smallest files first.
files_with_size.sort(key=lambda x: x[2])

# Convert each file.
for folder, file, size in tqdm(files_with_size, desc="Processing files"):
    # Skip files above the size limit.
    if size > MAX_FILE_SIZE:
        print(f"The file {file} is too large ({size} bytes). Skipping.")
        continue

    # Output file name: <folder>.parquet in the current working directory.
    parquet_filename = f"{folder}.parquet"

    # Skip the conversion when the output already exists.
    if os.path.exists(parquet_filename):
        print(f"{parquet_filename} already exists. Skipping.")
        continue

    # Read the .gz.parquet file into a DataFrame.
    df = pd.read_parquet(file)
    print(f"{folder} 파일 읽기 완료")

    # Write the DataFrame back out as a Parquet file.
    df.to_parquet(parquet_filename)
    print(f"Saved {parquet_filename}")

    # Drop the reference to the DataFrame before loading the next file.
    del df

    # Run garbage collection.
    gc.collect()


Processing files:   0%|          | 0/9 [00:00<?, ?it/s]

다 읽었다


Processing files:  11%|█         | 1/9 [00:00<00:06,  1.15it/s]

Saved exchangerate.parquet
다 읽었다


Processing files:  22%|██▏       | 2/9 [00:03<00:14,  2.07s/it]

Saved ownership.parquet
다 읽었다


Processing files:  33%|███▎      | 3/9 [00:15<00:40,  6.68s/it]

Saved estimates.parquet
다 읽었다


Processing files:  44%|████▍     | 4/9 [00:17<00:22,  4.51s/it]

Saved professional.parquet
다 읽었다
Saved keydev.parquet


Processing files:  56%|█████▌    | 5/9 [00:36<00:39,  9.76s/it]

다 읽었다
Saved company.parquet


Processing files: 100%|██████████| 9/9 [02:14<00:00, 14.93s/it]

The file market\part-00000-tid-2679257485173306022-ebf58120-261f-486c-92af-e0b6cf6a33c5-14782-1-c000.gz.parquet is too large (1341743847 bytes). Skipping.
The file financials\part-00000-tid-3926431628187713411-a790f6d3-fba9-4590-86bb-309ca16e8e12-539-1-c000.gz.parquet is too large (2282087173 bytes). Skipping.
The file transcript\part-00000-tid-8297069538010164701-6b6a55d9-6263-40bd-811e-5da1c1553e6d-378-1-c000.gz.parquet is too large (2733575788 bytes). Skipping.





In [1]:
import pandas as pd
import numpy as np
import pyarrow
import gc
import time
import glob
import sys
import humanize
import math
import psutil
import gc
import simplejson
import skimage
import skimage.measure
from timeit import timeit
from time import sleep
from pyarrow.parquet import ParquetFile
import pyarrow
import pyarrow.parquet as pq
import signal
from contextlib import contextmanager

# Notebook display settings: show up to 500 columns and do not truncate cell text.
display_options = {
    'display.max_columns': 500,
    'display.max_colwidth': None,
}
for option_name, option_value in display_options.items():
    pd.set_option(option_name, option_value)

In [6]:
import os
import dask.dataframe as dd
import time
import psutil
import gc
from tqdm import tqdm

def get_memory_usage():
    """Return the resident set size (RSS) of the current process in MiB.

    Returns:
        float: ``rss`` from ``psutil.Process().memory_info()`` divided by 1024 * 1024.
    """
    rss_bytes = psutil.Process().memory_info().rss
    bytes_per_mib = 1024 * 1024
    return rss_bytes / bytes_per_mib

# Derive the output folder name from the input file name without its extension.
file_path = "transcript.parquet"
base_name = os.path.basename(file_path)
folder_name, _ = os.path.splitext(base_name)

# Create the output folder. exist_ok=True makes this idempotent without a
# separate existence check (no check-then-create race).
os.makedirs(folder_name, exist_ok=True)

# Open the dataset with Dask; only the column names are used from this frame.
df_meta = dd.read_parquet(file_path, engine='fastparquet')

overall_start_time = time.time()  # start time of the whole job

# For every column, compute its unique values and save them to a CSV file
# (tqdm shows the progress).
for column in tqdm(df_meta.columns, desc="Processing columns"):
    start_time = time.time()  # start time for this column

    # Load only the current column.
    df_column = dd.read_parquet(file_path, columns=[column], engine='fastparquet')

    # Compute the unique values of the column.
    unique_values = df_column[column].unique().compute()

    # Save the unique values to a file inside the output folder.
    filename = os.path.join(folder_name, f"unique_{folder_name}_{column}.csv")
    unique_values.to_csv(filename, index=False)

    elapsed_time = time.time() - start_time  # time spent on this column
    memory_usage = get_memory_usage()  # current memory usage

    print(f"Completed column: {column}")
    print(f"Memory usage: {memory_usage:.2f} MB")
    print(f"Time taken: {elapsed_time:.2f} seconds")
    print("======================================")

    # Release the per-column objects before the next iteration.
    del df_column, unique_values
    gc.collect()  # run garbage collection

overall_elapsed_time = time.time() - overall_start_time  # total elapsed time
print(f"Total time taken for all columns: {overall_elapsed_time:.2f} seconds")

Processing columns:   0%|          | 0/24 [00:00<?, ?it/s]

Processing columns:   4%|▍         | 1/24 [00:00<00:15,  1.52it/s]

Completed column: companyId
Memory usage: 256.19 MB
Time taken: 0.61 seconds


Processing columns:   8%|▊         | 2/24 [00:12<02:34,  7.04s/it]

Completed column: companyName
Memory usage: 258.80 MB
Time taken: 11.45 seconds


Processing columns:  12%|█▎        | 3/24 [00:21<02:48,  8.00s/it]

Completed column: tickerSymbol
Memory usage: 258.39 MB
Time taken: 9.11 seconds


Processing columns:  17%|█▋        | 4/24 [00:29<02:44,  8.24s/it]

Completed column: exchangeSymbol
Memory usage: 258.39 MB
Time taken: 8.55 seconds


Processing columns:  21%|██        | 5/24 [00:38<02:41,  8.50s/it]

Completed column: ISOCode
Memory usage: 259.37 MB
Time taken: 8.90 seconds


Processing columns:  25%|██▌       | 6/24 [00:39<01:45,  5.84s/it]

Completed column: pricingDate
Memory usage: 258.66 MB
Time taken: 0.65 seconds


Processing columns:  29%|██▉       | 7/24 [00:40<01:10,  4.13s/it]

Completed column: exDate
Memory usage: 258.66 MB
Time taken: 0.55 seconds


Processing columns:  33%|███▎      | 8/24 [00:40<00:48,  3.02s/it]

Completed column: payDate
Memory usage: 258.66 MB
Time taken: 0.58 seconds


Processing columns:  38%|███▊      | 9/24 [00:41<00:34,  2.28s/it]

Completed column: recordDate
Memory usage: 258.67 MB
Time taken: 0.56 seconds


Processing columns:  42%|████▏     | 10/24 [00:42<00:24,  1.77s/it]

Completed column: announcedDate
Memory usage: 258.67 MB
Time taken: 0.56 seconds


Processing columns:  46%|████▌     | 11/24 [00:45<00:28,  2.22s/it]

Completed column: priceClose
Memory usage: 276.93 MB
Time taken: 3.20 seconds


Processing columns:  50%|█████     | 12/24 [00:54<00:53,  4.48s/it]

Completed column: volume
Memory usage: 327.05 MB
Time taken: 9.60 seconds


Processing columns:  54%|█████▍    | 13/24 [00:55<00:36,  3.35s/it]

Completed column: adjustmentFactor
Memory usage: 258.93 MB
Time taken: 0.70 seconds


Processing columns:  58%|█████▊    | 14/24 [01:06<00:54,  5.50s/it]

Completed column: VWAP
Memory usage: 330.53 MB
Time taken: 10.39 seconds


Processing columns:  62%|██████▎   | 15/24 [01:07<00:37,  4.19s/it]

Completed column: divAdjFactor
Memory usage: 262.26 MB
Time taken: 1.11 seconds


Processing columns:  67%|██████▋   | 16/24 [01:43<01:50, 13.82s/it]

Completed column: marketCap
Memory usage: 560.68 MB
Time taken: 36.13 seconds


Processing columns:  71%|███████   | 17/24 [02:22<02:29, 21.38s/it]

Completed column: TEV
Memory usage: 577.32 MB
Time taken: 38.88 seconds


Processing columns:  75%|███████▌  | 18/24 [02:24<01:32, 15.49s/it]

Completed column: sharesOutstanding
Memory usage: 268.50 MB
Time taken: 1.73 seconds


Processing columns:  79%|███████▉  | 19/24 [02:28<00:59, 12.00s/it]

Completed column: divFreqTypeName
Memory usage: 259.45 MB
Time taken: 3.79 seconds


Processing columns:  83%|████████▎ | 20/24 [02:32<00:38,  9.58s/it]

Completed column: supplementalTypeName
Memory usage: 261.26 MB
Time taken: 3.89 seconds


Processing columns:  88%|████████▊ | 21/24 [02:35<00:23,  7.88s/it]

Completed column: dividendTypeName
Memory usage: 260.45 MB
Time taken: 3.85 seconds


Processing columns:  92%|█████████▏| 22/24 [02:36<00:11,  5.76s/it]

Completed column: divAmount
Memory usage: 262.43 MB
Time taken: 0.77 seconds


Processing columns:  96%|█████████▌| 23/24 [02:37<00:04,  4.26s/it]

Completed column: netAmount
Memory usage: 261.52 MB
Time taken: 0.72 seconds


Processing columns: 100%|██████████| 24/24 [02:38<00:00,  6.60s/it]

Completed column: grossAmount
Memory usage: 262.33 MB
Time taken: 0.77 seconds
Total time taken for all columns: 158.43 seconds





# Groupby

In [2]:
from dask.distributed import Client, LocalCluster
# Start a local Dask cluster with default settings.
cluster = LocalCluster()
# Connect a client to that cluster.
client = Client(cluster)
# Last expression of the cell: display the client summary.
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 8
Total threads: 32,Total memory: 31.15 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:5335,Workers: 8
Dashboard: http://127.0.0.1:8787/status,Total threads: 32
Started: Just now,Total memory: 31.15 GiB

0,1
Comm: tcp://127.0.0.1:5390,Total threads: 4
Dashboard: http://127.0.0.1:5393/status,Memory: 3.89 GiB
Nanny: tcp://127.0.0.1:5338,
Local directory: C:\Users\FELAB\AppData\Local\Temp\dask-scratch-space\worker-ki7noho2,Local directory: C:\Users\FELAB\AppData\Local\Temp\dask-scratch-space\worker-ki7noho2

0,1
Comm: tcp://127.0.0.1:5373,Total threads: 4
Dashboard: http://127.0.0.1:5375/status,Memory: 3.89 GiB
Nanny: tcp://127.0.0.1:5339,
Local directory: C:\Users\FELAB\AppData\Local\Temp\dask-scratch-space\worker-rpayy00g,Local directory: C:\Users\FELAB\AppData\Local\Temp\dask-scratch-space\worker-rpayy00g

0,1
Comm: tcp://127.0.0.1:5381,Total threads: 4
Dashboard: http://127.0.0.1:5385/status,Memory: 3.89 GiB
Nanny: tcp://127.0.0.1:5340,
Local directory: C:\Users\FELAB\AppData\Local\Temp\dask-scratch-space\worker-gze5yjfu,Local directory: C:\Users\FELAB\AppData\Local\Temp\dask-scratch-space\worker-gze5yjfu

0,1
Comm: tcp://127.0.0.1:5380,Total threads: 4
Dashboard: http://127.0.0.1:5383/status,Memory: 3.89 GiB
Nanny: tcp://127.0.0.1:5341,
Local directory: C:\Users\FELAB\AppData\Local\Temp\dask-scratch-space\worker-iza_kdas,Local directory: C:\Users\FELAB\AppData\Local\Temp\dask-scratch-space\worker-iza_kdas

0,1
Comm: tcp://127.0.0.1:5397,Total threads: 4
Dashboard: http://127.0.0.1:5399/status,Memory: 3.89 GiB
Nanny: tcp://127.0.0.1:5342,
Local directory: C:\Users\FELAB\AppData\Local\Temp\dask-scratch-space\worker-iedgsrsj,Local directory: C:\Users\FELAB\AppData\Local\Temp\dask-scratch-space\worker-iedgsrsj

0,1
Comm: tcp://127.0.0.1:5374,Total threads: 4
Dashboard: http://127.0.0.1:5378/status,Memory: 3.89 GiB
Nanny: tcp://127.0.0.1:5343,
Local directory: C:\Users\FELAB\AppData\Local\Temp\dask-scratch-space\worker-xy61ydu1,Local directory: C:\Users\FELAB\AppData\Local\Temp\dask-scratch-space\worker-xy61ydu1

0,1
Comm: tcp://127.0.0.1:5389,Total threads: 4
Dashboard: http://127.0.0.1:5391/status,Memory: 3.89 GiB
Nanny: tcp://127.0.0.1:5344,
Local directory: C:\Users\FELAB\AppData\Local\Temp\dask-scratch-space\worker-hpajfzqo,Local directory: C:\Users\FELAB\AppData\Local\Temp\dask-scratch-space\worker-hpajfzqo

0,1
Comm: tcp://127.0.0.1:5382,Total threads: 4
Dashboard: http://127.0.0.1:5387/status,Memory: 3.89 GiB
Nanny: tcp://127.0.0.1:5345,
Local directory: C:\Users\FELAB\AppData\Local\Temp\dask-scratch-space\worker-x7zki8y9,Local directory: C:\Users\FELAB\AppData\Local\Temp\dask-scratch-space\worker-x7zki8y9


In [8]:
import dask.dataframe as dd
import pandas as pd
# Open the parquet file with Dask and persist the resulting collection in one step.
file_path = "transcript.parquet"
df = dd.read_parquet(file_path, engine='fastparquet').persist()

In [9]:
# Display the Dask DataFrame summary (column names, dtypes, partition count).
df

Unnamed: 0_level_0,companyId,companyName,tickerSymbol,exchangeSymbol,ISOCode,pricingDate,exDate,payDate,recordDate,announcedDate,priceClose,volume,adjustmentFactor,VWAP,divAdjFactor,marketCap,TEV,sharesOutstanding,divFreqTypeName,supplementalTypeName,dividendTypeName,divAmount,netAmount,grossAmount
npartitions=2,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1
,float64,string,string,string,string,datetime64[ns],datetime64[ns],datetime64[ns],datetime64[ns],datetime64[ns],float64,float64,float64,float64,float64,float64,float64,float64,string,string,string,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [None]:
import os
import pandas as pd
import dask.dataframe as dd
import time
import gc
from tqdm import tqdm

def save_group_to_file(group, name, save_folder, groupby_column):
    """
    Write one group's rows to ``<save_folder>/<groupby_column>_<name>.parquet``.

    Args:
        group: DataFrame-like object exposing ``to_parquet(path, engine=...)``.
        name: Group key; converted with ``str()`` and embedded in the file name.
        save_folder: Directory that receives the file (not created here).
        groupby_column: Name of the grouping column, used as file-name prefix.

    Returns:
        None. The file is written as a side effect.

    Characters that are not allowed in file names on common filesystems
    (path separators, ``<``, ``>``, ``:``, double quote, ``|``, ``?``, ``*``
    and control characters) are percent-encoded, as is ``%`` itself so the
    mapping stays unambiguous. A key such as ``BRK/A`` is therefore saved as
    ``..._BRK%2FA.parquet`` instead of pointing into a non-existent
    sub-directory. Keys without such characters keep their original file name.
    """
    unsafe_chars = '<>:"/\\|?*%'
    safe_name = "".join(
        f"%{ord(ch):02X}" if ch in unsafe_chars or ord(ch) < 32 else ch
        for ch in str(name)
    )
    save_path = os.path.join(save_folder, f"{groupby_column}_{safe_name}.parquet")
    group.to_parquet(save_path, engine='fastparquet')
    del group
    gc.collect()

# Input file and the folder name derived from it (file name without extension).
file_path = "transcript.parquet"
base_name = os.path.basename(file_path)
folder_name, _ = os.path.splitext(base_name)

# Derive the grouping column from the unique-values CSV name, which has the
# form "unique_<folder_name>_<column>.csv". Stripping the known prefix keeps
# column names that themselves contain "_" intact; splitting on the last "_"
# is kept only as a fallback for file names that do not follow that pattern.
ticker_csv = 'transcript/unique_transcript_tickerSymbol.csv'
csv_stem, _ = os.path.splitext(os.path.basename(ticker_csv))
csv_prefix = f"unique_{folder_name}_"
if csv_stem.startswith(csv_prefix) and len(csv_stem) > len(csv_prefix):
    groupby_column = csv_stem[len(csv_prefix):]
else:
    groupby_column = csv_stem.split('_')[-1]
# Unique group keys read from the CSV (loaded here but not used below in this cell).
group_names = pd.read_csv(ticker_csv).values.flatten()

# Prepare the output folder; created if it does not exist yet.
save_folder = os.path.join(folder_name, "grouped_by_" + groupby_column)
os.makedirs(save_folder, exist_ok=True)

# Load the data with Dask.
df_meta = dd.read_parquet(file_path, engine='fastparquet')

# Repartition if necessary
df_meta = df_meta.repartition(npartitions=8)

# Persist the data into memory to avoid recomputation
df_meta = df_meta.persist()

# Record the start time.
overall_start_time = time.time()

# Group with Dask's groupby and write every group to its own file.
grouped = df_meta.groupby(groupby_column)

# save_group_to_file returns nothing useful; an empty frame is passed as meta
# so Dask does not have to infer the output structure.
empty_meta = pd.DataFrame({'index': pd.Index([], dtype='object')}).set_index('index')
result = (grouped.apply(lambda group: save_group_to_file(group, group.name, save_folder, groupby_column),
                        meta=empty_meta)
          .compute())  # or use your Dask distributed client here

# Report where the files went and the total elapsed time.
print(f"Grouped data saved in folder: {save_folder}")
overall_elapsed_time = time.time() - overall_start_time
print(f"Total time taken for all tasks: {overall_elapsed_time:.2f} seconds")