In [37]:
from pyspark.sql import SparkSession, SQLContext
import pyspark.sql.functions as F

from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import CountVectorizer, CountVectorizerModel


from pyspark.ml.clustering import KMeans
from pyspark.ml.clustering import LDA

In [3]:
spark = SparkSession.builder.appName("M8-CA2-Fraud-TGA").getOrCreate()

sparkContext = spark.sparkContext
sqlContext = SQLContext(sparkContext)

### 1. Load data into Spark DataFrame

In [4]:
# Download the dataset

# https://www.cs.cmu.edu/~./enron/enron_mail_20150507.tar.gz

# Extract, maildir will be created

# tar -zxvf enron_mail_20150507.tar.gz

# mkdir mail_dataset

# cd mail_dataset

# Copy sent mails from names starting with letter c and d

# cp --backup=existing --suffix=.orig -r ../maildir/c*/sent*/ .
# cp --backup=existing --suffix=.orig -r ../maildir/d*/sent*/ .


# sed -i -e 's/\r//g' sent/*

# head -q -n 5 sent/* | paste - - - - - -d# > output.csv

location = "/user/edureka_672184/m8_datasets/maildir/output.csv"
raw = spark.read.option("delimiter", "#").csv(location).toDF("messageid", "date", "from_", "to_", "subject")
# withColumn creates a new dataframe after adding the given column
df = raw.withColumn("date", F.trim(F.split(raw.date, ":")[1])) \
.withColumn("from_", F.trim(F.split(raw.from_, ":")[1])) \
.withColumn("to_", F.trim(F.split(raw.to_, ":")[1])) \
.withColumn("subject",F.trim(F.split(raw.subject,"Subject:")[1]))

print(df.head(3))
print(df.count())

[Row(messageid=u'Message-ID: <7257046.1075853654621.JavaMail.evans@thyme>', date=u'Fri, 16 Feb 2001 10', from_=u'stacy.dickson@enron.com', to_=u'jeffrey.hodge@enron.com', subject=u'Re: GISB'), Row(messageid=u'Message-ID: <3576232.1075858078644.JavaMail.evans@thyme>', date=u'Fri, 8 Dec 2000 08', from_=u'martin.cuilla@enron.com', to_=u's.nadalin@thehiringspot.com', subject=u'Re: new years'), Row(messageid=u'Message-ID: <23635942.1075853654829.JavaMail.evans@thyme>', date=u'Mon, 12 Mar 2001 08', from_=u'stacy.dickson@enron.com', to_=u'russell.diamond@enron.com', subject=u'RE: WEPCO')]
772


### 2. Display the top 10 high-frequency users based on weekly numbers of emails sent.

In [5]:
# SEE THAT WEEKS ARE CORRECTLY EXTRACTED
df1 = df.withColumn("week", F.weekofyear(F.unix_timestamp(df.date, "EEE, dd MMM yyyy HH").cast("timestamp")))
df1.head(3)

[Row(messageid=u'Message-ID: <7257046.1075853654621.JavaMail.evans@thyme>', date=u'Fri, 16 Feb 2001 10', from_=u'stacy.dickson@enron.com', to_=u'jeffrey.hodge@enron.com', subject=u'Re: GISB', week=7),
 Row(messageid=u'Message-ID: <3576232.1075858078644.JavaMail.evans@thyme>', date=u'Fri, 8 Dec 2000 08', from_=u'martin.cuilla@enron.com', to_=u's.nadalin@thehiringspot.com', subject=u'Re: new years', week=49),
 Row(messageid=u'Message-ID: <23635942.1075853654829.JavaMail.evans@thyme>', date=u'Mon, 12 Mar 2001 08', from_=u'stacy.dickson@enron.com', to_=u'russell.diamond@enron.com', subject=u'RE: WEPCO', week=11)]

In [6]:
# GET MAXIMUM WEEK
maxweek = df1.agg(F.max(df1.week)).first()[0]
print(maxweek)
df1.agg(F.max(df1.week)).collect()

50


[Row(max(week)=50)]

In [7]:
df1.groupBy("from_").count().show()

+--------------------+-----+
|               from_|count|
+--------------------+-----+
|twanda.sweet@enro...|   27|
|martin.cuilla@enr...|   16|
|michelle.cash@enr...|  540|
|stacy.dickson@enr...|  189|
+--------------------+-----+



