In [2]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/tariff-data/tariff_data.txt
/kaggle/input/world-port-index/UpdatedPub150.csv
/kaggle/input/world-port-index/WPI_Explanation_of_Data_Fields.pdf


In [3]:

def _load_data(self):
    valid_rows = []
    with open(self.file_path, 'r', encoding='utf-8') as f:
        current_hs = None
        for line in f:
            # 使用严格分割确保列数
            cols = line.strip().split('\t')
            cols = [c.strip() for c in cols if c.strip()]
            
            # 强制填充到6列
            while len(cols) < 6:
                cols.append('')  # 补全空字符串
            
            # 主分类行处理
            if len(cols) >=3 and self._is_valid_hs(cols[0]):
                current_hs = cols[0]
                valid_rows.append([
                    current_hs,  # HS Code
                    cols[1] if len(cols)>1 else '',  # Sub Code
                    cols[2] if len(cols)>2 else '',  # Description
                    cols[3] if len(cols)>3 else '',  # MFN Raw
                    cols[4] if len(cols)>4 else '',  # Special Raw
                    cols[5] if len(cols)>5 else '',  # ADCVD Raw
                ])
            
            # 子分类行处理
            elif current_hs and len(cols)>=1:
                valid_rows.append([
                    current_hs,  # HS Code
                    cols[0] if len(cols)>0 else '',  # Sub Code
                    cols[1] if len(cols)>1 else '',  # Description
                    cols[2] if len(cols)>2 else '',  # MFN Raw
                    cols[3] if len(cols)>3 else '',  # Special Raw
                    cols[4] if len(cols)>4 else '',  # ADCVD Raw
                ])
    
    # 验证数据维度
    print(f"样本数据维度检查: {len(valid_rows[0])} 列 (应为6)")
    self.df = pd.DataFrame(
        valid_rows,
        columns=['HS Code', 'Sub Code', 'Description', 'MFN Raw', 'Special Raw', 'ADCVD Raw']
    )
    return self

In [4]:
import pandas as pd
import re
from pathlib import Path

class TariffParser:
    def __init__(self, file_path):
        self.file_path = Path(file_path)
        self.df = None
        self._load_data()
        
    def _load_data(self):
        """动态适应列数 (严格版本)"""
        valid_rows = []
        with open(self.file_path, 'r', encoding='utf-8') as f:
            current_hs = None
            for line in f:
                # 严格分割列并补全
                cols = line.strip().split('\t')
                cols = [c.strip() for c in cols]
                
                # 强制填充到6列
                while len(cols) < 6:
                    cols.append('')
                
                # 主分类行
                if self._is_valid_hs(cols[0]):
                    current_hs = cols[0]
                    valid_rows.append([
                        current_hs,          # HS Code (0)
                        cols[1] if len(cols)>1 else '',  # Sub Code (1)
                        cols[2] if len(cols)>2 else '',  # Description (2)
                        cols[3] if len(cols)>3 else '',  # MFN Raw (3)
                        cols[4] if len(cols)>4 else '',  # Special Raw (4)
                        cols[5] if len(cols)>5 else '',  # ADCVD Raw (5)
                    ])
                
                # 子分类行
                elif current_hs:
                    valid_rows.append([
                        current_hs,          # HS Code (0)
                        cols[0] if len(cols)>0 else '',  # Sub Code (1)
                        cols[1] if len(cols)>1 else '',  # Description (2)
                        cols[2] if len(cols)>2 else '',  # MFN Raw (3)
                        cols[3] if len(cols)>3 else '',  # Special Raw (4)
                        cols[4] if len(cols)>4 else '',  # ADCVD Raw (5)
                    ])
        
        # 验证数据完整性
        if valid_rows:
            print(f"首行数据样例: {valid_rows[0]}")
            print(f"末行数据样例: {valid_rows[-1]}")
            assert all(len(row)==6 for row in valid_rows), "存在列数不一致的行"
        
        self.df = pd.DataFrame(
            valid_rows,
            columns=['HS Code', 'Sub Code', 'Description', 'MFN Raw', 'Special Raw', 'ADCVD Raw']
        )
        print("数据加载成功，列结构:", self.df.columns.tolist())
        return self

    def _is_valid_hs(self, code):
        return bool(re.match(r'^\d{4,10}(\.\d{2,4})*$', str(code)))

    # ...（其余方法保持不变）...

