In [1]:
import os
from io import BytesIO

import boto3
import spacy
import torch
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import udf, col, current_timestamp
from pyspark.sql.types import MapType, StringType, ArrayType, IntegerType, StructType, StructField
from transformers import AutoTokenizer
from transformers import T5Tokenizer, T5ForConditionalGeneration

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Name of the S3 bucket that the read/write paths in this notebook are built from.
s3_bucket = 'reddit-tifu'
# Credentials come from the environment so real keys never have to be typed
# into (and saved with) the notebook. The '**' placeholder is kept as the
# fallback so the cell still runs when the variables are unset; S3 access
# will then be rejected until real credentials are exported.
AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID', '**')
AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY', '**')

In [3]:
def create_spark(appname, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY):
    """Return a local SparkSession configured for s3a access.

    Args:
        appname: Application name passed to the session builder.
        AWS_ACCESS_KEY_ID: Value stored under ``spark.hadoop.fs.s3a.access.key``.
        AWS_SECRET_ACCESS_KEY: Value stored under ``spark.hadoop.fs.s3a.secret.key``.

    Returns:
        The session produced by ``getOrCreate()`` on a ``local[*]`` master.
    """
    s3a_settings = [
        # hadoop-aws is requested through spark.jars.packages.
        ("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4"),
        ("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID),
        ("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY),
    ]
    conf = SparkConf()
    conf.setAll(s3a_settings)

    builder = SparkSession.builder.master("local[*]")
    builder = builder.config(conf=conf)
    builder = builder.appName(appname)
    return builder.getOrCreate()

In [4]:
# Session shared by every later cell in the notebook.
spark = create_spark(
    appname="data prep",
    AWS_ACCESS_KEY_ID=AWS_ACCESS_KEY_ID,
    AWS_SECRET_ACCESS_KEY=AWS_SECRET_ACCESS_KEY,
)

:: loading settings :: url = jar:file:/home/ec2-user/.local/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ec2-user/.ivy2/cache
The jars for the packages stored in: /home/ec2-user/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-749d7950-f52e-4498-a0ae-5fa9cbd4125d;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 228ms :: artifacts dl 6ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	-----------------------------

In [5]:
# Read the training parquet dataset from the project bucket over s3a.
df = spark.read.parquet(
    f"s3a://{s3_bucket}/subreddits_train_data.parquet"
)

25/05/01 19:20:26 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [6]:
# Loaded spaCy pipelines keyed by model name. Loading a pipeline is expensive,
# so each one is created on first use and then reused by later calls instead
# of being reloaded for every piece of text.
# NOTE(review): when this function runs inside a Spark UDF the cache lives in
# the worker's copy of these globals — confirm the reuse you get on your setup.
_SPACY_PIPELINES = {}


def shuffle_text(text, model_name='en_core_web_lg'):
    """Return `text` with spaCy stop-word tokens removed.

    Args:
        text: The string to process.
        model_name: Name of the spaCy pipeline to load; defaults to the
            'en_core_web_lg' model this function has always used.

    Returns:
        The texts of all tokens whose ``is_stop`` flag is false, joined with
        single spaces in their original order. Punctuation tokens are kept.
    """
    nlp = _SPACY_PIPELINES.get(model_name)
    if nlp is None:
        # First use of this model in the current process: load and remember it.
        nlp = _SPACY_PIPELINES[model_name] = spacy.load(model_name)

    return ' '.join(token.text for token in nlp(text) if not token.is_stop)

def extract_keywords(text):
    """Split a post on its "Text: " marker and extract keywords from the body.

    Args:
        text: The raw post string; the body is everything after the first
            occurrence of "Text: ".

    Returns:
        A dict with keys 'text' (the body) and 'keywords' (the result of
        `shuffle_text` on the body). Both values are empty strings when
        `text` is not a string, when the marker is missing, or when keyword
        extraction raises.
    """
    if not isinstance(text, str):
        # Null or non-string column values cannot be split.
        return {'text': '', 'keywords': ''}

    # partition() cuts at the first marker only, so a body that itself
    # contains "Text: " is kept whole instead of being truncated.
    _, marker, body = text.partition("Text: ")
    if not marker:
        return {'text': '', 'keywords': ''}

    try:
        keywords_para = shuffle_text(body)
    except Exception:
        # Best effort: a failing extraction yields an empty record, while
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        return {'text': '', 'keywords': ''}

    return {
        "text": body,
        "keywords": keywords_para
    }

In [7]:
# Two-row sample of the input; the cells below build on this sample rather
# than on the full `df` (presumably to keep UDF runs quick — confirm before
# running on the whole dataset).
df_small = df.limit(2)

In [8]:
# Expose extract_keywords as a Spark UDF returning a string -> string map,
# apply it to the PostText column, and preview the first annotated row.
extract_keywords_udf = udf(
    extract_keywords,
    returnType=MapType(StringType(), StringType()),
)
annotated_df = df_small.withColumn(
    "AnnotatedSections",
    extract_keywords_udf(df_small["PostText"]),
)
annotated_df.select("AnnotatedSections").take(1)

                                                                                

[Row(AnnotatedSections={'keywords': 'read questions answers posted , thread invite share like highlight week - interesting discussion , informative answer , insightful question overlooked , .', 'text': "Nobody can read all the questions and answers that are posted here, so in this thread we invite you to share anything you'd like to highlight from the last week - an interesting discussion, an informative answer, an insightful question that was overlooked, or anything else."})]

In [9]:
# Pretrained "t5-small" tokenizer; tokens_to_ids_attention_mask reads this
# module-level name as a global.
tokenizer = T5Tokenizer.from_pretrained("t5-small")

You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565


In [10]:
def tokens_to_ids_attention_mask(prompt):
    """Tokenize every section of `prompt` with the module-level `tokenizer`.

    Each section is encoded with truncation and 'max_length' padding to 512
    positions, and every resulting id is also decoded back to a string.

    Args:
        prompt: Mapping of section name to sentence; it must contain the
            'keywords' and 'text' sections.

    Returns:
        A 6-tuple: (keywords ids, text ids, keywords attention mask,
        text attention mask, keywords decoded tokens, text decoded tokens).
    """
    # Make sure padding is possible before encoding anything.
    if tokenizer.pad_token is None:
        tokenizer.add_special_tokens({'pad_token': '[PAD]'})

    ids_by_section, mask_by_section = {}, {}
    for name, sentence in prompt.items():
        encoded = tokenizer(sentence, truncation=True, padding='max_length', max_length=512, return_tensors="pt")
        # The encoding is a batch of one; keep its single row.
        ids_by_section[name] = encoded['input_ids'].tolist()[0]
        mask_by_section[name] = encoded['attention_mask'].tolist()[0]

    decoded_by_section = {
        name: [tokenizer.decode(token_id) for token_id in ids]
        for name, ids in ids_by_section.items()
    }

    # Sanity check: ids, mask and decoded tokens line up position by position.
    for name in prompt:
        assert len(ids_by_section[name]) == len(mask_by_section[name]) == len(decoded_by_section[name])

    return (
        ids_by_section['keywords'], ids_by_section['text'],
        mask_by_section['keywords'], mask_by_section['text'],
        decoded_by_section['keywords'], decoded_by_section['text'],
    )

In [11]:
# Return schema for tokens_to_ids_attention_mask; field order matches the
# order of the 6-tuple that function returns.
schema = StructType([
    StructField("keywords_token_ids", ArrayType(IntegerType()), False),
    StructField("text_token_ids", ArrayType(IntegerType()), False),
    StructField("keywords_attention_mask", ArrayType(IntegerType()), False),
    StructField("text_attention_mask", ArrayType(IntegerType()), False),
    # The UDF returns a list of decoded token strings for these two fields,
    # so they are arrays of strings rather than a single string.
    StructField("keywords_tokens", ArrayType(StringType()), False),
    StructField("text_tokens", ArrayType(StringType()), False),
])

In [12]:
# Wrap the tokenizing function as a Spark UDF that returns the struct `schema`.
tokens_to_ids_attention_mask_udf = udf(
    tokens_to_ids_attention_mask,
    returnType=schema,
)

In [13]:
# Add one struct column holding ids, attention masks and decoded tokens for
# each row's AnnotatedSections map.
df_with_ids_masks = annotated_df.withColumn(
    "ids_masks_tokens",
    tokens_to_ids_attention_mask_udf(annotated_df["AnnotatedSections"]),
)

In [14]:
# Flatten the struct: keep the id and attention-mask fields as top-level
# columns; the decoded-token fields are not selected.
df_final = df_with_ids_masks.select(
    "ids_masks_tokens.keywords_token_ids",
    "ids_masks_tokens.text_token_ids",
    "ids_masks_tokens.keywords_attention_mask",
    "ids_masks_tokens.text_attention_mask",
)

In [15]:
# Preview a single row with long cell values truncated.
df_final.show(n=1, truncate=True)

                                                                                

+--------------------+--------------------+-----------------------+--------------------+
|  keywords_token_ids|      text_token_ids|keywords_attention_mask| text_attention_mask|
+--------------------+--------------------+-----------------------+--------------------+
|[608, 746, 4269, ...|[22009, 54, 608, ...|   [1, 1, 1, 1, 1, 1...|[1, 1, 1, 1, 1, 1...|
+--------------------+--------------------+-----------------------+--------------------+
only showing top 1 row



In [17]:
# s3a destination path, inside `s3_bucket`, for the tokenized dataset.
s3_path = f"s3a://{s3_bucket}/subreddits_train_data_tokenized.parquet"

In [18]:
# Write at most the first 10 rows of df_final as parquet, replacing anything
# already stored at s3_path.
(
    df_final
    .limit(10)
    .write
    .mode("overwrite")
    .parquet(s3_path)
)

                                                                                