In [1]:
# Import necessary libraries
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("SummarizationApp") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Define the schema for the data
schema = StructType([
    StructField("id", StringType(), True),
    StructField("article", StringType(), True),
    StructField("highlights", StringType(), True)
])

# Read CSV files in HDFS using the defined schema
train_data = spark.read.schema(schema).option("header", "true").csv("hdfs:///cnn_dailymail/train.csv")
test_data = spark.read.schema(schema).option("header", "true").csv("hdfs:///cnn_dailymail/test.csv")
validation_data = spark.read.schema(schema).option("header", "true").csv("hdfs:///cnn_dailymail/validation.csv")

# Show the first 5 rows of each dataset (optional)
train_data.show(5)
test_data.show(5)
validation_data.show(5)

                                                                                

+--------------------+--------------------+--------------------+
|                  id|             article|          highlights|
+--------------------+--------------------+--------------------+
|0001d1afc246a7964...|By . Associated P...|Bishop John Folda...|
|He contracted the...|                null|                null|
|Church members in...| Grand Forks and ...|                null|
|0002095e55fcbd3a2...|"(CNN) -- Ralph M...|"" of using his r...|
|          Ralph Mata| an internal affa...| allegedly helped...|
+--------------------+--------------------+--------------------+
only showing top 5 rows

+--------------------+--------------------+--------------------+
|                  id|             article|          highlights|
+--------------------+--------------------+--------------------+
|92c514c913c0bdfe2...|Ever noticed how ...|Experts question ...|
|U.S consumer advi...|                null|                null|
|Safety tests cond...|                null|                null|


In [2]:
# Remove the "id" column from the dataframes
train_data = train_data.drop("id")
test_data = test_data.drop("id")
validation_data = validation_data.drop("id")

# Drop rows with missing values in the "article" or "highlights" columns
train_data = train_data.dropna(subset=["article", "highlights"])
test_data = test_data.dropna(subset=["article", "highlights"])
validation_data = validation_data.dropna(subset=["article", "highlights"])

In [3]:
from pyspark.sql.functions import udf
import re

# Define a function to clean text
def clean_text(text):
    text = re.sub(r"http\S+|www\S+|https\S+", '', text, flags=re.MULTILINE)
    text = re.sub(r'\@\w+|\#','', text)
    text = re.sub(r"[^a-zA-Z0-9]", " ", text)
    text = text.lower().strip()
    return text

# UDF for PySpark DataFrame
clean_udf = udf(clean_text, StringType())

# Apply cleaning to the datasets
train_data = train_data.withColumn("article", clean_udf(train_data["article"])).withColumn("highlights", clean_udf(train_data["highlights"]))
test_data = test_data.withColumn("article", clean_udf(test_data["article"])).withColumn("highlights", clean_udf(test_data["highlights"]))
validation_data = validation_data.withColumn("article", clean_udf(validation_data["article"])).withColumn("highlights", clean_udf(validation_data["highlights"]))
# Show the first 5 rows of dataframes
train_data.show(5)
test_data.show(5)
validation_data.show(5)

[Stage 3:>                                                          (0 + 1) / 1]Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 663, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError
                                                                                

+--------------------+--------------------+
|             article|          highlights|
+--------------------+--------------------+
|by   associated p...|bishop john folda...|
|cnn     ralph mat...|of using his role...|
|an internal affai...|allegedly helped ...|
|a drunk driver wh...|craig eccleston t...|
|cnn     with a br...|nina dos santos s...|
+--------------------+--------------------+
only showing top 5 rows

+--------------------+--------------------+
|             article|          highlights|
+--------------------+--------------------+
|ever noticed how ...|experts question ...|
|a drunk teenage b...|drunk teenage boy...|
|                  17|ran towards anima...|
|dougie freedman i...|nottingham forest...|
|liverpool target ...|fiorentina goalke...|
+--------------------+--------------------+
only showing top 5 rows

+--------------------+--------------------+
|             article|          highlights|
+--------------------+--------------------+
|sally forrest  an...|sall

In [4]:
# Print the row count of each dataframe
print("Train Data Row Count:", train_data.count())
print("Test Data Row Count:", test_data.count())
print("Validation Data Row Count:", validation_data.count())

                                                                                

Train Data Row Count: 351784
Test Data Row Count: 14149
Validation Data Row Count: 16586


In [5]:
# Define the desired row limits for each dataset
row_limits = {
    "train_data": 1000,
    "test_data": 250,
    "validation_data": 250
}

# Limit each dataset to the specified number of rows
for dataset_name, row_limit in row_limits.items():
    globals()[dataset_name] = globals()[dataset_name].limit(row_limit)