if __name__ == "__main__":
    try:
        parser = TariffParser('/kaggle/input/tariff-data/tariff_data.txt')
        parser.process().export('structured_tariffs.csv')
    except Exception as e:
        print(f"执行错误: {str(e)}")

首行数据样例: ['8701', '', 'Tractors (other than tractors of heading 8709):', '', '', '']
末行数据样例: ['8703.21.01', '', 'Of a cylinder capacity not exceeding 1,000 cc', '', '2.5% 1/', 'Free (A+, AU, B, BH, CL, CO, D, E, IL, JO, KR, MA, OM, P, PA, PE, S, SG)']
数据加载成功，列结构: ['HS Code', 'Sub Code', 'Description', 'MFN Raw', 'Special Raw', 'ADCVD Raw']
执行错误: 'TariffParser' object has no attribute 'process'


In [2]:
import pandas as pd
import re
from pathlib import Path
class TariffParser:
    def __init__(self, file_path):
        self.file_path = Path(file_path)
        self.df = None
        self._load_data()
        
    def _load_data(self):
        """动态适应列数 (严格版本)"""
        valid_rows = []
        with open(self.file_path, 'r', encoding='utf-8') as f:
            current_hs = None
            for line in f:
                cols = line.strip().split('\t')
                cols = [c.strip() for c in cols]
                
                # 强制填充到6列
                while len(cols) < 6:
                    cols.append('')
                
                # 主分类行
                if self._is_valid_hs(cols[0]):
                    current_hs = cols[0]
                    valid_rows.append([
                        current_hs,          # HS Code (0)
                        cols[1] if len(cols)>1 else '',  # Sub Code (1)
                        cols[2] if len(cols)>2 else '',  # Description (2)
                        cols[3] if len(cols)>3 else '',  # MFN Raw (3)
                        cols[4] if len(cols)>4 else '',  # Special Raw (4)
                        cols[5] if len(cols)>5 else '',  # ADCVD Raw (5)
                    ])
                
                # 子分类行
                elif current_hs:
                    valid_rows.append([
                        current_hs,          # HS Code (0)
                        cols[0] if len(cols)>0 else '',  # Sub Code (1)
                        cols[1] if len(cols)>1 else '',  # Description (2)
                        cols[2] if len(cols)>2 else '',  # MFN Raw (3)
                        cols[3] if len(cols)>3 else '',  # Special Raw (4)
                        cols[4] if len(cols)>4 else '',  # ADCVD Raw (5)
                    ])
        
        if valid_rows:
            print(f"首行数据样例: {valid_rows[0]}")
            print(f"末行数据样例: {valid_rows[-1]}")
            assert all(len(row) == 6 for row in valid_rows), "存在列数不一致的行"
        
        self.df = pd.DataFrame(
            valid_rows,
            columns=['HS Code', 'Sub Code', 'Description', 'MFN Raw', 'Special Raw', 'ADCVD Raw']
        )
        print("数据加载成功，列结构:", self.df.columns.tolist())
        return self

    def _is_valid_hs(self, code):
        return bool(re.match(r'^\d{4,10}(\.\d{2,4})*$', str(code)))
    
    def process(self):
        """
        如果需要对数据进行处理，可以在此方法中添加处理逻辑。
        目前暂时不做额外处理，直接返回 self 以支持链式调用。
        """
        # 示例：你可以在这里对 self.df 进行清洗或转换
        return self

    def export(self, output_file):
        """导出处理后的数据到 CSV 文件"""
        self.df.to_csv(output_file, index=False)
        print(f"数据已导出到 {output_file}")

if __name__ == "__main__":
    try:
        parser = TariffParser('/kaggle/input/tariff-data/tariff_data.txt')
        parser.process().export('structured_tariffs.csv')
    except Exception as e:
        print(f"执行错误: {str(e)}")


