In [None]:
# !pip install pika datasets

In [None]:
from datasets import load_dataset
import datasets
# Download manager shared by the cells below: `dl_manager.download(url)` returns
# a local file path that is then opened with gzip/zipfile/open.
dl_manager = datasets.DownloadManager()

# Common iterator patterns

In [None]:

def iter_mode_1(dataset):
    """Yield one text string per row by concatenating the row's paragraphs.

    Each row must provide a "段落" (paragraphs) list whose items hold the
    paragraph text under "内容" (content). Paragraph texts are joined with
    no separator.

    Args:
        dataset: iterable of row mappings (e.g. a streaming HF dataset).

    Yields:
        str: the concatenated paragraph text of each row, in input order.
    """
    for row in dataset:
        # str.join is linear in the total length; repeated `+=` can be quadratic.
        yield "".join(paragraph["内容"] for paragraph in row["段落"])
        
def iter_mode_qa(dataset):
    """Yield each QA row as its question text immediately followed by its answer text.

    Rows must provide the keys "问" (question) and "答" (answer); the two
    values are formatted and concatenated with no separator.
    """
    for record in dataset:
        question, answer = record["问"], record["答"]
        yield f'{question}{answer}'

# Publish tool
Publish data to RabbitMQ.

In [None]:
import pika
import tqdm
import logging
import time

def setup_logging(name='my_module', level=logging.DEBUG):
    """Return the named logger, configured to emit timestamped lines via a StreamHandler.

    Idempotent: re-running this cell (common in a notebook) reuses the
    existing handler instead of stacking another one, which would otherwise
    print every record once per run.

    Args:
        name: logger name (default 'my_module').
        level: threshold set on the logger (default DEBUG).

    Returns:
        logging.Logger: the configured logger.
    """
    log = logging.getLogger(name)
    log.setLevel(level)
    if not log.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(
            fmt='%(asctime)s %(levelname)-8s %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S',
        ))
        log.addHandler(handler)
    return log


logger = setup_logging()


def get_rabbit_channel(url=None):
    """Open a new blocking RabbitMQ connection and return a fresh channel on it.

    The AMQP URL is chosen in this order of precedence:
    1. the `url` argument;
    2. the RABBITMQ_URL environment variable;
    3. the legacy hard-coded default.

    Supply real credentials through the environment rather than editing them
    into the notebook.

    The caller owns the returned channel. Its connection is reachable as
    `channel.connection` and should be closed when no longer needed.
    """
    import os

    amqp_url = url or os.environ.get(
        "RABBITMQ_URL", 'amqp://user:password@hz.knogen.com/%2f'
    )
    connection = pika.BlockingConnection(pika.URLParameters(amqp_url))
    # NOTE(review): services I/O for up to 5 s before opening the channel;
    # presumably a settling delay — confirm it is still needed.
    connection.process_data_events(time_limit=5)
    return connection.channel()


# Size cap used for testing: 16 GiB, i.e. 16 * 2**30.
SIZE_LIMIT = 16 * 2 ** 30

def _payload_size(data):
    """Return the number of bytes `data` occupies as a message body.

    `str` bodies are measured after UTF-8 encoding (pika encodes str bodies
    as UTF-8 on publish); bytes-like bodies are measured directly.
    """
    if isinstance(data, str):
        return len(data.encode("utf-8"))
    return len(data)


def _close_quietly(channel):
    """Best-effort close of `channel` and the connection that owns it.

    Errors are logged at DEBUG level and otherwise ignored. This runs both on
    normal exit and after the broker link has already died.
    """
    for resource in (channel, getattr(channel, "connection", None)):
        if resource is None:
            continue
        try:
            if not resource.is_closed:
                resource.close()
        except Exception as e:  # the connection may already be gone
            logger.debug("ignoring error while closing %r: %s", resource, e)


