In [1]:
sc

In [2]:
from pyspark import pandas as ps
from pyspark.ml.feature import CountVectorizer, Tokenizer, StopWordsRemover, IDF, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import split, explode, col, lower



In [3]:
df = ps.read_csv('hdfs://localhost:9000/CA1/SMSSpamCollection.tsv',
                 sep='\t', 
                 header=None,
                 dtype=str)
df = df.rename(columns={0:'Catagory', 1:'Text'})




In [4]:
df.dtypes

Catagory    <U0
Text        <U0
dtype: object

In [5]:
df.head(5)

Unnamed: 0,Catagory,Text
0,ham,"Go until jurong point, crazy.. Available only ..."
1,ham,Ok lar... Joking wif u oni...
2,spam,Free entry in 2 a wkly comp to win FA Cup fina...
3,ham,U dun say so early hor... U c already then say...
4,ham,"Nah I don't think he goes to usf, he lives aro..."


In [6]:
df.shape

(5574, 2)

In [7]:
#Check for nulls in the dataframe
null_df = df[df['Catagory'].isnull() | df['Text'].isnull()]
null_df

Unnamed: 0,Catagory,Text


In [8]:
#Get the length of each row of text
df['text_length'] = df['Text'].map(lambda x: len(x))
df

                                                                                

Unnamed: 0,Catagory,Text,text_length
0,ham,"Go until jurong point, crazy.. Available only ...",111
1,ham,Ok lar... Joking wif u oni...,29
2,spam,Free entry in 2 a wkly comp to win FA Cup fina...,155
3,ham,U dun say so early hor... U c already then say...,49
4,ham,"Nah I don't think he goes to usf, he lives aro...",61
5,spam,FreeMsg Hey there darling it's been 3 week's n...,147
6,ham,Even my brother is not like to speak with me. ...,77
7,ham,As per your request 'Melle Melle (Oru Minnamin...,160
8,spam,WINNER!! As a valued network customer you have...,157
9,spam,Had your mobile 11 months or more? U R entitle...,154


In [9]:
mean_len_by_catagory = df.groupby('Catagory').mean()
mean_len_by_catagory

                                                                                

Unnamed: 0_level_0,text_length
Catagory,Unnamed: 1_level_1
ham,71.454319
spam,138.670683


In [10]:
df.dtypes

Catagory         <U0
Text             <U0
text_length    int64
dtype: object

In [11]:
tokenizer = Tokenizer(inputCol = 'Text', outputCol = 'token_text')
stop_remove = StopWordsRemover(inputCol = 'token_text', outputCol = 'stop_token')
count_vect = CountVectorizer(inputCol = 'stop_token', outputCol = 'count_vect')
idf = IDF(inputCol = 'count_vect', outputCol = 'tf_idf')
label_indexer = StringIndexer(inputCol = 'Catagory', outputCol = 'label')
scaler = StandardScaler(inputCol='text_length', outputCol='scaled_text_len')

In [12]:
clean_up = VectorAssembler(inputCols = ["tf_idf"], outputCol = "features")

In [13]:
preprocess_pipeline = Pipeline(stages = [tokenizer, stop_remove, count_vect, idf, label_indexer, clean_up])

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession(sc)

customSchema = StructType([
    StructField("Catagory", StringType(), True), 
    StructField("Text", StringType(), True), 
    StructField("text_length", IntegerType(), True)])

#Working around an annoying error creating a series of tuples from rows and using that to create the spark dataframe
tuples_list = [tuple(x) for x in df.itertuples(index=False)]

spark_df = spark.createDataFrame(tuples_list, schema = customSchema)

#spark_df = spark.createDataFrame([1.0, 2.0, 3.0], FloatType())

[Stage 12:>                                                         (0 + 1) / 1]

In [15]:
spark_df.show()

[Stage 13:>                                                         (0 + 1) / 1]

+--------+--------------------+-----------+
|Catagory|                Text|text_length|
+--------+--------------------+-----------+
|     ham|Go until jurong p...|        111|
|     ham|Ok lar... Joking ...|         29|
|    spam|Free entry in 2 a...|        155|
|     ham|U dun say so earl...|         49|
|     ham|Nah I don't think...|         61|
|    spam|FreeMsg Hey there...|        147|
|     ham|Even my brother i...|         77|
|     ham|As per your reque...|        160|
|    spam|WINNER!! As a val...|        157|
|    spam|Had your mobile 1...|        154|
|     ham|I'm gonna be home...|        109|
|    spam|SIX chances to wi...|        136|
|    spam|URGENT! You have ...|        155|
|     ham|I've been searchi...|        196|
|     ham|I HAVE A DATE ON ...|         35|
|    spam|XXXMobileMovieClu...|        149|
|     ham|Oh k...i'm watchi...|         26|
|     ham|Eh u remember how...|         81|
|     ham|Fine if thats th...|         56|
|    spam|England v Macedon...| 

                                                                                

