In [36]:
import pyspark
import nltk
import config

nltk.download('punkt')
nltk.download("stopwords")

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

# Creating Spark session

In [37]:
# creating spark session
spark = pyspark.sql.SparkSession.builder.getOrCreate()

# Fetching Data

In [53]:
# reading data from the source database
df = spark.read.jdbc(
    url        = f'jdbc:mysql://source-db:{config.MySQL.port}/{config.MySQL.database}',
    table      = config.MySQL.table,
    properties = {
        'user'    : config.MySQL.user,
        'password': config.MySQL.password,
        'driver'  : 'com.mysql.cj.jdbc.Driver'
    }
)

# df = df.limit(1_000_000)
df.show(10)

[Stage 37:>                                                         (0 + 1) / 1]

+---+----------+-------------------+--------+---------------+--------------------+
| id|    number|          createdAt|    flag|       userName|                text|
+---+----------+-------------------+--------+---------------+--------------------+
|  0|1467810369|2009-04-06 22:19:45|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  1|1467810672|2009-04-06 22:19:49|NO_QUERY|  scotthamilton|is upset that he ...|
|  2|1467810917|2009-04-06 22:19:53|NO_QUERY|       mattycus|@Kenichan I dived...|
|  3|1467811184|2009-04-06 22:19:57|NO_QUERY|        ElleCTF|my whole body fee...|
|  4|1467811193|2009-04-06 22:19:57|NO_QUERY|         Karoli|@nationwideclass ...|
|  5|1467811372|2009-04-06 22:20:00|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|  6|1467811592|2009-04-06 22:20:03|NO_QUERY|        mybirch|         Need a hug |
|  7|1467811594|2009-04-06 22:20:03|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|  8|1467811795|2009-04-06 22:20:05|NO_QUERY|2Hood4Hollywood|@Tatiana_K nope t...|
|  9

                                                                                

# Analyzing data schema

In [47]:
# observing schema of the data
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- number: integer (nullable = true)
 |-- createdAt: timestamp (nullable = true)
 |-- flag: string (nullable = true)
 |-- userName: string (nullable = true)
 |-- text: string (nullable = true)



In [48]:
# describing the dataset
df.describe().show()

[Stage 29:>                                                         (0 + 1) / 1]

+-------+-----------------+--------------------+--------+--------------------+--------------------+
|summary|               id|              number|    flag|            userName|                text|
+-------+-----------------+--------------------+--------+--------------------+--------------------+
|  count|          1599488|             1599488| 1599488|             1599488|             1599488|
|   mean| 800030.493971821| 1.978032138980347E9|    NULL| 4.325887521835714E9|                NULL|
| stddev|461875.0173308443|1.6960269568956092E8|    NULL|5.162733218454889E10|                NULL|
|    min|                0|          1467810369|NO_QUERY|        000catnap000|                 ...|
|    max|          1599999|          2147483647|NO_QUERY|          zzzzeus111|ï¿½ï¿½ï¿½ï¿½ï¿½ß§...|
+-------+-----------------+--------------------+--------+--------------------+--------------------+



                                                                                

# Removing missing values

In [49]:
# removing missing values
df = df.dropna()
df.count()

                                                                                

1599488

# filtering out feature subset

In [54]:
# selecting only the useful columns
df = df.select(['createdAt', 'userName', 'text'])
df.show(10, truncate= 100)

[Stage 38:>                                                         (0 + 1) / 1]

+-------------------+---------------+----------------------------------------------------------------------------------------------------+
|          createdAt|       userName|                                                                                                text|
+-------------------+---------------+----------------------------------------------------------------------------------------------------+
|2009-04-06 22:19:45|_TheSpecialOne_|@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Thir...|
|2009-04-06 22:19:49|  scotthamilton|is upset that he can't update his Facebook by texting it... and might cry as a result  School tod...|
|2009-04-06 22:19:53|       mattycus|           @Kenichan I dived many times for the ball. Managed to save 50%  The rest go out of bounds|
|2009-04-06 22:19:57|        ElleCTF|                                                     my whole body feels itchy and like its on fire |
|2009-04-06 22:19:57|      

                                                                                

