In [18]:
import pickle
import string

import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Star imports are kept in their original relative order: on a name clash the
# later import wins, so reordering them could change which object a name binds to.
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import *

In [9]:


# 2.1 Start (or reuse) the Spark session used by the whole notebook
spark = (
    SparkSession.builder
    .appName("AGNewsTextClassification")
    .getOrCreate()
)

# 2.2 Define the schema (the files are read with header=False, so column
#     names and types come from here, in file column order)
schema = StructType([
    StructField("label",       IntegerType(), nullable=False),
    StructField("title",       StringType(),  nullable=True),
    StructField("description", StringType(),  nullable=True),
])

# 2.3 Read the CSVs
# Single place to edit when the data lives somewhere else.
DATA_DIR = "D:/AIML/data"


def read_ag_news(file_name):
    """Read one AG News CSV from DATA_DIR with the explicit schema.

    Args:
        file_name: file name inside DATA_DIR, e.g. "ag_news_train.csv".

    Returns:
        A Spark DataFrame with the columns label, title, description.
    """
    # NOTE(review): Spark's CSV reader uses backslash as its default escape
    # character and the sample rows contain literal backslashes; confirm that
    # quoted fields parse as intended (an explicit `escape` option may be needed).
    return spark.read.csv(
        f"{DATA_DIR}/{file_name}",
        schema=schema,
        header=False,
    )


train_df = read_ag_news("ag_news_train.csv")
test_df = read_ag_news("ag_news_test.csv")

# 2.4 Quick sanity-check: row count plus the first rows of each split
print("Train set count:", train_df.count())
train_df.show(5, truncate=100)

print("Test set count:", test_df.count())
test_df.show(5, truncate=100)


Train set count: 120000
+-----+-------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------+
|label|                                                                    title|                                                                                         description|
+-----+-------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------+
|    3|                        Wall St. Bears Claw Back Into the Black (Reuters)|      Reuters - Short-sellers, Wall Street's dwindling\band of ultra-cynics, are seeing green again.|
|    3|                      Carlyle Looks Toward Commercial Aerospace (Reuters)|Reuters - Private investment firm Carlyle Group,\which has a reputation for making well-timed and...|
|    3|                          Oil and Economy Cloud Stocks

In [10]:
# Class-balance check: number of training rows per label, listed in label order.
(
    train_df
    .groupBy("label")
    .count()
    .orderBy("label")
    .show()
)

+-----+-----+
|label|count|
+-----+-----+
|    1|30000|
|    2|30000|
|    3|30000|
|    4|30000|
+-----+-----+



In [11]:
# NOTE(review): disabled Python-UDF draft of the text-cleaning step, kept for
# reference only (every line below is commented out).
# As written, the filter on the `result_words` line keeps a token only when it
# IS in `stop_word` (`x in stop_word`); `not in` was presumably intended --
# confirm before re-enabling.
# def preprocess_text(text):
#     text = text.lower()
#     stop_word = ["false", "true"]
#     text = text.replace('\n', ' ').replace('\\u', ' ').replace('\\', ' ')
#     text = text.replace('  ', ' ')
#     text = text.translate(str.maketrans('','',string.punctuation))

#     result_words = ' '.join([x for x in text.split() if len(x) >=1 and x in stop_word])
#     return result_words
# preprocess_text_udf = udf(preprocess_text, StringType())

# train_text_df = train_df.withColumn('preprocessed_text', preprocess_text_udf('description'))


# print(train_text_df.columns)
# print("Train set count:", train_text_df.count())
# train_text_df.show(5)

In [12]:
from pyspark.sql.functions import col, lower, regexp_replace, trim