In [16]:
df_tokens = tokenizer.transform(spark_df)
df_tokens_exploded = df_tokens.select(explode('token_text').alias('tokens'))
df_tokens_cleaned = df_tokens_exploded.withColumn('tokens', lower(col('tokens'))).where(col('tokens').rlike('\\w'))
calc_num_embedding = df_tokens_cleaned.select('tokens').distinct().count()
calc_num_embedding

                                                                                

13508

In [17]:
cleaner = preprocess_pipeline.fit(spark_df)
clean_df = cleaner.transform(spark_df)
clean_df = clean_df[['features', 'label']]
clean_df.show()

                                                                                

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(13423,[7,11,31,6...|  0.0|
|(13423,[0,24,299,...|  0.0|
|(13423,[2,13,19,3...|  1.0|
|(13423,[0,70,80,1...|  0.0|
|(13423,[36,134,31...|  0.0|
|(13423,[10,60,140...|  1.0|
|(13423,[10,53,104...|  0.0|
|(13423,[127,184,4...|  0.0|
|(13423,[1,47,118,...|  1.0|
|(13423,[0,1,13,27...|  1.0|
|(13423,[18,43,119...|  0.0|
|(13423,[8,16,37,8...|  1.0|
|(13423,[13,30,47,...|  1.0|
|(13423,[38,95,222...|  0.0|
|(13423,[548,1665,...|  0.0|
|(13423,[30,109,11...|  1.0|
|(13423,[82,216,36...|  0.0|
|(13423,[0,2,49,13...|  0.0|
|(13423,[0,74,105,...|  0.0|
|(13423,[4,30,33,5...|  1.0|
+--------------------+-----+
only showing top 20 rows



In [18]:

def min_max_batch_size(dataset, start_number, max_number):
    storage = []
    pass_count = 0
    
    dataset_count = dataset.count()
    
    while start_number <= max_number or pass_count < 2:
        if dataset_count % start_number == 0:
            pass_count = 0
            storage.append(start_number)
            start_number = start_number * 2
        else:
            pass_count += 1
            pass
    return storage
        
#batch_list = min_max_batch_size(clean_df, start_number=2, max_number=128)
#batch_list

In [23]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset

In [31]:
import pandas as pd
import numpy as np

pandas_df = clean_df.toPandas()

print('List Values')
features = pandas_df['features'].values
labels = pandas_df['label'].values.astype(np.int64)

print('create array')
feature_array = np.array([v.toArray() for v in pandas_df['features']], dtype=np.float32)
labels_array = labels

print('Create tensors')
feature_tensor = torch.from_numpy(feature_array)
labels_tensor = torch.from_numpy(labels_array)

dataset = TensorDataset(feature_tensor, labels_tensor)

dataset

[Stage 33:>                                                         (0 + 2) / 2]                                                                                

List Values
create array
Create tensors


<torch.utils.data.dataset.TensorDataset at 0x746047c96d40>

In [33]:
class TextClassifier(nn.Module):
    def __init__(self):
        super(TextClassifier, self).__init__()
        self.fc1 = nn.Linear(len(features[0]), 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 2)
        
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x
    
model = TextClassifier()

crieterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

batch_size = 2

dataloader = DataLoader(dataset, batch_size=2, shuffle=True)

epochs = 10

for epoch in range(epochs):
    for inputs, labels in dataloader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = crieterion(outputs, labels)
        loss.backward()
        optimizer.step()
    print(f'Epoch {epoch+1}/{epochs}, Loss:{loss.item()}')

Epoch 1/10, Loss:0.003964816220104694
Epoch 2/10, Loss:8.344643447344424e-07
Epoch 3/10, Loss:8.106166205834597e-06
Epoch 4/10, Loss:5.9604641222676946e-08
Epoch 5/10, Loss:0.0
Epoch 6/10, Loss:0.0
Epoch 7/10, Loss:0.0
Epoch 8/10, Loss:0.0
Epoch 9/10, Loss:0.0
Epoch 10/10, Loss:0.0


In [37]:
model.eval()

with torch.no_grad():
    outputs = model(feature_tensor)
    _, predicted = torch.max(outputs, 1)
    
correct = (predicted == labels_tensor).sum().item()
total = labels_tensor.size(0)
accuracy = correct / total
print('Accuracy:', accuracy)

Accuracy: 1.0
