### 富邦原物料

In [5]:
import os
import pandas as pd


def process_daily_data(folder_path):
    # 新建名為fubon_day的資料夾
    output_folder = 'fubon_day'
    os.makedirs(output_folder, exist_ok=True)

    # 遍歷資料夾
    for file_name in os.listdir(folder_path):
        file_path = os.path.join(folder_path, file_name)

        # 檢查是否為csv檔案以及是否以"日"結尾
        if file_name.endswith('.csv') and '日' in file_name:
            # print(f'處理檔案: {file_name}')

            # 讀取csv檔案
            df = pd.read_csv(file_path)

            # 將日期欄位轉換為日期類型
            df['日期'] = pd.to_datetime(df['日期'])

            # 檢查日期是否有缺失
            date_range = pd.date_range(start=df['日期'].min(), end=df['日期'].max(), freq='D')
            missing_dates = date_range[~date_range.isin(df['日期'])]

            # 如果有缺失日期，則進行補齊
            if not missing_dates.empty:
                for missing_date in missing_dates:
                    # 找到缺失日期前一天的資料
                    previous_day_data = df[df['日期'] == missing_date - pd.Timedelta(days=1)]

                    # 創建新資料
                    new_data = pd.DataFrame({'日期': [missing_date], 'CV': previous_day_data['CV'].values})

                    # 將新資料加入到原始資料中
                    df = pd.concat([df, new_data], ignore_index=True)

            # 將資料按照日期排序
            df.sort_values(by='日期', inplace=True)

            # 將更新後的資料存儲到新的csv檔案中
            updated_file_path = os.path.join(output_folder, file_name)
            df.to_csv(updated_file_path, index=False)

            # print(f'已完成資料更新，更新後的檔案存儲在: {updated_file_path}\n')


def process_weekly_data(folder_path):
    # 新建名為fubon_week的資料夾
    output_folder = 'fubon_week'
    os.makedirs(output_folder, exist_ok=True)

    # 遍歷資料夾
    for file_name in os.listdir(folder_path):
        file_path = os.path.join(folder_path, file_name)

        # 檢查是否為csv檔案以及是否以“週”結尾
        if file_name.endswith('.csv') and '週' in file_name:
            # print(f'處理檔案: {file_name}')

            # 讀取csv檔案
            df = pd.read_csv(file_path)

            # 將日期欄位轉換為日期類型
            df['日期'] = pd.to_datetime(df['日期'])

            # 檢查第一筆資料的日期
            first_date = df['日期'].max()

            # 從最近的日期開始回推七天並檢查是否有缺失日期
            while first_date >= df['日期'].max():
                previous_date = first_date - pd.DateOffset(days=7)

                # 檢查是否有缺失日期
                if previous_date not in df['日期'].values:
                    # 找到前一週的資料
                    previous_week_data = df[df['日期'] >= first_date - pd.DateOffset(days=7)]

                    # 如果有多筆資料，取平均值
                    previous_week_cv = previous_week_data['CV'].mean()

                    # 創建新資料
                    new_data = pd.DataFrame({'日期': [previous_date], 'CV': [previous_week_cv]})

                    # 將新資料加入到原始資料中
                    df = pd.concat([df, new_data], ignore_index=True)

                first_date = previous_date

            # 將資料按照日期排序
            df.sort_values(by='日期', inplace=True)

            # 將更新後的資料存儲到新的csv檔案中
            updated_file_path = os.path.join(output_folder, file_name)
            df.to_csv(updated_file_path, index=False)

            # print(f'已完成資料更新，更新後的檔案存儲在: {updated_file_path}\n')