首行数据样例: ['8701', '', 'Tractors (other than tractors of heading 8709):', '', '', '']
末行数据样例: ['8703.21.01', '', 'Of a cylinder capacity not exceeding 1,000 cc', '', '2.5% 1/', 'Free (A+, AU, B, BH, CL, CO, D, E, IL, JO, KR, MA, OM, P, PA, PE, S, SG)']
数据加载成功，列结构: ['HS Code', 'Sub Code', 'Description', 'MFN Raw', 'Special Raw', 'ADCVD Raw']
数据已导出到 structured_tariffs.csv


In [8]:
import os
# 保存为CSV（添加编码和引号处理）
df = parser.df 
df.to_csv(
    'structured_tariffs.csv',
    index=False,
    na_rep='',
    encoding='utf-8-sig',  # 更好的跨平台兼容性
    quoting=1  # 非数值字段加引号
)

print("\n结构化CSV已生成：structured_tariffs.csv")
print("输出文件路径：", os.path.abspath('structured_tariffs.csv'))


结构化CSV已生成：structured_tariffs.csv
输出文件路径： /kaggle/working/structured_tariffs.csv


In [None]:
# 可解析格式示例
"4% + $1.5/kg" → 按千克计算为4% + 1.5美元/kg
"2.5% 1/" → 2.5%
"Free (MX)" → 0%

In [11]:
# define
import re
import pandas as pd

def parse_mfn_rate(mfn_raw: str) -> str:
    """
    从 MFN Raw 中提取数值税率（如 '4% 1/' -> '4.0%'；'Free 1/' -> '0.0%'）。
    """
    if not mfn_raw:
        return None
    # 如果出现 Free
    if 'free' in mfn_raw.lower():
        return '0.0%'
    
    # 尝试匹配形如 '2%'、'2.5%'、'4%'、'27.5%' 等
    match = re.search(r'(\d+(\.\d+)?)\%', mfn_raw)
    if match:
        return match.group(1) + '%'
    return None


def parse_usmca_rate(special_raw: str) -> str:
    """
    简单示例：如果 special_raw 中出现 'Free'，
    且我们认为它适用于 USMCA（美加墨），则返回 '0.0%'；否则返回 None。
    实际业务中需根据更精确的协定代码来判断。
    """
    if not special_raw:
        return None
    if 'free' in special_raw.lower():
        # 这里可以进一步判断是否包含对 US / CA / MX 的说明
        # 如果仅有 Free(AU, SG...) 而没有美加墨，则也可能是 None
        # 这里只做演示，假设只要出现 Free 就给 0.0%
        return '0.0%'
    return None


def parse_anti_dumping(adcvd_raw: str) -> str:
    """
    如果 ADCVD Raw 中出现形如 '25%'、'27.5%' 等数值，则视为反倾销税。
    """
    if not adcvd_raw:
        return None
    match = re.search(r'(\d+(\.\d+)?)\%', adcvd_raw)
    if match:
        return match.group(1) + '%'
    return None


In [12]:
# abstrsct and concrete
# 假设 df 是从文本中解析完毕的结果
df = pd.DataFrame([
    {
        'HS Code': '8701.21.00',
        'Sub Code': '',
        'Description': 'With only compression-ignition...',
        'MFN Raw': '4% 1/',
        'Special Raw': 'Free (A+, AU, ... )',
        'ADCVD Raw': '25%'
    },
    {
        'HS Code': '8701.30.50',
        'Sub Code': '',
        'Description': 'Other track-laying tractors',
        'MFN Raw': 'Free 1/',
        'Special Raw': 'Free (A+, AU, ... )',
        'ADCVD Raw': '27.5%'
    },
    # 其他...
])

# 定义一个函数，逐行解析 MFN、USMCA、Anti-Dumping
def extract_rates(row):
    mfn_rate = parse_mfn_rate(row['MFN Raw'])
    usmca_rate = parse_usmca_rate(row['Special Raw'])
    ad_rate = parse_anti_dumping(row['ADCVD Raw'])
    return pd.Series({
        'HS Code': row['HS Code'],
        'MFN Rate': mfn_rate,
        'USMCA Rate': usmca_rate,
        'Anti-Dumping': ad_rate
    })

# 对 df 中每行应用 extract_rates
parsed_df = df.apply(extract_rates, axis=1)

print(parsed_df)


      HS Code MFN Rate USMCA Rate Anti-Dumping
0  8701.21.00       4%       0.0%          25%
1  8701.30.50     0.0%       0.0%        27.5%
