对数据集的分区，该数据集通常太大，无法放入内存。最快的过程：  
使用散列功能将客户ID映射到分区。  
计算整数散列，然后除以分区数  
按分区对数据框架进行分组，并将每个分区写入适当的目录和文件  
对于无法全部放入内存的大文件，请通过分块读取，并通过分区功能发送每个块  
现在，我们可以在单个分区上开发一个自动化的特征工程管道。  
开发管道后，我们可以使用Spark或Dask等框架通过管道并行运行分区。这将加快整体特征工程过程，并允许我们扩展到更大的数据集。  

In [18]:
import os
import pandas as pd
import numpy as np

import hashlib

# 倒入ipython的交互模块、输出显示方式，比如可以一次性输出所有打印，不用print
from IPython.core.interactiveshell import InteractiveShell
# 显示每一行的结果，一般都是只显示最后一行，而"all"是每一行都显示
InteractiveShell.ast_node_interactivity = "all"

# 分块并行处理数据
N_PARTITIONS = 1000

In [19]:
# 将一个 customer_id（字符串）进行哈希处理，生成一个 16进制整数
# 输出一个唯一且不可逆的整数值，便于后续分区或其他用途
def id_to_hash(customer_id):
    """Return a 16-bit integer hash of a customer id string"""
    # hashlib 是 Python 标准库中的一个模块，用于实现各种哈希算法（如 MD5、SHA256 等）。
    # 将输入的 customer_id 转换为 UTF-8 编码的字节串
    # 使用 MD5 哈希算法 对字节串生成哈希值
    # 哈希值的固定长度特性适合快速比较和查找。
    return int(hashlib.md5(customer_id.encode('utf-8')).hexdigest(), 16)

In [20]:
members = pd.read_csv('/Users/dususu/Desktop/kkbox-churn-prediction-challenge/members_v3.csv', nrows = 1)
members

Unnamed: 0,msno,city,bd,gender,registered_via,registration_init_time
0,Rb9UwLQTrxzBVwCB6+bCcSQWZ9JiNLC9dXtM1oEsZA8=,1,0,,11,20110911


In [21]:
transactions = pd.read_csv('/Users/dususu/Desktop/kkbox-churn-prediction-challenge/transactions.csv', nrows = 1)
transactions

Unnamed: 0,msno,payment_method_id,payment_plan_days,plan_list_price,actual_amount_paid,is_auto_renew,transaction_date,membership_expire_date,is_cancel
0,YyO+tlZtAXYXoZhNr3Vg3+dfVQvrBVGO8j1mfqe4ZHc=,41,30,129,129,1,20150930,20151101,0


In [22]:
logs = pd.read_csv('/Users/dususu/Desktop/kkbox-churn-prediction-challenge/user_logs.csv', nrows = 1)
logs

Unnamed: 0,msno,date,num_25,num_50,num_75,num_985,num_100,num_unq,total_secs
0,rxIP2f2aN0rYNp+toI0Obt/N/FYQX8hcO1fTmmy2h34=,20150513,0,0,0,0,1,1,280.335


In [23]:
train = pd.read_csv('/Users/dususu/Desktop/kkbox-churn-prediction-challenge/train.csv', nrows = 1)
train

Unnamed: 0,msno,is_churn
0,waLDQMmcOu2jLDaV1ddDkgCrB/jl6sD66Xzs0Vqax1Y=,1


In [24]:
test = pd.read_csv('/Users/dususu/Desktop/kkbox-churn-prediction-challenge/sample_submission_v2/churn_comp_refresh/sample_submission_v2.csv', nrows = 1)
test

Unnamed: 0,msno,is_churn
0,4n+fXlyJvfQnTeKXTWT507Ll4JVYGrOC8LHCfwBmPE4=,0


In [25]:
# 将msno列进行哈希处理

# 获取了 members DataFrame 中第一个用户的 msno 值。
id_to_hash(members.loc[0, 'msno'])
# 计算了该 msno 值的哈希值，根据哈希值和分区数量，确定了该 msno 对应的数据应该被分配到哪个分区。
id_to_hash(members.loc[0, 'msno']) % N_PARTITIONS

209512247756457468966515739358104959027

27

id_to_hash(members.loc[0, 'msno'])： 这行代码首先提取了 members DataFrame 中第一行 (索引为 0) 中 msno 列的值，然后将该值传入了自定义的哈希函数 id_to_hash 进行计算， 生成了一个巨大的整数 209512247756457468966515739358104959027 (哈希值)
  
id_to_hash(members.loc[0, 'msno']) % N_PARTITIONS： 然后对这个哈希值取模 N_PARTITIONS ( 假设 N_PARTITIONS = 1000)， 得到结果 27 。  
  
这里的 N_PARTITIONS 代表数据需要被划分成多少个分区。  

% 是取模运算符，用来确定特定数据应分配到哪个分区。  

