1. Start a new PySpark session

In [0]:
from pyspark.sql import SparkSession
# Build (or fetch the already-running) Spark session: local master, app name
# "Colab", Spark UI on port 4050, 48g of driver memory.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .config('spark.driver.memory', '48g')
    .getOrCreate()
)

In [0]:
# Show the session object as this cell's output.
spark

2. load data

In [0]:
# DBFS locations of the three input tables (train / test / validation).
train_cvs_path, test_cvs_path, validation_cvs_path = (
    "dbfs:/user/hive/warehouse/train_csv",
    "dbfs:/user/hive/warehouse/test_csv",
    "dbfs:/user/hive/warehouse/validation_csv",
)

In [0]:
def _read_delta(table_path):
  """Load the Delta table stored at `table_path` as a DataFrame."""
  return spark.read.format("delta").load(table_path)


# All three splits are read with the Delta reader.
df_train_pyspark = _read_delta(train_cvs_path)
df_test_pyspark = _read_delta(test_cvs_path)
df_validation_pyspark = _read_delta(validation_cvs_path)

In [0]:
# Preview the first five training rows without truncating the cell text.
df_train_pyspark.show(n=5, truncate=False)

+----------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# Sanity check after loading: row count of each text column in every split,
# printed in the order train, test, validation.
_loaded_splits = (df_train_pyspark, df_test_pyspark, df_validation_pyspark)

for _text_field in ("article", "highlights"):
    if _text_field == "highlights":
        print()  # blank line between the two groups
    print(f"count of {_text_field}:")
    for _frame in _loaded_splits:
        print(_frame.select(_text_field).count())


count of article:
287113
11490
13368
count of highlights:
287113
11490
13368


3. Data cleaning