print("Train Data Row Count:", train_data.count())
print("Test Data Row Count:", test_data.count())
print("Validation Data Row Count:", validation_data.count())

Train Data Row Count: 1000
Test Data Row Count: 250
Validation Data Row Count: 250


In [6]:
from pyspark.ml.feature import Tokenizer

tokenizer_article = Tokenizer(inputCol="article", outputCol="article_tokens")
train_data = tokenizer_article.transform(train_data)
test_data = tokenizer_article.transform(test_data)
validation_data = tokenizer_article.transform(validation_data)

tokenizer_highlights = Tokenizer(inputCol="highlights", outputCol="highlight_tokens")
train_data = tokenizer_highlights.transform(train_data)
test_data = tokenizer_highlights.transform(test_data)
validation_data = tokenizer_highlights.transform(validation_data)


In [7]:
from pyspark.ml.feature import StopWordsRemover

# Create a StopWordsRemover instance for article_tokens
stopwords_remover_article = StopWordsRemover(inputCol="article_tokens", outputCol="filtered_article_tokens")
train_data = stopwords_remover_article.transform(train_data)
test_data = stopwords_remover_article.transform(test_data)
validation_data = stopwords_remover_article.transform(validation_data)

# Create a StopWordsRemover instance for highlight_tokens
stopwords_remover_highlight = StopWordsRemover(inputCol="highlight_tokens", outputCol="filtered_highlight_tokens")
train_data = stopwords_remover_highlight.transform(train_data)
test_data = stopwords_remover_highlight.transform(test_data)
validation_data = stopwords_remover_highlight.transform(validation_data)

In [8]:
train_data.select("article_tokens", "filtered_article_tokens", "highlight_tokens", "filtered_highlight_tokens").show(truncate=False)




+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [9]:
from pyspark.ml.feature import Word2Vec

# Define Word2Vec parameters
word2vec_params = {
    "vectorSize": 100,
    "minCount": 5,
}

# Define a function to fit Word2Vec model and transform the DataFrame
def embed_with_word2vec(input_df, input_col, output_col):
    # Check if the output column already exists in the DataFrame
    if output_col not in input_df.columns:
        # Create a Word2Vec instance
        word2vec = Word2Vec(inputCol=input_col, outputCol=output_col, **word2vec_params)
        
        # Fit the Word2Vec model to the data
        model = word2vec.fit(input_df)
        
        # Transform the DataFrame using the trained model
        input_df = model.transform(input_df)
    
    return input_df

# Embed train_data for "article" and "highlight"
train_data = embed_with_word2vec(train_data, "filtered_article_tokens", "article_word_vectors")
train_data = embed_with_word2vec(train_data, "filtered_highlight_tokens", "highlight_word_vectors")

# Embed test_data for "article" and "highlight"
test_data = embed_with_word2vec(test_data, "filtered_article_tokens", "article_word_vectors")
test_data = embed_with_word2vec(test_data, "filtered_highlight_tokens", "highlight_word_vectors")

# Embed validation_data for "article" and "highlight"
validation_data = embed_with_word2vec(validation_data, "filtered_article_tokens", "article_word_vectors")
validation_data = embed_with_word2vec(validation_data, "filtered_highlight_tokens", "highlight_word_vectors")


Traceback (most recent call last):                                              
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 663, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError
                                                                                

In [10]:
# Check the schema of the DataFrame
validation_data.printSchema()

root
 |-- article: string (nullable = true)
 |-- highlights: string (nullable = true)
 |-- article_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- highlight_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered_article_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered_highlight_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- article_word_vectors: vector (nullable = true)
 |-- highlight_word_vectors: vector (nullable = true)



In [11]:
# Check if word vectors are present in train_data
if "article_word_vectors" in train_data.columns and "highlight_word_vectors" in train_data.columns:
    print("Word vectors are present in train_data.")
else:
    print("Word vectors are not present in train_data.")

# Check if word vectors are present in test_data
if "article_word_vectors" in test_data.columns and "highlight_word_vectors" in test_data.columns:
    print("Word vectors are present in test_data.")
else:
    print("Word vectors are not present in test_data.")

# Check if word vectors are present in validation_data
if "article_word_vectors" in validation_data.columns and "highlight_word_vectors" in validation_data.columns:
    print("Word vectors are present in validation_data.")
else:
    print("Word vectors are not present in validation_data.")


Word vectors are present in train_data.
Word vectors are present in test_data.
Word vectors are present in validation_data.


In [None]:
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# Define the maximum length for padding
max_length = optimal_length

# Define a function to pad sequences
def pad_sequence(tokens, length=max_length):
    if len(tokens) < length:
        return tokens + ['<PAD>'] * (length - len(tokens))
    else:
        return tokens[:length]

