# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


####  Run this cell to set up and start your interactive session.


In [1]:
# Extra Python package to install into the Glue session (used later for tokenization).
%additional_python_modules "transformers"

# Session configuration: idle timeout in minutes, Glue version, worker type and worker count.
%idle_timeout 2880
%glue_version 4.0
%worker_type G.8X
%number_of_workers 32

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the active SparkContext if there is one, otherwise create it.
sc = SparkContext.getOrCreate()
# Glue wrapper around the SparkContext; used below to read DynamicFrames from S3.
glueContext = GlueContext(sc)
# Spark session exposed by the Glue context.
spark = glueContext.spark_session
# Glue Job handle bound to this context.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.4 
Additional python modules to be included:
transformers
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.8X
Previous number of workers: None
Setting new number of workers to: 32
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.8X
Number of Workers: 32
Session ID: e867bbab-5de8-44ee-befe-9e91fe1c5baf
Applying the following default arguments:
--glue_kernel_version 1.0.4
--enable-glue-datacatalog true
--additional-python-modules transformers
Waiting for session e867bbab-5de8-44ee-befe-9e91fe1c5

####  Libraries


In [2]:
from pyspark.sql.functions import udf, explode, col
from pyspark.sql.functions import sum as _sum
from pyspark.sql.types import ArrayType, StringType, IntegerType

import re
import transformers

None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.


####  Load processed files from the S3 bucket


In [53]:
# Base name (without extension) of the JSONL shard; also reused further down
# to build the output Parquet path.
JSONL_NAME = "papers-part10_filtered"

# Read the JSONL shard from S3 as a Glue DynamicFrame.
# NOTE(review): on a top-to-bottom run the following cell reassigns
# `dynamic_frame` from books.csv, so this load is overwritten before it is
# used — confirm which input is intended.
dynamic_frame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": [
            f"s3://arcee-medical-dataset/raw-files/filtered_jsonl/{JSONL_NAME}.jsonl"
        ]
    },
    format="json",
)





In [3]:
# Read the processed books CSV from S3 as a Glue DynamicFrame.
# The first line of the file is treated as the header row (withHeader=True).
dynamic_frame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": ["s3://arcee-medical-dataset/processed/books.csv"],
        "recurse": True,  # also pick up files in sub-prefixes of the given paths
    },
    format="csv",
    format_options={"withHeader": True},
)




#### Example: Convert the DynamicFrame to a Spark DataFrame


In [4]:
# Convert the Glue DynamicFrame into a regular Spark DataFrame for the steps below.
df = dynamic_frame.toDF()



#### Drop rows whose text column is null or duplicated

In [5]:
# Keep only rows that have a text value, then keep one row per distinct text.
# Nulls are checked on the text column only, and before de-duplicating, so a
# text is not lost just because the duplicate that happened to be kept had a
# null in some other column.
spark_df = df.dropna(subset=["text"]).dropDuplicates(["text"])




In [6]:
from pyspark.sql.functions import expr

# SQL expression that swaps every newline in the 'text' column for a single space
text_without_newlines = expr("replace(text, '\n', ' ')")
df_cleaned = spark_df.withColumn("text", text_without_newlines)

# Preview the first five cleaned rows
df_cleaned.show(5)

+--------+--------+--------------------+
|corpusid|pubmedid|                text|
+--------+--------+--------------------+
|        |        |CHARLES CLARKE NE...|
|        |        |GERMAN SHORTSTORI...|
|        |        |Get Full Access a...|
|        |        |-> wall pw siglan...|
|        |        |BY JONATHAN BOARD...|
+--------+--------+--------------------+
only showing top 5 rows


In [7]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField
import transformers

# Load the Llama-2 tokenizer from the Hugging Face Hub.
# The access token is read from the HF_TOKEN environment variable rather than
# being hardcoded in the notebook, where it leaks to anyone who can read the file.
# When HF_TOKEN is unset, token=None leaves credential lookup to the library's
# defaults. NOTE: any token previously committed in this cell should be revoked.
import os