# Cleaning and preprocessing the data

## Replacing usernames with placeholder 'user'

In [57]:
df = df.withColumn(
    # replacing every username with '@user'
    colName= 'text',
    col    = pyspark.sql.functions.regexp_replace(
        pyspark.sql.functions.col('text'), 
        pattern    = r'@\S*',
        replacement= 'user'
    )
)
df.show(10, truncate= 100)

[Stage 40:>                                                         (0 + 1) / 1]

+-------------------+---------------+----------------------------------------------------------------------------------------------------+
|          createdAt|       userName|                                                                                                text|
+-------------------+---------------+----------------------------------------------------------------------------------------------------+
|2009-04-06 22:19:45|_TheSpecialOne_|            user http - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D|
|2009-04-06 22:19:49|  scotthamilton|is upset that he can't update his Facebook by texting it... and might cry as a result  School tod...|
|2009-04-06 22:19:53|       mattycus|                user I dived many times for the ball. Managed to save 50%  The rest go out of bounds|
|2009-04-06 22:19:57|        ElleCTF|                                                     my whole body feels itchy and like its on fire |
|2009-04-06 22:19:57|      

                                                                                

## Replacing urls/links with the placeholder 'https'

In [58]:
df = df.withColumn(
    # replacing http/https urls with 'http'
    colName = 'text',
    col     = pyspark.sql.functions.regexp_replace(
        pyspark.sql.functions.col('text'), 
        pattern    = r'http\S*',
        replacement= 'http'
    )
)
df.show(10, truncate= 100)

[Stage 41:>                                                         (0 + 1) / 1]

+-------------------+---------------+----------------------------------------------------------------------------------------------------+
|          createdAt|       userName|                                                                                                text|
+-------------------+---------------+----------------------------------------------------------------------------------------------------+
|2009-04-06 22:19:45|_TheSpecialOne_|            user http - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D|
|2009-04-06 22:19:49|  scotthamilton|is upset that he can't update his Facebook by texting it... and might cry as a result  School tod...|
|2009-04-06 22:19:53|       mattycus|                user I dived many times for the ball. Managed to save 50%  The rest go out of bounds|
|2009-04-06 22:19:57|        ElleCTF|                                                     my whole body feels itchy and like its on fire |
|2009-04-06 22:19:57|      

                                                                                

## Tokenizing the text using nltk

In [None]:
# defining pyspark tokenizer function
@pyspark.sql.functions.udf(
    returnType= pyspark.sql.types.ArrayType(
        pyspark.sql.types.StringType()))
def tokenize(text):
    return nltk.tokenize.word_tokenize(text)

In [None]:
df = df.withColumn(
    # tokenizing the text
    colName= 'text',
    col    = tokenize('text')
)
df.show(10, truncate= 100)

[Stage 42:>                                                         (0 + 1) / 1]

