In [1]:
from pyspark.sql.functions import udf
import spacy
import string

# Load the spaCy pipeline once at notebook level; `preprocess_text` below reads
# this global `nlp` for both tokenization and the document vector.
nlp = spacy.load('en_core_web_lg')

In [2]:
# Base directory for both the raw input and the processed output datasets
# (a relative path, so it resolves against the notebook's working directory).
ROOT = '../datasets/combined'

In [3]:
# Read the raw JSON dataset under ROOT into a Spark DataFrame and preview it.
# NOTE(review): `spark` is not defined in this notebook — presumably the
# SparkSession injected by the kernel; confirm before running elsewhere.
raw_df = spark.read.json(f'{ROOT}/combined_raw')
raw_df.show()

+--------------------+
|             article|
+--------------------+
|U.S. investment b...|
|Mexican coffee pr...|
|Oil companies are...|
|Adam Dunn … you a...|
|The surprising re...|
|» County’s annual...|
|Dropcam will prob...|
|Wild Insects need...|
|British scientist...|
|The Big Bang Theo...|
|A nurse made the ...|
|Mother-of-three D...|
|The Scots were ki...|
|He may not be com...|
|The wreckage of t...|
|Inter Milan were ...|
|18-year-old Aditi...|
|Tiger Woods grima...|
|Danny Cipriani to...|
|US jobs numbers a...|
+--------------------+
only showing top 20 rows



In [4]:
# Number of rows in the raw dataset (left as the bare last expression so the
# cell displays it).
raw_df.count()

40954102

In [5]:
def preprocess_text(text):
    """Turn raw article text into a document embedding.

    Steps: lowercase the text, drop spaCy stop words and tokens of two
    characters or fewer, strip ASCII punctuation, then run the cleaned text
    through the global ``nlp`` pipeline and take ``doc.vector``.

    Args:
        text: Article text, or ``None`` for a missing value.

    Returns:
        The document vector as a plain Python list of floats, or ``None``
        when ``text`` is ``None``.
    """
    # A null article reaches the Spark UDF as None; pass it through instead of
    # letting `.lower()` raise and fail the whole job.
    if text is None:
        return None

    # Convert text to lowercase
    text = text.lower()

    # Remove stopwords and short words. Only token text and the lexical
    # `is_stop` flag are read here, so the tokenizer alone (`make_doc`) is
    # enough; the full pipeline is not needed for this pass.
    tokens = nlp.make_doc(text)
    kept = [token.text for token in tokens if not token.is_stop and len(token.text) > 2]
    text = ' '.join(kept)

    # Remove punctuation (string.punctuation covers ASCII punctuation only)
    text = text.translate(str.maketrans('', '', string.punctuation))

    # Apply spaCy's language model to generate the document embedding;
    # `.tolist()` converts the array into a plain Python list for Spark.
    doc = nlp(text)
    return doc.vector.tolist()

In [6]:
# Declare the return type explicitly: `udf` defaults to StringType, which would
# store each embedding as the string form of the list rather than as an array
# of numbers. `preprocess_text` returns a list of Python floats, so the column
# is declared as array<double> (DDL type string, no extra import needed).
preprocess_udf = udf(preprocess_text, 'array<double>')

# Replace the article text with its embedding, keeping the column name 'article'.
processed_df = raw_df.withColumn('article', preprocess_udf(raw_df['article']))
processed_df.show()

+--------------------+
|             article|
+--------------------+
|[-0.1344387233257...|
|[-0.6017873287200...|
|[0.03559869527816...|
|[-0.3951138556003...|
|[0.41297334432601...|
|[-1.9053480625152...|
|[1.11993110179901...|
|[0.36661779880523...|
|[-0.1308644264936...|
|[0.72363460063934...|
|[-0.1466323137283...|
|[-0.5513845086097...|
|[-0.7763857841491...|
|[-0.5878886580467...|
|[-0.2158189564943...|
|[-1.8650419712066...|
|[-0.8801572918891...|
|[-0.8520722985267...|
|[-0.1342076659202...|
|[-0.2789138257503...|
+--------------------+
only showing top 20 rows



In [7]:
# Persist the processed DataFrame as JSON under ROOT; mode('overwrite')
# replaces any existing output at this path.
processed_df.write.mode('overwrite').json(f'{ROOT}/combined_processed_spacy')