In [1]:
import os
# Find the latest version of spark 3.2  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.3'
spark_version = 'spark-3.2.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.2.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.2.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3.2"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
0% [Waiting for headers] [1 InRelease 14.2 kB/114 kB 12%] [Connected to cloud.r                                                                               Hit:2 http://archive.ubuntu.com/ubuntu focal InRelease
0% [Waiting for headers] [1 InRelease 113 kB/114 kB 99%] [Connected to cloud.r-                                                                               Get:3 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
0% [3 InRelease 33.0 kB/114 kB 29%] [1 InRelease 113 kB/114 kB 99%] [Connected 0% [3 InRelease 43.1 kB/114 kB 38%] [Connected to cloud.r-project.org (108.138.                                                                               Hit:4 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease
Get:5 http://archive.ubuntu.com/ubuntu focal-backports InRelease [108 kB]
Hit:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu f

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("basictestdata").getOrCreate()

In [3]:
from pyspark import SparkFiles
# url ="https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.1/22-big-data/day_2/yelp_reviews.csv"
spark.sparkContext.addFile("Resources/basictestdata.csv")
df = spark.read.csv(SparkFiles.get("basictestdata.csv"), sep=",", header=False)

# Show DataFrame
df.show()

+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
|spam|        this is spam|
| ham|         this is ham|
| ham|         ham is good|
|spam|         spam is bad|
|spam|most spam is irri...|
|spam|some spam is dang...|
| ham|         ham is nice|
| ham|ham comes from re...|
|spam|        no more spam|
| ham|       need more ham|
|spam|           stop spam|
| ham|         send me ham|
+----+--------------------+



In [4]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature 
data_df = df.withColumn('length', length(df['_c1']))
data_df.show()

+----+--------------------+------+
| _c0|                 _c1|length|
+----+--------------------+------+
|spam|        this is spam|    12|
| ham|         this is ham|    11|
| ham|         ham is good|    11|
|spam|         spam is bad|    11|
|spam|most spam is irri...|    23|
|spam|some spam is dang...|    22|
| ham|         ham is nice|    11|
| ham|ham comes from re...|    28|
|spam|        no more spam|    12|
| ham|       need more ham|    13|
|spam|           stop spam|     9|
| ham|         send me ham|    11|
+----+--------------------+------+



In [5]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

In [6]:
tokenizer = Tokenizer(inputCol="_c1", outputCol="words")
tokenizer

Tokenizer_08bb81d452d1

In [7]:
tokenized = tokenizer.transform(data_df)
tokenized.show(truncate=False)

+----+----------------------------+------+----------------------------------+
|_c0 |_c1                         |length|words                             |
+----+----------------------------+------+----------------------------------+
|spam|this is spam                |12    |[this, is, spam]                  |
|ham |this is ham                 |11    |[this, is, ham]                   |
|ham |ham is good                 |11    |[ham, is, good]                   |
|spam|spam is bad                 |11    |[spam, is, bad]                   |
|spam|most spam is irritating     |23    |[most, spam, is, irritating]      |
|spam|some spam is dangerous      |22    |[some, spam, is, dangerous]       |
|ham |ham is nice                 |11    |[ham, is, nice]                   |
|ham |ham comes from real contacts|28    |[ham, comes, from, real, contacts]|
|spam|no more spam                |12    |[no, more, spam]                  |
|ham |need more ham               |13    |[need, more, ham]     

In [8]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered")

In [9]:
removed_df = remover.transform(tokenized)
removed_df.show()

+----+--------------------+------+--------------------+--------------------+
| _c0|                 _c1|length|               words|            filtered|
+----+--------------------+------+--------------------+--------------------+
|spam|        this is spam|    12|    [this, is, spam]|              [spam]|
| ham|         this is ham|    11|     [this, is, ham]|               [ham]|
| ham|         ham is good|    11|     [ham, is, good]|         [ham, good]|
|spam|         spam is bad|    11|     [spam, is, bad]|         [spam, bad]|
|spam|most spam is irri...|    23|[most, spam, is, ...|  [spam, irritating]|
|spam|some spam is dang...|    22|[some, spam, is, ...|   [spam, dangerous]|
| ham|         ham is nice|    11|     [ham, is, nice]|         [ham, nice]|
| ham|ham comes from re...|    28|[ham, comes, from...|[ham, comes, real...|
|spam|        no more spam|    12|    [no, more, spam]|              [spam]|
| ham|       need more ham|    13|   [need, more, ham]|         [need, ham]|

In [10]:
hashing = HashingTF(inputCol="filtered", outputCol="hashedValues", numFeatures=pow(2,4))

# Transform into a DF
hashed_df = hashing.transform(removed_df)

In [11]:
hashed_df.show()

+----+--------------------+------+--------------------+--------------------+--------------------+
| _c0|                 _c1|length|               words|            filtered|        hashedValues|
+----+--------------------+------+--------------------+--------------------+--------------------+
|spam|        this is spam|    12|    [this, is, spam]|              [spam]|      (16,[1],[1.0])|
| ham|         this is ham|    11|     [this, is, ham]|               [ham]|     (16,[14],[1.0])|
| ham|         ham is good|    11|     [ham, is, good]|         [ham, good]|(16,[8,14],[1.0,1...|
|spam|         spam is bad|    11|     [spam, is, bad]|         [spam, bad]|(16,[1,4],[1.0,1.0])|
|spam|most spam is irri...|    23|[most, spam, is, ...|  [spam, irritating]|(16,[1,8],[1.0,1.0])|
|spam|some spam is dang...|    22|[some, spam, is, ...|   [spam, dangerous]|(16,[1,8],[1.0,1.0])|
| ham|         ham is nice|    11|     [ham, is, nice]|         [ham, nice]|(16,[10,14],[1.0,...|
| ham|ham comes from

In [12]:
idf = IDF(inputCol="hashedValues", outputCol="features")
idfModel = idf.fit(hashed_df)
rescaledData = idfModel.transform(hashed_df)

In [18]:
rescaledData.show(truncate=False)

+----+----------------------------+------+----------------------------------+----------------------------+----------------------------------+----------------------------------------------------------------------------------------------+
|_c0 |_c1                         |length|words                             |filtered                    |hashedValues                      |features                                                                                      |
+----+----------------------------+------+----------------------------------+----------------------------+----------------------------------+----------------------------------------------------------------------------------------------+
|spam|this is spam                |12    |[this, is, spam]                  |[spam]                      |(16,[1],[1.0])                    |(16,[1],[0.4855078157817008])                                                                 |
|ham |this is ham                 |11    |[this, is,

In [13]:
indexer = StringIndexer(inputCol='_c0',outputCol='label')

In [14]:
indexed_df  = indexer.fit(rescaledData).transform(rescaledData)
indexed_df.show()

+----+--------------------+------+--------------------+--------------------+--------------------+--------------------+-----+
| _c0|                 _c1|length|               words|            filtered|        hashedValues|            features|label|
+----+--------------------+------+--------------------+--------------------+--------------------+--------------------+-----+
|spam|        this is spam|    12|    [this, is, spam]|              [spam]|      (16,[1],[1.0])|(16,[1],[0.485507...|  1.0|
| ham|         this is ham|    11|     [this, is, ham]|               [ham]|     (16,[14],[1.0])|(16,[14],[0.61903...|  0.0|
| ham|         ham is good|    11|     [ham, is, good]|         [ham, good]|(16,[8,14],[1.0,1...|(16,[8,14],[1.178...|  0.0|
|spam|         spam is bad|    11|     [spam, is, bad]|         [spam, bad]|(16,[1,4],[1.0,1.0])|(16,[1,4],[0.4855...|  1.0|
|spam|most spam is irri...|    23|[most, spam, is, ...|  [spam, irritating]|(16,[1,8],[1.0,1.0])|(16,[1,8],[0.4855...|  1.0|


In [38]:
X = indexed_df.drop('label')
y = indexed_df.select('label')

In [40]:
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler 

In [41]:
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state = 42)

TypeError: ignored

In [15]:
cleaned_df = indexed_df.select(['label','hashedValues','features'])
cleaned_df.show()

+-----+--------------------+--------------------+
|label|        hashedValues|            features|
+-----+--------------------+--------------------+
|  1.0|      (16,[1],[1.0])|(16,[1],[0.485507...|
|  0.0|     (16,[14],[1.0])|(16,[14],[0.61903...|
|  0.0|(16,[8,14],[1.0,1...|(16,[8,14],[1.178...|
|  1.0|(16,[1,4],[1.0,1.0])|(16,[1,4],[0.4855...|
|  1.0|(16,[1,8],[1.0,1.0])|(16,[1,8],[0.4855...|
|  1.0|(16,[1,8],[1.0,1.0])|(16,[1,8],[0.4855...|
|  0.0|(16,[10,14],[1.0,...|(16,[10,14],[1.87...|
|  0.0|(16,[1,7,11,14],[...|(16,[1,7,11,14],[...|
|  1.0|      (16,[1],[1.0])|(16,[1],[0.485507...|
|  0.0|(16,[9,14],[1.0,1...|(16,[9,14],[1.871...|
|  1.0|(16,[1,12],[1.0,1...|(16,[1,12],[0.485...|
|  0.0|(16,[3,14],[1.0,1...|(16,[3,14],[1.871...|
+-----+--------------------+--------------------+



In [16]:
from pyspark.sql import functions as F

In [17]:
columns = cleaned_df.drop(['labels'])
df_sizes = indexed_df.select(*[F.size(c).alias(c) for c in columns])
df_max = df_sizes.agg(*[F.max(c).alias(c) for c in columns])
max_dict = df_max.collect()[0].asDict()

df_result = df.select('label', *[df[c][i] for c in columns for i in range(max_dict[c])])
df_result.show()

TypeError: ignored

In [None]:
df_sizes = df2.select(F.size('col2').alias('col2'))
df_max = df_sizes.agg(F.max('col2'))
nb_columns = df_max.collect()[0][0]

df_result = df2.select('col1', *[df2['col2'][i] for i in range(nb_columns)])
df_result.show()

AnalysisException: ignored

In [29]:
clean_df = indexed_df.select(["label","features","length"])
clean_df.show()

+-----+--------------------+------+
|label|            features|length|
+-----+--------------------+------+
|  1.0|(16,[1],[0.485507...|    12|
|  0.0|(16,[14],[0.61903...|    11|
|  0.0|(16,[8,14],[1.178...|    11|
|  1.0|(16,[1,4],[0.4855...|    11|
|  1.0|(16,[1,8],[0.4855...|    23|
|  1.0|(16,[1,8],[0.4855...|    22|
|  0.0|(16,[10,14],[1.87...|    11|
|  0.0|(16,[1,7,11,14],[...|    28|
|  1.0|(16,[1],[0.485507...|    12|
|  0.0|(16,[9,14],[1.871...|    13|
|  1.0|(16,[1,12],[0.485...|     9|
|  0.0|(16,[3,14],[1.871...|    11|
+-----+--------------------+------+



In [34]:
from pyspark.sql.functions import col



In [45]:

import pyspark.sql.functions as sf
import pyspark.sql.types as sparktypes


In [36]:
split_df = clean_df.select(col("label"),col("features._1").alias("Tokens"), col("features._2").alias("weights"))

AnalysisException: ignored

In [32]:
import pandas as pd

In [33]:
clean_pd_df = pd.DataFrame(clean_df)
clean_pd_df.head()

ValueError: ignored

In [30]:
features_df = clean_df.select('features')
features_df.show()

+--------------------+
|            features|
+--------------------+
|(16,[1],[0.485507...|
|(16,[14],[0.61903...|
|(16,[8,14],[1.178...|
|(16,[1,4],[0.4855...|
|(16,[1,8],[0.4855...|
|(16,[1,8],[0.4855...|
|(16,[10,14],[1.87...|
|(16,[1,7,11,14],[...|
|(16,[1],[0.485507...|
|(16,[9,14],[1.871...|
|(16,[1,12],[0.485...|
|(16,[3,14],[1.871...|
+--------------------+



In [42]:
for row in features_df.itertuples(index=False):
    print(row)

AttributeError: ignored

In [37]:
features_df = features_df.withColumn('feature_list', list(col('features')))

TypeError: ignored

In [22]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = cleaned_df.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
model = NaiveBayes()
predictor = model.fit(training)

In [23]:
test_results = predictor.transform(testing)
test_results.show()

+-----+--------------------+--------------------+--------------------+--------------------+----------+
|label|        hashedValues|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+--------------------+----------+
|  1.0|(16,[1,12],[1.0,1...|(16,[1,12],[0.485...|[-8.5018155317944...|[0.27471734930404...|       1.0|
+-----+--------------------+--------------------+--------------------+--------------------+----------+



In [24]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 1.000000


In [27]:
model.save("basictestmodel1.h5")