In [1]:
import pandas as pd
import sqlite3
from datetime import datetime, timedelta
import numpy as np


In [2]:
# Cell 2: Định nghĩa hàm reduce_mem_usage
def reduce_mem_usage(df, verbose=True):
    numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
    start_mem = df.memory_usage().sum() / 1024**2    
    for col in df.columns:
        col_type = df[col].dtypes
        if col_type in numerics: 
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)  
            else:
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)    
    end_mem = df.memory_usage().sum() / 1024**2
    if verbose: print('Mem. usage decreased to {:5.2f} Mb ({:.1f}% reduction)'.format(end_mem, 100 * (start_mem - end_mem) / start_mem))
    return df

In [3]:
# Cell 3: Đọc dữ liệu
def read_data():
    INPUT_DIR_PATH = 'C:/Users/Ho Hau/Downloads/M5/data/raw/'
    sell_prices_df = pd.read_csv(INPUT_DIR_PATH + 'sell_prices.csv')
    sell_prices_df = reduce_mem_usage(sell_prices_df)
    print('Sell prices has {} rows and {} columns'.format(sell_prices_df.shape[0], sell_prices_df.shape[1]))
    
    calendar_df = pd.read_csv(INPUT_DIR_PATH + 'calendar.csv')
    calendar_df = reduce_mem_usage(calendar_df)
    print('Calendar has {} rows and {} columns'.format(calendar_df.shape[0], calendar_df.shape[1]))
    
    sales_train_validation_df = pd.read_csv(INPUT_DIR_PATH + 'sales_train_validation.csv')
    sales_train_validation_df = reduce_mem_usage(sales_train_validation_df)
    print('Sales train validation has {} rows and {} columns'.format(sales_train_validation_df.shape[0], sales_train_validation_df.shape[1]))
    
    return sell_prices_df, calendar_df, sales_train_validation_df

sell_prices_df, calendar_df, sales_train_validation_df = read_data()

Mem. usage decreased to 130.48 Mb (37.5% reduction)
Sell prices has 6841121 rows and 4 columns
Mem. usage decreased to  0.12 Mb (41.9% reduction)
Calendar has 1969 rows and 14 columns
Mem. usage decreased to 95.00 Mb (78.7% reduction)
Sales train validation has 30490 rows and 1919 columns


In [5]:
# Kết nối đến SQLite
conn = sqlite3.connect('database/m5_forecasting.db')
cursor = conn.cursor()

In [6]:
calendar_df.to_sql('calendar', conn, if_exists='replace', index=False)


1969

In [8]:
# Đọc và lưu sells_price 
print("Saving sell_prices to SQLite...")
chunk_size = 500000  # Số hàng mỗi chunk
num_chunks = len(sell_prices_df) // chunk_size + (1 if len(sell_prices_df) % chunk_size else 0)

for i in range(num_chunks):
    print(f"Processing sell_prices chunk {i+1}/{num_chunks}")
    start_idx = i * chunk_size
    end_idx = min((i + 1) * chunk_size, len(sell_prices_df))
    chunk = sell_prices_df.iloc[start_idx:end_idx]
    
    if i == 0:
        chunk.to_sql('sell_prices', conn, if_exists='replace', index=False)
    else:
        chunk.to_sql('sell_prices', conn, if_exists='append', index=False)

Saving sell_prices to SQLite...
Processing sell_prices chunk 1/14
Processing sell_prices chunk 2/14
Processing sell_prices chunk 3/14
Processing sell_prices chunk 4/14
Processing sell_prices chunk 5/14
Processing sell_prices chunk 6/14
Processing sell_prices chunk 7/14
Processing sell_prices chunk 8/14
Processing sell_prices chunk 9/14
Processing sell_prices chunk 10/14
Processing sell_prices chunk 11/14
Processing sell_prices chunk 12/14
Processing sell_prices chunk 13/14
Processing sell_prices chunk 14/14


In [9]:
# Đọc và lưu sales_train (sales_train_validation_df đã có sẵn)
print("Saving sales_train to SQLite...")
chunk_size = 5000  # Số hàng mỗi chunk
num_chunks = len(sales_train_validation_df) // chunk_size + (1 if len(sales_train_validation_df) % chunk_size else 0)

for i in range(num_chunks):
    print(f"Processing sales_train chunk {i+1}/{num_chunks}")
    start_idx = i * chunk_size
    end_idx = min((i + 1) * chunk_size, len(sales_train_validation_df))
    chunk = sales_train_validation_df.iloc[start_idx:end_idx]
    
    if i == 0:
        chunk.to_sql('sales_train', conn, if_exists='replace', index=False)
    else:
        chunk.to_sql('sales_train', conn, if_exists='append', index=False)

