In [1]:
# Import the PySpark module
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Create SparkSession object
spark = SparkSession.builder \
                    .master('local[*]') \
                    .appName('test') \
                    .getOrCreate()

In [3]:
# What version of Spark?
print(pyspark.__version__)

2.3.2


In [4]:
flights = spark.read.csv('data/flights-larger.csv',
                         sep=',',
                         header=True,
                         inferSchema=True,
                         nullValue='NA')

In [None]:
# Get number of records
print("The data contain %d records." % flights.count())

# View the first five records
flights.show(5)

# Check column data types
flights.dtypes
# flights.printSchema()

In [30]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Specify column names and types
schema = StructType([
    StructField("id", IntegerType()),
    StructField("text", StringType()),
    StructField("label", IntegerType())
])

# Load data from a delimited file (modified in Excel for numbering)
sms = spark.read.csv('data/smsspamcollection/SMSSpamCollection.csv', header=False, schema=schema)

# Print schema of DataFrame
print(sms.printSchema())

sms.show()

root
 |-- id: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)

None
+---+--------------------+-----+
| id|                text|label|
+---+--------------------+-----+
|  1|	Go until jurong ...|    0|
|  2|	Ok lar... Joking...|    0|
|  3|	Free entry in 2 ...|    1|
|  4|	U dun say so ear...|    0|
|  5|	Nah I don't thin...|    0|
|  6|	FreeMsg Hey ther...|    1|
|  7|	Even my brother ...|    0|
|  8|	As per your requ...|    0|
|  9|	WINNER!! As a va...|    1|
| 10|	Had your mobile ...|    1|
| 11|	I'm gonna be hom...|    0|
| 12|	SIX chances to w...|    1|
| 13|	URGENT! You have...|    1|
| 14|	I've been search...|    0|
| 15|	I HAVE A DATE ON...|    0|
| 16|	XXXMobileMovieCl...|    1|
| 17|	Oh k...i'm watch...|    0|
| 18|	Eh u remember ho...|    0|
| 19|	Fine if that¬ís ...|    0|
| 20|	England v Macedo...|    1|
+---+--------------------+-----+
only showing top 20 rows



In [8]:
# Terminate the cluster
# spark.stop()

In [7]:
# Removing columns and rows

# Remove the 'flight' column
flights = flights.drop("flight")

# Number of records with missing 'delay' values
flights.filter('delay IS NULL').count()

# Remove records with missing 'delay' values
flights = flights.filter('delay IS NOT NULL')

# Remove records with missing values in any column and get the number of remaining rows
flights = flights.dropna()
print(flights.count())

258289


In [8]:
# Column manipulation

# Import the required function
from pyspark.sql.functions import round

# Convert 'mile' to 'km' and drop 'mile' column
flights_km = flights.withColumn('km', round(flights.mile * 1.60934, 0)).drop('mile')

# Create 'label' column indicating whether flight delayed (1) or not (0)
flights_km = flights_km.withColumn('label', (flights_km.delay >= 15).cast('integer'))

# Check first five records
flights_km.show(5)

flights = flights_km

+---+---+---+-------+---+------+--------+-----+------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|
+---+---+---+-------+---+------+--------+-----+------+-----+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    1|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|    1|
+---+---+---+-------+---+------+--------+-----+------+-----+
only showing top 5 rows



In [9]:
# Categorical columns

from pyspark.ml.feature import StringIndexer as si

# Create an indexer
indexer = si(inputCol='carrier', outputCol='carrier_idx')

# Indexer identifies categories in the data
indexer_model = indexer.fit(flights)

# Indexer creates a new column with numeric index values
flights_indexed = indexer_model.transform(flights)

# Repeat the process for the other categorical feature
flights_indexed = si(inputCol='org', outputCol='org_idx').fit(flights_indexed).transform(flights_indexed)

In [10]:
flights_indexed.head()

Row(mon=10, dom=10, dow=1, carrier='OO', org='ORD', depart=8.18, duration=51, delay=27, km=253.0, label=1, carrier_idx=2.0, org_idx=0.0)

In [11]:
# Assembling columns

# Import the necessary class
from pyspark.ml.feature import VectorAssembler

# Create an assembler object
assembler = VectorAssembler(inputCols=['mon', 'dom', 'dow', 'carrier_idx',
'org_idx', 'km', 'depart', 'duration'
], outputCol='features')

# Consolidate predictor columns
flights_assembled = assembler.transform(flights_indexed)

# Check the resulting column
flights_assembled.select('features', 'delay').show(5, truncate=False)

+-----------------------------------------+-----+
|features                                 |delay|
+-----------------------------------------+-----+
|[10.0,10.0,1.0,2.0,0.0,253.0,8.18,51.0]  |27   |
|[11.0,22.0,1.0,2.0,0.0,1188.0,7.17,127.0]|-19  |
|[2.0,14.0,5.0,4.0,2.0,3618.0,21.17,365.0]|60   |
|[5.0,25.0,3.0,3.0,5.0,621.0,12.92,85.0]  |22   |
|[3.0,28.0,1.0,4.0,3.0,1732.0,13.33,182.0]|70   |
+-----------------------------------------+-----+
only showing top 5 rows



