# ccnet spark pipeline 实现

## 1. 导入依赖

In [1]:
from ccnet_spark.text_normalizer import normalize
from ccnet_spark.pipe_preprocess import load_segments
from ccnet_spark.pipe_hash import compute_hashes,split_doc2para
from ccnet_spark.pipe_lid import predictLang
from ccnet_spark.pipe_tokenized import doSentencePiece
from ccnet_spark.pipe_perplexity import doDocLM
from ccnet_spark.pipe_ppbucket import doPPBucket
from ccnet_spark.pipe_save import save_partation,load_partation
import time
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import explode
from pyspark.sql.functions import sum as spark_sum

# Initialise (or reuse, via getOrCreate) the SparkSession for this pipeline.
# Memory limits are hard-coded here; adjust them for the target machine/cluster.
# NOTE(review): if a session already exists, getOrCreate returns it and
# driver-side settings such as spark.driver.memory may not take effect.
spark = (
    SparkSession.builder.appName("CCNETSpark")
    .config("spark.executor.memory", "64g")
    .config("spark.driver.memory", "32g")
    .config("spark.driver.maxResultSize", "32g")
    # Arrow-based columnar transfer between the JVM and Python workers.
    .config('spark.sql.execution.arrow.pyspark.enabled', 'true')
    .getOrCreate()
)

24/04/02 15:47:05 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/04/02 15:47:05 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


## 2. 参数配置

In [2]:
def getModePara(mode):
    """Return the run configuration for ``mode``.

    ``"test"`` selects a small, 1%-sampled run over 10 segments with a single
    partition; any other value selects an unsampled run over 4 segments with
    4 partitions.

    Returns a list ordered as
    ``[cache_folder, date, segments, min_len, isSample, sampleRate, num_partitions]``.
    """
    is_test = mode == "test"
    n_segments = 10 if is_test else 4
    cache_folder = "/root/wxl_folder/cache_data/"
    date = "2019-09"  # hard-coded: only this dump is supported for now
    min_len = 300
    if is_test:
        isSample, sampleRate, num_partitions = True, 0.01, 1
    else:
        isSample, sampleRate, num_partitions = False, 1, 4
    return [
        cache_folder,
        date,
        list(range(n_segments)),
        min_len,
        isSample,
        sampleRate,
        num_partitions,
    ]


mode = "test"
cache_folder, date, segments, min_len, isSample, sampleRate, num_partitions = getModePara(mode)

## 2.1 读取文件数据，处理成spark DataFrame

In [None]:
# Load the selected WET segments into one Spark DataFrame.
s = time.time()
spark_df = load_segments(
    spark,
    segments,
    cache_folder,
    date=date,
    isSample=isSample,
    sampleRate=sampleRate,
    min_len=min_len,
)
# Optional (disabled): hash-partition by "cc_segment".
# spark_df = spark_df.repartition(num_partitions, "cc_segment")
e = time.time()
# NOTE(review): Spark is lazy; unless load_segments triggers an action, this
# timing may only cover building the query plan.
print(f"load {len(segments)} segments,time consume:{e-s}s")
if mode == "test":
    doc_count = spark_df.count()
    print(f"load {doc_count} docs from:{len(segments)} segments")

2024-04-02 15:47 INFO 759634:root - load segment 0, with sampleRate:1.0%,min_len:300,with date:2019-09
2024-04-02 15:47 INFO 759634:root - load segment 1, with sampleRate:1.0%,min_len:300,with date:2019-09
2024-04-02 15:47 INFO 759634:root - load segment 2, with sampleRate:1.0%,min_len:300,with date:2019-09
2024-04-02 15:47 INFO 759634:root - load segment 3, with sampleRate:1.0%,min_len:300,with date:2019-09
2024-04-02 15:47 INFO 759634:root - load segment 4, with sampleRate:1.0%,min_len:300,with date:2019-09
2024-04-02 15:47 INFO 759634:root - load segment 5, with sampleRate:1.0%,min_len:300,with date:2019-09
2024-04-02 15:47 INFO 759634:root - load segment 6, with sampleRate:1.0%,min_len:300,with date:2019-09
2024-04-02 15:47 INFO 759634:root - load segment 7, with sampleRate:1.0%,min_len:300,with date:2019-09
2024-04-02 15:47 INFO 759634:root - load segment 8, with sampleRate:1.0%,min_len:300,with date:2019-09
2024-04-02 15:47 INFO 759634:root - load segment 9, with sampleRate:1.0%,

load 10 segments,time consume:3.3463284969329834s


