## Check PySpark and EMR cluster environment

In [1]:
sc.version

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1604531900131_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'2.4.6-amzn-0'

In [2]:
sc.master

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'yarn'

In [3]:
sc.install_pypi_package("pandas==0.25.1")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting pandas==0.25.1
  Downloading https://files.pythonhosted.org/packages/7e/ab/ea76361f9d3e732e114adcd801d2820d5319c23d0ac5482fa3b412db217e/pandas-0.25.1-cp37-cp37m-manylinux1_x86_64.whl (10.4MB)
Collecting python-dateutil>=2.6.1 (from pandas==0.25.1)
  Downloading https://files.pythonhosted.org/packages/d4/70/d60450c3dd48ef87586924207ae8907090de0b306af2bce5d134d78615cb/python_dateutil-2.8.1-py2.py3-none-any.whl (227kB)
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-0.25.1 python-dateutil-2.8.1

In [4]:
import pandas as pd

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Read data into spark dataframe

In [5]:
path = 's3://bauka-big-tweets/text.csv'
df = spark.read.csv(path, header=True, inferSchema=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
df.show(3)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+--------------------+
|_c0| tokens_back_to_text|
+---+--------------------+
|  0|rudygiuliani comp...|
|  1|      trump machismo|
|  2|briantylercohen b...|
+---+--------------------+
only showing top 3 rows

## Pre-processing

In [7]:
# Convert the data type of tweets into an array type as this is what is required for creating word2vec
from pyspark.sql.functions import array
df_array = df.withColumn("text_array", array("tokens_back_to_text"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
df_array.show(3)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+--------------------+--------------------+
|_c0| tokens_back_to_text|          text_array|
+---+--------------------+--------------------+
|  0|rudygiuliani comp...|[rudygiuliani com...|
|  1|      trump machismo|    [trump machismo]|
|  2|briantylercohen b...|[briantylercohen ...|
+---+--------------------+--------------------+
only showing top 3 rows

In [9]:
# TF with CountVectorizer
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="text_array", outputCol="token")
model = cv.fit(df_array)
result = model.transform(df_array)
result.show(3)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+--------------------+--------------------+--------------------+
|_c0| tokens_back_to_text|          text_array|               token|
+---+--------------------+--------------------+--------------------+
|  0|rudygiuliani comp...|[rudygiuliani com...| (118918,[56],[1.0])|
|  1|      trump machismo|    [trump machismo]|(118918,[18838],[...|
|  2|briantylercohen b...|[briantylercohen ...|(118918,[291],[1.0])|
+---+--------------------+--------------------+--------------------+
only showing top 3 rows

In [10]:
# IDF
from pyspark.ml.feature import IDF
idf = IDF(inputCol='token', outputCol='features')
idfModel = idf.fit(result)
result_tfidf = idfModel.transform(result)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
result_tfidf.show(3)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+--------------------+--------------------+--------------------+--------------------+
|_c0| tokens_back_to_text|          text_array|               token|            features|
+---+--------------------+--------------------+--------------------+--------------------+
|  0|rudygiuliani comp...|[rudygiuliani com...| (118918,[56],[1.0])|(118918,[56],[6.8...|
|  1|      trump machismo|    [trump machismo]|(118918,[18838],[...|(118918,[18838],[...|
|  2|briantylercohen b...|[briantylercohen ...|(118918,[291],[1.0])|(118918,[291],[7....|
+---+--------------------+--------------------+--------------------+--------------------+
only showing top 3 rows

## Modeling - KMeans Clustering

Create labels for tweets using KMeans clustering. Once the data is labeled, we can further apply deep learning model to predict labels for tweets.

In [12]:
# Import Kmeans from MLib
from pyspark.ml.clustering import KMeans
kmeans = KMeans(k=2, seed=1) # instantiate KMeans with the desired number of clusters
model = kmeans.fit(result_tfidf.select('features'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# Combine lables with the data set
transformed = model.transform(result_tfidf)
transformed.show(10)    

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+--------------------+--------------------+--------------------+--------------------+----------+
|_c0| tokens_back_to_text|          text_array|               token|            features|prediction|
+---+--------------------+--------------------+--------------------+--------------------+----------+
|  0|rudygiuliani comp...|[rudygiuliani com...| (118918,[56],[1.0])|(118918,[56],[6.8...|         0|
|  1|      trump machismo|    [trump machismo]|(118918,[18838],[...|(118918,[18838],[...|         0|
|  2|briantylercohen b...|[briantylercohen ...|(118918,[291],[1.0])|(118918,[291],[7....|         0|
|  3|bradleywhitford y...|[bradleywhitford ...|(118918,[5820],[1...|(118918,[5820],[1...|         0|
|  4|actbrigitte presi...|[actbrigitte pres...| (118918,[92],[1.0])|(118918,[92],[7.0...|         0|
|  5|timcast come neve...|[timcast come nev...|(118918,[1076],[1...|(118918,[1076],[8...|         0|
|  6|bkbaguley afcoory...|[bkbaguley afcoor...|(118918,[21096],[...|(118918,[21096],[...|  

In [14]:
data = transformed.toPandas()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
data['prediction'].value_counts()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0    407558
1       208
Name: prediction, dtype: int64

In [17]:
# Export the data into S3 in parqquet format to be further used for deep learning model in sagemaker
transformed.write.parquet("s3a://bauka-big-tweets/pyspark_output.parquet")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…