* https://www.kaggle.com/datasets/mkechinov/ecommerce-behavior-data-from-multi-category-store

In [None]:
import pandas as pd
import time
import re
from tqdm import tqdm

In [None]:
# %time df = pd.read_csv("data/2019-Oct.csv")

In [None]:
from glob import glob

glob("data/*.csv")

In [None]:
# chunk_size = 1e7
# chunk_iter = pd.read_csv("data/2019-Nov.csv", chunksize=chunk_size)

In [None]:
# chunk_iter.read()

## downcast_csv_to_parquet

In [None]:
def downcast(df_chunk):
    for col in df_chunk.columns:
        dtypes_name = df_chunk[col].dtypes.name
        if dtypes_name.startswith("float"):
            df_chunk[col] = pd.to_numeric(df_chunk[col], downcast="float")
        elif dtypes_name.startswith("int"):
            # 최솟값을 구해서 음수가 있을 때는 integer
            # 음수가 없을 때는 unsigned
            if df_chunk[col].min() < 0 :
                df_chunk[col] = pd.to_numeric(df_chunk[col], downcast="integer")
            else:
                df_chunk[col] = pd.to_numeric(df_chunk[col], downcast="unsigned")
        # 문자일 때는 category 로 변경해 줍니다.
        # 카디널리티가 높거나 텍스트 데이터에는 적합하지 않을 수 있습니다.
        elif dtypes_name.startswith("object"):
                df_chunk[col] = df_chunk[col].astype("category")
    return df_chunk

In [None]:
def downcast_csv_to_parquet(zip_file_name):
    start = time.time()
    result = re.findall(r'data/(.*?)\.csv', zip_file_name, re.IGNORECASE)
    
    save_file_name = result[0]
    print(save_file_name)
    
    chunk_size = 1e6
    chunk_iter = pd.read_csv(zip_file_name, chunksize=chunk_size)
    row_count = 0
    chunk_list = [] 
    
    for chunk in chunk_iter:
        row_count = row_count + chunk.shape[0]
        df_chunk = downcast(chunk)
        # downcast() 후에 list에 모아두었다가 concat()을 할 수도 있지만 
        # 메모리가 부족하다면 저장후 불러오는 전략을 사용
        # chunk_list.append(df_chunk)
        df_chunk.to_parquet(
            f"data_parquet/{save_file_name}-{df_chunk.index[0]}-{df_chunk.index[-1]}.parquet", index=False)
    print(row_count)
    end = time.time()
    return f"{row_count}행, {end-start:.0f}초" 

In [None]:
from glob import glob

zip_file_names = glob("data/*")
zip_file_names = sorted(zip_file_names)[::-1]
zip_file_names

In [None]:
for zip_file_name in zip_file_names:
    result = downcast_csv_to_parquet(zip_file_name)
    print(result)

In [None]:
import pyarrow.parquet as pq

# Parquet 파일 경로
parquet_file = glob("data_parquet/*")[0]

# Parquet 파일의 메타데이터 읽기
metadata = pq.read_metadata(parquet_file)

# 스키마 정보 출력
print("Schema:")
print(metadata.schema)

# 기타 메타데이터 확인
print("Other Metadata:")
print(metadata.metadata[b'pandas'])

### parquet 파일 로드하고 하나의 파일로 저장하기

In [None]:
parquet_list = glob("data_parquet/*.parquet")
parquet_list[:3]

In [None]:
%time df_parquet_list = [pd.read_parquet(gzip_file_name) for gzip_file_name in parquet_list]
len(df_parquet_list)

In [None]:
%time df = pd.concat(df_parquet_list, ignore_index=True)
df.shape

In [None]:
df.info()

In [None]:
file_name = "2019-Oct-Nov-parquet.parquet"
file_name

In [None]:
df.to_parquet(file_name, index=False)

In [None]:
df_pq = pd.read_parquet(file_name)

In [None]:
df_pq.info()

In [None]:
%whos

In [None]:
chunk_size = 1e7

In [None]:
start = time()

def get_column_type_and_length(df):
    column_type_and_length = {}
    for column in df.columns:
        if pd.api.types.is_integer_dtype(df[column]):
            column_type_and_length[column] = "INT"
        elif pd.api.types.is_float_dtype(df[column]):
            column_type_and_length[column] = "FLOAT"
        elif pd.api.types.is_bool_dtype(df[column]):
            column_type_and_length[column] = "BOOLEAN"
        elif pd.api.types.is_datetime64_any_dtype(df[column]):
            column_type_and_length[column] = "DATETIME"
        else:
            max_length = df[column].str.len().max()
            if max_length <= 255:
                column_type_and_length[column] = f"VARCHAR({max_length})"
            else:
                column_type_and_length[column] = "TEXT"
    return column_type_and_length

def create_table_from_parquet(file_path, table_name):
    # SQLite 데이터베이스 연결
    conn = sqlite3.connect('dd_parquet.db')
    cursor = conn.cursor()

    # CSV 파일을 스트리밍하여 테이블 생성과 데이터 저장
    df = pd.read_parquet(file_path)
    # 테이블 이름 지정
    table_name = "my_table"

    # 판다스 데이터프레임에서 스키마 정보 추출
    schema_info = get_column_type_and_length(df)


    # 테이블이 존재하지 않는 경우, 첫 번째 chunk를 이용해 테이블 생성
    if df.index.start == 0:
        # CREATE TABLE 쿼리 생성
        create_table_query = f"CREATE TABLE {table_name} ({', '.join([f'{column} {type}' for column, type in schema_info.items()])});"
        cursor.execute(create_table_query)
        conn.commit()

    # chunk 데이터를 테이블에 추가
    df.to_sql(name=table_name, con=conn, if_exists='append', index=False)

    # 데이터베이스 연결 종료
    conn.close()

# 대용량 CSV 파일의 경로와 테이블 이름을 지정하여 함수 호출
file_path = '2019-Nov-parquet.gzip'
table_name = 'my_table_parquet'
create_table_from_parquet(file_path, table_name)


end = time()
print(f"{end-start:.0f}초")

In [None]:
# sqlite3 데이터베이스 연결
conn = sqlite3.connect('dd_parquet.db')

# 커서 생성
cursor = conn.cursor()

# 테이블 이름
table_name = 'my_table'

# PRAGMA를 사용하여 테이블의 스키마 정보 조회
cursor.execute(f"PRAGMA table_info({table_name})")
schema_info = cursor.fetchall()

# 스키마 정보 출력
print("Column Name | Data Type")
print("------------------------")
for column_info in schema_info:
    column_name, data_type = column_info[1], column_info[2]
    print(f"{column_name} | {data_type}")

# 연결 종료
conn.close()

In [None]:
import sqlite3

def view_data_from_table(table_name):
    # SQLite 데이터베이스 연결
    conn = sqlite3.connect('my_database.db')
    cursor = conn.cursor()

    # 테이블의 데이터 조회
    query = f"SELECT * FROM {table_name} LIMIT 10;"
    cursor.execute(query)

    # 조회된 데이터 출력
    rows = cursor.fetchall()
    for row in rows:
        print(row)

    # 데이터베이스 연결 종료
    conn.close()

# 저장한 테이블 이름을 지정하여 함수 호출
table_name = 'my_table'
view_data_from_table(table_name)


## dask

In [None]:
import dask.dataframe as dd

chunk_size = 1e6
# CSV 파일 불러오기
dd_chunk = dd.read_csv('data/2019-Oct.csv', assume_missing=True, blocksize=chunk_size)

In [None]:
dd_chunk.shape

In [None]:
dd_csv = dd_chunk.compute()