In [26]:
# 交易表哈希处理
id_to_hash(transactions.loc[0, 'msno'])
id_to_hash(transactions.loc[0, 'msno']) % N_PARTITIONS  # 取模划分区

311407269432611323870693642675616983728

728

以下代码创建N_PARTITIONS空目录。每个目录中的文件将命名完全相同，因此目录名称可用于区分分区。

In [27]:
import os

base_dir = 'data/partitions/'

if not os.path.exists(base_dir + 'p999'):
    # 为每个分区创建文件目录
    for i in range(N_PARTITIONS):
        # 如果目录已经存在，exist_ok=False，会抛出异常
        os.makedirs(base_dir + f'p{i}', exist_ok=False)
    
len(os.listdir(base_dir))

1000

创建文件

每个分区都有5个csv文件。

transactions.csv  
train.csv  
test.csv  
members.csv  
logs.csv  
以下代码为每个N_PARTITION分区中的五个文件中的每个文件写入标头。  

In [28]:
# 配置 IPython 交互式环境，使其仅显示最后一个表达式的结果。
# 从 transactions DataFrame 获取所有列名，并以逗号分隔的形式生成一个字符串。
InteractiveShell.ast_node_interactivity = 'last_expr'  # 仅显示最后一个 有效表达式 的输出
','.join(list(transactions.columns))

'msno,payment_method_id,payment_plan_days,plan_list_price,actual_amount_paid,is_auto_renew,transaction_date,membership_expire_date,is_cancel'

创建分区目录: 根据 N_PARTITIONS 的值，在 ../data/partitions/ 目录下创建多个子目录 (p0, p1, ... p999)，用于数据分区。  

创建空白 CSV 文件： 在每个分区目录下，创建五个指定名称的空白 CSV 文件 (transactions.csv, train.csv, test.csv, members.csv, logs.csv)。  
 
写入表头： 将之前读取的 DataFrame 的列名写入相应的 CSV 文件，作为表头。

In [29]:
def create_blank_partitions():
    """Create blank files in each partition and write the file header"""
    # 为每个分区创建带有头文件的五个csv文件，1000个文件
    for i in range(N_PARTITIONS):
        directory = base_dir + f'p{i}/'
        # 创建五个csv文件
        for file in ['transactions.csv', 'train.csv', 'test.csv', 'members.csv', 'logs.csv']:
            # 文件头写入第一行
            with open(directory + file, 'w') as f:
                if file == 'transactions.csv':
                    f.write(','.join(list(transactions.columns)))
                elif file == 'train.csv':
                    f.write(','.join(list(train.columns)))
                elif file == 'test.csv':
                    f.write(','.join(list(train.columns)))
                elif file == 'members.csv':
                    f.write(','.join(list(members.columns)))
                elif file == 'logs.csv':
                    f.write(','.join(list(logs.columns)))
                    
    return directory

In [30]:
directory = create_blank_partitions()
# 返回最后一个创建的p999问价夹中的内容
os.listdir(directory)

['test.csv', 'members.csv', 'logs.csv', 'train.csv', 'transactions.csv']

In [31]:
# 当需要写入数据时，我们将使用附加(a)选项打开现有文件，并添加到任何文件中。此时，每个文件都只有一个标题。
pd.read_csv(directory + 'members.csv').head()

Unnamed: 0,msno,city,bd,gender,registered_via,registration_init_time


### 向csv文件内写入数据，写一行的例子  
 
对于每个文件，将数据写入分区的一个选项是一次迭代一个行。处理行的过程是：  

通过散列将客户ID转换为整数  
通过模数除以分区数将整数转换为分区数  
将行附加到正确的分区目录和文件  
让我们看看这在单行上是如何工作的。

In [32]:
# 一行一行的方式迭代df
# i表示行号，row表示该行
for i, row in members.iterrows():
    # 通过对id进行散列查找分区
    partition = id_to_hash(row['msno']) % N_PARTITIONS
    # 'a'：打开文件进行追加。如果文件不存在，会创建新文件。如果文件已经存在，新的内容会被添加到文件的末尾，而不会覆盖原有内容
    with open(base_dir + f'p{partition}/members.csv', 'a') as f:
        # 写一个换行符，然后写信息
        f.write('\n')
        # 以逗号为分隔符写入每个新的csv文件中，遍历行
        f.write(','.join([str(x) for x in row.values]))
    if i > 1:
        break

In [33]:
pd.read_csv(base_dir + f'p{partition}/members.csv')

Unnamed: 0,msno,city,bd,gender,registered_via,registration_init_time
0,Rb9UwLQTrxzBVwCB6+bCcSQWZ9JiNLC9dXtM1oEsZA8=,1,0,,11,20110911


In [17]:
一切看起来在第一次尝试时都进展顺利。然而，我们可能想问一下，使用 iterrows 逐行遍历数据集是否是最快的选择。

SyntaxError: invalid character '。' (U+3002) (2725757882.py, line 1)

不同方法的性能