## 3 字段分析
1. wet 文件本身带有长度字段 `"length": length`，它是从 wet 的 `Content-Length:` 读出来的，和我计算的 `len(raw_content)` 有出入。可能的原因：原先的 length 不只统计 raw_content，还包括 title 等；另外 `Content-Length` 统计的是字节数，而 `len` / `F.length` 统计的是字符数，对非 ASCII 文本两者也会不同。

In [None]:
if mode == "test":
    print("=== TestMode Log:")
    s = time.time()
    # summary() returns a new DataFrame; printing it only shows its schema and
    # computes nothing, so show() is needed to actually display the statistics.
    spark_df.summary().show()
    # Compare the WET-provided "length" with the character length of raw_content.
    tmp_df = spark_df.withColumn("compute_length", F.length(spark_df["raw_content"]))
    tmp_df.select("url", "length", "raw_content", "title", "nlines", "compute_length").show(5)
    e = time.time()
    print(f"time consume:{e-s}s")

### 3.1 修改length

In [None]:
# Overwrite "length" with the character count of raw_content so that the
# retention statistics later compare the same measure before and after dedup.
spark_df = spark_df.withColumn("length", F.length("raw_content"))

## 4. hash计算

### 4.2 udf 处理添加新字段

In [None]:
# 假设spark_df是您的DataFrame
# 使用UDF对raw_content字段进行处理
# Split each document's raw_content into paragraph records with the
# split_doc2para UDF; the result goes into a new "split_content" column.
split_result = spark_df.withColumn("split_content", split_doc2para(spark_df["raw_content"]))
if mode == "test":
    print("=== TestMode Log:")
    s = time.time()
    # show() is required: printing summary() would only print its schema.
    split_result.summary().show()
    split_result.select("url", "length", "nlines", "raw_content", "split_content").show(5)
    e = time.time()
    print(f"time consume:{e-s}s")

### 4.3 将新字段展开获取paragraph级别row

In [None]:
# One row per paragraph: explode the split_content array, then lift the
# struct fields raw_line_id / raw_line into top-level columns.
# NOTE(review): explode() drops documents whose split_content is empty or null.
exploded_df = split_result.withColumn("exploded_content", explode(split_result.split_content))
exploded_df = (
    exploded_df.withColumn("raw_line_id", F.col("exploded_content.raw_line_id"))
    .withColumn("raw_line", F.col("exploded_content.raw_line"))
    .drop("exploded_content")
)
if mode == "test":
    print("=== TestMode Log:")
    s = time.time()
    # show() is required: printing summary() would only print its schema.
    exploded_df.summary().show()
    exploded_df.select("url", "raw_content", "raw_line_id", "raw_line").show(5)
    e = time.time()
    print(f"time consume:{e-s}s")

### 4.4 添加hash 列

In [None]:
# Assuming you have a dataframe named 'df' with a 'raw_line' column
# Attach a per-paragraph hash, computed by the compute_hashes UDF over raw_line.
hash_df = exploded_df.withColumn("hash_value", compute_hashes(exploded_df.raw_line))

if mode == "test":
    print("=== TestMode Log:")
    s = time.time()
    # show() is required: printing summary() would only print its schema.
    hash_df.summary().show()
    hash_df.show(5)
    e = time.time()
    print(f"time consume:{e-s}s")

### 4.5 根据 hash 去重

In [None]:
# Keep one row per distinct paragraph hash across the whole dataset.
# Spark does not guarantee which of the duplicate rows survives.
deduplicated_df = hash_df.dropDuplicates(["hash_value"])
if mode == "test":
    print("=== TestMode Log:")
    s = time.time()
    # show() is required: printing summary() would only print its schema.
    deduplicated_df.summary().show()
    deduplicated_df.select("url", "length", "nlines", "raw_content", "raw_line_id", "hash_value").show(5)
    e = time.time()
    print(f"time consume:{e-s}s")

### 4.6 聚合
将段落重新聚合为doc

In [None]:
# Reassemble the surviving paragraphs into documents (one row per digest).
# collect_list() gives no ordering guarantee after the shuffles introduced by
# dropDuplicates/groupBy, so (raw_line_id, raw_line) pairs are collected
# together and sorted by raw_line_id to restore the original paragraph order.
# NOTE(review): assumes raw_line_id is a numeric, non-null position produced
# by split_doc2para — confirm; a string id would sort lexicographically.
group_df = (
    deduplicated_df.groupBy("digest")
    .agg(
        F.first("url").alias("url"),
        F.first("date_download").alias("date_download"),
        F.first("source_domain").alias("source_domain"),
        F.first("cc_segment").alias("cc_segment"),
        F.first("length").alias("original_length"),
        F.first("nlines").alias("original_nlines"),
        F.first("title").alias("title"),
        F.sort_array(F.collect_list(F.struct("raw_line_id", "raw_line"))).alias("_lines"),
        F.count("raw_line_id").alias("nlines"),
    )
    # Keep the same column order as before: ..., raw_content, nlines, line_ids.
    .select(
        "digest",
        "url",
        "date_download",
        "source_domain",
        "cc_segment",
        "original_length",
        "original_nlines",
        "title",
        F.concat_ws("\n", F.col("_lines.raw_line")).alias("raw_content"),
        "nlines",
        F.col("_lines.raw_line_id").alias("line_ids"),
    )
)
group_df = group_df.withColumn("length", F.length(group_df["raw_content"]))
if mode == "test":
    print("=== TestMode Log:")
    group_df.cache()
    s = time.time()
    group_df.select("url", "original_length", "original_nlines", "raw_content", "length", "nlines").show(5)
    e = time.time()
    print(f"time consume:{e-s}s")