def process_three_months_data(folder_path):
    # 新建名為fubon_threemonths的資料夾
    output_folder = 'fubon_threemonths'
    os.makedirs(output_folder, exist_ok=True)

    # 遍歷資料夾
    for file_name in os.listdir(folder_path):
        file_path = os.path.join(folder_path, file_name)

        # 檢查是否為csv檔案以及是否以“三個月”結尾
        if file_name.endswith('.csv') and '_三個月' in file_name:
            # print(f'處理檔案: {file_name}')

            # 讀取csv檔案
            df = pd.read_csv(file_path)

            # 將日期欄位轉換為日期類型
            df['日期'] = pd.to_datetime(df['日期'])

            # 檢查第一筆資料的日期
            first_date = df['日期'].max()

            # 從第一筆日期開始回推三個月並檢查是否有缺失日期
            while first_date >= df['日期'].max():
                previous_date = first_date - pd.DateOffset(months=3)

                # 檢查是否有缺失日期
                if previous_date not in df['日期'].values:
                    # 找到前三個月的資料
                    previous_month_data = df[df['日期'] >= first_date - pd.DateOffset(months=3)]

                    # 如果有多筆資料，取平均值
                    previous_month_cv = previous_month_data['CV'].mean()

                    # 創建新資料
                    new_data = pd.DataFrame({'日期': [previous_date], 'CV': [previous_month_cv]})

                    # 將新資料加入到原始資料中
                    df = pd.concat([df, new_data], ignore_index=True)

                first_date = previous_date

            # 將資料按照日期排序
            df.sort_values(by='日期', inplace=True)

            # 將更新後的資料存儲到新的csv檔案中
            updated_file_path = os.path.join(output_folder, file_name)
            df.to_csv(updated_file_path, index=False)

            # print(f'已完成資料更新，更新後的檔案存儲在: {updated_file_path}\n')


# 定義資料夾路徑
folder_path = os.path.abspath('../Data_fubon_material_unclean')

# 創建名為Fubon_Clean的資料夾
output_folder = 'Fubon_Clean'
os.makedirs(output_folder, exist_ok=True)

# 處理日資料
process_daily_data(folder_path)

# 處理週資料
process_weekly_data(folder_path)

# 處理三個月資料
process_three_months_data(folder_path)

# 移動處理後的資料到Fubon_Clean資料夾
for sub_folder in ['fubon_day', 'fubon_week', 'fubon_threemonths']:
    os.replace(sub_folder, os.path.join(output_folder, sub_folder))

print("富邦原物料清理完成")


富邦原物料清理完成


### 西本資訊

In [4]:
import os
import pandas as pd
from datetime import timedelta

# 資料夾路徑
# data_folder = 'Data_XBI_chinadata_unclean'
data_folder = os.path.abspath('../Data_XBI_chinadata_unclean')  # 使用絕對路徑

# 日資料處理
def process_daily_data():
    # 創建一個資料夾來存放處理好的資料
    output_folder = 'XBI_day'
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)


    # 讀入所有以"日"結尾的檔案
    file_list = [file for file in os.listdir(data_folder) if file.endswith('_日.csv')]

    # 對每個檔案進行處理
    for file_name in file_list:
        # 讀入CSV檔案
        file_path = os.path.join(data_folder, file_name)
        df = pd.read_csv(file_path)
        
        # 将日期列转换为日期时间类型
        df['date'] = pd.to_datetime(df['date'])
        
        # 觀察缺乏的日期
        min_date = min(df['date'])
        max_date = max(df['date'])
        all_dates = pd.date_range(start=min_date, end=max_date)
        missing_dates = all_dates[~all_dates.isin(df['date'])]
        
        # 將缺乏的日期補上
        for missing_date in missing_dates:
            # 将缺失日期转换为日期时间类型
            missing_date = pd.to_datetime(missing_date)
            
            # 找到距离缺失日期最近的有数据的日期
            nearest_date = df.loc[df['date'].shift(0) < missing_date, 'date'].max()
            
            # 如果有数据的日期存在，则将其值复制到缺失日期的位置
            if pd.notnull(nearest_date):
                nearest_value = df.loc[df['date'] == nearest_date, 'value'].iloc[-1]
                df = df.append({'date': missing_date,
                                'value': nearest_value,
                                'change': 0,
                                'change percentage': '0%'}, ignore_index=True)
        
        # 重新排序資料
        df = df.sort_values(by='date', ascending=False)
        
        # 將處理好的資料存入XBI_day資料夾中
        output_file_path = os.path.join(output_folder, file_name)
        df.to_csv(output_file_path, index=False)

    # print("日資料處理完成")