有许多不同的选项来处理将数据写入到正确的分区。为了找出哪种方法最好，我们将尝试 4 种方法：

df.iterrows()： 逐行遍历 DataFrame，每一行表示为一个 Series 对象。

df.itertuples()： 逐行遍历 DataFrame，每一行表示为一个元组对象。

df.apply()： 逐行遍历 DataFrame，每次使用 apply 函数。

groupby(partition) 并使用 to_csv() 保存每个分组： 逐个分区地遍历 DataFrame。

这四种方法具有不同的适用性和性能特征（请参阅这个 Stack Overflow 回答）。 要找出哪个最快的方法，最好的方式是都尝试一下。 这并不意味着代表所有使用情况，因此你的具体结果可能会有所不同。

In [34]:
from timeit import default_timer as timer

In [35]:
members = pd.read_csv('/Users/dususu/Desktop/kkbox-churn-prediction-challenge/members_v3.csv')
members.shape

(6769473, 6)

### Iterrows 方法

要尝试的第一个实现是 iterrows。这种方法相当慢，因为 Pandas 在迭代之前会将每一行打包成一个 Pandas Series 对象。然而，它允许我们使用常规的定位方式访问每个值。

Iterrows: 这是 Pandas DataFrame 的一个方法，用于逐行迭代 DataFrame。

In [36]:
start = timer()

# 遍历 members.csv 中的行列内容
for i, row in members.iterrows():
    # 通过对id进行散列查找分区号
    partition = id_to_hash(row['msno']) % N_PARTITIONS
    
    # 打开文件将 members.csv 中的信息追加到各个分区的 csv 文件中
    with open(base_dir + f'p{partition}/members.csv', 'a') as f:
        # Write a new line and then data
        f.write('\n')
        f.write(','.join([str(x) for x in row.values]))

    # 每一万条信息打印一次
    if i % 10000 == 0:
        # 打印处理进度，用当前处理到第几行处以总条数，保留两位显示百分比    '\r'是回车
        print(f'{100 * round(i / members.shape[0], 2)}% complete. {round(timer() - start)} seconds elapsed.', end = '\r')

end = timer()
print(f'Processing {i} lines took {round(end - start)} seconds using iterrows.')

Processing 6769472 lines took 575 seconds using iterrows.


In [37]:
pd.read_csv(base_dir + f'p{partition}/members.csv').head()

Unnamed: 0,msno,city,bd,gender,registered_via,registration_init_time
0,+zMKqjvsTvD7O0Fvntk3VXe4ovwvD4KYk6PJZ92Ky60=,1,0,,9,20161227
1,3p2AY1tZAYa4LFcs0/plkuPv2hY9smh/xgcbKjtU9Dc=,5,26,male,3,20141109
2,BO0XUBzHeItkHI3N5g4uL08Ld1T/ZW/8GrbjBmT4s3w=,1,0,,7,20161228
3,qy7PNK2EE4+x6xeIdqjFVw5FlmxnFKylv6LKqqGbSo4=,1,23,female,4,20170113
4,c36721uHBQyhoVko21J9rR44Fex2ul72a74k0M7IkiQ=,1,0,,4,20170121


In [None]:
这种方法有效，但速度相当慢。

### Itertuples 方法

itertuples 应该比 iterrows 快，因为 Pandas 将每一行打包为一个元组而不是一个 Series 对象。   
这种权衡是，我们需要在访问 Series 的元素时小心，因为我们不能通过名称来引用它们。  
例如，为了确保我们正在哈希处理客户 ID (msno)，我们需要获取元组的第二个元素。

Itertuples: 这是 Pandas DataFrame 的一个方法，用于逐行迭代 DataFrame。

元组 (tuple): Python 中的一种数据结构，类似于列表，但是元组是不可变的，并且通常用于表示固定的数据项。

Series 对象： 这是 Pandas 中的一种数据结构，类似于一维数组，并且带有索引。

权衡 (tradeoff): 这里指的是，使用 itertuples 方法获得性能提升的同时，也带来了一些新的挑战。

通过名称来引用它们 (refer to them by name): 指的是使用列名来访问 Series 中的数据（例如 row['msno']）。

元组的第二个元素: 由于元组不能通过列名来访问，因此需要使用基于位置的索引（例如row[1]）来访问指定列的值。

In [38]:
_ = create_blank_partitions()

In [39]:
start = timer()

for i, tup in enumerate(members.itertuples()):
    
    # 计算分区 id
    partition = id_to_hash(tup[1]) % N_PARTITIONS
    
    # Open file for appending
    with open(base_dir + f'p{partition}/members.csv', 'a') as f:
        # Write a new line and then data
        f.write('\n')
        f.write(','.join([str(x) for x in tup[1:]]))
        
    if i % 10000 == 0:
        print(f'{100 * round(i / members.shape[0], 2)}% complete. {round(timer() - start)} seconds elapsed.', end = '\r')

end = timer()
print(f'Processing {i} lines took {round(end - start)} seconds using itertuples.')