def clean_text(column_name):
    """Build the Column expression that normalizes a raw text column.

    Steps, applied in this order:
      1. lowercase the text;
      2. replace every character that is neither a word character nor
         whitespace (i.e. punctuation) with a space;
      3. collapse runs of whitespace into a single space;
      4. delete the standalone words "false" and "true";
      5. collapse whitespace again, because removing a word can leave two
         adjacent spaces;
      6. trim leading and trailing spaces.

    Args:
        column_name: name of the string column to clean.

    Returns:
        A pyspark Column expression; nothing is evaluated until it is used
        in a DataFrame operation.
    """
    cleaned = lower(col(column_name))
    cleaned = regexp_replace(cleaned, r"[^\w\s]", " ")
    cleaned = regexp_replace(cleaned, r"\s+", " ")
    cleaned = regexp_replace(cleaned, r"\b(false|true)\b", "")
    cleaned = regexp_replace(cleaned, r"\s+", " ")
    return trim(cleaned)


# Apply the exact same cleaning to both splits so they cannot drift apart.
train_text_df = train_df.withColumn("preprocessed_text", clean_text("description"))
test_text_df = test_df.withColumn("preprocessed_text", clean_text("description"))

# Side-by-side view of raw vs. cleaned text for a few rows of each split
train_text_df.select("description", "preprocessed_text").show(5, truncate=80)
test_text_df.select("description", "preprocessed_text").show(5, truncate=80)


+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+
|                                                                     description|                                                               preprocessed_text|
+--------------------------------------------------------------------------------+--------------------------------------------------------------------------------+
|Reuters - Short-sellers, Wall Street's dwindling\band of ultra-cynics, are se...|reuters short sellers wall street s dwindling band of ultra cynics are seeing...|
|Reuters - Private investment firm Carlyle Group,\which has a reputation for m...|reuters private investment firm carlyle group which has a reputation for maki...|
|Reuters - Soaring crude prices plus worries\about the economy and the outlook...|reuters soaring crude prices plus worries about the economy and the outlook f...|
|Reuters - Autho

In [13]:
# NOTE(review): disabled Python-UDF draft of the pad/truncate tokenizer, kept
# for reference only (every line below is commented out).
# The comment under step 3 mentions `val_text_df`, which is not created in any
# of the visible cells.
# from pyspark.sql.functions import udf
# from pyspark.sql.types import ArrayType, StringType

# # 0) Define your maximum token length
# MAX_LEN = 512

# # 1) Define the splitting + padding/truncation function
# def get_split(text):
#     text_split = text.split()
#     if len(text_split) <= MAX_LEN:
#         text_split = text_split[:MAX_LEN] + ["BLANK"] * (MAX_LEN - len(text_split[:MAX_LEN]))
#         return text_split
#     else:
#         text_list = []
#         text_list.extend(text_split[:int(MAX_LEN/2)])
#         text_list.extend(text_split[-int(MAX_LEN/2):])

#     return text_list

# # 2) Wrap it as a Spark UDF returning ArrayType(StringType)
# get_split_udf = udf(get_split, ArrayType(StringType()))

# # 3) Apply to your DataFrames
# #    (assumes you already have `train_text_df`, `val_text_df`, `test_text_df`
# #     and each has a column "preprocessed_text" of type String)
# train_text_df = train_text_df.withColumn(
#     "tokenized_text",
#     get_split_udf("preprocessed_text")
# )
# test_text_df = test_text_df.withColumn(
#     "tokenized_text",
#     get_split_udf("preprocessed_text")
# )

# # 4) Peek at the results
# train_text_df.select("preprocessed_text", "tokenized_text").show(5, truncate=50)


In [None]:
from pyspark.sql.functions import (
    split, size, when, array_repeat, slice, col,
    concat, lit
)

MAX_LEN = 512
half    = MAX_LEN // 2