In [0]:
!pip install nltk

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting nltk
  Downloading nltk-3.8.1-py3-none-any.whl (1.5 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.5/1.5 MB 20.6 MB/s eta 0:00:00
Collecting tqdm
  Downloading tqdm-4.66.1-py3-none-any.whl (78 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 78.3/78.3 kB 14.5 MB/s eta 0:00:00
Collecting regex>=2021.8.3
  Downloading regex-2023.10.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (773 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 773.9/773.9 kB 53.0 MB/s eta 0:00:00
Installing collected packages: tqdm, regex, nltk
Successfully installed nltk-3.8.1 regex-2023.10.3 tqdm-4.66.1
[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m


In [0]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

from pyspark.sql.functions import udf, col, lower, trim, regexp_replace, concat_ws, lit
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from nltk.stem.snowball import SnowballStemmer

In [0]:
# Count NULL entries per text column in every split (train, test, validation).
_checked_splits = (df_train_pyspark, df_test_pyspark, df_validation_pyspark)

for _text_field in ("article", "highlights"):
    if _text_field == "highlights":
        print()  # blank line between the two groups
    print(f"count of NULL in {_text_field}:")
    for _frame in _checked_splits:
        print(_frame.where(col(_text_field).isNull()).count())

count of NULL in article:
0
0
0
count of NULL in highlights:
0
0
0


In [0]:
# Remove exact duplicate rows from every split, then report the remaining sizes.
df_train_pyspark = df_train_pyspark.dropDuplicates()
df_test_pyspark = df_test_pyspark.dropDuplicates()
df_validation_pyspark = df_validation_pyspark.dropDuplicates()

for _label, _frame in (
    ("df_train_pyspark", df_train_pyspark),
    ("df_test_pyspark", df_test_pyspark),
    ("df_validation_pyspark", df_validation_pyspark),
):
    print(f"Number of rows in {_label} after dropping the duplicates: {_frame.count()}")
# In the recorded run the counts match the pre-deduplication counts: nothing dropped.

Number of rows in df_train_pyspark after dropping the duplicates: 287113
Number of rows in df_test_pyspark after dropping the duplicates: 11490
Number of rows in df_validation_pyspark after dropping the duplicates: 13368


contraction_mapping

lowercased

remove 's

remove punctuation + numbers

remove \n (since there are \n in the highlights)

remove any unnecessary whitespace

In [0]:
# Lookup table: English contraction -> expanded form.
# Both capitalised "I..." and lower-case "i..." spellings are listed.
# Entry order is unchanged from the original table.
contraction_mapping = {
    "ain't": "is not", "aren't": "are not", "can't": "cannot",
    "'cause": "because", "could've": "could have", "couldn't": "could not",
    "didn't": "did not", "doesn't": "does not", "don't": "do not",
    "hadn't": "had not", "hasn't": "has not", "haven't": "have not",
    "he'd": "he would", "he'll": "he will", "he's": "he is",
    "how'd": "how did", "how'd'y": "how do you", "how'll": "how will",
    "how's": "how is",
    "I'd": "I would", "I'd've": "I would have", "I'll": "I will",
    "I'll've": "I will have", "I'm": "I am", "I've": "I have",
    "i'd": "i would", "i'd've": "i would have", "i'll": "i will",
    "i'll've": "i will have", "i'm": "i am", "i've": "i have",
    "isn't": "is not", "it'd": "it would", "it'd've": "it would have",
    "it'll": "it will", "it'll've": "it will have", "it's": "it is",
    "let's": "let us", "ma'am": "madam",
    "mayn't": "may not", "might've": "might have", "mightn't": "might not",
    "mightn't've": "might not have", "must've": "must have",
    "mustn't": "must not", "mustn't've": "must not have",
    "needn't": "need not", "needn't've": "need not have",
    "o'clock": "of the clock",
    "oughtn't": "ought not", "oughtn't've": "ought not have",
    "shan't": "shall not", "sha'n't": "shall not", "shan't've": "shall not have",
    "she'd": "she would", "she'd've": "she would have", "she'll": "she will",
    "she'll've": "she will have", "she's": "she is",
    "should've": "should have", "shouldn't": "should not",
    "shouldn't've": "should not have", "so've": "so have", "so's": "so as",
    "this's": "this is", "that'd": "that would", "that'd've": "that would have",
    "that's": "that is", "there'd": "there would",
    "there'd've": "there would have", "there's": "there is", "here's": "here is",
    "they'd": "they would", "they'd've": "they would have",
    "they'll": "they will", "they'll've": "they will have",
    "they're": "they are", "they've": "they have", "to've": "to have",
    "wasn't": "was not", "we'd": "we would", "we'd've": "we would have",
    "we'll": "we will", "we'll've": "we will have", "we're": "we are",
    "we've": "we have", "weren't": "were not",
    "what'll": "what will", "what'll've": "what will have",
    "what're": "what are", "what's": "what is", "what've": "what have",
    "when's": "when is", "when've": "when have",
    "where'd": "where did", "where's": "where is", "where've": "where have",
    "who'll": "who will", "who'll've": "who will have", "who's": "who is",
    "who've": "who have",
    "why's": "why is", "why've": "why have",
    "will've": "will have", "won't": "will not", "won't've": "will not have",
    "would've": "would have", "wouldn't": "would not",
    "wouldn't've": "would not have",
    "y'all": "you all", "y'all'd": "you all would",
    "y'all'd've": "you all would have", "y'all're": "you all are",
    "y'all've": "you all have",
    "you'd": "you would", "you'd've": "you would have", "you'll": "you will",
    "you'll've": "you will have", "you're": "you are", "you've": "you have",
}

In [0]:
def apply_contractions(df, column_names=("article", "highlights")):
  """Expand English contractions in the given text columns of `df`.

  Every key of `contraction_mapping` is used as a regex pattern and replaced
  by its mapped value. All replacements for one column are folded into a
  single column expression, so each column needs only one `withColumn`
  call instead of one per contraction.

  Args:
    df: Spark DataFrame containing the columns named in `column_names`.
    column_names: names of the string columns to rewrite in place.

  Returns:
    A new DataFrame with the same columns, contractions expanded.
  """
  # Longest patterns first (stable for ties): otherwise "mightn't" would
  # rewrite "mightn't've" to "might not've" before the compound key can match.
  ordered_rules = sorted(contraction_mapping.items(), key=lambda rule: -len(rule[0]))
  for name in column_names:
    text_expr = col(name)
    for pattern, replacement in ordered_rules:
      text_expr = regexp_replace(text_expr, pattern, replacement)
    df = df.withColumn(name, text_expr)
  return df


# Same treatment for all three splits. The original loop assigned the
# validation highlights result to a misspelled variable, so it was discarded.
df_train_pyspark = apply_contractions(df_train_pyspark)
df_test_pyspark = apply_contractions(df_test_pyspark)
df_validation_pyspark = apply_contractions(df_validation_pyspark)

In [0]:
def data_clean(df, columns_name):
  """Normalise the text in one column of a Spark DataFrame.

  Steps, applied in order to `columns_name`:
    1. lower-case the text;
    2. drop a possessive/clitic "'s" (only when it ends a word, so a quoted
       word such as 'special' keeps its leading "s");
    3. drop parenthesised spans "( ... )";
    4. replace every character that is not a letter or whitespace by a space;
    5. collapse every whitespace run (spaces, tabs, newlines) to one space
       and trim the ends.
  For the "highlights" column the result is additionally wrapped as
  "soseq <text> eoseq".

  Args:
    df: Spark DataFrame containing `columns_name`.
    columns_name: name of the string column to clean.

  Returns:
    A new DataFrame in which `columns_name` holds the cleaned text.
  """
  text = lower(col(columns_name))
  # \b keeps the match to a trailing 's; an unanchored "'s" would also eat
  # the first letter of any quoted word that starts with "s".
  text = regexp_replace(text, "'s\\b", "")
  text = regexp_replace(text, r'\([^)]*\)', '')
  text = regexp_replace(text, "[^a-zA-Z\\s]", " ")
  # Newlines become a space, not "": deleting them outright would glue the
  # last word of one line to the first word of the next.
  text = trim(regexp_replace(text, "\\s+", " "))
  if columns_name == "highlights":
    delimiter = " "
    text = concat_ws(delimiter, lit("soseq"), text, lit("eoseq"))

  return df.withColumn(columns_name, text)

In [0]:
# Clean "article" first, then "highlights", on each of the three splits.
df_train_pyspark = data_clean(data_clean(df_train_pyspark, "article"), "highlights")
df_test_pyspark = data_clean(data_clean(df_test_pyspark, "article"), "highlights")
df_validation_pyspark = data_clean(data_clean(df_validation_pyspark, "article"), "highlights")

In [0]:
# Preview five cleaned validation rows at full width.
df_validation_pyspark.show(n=5, truncate=False)

+----------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

tokenization

In [0]:
# One Tokenizer per text column; each writes its tokens to a new tokenized_* column.
tokenizer_article = Tokenizer(inputCol="article", outputCol="tokenized_article")
tokenizer_highlights = Tokenizer(inputCol="highlights", outputCol="tokenized_highlights")


def _tokenize_both(frame):
  """Apply the article tokenizer, then the highlights tokenizer, to `frame`."""
  return tokenizer_highlights.transform(tokenizer_article.transform(frame))


df_train_pyspark = _tokenize_both(df_train_pyspark)
df_test_pyspark = _tokenize_both(df_test_pyspark)
df_validation_pyspark = _tokenize_both(df_validation_pyspark)

df_validation_pyspark.show(n=5)

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                  id|             article|          highlights|   tokenized_article|tokenized_highlights|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|b1d4dd6749a0523ed...|dynamo kiev face ...|soseq dynamo kiev...|[dynamo, kiev, fa...|[soseq, dynamo, k...|
|3a713e68e18993f30...|a bank worker tri...|soseq amish kansa...|[a, bank, worker,...|[soseq, amish, ka...|
|4db59c496497fde0c...|what have fearne ...|soseq stylish cel...|[what, have, fear...|[soseq, stylish, ...|
|e45a7c7eb72961518...|former french int...|soseq claude guea...|[former, french, ...|[soseq, claude, g...|
|a38c6716a53f709f3...|a mystery food so...|soseq dozens of l...|[a, mystery, food...|[soseq, dozens, o...|
+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



stopwords removal

In [0]:
# One StopWordsRemover per tokenized column; output goes to stopword_removed_*.
stop_words_remover_article = StopWordsRemover(inputCol="tokenized_article", outputCol="stopword_removed_article")
stop_words_remover_highlights = StopWordsRemover(inputCol="tokenized_highlights", outputCol="stopword_removed_highlights")


def _strip_stopwords(frame):
  """Apply the article remover, then the highlights remover, to `frame`."""
  return stop_words_remover_highlights.transform(stop_words_remover_article.transform(frame))


df_train_pyspark = _strip_stopwords(df_train_pyspark)
df_test_pyspark = _strip_stopwords(df_test_pyspark)
df_validation_pyspark = _strip_stopwords(df_validation_pyspark)

df_validation_pyspark.show(n=5)

+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+---------------------------+
|                  id|             article|          highlights|   tokenized_article|tokenized_highlights|stopword_removed_article|stopword_removed_highlights|
+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+---------------------------+
|b1d4dd6749a0523ed...|dynamo kiev face ...|soseq dynamo kiev...|[dynamo, kiev, fa...|[soseq, dynamo, k...|    [dynamo, kiev, fa...|       [soseq, dynamo, k...|
|3a713e68e18993f30...|a bank worker tri...|soseq amish kansa...|[a, bank, worker,...|[soseq, amish, ka...|    [bank, worker, tr...|       [soseq, amish, ka...|
|4db59c496497fde0c...|what have fearne ...|soseq stylish cel...|[what, have, fear...|[soseq, stylish, ...|    [fearne, cotton, ...|       [soseq, stylish, ...|
|e45a7c7eb72961518...|former french int.

In [0]:
def _rejoin_tokens(frame):
  """Overwrite article/highlights with their stop-word-free tokens joined by spaces."""
  return (
      frame
      .withColumn("article", concat_ws(" ", col("stopword_removed_article")))
      .withColumn("highlights", concat_ws(" ", col("stopword_removed_highlights")))
  )


df_train = _rejoin_tokens(df_train_pyspark)
df_test = _rejoin_tokens(df_test_pyspark)
df_validation = _rejoin_tokens(df_validation_pyspark)

df_train.show(n=5)

+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+---------------------------+
|                  id|             article|          highlights|   tokenized_article|tokenized_highlights|stopword_removed_article|stopword_removed_highlights|
+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+---------------------------+
|002c115c31577d0e9...|washington pilots...|soseq first offic...|[washington, the,...|[soseq, first, of...|    [washington, pilo...|       [soseq, first, of...|
|0203b8f730d58be22...|freya noble daily...|soseq counting co...|[by, freya, noble...|[soseq, counting,...|    [freya, noble, da...|       [soseq, counting,...|
|0222d97ad9ce49402...|sanaa thousands y...|soseq air force r...|[sanaa, thousands...|[soseq, air, forc...|    [sanaa, thousands...|       [soseq, air, forc...|
|025bee3924aab5c23...|afghan soldier sh.

In [0]:
# Preview five processed test rows.
df_test.show(n=5)

+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+---------------------------+
|                  id|             article|          highlights|   tokenized_article|tokenized_highlights|stopword_removed_article|stopword_removed_highlights|
+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+---------------------------+
|c8624a481fb71e13f...|spectacular momen...|soseq photos capt...|[this, is, the, s...|[soseq, photos, c...|    [spectacular, mom...|       [soseq, photos, c...|
|33259941aa02eea8c...|team sky geraint ...|soseq norway alex...|[team, sky, gerai...|[soseq, norway, a...|    [team, sky, gerai...|       [soseq, norway, a...|
|feae53518bfe6e7dd...|famous italian de...|soseq italian gla...|[famous, italian,...|[soseq, italian, ...|    [famous, italian,...|       [soseq, italian, ...|
|0651c529fad57729b...|jenny wallenda ma.

In [0]:
# Preview five processed validation rows.
df_validation.show(n=5)

+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+---------------------------+
|                  id|             article|          highlights|   tokenized_article|tokenized_highlights|stopword_removed_article|stopword_removed_highlights|
+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+---------------------------+
|b1d4dd6749a0523ed...|dynamo kiev face ...|soseq dynamo kiev...|[dynamo, kiev, fa...|[soseq, dynamo, k...|    [dynamo, kiev, fa...|       [soseq, dynamo, k...|
|3a713e68e18993f30...|bank worker tried...|soseq amish kansa...|[a, bank, worker,...|[soseq, amish, ka...|    [bank, worker, tr...|       [soseq, amish, ka...|
|4db59c496497fde0c...|fearne cotton den...|soseq stylish cel...|[what, have, fear...|[soseq, stylish, ...|    [fearne, cotton, ...|       [soseq, stylish, ...|
|e45a7c7eb72961518...|former french int.

save data into csv

In [0]:
# For reference, the input locations used when loading the data:
#   train_cvs_path = "dbfs:/user/hive/warehouse/train_csv"
#   test_cvs_path = "dbfs:/user/hive/warehouse/test_csv"
#   validation_cvs_path = "dbfs:/user/hive/warehouse/validation_csv"

# Output locations for the processed splits.
csv_train_output_path, csv_test_output_path, csv_validation_output_path = (
    "dbfs:/user/hive/warehouse/pro_train.csv",
    "dbfs:/user/hive/warehouse/pro_test.csv",
    "dbfs:/user/hive/warehouse/pro_validation.csv",
)

In [0]:
# Write the processed train text columns as a single-partition CSV with a header row.
(
    df_train
    .select("article", "highlights")
    .coalesce(1)
    .write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("dbfs:/FileStore/tables/csv_train_output.csv")
)


In [0]:
# Write the processed test text columns as a single-partition CSV with a header row.
(
    df_test
    .select("article", "highlights")
    .coalesce(1)
    .write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("dbfs:/FileStore/tables/csv_test_output.csv")
)

In [0]:
# Write the processed validation text columns as a single-partition CSV with a header row.
(
    df_validation
    .select("article", "highlights")
    .coalesce(1)
    .write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("dbfs:/FileStore/tables/csv_validation_output.csv")
)