Processing 6769472 lines took 725 seconds using itertuples.


iterrows() 返回的是 Series，每一行的数据被包装成一个 Pandas Series 对象，索引是列名，值是对应列的数据。  
itertuples() 返回的是一整行的数据，但以 命名元组（namedtuple） 的形式，既可以通过字段名（列名）访问，也可以通过索引访问。

In [40]:
pd.read_csv(base_dir + f'p{partition}/members.csv').head()

Unnamed: 0,msno,city,bd,gender,registered_via,registration_init_time
0,+zMKqjvsTvD7O0Fvntk3VXe4ovwvD4KYk6PJZ92Ky60=,1,0,,9,20161227
1,3p2AY1tZAYa4LFcs0/plkuPv2hY9smh/xgcbKjtU9Dc=,5,26,male,3,20141109
2,BO0XUBzHeItkHI3N5g4uL08Ld1T/ZW/8GrbjBmT4s3w=,1,0,,7,20161228
3,qy7PNK2EE4+x6xeIdqjFVw5FlmxnFKylv6LKqqGbSo4=,1,23,female,4,20170113
4,c36721uHBQyhoVko21J9rR44Fex2ul72a74k0M7IkiQ=,1,0,,4,20170121


In [None]:
这种方法快得多，因为 Pandas 不必将每一行转换为一个 Series 对象，而 Series 对象比元组有更多的开销。

In [41]:
_ = create_blank_partitions()

### Apply 方法

另一个选项是使用 apply 方法来处理行。   
为了使用 apply， 我们需要编写一个小函数，该函数保存当前行，然后使用 axis = 1 将 apply 函数调用到 DataFrame 上，这会将每一行发送到函数。   
这也会将每一行作为 Series 发送到函数，但在实践中它看起来比 iterrows 快得多。

In [42]:
def save_row(row, name):
    # Find the partition number by hashing the id
    partition = id_to_hash(row['msno']) % N_PARTITIONS
    
    # Open file for appending
    with open(base_dir + f'p{partition}/{name}.csv', 'a') as f:
        # Write a new line and then data
        f.write('\n')
        f.write(','.join([str(x) for x in row.values]))

In [43]:
from tqdm import tqdm_notebook
from tqdm import tqdm
tqdm.pandas()

start = timer()
# apply 应用函数，将csv中的每一行作用于函数，速度适中，有良好的封装性
members.progress_apply(save_row, axis = 1, name = 'members')
end = timer()

print(f'Processing {members.shape[0]} rows took {round(end - start)} seconds using apply.')

100%|██████████████████████████████| 6769473/6769473 [09:56<00:00, 11356.47it/s]

Processing 6769473 rows took 596 seconds using apply.





In [44]:
pd.read_csv(base_dir + f'p{partition}/members.csv').head()

Unnamed: 0,msno,city,bd,gender,registered_via,registration_init_time
0,+zMKqjvsTvD7O0Fvntk3VXe4ovwvD4KYk6PJZ92Ky60=,1,0,,9,20161227
1,3p2AY1tZAYa4LFcs0/plkuPv2hY9smh/xgcbKjtU9Dc=,5,26,male,3,20141109
2,BO0XUBzHeItkHI3N5g4uL08Ld1T/ZW/8GrbjBmT4s3w=,1,0,,7,20161228
3,qy7PNK2EE4+x6xeIdqjFVw5FlmxnFKylv6LKqqGbSo4=,1,23,female,4,20170113
4,c36721uHBQyhoVko21J9rR44Fex2ul72a74k0M7IkiQ=,1,0,,4,20170121


In [None]:
所以 apply 比 iterrows 快，但比 itertuples 慢（至少在这种情况下）。

## Groupby 方法

In [None]:
_ = create_blank_partitions()

In [None]:
Groupby

我们将尝试的最后一个选项是在一次操作中，将所有客户 ID 转换为分区编号后，按分区对数据进行分组。  

一次性使用哈希函数计算分区。  

按分区进行分组。  

将分组后的 DataFrame 写入到正确的分区目录和文件中。  

为了找出将所有客户 ID 转换为整数的最快方法，我们可以比较 map 和 apply。  

In [None]:
Groupby: 指的是 Pandas DataFrame 中的 groupby() 方法，用于将数据按照某些列的值进行分组。  

Partition: 分区，指将数据划分成更小的部分。  

Hashing function: 哈希函数，一种用于将数据映射为固定长度哈希值的函数。  

map： pandas中的map函数，可以高效的映射转换。  

apply pandas中的apply函数， 可以基于行或者列进行处理，比较灵活，但是效率不如向量化操作。  

In [45]:
%%timeit -n 1 -r 3
members['msno'].map(id_to_hash) % 1000

3.07 s ± 20.3 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


In [None]:
%%timeit 是 Jupyter Notebook 的一个魔法命令，用于测量代码块的运行时间。
	参数：
	-n 1 表示每次只运行一次（loop count）。
	-r 3 表示重复测试 3 次，然后取平均值和标准差。

