# _CommonCrawl日本語データの分割抽出

 このNotebookは, CommonCrawl日本語データ分割抽出を行うものです.

GENIACプロジェクトのTeam Hatakeyama(仮)ではCommonCrawlのデータを

質良く加工して日本語LLMの学習に役立つコーパス作成を進めています.

そのコーパス作成で問題となるポイントとして,

CommonCrawlのデータが多すぎることがあります(なんと100TB程度!).


 多くの言語が集まるデータから日本語のデータのみを抜く工程

でもデータ数の多さからチームメンバーだけで行うことは困難です.

そのため, チーム内外でCommonCrawlからの日本語データを分割で抽出し,

最終的に統合するということを目指しています.


## 手順 (Google Colab)..

今回CommonCrawlにおける90000個のアーカイブデータ(warc)を分割で処理します.

90000個を10個ずつのバッチに分けて処理, その結果をGoogleDriveに格納することで,

90000個すべての処理を目指します.

そして, その上で行っていただきたいのは,

1)このセル以降のセルを上から順番に実行していく.

2)最後のセルにあるbatch_numberを変更して, セルを実行
https://colab.research.google.com/drive/1Gq8HQ0iyASH5iOAkosclJEQTwYJvYRmy#scrollTo=UawI0uZgAjz6&line=3&uniqifier=1

3)ファイルメニューに現れる/submit/に保存されている{batch_number}.gzファイルを
ダウンロード

4)バッチの処理が終わったことをgenaic slackなどで共有いただき, アップロードするDriveの場所の指示を受けてください.

60分程度実行すると, 1バッチ終了します.


### 手順 (個別環境 linux)

1)このGoogle Colab Notebookをダウンロードしてください.

2)jupyterが使用できるpython環境で実行してください.

In [None]:
#for Google Colab
import sys
from google.colab import drive
from IPython.display import Javascript
display(Javascript('''google.colab.output.setIframeHeight(0, true, {maxHeight: 100})'''))

drive.mount('/content/drive')

###---- ROOT_PATH を記載 ----###
ROOT_PATH = '/content/drive/MyDrive/ColabNotebooks/work/GENIAC/'
sys.path.append(ROOT_PATH)

## Environment

pythonのライブラリをインストールします.

※初回だけ実行してください

In [None]:
!pip install warcio
!pip install beautifulsoup4
!pip install trafilatura

### Function - File Utils

今回の処理に必要な関数(ファイル処理関係)をインストールします.

※初回だけ実行してください

In [None]:
import os
import glob
import gzip
import shutil
import requests

base_url = "https://data.commoncrawl.org/"
os.makedirs("data/gz", exist_ok=True)
os.makedirs("data/warc", exist_ok=True)

def download_file(url, save_path):

    response = requests.get(url, stream=True)

    if response.status_code == 200:
        with open(save_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=128):
                f.write(chunk)
        print(f"ファイルが正常にダウンロードされました: {save_path}")
    else:
        print(f"ファイルのダウンロードに失敗しました。ステータスコード: {response.status_code}")