# Create a user-defined function (UDF) for padding
pad_udf = udf(pad_sequence, ArrayType(StringType()))

# Apply padding to the train data
train_data = train_data.withColumn("padded_article_tokens", pad_udf(train_data["filtered_article_tokens"]))
train_data = train_data.withColumn("padded_highlight_tokens", pad_udf(train_data["filtered_highlight_tokens"]))

# Apply padding to the test data
test_data = test_data.withColumn("padded_article_tokens", pad_udf(test_data["filtered_article_tokens"]))
test_data = test_data.withColumn("padded_highlight_tokens", pad_udf(test_data["filtered_highlight_tokens"]))

# Apply padding to the validation data
validation_data = validation_data.withColumn("padded_article_tokens", pad_udf(validation_data["filtered_article_tokens"]))
validation_data = validation_data.withColumn("padded_highlight_tokens", pad_udf(validation_data["filtered_highlight_tokens"]))

# Convert padded sequences to NumPy arrays
train_article_sequences = np.array(train_data.select("padded_article_tokens").collect())
train_highlight_sequences = np.array(train_data.select("padded_highlight_tokens").collect())

test_article_sequences = np.array(test_data.select("padded_article_tokens").collect())
test_highlight_sequences = np.array(test_data.select("padded_highlight_tokens").collect())

val_article_sequences = np.array(validation_data.select("padded_article_tokens").collect())
val_highlight_sequences = np.array(validation_data.select("padded_highlight_tokens").collect())

# Verify shapes and data types
print("Train Article Sequences Shape:", train_article_sequences.shape)
print("Train Highlight Sequences Shape:", train_highlight_sequences.shape)

print("Test Article Sequences Shape:", test_article_sequences.shape)
print("Test Highlight Sequences Shape:", test_highlight_sequences.shape)

print("Validation Article Sequences Shape:", val_article_sequences.shape)
print("Validation Highlight Sequences Shape:", val_highlight_sequences.shape)


In [13]:
from pyspark.sql.functions import size

# Calculate the lengths of arrays in the "filtered_article_tokens" column
lengths = train_data.withColumn("token_length", size(train_data["filtered_article_tokens"])).select("token_length").rdd.flatMap(lambda x: x).collect()

# Sort the array lengths
sorted_lengths = sorted(lengths)

# Find the array length corresponding to the 95th percentile
threshold = 0.95
index = int(threshold * len(sorted_lengths))
optimal_length = sorted_lengths[index]

print(f"Optimal array length (at the 95th percentile): {optimal_length}")


[Stage 77:>                                                         (0 + 1) / 1]

Optimal array length (at the 95th percentile): 847


                                                                                

In [140]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# Define the maximum length for padding
max_length = optimal_length

# Define a function to pad sequences
def pad_sequence(tokens, length=max_length):
    if len(tokens) < length:
        return tokens + ['<PAD>'] * (length - len(tokens))
    else:
        return tokens[:length]

# Create a user-defined function (UDF) for padding
pad_udf = udf(pad_sequence, ArrayType(StringType()))

# Apply padding to the train data
train_data = train_data.withColumn("padded_article_tokens", pad_udf(train_data["filtered_article_tokens"]))
train_data = train_data.withColumn("padded_highlight_tokens", pad_udf(train_data["filtered_highlight_tokens"]))

# Apply padding to the test data
test_data = test_data.withColumn("padded_article_tokens", pad_udf(test_data["filtered_article_tokens"]))
test_data = test_data.withColumn("padded_highlight_tokens", pad_udf(test_data["filtered_highlight_tokens"]))

# Apply padding to the validation data
validation_data = validation_data.withColumn("padded_article_tokens", pad_udf(validation_data["filtered_article_tokens"]))
validation_data = validation_data.withColumn("padded_highlight_tokens", pad_udf(validation_data["filtered_highlight_tokens"]))

In [54]:
# Select the "padded_article_tokens" column and extract lengths
distinct_lengths = train_data.select("padded_article_tokens") \
                            .rdd \
                            .map(lambda x: len(x[0])) \
                            .distinct() \
                            .collect()

# Print the distinct lengths
print("Distinct Padded Article Token Lengths:", distinct_lengths)

# Select the "padded_article_tokens" column and retrieve the first row
sample_padded_tokens = train_data.select("padded_article_tokens").first()[0]

# Print the contents of the "padded_article_tokens" for the first row
print(sample_padded_tokens)

                                                                                

Distinct Padded Article Token Lengths: [847]




