# RoBERTa Embedding Pipeline

This project requires sentence-level RoBERTa embeddings. Our original code took roughly 40 seconds to embed a single short transcript's worth of data, which made moving the computation to the cloud necessary.

Spark made the most sense because computing embeddings is a highly parallel task that needs substantial memory to hold both the embedding model and the resulting vectors. For example, running this task in an AWS Lambda function would not work, because the function could not hold the embedding model; we would instead have needed a separate API backend to run it. Spark was also convenient because it offers prebuilt NLP libraries (here, Spark NLP).

Ideally, we would have run the entire pipeline on Spark. However, while Spark is very fast for parallel tasks, it is slow at returning results to the primary node and saving them back to S3. Only the result of an analysis should therefore be returned, not the embedding vectors themselves.

Due to resource constraints, it did not make sense to use Spark for the rest of the project. Instead, we used Spark only for the expensive step of computing the embeddings, and saved the resulting pooled embeddings (smaller than the token-level embeddings, but still large) to S3.
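To make "smaller, but still large" concrete, here is a back-of-the-envelope size estimate. It assumes the 768-dimensional hidden size of `roberta_base`, 4-byte float32 values, and a hypothetical average of 20 tokens per sentence; the sentence count is purely illustrative, not a measurement from this project.

```python
# Back-of-the-envelope storage estimate for embeddings (illustrative only).
# Assumptions: roberta_base hidden size of 768, float32 values (4 bytes),
# and a hypothetical average of 20 tokens per sentence.
HIDDEN_SIZE = 768
BYTES_PER_FLOAT32 = 4


def token_level_bytes(n_sentences: int, tokens_per_sentence: int) -> int:
    """Raw size of keeping one vector per token."""
    return n_sentences * tokens_per_sentence * HIDDEN_SIZE * BYTES_PER_FLOAT32


def pooled_bytes(n_sentences: int) -> int:
    """Raw size after average pooling: one vector per sentence."""
    return n_sentences * HIDDEN_SIZE * BYTES_PER_FLOAT32


if __name__ == "__main__":
    n = 100_000  # hypothetical number of sentences
    print(f"token-level: {token_level_bytes(n, 20) / 1e9:.2f} GB")  # token-level: 6.14 GB
    print(f"pooled:      {pooled_bytes(n) / 1e6:.1f} MB")           # pooled:      307.2 MB
```

Pooling divides the raw size by the number of tokens per sentence. Writing each value as decimal text, as this notebook does, generally takes well over 4 characters per value, so the string column is larger than these raw figures before Parquet compression.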

Specifically, this pipeline takes sentence-level data, tokenizes it, computes word-level (token) embeddings, average-pools them into one vector per sentence, and packages the result. Because the resulting array was incompatible with the export format, each array is converted to a string representation, which is parsed back into an array once the data has been received.
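Average pooling takes the element-wise mean of the token vectors in a sentence. The sketch below illustrates the idea with tiny made-up 3-dimensional vectors; it is a conceptual illustration, not Spark NLP's internal implementation of `SentenceEmbeddings`.

```python
from typing import List

Vector = List[float]


def average_pool(token_vectors: List[Vector]) -> Vector:
    """Element-wise mean of a list of equal-length token vectors."""
    if not token_vectors:
        raise ValueError("cannot pool an empty list of token vectors")
    dim = len(token_vectors[0])
    if any(len(v) != dim for v in token_vectors):
        raise ValueError("all token vectors must have the same dimension")
    n = len(token_vectors)
    # Average each dimension across all tokens.
    return [sum(v[i] for v in token_vectors) / n for i in range(dim)]


# Three hypothetical tokens with 3-dimensional embeddings
# (roberta_base vectors are 768-dimensional).
tokens = [[1.0, 2.0, 3.0], [3.0, 4.0, 5.0], [2.0, 0.0, 1.0]]
print(average_pool(tokens))  # [2.0, 2.0, 3.0]
```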

# Set Up

In [1]:
%%configure -f
{
    "conf": {
        "spark.jars.packages": "com.johnsnowlabs.nlp:spark-nlp_2.12:4.3.1",
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
    }
}

In [2]:
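# Note: this installs the latest spark-nlp release (4.4.2 in the output below),
# which differs from the 4.3.1 jar configured above; 'spark-nlp==4.3.1' would pin it.
sc.install_pypi_package('spark-nlp')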

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1684440212536_0004,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


Collecting spark-nlp
  Downloading https://files.pythonhosted.org/packages/ad/02/b86152a985aaa66d628b9d07263a5df640daaae32388cbbf38851677baf6/spark_nlp-4.4.2-py2.py3-none-any.whl (489kB)
Installing collected packages: spark-nlp
Successfully installed spark-nlp-4.4.2
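The jar configured in the first cell is version 4.3.1, while `pip` installed the Python package at 4.4.2. It is generally safest to keep the two in sync. A small helper like the one below can catch this kind of drift; `jar_version` and `versions_match` are hypothetical names, not part of Spark NLP. In the notebook, the installed Python package version could be read with `sparknlp.version()`.

```python
def jar_version(maven_coordinate: str) -> str:
    """Return the version part of a 'group:artifact:version' Maven coordinate."""
    parts = maven_coordinate.split(":")
    if len(parts) != 3:
        raise ValueError(f"expected group:artifact:version, got {maven_coordinate!r}")
    return parts[2]


def versions_match(maven_coordinate: str, python_version: str) -> bool:
    """True if the jar version equals the Python package version."""
    return jar_version(maven_coordinate) == python_version


jar = "com.johnsnowlabs.nlp:spark-nlp_2.12:4.3.1"  # from spark.jars.packages above
print(jar_version(jar))              # 4.3.1
print(versions_match(jar, "4.4.2"))  # False: the versions used in this run
print(versions_match(jar, "4.3.1"))  # True
```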

In [3]:
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import sparknlp
from sparknlp.base import DocumentAssembler, EmbeddingsFinisher
from sparknlp.annotator import Tokenizer, RoBertaEmbeddings, SentenceEmbeddings

# Create Pipeline

In [20]:
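# 1. Wrap each row's "sentences" text in a Spark NLP document annotation
documentAssembler = DocumentAssembler()\
    .setInputCol("sentences")\
    .setOutputCol("document")

# 2. Split each document into tokens
tokenizer = Tokenizer()\
    .setInputCols(["document"])\
    .setOutputCol("token")

# 3. Token-level embeddings from the default pretrained RoBERTa model (roberta_base)
embeddings = RoBertaEmbeddings.pretrained()\
    .setInputCols(["document", "token"])\
    .setOutputCol("embeddings")

# 4. Average-pool the token embeddings into one vector per document
embeddingsSentence = SentenceEmbeddings()\
    .setInputCols(["document", "embeddings"])\
    .setOutputCol("sentence_embeddings")\
    .setPoolingStrategy("AVERAGE")

# 5. Turn the annotations into an array of Spark ML vectors;
#    setCleanAnnotations(False) keeps the intermediate annotation columns
embeddingsFinisher = EmbeddingsFinisher()\
    .setInputCols(["sentence_embeddings"])\
    .setOutputCols(["finished_embeddings"])\
    .setOutputAsVector(True)\
    .setCleanAnnotations(False)

my_pipeline = Pipeline(stages=[
    documentAssembler,
    tokenizer,
    embeddings,
    embeddingsSentence,
    embeddingsFinisher,
])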

roberta_base download started this may take some time.
Approximate size to download 284.8 MB
[OK!]
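Each stage reads columns produced by earlier stages or present in the input DataFrame. The sketch below models that wiring as plain data and checks that every input column exists by the time its stage runs. The column names are copied from the pipeline above; the `check_wiring` helper is hypothetical and does not inspect real Spark NLP objects.

```python
from typing import Iterable, List, Tuple

# (stage name, input columns, output column), copied from the pipeline above.
STAGES: List[Tuple[str, List[str], str]] = [
    ("DocumentAssembler", ["sentences"], "document"),
    ("Tokenizer", ["document"], "token"),
    ("RoBertaEmbeddings", ["document", "token"], "embeddings"),
    ("SentenceEmbeddings", ["document", "embeddings"], "sentence_embeddings"),
    ("EmbeddingsFinisher", ["sentence_embeddings"], "finished_embeddings"),
]


def check_wiring(source_columns: Iterable[str], stages=STAGES) -> List[str]:
    """Return a list of problems; an empty list means every input is available."""
    available = set(source_columns)
    problems = []
    for name, inputs, output in stages:
        missing = [col for col in inputs if col not in available]
        if missing:
            problems.append(f"{name} is missing {missing}")
        # Later stages can read this stage's output column.
        available.add(output)
    return problems


print(check_wiring(["transcript_id", "sentences"]))  # []
print(check_wiring(["transcript_id", "text"]))       # ["DocumentAssembler is missing ['sentences']"]
```

Both input tables in this notebook (`ami.parquet` and `transcripts.parquet`) include a `sentences` column, which is the only column the pipeline requires.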

# Load Data

In [8]:
ami = spark.read.parquet('s3://decoding-democracy-embed/ami.parquet')

In [9]:
ami.show(1)

+-----+-------------+--------------------+-----------+----------+--------------+-----------------+
|index|transcript_id|           sentences|topic_count|topic_desc|has_topic_desc|__index_level_0__|
+-----+-------------+--------------------+-----------+----------+--------------+-----------------+
|    1|        AMI_0| Well , let's sta...|          0|   opening|          true|                0|
+-----+-------------+--------------------+-----------+----------+--------------+-----------------+
only showing top 1 row

# Run Pipeline for AMI Data

In [21]:
pipelineModel = my_pipeline.fit(ami)
result = pipelineModel.transform(ami)

# See Results

In [22]:
result.printSchema()

root
 |-- index: long (nullable = true)
 |-- transcript_id: string (nullable = true)
 |-- sentences: string (nullable = true)
 |-- topic_count: long (nullable = true)
 |-- topic_desc: string (nullable = true)
 |-- has_topic_desc: boolean (nullable = true)
 |-- __index_level_0__: long (nullable = true)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- token: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: i

# Clean

In [12]:
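def array_to_string(my_list):
    """Serialize a list of values (here, Spark ML vectors) as '[v1,v2,...]'.

    With setOutputAsVector(True), each element is a DenseVector whose str() is
    itself '[x1,x2,...]', so a single-document row becomes '[[x1,x2,...]]'.
    """
    if my_list is None:  # keep null rows null instead of failing the UDF
        return None
    return '[' + ','.join([str(elem) for elem in my_list]) + ']'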

In [13]:
array_to_string_udf = udf(array_to_string, StringType())
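Because `setOutputAsVector(True)` makes `finished_embeddings` an array of vectors, and each row holds one document, the string produced by `array_to_string` is expected to look like a nested list, `[[v1,v2,...]]`. The sketch below parses it back into numbers with `json.loads`, assuming every value is a finite float (JSON has no literal for NaN or infinity). `parse_embedding` is a hypothetical helper, and a plain Python list stands in for the Spark vector.

```python
import json
from typing import List


def array_to_string(my_list) -> str:
    # Same serializer as the notebook cell above (without the null guard).
    return '[' + ','.join([str(elem) for elem in my_list]) + ']'


def parse_embedding(serialized: str) -> List[float]:
    """Parse '[[v1,v2,...]]' back into the single pooled vector it contains."""
    nested = json.loads(serialized)
    if len(nested) != 1:
        raise ValueError(f"expected one vector per row, got {len(nested)}")
    return [float(v) for v in nested[0]]


# A plain list stands in for the Spark vector; str() of a list adds spaces,
# which json.loads accepts.
row_value = array_to_string([[0.5, -1.25, 3.0]])
print(row_value)                   # [[0.5, -1.25, 3.0]]
print(parse_embedding(row_value))  # [0.5, -1.25, 3.0]
```

After reading the Parquet output with pandas, the same function could be applied column-wise, for example `df['finished_embeddings_str'].map(parse_embedding)`.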

In [23]:
result=result.withColumn('finished_embeddings_str', array_to_string_udf(result["finished_embeddings"]))

In [24]:
# Drop the intermediate annotation columns and the raw vector column
result = result.drop("document", "token", "embeddings",
                     "sentence_embeddings", "finished_embeddings")

In [25]:
result.printSchema()

root
 |-- index: long (nullable = true)
 |-- transcript_id: string (nullable = true)
 |-- sentences: string (nullable = true)
 |-- topic_count: long (nullable = true)
 |-- topic_desc: string (nullable = true)
 |-- has_topic_desc: boolean (nullable = true)
 |-- __index_level_0__: long (nullable = true)
 |-- finished_embeddings_str: string (nullable = true)

In [27]:
result.write.parquet("s3://decoding-democracy-embed/ami_out_small2")

# Repeat Pipeline for Transcripts

In [28]:
transcripts = spark.read.parquet('s3://decoding-democracy-embed/transcripts.parquet')
transcripts.show(1)

+-------------+--------------------+
|transcript_id|           sentences|
+-------------+--------------------+
| d0a7e5864959|And older woman J...|
+-------------+--------------------+
only showing top 1 row

In [29]:
pipelineModel = my_pipeline.fit(transcripts)
result = pipelineModel.transform(transcripts)

In [30]:
result.printSchema()

root
 |-- transcript_id: string (nullable = true)
 |-- sentences: string (nullable = true)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- token: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |

In [31]:
result = result.withColumn('finished_embeddings_str', array_to_string_udf(result["finished_embeddings"]))
# Drop the intermediate annotation columns and the raw vector column
result = result.drop("document", "token", "embeddings",
                     "sentence_embeddings", "finished_embeddings")

In [32]:
result.printSchema()

root
 |-- transcript_id: string (nullable = true)
 |-- sentences: string (nullable = true)
 |-- finished_embeddings_str: string (nullable = true)

In [None]:
result.write.parquet("s3://decoding-democracy-embed/transcript_out")