In [8]:
df1.groupBy("from_").count().withColumn("avgcount", F.col("count") / maxweek).sort(F.col("avgcount").desc()).show()

+--------------------+-----+--------+
|               from_|count|avgcount|
+--------------------+-----+--------+
|michelle.cash@enr...|  540|    10.8|
|stacy.dickson@enr...|  189|    3.78|
|twanda.sweet@enro...|   27|    0.54|
|martin.cuilla@enr...|   16|    0.32|
+--------------------+-----+--------+



### 3. Extract top 20 keywords from the subject text for both
• for the top 10 high-frequency users and

• for the non-high frequency users.

In [9]:
# create tokenizer(it is a transformer)
tokenizer = Tokenizer().setInputCol("subject").setOutputCol("words")
# then transform df1
transformed = tokenizer.transform(df1)

In [10]:
# groupby from and get count, sort by count descending and take top 10
top10  = df1.groupby("from_").count().sort(F.col("count").desc()).take(10)
top10

[Row(from_=u'michelle.cash@enron.com', count=540),
 Row(from_=u'stacy.dickson@enron.com', count=189),
 Row(from_=u'twanda.sweet@enron.com', count=27),
 Row(from_=u'martin.cuilla@enron.com', count=16)]

In [11]:
# convert Row(key1=val1, key2 = val2) to Dictionary form {key1:val1, key2:val2}
print(top10[0].asDict())

{'count': 540, 'from_': u'michelle.cash@enron.com'}


In [12]:
top_users = [v.asDict()["from_"] for v in top10]
top_users

[u'michelle.cash@enron.com',
 u'stacy.dickson@enron.com',
 u'twanda.sweet@enron.com',
 u'martin.cuilla@enron.com']

In [13]:
# filter out top users. 737 of 772 emails are sent by these users 
topuserdata = transformed.filter(transformed.subject != "").filter(transformed.from_.isin(top_users))
print(topuserdata.count())

737


In [14]:
# creates one row for one keyword
# so other attributes will be repeated many times
topuserdata.withColumn("keyword", F.explode("words")).groupBy("keyword").count().sort(F.col("count").desc()).show(20)

+----------+-----+
|   keyword|count|
+----------+-----+
|       re:|  412|
| agreement|   85|
|         -|   51|
|       and|   46|
|        of|   42|
|       for|   41|
|     enron|   33|
|   project|   30|
|       fw:|   26|
|        to|   25|
|    master|   23|
|      firm|   20|
|   meeting|   20|
|agreements|   20|
|  contract|   19|
|       nui|   19|
|         &|   18|
|employment|   18|
|       new|   17|
|        --|   17|
+----------+-----+
only showing top 20 rows



In [15]:
otheruserdata = transformed.filter(transformed.subject != "").filter(transformed.from_.isin(top_users) == False)
otheruserdata.withColumn("keyword",F.explode("words")).groupBy("keyword").count().sort(F.col("count").desc()).show(20)

+-------+-----+
|keyword|count|
+-------+-----+
+-------+-----+



### 4. Extract top 10 keywords by identifying removing the common stop words.

In [20]:
remover = StopWordsRemover().setInputCol("words").setOutputCol("filtered")
cleaned = remover.transform(transformed)
cleaned.filter(cleaned.subject != "").withColumn("keyword",F.explode(cleaned.words)).groupBy("keyword").count().sort(F.col("count").desc()).show()

+----------+-----+
|   keyword|count|
+----------+-----+
|       re:|  412|
| agreement|   85|
|         -|   51|
|       and|   46|
|        of|   42|
|       for|   41|
|     enron|   33|
|   project|   30|
|       fw:|   26|
|        to|   25|
|    master|   23|
|   meeting|   20|
|      firm|   20|
|agreements|   20|
|  contract|   19|
|       nui|   19|
|employment|   18|
|         &|   18|
|       new|   17|
|        --|   17|
+----------+-----+
only showing top 20 rows



### 5. Extend the stop words dictionary by adding your own stop words such as ‘— ‘

In [24]:
stopWords = StopWordsRemover().getStopWords() + ["-", "re:", "fw:"]
remover = StopWordsRemover().setStopWords(stopWords).setInputCol("words").setOutputCol("filtered")
cleaned = remover.transform(transformed)
# data grouped by set of filtered words
# this shows that some collection of words occur more frequently than others
cleaned.filter(cleaned.subject != "").withColumn("keyword",F.explode(cleaned.words)).groupBy("filtered").count().sort(F.col("count").desc()).show()

