**NPR SPEAKER IDENTIFICATION WITH NLP METHODS**

In [0]:
%pip install gensim

Python interpreter will be restarted.
Collecting gensim
  Downloading gensim-4.2.0-cp38-cp38-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (24.1 MB)
Collecting smart-open>=1.8.1
  Downloading smart_open-6.2.0-py3-none-any.whl (58 kB)
Installing collected packages: smart-open, gensim
Successfully installed gensim-4.2.0 smart-open-6.2.0
Python interpreter will be restarted.


In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions as F
import pandas as pd
import numpy as np

from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

import gensim.parsing.preprocessing as gsp
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from gensim import utils

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import classification_report, accuracy_score

In [0]:
# File location and type
file_location = "/FileStore/tables/utterances.csv"
file_type = "csv"

# CSV options
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df.limit(5))

episode,episode_order,speaker,utterance
57264,9,"Ms. LOREN MOONEY (Editor-in-Chief, Bicycling Magazine)","It's a 2,200-mile race. To give some sense of perspective, that's roughly the distance between Washington, D.C. and Las Vegas. They do it over the course of three weeks at very fast speeds. But incredibly, oftentimes the distance between first and second is somewhere between and one and three minutes."
57264,10,"Ms. LOREN MOONEY (Editor-in-Chief, Bicycling Magazine)","So for a top competitor like Lance to try to make up that much time -he's now 13 minutes, 26 seconds behind the current race leader, Cadel Evans of Australia. And even Lance said yesterday that for him, the -any chance of winning the tour has gone out the window. He still does have a teammate on his team, RadioShack team, American Levi Leipheimer currently in eighth place, two minutes, 14 seconds back. And Lance is going to do what he can to help Leipheimer do well."
57264,11,"NEAL CONAN, host","So in every team, presumably there's one star, one equivalent to Lance Armstrong and the other team - the rest of the teammates project him, do what they can to help him succeed?"
57264,12,"Ms. LOREN MOONEY (Editor-in-Chief, Bicycling Magazine)","That's right. Each team has nine riders. And what you have is basically the team leader, sort of your queen bee, and then eight worker bees working to keep that queen safe and happy for when the decisive moments of the race come. In cycling, a lot of the energy goes toward actually battling wind resistance. So if you're riding behind another rider, you can be using up to 30 percent less energy because you're in that slipstream of the rider who's battling the wind. So team riders will ride on the front, protect their leader. You'll often see a team member drift back to the team car to fetch water bottles to keep their leader well hydrated."
57264,13,"NEAL CONAN, host","So slipstream, this is like drafting in car racing, right?"


In [0]:
print(f"There are {df.count()} rows in the dataset")

There are 3200190 rows in the dataset


In [0]:
df.printSchema()

root
 |-- episode: string (nullable = true)
 |-- episode_order: string (nullable = true)
 |-- speaker: string (nullable = true)
 |-- utterance: string (nullable = true)



In [0]:
# Lowercase every column for uniformity (the loop variable is named c so it does not
# shadow the col() function imported from pyspark.sql.functions)
for c in df.columns:
    df = df.withColumn(c, F.lower(F.col(c)))

In [0]:
# Host entries in the speaker column look like "name, host"; keep only the name so it
# can be joined with the speaker names in host_id.csv
df = df.withColumn('speaker', split(df['speaker'], ', h').getItem(0))
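A minimal pure-Python sketch of what `split(..., ', h').getItem(0)` does to a speaker string. Spark's `split` treats its pattern as a regular expression, and `re.split` behaves the same way for this simple pattern. The third speaker string is hypothetical and shows that any `', h'` (for example `', head of ...'`) also truncates the name. `strip_host_suffix` is a hypothetical stricter alternative that only removes a trailing `', host'`; it would miss other suffix variants, so treat it as a suggestion rather than the notebook's method.

```python
import re

def split_speaker(speaker: str) -> str:
    # Mirrors split(col, ', h').getItem(0): keep everything before the first ', h'
    return re.split(", h", speaker)[0]

def strip_host_suffix(speaker: str) -> str:
    # Hypothetical stricter alternative: only remove a trailing ', host'
    return re.sub(r", host$", "", speaker)

examples = [
    "neal conan, host",
    "ms. loren mooney (editor-in-chief, bicycling magazine)",
    "jane doe, head of research",  # hypothetical speaker string
]
for s in examples:
    print(f"{s!r:60} -> {split_speaker(s)!r} | {strip_host_suffix(s)!r}")
```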

In [0]:
# Add an utterance_len column so that utterances shorter than 100 characters can be removed later
df = df.withColumn('utterance_len', F.length('utterance'))

In [0]:
# Drop rows that have a null in any column
df = df.na.drop(how="any")

**THE HOST ID FILE LISTS THE NPR RADIO HOSTS/SPEAKERS THAT ACTUALLY MATTER**

In [0]:
file_location = "/FileStore/tables/host_id.csv"
host_id = spark.read\
    .option("header", "true")\
    .csv(file_location)
display(host_id.limit(5))

speaker,host_id
lulu garcia-navarro,0
melissa block,1
leila fadel,2
susan davis,3
sacha pfeiffer,4


In [0]:
# Inner join: keep only utterances whose speaker appears in host_id
merged_df = df.join(host_id, ['speaker'])
display(merged_df.limit(5))

speaker,episode,episode_order,utterance,utterance_len,host_id
neal conan,57264,11,"so in every team, presumably there's one star, one equivalent to lance armstrong and the other team - the rest of the teammates project him, do what they can to help him succeed?",178,7
neal conan,57264,13,"so slipstream, this is like drafting in car racing, right?",58,7
neal conan,57264,15,"and so the guy who's in back has an easier time. and even when there are people in front on a breakaway, for example, who are from different teams, three or four riders, you will see them switch back and forth rather congenially so the same person isn't in front all the time.",276,7
neal conan,57264,17,"we're talking with loren mooney, the editor-in-chief of bicycling magazine. she's at our bureau in new york. why do you watch the tour de france? so we want to hear from our bicycling audience today. 800-989-8255. email us: talk@npr.org. and we'll start with john(ph), john with us from san francisco.",301,7
neal conan,57264,19,"hi, john.",9,7


In [0]:
print(f"There are {merged_df.count()} rows in the dataset")

There are 1174823 rows in the dataset


In [0]:
from functools import reduce
from pyspark.sql.functions import col, lit

In [0]:
merged_df.where(col("speaker").isNull()).show()

+-------+-------+-------------+---------+-------------+-------+
|speaker|episode|episode_order|utterance|utterance_len|host_id|
+-------+-------+-------------+---------+-------------+-------+
+-------+-------+-------------+---------+-------------+-------+



In [0]:
merged_df.where(col("utterance").isNull()).show()

+-------+-------+-------------+---------+-------------+-------+
|speaker|episode|episode_order|utterance|utterance_len|host_id|
+-------+-------+-------------+---------+-------------+-------+
+-------+-------+-------------+---------+-------------+-------+



In [0]:
# Only keep utterances that are at least 100 characters long
merged_df = merged_df.where(merged_df.utterance_len >= 100)

In [0]:
# Drop the columns that are not needed for modeling; only speaker and utterance remain
preproc_df = merged_df.drop('episode', 'episode_order', 'host_id', 'utterance_len')

In [0]:
# Count utterances per speaker to find the top 5 speakers to focus on
preproc_df.groupBy('speaker') \
    .agg(F.count('speaker').alias('speaker_count')) \
    .orderBy('speaker_count', ascending=False) \
    .show(n=5)

+-------------+-------------+
|      speaker|speaker_count|
+-------------+-------------+
|   neal conan|       125249|
|steve inskeep|        33679|
|robert siegel|        29769|
|   ira flatow|        29610|
|farai chideya|        23727|
+-------------+-------------+
only showing top 5 rows



In [0]:
# Only keep the top 5 speakers
top_5_speaker = ['neal conan', 'steve inskeep', 'robert siegel', 'ira flatow', 'farai chideya']
preproc_df = preproc_df.where(preproc_df['speaker'].isin(top_5_speaker))

In [0]:
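# Remove rows whose utterance starts with 'unidentified': such lines come from an
# unidentified speaker, not from the host credited in the speaker column
string_list = ['unidentified']
# AND together one "does not start with" condition per prefix, starting from True,
# so a row is kept only if it matches none of the prefixes
keep_condition = reduce(
    lambda x, y: x & y,
    [~col("utterance").startswith(s) for s in string_list],
    lit(True))
preproc_df = preproc_df.where(keep_condition)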
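When filtering on several prefixes, the per-prefix "does not start with" conditions must be combined with AND, starting from True; combining them with OR, starting from False, only gives the right answer for a single prefix. A pure-Python sketch with plain booleans standing in for Spark `Column` expressions (the second prefix, `"announcer"`, is hypothetical):

```python
from functools import reduce

def keep_or_fold(text, prefixes):
    # False | ~startswith(p1) | ~startswith(p2) | ...
    return reduce(lambda acc, p: acc or not text.startswith(p), prefixes, False)

def keep_and_fold(text, prefixes):
    # True & ~startswith(p1) & ~startswith(p2) & ...
    return reduce(lambda acc, p: acc and not text.startswith(p), prefixes, True)

prefixes = ["unidentified", "announcer"]  # "announcer" is a hypothetical extra prefix
text = "unidentified man: ..."
print(keep_or_fold(text, prefixes))   # True: the row would wrongly be kept
print(keep_and_fold(text, prefixes))  # False: the row is dropped
```

With only `['unidentified']`, both folds return the same result, which is why a single-prefix list hides the difference.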

In [0]:
# Downsample neal conan to about 30% of his rows to balance the classes;
# keep every row for the other four speakers
preproc_df = preproc_df.sampleBy('speaker',
                                 fractions={'neal conan': 0.3, 'steve inskeep': 1.0, 'robert siegel': 1.0,
                                            'ira flatow': 1.0, 'farai chideya': 1.0},
                                 seed=1234)
preproc_df.groupBy('speaker').count().orderBy('speaker').show()

+-------------+-----+
|      speaker|count|
+-------------+-----+
|farai chideya|23648|
|   ira flatow|29448|
|   neal conan|37648|
|robert siegel|29634|
|steve inskeep|33527|
+-------------+-----+
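Rather than hand-picking 0.3, the fractions can be derived from the class counts. A minimal sketch, assuming the goal is to bring every speaker down to roughly the size of the smallest class (a different balancing target than the one used above), using the top-5 counts shown earlier. Because `sampleBy` keeps each row with the given probability rather than taking an exact number of rows, the resulting counts are only approximately fraction × count.

```python
def balancing_fractions(counts: dict) -> dict:
    # Sample every class down to the size of the smallest one (never above 1.0)
    target = min(counts.values())
    return {speaker: min(1.0, target / n) for speaker, n in counts.items()}

# Per-speaker counts from the top-5 table above
speaker_counts = {
    "neal conan": 125249,
    "steve inskeep": 33679,
    "robert siegel": 29769,
    "ira flatow": 29610,
    "farai chideya": 23727,
}
fractions = balancing_fractions(speaker_counts)
for speaker, frac in fractions.items():
    # Expected, not exact, sample size
    print(f"{speaker:15} fraction={frac:.3f} expected~{frac * speaker_counts[speaker]:.0f}")
```

The resulting dictionary has the same shape as the `fractions` argument passed to `sampleBy`.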



In [0]:
print(f"There are {preproc_df.count()} rows in the dataset")

There are 153905 rows in the dataset


**PREPROCESSING STAGE**

In [0]:
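# gensim filters applied in order: strip HTML tags, punctuation, extra whitespace and digits,
# then remove stopwords, drop tokens shorter than 3 characters, and stem
filters = [
    gsp.strip_tags,
    gsp.strip_punctuation,
    gsp.strip_multiple_whitespaces,
    gsp.strip_numeric,
    gsp.remove_stopwords,
    gsp.strip_short,
    gsp.stem_text,
]

def clean_text(x):
    # x is a (speaker, utterance) row; return (speaker, cleaned utterance)
    s = utils.to_unicode(x[1]).lower()
    for f in filters:
        s = f(s)
    return (x[0], s)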
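`clean_text` feeds the output of each filter into the next, in list order. The sketch below shows that composition with `functools.reduce`, using simplified stand-in filters written with `re` (hypothetical helpers, not gensim's implementations, and without stopword removal or stemming) so the chain can be run without gensim installed. The order mirrors the notebook's list.

```python
import re
from functools import reduce

# Simplified stand-ins for gensim's filters (assumptions, not gensim code)
def strip_punctuation(s):
    return re.sub(r"[^\w\s]", " ", s)

def strip_multiple_whitespaces(s):
    return re.sub(r"\s+", " ", s).strip()

def strip_numeric(s):
    return re.sub(r"\d+", "", s)

def strip_short(s, minsize=3):
    # Drop tokens shorter than minsize characters
    return " ".join(w for w in s.split() if len(w) >= minsize)

filters = [strip_punctuation, strip_multiple_whitespaces, strip_numeric, strip_short]

def apply_filters(text, fs):
    # Equivalent to: s = text.lower(); for f in fs: s = f(s)
    return reduce(lambda s, f: f(s), fs, text.lower())

print(apply_filters("Call 800-989-8255, or email talk@npr.org!", filters))
```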

In [0]:
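preproc_rdd = preproc_df.rdd
# Raw first utterance (not displayed: only a cell's last expression is shown)
preproc_rdd.take(1)[0][1]
# Cleaned version of the same utterance
clean_text(preproc_rdd.take(1)[0])[1]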

Out[23]: 'talk loren moonei editor chief bicycl magazin bureau new york watch tour franc want hear bicycl audienc todai email talk npr org start john john san francisco'

In [0]:
cleaned_rdd = preproc_rdd.map(clean_text)
cleaned_df = cleaned_rdd.toDF()
display(cleaned_df.limit(5))

_1,_2
neal conan,talk loren moonei editor chief bicycl magazin bureau new york watch tour franc want hear bicycl audienc todai email talk npr org start john john san francisco
neal conan,ultim express train john pretti feroci
neal conan,suprem court hand rule import case earlier todai seven decis high court agre govern indefinit hold convict consid sexual danger prison term complet
neal conan,later program comic book ran sprint movi box offic chart glen weldon join iron man
ira flatow,talk nation scienc fridai ira flatow tucson arizona visit countri know hot talk weather talk food jalapeno anaheim poblano folk world spice food chili centuri


**USE AN NLP PIPELINE TO CREATE A PROCESSED DATAFRAME FOR THE TRAINING/TESTING SPLIT**

In [0]:
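# Split the cleaned text into tokens on non-word characters
regex_tokenizer = RegexTokenizer(inputCol="_2", outputCol="text", pattern="\\W")
# Bag-of-words counts over at most 10,000 terms, each appearing in at least 5 utterances
count_vector = CountVectorizer(inputCol="text", outputCol="features", vocabSize=10000, minDF=5)
# Map speaker names to numeric labels (most frequent speaker -> 0.0)
label_stringindex = StringIndexer(inputCol="_1", outputCol="label")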
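`CountVectorizer` learns its vocabulary during `fit`: terms that appear in fewer than `minDF` documents are dropped, and the `vocabSize` terms with the highest total count across the corpus are kept, indexed from most to least frequent. A minimal pure-Python sketch of that logic, returning each document as sparse `(index, count)` pairs like the `features` column; breaking ties alphabetically is an assumption of this sketch, not documented Spark behavior, and the toy documents are made up.

```python
from collections import Counter

def fit_vocab(docs, vocab_size, min_df):
    # Document frequency: number of documents each term appears in
    df = Counter(term for doc in docs for term in set(doc))
    # Total term frequency across the corpus
    tf = Counter(term for doc in docs for term in doc)
    kept = [t for t in tf if df[t] >= min_df]
    # Most frequent first; ties broken alphabetically (assumption)
    kept.sort(key=lambda t: (-tf[t], t))
    return {term: i for i, term in enumerate(kept[:vocab_size])}

def transform(doc, vocab):
    # Sorted (index, count) pairs; out-of-vocabulary terms are dropped
    counts = Counter(vocab[t] for t in doc if t in vocab)
    return sorted(counts.items())

train_docs = [["talk", "nation", "talk"], ["talk", "scienc"], ["nation", "court"]]
vocab = fit_vocab(train_docs, vocab_size=2, min_df=2)
print(vocab)                                        # {'talk': 0, 'nation': 1}
print(transform(["talk", "talk", "court"], vocab))  # [(0, 2)]
```

Because the vocabulary (and the `StringIndexer` label order) is learned in `fit`, fitting the pipeline on all of `cleaned_df` before `randomSplit` lets test-set utterances influence which words are kept. Fitting the pipeline on the training split only, then transforming both splits, avoids that leakage.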

In [0]:
pipeline = Pipeline(stages=[regex_tokenizer,count_vector, label_stringindex])
pipeline_fit = pipeline.fit(cleaned_df)
processed_df = pipeline_fit.transform(cleaned_df)
processed_df.show(5)

+----------+--------------------+--------------------+--------------------+-----+
|        _1|                  _2|                text|            features|label|
+----------+--------------------+--------------------+--------------------+-----+
|neal conan|talk loren moonei...|[talk, loren, moo...|(10000,[0,1,3,12,...|  0.0|
|neal conan|ultim express tra...|[ultim, express, ...|(10000,[94,446,51...|  0.0|
|neal conan|suprem court hand...|[suprem, court, h...|(10000,[12,49,72,...|  0.0|
|neal conan|later program com...|[later, program, ...|(10000,[17,40,47,...|  0.0|
|ira flatow|talk nation scien...|[talk, nation, sc...|(10000,[0,4,13,29...|  3.0|
+----------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



In [0]:
print(f"There are {processed_df.count()} rows in the dataset")

There are 153905 rows in the dataset


**TRAINING AND TESTING DATAFRAME SPLIT**

In [0]:
train_df, test_df = processed_df.randomSplit([0.8, 0.2], seed=42)
print(f"There are {train_df.count()} rows in the training set, and {test_df.count()} in the test set")

There are 123386 rows in the training set, and 30519 in the test set


**LOGISTIC REGRESSION MODEL**

In [0]:
log_reg = LogisticRegression(maxIter=20, regParam=0.3)
log_regmodel_pred = log_reg.fit(train_df).transform(test_df)

In [0]:
log_regmodel_pred.select("_1","probability","label","prediction").orderBy("probability", ascending=False).show(n=5)

+----------+--------------------+-----+----------+
|        _1|         probability|label|prediction|
+----------+--------------------+-----+----------+
|neal conan|[0.99991949518929...|  0.0|       0.0|
|neal conan|[0.99856683436250...|  0.0|       0.0|
|neal conan|[0.99664477402708...|  0.0|       0.0|
|neal conan|[0.99538751411717...|  0.0|       0.0|
|neal conan|[0.99508042301998...|  0.0|       0.0|
+----------+--------------------+-----+----------+
only showing top 5 rows



In [0]:
log_regmodel_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
log_regmodel_eval.evaluate(log_regmodel_pred)

Out[47]: 0.6652904747861987

In [0]:
log_regmodel_pred_pd = log_regmodel_pred.select("prediction", "label").toPandas()
print(classification_report(log_regmodel_pred_pd["label"], log_regmodel_pred_pd["prediction"]))
print(accuracy_score(log_regmodel_pred_pd["label"], log_regmodel_pred_pd["prediction"]))

              precision    recall  f1-score   support

         0.0       0.62      0.71      0.67      7535
         1.0       0.59      0.65      0.62      6665
         2.0       0.61      0.48      0.54      5896
         3.0       0.77      0.84      0.80      5805
         4.0       0.81      0.63      0.71      4618

    accuracy                           0.67     30519
   macro avg       0.68      0.66      0.67     30519
weighted avg       0.67      0.67      0.66     30519

0.6652904747861987
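The report labels the classes 0.0 to 4.0 rather than by speaker name. `StringIndexer` defaults to `stringOrderType="frequencyDesc"`, so index 0 goes to the most frequent speaker. A minimal sketch that rebuilds the mapping from the balanced counts printed after `sampleBy` (there are no ties in these counts, so tie-breaking is not modeled); in the notebook itself, the fitted indexer `pipeline_fit.stages[-1]` should expose the same ordered list through its `labels` attribute.

```python
# Balanced per-speaker counts from the sampleBy output above
balanced_counts = {
    "farai chideya": 23648,
    "ira flatow": 29448,
    "neal conan": 37648,
    "robert siegel": 29634,
    "steve inskeep": 33527,
}

# frequencyDesc: the most frequent speaker gets index 0.0
labels = sorted(balanced_counts, key=balanced_counts.get, reverse=True)
index_to_speaker = {float(i): name for i, name in enumerate(labels)}
for idx, name in index_to_speaker.items():
    print(f"{idx}: {name}")
```

This ordering matches the `support` column (7535 for 0.0 down to 4618 for 4.0) and the `ira flatow` rows labeled 3.0 earlier. In Spark, `IndexToString(inputCol="prediction", outputCol="predicted_speaker", labels=...)` from `pyspark.ml.feature` can attach these names to a predictions DataFrame.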


**NAIVE BAYES MODEL**

In [0]:
naive_bay = NaiveBayes(smoothing=1)
naive_baymodel_pred = naive_bay.fit(train_df).transform(test_df)
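Spark's `NaiveBayes` defaults to `modelType="multinomial"`, and `smoothing=1` is additive (Laplace) smoothing of the per-class word counts. A minimal pure-Python sketch of a multinomial Naive Bayes model on sparse count features; this sketch smooths both the class priors and the per-class term probabilities, and the toy vocabulary and documents are purely illustrative.

```python
import math
from collections import defaultdict

def train_multinomial_nb(X, y, vocab_size, smoothing=1.0):
    # X: list of {term_index: count}; y: list of class labels
    class_docs = defaultdict(int)
    term_counts = defaultdict(lambda: [0.0] * vocab_size)
    for features, label in zip(X, y):
        class_docs[label] += 1
        counts = term_counts[label]  # create the entry even for empty documents
        for j, c in features.items():
            counts[j] += c
    n, k = len(y), len(class_docs)
    # Smoothed log prior: (docs in class + alpha) / (all docs + alpha * num_classes)
    log_prior = {c: math.log((m + smoothing) / (n + smoothing * k)) for c, m in class_docs.items()}
    log_likelihood = {}
    for c, counts in term_counts.items():
        # Smoothed term probability: (count + alpha) / (class total + alpha * vocab_size)
        total = sum(counts) + smoothing * vocab_size
        log_likelihood[c] = [math.log((t + smoothing) / total) for t in counts]
    return log_prior, log_likelihood

def predict(features, log_prior, log_likelihood):
    # argmax over classes of log P(class) + sum_j count_j * log P(term_j | class)
    scores = {
        c: log_prior[c] + sum(cnt * log_likelihood[c][j] for j, cnt in features.items())
        for c in log_prior
    }
    return max(scores, key=scores.get)

# Toy data: term 0 = "scienc", 1 = "court", 2 = "talk"
X = [{0: 2, 2: 1}, {0: 1}, {1: 2, 2: 1}, {1: 1}]
y = [3.0, 3.0, 0.0, 0.0]
prior, lik = train_multinomial_nb(X, y, vocab_size=3, smoothing=1.0)
print(predict({0: 1, 2: 1}, prior, lik))  # 3.0
```

Smoothing keeps a word never seen with a class (such as "court" for class 3.0 here) from driving that class's score to negative infinity.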

In [0]:
naive_baymodel_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
naive_baymodel_eval.evaluate(naive_baymodel_pred)

Out[50]: 0.6464825190864707

In [0]:
naive_baymodel_pred_pd = naive_baymodel_pred.select("prediction", "label").toPandas()
print(classification_report(naive_baymodel_pred_pd["label"], naive_baymodel_pred_pd["prediction"]))
print(accuracy_score(naive_baymodel_pred_pd["label"], naive_baymodel_pred_pd["prediction"]))

              precision    recall  f1-score   support

         0.0       0.69      0.59      0.64      7535
         1.0       0.57      0.62      0.59      6665
         2.0       0.57      0.49      0.53      5896
         3.0       0.74      0.85      0.79      5805
         4.0       0.65      0.73      0.69      4618

    accuracy                           0.65     30519
   macro avg       0.64      0.65      0.65     30519
weighted avg       0.65      0.65      0.64     30519

0.6464825190864707


**RANDOM FOREST CLASSIFIER**

In [0]:
from pyspark.ml.classification import RandomForestClassifier
rforest_model = RandomForestClassifier(labelCol="label", featuresCol="features")
rforest_model_pred = rforest_model.fit(train_df).transform(test_df)

In [0]:
rforest_model_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
rforest_model_eval.evaluate(rforest_model_pred)

Out[54]: 0.33081031488580886

In [0]:
rforest_model_pred_pd = rforest_model_pred.select("prediction", "label").toPandas()
print(classification_report(rforest_model_pred_pd["label"], rforest_model_pred_pd["prediction"]))
print(accuracy_score(rforest_model_pred_pd["label"], rforest_model_pred_pd["prediction"]))

              precision    recall  f1-score   support

         0.0       0.28      0.92      0.43      7535
         1.0       0.44      0.26      0.33      6665
         2.0       0.89      0.01      0.02      5896
         3.0       0.83      0.15      0.26      5805
         4.0       0.85      0.10      0.18      4618

    accuracy                           0.33     30519
   macro avg       0.66      0.29      0.24     30519
weighted avg       0.62      0.33      0.26     30519

0.33081031488580886


**CROSS-VALIDATION TO FIND THE BEST PARAMETERS FOR THE LOGISTIC REGRESSION MODEL**

In [0]:
param_grid = (ParamGridBuilder().addGrid(log_reg.regParam, [0.1, 0.3, 0.5]).addGrid(log_reg.elasticNetParam, [0.0, 0.1, 0.2]).build())
cross_val = CrossValidator(estimator=log_reg, estimatorParamMaps=param_grid, evaluator=log_regmodel_eval, numFolds=5)
cross_valmodel_pred = cross_val.fit(train_df).transform(test_df)
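`ParamGridBuilder` builds the Cartesian product of the listed values, and `CrossValidator` fits one model per parameter combination per fold, then refits the best combination on the full training set. A small sketch of that bookkeeping for the grid above, using `itertools.product` as a stand-in for `ParamGridBuilder`:

```python
from itertools import product

reg_params = [0.1, 0.3, 0.5]
elastic_net_params = [0.0, 0.1, 0.2]  # 0.0 is a pure L2 penalty; 1.0 would be pure L1
num_folds = 5

grid = [
    {"regParam": r, "elasticNetParam": a}
    for r, a in product(reg_params, elastic_net_params)
]
fits_during_cv = len(grid) * num_folds
total_fits = fits_during_cv + 1  # plus one refit of the best combination on all of train_df
print(len(grid), fits_during_cv, total_fits)  # 9 45 46
```

Each of the 45 cross-validation fits trains on about four fifths of `train_df`, which is why this cell takes far longer than the single logistic regression fit.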

In [0]:
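# metricName="f1" (also the evaluator's default) is weighted F1, not accuracy;
# accuracy for this model is computed with sklearn in the next cell
cross_valmodel_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")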
cross_valmodel_eval.evaluate(cross_valmodel_pred)

Out[57]: 0.667979267087604
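`MulticlassClassificationEvaluator` defaults to `metricName="f1"`, the support-weighted average of per-class F1 scores. That is why this value (0.668) matches the weighted-avg F1 row of the next report but differs from the accuracy printed after it (0.6696), so it should not be compared directly with the accuracies reported for the other models. A pure-Python sketch computing both metrics from label and prediction lists, on made-up toy labels:

```python
from collections import Counter

def accuracy(labels, preds):
    return sum(l == p for l, p in zip(labels, preds)) / len(labels)

def weighted_f1(labels, preds):
    support = Counter(labels)
    total = 0.0
    for k, n_k in support.items():
        tp = sum(l == k and p == k for l, p in zip(labels, preds))
        pred_k = sum(p == k for p in preds)
        precision = tp / pred_k if pred_k else 0.0
        recall = tp / n_k
        f1 = 2 * precision * recall / (precision + recall) if tp else 0.0
        # Weight each class's F1 by its share of the true labels
        total += f1 * n_k / len(labels)
    return total

labels = [0, 0, 0, 1, 1, 2]
preds = [0, 0, 1, 1, 2, 2]
print(accuracy(labels, preds), weighted_f1(labels, preds))
```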

In [0]:
cross_valmodel_pred_pd = cross_valmodel_pred.select("prediction", "label").toPandas()
print(classification_report(cross_valmodel_pred_pd["label"], cross_valmodel_pred_pd["prediction"]))
print(accuracy_score(cross_valmodel_pred_pd["label"], cross_valmodel_pred_pd["prediction"]))

              precision    recall  f1-score   support

         0.0       0.65      0.69      0.67      7535
         1.0       0.60      0.63      0.61      6665
         2.0       0.59      0.52      0.55      5896
         3.0       0.77      0.84      0.80      5805
         4.0       0.78      0.67      0.72      4618

    accuracy                           0.67     30519
   macro avg       0.68      0.67      0.67     30519
weighted avg       0.67      0.67      0.67     30519

0.6695828827943249