In [16]:
# Split into training and testing sets in a 80:20 ratio
flights_train, flights_test = flights_assembled.randomSplit([0.8, 0.2], seed=17)

# Check that training set has around 80% of records
training_ratio = flights_train.count() / flights.count()
print(training_ratio)

0.7989887296787707


In [17]:
# Import the Decision Tree Classifier class
from pyspark.ml.classification import DecisionTreeClassifier

# Create a classifier object and fit to the training data
tree = DecisionTreeClassifier()

In [18]:
tree_model = tree.fit(flights_train)

In [19]:
# Create predictions for the testing data and take a look at the predictions
prediction = tree_model.transform(flights_test)
prediction.select('label', 'prediction', 'probability').show(5, False)

+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|1    |1.0       |[0.28382630585273755,0.7161736941472624]|
|1    |1.0       |[0.28382630585273755,0.7161736941472624]|
|0    |0.0       |[0.6484126047444825,0.3515873952555175] |
|1    |0.0       |[0.5074837310195228,0.49251626898047723]|
|1    |0.0       |[0.5074837310195228,0.49251626898047723]|
+-----+----------+----------------------------------------+
only showing top 5 rows



In [20]:
# Create a confusion matrix
prediction.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|10133|
|    0|       0.0|16633|
|    1|       1.0|16301|
|    0|       1.0| 8852|
+-----+----------+-----+



In [21]:
# Create a confusion matrix
prediction.groupBy('label', 'prediction').count().show()

# Calculate the elements of the confusion matrix
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label != prediction').count()
FP = prediction.filter('prediction = 1 AND label != prediction').count()

# Accuracy measures the proportion of correct predictions
accuracy = (TN + TP)/(TN + TP + FN + FP)
print(accuracy)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|10133|
|    0|       0.0|16633|
|    1|       1.0|16301|
|    0|       1.0| 8852|
+-----+----------+-----+

0.6343342514301122


Accuracy is not great, we should try to improve.

In [22]:
# Import the logistic regression class
from pyspark.ml.classification import LogisticRegression

# Create a classifier object and train on training data
logistic = LogisticRegression().fit(flights_train)

# Create predictions for the testing data and show confusion matrix
prediction = logistic.transform(flights_test)
prediction.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 9633|
|    0|       0.0|14876|
|    1|       1.0|16801|
|    0|       1.0|10609|
+-----+----------+-----+



In [24]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Calculate precision and recall
precision = TP/(TP + FP)
recall = TP/(TP + FN)
print('precision = {:.2f}\nrecall    = {:.2f}'.format(precision, recall))

# Find weighted precision
multi_evaluator = MulticlassClassificationEvaluator()
weighted_precision = multi_evaluator.evaluate(prediction, {multi_evaluator.metricName: "weightedPrecision"})
# print(weighted_precision)

# Find AUC
binary_evaluator = BinaryClassificationEvaluator()
auc = binary_evaluator.evaluate(prediction, {binary_evaluator.metricName: "areaUnderROC"})
print('weighted_precision = {:.2f}\nauc   = {:.2f}'.format(weighted_precision, auc))

precision = 0.65
recall    = 0.62
0.6100108439918015
weighted_precision = 0.61
auc   = 0.65


In [35]:
import re
s = "This must not b3 delet3d, but the number at the end yes 134411"
s = re.sub("\d+", "", s)
print(s)

This must not b deletd, but the number at the end yes 


In [37]:
# Import the necessary functions
from pyspark.sql.functions import regexp_replace
from pyspark.ml.feature import Tokenizer

# Remove punctuation (REGEX provided) and numbers
wrangled = sms.withColumn('text', regexp_replace(sms.text, '[_():;,.!?\\-]', ' '))
wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text, '[0-9]', ' '))

# Merge multiple spaces
wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text, ' +', ' '))

# Split the text into words
wrangled = Tokenizer(inputCol='text', outputCol='words').transform(wrangled)

wrangled.show(4, truncate=False)

+---+--------------------------------------------------------------------------------------------------------------------------------+-----+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id |text                                                                                                                            |label|words                                                                                                                                                         |
+---+--------------------------------------------------------------------------------------------------------------------------------+-----+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1  |	Go until jurong point crazy Available only in bugis n great world la e buffet Cine there got a

In [39]:
from pyspark.ml.feature import StopWordsRemover, HashingTF, IDF

# Remove stop words.
wrangled = StopWordsRemover(inputCol='words', outputCol='terms')\
      .transform(wrangled)

# Apply the hashing trick
wrangled = HashingTF(inputCol = 'terms', outputCol='hash', numFeatures=1024)\
      .transform(wrangled)

# Convert hashed symbols to TF-IDF
tf_idf = IDF(inputCol='hash', outputCol='features')\
      .fit(wrangled).transform(wrangled)
      
tf_idf.select('terms', 'features').show(4, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|terms                                                                                                                                |features                                                                                                                                                                                                                                               