def publish_data(channel_name, iterator_func, size_limit=None):
    """Publish every item produced by `iterator_func()` to the queue `channel_name`.

    The queue is declared first. Messages then go to the default exchange
    with `channel_name` as the routing key.

    Publishing stops once the cumulative size of successfully published
    bodies exceeds `size_limit` bytes (default SIZE_LIMIT). The message that
    crosses the limit is still sent.

    If the channel or connection drops, the same message is retried on a
    fresh channel after a 1 s pause. Any other publish error is logged and
    that message is skipped. The channel and its connection are always
    closed on exit, including the early size-limit return.

    Args:
        channel_name: queue name, also used as the routing key.
        iterator_func: zero-argument callable returning an iterable of str or
            bytes message bodies.
        size_limit: optional byte budget overriding SIZE_LIMIT.
    """
    # Built here (not at module level) so pika is only touched when publishing.
    retryable = (
        pika.exceptions.ChannelClosed,
        pika.exceptions.ChannelWrongStateError,
        pika.exceptions.StreamLostError,
        pika.exceptions.ConnectionClosed,
        pika.exceptions.ConnectionWrongStateError,
    )
    limit = SIZE_LIMIT if size_limit is None else size_limit

    pk_channel = get_rabbit_channel()
    try:
        pk_channel.queue_declare(queue=channel_name)
        sent_bytes = 0
        for data in tqdm.tqdm(iterator_func()):
            while True:
                try:
                    pk_channel.basic_publish(exchange='', routing_key=channel_name, body=data)
                except retryable:
                    logger.info("ChannelClosed")
                    # Release the dead channel/connection instead of leaking it.
                    _close_quietly(pk_channel)
                    time.sleep(1)
                    pk_channel = get_rabbit_channel()
                    # Declare on the new channel; re-declaring an existing queue
                    # with the same arguments is a no-op.
                    pk_channel.queue_declare(queue=channel_name)
                    continue
                except Exception as e:
                    logger.exception(e)
                    break
                # Count each body once, and only after it was actually sent,
                # so retries do not inflate the total.
                sent_bytes += _payload_size(data)
                break
            if sent_bytes > limit:
                return
    finally:
        _close_quietly(pk_channel)

# publish MNBVC

In [None]:
def get_mnbvc_data():
    """Stream the MNBVC 'gov_xuexiqiangguo' subset as one text string per row.

    Rows are flattened with `iter_mode_1`; the first 3 results are printed
    as a preview.
    """
    # trust_remote_code=True lets the dataset repository's loading script run locally.
    stream = load_dataset(
        "liwu/MNBVC",
        'gov_xuexiqiangguo',
        split='train',
        streaming=True,
        trust_remote_code=True,
    )
    for idx, doc in enumerate(iter_mode_1(stream)):
        if idx < 3:
            print(idx, doc)
        yield doc


publish_data("gov_xuexiqiangguo", get_mnbvc_data)

In [None]:
def get_mnbvc_data():
    """Stream the MNBVC 'gov_xuexiqiangguo' subset as GBK-encoded bytes, one per row.

    Rows are flattened with `iter_mode_1`; the first row's text is printed
    as a preview.
    """
    # trust_remote_code=True lets the dataset repository's loading script run locally.
    stream = load_dataset(
        "liwu/MNBVC",
        'gov_xuexiqiangguo',
        split='train',
        streaming=True,
        trust_remote_code=True,
    )
    for idx, doc in enumerate(iter_mode_1(stream)):
        if idx == 0:
            print(idx, doc)
        # Characters with no GBK mapping are silently dropped.
        yield doc.encode("GBK", errors="ignore")


publish_data("gov_xuexiqiangguo_gbk", get_mnbvc_data)

In [None]:
def get_mnbvc_data():
    """Stream the MNBVC 'qa_zhihu' subset as question+answer text, one per row.

    Rows are flattened with `iter_mode_qa`; the first 3 results are printed
    as a preview.
    """
    # trust_remote_code=True lets the dataset repository's loading script run locally.
    stream = load_dataset(
        "liwu/MNBVC",
        'qa_zhihu',
        split='train',
        streaming=True,
        trust_remote_code=True,
    )
    for idx, doc in enumerate(iter_mode_qa(stream)):
        if idx < 3:
            print(idx, doc)
        yield doc


publish_data("qa_zhihu", get_mnbvc_data)

In [None]:
def get_mnbvc_data():
    """Stream the MNBVC 'qa_zhihu' subset as GBK-encoded question+answer bytes.

    Rows are flattened with `iter_mode_qa`; the first row's text is printed
    as a preview.
    """
    # trust_remote_code=True lets the dataset repository's loading script run locally.
    stream = load_dataset(
        "liwu/MNBVC",
        'qa_zhihu',
        split='train',
        streaming=True,
        trust_remote_code=True,
    )
    for idx, doc in enumerate(iter_mode_qa(stream)):
        if idx == 0:
            print(idx, doc)
        # Characters with no GBK mapping are silently dropped.
        yield doc.encode("GBK", errors="ignore")


