1.读取CSV文件

In [None]:
import pandas as pd
import dask
import dask.dataframe as dd
from datetime import timedelta

# Directory that holds the raw CSV exports (Windows-style relative path).
file_path = r'.\data'

# Columns of video_data.csv grouped by the dtype they are parsed as.
# 'createtime' is loaded as well but has no explicit dtype here.
video_string_columns = ['category', 'author_id', 'name', 'title',
                        'rid', 'comment', 'product', 'aweme_url']
video_float_columns = ['likes', 'share', 'duration', 'sales', 'volume',
                       'collect_count', 'download_count', 'forward_count',
                       'play_count', 'product_count']

video_dtypes = {column: 'string' for column in video_string_columns}
video_dtypes.update({column: 'float32' for column in video_float_columns})

video_usecols = ['category', 'author_id', 'name', 'title',
                 'rid', 'comment', 'likes', 'share', 'duration', 'createtime',
                 'product', 'sales', 'volume', 'aweme_url', 'collect_count',
                 'download_count', 'forward_count', 'play_count', 'product_count']

# dask is used instead of pandas because the file is large.
# The table contains URL text that can fail to decode, so undecodable bytes
# are ignored rather than raising.
video_data = dd.read_csv(
    file_path+r'\video_data.csv',
    encoding='GB18030',
    encoding_errors='ignore',
    usecols=video_usecols,
    dtype=video_dtypes)

print(video_data.head())

2.选取并保留活跃博主

In [None]:
# An author counts as "active" when their earliest video falls in the first
# reference week and their latest video falls in the last reference week.
# Each week is the half-open interval [begin, begin + 7 days), so the first
# instant of the following week is not counted.
start_week_begin = pd.Timestamp("2023-06-05")
start_week_end = start_week_begin + timedelta(days=7)
end_week_begin = pd.Timestamp("2024-06-24")
end_week_end = end_week_begin + timedelta(days=7)

# author_id is pinned to 'string' so it has the same dtype as
# video_data['author_id']; with an inferred numeric dtype the isin() filter
# below would silently match nothing.
authors_info = dd.read_csv(file_path+r'\authors_info.csv',encoding='UTF-8',
                           dtype={'author_id': 'string'})
print(authors_info["video_earliest_createtime"].head())

# Parse both timestamp columns; values that do not match the format become NaT
# and therefore fail every comparison below.
for time_column in ("video_earliest_createtime", "video_latest_createtime"):
    authors_info[time_column] = dd.to_datetime(
        authors_info[time_column],
        format="%Y-%m-%d %H:%M:%S",
        errors="coerce")

earliest_time = authors_info["video_earliest_createtime"]
latest_time = authors_info["video_latest_createtime"]

start_week_mask = (earliest_time >= start_week_begin) & (earliest_time < start_week_end)
end_week_mask = (latest_time >= end_week_begin) & (latest_time < end_week_end)

# Materialise the (small) ID lists explicitly instead of relying on implicit
# iteration over a lazy dask Series; missing IDs are dropped so they can never
# be treated as an active author.
start_week_authors = authors_info.loc[start_week_mask, "author_id"].dropna().unique().compute()
end_week_authors = authors_info.loc[end_week_mask, "author_id"].dropna().unique().compute()

# Active authors are those present in both reference weeks.
active_authors = set(start_week_authors) & set(end_week_authors)

print(f"活跃博主数量: {len(active_authors)}")
print("活跃博主ID示例:", list(active_authors)[:5])

# Keep only the videos published by active authors.
active_mask = video_data["author_id"].isin(list(active_authors))
video_data = video_data[active_mask]

print("活跃博主数据示例:")
print(video_data.head())

3.以video_data为主表进行数据合并操作

In [None]:
# Keep only the videos whose sales value is not exactly 0.
# NOTE(review): rows with a missing sales value also pass a `!= 0` comparison
# -- confirm that is intended.
video_data = video_data[video_data.sales != 0]

# Name / id column pairs for each level of the v11 category hierarchy.
category_columns = [
    column
    for level in ('big', 'first', 'second', 'third', 'fourth')
    for column in (f'v11_category_{level}', f'v11_category_{level}_id')]

# Free-text product columns are read as strings, category names as
# 'category' and category ids as 'int16'.
product_text_columns = ['volume_text','brand_name','product_id','product_title']
product_dtypes = {column: 'string' for column in product_text_columns}
product_dtypes.update({
    column: ('int16' if column.endswith('_id') else 'category')
    for column in category_columns})