tokenizer = transformers.AutoTokenizer.from_pretrained(
    "meta-llama/Llama-2-7b-hf",
    token=os.environ.get("HF_TOKEN"),
)

# Struct returned by the tokenizing UDF: the list of token ids plus how many there are
schema = (
    StructType()
    .add("tokens", ArrayType(IntegerType()), True)
    .add("token_count", IntegerType(), True)
)

def get_tokens_and_count(text):
    """Tokenize ``text`` with the module-level ``tokenizer``.

    Args:
        text: Value to encode. Anything that is not a ``str`` is treated as
            having no tokens.

    Returns:
        A ``(token_ids, token_count)`` tuple. ``([], 0)`` is returned for
        non-string input or when encoding raises.
    """
    fallback = ([], 0)
    if isinstance(text, str):
        try:
            # Special tokens are included, so they are part of the count too.
            token_ids = tokenizer.encode(text, add_special_tokens=True)
            return (token_ids, len(token_ids))
        except Exception as e:
            # Best effort: report the failure and fall through to the empty result.
            print(f"Error encoding text: {e}")
    return fallback

# Wrap the Python function as a Spark UDF that returns the struct described by `schema`
get_tokens_and_count_udf = udf(get_tokens_and_count, schema)

# Tokenize the 'text' column; the result lands in a single struct column
df_with_tokens_and_count = df_cleaned.withColumn(
    "tokens_and_count",
    get_tokens_and_count_udf(col("text")),
)

# Flatten the struct into top-level 'tokens' and 'token_count' columns,
# then discard the intermediate struct column
struct_fields = ["tokens_and_count.tokens", "tokens_and_count.token_count"]
df_final = df_with_tokens_and_count.select("*", *struct_fields).drop("tokens_and_count")

# Preview the first five rows
df_final.show(5)

+--------+--------+--------------------+--------------------+-----------+
|corpusid|pubmedid|                text|              tokens|token_count|
+--------+--------+--------------------+--------------------+-----------+
|        |        |CHARLES CLARKE NE...|[1, 26871, 17101,...|     214043|
|        |        |GERMAN SHORTSTORI...|[1, 402, 1001, 27...|      59371|
|        |        |Get Full Access a...|[1, 3617, 14846, ...|     108223|
|        |        |-> wall pw siglan...|[1, 1599, 10090, ...|      37748|
|        |        |BY JONATHAN BOARD...|[1, 6770, 435, 11...|     212753|
+--------+--------+--------------------+--------------------+-----------+
only showing top 5 rows

tokenizer_config.json: 100%|##########| 776/776 [00:00<00:00, 6.15MB/s]
tokenizer.model: 100%|##########| 500k/500k [00:00<00:00, 195MB/s]
tokenizer.json: 100%|##########| 1.84M/1.84M [00:00<00:00, 51.1MB/s]
special_tokens_map.json: 100%|##########| 414/414 [00:00<00:00, 4.33MB/s]


In [41]:
# from pyspark.sql.functions import sum

# # Sum the token_count column to get the total number of tokens
# total_tokens = df_final.agg(sum("token_count")).collect()[0][0]

# print(f"Total number of tokens: {total_tokens}")

In [42]:
# # Filter the DataFrame to only include rows with 0 tokens
# df_final = df_final.filter("token_count = 0")

# # Show the filtered DataFrame
# df_final.show()

#### Save

In [8]:
# Destination for the tokenized dataset.
# NOTE(review): the name is derived from JSONL_NAME, while the data loaded in
# the visible cells comes from books.csv — confirm the output name is intended.
s3_bucket_path_all = f"s3://arcee-medical-dataset/tokenized-parquet-files/{JSONL_NAME}.parquet"

# Write as Parquet, replacing anything already stored at that path
parquet_writer = df_final.write.mode('overwrite')
parquet_writer.parquet(s3_bucket_path_all)