publish_data("qa_zhihu_gbk", get_mnbvc_data)

In [None]:
def get_mnbvc_data():
    """Stream the MNBVC 'qa_mfa' subset as question+answer text, one per row.

    Rows are flattened with `iter_mode_qa`; the first row's text is printed
    as a preview.
    """
    # trust_remote_code=True lets the dataset repository's loading script run locally.
    stream = load_dataset(
        "liwu/MNBVC",
        'qa_mfa',
        split='train',
        streaming=True,
        trust_remote_code=True,
    )
    for idx, doc in enumerate(iter_mode_qa(stream)):
        if idx == 0:
            print(idx, doc)
        yield doc


publish_data("qa_mfa", get_mnbvc_data)

In [None]:
def get_mnbvc_data():
    """Stream the MNBVC 'qa_mfa' subset as GBK-encoded question+answer bytes.

    Rows are flattened with `iter_mode_qa`; the first row's text is printed
    as a preview.
    """
    # trust_remote_code=True lets the dataset repository's loading script run locally.
    stream = load_dataset(
        "liwu/MNBVC",
        'qa_mfa',
        split='train',
        streaming=True,
        trust_remote_code=True,
    )
    for idx, doc in enumerate(iter_mode_qa(stream)):
        if idx == 0:
            print(idx, doc)
        # Characters with no GBK mapping are silently dropped.
        yield doc.encode("GBK", errors="ignore")


publish_data("qa_mfa_gbk", get_mnbvc_data)

In [None]:
def get_mnbvc_data():
    """Stream the MNBVC 'qa_chatgpt' subset as question+answer text, one per row.

    Rows are flattened with `iter_mode_qa`; the first 3 results are printed
    as a preview.
    """
    # trust_remote_code=True lets the dataset repository's loading script run locally.
    stream = load_dataset(
        "liwu/MNBVC",
        'qa_chatgpt',
        split='train',
        streaming=True,
        trust_remote_code=True,
    )
    for idx, doc in enumerate(iter_mode_qa(stream)):
        if idx < 3:
            print(idx, doc)
        yield doc


publish_data("qa_chatgpt", get_mnbvc_data)

In [None]:
def get_mnbvc_data():
    """Stream the MNBVC 'crawler_oscar' subset as one text string per row.

    Rows are flattened with `iter_mode_1`; the first 3 results are printed
    as a preview.
    """
    # trust_remote_code=True lets the dataset repository's loading script run locally.
    stream = load_dataset(
        "liwu/MNBVC",
        'crawler_oscar',
        split='train',
        streaming=True,
        trust_remote_code=True,
    )
    for idx, doc in enumerate(iter_mode_1(stream)):
        if idx < 3:
            print(idx, doc)
        yield doc


publish_data("crawler_oscar", get_mnbvc_data)

In [None]:
def get_poetry_data(category="五代十國", preview=3):
    """Yield "title author text" strings for every poem in `category`.

    Reads the train split of erhwenkuo/poetry-chinese-zhtw and keeps rows
    whose 'category' equals `category`. The first `preview` matching poems
    are printed. Matches are counted here, not dataset rows, so the preview
    appears even when no matching poem is among the first rows.

    Args:
        category: value of the 'category' column to keep.
        preview: number of matching poems to print.
    """
    dataset = load_dataset("erhwenkuo/poetry-chinese-zhtw", split="train")
    shown = 0
    for row in dataset:
        if row['category'] != category:
            continue
        text = f"{row['title']} {row['author']} {row['text']}"
        if shown < preview:
            print(text)
            shown += 1
        yield text


publish_data("poetry_wdsg", get_poetry_data)

In [None]:
def get_poetry_data(category="唐", preview=3):
    """Yield "title author text" strings for every poem in `category`.

    Reads the train split of erhwenkuo/poetry-chinese-zhtw and keeps rows
    whose 'category' equals `category`. The first `preview` matching poems
    are printed. Matches are counted here, not dataset rows, so the preview
    appears even when no matching poem is among the first rows.

    Args:
        category: value of the 'category' column to keep.
        preview: number of matching poems to print.
    """
    dataset = load_dataset("erhwenkuo/poetry-chinese-zhtw", split="train")
    shown = 0
    for row in dataset:
        if row['category'] != category:
            continue
        text = f"{row['title']} {row['author']} {row['text']}"
        if shown < preview:
            print(text)
            shown += 1
        yield text