['', '', 'associated', 'press', '', '', 'published', '', '', '', '14', '11', 'est', '', '25', 'october', '2013', '', '', '', '', '', '', 'updated', '', '', '', '15', '36', 'est', '', '25', 'october', '2013', '', '', 'bishop', 'fargo', 'catholic', 'diocese', 'north', 'dakota', 'exposed', 'potentially', 'hundreds', 'church', 'members', 'fargo', '', 'grand', 'forks', 'jamestown', 'hepatitis', 'virus', 'late', 'september', 'early', 'october', '', 'state', 'health', 'department', 'issued', 'advisory', 'exposure', 'anyone', 'attended', 'five', 'churches', 'took', 'communion', '', 'bishop', 'john', 'folda', '', 'pictured', '', 'fargo', 'catholic', 'diocese', 'north', 'dakota', 'exposed', 'potentially', 'hundreds', 'church', 'members', 'fargo', '', 'grand', 'forks', 'jamestown', 'hepatitis', '', '', 'state', 'immunization', 'program', 'manager', 'molly', 'howell', 'says', 'risk', 'low', '', 'officials', 'feel', 'important', 'alert', 'people', 'possible', 'exposure', '', 'diocese', 'announced',

                                                                                

In [141]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, Embedding

In [142]:
# Extract the distinct tokens from the "filtered_article_tokens" column across the entire training dataset
article_vocab = train_data.rdd.flatMap(lambda x: x['filtered_article_tokens']).distinct().collect()

# Calculate the size of the unique vocabulary for articles
article_vocab_size = len(article_vocab)

# Extract the distinct tokens from the "filtered_highlight_tokens" column across the entire training dataset
highlight_vocab = train_data.rdd.flatMap(lambda x: x['filtered_highlight_tokens']).distinct().collect()

# Calculate the size of the unique vocabulary for highlights
highlight_vocab_size = len(highlight_vocab)

# Print the vocabulary sizes
print(f"Article Vocabulary Size: {article_vocab_size}")
print(f"Highlight Vocabulary Size: {highlight_vocab_size}")


[Stage 457:>                                                        (0 + 1) / 1]

Article Vocabulary Size: 24547
Highlight Vocabulary Size: 4565


                                                                                

In [143]:
# Define embedding dimensions
embedding_dim = 100  # You can adjust this based on your Word2Vec embeddings

# Create embedding layers for articles and highlights
article_embedding_layer = Embedding(input_dim=article_vocab_size, output_dim=embedding_dim, input_length=max_length, name="article_embedding")
highlight_embedding_layer = Embedding(input_dim=highlight_vocab_size, output_dim=embedding_dim, input_length=max_length, name="highlight_embedding")


In [144]:
# Define the input layers for articles and highlights
article_input = Input(shape=(max_length,), name="article_input")
highlight_input = Input(shape=(max_length,), name="highlight_input")

# Apply embedding layers to inputs
article_embedding = article_embedding_layer(article_input)
highlight_embedding = highlight_embedding_layer(highlight_input)

# Define the encoder LSTM layer
encoder_lstm = LSTM(256, return_state=True)
_, state_h, state_c = encoder_lstm(article_embedding)

# Use the encoder states as initial states for the decoder LSTM layer
decoder_lstm = LSTM(256, return_sequences=True, return_state=True)
decoder_output, _, _ = decoder_lstm(highlight_embedding, initial_state=[state_h, state_c])

# Define the dense layer for output
output_layer = Dense(highlight_vocab_size, activation="softmax")
output = output_layer(decoder_output)

# Create the seq2seq model
model = Model([article_input, highlight_input], output)

# Compile the model
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

# Print the model summary
model.summary()


Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 article_input (InputLayer)     [(None, 847)]        0           []                               
                                                                                                  
 highlight_input (InputLayer)   [(None, 847)]        0           []                               
                                                                                                  
 article_embedding (Embedding)  (None, 847, 100)     2454700     ['article_input[0][0]']          
                                                                                                  
 highlight_embedding (Embedding  (None, 847, 100)    456500      ['highlight_input[0][0]']        
 )                                                                                          

In [149]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import numpy as np

In [150]:
# Assuming train_highlights is a list of text summaries
tokenizer = Tokenizer(num_words=highlight_vocab_size, oov_token="<OOV>")
tokenizer.fit_on_texts(train_highlights)
train_highlight_sequences = tokenizer.texts_to_sequences(train_highlights)
train_summary_one_hot_encoded = tokenizer.sequences_to_matrix(train_highlight_sequences, mode="binary")


In [151]:
# Pad sequences if necessary (you've already done this for the article sequences)
max_summary_length = max(len(seq) for seq in train_highlight_sequences)
train_summary_one_hot_encoded = pad_sequences(train_summary_one_hot_encoded, maxlen=max_summary_length, padding="post")

In [154]:
# Convert the 'filtered_article_tokens' column to a list of strings
article_tokens_list = validation_data.select("filtered_article_tokens").rdd.flatMap(lambda x: x).collect()


                                                                                

In [158]:
# Convert the 'filtered_highlight_tokens' column to a list of strings
highlight_tokens_list = validation_data.select("filtered_highlight_tokens").rdd.flatMap(lambda x: x).collect()

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 663, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


In [161]:
# Tokenize and pad the highlight sequences for validation data
val_highlight_sequences = tokenizer.texts_to_sequences(highlight_tokens_list)
val_highlight_sequences = pad_sequences(val_highlight_sequences, maxlen=max_length, padding='post')

In [162]:
# Tokenize and pad the article sequences for validation data
val_article_sequences = tokenizer.texts_to_sequences(article_tokens_list)
val_article_sequences = pad_sequences(val_article_sequences, maxlen=max_length, padding='post')

In [164]:
from keras.utils import to_categorical

# Assuming you have a DataFrame called 'validation_data' with a column 'filtered_highlight_tokens'
# Convert the 'filtered_highlight_tokens' column to a list of strings
highlight_tokens_list = validation_data.select("filtered_highlight_tokens").rdd.flatMap(lambda x: x).collect()

# Tokenize and pad the highlight sequences for validation data
val_highlight_sequences = tokenizer.texts_to_sequences(highlight_tokens_list)
val_highlight_sequences = pad_sequences(val_highlight_sequences, maxlen=max_length, padding='post')

# One-hot encode the validation summaries
val_summary_one_hot_encoded = to_categorical(val_highlight_sequences, num_classes=highlight_vocab_size)


Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 663, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


In [170]:
num_epochs = 10  # You can adjust the number of epochs as needed
batch_size = 32  # You can adjust the batch size as needed

import numpy as np

# Convert input data to NumPy arrays
train_article_sequences = np.array(train_article_sequences)
train_highlight_sequences = np.array(train_highlight_sequences)

# Ensure output data is also in NumPy array format
train_summary_one_hot_encoded = np.array(train_summary_one_hot_encoded)

# Train your model
model.fit(
    [train_article_sequences, train_highlight_sequences],  # Input data
    train_summary_one_hot_encoded,  # Output data
    validation_data=([val_article_sequences, val_highlight_sequences], val_summary_one_hot_encoded),
    epochs=num_epochs,
    batch_size=batch_size
)



ValueError: Failed to convert a NumPy array to a Tensor (Unsupported object type list).

In [147]:
import gensim
from gensim.models import Word2Vec


In [145]:
from pyspark.ml.feature import Word2VecModel
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType

# Load the Word2Vec models for articles and highlights
article_word2vec_model = Word2VecModel.load("article_word2vec_model_path")  # Replace with the actual path
highlight_word2vec_model = Word2VecModel.load("highlight_word2vec_model_path")  # Replace with the actual path

# Define UDFs for converting tokens to word indices
article_tokens_to_indices_udf = udf(lambda tokens: [article_word2vec_model.getVectors().indexWhere(lambda v: v.word == token) for token in tokens], ArrayType(IntegerType()))
highlight_tokens_to_indices_udf = udf(lambda tokens: [highlight_word2vec_model.getVectors().indexWhere(lambda v: v.word == token) for token in tokens], ArrayType(IntegerType()))

# Apply UDFs to the data
train_data = train_data.withColumn("article_word_indices", article_tokens_to_indices_udf(train_data["filtered_article_tokens"]))
train_data = train_data.withColumn("highlight_word_indices", highlight_tokens_to_indices_udf(train_data["filtered_highlight_tokens"]))

test_data = test_data.withColumn("article_word_indices", article_tokens_to_indices_udf(test_data["filtered_article_tokens"]))
test_data = test_data.withColumn("highlight_word_indices", highlight_tokens_to_indices_udf(test_data["filtered_highlight_tokens"]))

validation_data = validation_data.withColumn("article_word_indices", article_tokens_to_indices_udf(validation_data["filtered_article_tokens"]))
validation_data = validation_data.withColumn("highlight_word_indices", highlight_tokens_to_indices_udf(validation_data["filtered_highlight_tokens"]))


Py4JJavaError: An error occurred while calling o4748.load.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/hduser/article_word2vec_model_path/metadata
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:304)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:205)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
	at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1428)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1422)
	at org.apache.spark.rdd.RDD.$anonfun$first$1(RDD.scala:1463)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.first(RDD.scala:1463)
	at org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:587)
	at org.apache.spark.ml.feature.Word2VecModel$Word2VecModelReader.load(Word2Vec.scala:407)
	at org.apache.spark.ml.feature.Word2VecModel$Word2VecModelReader.load(Word2Vec.scala:399)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: Input path does not exist: hdfs://localhost:9000/user/hduser/article_word2vec_model_path/metadata
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:278)
	... 35 more