# 月資料處理
def process_monthly_data():
    # 設定輸出資料夾
    output_folder = 'XBI_month'
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
    
    # 取得所有以月結尾的CSV檔案
    file_list = [f for f in os.listdir(data_folder) if f.endswith('_月.csv')]
    
    # 對每個檔案進行處理
    for file in file_list:
        file_path = os.path.join(data_folder, file)
        
        # 讀取CSV檔案
        df = pd.read_csv(file_path)
        
        # 將'date'欄位轉換為日期格式
        df['date'] = pd.to_datetime(df['date'])
        
        # 補全日期
        date_range = pd.date_range(start=df['date'].min(), end=df['date'].max(), freq='M')
        df = df.set_index('date').reindex(date_range, method='ffill').rename_axis('date').reset_index()
        
        # 填充缺失值
        df['value'].fillna(method='ffill', inplace=True)
        df['change'].fillna(0, inplace=True)
        df['change percentage'].fillna('0%', inplace=True)
        
        # 儲存處理好的檔案至輸出資料夾
        output_file_path = os.path.join(output_folder, file)
        df.to_csv(output_file_path, index=False)
    
    # print("月資料處理完成。")

# 週資料處理
def process_weekly_data():
    # 設定輸出資料夾
    output_folder = 'XBI_week'
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
    
    # 取得所有以週結尾的CSV檔案
    file_list = [f for f in os.listdir(data_folder) if f.endswith('_週.csv')]
    
    # 對每個檔案進行處理
    for file in file_list:
        file_path = os.path.join(data_folder, file)
        
        # 讀取CSV檔案
        df = pd.read_csv(file_path)
        
        # 將'date'欄位轉換為日期格式
        df['date'] = pd.to_datetime(df['date'])
        
        # 找出日期的最小值
        min_date = df['date'].min()
        
        # 補全缺失的日期
        while True:
            prev_date = min_date - timedelta(days=7)
            if prev_date < df['date'].min():
                break
            if prev_date not in df['date'].values:
                prev_row = df[df['date'] == min_date].copy()
                prev_row['date'] = prev_date
                df = pd.concat([df, prev_row], ignore_index=True)
            min_date = prev_date
        
        # 將資料依日期排序
        df = df.sort_values(by='date').reset_index(drop=True)
        
        # 填充缺失值
        df['value'].fillna(method='ffill', inplace=True)
        df['change'].fillna(0, inplace=True)
        df['change percentage'].fillna('0%', inplace=True)
        
        # 儲存處理好的檔案至輸出資料夾
        output_file_path = os.path.join(output_folder, file)
        df.to_csv(output_file_path, index=False)
    
    # print("週資料處理完成。")

# 主程式入口
if __name__ == "__main__":
    # 設定XBI_clean資料夾
    clean_folder = 'XBI_clean'
    if not os.path.exists(clean_folder):
        os.makedirs(clean_folder)
    
    # 執行資料處理
    process_daily_data()
    process_monthly_data()
    process_weekly_data()
    
    # 移動小資料夾至XBI_clean資料夾
    os.rename('XBI_day', os.path.join(clean_folder, 'XBI_day'))
    os.rename('XBI_month', os.path.join(clean_folder, 'XBI_month'))
    os.rename('XBI_week', os.path.join(clean_folder, 'XBI_week'))

print("西本資訊資料處理完成")

  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,