### 4.7 计算留存比例

In [None]:
# Report how many characters survive paragraph-level deduplication.
print("=== TestMode Log:" if mode == "test" else "=== DevMode Log:")
s = time.time()
# sum() over an empty DataFrame yields None; treat that as 0 characters.
origin_chars = spark_df.agg(spark_sum("length")).collect()[0][0] or 0
remain_chars = group_df.agg(spark_sum("length")).collect()[0][0] or 0
e = time.time()
# Guard against division by zero when nothing was loaded.
keep_ratio = round(remain_chars / origin_chars * 100, 3) if origin_chars else 0.0
print(
    f"origin chars:{origin_chars/1000/1000}M,remain_chars:{remain_chars/1000/1000}M \n"
    f"keep chars:{keep_ratio} % time consume:{e-s}s"
)

## 5. 语言识别导入

In [None]:
# Language identification: predictLang yields a struct with "lang" and "score"
# fields, which are flattened into two top-level columns.
lang_df = (
    group_df.withColumn("lang_score", predictLang("raw_content"))
    .withColumn("lang", F.col("lang_score.lang"))
    .withColumn("score", F.col("lang_score.score"))
    .drop("lang_score")
)
if mode == "test":
    print("=== TestMode Log:")
    s = time.time()
    lang_df.select("url", "raw_content", "lang", "score").show(5)
    e = time.time()
    print(f"time consume:{e-s}s")

## 6. MultiSentencePiece 分词

In [None]:
# Tokenize raw_content with doSentencePiece; it also receives the detected
# language (presumably to pick a per-language model — confirm in pipe_tokenized).
tokenize_col = doSentencePiece("raw_content", "lang")
lm_df = lang_df.withColumn("tokenized", tokenize_col)
if mode == "test":
    print("=== TestMode Log:")
    s = time.time()
    lm_df.select("url", "raw_content", "lang", "score", "tokenized").show(5)
    e = time.time()
    print(f"time consume:{e-s}s")

## 7. 困惑度

In [None]:
# Score each document's perplexity from its tokens and language via doDocLM.
perplexity_col = doDocLM("tokenized", "lang")
doclm_df = lm_df.withColumn("perplexity", perplexity_col)
if mode == "test":
    print("=== TestMode Log:")
    s = time.time()
    doclm_df.select("url", "raw_content", "lang", "score", "tokenized", "perplexity").show(5)
    e = time.time()
    print(f"time consume:{e-s}s")

## 8. PerplexityBucket

In [None]:
# Assign each document a perplexity bucket (e.g. "head", used when reloading
# below) based on its perplexity and language.
bucket_col = doPPBucket("perplexity", "lang")
bucket_df = doclm_df.withColumn("bucket", bucket_col)
if mode == "test":
    print("=== TestMode Log:")
    s = time.time()
    bucket_df.select("url", "raw_content", "lang", "score", "tokenized", "perplexity", "bucket").show(50)
    e = time.time()
    print(f"time consume:{e-s}s")

## 9. dropKeys

In [None]:
# Drop the intermediate token column before saving.
drop_df = bucket_df.drop("tokenized")
if mode == "test":
    print("=== TestMode Log:")
    s = time.time()
    # summary() returns a DataFrame; show() computes and prints the statistics
    # (printing the DataFrame object would only show its schema).
    drop_df.summary().show()
    e = time.time()
    print(f"time consume:{e-s}s")

## 10. split by lang: save & load partition

In [None]:
# Persist the results with save_partation, then read back one
# (language, bucket) slice as a sanity check.
save_partation(drop_df, cache_folder, date, isSample, sampleRate, min_len)
selected_lang = "en"
selected_bucket = "head"
df_en_head = load_partation(
    spark, selected_lang, selected_bucket, cache_folder, date, isSample, sampleRate, min_len
)
df_en_head.select("url", "raw_content", "perplexity", "length", "cc_segment").show()
print(df_en_head.count())