In [14]:
import os
import re
from typing import Dict
import json
import requests
import tarfile
import io

import argparse
from functools import partial
from typing import Any

from nemo_curator.download.doc_builder import (
    DocumentDownloader,
    DocumentExtractor,
    DocumentIterator,
)

import nemo_curator as nc
from nemo_curator import ScoreFilter, Sequential
from nemo_curator.filters import RepeatingTopNGramsFilter, WordCountFilter
from nemo_curator.datasets import DocumentDataset
from nemo_curator.modifiers.pii_modifier import PiiModifier
from nemo_curator.modifiers.unicode_reformatter import UnicodeReformatter
from nemo_curator.modules.modify import Modify
from nemo_curator.utils.distributed_utils import get_client
from nemo_curator.utils.script_utils import add_distributed_args



In [76]:
class LivedoorCorpusDownloader(DocumentDownloader):
    def __init__(self, download_dir: str):
        super().__init__()

        if not os.path.isdir(download_dir):
            os.makedirs(download_dir)

        self._download_dir = download_dir
        print("Download directory: ", self._download_dir)

    def download(self, url: str) -> str:
        filename = os.path.basename(url)
        output_file = os.path.join(self._download_dir, filename)

        if os.path.exists(output_file):
            print(f"File '{output_file}' already exists, skipping download.")
            return output_file

        print(f"Downloading dataset from '{url}'...")
        response = requests.get(url)

        with tarfile.open(fileobj=io.BytesIO(response.content)) as tarf:
            tarf.extractall(self._download_dir, members=tarf)

        return output_file


class LivedoorCorpusIterator(DocumentIterator):

    def __init__(self):
        super().__init__()
        self._counter = -1


    def iterate(self, file_path):
        self._counter = -1
        file_name = os.path.basename(file_path)

        with open(file_path, "r", encoding="utf-8") as file:
            lines = file.readlines()
            
            content = ''.join(lines[3:]).strip()
            if "関連リンク" in content:
                content_split = content.split("関連リンク",1)
                content = content_split[0].strip("■").strip()
                related_link = content_split[1]
            
            else:
                related_link = ""

            self._counter += 1
            meta = {
                "filename": file_name,
            }
            content = {
                "url": lines[0].strip(),
                "date": lines[1].strip(),
                "title":lines[2].strip(),
                "content":content,
                "related link":related_link
            }
            
            #print(content)
            

            record = {**meta, **content}
            return record

def download_and_convert_to_jsonl() -> str:
    """
    Downloads the emails dataset and converts it to JSONL format.

    Returns:
        str: The path to the JSONL file.
    """

    # Download the dataset in raw format and convert it to JSONL.
    downloader =  LivedoorCorpusDownloader(DATA_DIR)
    output_path = os.path.join(DATA_DIR, "livedoor.jsonl")
    raw_fp = downloader.download(DATASET_URL)
    
    child_dir = DATA_DIR + "/text/"
    
    directories = [d for d in os.listdir(child_dir) if d not in ["CHANGES.txt", "README.txt"]]
    iterator = LivedoorCorpusIterator()

    
    # Parse the raw data and write it to a JSONL file.
    with open(output_path, "w") as f:
        for directory in directories:
            files = [f for f in os.listdir(child_dir+directory) if f not in ["LICENSE.txt"]]
            
            for file_path in files:
                abs_file_path = child_dir+directory+"/"+file_path
                record = iterator.iterate(abs_file_path)
                json_record = json.dumps(record, ensure_ascii=False)
                f.write(json_record + "\n")

    return output_path



def run_curation_pipeline(args:Any, jsonl_fp: str) -> str:
    """
    Run the curation pipeline on the dataset.

    Args:
        args (Any): Command-line arguments.
        jsonl_fp (str): The path to the uncurated JSONL file.

    Returns:
        str: The path to the curated JSONL file.
    """
    client = get_client(args, args.device)
    print(f"    Running the curation pipeline on '{jsonl_fp}'...")
    orig_dataset = DocumentDataset.read_json(jsonl_fp, add_filename=True)
    dataset = orig_dataset
    #print(type(orig_dataset))
    #print(len(orig_dataset.df))
    
    curation_steps = Sequential(
        [
            #
            # Unify the text encoding to Unicode.
            #
            Modify(UnicodeReformatter(), text_field="title"),
            Modify(UnicodeReformatter(), text_field="content"),
            Modify(UnicodeReformatter(), text_field="related link"),
            #
            nc.Score(
                WordCountFilter(min_words=80).score_document,
                text_field="content",
                score_field="word_count",
                score_type=int,
            )
            
        ]
    )

    dataset = curation_steps(dataset)
    dataset = dataset.persist()
    
    print(type(orig_dataset))
    print(orig_dataset.df)

    print(f"    Original dataset length: {len(orig_dataset.df)}")
    print(f"    After running the curation pipeline: {len(dataset.df)}")
    print(f"    Writing to '{jsonl_fp}'...")
    out_path = os.path.join(
        os.path.dirname(jsonl_fp),
        "curated",
    )
    os.makedirs(out_path, exist_ok=True)
    dataset.to_json(out_path, write_to_filename=True)
    client.close()
    return os.path.join(out_path, os.path.basename(jsonl_fp))


def main():
    parser = argparse.ArgumentParser()
    parser = add_distributed_args(parser)
    args = parser.parse_args(args=[])
    # Limit the total number of workers to ensure we don't run out of memory.
    args.n_workers = min(args.n_workers, 1)

    # Prepare the download and JSONL directories.
    if not os.path.isdir(DATA_DIR):
        os.makedirs(DATA_DIR)

    jsonl_fp = download_and_convert_to_jsonl()
    print(jsonl_fp)
    run_curation_pipeline(args, jsonl_fp)


In [77]:
DATASET_URL = "https://www.rondhuit.com/download/ldcc-20140209.tar.gz"
DATA_DIR = "./livedoor_corpus"

In [78]:
main()

Download directory:  ./livedoor_corpus
Downloading dataset from 'https://www.rondhuit.com/download/ldcc-20140209.tar.gz'...
./livedoor_corpus/livedoor.jsonl


Perhaps you already have a cluster running?
Hosting the HTTP server on port 33881 instead


    Running the curation pipeline on './livedoor_corpus/livedoor.jsonl'...
Reading 1 files
<class 'nemo_curator.datasets.doc_dataset.DocumentDataset'>
Dask DataFrame Structure:
              content                       date filename related link   title     url word_count
npartitions=1                                                                                    
               object  datetime64[ns, UTC+09:00]   object       object  object  object      int64
                  ...                        ...      ...          ...     ...     ...        ...
Dask Name: assign, 13 graph layers
    Original dataset length: 7367
    After running the curation pipeline: 7367
    Writing to './livedoor_corpus/livedoor.jsonl'...
Writing to disk complete for 1 partitions