publish_data("poetry_t", get_poetry_data)

In [None]:
def get_poetry_data(category="宋", preview=3):
    """Yield "title author text" strings for every poem in `category`.

    Reads the train split of erhwenkuo/poetry-chinese-zhtw and keeps rows
    whose 'category' equals `category`. The first `preview` matching poems
    are printed. Matches are counted here, not dataset rows, so the preview
    appears even when no matching poem is among the first rows.

    Args:
        category: value of the 'category' column to keep.
        preview: number of matching poems to print.
    """
    dataset = load_dataset("erhwenkuo/poetry-chinese-zhtw", split="train")
    shown = 0
    for row in dataset:
        if row['category'] != category:
            continue
        text = f"{row['title']} {row['author']} {row['text']}"
        if shown < preview:
            print(text)
            shown += 1
        yield text


publish_data("poetry_s", get_poetry_data)

In [None]:
def get_poetry_data(category="元", preview=3):
    """Yield "title author text" strings for every poem in `category`.

    Reads the train split of erhwenkuo/poetry-chinese-zhtw and keeps rows
    whose 'category' equals `category`. The first `preview` matching poems
    are printed. Matches are counted here, not dataset rows, so the preview
    appears even when no matching poem is among the first rows.

    Args:
        category: value of the 'category' column to keep.
        preview: number of matching poems to print.
    """
    dataset = load_dataset("erhwenkuo/poetry-chinese-zhtw", split="train")
    shown = 0
    for row in dataset:
        if row['category'] != category:
            continue
        text = f"{row['title']} {row['author']} {row['text']}"
        if shown < preview:
            print(text)
            shown += 1
        yield text


publish_data("poetry_y", get_poetry_data)

In [None]:
def get_poetry_data(category="清", preview=3):
    """Yield "title author text" strings for every poem in `category`.

    Reads the train split of erhwenkuo/poetry-chinese-zhtw and keeps rows
    whose 'category' equals `category`. The first `preview` matching poems
    are printed. Matches are counted here, not dataset rows, so the preview
    appears even when no matching poem is among the first rows.

    Args:
        category: value of the 'category' column to keep.
        preview: number of matching poems to print.
    """
    dataset = load_dataset("erhwenkuo/poetry-chinese-zhtw", split="train")
    shown = 0
    for row in dataset:
        if row['category'] != category:
            continue
        text = f"{row['title']} {row['author']} {row['text']}"
        if shown < preview:
            print(text)
            shown += 1
        yield text


publish_data("poetry_q", get_poetry_data)

In [None]:
def get_poetry_data():
    """Yield the 'response' field of every train row of Iess/chinese_modern_poetry.

    The first 3 poems are printed as a preview.
    """
    rows = load_dataset("Iess/chinese_modern_poetry", split="train")
    poems = (row["response"] for row in rows)
    for idx, poem in enumerate(poems):
        if idx < 3:
            print(poem)
        yield poem


publish_data("poetry_modern", get_poetry_data)

# Publish genome sequences

In [None]:
import datasets
import gzip
# NOTE(review): re-creates the download manager already built in the first
# cell; redundant when the notebook is run top to bottom (presumably kept so
# this section can be run on its own).
dl_manager = datasets.DownloadManager()
from Bio import SeqIO

In [None]:
def filter_fn(char: str, replacement: str = 'A') -> str:
    """
    Return `char` unchanged if it is one of the bases 'A', 'T', 'C', 'G';
    otherwise return `replacement` ('A' by default).

    The check is case-sensitive, so callers should upper-case first.

    NOTE(review): an earlier docstring said non-bases become 'N', but the code
    has always produced 'A'. That turns runs of placeholder characters (e.g.
    'N') into long poly-A stretches. The default is kept at 'A' to preserve
    already-published data — confirm which is intended.
    """
    if char in {'A', 'T', 'C', 'G'}:
        return char
    return replacement


