In [51]:
!pip install transformers

[0m

In [52]:
!pip install ipywidgets widgetsnbextension pandas-profiling



[0m

In [53]:
!jupyter nbextension enable --py widgetsnbextension

Enabling notebook extension jupyter-js-widgets/extension...
      - Validating: [32mOK[0m


In [54]:
import tensorflow as tf
import collections
import json
import os
import pandas as pd
import csv
from transformers import DistilBertTokenizer

max_seq_length = 64

tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")

REVIEW_BODY_COLUMN = "review_body"
REVIEW_ID_COLUMN = "review_id"

LABEL_COLUMN = "star_rating"
LABEL_VALUES = [1, 2, 3, 4, 5]

In [55]:
label_map = {}
for (i, label) in enumerate(LABEL_VALUES):
    label_map[label] = i

In [56]:
print(label_map)

{1: 0, 2: 1, 3: 2, 4: 3, 5: 4}


In [57]:
class InputFeatures(object):
    """BERT特徴量ベクトル"""

    def __init__(self, input_ids, input_mask, segment_ids, label_id, review_id, date, label):
        self.input_ids = input_ids
        self.input_mask = input_mask
        self.segment_ids = segment_ids
        self.label_id = label_id
        self.review_id = review_id
        self.date = date
        self.label = label


In [58]:
class Input(object):
    """シーケンス分類で用いるトレーニング/テストの単一の入力"""

    def __init__(self, text, review_id, date, label=None):
        """入力のコンストラクタ
        Args:
          text: 文字列。トークン化されていない一つ目のシーケンスのテキスト。
            単一シーケンスのタスクではこのシーケンスのみを指定する。
          label: (オプショナル) 文字列。サンプルのラベル。トレーニングや検証用のサンプルでは指定する。
            テスト用のサンプルでは指定しない。
        """
        self.text = text
        self.review_id = review_id
        self.date = date
        self.label = label


In [59]:
def convert_input(the_input, max_seq_length):
    # まず、BERTが学習したデータ形式と合うようにデータを前処理する。
    # 1. テキストを小文字にする（BERT lowercaseモデルを用いる場合）
    # 2. トークン化する（例、"sally says hi" -> ["sally", "says", "hi"]）
    # 3. 単語をWordPieceに分割（例、"calling" -> ["call", "##ing"]）
    #
    # この辺りの処理はTransformersライブラリのトークナイザーがまかなってくれます。

    tokens = tokenizer.tokenize(the_input.text)
    tokens.insert(0, '[CLS]')
    tokens.append('[SEP]')
    print("**{} tokens**\n{}\n".format(len(tokens), tokens))

    encode_plus_tokens = tokenizer.encode_plus(
        the_input.text,
        pad_to_max_length=True,
        max_length=max_seq_length,
        truncation=True
    )
    
    # 事前学習済みBERTの語彙ID。トークンを表す。（トークン数が `max_seq_length` 未満であれば0をパディングする）
    input_ids = encode_plus_tokens["input_ids"]

    # BERTがどのトークンに注目するかを0/1で指定。`input_ids` のパディング部分のベクトル要素には0を割り当てる。
    input_mask = encode_plus_tokens["attention_mask"]

    # テキスト分類のような単一シーケンスのタスクではセグメントIDは常に0とする。質問回答や次文予測のような2シーケンスタスクの場合は1を割り当てる。
    segment_ids = [0] * max_seq_length

    # それぞれのトレーニングデータの行のラベル（`star_rating` 1〜5）
    label_id = label_map[the_input.label]

    features = InputFeatures(
        input_ids=input_ids,
        input_mask=input_mask,
        segment_ids=segment_ids,
        label_id=label_id,
        review_id=the_input.review_id,
        date=the_input.date,
        label=the_input.label,
    )

    print("**{} input_ids**\n{}\n".format(len(features.input_ids), features.input_ids))
    print("**{} input_mask**\n{}\n".format(len(features.input_mask), features.input_mask))
    print("**{} segment_ids**\n{}\n".format(len(features.segment_ids), features.segment_ids))
    print("**label_id**\n{}\n".format(features.label_id))
    print("**review_id**\n{}\n".format(features.review_id))
    print("**date**\n{}\n".format(features.date))
    print("**label**\n{}\n".format(features.label))

    return features


In [60]:
def transform_inputs_to_tfrecord(inputs, output_file, max_seq_length):
    # データをBERTが理解できるフォーマットに変換する
    records = []
    tf_record_writer = tf.io.TFRecordWriter(output_file)

    for (input_idx, the_input) in enumerate(inputs):
        if input_idx % 10000 == 0:
            print("Writing input {} of {}\n".format(input_idx, len(inputs)))

        features = convert_input(the_input, max_seq_length)

        all_features = collections.OrderedDict()

        # input_ids、input_mask、segment_ids、label_idsを含んだTFRecordを作成
        all_features["input_ids"] = tf.train.Feature(int64_list=tf.train.Int64List(value=features.input_ids))
        all_features["input_mask"] = tf.train.Feature(int64_list=tf.train.Int64List(value=features.input_mask))
        all_features["segment_ids"] = tf.train.Feature(int64_list=tf.train.Int64List(value=features.segment_ids))
        all_features["label_ids"] = tf.train.Feature(int64_list=tf.train.Int64List(value=[features.label_id]))

        tf_record = tf.train.Example(features=tf.train.Features(feature=all_features))
        tf_record_writer.write(tf_record.SerializeToString())

        # Feature Storeに格納する、すべての特徴量を含んだレコードを作成
        records.append(
            {
                "input_ids": features.input_ids,
                "input_mask": features.input_mask,
                "segment_ids": features.segment_ids,
                "label_id": features.label_id,
                "review_id": the_input.review_id,
                "date": the_input.date,
                "label": features.label,
            }
        )

    tf_record_writer.close()

    return records

In [61]:
from datetime import datetime
from time import strftime

# timestamp = datetime.now().replace(microsecond=0).isoformat()
timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
print(timestamp)

2024-01-07T12:05:09Z


In [62]:
import pandas as pd

data = [
    [
        5,
        "ABCD12345",
        """I needed an "antivirus" application and know the quality of Norton products.  This was a no brainer for me and I am glad it was so simple to get.""",
    ],
    [
        3,
        "EFGH12345",
        """The problem with ElephantDrive is that it requires the use of Java. Since Java is notorious for security problems I haveit removed from all of my computers. What files I do have stored are photos.""",
    ],
    [
        1,
        "IJKL2345",
        """Terrible, none of my codes worked, and I can't uninstall it.  I think this product IS malware and viruses""",
    ],
]

df = pd.DataFrame(data, columns=["star_rating", "review_id", "review_body"])

# Input クラスを使用して、データからサンプルを作成する。
inputs = df.apply(
    lambda x: Input(label=x[LABEL_COLUMN], text=x[REVIEW_BODY_COLUMN], review_id=x[REVIEW_ID_COLUMN], date=timestamp),
    axis=1,
)

In [63]:
df

Unnamed: 0,star_rating,review_id,review_body
0,5,ABCD12345,"I needed an ""antivirus"" application and know t..."
1,3,EFGH12345,The problem with ElephantDrive is that it requ...
2,1,IJKL2345,"Terrible, none of my codes worked, and I can't..."


In [64]:
inputs

0    <__main__.Input object at 0x7f0b2fb4cbe0>
1    <__main__.Input object at 0x7f0b2fb4de10>
2    <__main__.Input object at 0x7f0b2fb4d780>
dtype: object

In [65]:
# date が Feature Store の仕様に合わせて ISO-8601 になっていることを確認
print(inputs[0].date)

2024-01-07T12:05:09Z


In [66]:
output_file = "./data.tfrecord"

In [67]:
records = transform_inputs_to_tfrecord(inputs, output_file, max_seq_length)

Writing input 0 of 3

**37 tokens**
['[CLS]', 'i', 'needed', 'an', '"', 'anti', '##virus', '"', 'application', 'and', 'know', 'the', 'quality', 'of', 'norton', 'products', '.', 'this', 'was', 'a', 'no', 'brain', '##er', 'for', 'me', 'and', 'i', 'am', 'glad', 'it', 'was', 'so', 'simple', 'to', 'get', '.', '[SEP]']

**64 input_ids**
[101, 1045, 2734, 2019, 1000, 3424, 23350, 1000, 4646, 1998, 2113, 1996, 3737, 1997, 10770, 3688, 1012, 2023, 2001, 1037, 2053, 4167, 2121, 2005, 2033, 1998, 1045, 2572, 5580, 2009, 2001, 2061, 3722, 2000, 2131, 1012, 102, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

**64 input_mask**
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

**64 segment_ids**
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0



In [68]:
import pandas as pd

In [69]:
df = pd.read_parquet("/mnt/amazon_reviews_2015.snappy.parquet",columns=["star_rating","review_id","review_body"])

In [70]:
df

Unnamed: 0,star_rating,review_id,review_body
0,5,b'R2C20GSMIOZYVP',b'I have made multiple purchases of this prosc...
1,3,b'RPI30SPP1J9U9',"b""I am not sure if it's a product or storage p..."
2,1,b'RKYY2ZQGUV06L',"b""I was hoping this had a stronger taste than..."
3,5,b'RKYYAEA9G3CD4',b'Awesome Tea!'
4,4,b'R17ZQPU555KVR6',"b""This tasty spread tastes just like a melted ..."
...,...,...,...
41905626,5,b'R2341YPSNIJ3NB',"b""I got this case for my violin, as my old one..."
41905627,5,b'R34HOANGHY4878',b'just as excpected'
41905628,4,b'R3APW14Y9V4QOP',"b""It has ten really cool sounds, the beat step..."
41905629,5,b'R18BIAZS3JP0MI',b'very clear sound thank you amazon i recommen...


In [71]:
df['review_id'] = df['review_id'].str.decode("utf-8")

In [72]:
df['review_body'] = df['review_body'].str.decode("utf-8","ignore")

In [73]:
df

Unnamed: 0,star_rating,review_id,review_body
0,5,R2C20GSMIOZYVP,I have made multiple purchases of this prosciu...
1,3,RPI30SPP1J9U9,I am not sure if it's a product or storage pro...
2,1,RKYY2ZQGUV06L,I was hoping this had a stronger taste than r...
3,5,RKYYAEA9G3CD4,Awesome Tea!
4,4,R17ZQPU555KVR6,This tasty spread tastes just like a melted Re...
...,...,...,...
41905626,5,R2341YPSNIJ3NB,"I got this case for my violin, as my old one w..."
41905627,5,R34HOANGHY4878,just as excpected
41905628,4,R3APW14Y9V4QOP,"It has ten really cool sounds, the beat step w..."
41905629,5,R18BIAZS3JP0MI,very clear sound thank you amazon i recommende...


In [74]:
print(f"df Memory Usage: {df.memory_usage(deep=True).sum() / 1024**3} GB")

df Memory Usage: 13.552809612825513 GB


In [75]:
!pip install pandarallel

[0m

In [76]:
from pandarallel import pandarallel
import os
os.environ['JOBLIB_TEMP_FOLDER'] = '/tmp'

pandarallel.initialize(nb_workers=2, progress_bar=True, use_memory_fs=False)

INFO: Pandarallel will run on 2 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.


In [20]:
# Input クラスを使用して、データからサンプルを作成する。
#inputs = df.apply(
#    lambda x: Input(label=x[LABEL_COLUMN], text=x[REVIEW_BODY_COLUMN], review_id=x[REVIEW_ID_COLUMN], date=timestamp),
#    axis=1,
#)

In [45]:
#import os
#os.environ['JOBLIB_TEMP_FOLDER'] = '/tmp'
#!export TMPDIR=/var/tmp
#!pip3 install --no-cache-dir accelerate

In [None]:
#def func(x):
#    return lambda x: Input(label=x[LABEL_COLUMN], text=x[REVIEW_BODY_COLUMN], review_id=x[REVIEW_ID_COLUMN], date=timestamp)

inputs = df.parallel_apply(
    lambda x: Input(label=x[LABEL_COLUMN], text=x[REVIEW_BODY_COLUMN], review_id=x[REVIEW_ID_COLUMN], date=timestamp),
    axis=1
)

VBox(children=(HBox(children=(IntProgress(value=0, description='0.00%', max=20952816), Label(value='0 / 209528…

In [50]:
inputs

0    <__main__.Input object at 0x7f0b2fb4c0d0>
1    <__main__.Input object at 0x7f0b2fb4c2b0>
2    <__main__.Input object at 0x7f0b2fb4cbb0>
dtype: object

In [40]:
output_file = "./data.tfrecord"

In [42]:
# date が Feature Store の仕様に合わせて ISO-8601 になっていることを確認
print(inputs[0].date)

2024-01-07T09:30:01Z


In [43]:
records = transform_inputs_to_tfrecord(inputs, output_file, max_seq_length)

Writing input 0 of 41905631



TypeError: cannot use a string pattern on a bytes-like object

In [44]:
import re

my_bytes = b'apple,banana,kiwi'

m = re.search("apple", my_bytes.decode('utf-8'))


In [45]:
m

<re.Match object; span=(0, 5), match='apple'>