In [47]:
%%timeit -n 1 -r 3
members['msno'].apply(id_to_hash) % 1000

2.96 s ± 77.5 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


members['msno'].map(id_to_hash) % 1000：

members['msno']: 选取 members DataFrame 中列名为 msno 的数据，返回一个 Series 对象。

.map(id_to_hash): 对 msno 列的每个元素执行 id_to_hash 函数。

% 1000: 对 map 返回的哈希值进行取模操作，除以 1000 得到分区索引。

作用： 这行代码使用 map 方法，对 members DataFrame 中所有 msno 列的值进行哈希运算，并取模，从而获得分区索引。

members['msno'].apply(id_to_hash) % 1000：

members['msno']: 选取 members DataFrame 中列名为 msno 的数据，返回一个 Series 对象。

.apply(id_to_hash): 对 msno 列的每个元素执行 id_to_hash 函数。

% 1000: 对 apply 返回的哈希值进行取模操作，除以 1000 得到分区索引。

作用： 这行代码使用 apply 方法，对 members DataFrame 中所有 msno 列的值进行哈希运算，并取模，从而获得分区索引。

*看起来 apply 稍微快一些*，  
我们将一次性将所有客户 ID 转换为分区，因此这不是一个很大的时间成本。  
在我们继续之前，我们应该确保哈希和转换为分区操作创建的分区大小接近。

In [48]:
members['partition'] = members['msno'].apply(id_to_hash) % 1000
members['partition'].value_counts().head()

partition
689    7027
842    7012
844    6992
660    6986
91     6983
Name: count, dtype: int64

In [49]:
members['partition'].value_counts().describe()

count    1000.000000
mean     6769.473000
std        81.213231
min      6494.000000
25%      6716.000000
50%      6773.000000
75%      6825.000000
max      7027.000000
Name: count, dtype: float64

看起来每个分区中的成员数量相当恒定。我们可以检查另一个数据集来确保这一点。  
最大的文件中有7027个用户，最小的有6494个用户

In [50]:
transactions = pd.read_csv('/Users/dususu/Desktop/kkbox-churn-prediction-challenge/transactions.csv')
transactions['partition'] = transactions['msno'].apply(id_to_hash) % 1000
transactions['partition'].value_counts().describe()

count     1000.000000
mean     21547.746000
std        618.434304
min      19572.000000
25%      21125.750000
50%      21528.500000
75%      21962.750000
max      23714.000000
Name: count, dtype: float64

接下来的单元格（cell）将以分组（groupby）的方式运行，来对数据进行分区。  
需要注意的最大事项是，确保我们每次都以附加模式（使用 open with a）写入文件。  
当我们使用 to_csv 写入 CSV 文件时，我们可以传入一个已经打开的文件。  
我们也不写入表头，因为我们已经在每个文件中创建了表头，并且不写入索引。

In [51]:
start = timer()
members['partition'] = members['msno'].apply(id_to_hash) % N_PARTITIONS

# Iteration through grouped partitions
# 通过 partition 直接进行分组，将分组后的 grouped 行直接写入csv文件中
for partition, grouped in members.groupby('partition'):

    # 删除分区号
    grouped = grouped.drop(columns = 'partition')
    # Open file for appending
    # 追加组中数据
    with open(base_dir + f'p{partition}/members.csv', 'a') as f:
        f.write('\n')
        grouped.to_csv(f, header = False, index = False)
        
end = timer()
print(f'Processing {members.shape[0]} rows took {round(end - start)} seconds using groupby.')

Processing 6769473 rows took 12 seconds using groupby.


In [52]:
pd.read_csv(base_dir + f'p{partition}/members.csv').head()

Unnamed: 0,msno,city,bd,gender,registered_via,registration_init_time
0,bpIibSSY6wymQbGaQOR9q6dcWKg7lUfw3Y+LttzAQNQ=,1,0,,4,20170104
1,HZwqy9brMyBDuFVpXlAqli8yoAixLc1rA0ExAZYZR50=,1,0,male,3,20130220
2,kW0/xDZUihRKFMa3ti+vq3fF/O2li5aYpY+szvzg0ko=,1,0,,3,20130227
3,yCUq5TNkbcJF0inE45ICYI//gZ+FzPmwSZWmFie4nk8=,6,31,female,9,20130305
4,VsM62mNuBRPH2YZSZKaRlD0IQsqoJa55aKxukV84oY4=,1,0,female,3,20150213


按分组去存储csv文件就是提前将所有的分区号归类然后进行统一保存，相对于前面的iterrows、itertuples、progress_apply，都是一条一条去处理，而groupby是根据分组进行处理，很高效  
按分区对数据进行分组的方法是目前最快的方法。我们将把这个方法放入一个函数中，以便用于所有数据集。

可复用的哈希 DataFrame 函数