In [19]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, Embedding

def build_seq2seq_model(article_vocab_size, highlight_vocab_size, embedding_dim, units):
    # Encoder
    encoder_inputs = Input(shape=(None,))
    encoder_embedding = Embedding(article_vocab_size, embedding_dim)(encoder_inputs)
    encoder_lstm = LSTM(units, return_state=True)
    _, state_h, state_c = encoder_lstm(encoder_embedding)
    encoder_states = [state_h, state_c]

    # Decoder
    decoder_inputs = Input(shape=(None,))
    decoder_embedding = Embedding(highlight_vocab_size, embedding_dim)
    decoder_lstm = LSTM(units, return_sequences=True, return_state=True)
    decoder_lstm_out, _, _ = decoder_lstm(decoder_embedding(decoder_inputs), initial_state=encoder_states)
    decoder_dense = Dense(highlight_vocab_size, activation='softmax')
    decoder_outputs = decoder_dense(decoder_lstm_out)

    model = Model([encoder_inputs, decoder_inputs], decoder_outputs)
    return model


In [20]:
model = build_seq2seq_model(article_vocab_size, highlight_vocab_size, embedding_dim=256, units=512)

2023-10-06 09:38:32.795032: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2023-10-06 09:38:32.795396: W tensorflow/stream_executor/cuda/cuda_driver.cc:263] failed call to cuInit: UNKNOWN ERROR (303)
2023-10-06 09:38:32.795442: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (muhammad-VM): /proc/driver/nvidia/version does not exist
2023-10-06 09:38:32.796476: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-10-06 09:38:32.886353: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 46826496 exceeds 10