日資料處理完成
西本資訊資料處理完成


  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,
  df = df.append({'date': missing_date,


### 中國國家數據局


In [6]:
import os
import pandas as pd

# 填充一月份的YOY%缺失值
def fill_missing_YOY_for_Jan(data):
    for year in data['Date'].str.split('/').str[0].unique():
        jan_year_indices = data[(data['Date'].str.startswith(year)) & (data['Date'].str.endswith('/01'))]
        for index in jan_year_indices.index:
            if pd.isna(data.loc[index, 'YOY(%)']):
                current_CV = jan_year_indices.loc[index, 'CV(10K tons)']
                prev_year_index = data[(data['Date'] == str(int(year)-1) + '/01')].index
                if not prev_year_index.empty:
                    prev_CV = data.loc[prev_year_index, 'CV(10K tons)'].values[0]
                    YOY = ((current_CV - prev_CV) / prev_CV) * 100
                    data.loc[index, 'YOY(%)'] = round(YOY, 1)
                else:
                     data.loc[index, 'YOY(%)'] = pd.NA

# 填充一月份的ACCG%缺失值
def fill_missing_ACCG_for_Jan(data):
    for year in data['Date'].str.split('/').str[0].unique():
        jan_year_indices = data[(data['Date'].str.startswith(year)) & (data['Date'].str.endswith('/01'))]
        for index in jan_year_indices.index:
            if pd.isna(data.loc[index, 'ACCG(%)']):
                current_ACCV = jan_year_indices.loc[index, 'ACCV(10K tons)']
                prev_year_index = data[(data['Date'] == str(int(year)-1) + '/01')].index
                if not prev_year_index.empty:
                    prev_ACCV = data.loc[prev_year_index, 'ACCV(10K tons)'].values[0]
                    ACCG = ((current_ACCV - prev_ACCV) / prev_ACCV) * 100
                    data.loc[index, 'ACCG(%)'] = round(ACCG, 1)
                else:
                    data.loc[index, 'ACCG(%)'] = pd.NA

# 填充二月份的YOY%缺失值
def fill_missing_YOY_for_Feb(data):
    for year in data['Date'].str.split('/').str[0].unique():
        feb_year_indices = data[(data['Date'].str.startswith(year)) & (data['Date'].str.endswith('/02'))]
        for index in feb_year_indices.index:
            if pd.isna(data.loc[index, 'YOY(%)']):
                current_CV = feb_year_indices.loc[index, 'CV(10K tons)']
                prev_year_index = data[(data['Date'] == str(int(year)-1) + '/02')].index
                if not prev_year_index.empty:
                    prev_CV = data.loc[prev_year_index, 'CV(10K tons)'].values[0]
                    YOY = ((current_CV - prev_CV) / prev_CV) * 100
                    data.loc[index, 'YOY(%)'] = round(YOY, 1)
                else:
                    data.loc[index, 'YOY(%)'] = pd.NA


# 创建 CS_month 和 CS_clean 文件夹
output_folder = 'CS_clean/CS_month'
if not os.path.exists(output_folder):
    os.makedirs(output_folder)

# 获取 Data_CS_unclean 文件夹中的所有文件路径
data_folder = os.path.abspath('../Data_CS_unclean')
file_paths = [os.path.join(data_folder, f) for f in os.listdir(data_folder) if os.path.isfile(os.path.join(data_folder, f))]

# 遍历处理每个文件
for file_path in file_paths:
    # 读取数据
    data = pd.read_csv(file_path, skipinitialspace=True, skip_blank_lines=True)

    # 筛选出一月份和二月份的数据
    jan_indices = data[data['Date'].str.endswith('/01')]
    feb_indices = data[data['Date'].str.endswith('/02')]

    # 对每年一月份进行处理
    for year in jan_indices['Date'].str.split('/').str[0].unique():
        jan_year_indices = jan_indices[jan_indices['Date'].str.startswith(year)]
        for index in jan_year_indices.index:
            if pd.isna(data.loc[index, 'CV(10K tons)']):
                feb_year_indices = feb_indices[feb_indices['Date'].str.startswith(year)]
                for feb_index in feb_year_indices.index:
                    if feb_year_indices.loc[feb_index, 'Date'] == year + '/02':
                        data.loc[index, 'CV(10K tons)'] = feb_year_indices.loc[feb_index, 'ACCV(10K tons)'] / 2
                        data.loc[index, 'ACCV(10K tons)'] = feb_year_indices.loc[feb_index, 'ACCV(10K tons)'] / 2
                        break

    # 对每年二月份进行处理
    for year in feb_indices['Date'].str.split('/').str[0].unique():
        feb_year_indices = feb_indices[feb_indices['Date'].str.startswith(year)]
        for index in feb_year_indices.index:
            if pd.isna(data.loc[index, 'CV(10K tons)']):
                data.loc[index, 'CV(10K tons)'] = feb_year_indices.loc[index, 'ACCV(10K tons)'] / 2

    # 填充缺失值
    fill_missing_YOY_for_Jan(data)
    fill_missing_ACCG_for_Jan(data)
    fill_missing_YOY_for_Feb(data)

    # 删除包含空值的行
    data.dropna(axis=0, thresh=3, inplace=True)

    # 保存处理后的数据到 CS_month 文件夹中
    file_name = os.path.basename(file_path)
    output_file_path = os.path.join(output_folder, file_name)
    data.to_csv(output_file_path, index=False)


print("中國數據網原始資料清理完成")


中國數據網原始資料清理完成