def clean_sequence(seq: str, replacement: str = 'A') -> str:
    """
    Upper-case a chunk of DNA and replace every character other than
    A, T, C, G with `replacement` ('A' by default), via `filter_fn`.

    Args:
        seq: raw sequence text.
        replacement: character substituted for non-bases.

    Returns:
        The cleaned sequence. It has the same length as `seq.upper()`.
    """
    return ''.join(filter_fn(char, replacement) for char in seq.upper())


In [None]:

def get_genoe(
    url="https://ftp.ncbi.nlm.nih.gov/genomes/refseq/vertebrate_mammalian/Homo_sapiens/latest_assembly_versions/GCF_000001405.40_GRCh38.p14/GCF_000001405.40_GRCh38.p14_genomic.fna.gz",
    chunk_length=1_000_000,
):
    """Yield a gzipped FASTA genome as cleaned, fixed-size sequence chunks.

    Every record's sequence is passed through `clean_sequence` and cut into
    consecutive pieces of at most `chunk_length` characters, in file order.
    Empty pieces are never yielded.

    Args:
        url: location of the `.fna.gz` file, fetched via `dl_manager`.
        chunk_length: maximum number of characters per yielded chunk.
    """
    file = dl_manager.download(url)

    with gzip.open(file, 'rt') as f:
        for record in SeqIO.parse(f, 'fasta'):
            sequence = clean_sequence(str(record.seq))
            # Stepping by chunk_length never produces an empty trailing slice;
            # range(len // chunk_length + 1) did whenever len % chunk_length == 0.
            for start in range(0, len(sequence), chunk_length):
                yield sequence[start:start + chunk_length]


publish_data("genoe_human", get_genoe)


In [None]:

def get_genoe(
    url="https://ftp.ncbi.nlm.nih.gov/genomes/refseq/vertebrate_mammalian/Hylobates_moloch/latest_assembly_versions/GCF_009828535.3_HMol_V3/GCF_009828535.3_HMol_V3_genomic.fna.gz",
    chunk_length=1_000_000,
):
    """Yield a gzipped FASTA genome as cleaned, fixed-size sequence chunks.

    Every record's sequence is passed through `clean_sequence` and cut into
    consecutive pieces of at most `chunk_length` characters, in file order.
    Empty pieces are never yielded.

    Args:
        url: location of the `.fna.gz` file, fetched via `dl_manager`.
        chunk_length: maximum number of characters per yielded chunk.
    """
    file = dl_manager.download(url)

    with gzip.open(file, 'rt') as f:
        for record in SeqIO.parse(f, 'fasta'):
            sequence = clean_sequence(str(record.seq))
            # Stepping by chunk_length never produces an empty trailing slice;
            # range(len // chunk_length + 1) did whenever len % chunk_length == 0.
            for start in range(0, len(sequence), chunk_length):
                yield sequence[start:start + chunk_length]


publish_data("genoe_cby", get_genoe)


In [None]:

def get_genoe(
    url="https://ftp.ncbi.nlm.nih.gov/genomes/refseq/bacteria/Robbsia_andropogonis/latest_assembly_versions/GCF_000566705.1_Ba3549/GCF_000566705.1_Ba3549_genomic.fna.gz",
    chunk_length=1_000_000,
):
    """Yield a gzipped FASTA genome as cleaned, fixed-size sequence chunks.

    Every record's sequence is passed through `clean_sequence` and cut into
    consecutive pieces of at most `chunk_length` characters, in file order.
    Empty pieces are never yielded.

    Args:
        url: location of the `.fna.gz` file, fetched via `dl_manager`.
        chunk_length: maximum number of characters per yielded chunk.
    """
    file = dl_manager.download(url)

    with gzip.open(file, 'rt') as f:
        for record in SeqIO.parse(f, 'fasta'):
            sequence = clean_sequence(str(record.seq))
            # Stepping by chunk_length never produces an empty trailing slice;
            # range(len // chunk_length + 1) did whenever len % chunk_length == 0.
            for start in range(0, len(sequence), chunk_length):
                yield sequence[start:start + chunk_length]


publish_data("genoe_xj", get_genoe)


In [None]:

def get_genoe(
    url="https://ftp.ncbi.nlm.nih.gov/genomes/refseq/fungi/Cladophialophora_bantiana/latest_assembly_versions/GCF_000835475.1_Clad_bant_CBS_173_52_V1/GCF_000835475.1_Clad_bant_CBS_173_52_V1_genomic.fna.gz",
    chunk_length=1_000_000,
):
    """Yield a gzipped FASTA genome as cleaned, fixed-size sequence chunks.

    Every record's sequence is passed through `clean_sequence` and cut into
    consecutive pieces of at most `chunk_length` characters, in file order.
    Empty pieces are never yielded.

    Args:
        url: location of the `.fna.gz` file, fetched via `dl_manager`.
        chunk_length: maximum number of characters per yielded chunk.
    """
    file = dl_manager.download(url)

    with gzip.open(file, 'rt') as f:
        for record in SeqIO.parse(f, 'fasta'):
            sequence = clean_sequence(str(record.seq))
            # Stepping by chunk_length never produces an empty trailing slice;
            # range(len // chunk_length + 1) did whenever len % chunk_length == 0.
            for start in range(0, len(sequence), chunk_length):
                yield sequence[start:start + chunk_length]


publish_data("genoe_zj", get_genoe)


# 无理数 (irrational numbers)

In [None]:
# pi upload 10B¶
import datasets
import zipfile

def read_zip_file(file, chunk_size=1_000_000, max_chunks=10):
    """Yield up to `max_chunks` byte chunks from the start of every member of a zip archive.

    Members are visited in `namelist()` order. Each is read incrementally,
    `chunk_size` bytes at a time, so only the bytes actually yielded are
    decompressed into memory rather than the whole member. Reading a member
    stops at its end, so a short member never produces empty chunks.

    Args:
        file: path or binary file-like object accepted by `zipfile.ZipFile`.
        chunk_size: bytes per chunk (default 1,000,000).
        max_chunks: maximum number of chunks per member (default 10).

    Yields:
        bytes: consecutive chunks of each member's content.
    """
    with zipfile.ZipFile(file, 'r') as archive:
        for name in archive.namelist():
            with archive.open(name, "r") as member:
                for _ in range(max_chunks):
                    # ZipExtFile.read(n) returns exactly n bytes unless EOF is reached.
                    chunk = member.read(chunk_size)
                    if not chunk:
                        break
                    yield chunk
                
def get_pi_text():
    """Yield pi-digit chunks from the ten pilookup.com archives.

    The archives are named 1-100000000.zip through 900000001-1000000000.zip.
    Each is downloaded via `dl_manager` and read with `read_zip_file`.
    """
    per_archive = 100_000_000
    for first in range(1, 10 * per_archive + 1, per_archive):
        last = first + per_archive - 1
        archive_path = dl_manager.download(f"https://files.pilookup.com/pi/{first}-{last}.zip")
        yield from read_zip_file(archive_path)


publish_data("num_pi", get_pi_text)

In [None]:
# EulersNumberE upload 1B
# NOTE(review): this module-level download is repeated inside get_e_text below,
# which uses its own local `url`/`file` rather than these globals.
# get_e_text publishes at most 100 chunks of 1,000,000 characters; confirm what
# "1B" refers to.
url= "https://archive.org/download/EulersNumberE7.5BillionDigits/part1.txt"
file = dl_manager.download(url)

def get_e_text(chunk_size=1_000_000, num_chunks=100, skip=2):
    """Yield consecutive chunks of the digits of e from archive.org's part1.txt.

    The first `skip` characters are dropped. The default of 2 presumably
    removes the leading "2." — verify against the file. Up to `num_chunks`
    chunks of `chunk_size` characters are then yielded.

    The file is streamed rather than read whole, and iteration stops at end
    of file instead of yielding empty strings.

    Args:
        chunk_size: characters per chunk (default 1,000,000).
        num_chunks: maximum number of chunks (default 100).
        skip: leading characters to discard (default 2).
    """
    url = "https://archive.org/download/EulersNumberE7.5BillionDigits/part1.txt"
    path = dl_manager.download(url)
    with open(path, 'rt') as f:
        f.read(skip)
        for _ in range(num_chunks):
            # TextIOWrapper.read(n) returns n characters unless EOF is reached.
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk


publish_data("num_e", get_e_text)



# Shakespeare's works

Download a zip file that contains multiple works by Shakespeare.

Website: https://www.folger.edu/explore/shakespeares-works/download/
Download URL: https://flgr.sh/txtfssAlltxt