product_data = dd.read_csv(
    file_path+r'\product_data.csv',
    usecols=product_text_columns + category_columns,
    dtype=product_dtypes)

def clean_volume_text(text):
    """Convert a textual sales volume into a float.

    Single quotes, double quotes and thousands-separator commas are removed
    before parsing.

    Args:
        text: Raw volume value; may be missing (None / NaN / pd.NA).

    Returns:
        float: The parsed volume, or 0.0 when the value is missing, cannot be
        parsed as a number, or parses to a non-finite value (nan / inf).
        The fallback is always a float so the result matches a 'float64'
        column.
    """
    import math

    if pd.isna(text):
        return 0.0

    # Strip quotes and commas so that float() can parse the remaining text.
    cleaned = str(text).replace("'", "").replace('"', '').replace(",", "")

    try:
        value = float(cleaned)
    except ValueError:
        return 0.0

    # Text such as "inf" or "nan" parses successfully but is not a usable volume.
    return value if math.isfinite(value) else 0.0


# Parse the textual volume into a float column; meta tells dask the name and
# dtype of the resulting series.
product_data['volume_numeric'] = product_data['volume_text'].map(
    clean_volume_text, meta=('volume_numeric', 'float64'))

# Largest volume_numeric observed for each product_title.
max_volume_per_title = product_data.groupby('product_title')['volume_numeric'].max().reset_index()

# Keep only the rows whose volume equals the per-title maximum. Several rows
# can tie at that maximum, so duplicates are dropped to leave exactly one row
# per title; otherwise the left join below would multiply video rows.
product_data_deduplicated = product_data.merge(
    max_volume_per_title,
    on=['product_title', 'volume_numeric'],
    how='inner').drop_duplicates(subset=['product_title'])

# Attach product attributes to each video by matching the video's product
# name against the product title.
product_columns = ['volume_text','brand_name','product_id','product_title'] + category_columns
merged_dask = video_data.merge(
    product_data_deduplicated[product_columns],
    left_on='product',
    right_on='product_title',
    how='left').drop(columns=['product_title'])

print(merged_dask.head())

# cid is read as 'string' so it has the same dtype as author_id, which it is
# joined against below.
fan_trend = dd.read_csv(file_path+r'\fan_trend.csv',encoding='UTF-8',
                        dtype={'cid': 'string'})

# Parse the timestamps (unparseable values become NaT) and pin both columns to
# nanosecond resolution so the join keys derived from them share one dtype.
merged_dask['createtime'] = dd.to_datetime(
    merged_dask['createtime'],
    format='%Y/%m/%d %H:%M',
    errors='coerce').astype('datetime64[ns]')
fan_trend['time_node'] = dd.to_datetime(
    fan_trend['time_node'],
    format='%Y-%m-%d %H:%M:%S',
    errors='coerce').astype('datetime64[ns]')

# Calendar-day join key: the timestamp truncated to midnight.
merged_dask['date_only'] = merged_dask['createtime'].dt.normalize()
fan_trend['date_only'] = fan_trend['time_node'].dt.normalize()

# Attach the follower count recorded for the author on the video's day.
# NOTE(review): if fan_trend holds several rows per (cid, day) this join
# repeats the video row once per match -- confirm the granularity of fan_trend.
merged_dask1 = merged_dask.merge(
    fan_trend[['cid','date_only','follower_count']],
    left_on=['author_id','date_only'],
    right_on=['cid', 'date_only'],
    how='left'
).drop(columns=['cid', 'date_only'])

print(merged_dask1.head())

# Attach the author-level category. Both sides carry a 'category' column, so
# the default merge suffixes apply: category_x comes from the video table and
# category_y from authors_info. author_id is cast to 'string' so the key dtype
# matches the video table.
merged_dask2 = merged_dask1.merge(
    authors_info[['author_id','category']].astype({'author_id': 'string'}),
    on='author_id',
    how='left')

# Drop the references to intermediate tables that are no longer needed.
del merged_dask
del merged_dask1
del video_data
del product_data
del fan_trend

print(merged_dask2.head())

4.保存文件

In [None]:
# Write all partitions into one CSV file so later steps read a single file.
# The file name is a raw string so that '\w' stays a literal backslash
# followed by 'w' instead of being an invalid escape sequence.
merged_dask2.to_csv(file_path+r'\with_sales.csv',index=False,single_file=True)