def decompress_gz(gz_path, output_path, remove_gz=True, fill_blank_gz=False):
    with gzip.open(gz_path, 'rb') as f_in:
        with open(output_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
    print(f"{gz_path}が解凍され、{output_path}に保存されました。")
    if remove_gz:
        os.remove(gz_path)

    if fill_blank_gz:
        with open(gz_path, 'w') as f:
            f.write("")

def get_cc_path_list(path_dir="data/path_list/*"):
    path_list = []
    for file_path in glob.glob(path_dir):
        print(file_path)
        with open(file_path, "r") as f:
            temp_path_list = f.readlines()

        temp_path_list = [path.strip() for path in temp_path_list]

        path_list += temp_path_list

    return path_list


def cc_path_to_urls(cc_path):
    url = base_url+cc_path
    filename = cc_path.replace("/", "_")
    gz_path = f"data/gz/{filename}"
    warc_path = f"data/warc/{filename}".replace(".gz", "")

    return url, gz_path, warc_path


def download_warc_file(path):
    url, gz_path, warc_path = cc_path_to_urls(path)

    if os.path.exists(warc_path):
        print(f"warc_pathにはファイルが存在しています")
        return warc_path
    try:
        if os.path.exists(gz_path):
            print(f"gz_pathがすでに存在します: {gz_path}")
        else:
            print("downloading "+url)
            download_file(url, gz_path)
        print("decompressing "+gz_path)
        decompress_gz(gz_path, warc_path,
                      remove_gz=False, fill_blank_gz=True)
        return warc_path
    except Exception as e:
        print(e)
        print("fail loading "+url)
        return warc_path


### Function - Process Utils

今回の処理に必要な関数(warcファイルの処理)をインストールします.

※初回だけ実行してください

In [None]:
from warcio.archiveiterator import ArchiveIterator
from bs4 import BeautifulSoup
from tqdm import tqdm
import json
import glob
import os
from trafilatura import fetch_url, extract

def halfwidth_ratio(s):
    if len(s) == 0:  # 空の文字列の場合は0を返す
        return 0
    halfwidth_count = sum(
        1 for char in s
        if '\u0020' <= char <= '\u007E' or  # 基本的なASCII範囲
           '\uFF61' <= char <= '\uFF9F' or  # 半角カタカナ
           char in ('\u0009', '\u000A', '\u000D')  # タブ、改行、復帰
    )
    return halfwidth_count / len(s)


def pre_clean(soup):
    texts_with_tags = []
    for tag in soup.find_all(True):
        # 特定のタグを除外する場合
        # if tag.name not in ['html', 'body', 'ul']:
        text = tag.get_text(separator="\n", strip=True)
        spl_text = text.split("\n")
        spl_text = [i.strip() for i in spl_text if i.strip()]  # 空の文字列を除外
        for item in spl_text:
            if tag.name == "script" or tag.name == "style":
                continue
            texts_with_tags.append((item, tag.name))  # テキストとタグの名前をタプルとして追加
    return texts_with_tags


def extract_japanese_from_warc(path,
                               save_dir="json",
                               max_num=10**10,
                               ):
    ja_soup_list = []
    path = path.replace("\\", "/")  # for windows env
    filename = path.split("/")[-1].replace(".warc", ".json")
    if os.path.exists(f"{save_dir}/{filename}"):
        print("already done")
        return
    # 途中から再開する用の位置情報の取得
    if len(ja_soup_list) > 0:
        fin_record_id = ja_soup_list[-1]["record_id"]
    else:
        fin_record_id = 0
    # WARCファイルを開く
    record_id = 0
    with open(path, 'rb') as stream:
        for record in tqdm(ArchiveIterator(stream)):
            record_id += 1
            if record_id <= fin_record_id:
                continue
            if record.rec_type == 'response':
                if record.http_headers.get_header('Content-Type') == 'text/html':
                    content = record.content_stream().read()
                    soup = BeautifulSoup(content, 'html.parser')
                    # <html>タグからlang属性を取得
                    html_tag = soup.find('html')
                    if html_tag and html_tag.has_attr('lang'):
                        lang = html_tag['lang']
                        texts = pre_clean(soup)

                        if len(texts) == 0:
                            continue
                        if lang == "ja":
                            if soup.title is not None:
                                title = soup.title.string
                            else:
                                title = ""
                            texts = extract(content, include_tables=False,target_lang='ja',favour_precision=True) #trafilaturaでテキスト抽出
                            print(texts)
                            d = {
                                "record_id": record_id,
                                "url": record.rec_headers.get_header('WARC-Target-URI'),
                                "title": title,
                                "timestamp": record.rec_headers.get_header('WARC-Date'),
                                "text": texts,
                            }
                            ja_soup_list.append(d)
                        if len(ja_soup_list) > max_num:
                            break
    return ja_soup_list

def download_and_parse(cc_path, base_dir=None):
    # warcファイルのダウンロード
    warc_path = download_warc_file(cc_path)
    # ファイル関連の処理
    os.makedirs(base_dir, exist_ok=True)
    # パス関連の処理
    file_name = os.path.basename(warc_path)
    base_name = os.path.splitext(file_name)[0]
    file_base_name = "_".join(base_name.split("_")[2:])
    if base_dir is None:
        base_dir = "/tmp/"
    save_gz_path = f"{base_dir}/{file_base_name}_japanese.json.gz"
    try:
        tag_records = extract_japanese_from_warc(warc_path)
        is_error = False
        error_text = ""
    except Exception as e:
        tag_records = []
        is_error = True
        print(e)
        error_text = str(e)
    # 保存用のdictを作製
    save_dict = {
      "tag_records" : tag_records,
      "is_error" : is_error,
      "cc_path" : cc_path,
      "warc_path" : warc_path,
      "error_text" : error_text
    }
    with gzip.open(save_gz_path, 'wt', encoding="utf-8") as zipfile:
       json.dump(save_dict, zipfile, indent=2, ensure_ascii=False)
    return

def curation(batch_number, submit_dir="/content/submit", is_debug=False):
    cc_path_list = get_cc_path_list()
    if is_debug:
        n_batch = 3
    else:
        n_batch = 10
    start_idx, end_idx = batch_number * n_batch, (batch_number+1) * n_batch
    target_path_list  = cc_path_list[start_idx:end_idx]
    for cc_path in tqdm(target_path_list):
        download_and_parse(cc_path, f"process/batch{batch_number}")
    shutil.make_archive(f'{submit_dir}/{batch_number}',
                        format='zip', root_dir=f"process/batch{batch_number}")

    shutil.rmtree("process/")


## Data Preprocess

### Step1 Download CommonCrawl Paths

CommonCrawlにおけるアーカイブ(warc)が保存されているパスの文字列が

圧縮されたファイル(gz)で保存されている.

このファイルをダウンロードして, 解凍, data/data_list配下に保存する


In [None]:
"""
download path list from commoncrawl
"""
# Parameter
# 今回処理するwarcのパスリストが圧縮されているURL
# CC-MAIN-2023-50以外にも存在するが, 一旦このURLのみで行う
path_urls = [
    "https://data.commoncrawl.org/crawl-data/CC-MAIN-2023-50/warc.paths.gz",
]
# パスリストをダウンロードするフォルダの作成
os.makedirs("data", exist_ok=True)
os.makedirs("data/path_list", exist_ok=True)

# Process
# Parameterで指定したURLからパス(gz)をダウンロードし,解凍する
for url in path_urls:
    file_name = url.split("/")[-2]+".gz"
    try:
        # パスリストが格納されているgzファイルをdata_list配下に保存
        download_file(url, f"data/path_list/{file_name}")
        # 保存されたgzファイルを解凍する
        decompress_gz(f"data/path_list/{file_name}",
                      f"data/path_list/{os.path.splitext(file_name)[0]}")
    except:
        pass


### step2 Check Step1 Result

warcのパスの文字列が保存されているかを確認する

※step1で異常があった際に実行してください

In [None]:
# Process
# 保存されているwarcファイルのパスのリストを取得
cc_paths = get_cc_path_list(path_dir="data/path_list/*")
# 表示
cc_paths

### Step3 : Download Warc and Extract Japanese Site Data

warc.gzファイルのURI(その一部)を元にwarc.gzファイルをダウンロード/解凍(->warc)する.

warcファイルを読み込み, 内部に保存されている日本語サイトのデータのみを抽出する.

抽出した結果をjson.gz形式で圧縮して保存.

warcファイルは90000個ほど存在するため, この一部のみを処理していく必要あり,

そのため, 処理するファイルのパスをいくつかのbatchに分けている.

このbatchの番号(batch_number)を指定し, そのbatchにおける日本語ページを取得

取得した結果をまとめたzipファイルがsubmit/{batch_number}.zipに保存される.

保存されている内容を指定のGoogleDriveに配置ください

---

#### 行っていただく内容

- is_debugをTrueにして動くかを確認 (初回のみ)

- batch_numberを変更 (取りくむbatchをご指定ください)

- is_debugをFalseにしてデータ抽出/加工スタート

#### 2024/03/04 更新

結果のzipファイルを自動的にダウンロードするように変更いたしました


In [None]:
from google.colab import files
# 処理結果を自動的にダウンロードするように変更

start=500
end=500

for batch_number in range(start, end+1):
    is_debug = False

    # 保存用ディレクトリの指定
    # submit_dir = "submit"
    # もしdriveがマウントできれば,上のsubmit_dirをコメントアウト, 以下をコードにしてください.
    submit_dir = ROOT_PATH + "submit"

    # batchの番号に従って,データの処理
    curation(batch_number, submit_dir=submit_dir, is_debug=is_debug)

    # ファイルのダウンロード
    # files.download(f"./submit/{batch_number}.zip")

