In [ ]:
%pyspark
from pyspark.sql import SparkSession

# Reuse (or lazily create) the Spark session for this notebook and expose its
# SparkContext as `sc` for the RDD-based loading cell that follows.
spark = (
    SparkSession.builder
    .appName("Text Summarization with Pyspark")
    .getOrCreate()
)

sc = spark.sparkContext

In [ ]:
%pyspark
from pyspark.sql import Row

# wholeTextFiles yields (path, file contents) pairs for every cleaned split file.
all_wikipedia_files = sc.wholeTextFiles("s3://strata-demo-data/nlp_data/*(Clean Splits).txt")

# Number of numbered splits ("_<n>(Clean Splits).txt") to load.
# Raise to 30 to reproduce the full-corpus run.
NUM_SPLITS = 1

wikipage_rdd_list = []
for i in range(NUM_SPLITS):
    # Build the file-name suffix now and bind it as a default argument, so each
    # filter keeps its own split index even if Spark serializes the lambda after
    # the loop has advanced (reading the global `i` lazily is a late-binding trap).
    split_suffix = "_" + str(i) + "(Clean Splits).txt"
    wikipage_rdd = (
        all_wikipedia_files
        .filter(lambda element, suffix=split_suffix: element[0].endswith(suffix))
        .values()
        # Pages inside one file are separated by this sentinel token.
        .flatMap(lambda text_string: text_string.split("---END.OF.DOCUMENT---"))
        # Trim CR/LF characters from both ends and normalise CRLF line endings to "\n".
        .map(lambda text_string: text_string.strip("\r\n").replace("\r\n", "\n"))
    )

    wikipage_rdd.cache()
    # Wrap each page string in a Row so the RDD can become a single-column DataFrame.
    wikipage_rdd = wikipage_rdd.map(lambda element: Row(Body=element))
    wikipage_rdd_list.append(wikipage_rdd)

combined_wikipage_rdd = sc.union(wikipage_rdd_list)

# Convert the RDD of Rows into a DataFrame with a single "Body" column.
combined_wikipage_dataframe = spark.createDataFrame(combined_wikipage_rdd).repartition(256)
combined_wikipage_dataframe.head()
# Optional: persist the intermediate corpus as CSV.
#combined_wikipage_dataframe.write.format("csv")\
#    .option("header", "true")\
#    .mode("append")\
#    .save("s3://strata-demo-data/nlp_data/Post_Processed_Wikipedia_Corpus.csv")

In [ ]:
%pyspark
import re
from pyspark.sql.functions import udf
def extract_wikipage_title(wikipage_body):
    """Return the page title, i.e. the text before the first newline.

    Used as the source of the "Title" column. Returns the literal "N/A" when
    the page text contains no newline at all.
    """
    first_line = re.match(r"(.*)\n", wikipage_body)
    if first_line is None:
        return "N/A"
    return first_line.group(1)

def remove_title_from_item_body(wikipage_body):
    """Return the page text with its first line (the title) removed.

    Newlines left at either end of the remainder are stripped. A page with no
    newline is returned unchanged, mirroring extract_wikipage_title's "N/A" case.
    """
    title_match = re.match(r"(.*)\n", wikipage_body)
    if title_match is None:
        return wikipage_body
    remainder = wikipage_body[title_match.end(1):]
    return remainder.strip("\n")
    
    
# Wrap the plain-Python helpers as Spark UDFs (no return type given, so
# Spark's default string type is used).
extract_wikipage_title_udf = udf(extract_wikipage_title)
remove_title_from_item_body_udf = udf(remove_title_from_item_body)

# Drop empty pages, then derive both output columns from the original Body in
# one projection: "Title" is its first line, "Body" is the text after it.
combined_wikipage_dataframe = (
    combined_wikipage_dataframe
    .filter(combined_wikipage_dataframe.Body != "")
    .select(
        extract_wikipage_title_udf("Body").alias("Title"),
        remove_title_from_item_body_udf("Body").alias("Body"),
    )
)

combined_wikipage_dataframe.cache()
combined_wikipage_dataframe.head()



In [ ]:
%pyspark
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from gensim.summarization.summarizer import summarize

def summarize_body(body):
    """Return gensim's extractive summary of ``body`` (source of the "Summary" column).

    If ``summarize`` raises ValueError, the original body is returned
    unchanged instead of failing the Spark task.
    """
    try:
        return summarize(body)
    except ValueError:
        # Presumably raised for text gensim cannot summarise (e.g. too short) —
        # fall back to the full body.
        return body
    
# Spark UDF around summarize_body, declared to return strings.
summarize_body_udf = udf(summarize_body, StringType())

# Append the per-page summary as a new "Summary" column.
combined_wikipage_dataframe = combined_wikipage_dataframe.withColumn(
    "Summary", summarize_body_udf(combined_wikipage_dataframe.Body)
)
combined_wikipage_dataframe.cache()

# Print the first three pages so each summary can be compared with its body.
examples = combined_wikipage_dataframe.head(3)
for example in examples:
    for heading, text in (
        ("Title:", example.Title),
        ("Body:", example.Body),
        ("Summary:", example.Summary),
    ):
        print(heading)
        print(text)

In [ ]:
%pyspark
# Persist titles, bodies and summaries as Parquet (Spark's default save mode
# errors out if the target path already exists).
combined_wikipage_dataframe.write.format("parquet").save(
    "s3://strata-demo-data/nlp_data/wikipage_summarization.parquet"
)