In [21]:
model.compile(optimizer='rmsprop', loss='sparse_categorical_crossentropy')

In [22]:
import numpy as np

# Convert 'padded_article_tokens' from train_data to numpy array
train_article_list = train_data.select('padded_article_tokens').rdd.flatMap(lambda x: x).collect()
train_article_array = np.array(train_article_list)

                                                                                

In [23]:
# Convert 'padded_highlight_tokens' from train_data to numpy array
train_highlight_list = train_data.select('padded_highlight_tokens').rdd.flatMap(lambda x: x).collect()
train_highlight_array = np.array(train_highlight_list)

                                                                                

In [24]:
# Repeat for test_data
test_article_list = test_data.select('padded_article_tokens').rdd.flatMap(lambda x: x).collect()
test_article_array = np.array(test_article_list)

test_highlight_list = test_data.select('padded_highlight_tokens').rdd.flatMap(lambda x: x).collect()
test_highlight_array = np.array(test_highlight_list)

In [25]:
# Repeat for validation_data
validation_article_list = validation_data.select('padded_article_tokens').rdd.flatMap(lambda x: x).collect()
validation_article_array = np.array(validation_article_list)

validation_highlight_list = validation_data.select('padded_highlight_tokens').rdd.flatMap(lambda x: x).collect()
validation_highlight_array = np.array(validation_highlight_list)

In [27]:
print(train_article_array.dtype)
print(decoder_input_data.dtype)
print(decoder_target_data.dtype)

<U917
<U32
<U30


In [26]:
# Shift the highlight for "teacher forcing" technique
decoder_input_data = np.hstack([np.zeros((train_highlight_array.shape[0], 1)), train_highlight_array[:, :-1]])
decoder_target_data = train_highlight_array

# Train the model
batch_size = 64  # Adjust based on your machine's capability
epochs = 30  # Adjust based on desired training duration

history = model.fit(
    [train_article_array, decoder_input_data], decoder_target_data,
    batch_size=batch_size,
    epochs=epochs,
    validation_split=0.2
)


Epoch 1/30


2023-10-06 09:45:41.842500: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 46826496 exceeds 10% of free system memory.
2023-10-06 09:45:43.782770: W tensorflow/core/framework/op_kernel.cc:1757] OP_REQUIRES failed at cast_op.cc:121 : UNIMPLEMENTED: Cast string to float is not supported


UnimplementedError: Graph execution error:

Detected at node 'model/Cast' defined at (most recent call last):
    File "/usr/lib/python3.10/runpy.py", line 196, in _run_module_as_main
      return _run_code(code, main_globals, None,
    File "/usr/lib/python3.10/runpy.py", line 86, in _run_code
      exec(code, run_globals)
    File "/usr/lib/python3/dist-packages/ipykernel_launcher.py", line 16, in <module>
      app.launch_new_instance()
    File "/usr/lib/python3/dist-packages/traitlets/config/application.py", line 846, in launch_instance
      app.start()
    File "/usr/lib/python3/dist-packages/ipykernel/kernelapp.py", line 677, in start
      self.io_loop.start()
    File "/usr/lib/python3/dist-packages/tornado/platform/asyncio.py", line 199, in start
      self.asyncio_loop.run_forever()
    File "/usr/lib/python3.10/asyncio/base_events.py", line 603, in run_forever
      self._run_once()
    File "/usr/lib/python3.10/asyncio/base_events.py", line 1909, in _run_once
      handle._run()
    File "/usr/lib/python3.10/asyncio/events.py", line 80, in _run
      self._context.run(self._callback, *self._args)
    File "/usr/lib/python3/dist-packages/ipykernel/kernelbase.py", line 461, in dispatch_queue
      await self.process_one()
    File "/usr/lib/python3/dist-packages/ipykernel/kernelbase.py", line 450, in process_one
      await dispatch(*args)
    File "/usr/lib/python3/dist-packages/ipykernel/kernelbase.py", line 357, in dispatch_shell
      await result
    File "/usr/lib/python3/dist-packages/ipykernel/kernelbase.py", line 652, in execute_request
      reply_content = await reply_content
    File "/usr/lib/python3/dist-packages/ipykernel/ipkernel.py", line 353, in do_execute
      res = shell.run_cell(code, store_history=store_history, silent=silent)
    File "/usr/lib/python3/dist-packages/ipykernel/zmqshell.py", line 532, in run_cell
      return super().run_cell(*args, **kwargs)
    File "/usr/lib/python3/dist-packages/IPython/core/interactiveshell.py", line 2914, in run_cell
      result = self._run_cell(
    File "/usr/lib/python3/dist-packages/IPython/core/interactiveshell.py", line 2960, in _run_cell
      return runner(coro)
    File "/usr/lib/python3/dist-packages/IPython/core/async_helpers.py", line 78, in _pseudo_sync_runner
      coro.send(None)
    File "/usr/lib/python3/dist-packages/IPython/core/interactiveshell.py", line 3185, in run_cell_async
      has_raised = await self.run_ast_nodes(code_ast.body, cell_name,
    File "/usr/lib/python3/dist-packages/IPython/core/interactiveshell.py", line 3377, in run_ast_nodes
      if (await self.run_code(code, result,  async_=asy)):
    File "/usr/lib/python3/dist-packages/IPython/core/interactiveshell.py", line 3457, in run_code
      exec(code_obj, self.user_global_ns, self.user_ns)
    File "/tmp/ipykernel_3880/4274283197.py", line 9, in <module>
      history = model.fit(
    File "/home/hduser/.local/lib/python3.10/site-packages/keras/utils/traceback_utils.py", line 65, in error_handler
      return fn(*args, **kwargs)
    File "/home/hduser/.local/lib/python3.10/site-packages/keras/engine/training.py", line 1564, in fit
      tmp_logs = self.train_function(iterator)
    File "/home/hduser/.local/lib/python3.10/site-packages/keras/engine/training.py", line 1160, in train_function
      return step_function(self, iterator)
    File "/home/hduser/.local/lib/python3.10/site-packages/keras/engine/training.py", line 1146, in step_function
      outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "/home/hduser/.local/lib/python3.10/site-packages/keras/engine/training.py", line 1135, in run_step
      outputs = model.train_step(data)
    File "/home/hduser/.local/lib/python3.10/site-packages/keras/engine/training.py", line 993, in train_step
      y_pred = self(x, training=True)
    File "/home/hduser/.local/lib/python3.10/site-packages/keras/utils/traceback_utils.py", line 65, in error_handler
      return fn(*args, **kwargs)
    File "/home/hduser/.local/lib/python3.10/site-packages/keras/engine/training.py", line 557, in __call__
      return super().__call__(*args, **kwargs)
    File "/home/hduser/.local/lib/python3.10/site-packages/keras/utils/traceback_utils.py", line 65, in error_handler
      return fn(*args, **kwargs)
    File "/home/hduser/.local/lib/python3.10/site-packages/keras/engine/base_layer.py", line 1097, in __call__
      outputs = call_fn(inputs, *args, **kwargs)
    File "/home/hduser/.local/lib/python3.10/site-packages/keras/utils/traceback_utils.py", line 96, in error_handler
      return fn(*args, **kwargs)
    File "/home/hduser/.local/lib/python3.10/site-packages/keras/engine/functional.py", line 510, in call
      return self._run_internal_graph(inputs, training=training, mask=mask)
    File "/home/hduser/.local/lib/python3.10/site-packages/keras/engine/functional.py", line 649, in _run_internal_graph
      y = self._conform_to_reference_input(y, ref_input=x)
    File "/home/hduser/.local/lib/python3.10/site-packages/keras/engine/functional.py", line 761, in _conform_to_reference_input
      tensor = tf.cast(tensor, dtype=ref_input.dtype)
Node: 'model/Cast'
Cast string to float is not supported
	 [[{{node model/Cast}}]] [Op:__inference_train_function_5775]

In [None]:
model.save('seq2seq_summary_model.h5')


In [None]:
# Create the test data for evaluation
decoder_input_test = np.hstack([np.zeros((test_highlight_array.shape[0], 1)), test_highlight_array[:, :-1]])
decoder_target_test = test_highlight_array

# Evaluate
test_loss = model.evaluate([test_article_array, decoder_input_test], decoder_target_test, batch_size=batch_size)
print(f"Test loss: {test_loss}")


In [103]:
# Hyperparameters
embedding_dim = 256
lstm_units = 512

In [104]:
# Define the encoder
encoder_inputs = Input(shape=(None,))
encoder_embedding = Embedding(input_dim=article_vocab_size+1, output_dim=embedding_dim)(encoder_inputs)
encoder_lstm = LSTM(lstm_units, return_state=True)
encoder_outputs, encoder_state_h, encoder_state_c = encoder_lstm(encoder_embedding)
encoder_states = [encoder_state_h, encoder_state_c]

2023-10-06 09:08:03.294164: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 46827520 exceeds 10% of free system memory.
2023-10-06 09:08:03.369852: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 46827520 exceeds 10% of free system memory.
2023-10-06 09:08:03.382234: W tensorflow/core/framework/cpu_allocator_impl.cc:82] Allocation of 46827520 exceeds 10% of free system memory.


In [105]:
# Define the decoder
decoder_inputs = Input(shape=(None,))
decoder_embedding = Embedding(input_dim=highlight_vocab_size+1, output_dim=embedding_dim)(decoder_inputs)
decoder_lstm = LSTM(lstm_units, return_sequences=True, return_state=True)
decoder_outputs, _, _ = decoder_lstm(decoder_embedding, initial_state=encoder_states)
decoder_dense = Dense(highlight_vocab_size+1, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)

In [106]:
# Construct the full model
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)

In [107]:
# Compile the model
model.compile(optimizer='rmsprop', loss='categorical_crossentropy', metrics=['accuracy'])

# Model Summary
model.summary()


Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_4 (InputLayer)           [(None, None)]       0           []                               
                                                                                                  
 input_5 (InputLayer)           [(None, None)]       0           []                               
                                                                                                  
 embedding_2 (Embedding)        (None, None, 256)    11706880    ['input_4[0][0]']                
                                                                                                  
 embedding_3 (Embedding)        (None, None, 256)    1293056     ['input_5[0][0]']                
                                                                                            

In [60]:
# Assuming `train_data` is your dataframe:

columns = train_data.columns
columns1 = test_data.columns
columns2 = validation_data.columns
print(columns)
print(columns1)
print(columns2)

['article', 'highlights', 'article_tokens', 'highlight_tokens', 'filtered_article_tokens', 'filtered_highlight_tokens', 'padded_article_tokens', 'padded_highlight_tokens']
['article', 'highlights', 'article_tokens', 'highlight_tokens', 'filtered_article_tokens', 'filtered_highlight_tokens', 'article_word_vectors', 'highlight_word_vectors', 'padded_article_tokens', 'padded_highlight_tokens']
['article', 'highlights', 'article_tokens', 'highlight_tokens', 'filtered_article_tokens', 'filtered_highlight_tokens', 'article_word_vectors', 'highlight_word_vectors', 'padded_article_tokens', 'padded_highlight_tokens']


In [81]:
# Check if word vectors are present in train_data
if "article_word_vectors" in train_data.columns and "highlight_word_vectors" in train_data.columns:
    print("Word vectors are present in train_data.")
else:
    print("Word vectors are not present in train_data.")

# Check if word vectors are present in test_data
if "article_word_vectors" in test_data.columns and "highlight_word_vectors" in test_data.columns:
    print("Word vectors are present in test_data.")
else:
    print("Word vectors are not present in test_data.")

# Check if word vectors are present in validation_data
if "article_word_vectors" in validation_data.columns and "highlight_word_vectors" in validation_data.columns:
    print("Word vectors are present in validation_data.")
else:
    print("Word vectors are not present in validation_data.")


Word vectors are not present in train_data.
Word vectors are not present in test_data.
Word vectors are not present in validation_data.