为了使该过程可复用，我们将编写一个函数来为我们完成此操作。  
它将接收一个 DataFrame，一个用于保存数据的文件名，以及一个可选的进度参数。该函数将使用哈希模运算（hash-modulo）分区数的方法，将客户 ID (msno) 映射到分区编号，按分区对 DataFrame 进行分组，并将分组后的 DataFrame 写入到相应的目录。

In [None]:
# members = pd.read_csv('/Users/dususu/Desktop/kkbox-churn-prediction-challenge/members_v3.csv', nrows = 1)
# transactions = pd.read_csv('/Users/dususu/Desktop/kkbox-churn-prediction-challenge/transactions.csv', nrows = 1)
# logs = pd.read_csv('/Users/dususu/Desktop/kkbox-churn-prediction-challenge/user_logs.csv', nrows = 1)
# train = pd.read_csv('/Users/dususu/Desktop/kkbox-churn-prediction-challenge/train.csv', nrows = 1)
# test = pd.read_csv('/Users/dususu/Desktop/kkbox-churn-prediction-challenge/sample_submission_v2.csv', nrows = 1)

# _ = create_blank_partitions()

In [53]:
pd.read_csv('data/partitions/p999/members.csv').head()

Unnamed: 0,msno,city,bd,gender,registered_via,registration_init_time
0,bpIibSSY6wymQbGaQOR9q6dcWKg7lUfw3Y+LttzAQNQ=,1,0,,4,20170104
1,HZwqy9brMyBDuFVpXlAqli8yoAixLc1rA0ExAZYZR50=,1,0,male,3,20130220
2,kW0/xDZUihRKFMa3ti+vq3fF/O2li5aYpY+szvzg0ko=,1,0,,3,20130227
3,yCUq5TNkbcJF0inE45ICYI//gZ+FzPmwSZWmFie4nk8=,6,31,female,9,20130305
4,VsM62mNuBRPH2YZSZKaRlD0IQsqoJa55aKxukV84oY4=,1,0,female,3,20150213


In [54]:
# 传入df、需要拆分的csv文件，处理多少行后显示进度信息，默认不显示
def partition_by_hashing(df, name, progress = None):
    """Partition a dataframe into N_PARTITIONS by hashing the id.
    
    Params
    --------
        df (pandas dataframe): dataframe for partition. Must have 'msno' column.
        name (str): name of dataframe. Used for saving the row data.
        progress (int, optional): number of rows to be processed before displaying information.
                                  Defaults to None
                                  
    Returns:
    --------
        Nothing returned. Dataframe is saved one line at a time as csv files to the N_PARTITIONS 
    """
    """通过对 id 进行哈希将 DataFrame 划分为 N_PARTITIONS。

    参数
    --------
        df (pandas DataFrame): 需要划分的 DataFrame。必须包含 'msno' 列。
        name (str): DataFrame 的名称。用于保存行数据。
        progress (int, 可选): 每处理多少行后显示进度信息。默认为 None。
        
    返回
    --------
        无返回值。DataFrame 会一行一行地保存为 CSV 文件，划分到 N_PARTITIONS 中。
"""
    start = timer()
    
    # Map the customer id to a partition number
    df['partition'] = df['msno'].apply(id_to_hash) % N_PARTITIONS
    
    # Iterate through one row at a time
    for partition, grouped in df.groupby('partition'):
        
        # Don't need to save the partition column
        grouped = grouped.drop(columns = 'partition')
        
        # Open file for appending
        with open(base_dir + f'p{partition}/{name}.csv', 'a') as f:
            # Write a new line and then data
            f.write('\n')
            grouped.to_csv(f, header = False, index = False)
            
        # Record progress every `progress` steps
        if progress is not None:
            if partition % progress == 0:
                print(f'{100 * round(partition / N_PARTITIONS, 2)}% complete. {round(timer() - start)} seconds elapsed.', end = '\r')
    
    end = timer()
    if progress is not None:
        print(f'\n{df.shape[0]} rows processed in {round(end - start)} seconds.')

In [56]:
# 直接调用
members = pd.read_csv('/Users/dususu/Desktop/kkbox-churn-prediction-challenge/members_v3.csv')
partition_by_hashing(members, name = 'members', progress = 10)

99.0% complete. 10 seconds elapsed.nds elapsed..
6769473 rows processed in 10 seconds.


In [57]:
pd.read_csv(base_dir + f'p{partition}/members.csv').head()

Unnamed: 0,msno,city,bd,gender,registered_via,registration_init_time
0,bpIibSSY6wymQbGaQOR9q6dcWKg7lUfw3Y+LttzAQNQ=,1,0,,4,20170104
1,HZwqy9brMyBDuFVpXlAqli8yoAixLc1rA0ExAZYZR50=,1,0,male,3,20130220
2,kW0/xDZUihRKFMa3ti+vq3fF/O2li5aYpY+szvzg0ko=,1,0,,3,20130227
3,yCUq5TNkbcJF0inE45ICYI//gZ+FzPmwSZWmFie4nk8=,6,31,female,9,20130305
4,VsM62mNuBRPH2YZSZKaRlD0IQsqoJa55aKxukV84oY4=,1,0,female,3,20150213