Saving sales_train to SQLite...
Processing sales_train chunk 1/7
Processing sales_train chunk 2/7
Processing sales_train chunk 3/7
Processing sales_train chunk 4/7
Processing sales_train chunk 5/7
Processing sales_train chunk 6/7
Processing sales_train chunk 7/7


In [10]:
# Tạo bảng days chứa các số từ 1 đến 1913
print("Creating days table...")
cursor.execute('DROP TABLE IF EXISTS days')
cursor.execute('CREATE TABLE days (day_num INTEGER)')
days = [(i,) for i in range(1, 1914)]
cursor.executemany('INSERT INTO days (day_num) VALUES (?)', days)
conn.commit()

Creating days table...


In [11]:
# Tạo bảng sales_melted bằng SQL
print("Creating sales_melted table...")
cursor.execute('DROP TABLE IF EXISTS sales_melted')
# Tạo câu lệnh CASE cho tất cả 1913 ngày
case_statements = '\n'.join([f"WHEN day_num = {i} THEN d_{i}" for i in range(1, 1914)])
query = f'''
CREATE TABLE sales_melted AS
SELECT 
    s.item_id, s.dept_id, s.cat_id, s.store_id, s.state_id,
    'd_' || d.day_num AS d,
    CASE 
        {case_statements}
    END AS sales
FROM sales_train s
CROSS JOIN days d
WHERE sales IS NOT NULL
'''
cursor.execute(query)
conn.commit()

Creating sales_melted table...


In [12]:
# Tạo bảng sales_data bằng SQL
print("Creating sales_data table...")
cursor.execute('DROP TABLE IF EXISTS sales_data')
cursor.execute('''
CREATE TABLE sales_data AS
SELECT 
    s.item_id, s.dept_id, s.cat_id, s.store_id, s.state_id,
    s.d, s.sales,
    c.date, c.wm_yr_wk, c.weekday, c.wday, c.month, c.year,
    c.snap_CA, c.snap_TX, c.snap_WI,
    p.sell_price,
    CASE WHEN c.event_name_1 IS NULL THEN 'None' ELSE c.event_name_1 END AS event_name_1,
    CASE WHEN c.event_name_2 IS NULL THEN 'None' ELSE c.event_name_2 END AS event_name_2
FROM sales_melted s
LEFT JOIN calendar c ON s.d = c.d
LEFT JOIN sell_prices p ON s.store_id = p.store_id AND s.item_id = p.item_id AND c.wm_yr_wk = p.wm_yr_wk
WHERE s.sales IS NOT NULL
''')
conn.commit()

Creating sales_data table...


In [13]:
# Đọc sales_data để tạo cột evt_* và điền giá trị thiếu
print("Processing evt_* columns and filling missing values...")
# Đọc sales_data theo từng chunk để tránh MemoryError
chunk_size = 1000000  # Số hàng mỗi chunk
first_chunk = True
for chunk in pd.read_sql_query('SELECT * FROM sales_data', conn, chunksize=chunk_size):
    print(f"Processing sales_data chunk...")

    # Kết hợp sự kiện từ event_name_1 và event_name_2
    chunk['events'] = chunk[['event_name_1', 'event_name_2']].apply(
        lambda x: set([e for e in x if pd.notnull(e)]), axis=1
    )
    chunk['events'] = chunk['events'].apply(lambda x: list(x) if x else ['None'])

    # Mã hóa one-hot cho các sự kiện
    event_dummies = pd.get_dummies(chunk['events'].explode(), prefix='evt').groupby(level=0).sum()
    chunk = pd.concat([chunk, event_dummies], axis=1)
    chunk = chunk.drop(columns=['event_name_1', 'event_name_2', 'events'])

    # Điền giá trị thiếu cho sell_price
    chunk['sell_price'] = chunk['sell_price'].fillna(
        chunk.groupby(['item_id', 'store_id'])['sell_price'].transform('mean')
    )

    # Lưu chunk vào SQLite
    if first_chunk:
        chunk.to_sql('sales_data_final', conn, if_exists='replace', index=False)
        first_chunk = False
    else:
        chunk.to_sql('sales_data_final', conn, if_exists='append', index=False)

Processing evt_* columns and filling missing values...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sales_data chunk...
Processing sale

In [14]:
# Xóa bảng sales_data tạm
cursor.execute('DROP TABLE IF EXISTS sales_data')
cursor.execute('ALTER TABLE sales_data_final RENAME TO sales_data')
conn.commit()

conn.close()
print("Đã lưu dữ liệu M5 vào database/m5_forecasting.db")

Đã lưu dữ liệu M5 vào database/m5_forecasting.db
