In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id, current_timestamp
from datasets import load_dataset, Dataset, DatasetDict
import huggingface_hub

# ETL of Large Source Dataset
Before loading the dataset into a database and data-loading pipeline, each row has to be made independent and identifiable by adding a unique ID and a timestamp. The data is also split into train and validation subsets (an 80/20 split of the source `train` split).

In [2]:
# Connect this notebook to an EMR Serverless application via the SageMaker Studio analytics extension.
# NOTE(review): the application id and the IAM role ARN (which embeds the AWS account id) are
# hardcoded here; consider loading them from environment/config instead of committing them.
# NOTE(review): the following cell builds its own SparkSession with .master('local[*]'), and its
# log output shows a local JVM starting — confirm which Spark session actually runs the ETL.
%load_ext sagemaker_studio_analytics_extension.magics
%sm_analytics emr-serverless connect --application-id 00fq6j1a0fiulq09 --language python --emr-execution-role-arn arn:aws:iam::597161074694:role/service-role/AmazonEMR-ServiceRole-20250211T131858

Waiting for EMR Serverless application state to become STARTED
Waiting for EMR Serverless application state to become STARTED
Initiating EMR Serverless connection..
Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,00fq933kluiv1p0a,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


## Connecting to PySpark
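Since the data is large (~2.7 million rows in total), processing it with Spark lets the work run in parallel. Note that the session below is created with `master('local[*]')`, i.e. Spark local mode using the cores of the machine running this notebook.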

In [3]:
# Spark session configuration, kept as named constants so they are easy to find and tune.
# NOTE: 'local[*]' runs Spark in local mode on this machine's cores (the log output below
# shows a local JVM starting); it does not target the EMR Serverless application connected
# above. Adjust SPARK_MASTER if cluster execution is intended.
SPARK_MASTER = 'local[*]'
# Driver JVM heap. The datasets are handed to spark.createDataFrame() from in-memory
# Python objects later on, so presumably the driver must be large enough to hold them.
SPARK_DRIVER_MEMORY = '50g'

spark = (
    SparkSession.builder
    .master(SPARK_MASTER)
    .config("spark.driver.memory", SPARK_DRIVER_MEMORY)
    .appName('spark')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/15 00:33:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Source dataset on the Hugging Face Hub, read from its auto-converted parquet branch.
tinystories = "skeskinen/TinyStories-GPT4"

# Both subsets are carved from the hub's single "train" split as a contiguous
# (unshuffled) 80/20 split. Percent slicing replaces the former hard-coded row
# index 2196080 (exactly 80% of the 2,745,100 rows), so the proportion still
# holds if the upstream row count changes. Both slices share the same boundary,
# so they are disjoint and together cover every row.
TRAIN_SPLIT_PCT = 80

train_data = load_dataset(tinystories, revision='refs/convert/parquet', split=f"train[:{TRAIN_SPLIT_PCT}%]")
val_data = load_dataset(tinystories, revision='refs/convert/parquet', split=f"train[{TRAIN_SPLIT_PCT}%:]")

Using the latest cached version of the dataset since skeskinen/TinyStories-GPT4 couldn't be found on the Hugging Face Hub
Found the latest cached dataset configuration 'default' at /home/sagemaker-user/.cache/huggingface/datasets/skeskinen___tiny_stories-gpt4/default/0.0.0/3c6dc87e4c3c6079b89e9271c5f91d0f226c103c (last modified on Sat Feb 15 00:19:37 2025).


In [5]:
# Inspect the training split's columns and row count.
train_data

Dataset({
    features: ['story', 'summary', 'source', 'prompt', 'words', 'features'],
    num_rows: 2196080
})

In [6]:
# Inspect the validation split's columns and row count.
val_data

Dataset({
    features: ['story', 'summary', 'source', 'prompt', 'words', 'features'],
    num_rows: 549020
})

In [7]:
# Convert the in-memory Hugging Face training Dataset into a Spark DataFrame.
# NOTE(review): a Dataset is neither an RDD nor a pandas DataFrame, so Spark iterates the rows
# on the driver and infers the schema from the row dicts — the columns come out in alphabetical
# order (see printSchema below). This is slow for ~2.2M rows and is a likely contributor to the
# "task of very large size" warnings later on.
train_data = spark.createDataFrame(train_data)

In [8]:
# Convert the in-memory Hugging Face validation Dataset into a Spark DataFrame.
# NOTE(review): as with the training split, rows are iterated on the driver and the schema is
# inferred from the row dicts (alphabetical column order).
val_data = spark.createDataFrame(val_data)

## Unique ID and Timestamp
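Adding these columns makes the records in the dataset independently identifiable for use with a database in AWS. IDs come from Spark's `monotonically_increasing_id()`: they are unique and increasing within a DataFrame but not consecutive.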

In [9]:
# Tag every training row with an ID and a timestamp using Spark's built-in column
# functions (these are not Python UDFs): monotonically_increasing_id() yields IDs
# that are unique within this DataFrame but not consecutive.
train_data = (
    train_data
    .withColumn("unique_id", monotonically_increasing_id())
    .withColumn("timestamp", current_timestamp())
)

In [10]:
# Tag every validation row with an ID and a timestamp using Spark's built-in column
# functions (these are not Python UDFs).
#
# monotonically_increasing_id() only guarantees uniqueness *within* one DataFrame.
# Used as-is, it would generate validation IDs from the same range train_data got
# (both start at 0 in partition 0), so IDs would collide once the two splits share
# one dataset/table. Spark packs the partition index into the upper bits and the
# per-partition record number into the lower 33 bits, so shifting the validation
# IDs up by 2**62 moves them into a range the training IDs cannot reach, while
# staying a non-null long. This assumes each split has fewer than 2**29 partitions.
VAL_ID_OFFSET = 1 << 62
val_data = (
    val_data
    .withColumn("unique_id", monotonically_increasing_id() + VAL_ID_OFFSET)
    .withColumn("timestamp", current_timestamp())
)

## Features and Words
The `features` and `words` columns are stored as arrays. They are converted to prefixed, comma-separated strings (e.g. `narrative features: ...`, `vocabulary features: ...`) so the language-model training pipeline can consume them as text.

In [15]:
def features_to_string(arr):
    """Render a narrative-features array as one prefixed, comma-separated string.

    Args:
        arr: Sequence of feature names (a row's ``features`` value), or ``None``
            when the Spark column value is null.

    Returns:
        ``"narrative features: "`` followed by the non-null entries joined with
        ``", "``. Returns ``None`` when ``arr`` is ``None``, so a null input yields
        a null output instead of a ``TypeError`` that fails the Spark task.
    """
    if arr is None:
        return None
    kind = 'narrative features: '
    # Null array elements arrive as None; skip them rather than crash str.join.
    return kind + ", ".join(str(item) for item in arr if item is not None)

In [16]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

In [17]:
# Wrap the Python function as a Spark UDF with an explicit string return type.
# The name is rebound to the UDF, so re-running this cell would otherwise wrap the
# UDF in a second UDF (which fails on the executors); PySpark UDF wrappers expose
# the original function as `.func`, so unwrap it first to keep the cell re-runnable.
features_to_string = udf(getattr(features_to_string, "func", features_to_string), StringType())

In [39]:
# Build the stringified-features expression once, then attach it to the training split.
string_features_expr = features_to_string(col('features'))
train_data = train_data.withColumn('string_features', string_features_expr)

In [40]:
# Build the stringified-features expression once, then attach it to the validation split.
val_features_expr = features_to_string(col('features'))
val_data = val_data.withColumn('string_features', val_features_expr)

In [33]:
def words_to_string(arr):
    """Render a vocabulary-words array as one prefixed, comma-separated string.

    Args:
        arr: Sequence of words (a row's ``words`` value), or ``None`` when the
            Spark column value is null.

    Returns:
        ``"vocabulary features: "`` followed by the non-null entries joined with
        ``", "``. Returns ``None`` when ``arr`` is ``None``, so a null input yields
        a null output instead of a ``TypeError`` that fails the Spark task.
    """
    if arr is None:
        return None
    kind = 'vocabulary features: '
    # Null array elements arrive as None; skip them rather than crash str.join.
    return kind + ", ".join(str(item) for item in arr if item is not None)

In [34]:
# Wrap the Python function as a Spark UDF with an explicit string return type.
# Unwrap via `.func` (set on PySpark UDF wrappers) so re-running this cell does not
# wrap the UDF inside a second UDF.
words_to_string = udf(getattr(words_to_string, "func", words_to_string), StringType())

In [35]:
# One unresolved column expression works for both splits: 'words' resolves against
# whichever DataFrame it is attached to.
string_words_expr = words_to_string(col('words'))
train_data = train_data.withColumn('string_words', string_words_expr)
val_data = val_data.withColumn('string_words', string_words_expr)

In [46]:
# Remove the original array columns now that their string renderings exist.
train_data = train_data.drop('words', 'features')
val_data = val_data.drop('words', 'features')

In [50]:
# Verify the final training schema: original text columns, the added unique_id/timestamp,
# and the stringified features/words columns.
train_data.printSchema()

root
 |-- prompt: string (nullable = true)
 |-- source: string (nullable = true)
 |-- story: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unique_id: long (nullable = false)
 |-- timestamp: timestamp (nullable = false)
 |-- string_features: string (nullable = true)
 |-- string_words: string (nullable = true)



## Uploading to the Hugging Face Hub

In [51]:
# Convert the Spark DataFrames back into Hugging Face Dataset objects.
# NOTE(review): no cache_dir is passed, so the default datasets cache is used; on a multi-node
# cluster that location would need to be reachable by both driver and workers — confirm if the
# master is ever changed from local mode.
train_data = Dataset.from_spark(train_data)
val_data = Dataset.from_spark(val_data)

25/02/15 01:01:09 WARN TaskSetManager: Stage 3 contains a task of very large size (359110 KiB). The maximum recommended task size is 1000 KiB.
25/02/15 01:01:22 WARN TaskSetManager: Stage 6 contains a task of very large size (359110 KiB). The maximum recommended task size is 1000 KiB.
25/02/15 01:01:29 WARN PythonRunner: Detected deadlock while completing task 2.0 in stage 6 (TID 14): Attempting to kill Python Worker
25/02/15 01:01:29 WARN PythonRunner: Detected deadlock while completing task 1.0 in stage 6 (TID 13): Attempting to kill Python Worker
25/02/15 01:01:29 WARN PythonRunner: Detected deadlock while completing task 7.0 in stage 6 (TID 19): Attempting to kill Python Worker
25/02/15 01:01:29 WARN PythonRunner: Detected deadlock while completing task 3.0 in stage 6 (TID 15): Attempting to kill Python Worker
25/02/15 01:01:29 WARN PythonRunner: Detected deadlock while completing task 5.0 in stage 6 (TID 17): Attempting to kill Python Worker
25/02/15 01:01:30 WARN PythonRunner: De

In [52]:
# Map split names to their Hugging Face Dataset objects (despite the name, these are not DataFrames).
df_splits = dict(train=train_data, validation=val_data)

In [53]:
# Target dataset repository on the Hugging Face Hub ("<user>/<dataset>").
repo_id = 'Alexis-Az/TinyStories'

In [54]:
# Bundle both splits into one DatasetDict so they are pushed together as train/validation.
full_data = DatasetDict(df_splits)

In [55]:
# Upload both splits to the Hub. No token is passed here, so this relies on credentials already
# configured in the environment (e.g. a prior Hugging Face login) — confirm before re-running.
full_data.push_to_hub(repo_id=repo_id)

Uploading the dataset shards:   0%|          | 0/7 [00:00<?, ?it/s]

Creating parquet from Arrow format:   0%|          | 0/314 [00:00<?, ?ba/s]

Creating parquet from Arrow format:   0%|          | 0/314 [00:00<?, ?ba/s]

Creating parquet from Arrow format:   0%|          | 0/314 [00:00<?, ?ba/s]

Creating parquet from Arrow format:   0%|          | 0/314 [00:00<?, ?ba/s]

Creating parquet from Arrow format:   0%|          | 0/314 [00:00<?, ?ba/s]

Creating parquet from Arrow format:   0%|          | 0/314 [00:00<?, ?ba/s]

Creating parquet from Arrow format:   0%|          | 0/314 [00:00<?, ?ba/s]

Uploading the dataset shards:   0%|          | 0/2 [00:00<?, ?it/s]

Creating parquet from Arrow format:   0%|          | 0/275 [00:00<?, ?ba/s]

Creating parquet from Arrow format:   0%|          | 0/275 [00:00<?, ?ba/s]

README.md:   0%|          | 0.00/685 [00:00<?, ?B/s]

CommitInfo(commit_url='https://huggingface.co/datasets/Alexis-Az/TinyStories/commit/2b8499b9f9d157b30ed23ed91ce9b1360d38a93a', commit_message='Upload dataset', commit_description='', oid='2b8499b9f9d157b30ed23ed91ce9b1360d38a93a', pr_url=None, repo_url=RepoUrl('https://huggingface.co/datasets/Alexis-Az/TinyStories', endpoint='https://huggingface.co', repo_type='dataset', repo_id='Alexis-Az/TinyStories'), pr_revision=None, pr_num=None)