def add_tokenized_text(df, max_len=MAX_LEN):
    """Add the `tokens` and fixed-length `tokenized_text` array columns.

    `tokens` is `preprocessed_text` split on whitespace. `tokenized_text` is
    derived from it as follows:
      * at most `max_len` tokens: the tokens followed by "BLANK" entries up
        to `max_len` elements;
      * more than `max_len` tokens: the first `max_len // 2` tokens followed
        by the last `max_len // 2` tokens.

    Args:
        df: DataFrame that has a string column named "preprocessed_text".
        max_len: target number of tokens per row (defaults to MAX_LEN).

    Returns:
        A new DataFrame with the two columns added (or replaced if they
        already exist, which makes re-running the cell safe).
    """
    keep = max_len // 2
    tokens = col("tokens")

    # Short rows: pad on the right with the "BLANK" placeholder.
    padded = concat(
        tokens,
        array_repeat(lit("BLANK"), max_len - size(tokens)),
    )
    # Long rows: keep the head and the tail, drop the middle.
    # slice() is 1-based; a negative start counts from the end of the array.
    head_and_tail = concat(
        slice(tokens, 1, keep),
        slice(tokens, -keep, keep),
    )

    # NOTE(review): splitting an empty string yields one empty token, so a row
    # whose cleaned text is "" starts with "" followed by BLANK padding --
    # confirm that is acceptable downstream.
    return (
        df.withColumn("tokens", split(col("preprocessed_text"), r"\s+"))
          .withColumn(
              "tokenized_text",
              when(size(tokens) <= max_len, padded).otherwise(head_and_tail),
          )
    )


# Same transformation for both splits.
train_text_df = add_tokenized_text(train_text_df)
test_text_df = add_tokenized_text(test_text_df)

# inspect
train_text_df.select("preprocessed_text","tokenized_text").show(5, truncate=50)
test_text_df.select("preprocessed_text","tokenized_text").show(5, truncate=50)

+--------------------------------------------------+--------------------------------------------------+
|                                 preprocessed_text|                                    tokenized_text|
+--------------------------------------------------+--------------------------------------------------+
|reuters short sellers wall street s dwindling b...|[reuters, short, sellers, wall, street, s, dwin...|
|reuters private investment firm carlyle group w...|[reuters, private, investment, firm, carlyle, g...|
|reuters soaring crude prices plus worries about...|[reuters, soaring, crude, prices, plus, worries...|
|reuters authorities have halted oil export flow...|[reuters, authorities, have, halted, oil, expor...|
|afp tearaway world oil prices toppling records ...|[afp, tearaway, world, oil, prices, toppling, r...|
+--------------------------------------------------+--------------------------------------------------+
only showing top 5 rows

+--------------------------------------

In [17]:
# List the (column name, Spark type) pairs to confirm which columns the
# preprocessing and tokenization steps added.
train_text_df.dtypes

[('label', 'int'),
 ('title', 'string'),
 ('description', 'string'),
 ('preprocessed_text', 'string'),
 ('tokens', 'array<string>'),
 ('tokenized_text', 'array<string>')]

In [None]:
# Load the pickled object at `text_indexer_path` into `word_index`, then write
# it back out under the "text_mapping" name in two formats (.pickle and .npy).
text_indexer_path = "D:/AIML/data/glove.42B.300d.pickle"
text_mapping_pickle_path = "D:/AIML/data/text_mapping.pickle"
text_mapping_npy_path = "D:/AIML/data/text_mapping.npy"

# The recorded run of this cell stopped here with FileNotFoundError: the source
# pickle must exist at `text_indexer_path` before the cell can succeed.
# SECURITY: pickle.load can execute arbitrary code embedded in the file; only
# load pickles that come from a trusted source.
with open(text_indexer_path, 'rb') as handle:
    word_index = pickle.load(handle)

with open(text_mapping_pickle_path, 'wb') as handle:
    pickle.dump(word_index, handle, protocol=pickle.HIGHEST_PROTOCOL)

# Requires numpy to be imported as `np` in the imports cell.
# NOTE(review): if `word_index` is a plain dict, np.save stores it as a 0-d
# object array, so reading it back would need
# np.load(..., allow_pickle=True).item() -- confirm what the consumer expects.
np.save(text_mapping_npy_path, word_index, allow_pickle=True)

FileNotFoundError: [Errno 2] No such file or directory: 'D:/AIML/data/glove.42B.300d.pickle'