+--------------------+-----+
|            filtered|count|
+--------------------+-----+
|[opportunities, ena]|   48|
| [enron, pdi, japan]|   34|
|[master, firm, pu...|   30|
|[ena, deals, move...|   25|
|[master, service,...|   24|
|[request, submitt...|   24|
|[recent, developm...|   22|
|[che, club, meeti...|   22|
|[payroll, company...|   21|
|[b, &, j, gas, &,...|   21|
|[data, protection...|   20|
|[master, agreemen...|   20|
|[section, 5.14, s...|   20|
|[enron, investmen...|   20|
|[triple, lutz, bi...|   20|
|[new, vendors, co...|   19|
|[project, triple,...|   19|
|[cargill, energy,...|   19|
|[georgia, pacific...|   18|
|[mission, impossi...|   18|
+--------------------+-----+
only showing top 20 rows




### 6. Introduce a new column label to identify new, replied, and forwarded messages.

In [30]:
df2 = cleaned.withColumn("msgtype", 
                         F.when(cleaned.subject.startswith("Re:"),1). \
                         otherwise(F.when(cleaned.subject.startswith("Fw:"),2). \
                         otherwise(0)))

df2.select("msgtype").head(5)

[Row(msgtype=1),
 Row(msgtype=1),
 Row(msgtype=0),
 Row(msgtype=0),
 Row(msgtype=0)]

### 7. Get the trend of the over mail activity using the pivot table from spark itself.

In [32]:
df2.groupBy("week").pivot("msgtype").count().show()

+----+---+----+----+
|week|  0|   1|   2|
+----+---+----+----+
|  31|  5|   2|null|
|  34|  5|   6|null|
|  28|  7|   3|null|
|  26| 18|  23|null|
|  27|  2|   1|null|
|  44| 10|   7|null|
|  12| 11|   9|null|
|  22|  1|   1|null|
|  47|  3|null|null|
|   1|  1|null|null|
|  13| 16|   5|null|
|  16|  9|   7|   1|
|   6|  3|   1|null|
|   3|  2|   1|null|
|  20| 11|  13|null|
|  40|  1|   4|null|
|  48|  1|   5|null|
|   5|  2|null|null|
|  19| 16|  24|null|
|  41|  6|  16|null|
+----+---+----+----+
only showing top 20 rows



### 8. Use k-means clustering to create 4 clusters from the extracted keywords.

In [38]:
# convert keywords to feature vector
df4 = df2.filter(df2.subject != "")
cvmodel =CountVectorizer().setInputCol("filtered").setOutputCol("features").fit(df4)
featured = cvmodel.transform(df4)
featured.head()

Row(messageid=u'Message-ID: <7257046.1075853654621.JavaMail.evans@thyme>', date=u'Fri, 16 Feb 2001 10', from_=u'stacy.dickson@enron.com', to_=u'jeffrey.hodge@enron.com', subject=u'Re: GISB', week=7, words=[u're:', u'gisb'], filtered=[u'gisb'], msgtype=1, features=SparseVector(942, {30: 1.0}))

In [40]:
# kmeans takes "features" columun as its feature-set without being explicitly told
kmeans = KMeans().setK(4).setSeed(1L)
model = kmeans.fit(featured)
predictions = model.transform(featured)

DataFrame[messageid: string, date: string, from_: string, to_: string, subject: string, week: int, words: array<string>, filtered: array<string>, msgtype: int, features: vector, prediction: int]

### 9. Use LDA to generate 4 topics from the extracted keywords

In [43]:
lda = LDA().setK(4).setMaxIter(10)
model = lda.fit(featured)
topics = model.describeTopics(4)
topics.show()

+-----+-----------------+--------------------+
|topic|      termIndices|         termWeights|
+-----+-----------------+--------------------+
|    0| [11, 15, 37, 47]|[0.00576072810041...|
|    1|    [2, 4, 8, 30]|[0.01015907726442...|
|    2|[39, 24, 21, 116]|[0.00509658536382...|
|    3|   [0, 3, 16, 13]|[0.02476422712367...|
+-----+-----------------+--------------------+



In [44]:
topic_indices = topics.select("termIndices").rdd.map(lambda x:x[0][0]).collect()
[cvmodel.vocabulary[v] for v in topic_indices]

[u'new', u'project', u'employee', u'agreement']

### 10.Can you identify top keywords in the spam messages across the organization?