+-------------------+---------------+----------------------------------------------------------------------------------------------------+
|          createdAt|       userName|                                                                                                text|
+-------------------+---------------+----------------------------------------------------------------------------------------------------+
|2009-04-06 22:19:45|_TheSpecialOne_|[user, http, -, Awww, ,, that, 's, a, bummer, ., You, shoulda, got, David, Carr, of, Third, Day, ...|
|2009-04-06 22:19:49|  scotthamilton|[is, upset, that, he, ca, n't, update, his, Facebook, by, texting, it, ..., and, might, cry, as, ...|
|2009-04-06 22:19:53|       mattycus|[user, I, dived, many, times, for, the, ball, ., Managed, to, save, 50, %, The, rest, go, out, of...|
|2009-04-06 22:19:57|        ElleCTF|                                           [my, whole, body, feels, itchy, and, like, its, on, fire]|
|2009-04-06 22:19:57|      

                                                                                

# Removing stopwords

In [60]:
# pyspark stopword remover
@pyspark.sql.functions.udf(
    returnType= pyspark.sql.types.ArrayType(
        pyspark.sql.types.StringType()))
def removeStopWords(text):
    return list(filter(
        lambda w: w not in nltk.corpus.stopwords.words('english'), 
        text
    ))

In [61]:
df = df.withColumn(
    # removing stopwords
    colName= 'text',
    col    = removeStopWords('text')
)
df.show(10, truncate= 100)

[Stage 43:>                                                         (0 + 1) / 1]

+-------------------+---------------+----------------------------------------------------------------------------------------------------+
|          createdAt|       userName|                                                                                                text|
+-------------------+---------------+----------------------------------------------------------------------------------------------------+
|2009-04-06 22:19:45|_TheSpecialOne_|        [user, http, -, Awww, ,, 's, bummer, ., You, shoulda, got, David, Carr, Third, Day, ., ;, D]|
|2009-04-06 22:19:49|  scotthamilton|[upset, ca, n't, update, Facebook, texting, ..., might, cry, result, School, today, also, ., Blah...|
|2009-04-06 22:19:53|       mattycus|                 [user, I, dived, many, times, ball, ., Managed, save, 50, %, The, rest, go, bounds]|
|2009-04-06 22:19:57|        ElleCTF|                                                             [whole, body, feels, itchy, like, fire]|
|2009-04-06 22:19:57|      

                                                                                

## Stemming

In [62]:
# defining the stemmer function
stemmer = nltk.stem.PorterStemmer()

@pyspark.sql.functions.udf(
    returnType= pyspark.sql.types.ArrayType(
        pyspark.sql.types.StringType()))
def stem(text):
    return list(map(
        stemmer.stem, 
        text
    ))

In [63]:
df = df.withColumn(
    # stemming words
    colName= 'text',
    col    = stem('text')
)
df.show(10, truncate= 100)

[Stage 44:>                                                         (0 + 1) / 1]

+-------------------+---------------+-------------------------------------------------------------------------------------------------+
|          createdAt|       userName|                                                                                             text|
+-------------------+---------------+-------------------------------------------------------------------------------------------------+
|2009-04-06 22:19:45|_TheSpecialOne_|     [user, http, -, awww, ,, 's, bummer, ., you, shoulda, got, david, carr, third, day, ., ;, d]|
|2009-04-06 22:19:49|  scotthamilton|[upset, ca, n't, updat, facebook, text, ..., might, cri, result, school, today, also, ., blah, !]|
|2009-04-06 22:19:53|       mattycus|                   [user, i, dive, mani, time, ball, ., manag, save, 50, %, the, rest, go, bound]|
|2009-04-06 22:19:57|        ElleCTF|                                                           [whole, bodi, feel, itchi, like, fire]|
|2009-04-06 22:19:57|         Karoli|           

                                                                                

## joining the processed words back into a string

In [64]:
df = df.withColumn(
    colName= 'text',
    col    = pyspark.sql.functions.concat_ws(' ', 'text')
)
df.show(10, truncate= 100)

[Stage 45:>                                                         (0 + 1) / 1]

+-------------------+---------------+--------------------------------------------------------------------------------+
|          createdAt|       userName|                                                                            text|
+-------------------+---------------+--------------------------------------------------------------------------------+
|2009-04-06 22:19:45|_TheSpecialOne_|       user http - awww , 's bummer . you shoulda got david carr third day . ; d|
|2009-04-06 22:19:49|  scotthamilton|upset ca n't updat facebook text ... might cri result school today also . blah !|
|2009-04-06 22:19:53|       mattycus|                  user i dive mani time ball . manag save 50 % the rest go bound|
|2009-04-06 22:19:57|        ElleCTF|                                                 whole bodi feel itchi like fire|
|2009-04-06 22:19:57|         Karoli|                                     user , 's behav . 'm mad . ? i ca n't see .|
|2009-04-06 22:20:00|       joy_wolf|           

                                                                                

# Writing data to mongodb

In [45]:
( df.write
    .format('mongodb')
    .option('database'      , f'{config.MongoDb.database}')
    .option('collection'    , f'{config.MongoDb.collection}')
    .option('connection.uri', f'mongodb://{config.MongoDb.user}:{config.MongoDb.password}@{config.MongoDb.host}:{config.MongoDb.port}')
    .mode('overwrite')
	.save()
)

                                                                                