## 训练数据  

现在我们可以使用此函数来对训练数据进行分区。

In [58]:
train = pd.read_csv('/Users/dususu/Desktop/kkbox-churn-prediction-challenge/train.csv')
partition_by_hashing(train, name = 'train', progress = 10)

99.0% complete. 1 seconds elapsed.onds elapsed..
992931 rows processed in 1 seconds.


In [59]:
pd.read_csv(base_dir + f'p{partition}/train.csv').head()

Unnamed: 0,msno,is_churn
0,Ad9xf2W6ID3zguduv1lKdla80V/iT2cFWbDxIcEMQOs=,1
1,nL5mKfbpD9mjLJiIbOa1MsrHXKlmC4Nt5S3ieKtqUq8=,1
2,TmSYOz2xCdk5j1hmUP72/FQdkU3kpoqEl9RDc9UBtf8=,1
3,NNYVPCEq8Pk2QYIGSBSIiO+XcX/Sqa2TG5+szr4DMuk=,1
4,bhzedCaoyawwHOssUE6IXI1BP4I0/4nDC1H6CtXcIK4=,1


### 测试数据

函数的优点在于我们可以不断地应用它，只需要改变参数即可！

Testing Data: 测试数据。  

function: 这里指代之前定义的函数，也就是可复用的哈希 DataFrame 函数。  

keep applying it: 可以反复应用它。  

changing only the arguments: 只需要修改参数，就可以将函数复用到不同数据。  

In [60]:
test = pd.read_csv('/Users/dususu/Desktop/kkbox-churn-prediction-challenge/sample_submission_v2/churn_comp_refresh/sample_submission_v2.csv')
partition_by_hashing(test, name = 'test', progress = 10)

99.0% complete. 1 seconds elapsed.onds elapsed..
907471 rows processed in 1 seconds.


In [61]:
pd.read_csv(base_dir + f'p{partition}/test.csv').head()

Unnamed: 0,msno,is_churn
0,bFTbnI7GC8TZJ2m9dfLF0bkFIEsy/0ERZzmFFekPWpY=,0
1,SvdAOG3xp9glZIopNrlnOOiOLHdmnpHfwRGnNUpOrmo=,0
2,GG+L3Jv6naoL8JjQYPIem7ISmtoHcMX453sqTSFcl2Q=,0
3,OdT/fU9BKs+KrLeypHtoeQut97PBNWlDvjnMCUsJVzY=,0
4,f0SvlQcRZBgZbOaoPBkj57xo81+GnlluK1rLyy0PQnc=,0


### 交易数据

倒数第二个数据集是客户交易数据。

In [62]:
transactions = pd.read_csv('/Users/dususu/Desktop/kkbox-churn-prediction-challenge/transactions.csv')
partition_by_hashing(transactions, name = 'transactions', progress = 10)

99.0% complete. 39 seconds elapsed.onds elapsed..
21547746 rows processed in 40 seconds.


In [63]:
pd.read_csv(base_dir + f'p{partition}/transactions.csv').head()

Unnamed: 0,msno,payment_method_id,payment_plan_days,plan_list_price,actual_amount_paid,is_auto_renew,transaction_date,membership_expire_date,is_cancel
0,igTF6Ef1Y1chfDlEjV+59Hgp4mfh8ZNVXF/vlv1TvhY=,41,30,129,129,1,20160418,20160518,0
1,wlwPlei0VKJ6AF07YrL04gpbebUPitqtnqa3CnL4tmI=,41,30,149,0,1,20150107,20160106,0
2,m40NTt7XHbAVwCI4y6TYH0XBJdPeVVpBUSKT9NXP7xE=,39,30,149,149,1,20161231,20170210,0
3,wQ8y0ZtCUniH1rWDKyltgP+nfxDV9hAvjcFBa7l9uAU=,41,30,99,99,1,20160731,20160831,0
4,CZ87lCEDKmbk7EXCy6Kh0zopNJxoEBTfWq9Eq9Az+28=,32,195,894,894,0,20160726,20170209,0


### 用户日志数据

由于文件太大，最终的数据集无法直接传递给函数， 这甚至会阻止我们将整个文件读取到内存中。   
相反，我们可以使用 Pandas 一次读取一个数据块（chunk），并将该函数应用于每个数据块。  
实际上有两个大小明显不同的日志文件，但我们将对每个文件使用相同的分块方法。

In [64]:
print(os.stat('/Users/dususu/Desktop/kkbox-churn-prediction-challenge/user_logs.csv').st_size / 1e9)
print(os.stat('/Users/dususu/Desktop/kkbox-churn-prediction-challenge/user_logs_v2/churn_comp_refresh/user_logs_v2.csv').st_size / 1e9)

30.514081415
1.431465728


第二个用户日志可以使用之前的方式进行处理，因为它可以被完整地读取到内存中，但我们仍将继续使用分块方法。  
chunksize 指的是一次读取的行数。使用 Pandas 的 read_csv 并指定 chunksize，我们就可以一次迭代一个数据块的文件

In [65]:
# 1e6 = 1,000,000 = 100w
chunksize = 1e6
start = timer()

for chunk in pd.read_csv('/Users/dususu/Desktop/kkbox-churn-prediction-challenge/user_logs_v2/churn_comp_refresh/user_logs_v2.csv', chunksize = chunksize):
    partition_by_hashing(chunk, name = 'logs', progress = None)
    
    if (i + 1) % 10 == 0:
        print(f'{i * chunksize} rows processed.', end = '\r')

end = timer()
print(f'\nOverall time: {round(end - start)} seconds.')


Overall time: 49 seconds.


In [66]:
pd.read_csv(base_dir + f'p{partition}/logs.csv').head()

Unnamed: 0,msno,date,num_25,num_50,num_75,num_985,num_100,num_unq,total_secs
0,555bSTll4Rzaz1vBg/VfGfaXKEE8S74EAdKPOCkBqpg=,20170301,2,0,0,1,25,26,6306.242
1,KORJwjTctoWENM9oM2Rrl432wxllmC1RvP5p84PUFI8=,20170316,6,1,2,3,36,34,10718.255
2,+GdZIQJdsQSeRKyu/GONhgWzK4R8Ufm59RpzwAj4OCE=,20170310,5,1,0,0,21,21,4948.548
3,9yZc5dve4sg96RQT1FyTPSmcDWrIVu+qsI7W7oOmFvE=,20170306,0,0,1,0,28,1,3436.411
4,Rq9NMOw9RAbN3qUIG1MUU+vL0lqhAJUAsZiW0dsm/Lg=,20170322,9,7,4,2,48,67,13534.787


In [67]:
chunksize = 1e7

start = timer()

for i, chunk in enumerate(pd.read_csv('/Users/dususu/Desktop/kkbox-churn-prediction-challenge/user_logs.csv', chunksize = chunksize)):
    partition_by_hashing(chunk, name = 'logs', progress = None)
    
    if (i + 1) % 10 == 0:
        print(f'{i * chunksize} rows processed.', end = '\r')
    
end = timer()
print(f'\nOverall time: {round(end - start)} seconds.')

390000000.0 rows processed.
Overall time: 969 seconds.


In [68]:
pd.read_csv(base_dir + f'p{partition}/logs.csv').tail()

Unnamed: 0,msno,date,num_25,num_50,num_75,num_985,num_100,num_unq,total_secs
402013,fBNdslCoBSVgRLCTT/1wz2u5BJE0D4TuF+8g0+IPOLQ=,20160509,0,0,0,0,15,15,4187.548
402014,fBNdslCoBSVgRLCTT/1wz2u5BJE0D4TuF+8g0+IPOLQ=,20160703,1,0,0,0,4,4,924.083
402015,fBNdslCoBSVgRLCTT/1wz2u5BJE0D4TuF+8g0+IPOLQ=,20161004,2,0,0,0,24,26,6031.879
402016,fBNdslCoBSVgRLCTT/1wz2u5BJE0D4TuF+8g0+IPOLQ=,20170101,0,3,2,0,1,5,790.686
402017,xVJ8UUCOfXv5dXwxjFP5ffOHP+kgRRZbM7Mf5hJe0AE=,20150126,2,0,0,0,1,3,321.985


### 结论

在这个 Notebook 中，我们实现了一个数据集的分区，该数据集通常太大而无法放入内存。在尝试了几种选择后，我们最终决定了最快的方法，即：

使用哈希函数将客户 ID 映射到分区。

计算整数哈希值，然后除以分区数。

按分区对 DataFrame 进行分组，并将每个分区写入相应的目录和文件。

对于无法完全放入内存的大文件，通过分块读取，并将每个数据块发送到分区函数。

现在我们可以在单个分区上开发一个自动化的特征工程管道。 在管道开发完成后，我们可以使用 Spark 或 Dask 等框架并行运行管道中的分区。这将加快整体的特征工程过程，并允许我们扩展到更大的数据集。

### 下一步

为了实现机器学习解决方案，我们需要采取以下流程中概述的几个步骤：

预测工程： 定义业务需求，并将其转化为机器学习问题。创建一组有标签的历史示例（称为标签时间），这些示例可以用于为每个标签构建特征。

使用标签时间来构建每个标签的特征： 通过过滤截至截止时间之前的数据。这个过程可以使用自动化特征工程快速完成。

训练一个机器学习算法： 从这些特征预测标签。一旦模型优化完成，就使用它来对新数据进行预测。

由于数据已被分区，前两步可以并行快速完成。第一步在“预测工程”Notebook 中实现。