In [1]:
!pip install pyspark



# Data preparation and processing 

Import libraries 

In [1]:
import logging
sc.setLogLevel("ERROR")

In [2]:
from pyspark.sql import SparkSession

In [3]:
#Spark session
spark = SparkSession.builder \
    .appName("Tweets") \
    .getOrCreate()

In [4]:
# CVS
path = "file:///home/hduser/myenv/ProjectTweets.csv"

df = spark.read.csv(path, header=True, inferSchema=True)

                                                                                

Analysis of the file format 

In [5]:
# DataFrame
df.printSchema()

root
 |-- 0: integer (nullable = true)
 |-- 1467810369: long (nullable = true)
 |-- Mon Apr 06 22:19:45 PDT 2009: string (nullable = true)
 |-- NO_QUERY: string (nullable = true)
 |-- _TheSpecialOne_: string (nullable = true)
 |-- @switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D: string (nullable = true)



In [6]:
df.show()

+---+----------+----------------------------+--------+---------------+-------------------------------------------------------------------------------------------------------------------+
|  0|1467810369|Mon Apr 06 22:19:45 PDT 2009|NO_QUERY|_TheSpecialOne_|@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D|
+---+----------+----------------------------+--------+---------------+-------------------------------------------------------------------------------------------------------------------+
|  1|1467810672|        Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|                                                                                               is upset that he ...|
|  2|1467810917|        Mon Apr 06 22:19:...|NO_QUERY|       mattycus|                                                                                               @Kenichan I dived...|
|  3|1467811184|        Mon Apr 06 22:19:...|NO_QUERY|        Ell

In [7]:
df.count()

                                                                                

1599999

#Create a new Dataframe from the original file  

In [8]:
from pyspark.sql.types import (IntegerType, StringType, 
                               TimestampType, StructType,
                               StructField, ArrayType,
                               TimestampType)

import pyspark.sql.functions as F

In [9]:
new_schema = StructType([
    StructField("new_number", StringType(), True),
    StructField("new__id", StringType(), True),
    StructField("new_date", StringType(), True),
    StructField("new_flag", StringType(), True),
    StructField("new_user", StringType(), True),
    StructField("new_tweet", StringType(), True)
])

In [10]:
df = spark.createDataFrame(df.rdd, schema=new_schema)
df.printSchema()

root
 |-- new_number: string (nullable = true)
 |-- new__id: string (nullable = true)
 |-- new_date: string (nullable = true)
 |-- new_flag: string (nullable = true)
 |-- new_user: string (nullable = true)
 |-- new_tweet: string (nullable = true)



In [11]:
column_names = df.columns

# Rename columns
for old_name in column_names:
    new_name = old_name.replace("new_", "")
    df = df.withColumnRenamed(old_name, new_name)

df.printSchema()

root
 |-- number: string (nullable = true)
 |-- _id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- tweet: string (nullable = true)



New data frame structure 

In [12]:
df.show()

[Stage 6:>                                                          (0 + 1) / 1]

+------+----------+--------------------+--------+---------------+--------------------+
|number|       _id|                date|    flag|           user|               tweet|
+------+----------+--------------------+--------+---------------+--------------------+
|     1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|     2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|     3|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|     4|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|     5|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|     6|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|     7|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|     8|1467811795|Mon Apr 06 22:20:...|NO_QUERY|2Hood4Hollywood|@Tatiana_K nope t...|
|     9|1467812025|Mon Apr 06 22:20:...|NO_

                                                                                

A future projection will be made so the time column is important, the format of that column is displayed to be able to separate it into different columns. 

In [13]:
df.select("date").take(2)

                                                                                

[Row(date='Mon Apr 06 22:19:49 PDT 2009'),
 Row(date='Mon Apr 06 22:19:53 PDT 2009')]

In [14]:
#Import libraries 
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

Day of the week 

In [15]:
def DayWeek(s : str) -> str:
    """
    Converts the string from the tweets to day of week by 
    extracting the first three characters from the string
    
    """
    day      =  s[:3]
    newday  = ""
    
    if day   == "Sun":
        newday = "Sunday"
    elif day == "Mon":
        newday = "Monday"
    elif day == "Tue":
        newday = "Tuesday"
    elif day == "Wed":
        newday = "Wednesday"
    elif day == "Thu":
        newday = "Thursday"
    elif day == "Fri":
        newday = "Friday"
    else:
        newday = "Saturday"
    
    return newday

In [16]:
# Define a User Defined Function (UDF) in Spark to apply the DayWeek function to the data in a DataFrame
DayDF = F.udf(DayWeek, StringType())

In [17]:
df = df.withColumn("DayWeek", DayDF(df["date"]))

In [18]:
df.show()

                                                                                

+------+----------+--------------------+--------+---------------+--------------------+-------+
|number|       _id|                date|    flag|           user|               tweet|DayWeek|
+------+----------+--------------------+--------+---------------+--------------------+-------+
|     1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...| Monday|
|     2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...| Monday|
|     3|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...| Monday|
|     4|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...| Monday|
|     5|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...| Monday|
|     6|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug | Monday|
|     7|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...| Monday|
|     8|1467811795|Mon Apr 06 22:20:...|NO_QUERY|2

In [None]:
#Extract remaining date information in different columns from the original column 

In [19]:
spark = SparkSession.builder \
    .appName("Split Date Column") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

In [20]:
from pyspark.sql.functions import col, to_timestamp

# Timestamp format
df2 = df.withColumn("date", to_timestamp("date", "EEE MMM dd HH:mm:ss zzz yyyy"))

# Year
df2 = df.withColumn("year", df.date.substr(-4, 4))        

df2.show()

+------+----------+--------------------+--------+---------------+--------------------+-------+----+
|number|       _id|                date|    flag|           user|               tweet|DayWeek|year|
+------+----------+--------------------+--------+---------------+--------------------+-------+----+
|     1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...| Monday|2009|
|     2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...| Monday|2009|
|     3|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...| Monday|2009|
|     4|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...| Monday|2009|
|     5|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...| Monday|2009|
|     6|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug | Monday|2009|
|     7|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...| Monday|2009|


                                                                                

In [21]:
# Day, Hour and Month
df2 = df2.withColumn("day", df2.date.substr(9, 2))          
df2 = df2.withColumn("hour", df2.date.substr(12, 8))       
df2 = df2.withColumn("month", df2.date.substr(6, 2))        

df2.show()

+------+----------+--------------------+--------+---------------+--------------------+-------+----+---+--------+-----+
|number|       _id|                date|    flag|           user|               tweet|DayWeek|year|day|    hour|month|
+------+----------+--------------------+--------+---------------+--------------------+-------+----+---+--------+-----+
|     1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...| Monday|2009| 06|22:19:49|   pr|
|     2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...| Monday|2009| 06|22:19:53|   pr|
|     3|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...| Monday|2009| 06|22:19:57|   pr|
|     4|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...| Monday|2009| 06|22:19:57|   pr|
|     5|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...| Monday|2009| 06|22:20:00|   pr|
|     6|1467811592|Mon Apr 06 22:20:...|NO_QUERY

                                                                                

The month extraction was erroneously so the code is changed to correct it. 

In [22]:
from pyspark.sql.functions import month, to_timestamp

df2 = df2.withColumn("date", to_timestamp("date", "EEE MMM dd HH:mm:ss zzz yyyy"))

# Month
df2 = df2.withColumn("month", month("date"))

df2.show()

[Stage 11:>                                                         (0 + 1) / 1]

+------+----------+-------------------+--------+---------------+--------------------+-------+----+---+--------+-----+
|number|       _id|               date|    flag|           user|               tweet|DayWeek|year|day|    hour|month|
+------+----------+-------------------+--------+---------------+--------------------+-------+----+---+--------+-----+
|     1|1467810672|2009-04-07 06:19:49|NO_QUERY|  scotthamilton|is upset that he ...| Monday|2009| 06|22:19:49|    4|
|     2|1467810917|2009-04-07 06:19:53|NO_QUERY|       mattycus|@Kenichan I dived...| Monday|2009| 06|22:19:53|    4|
|     3|1467811184|2009-04-07 06:19:57|NO_QUERY|        ElleCTF|my whole body fee...| Monday|2009| 06|22:19:57|    4|
|     4|1467811193|2009-04-07 06:19:57|NO_QUERY|         Karoli|@nationwideclass ...| Monday|2009| 06|22:19:57|    4|
|     5|1467811372|2009-04-07 06:20:00|NO_QUERY|       joy_wolf|@Kwesidei not the...| Monday|2009| 06|22:20:00|    4|
|     6|1467811592|2009-04-07 06:20:03|NO_QUERY|        

                                                                                

In [23]:
# String
df2 = df2.withColumn("year", col("year").cast("string"))
df2 = df2.withColumn("day", col("day").cast("string"))
df2 = df2.withColumn("hour", col("hour").cast("string"))
df2 = df2.withColumn("month", col("month").cast("string"))

df2.show()

[Stage 12:>                                                         (0 + 1) / 1]

+------+----------+-------------------+--------+---------------+--------------------+-------+----+---+--------+-----+
|number|       _id|               date|    flag|           user|               tweet|DayWeek|year|day|    hour|month|
+------+----------+-------------------+--------+---------------+--------------------+-------+----+---+--------+-----+
|     1|1467810672|2009-04-07 06:19:49|NO_QUERY|  scotthamilton|is upset that he ...| Monday|2009| 06|22:19:49|    4|
|     2|1467810917|2009-04-07 06:19:53|NO_QUERY|       mattycus|@Kenichan I dived...| Monday|2009| 06|22:19:53|    4|
|     3|1467811184|2009-04-07 06:19:57|NO_QUERY|        ElleCTF|my whole body fee...| Monday|2009| 06|22:19:57|    4|
|     4|1467811193|2009-04-07 06:19:57|NO_QUERY|         Karoli|@nationwideclass ...| Monday|2009| 06|22:19:57|    4|
|     5|1467811372|2009-04-07 06:20:00|NO_QUERY|       joy_wolf|@Kwesidei not the...| Monday|2009| 06|22:20:00|    4|
|     6|1467811592|2009-04-07 06:20:03|NO_QUERY|        

                                                                                

Data cleaning

In [24]:
#Data in Column
df2.select("flag").describe().show()



+-------+--------+
|summary|    flag|
+-------+--------+
|  count| 1599999|
|   mean|    null|
| stddev|    null|
|    min|NO_QUERY|
|    max|NO_QUERY|
+-------+--------+



                                                                                

In [25]:
#Missing Data
Nulls = False
for column in df2.columns:
    if df2.where(col(column).isNull()).count() > 0:
        Nulls = True
        break

if Nulls:
    print("There are null values in the DataFrame.")
else:
    print("There are NO null values in the DataFrame.")



There are NO null values in the DataFrame.


                                                                                

In [26]:
# Drop the columns 
df2 = df2.drop("number","flag","date")

In [27]:
#Final dataframe
df2.show()

+----------+---------------+--------------------+-------+----+---+--------+-----+
|       _id|           user|               tweet|DayWeek|year|day|    hour|month|
+----------+---------------+--------------------+-------+----+---+--------+-----+
|1467810672|  scotthamilton|is upset that he ...| Monday|2009| 06|22:19:49|    4|
|1467810917|       mattycus|@Kenichan I dived...| Monday|2009| 06|22:19:53|    4|
|1467811184|        ElleCTF|my whole body fee...| Monday|2009| 06|22:19:57|    4|
|1467811193|         Karoli|@nationwideclass ...| Monday|2009| 06|22:19:57|    4|
|1467811372|       joy_wolf|@Kwesidei not the...| Monday|2009| 06|22:20:00|    4|
|1467811592|        mybirch|         Need a hug | Monday|2009| 06|22:20:03|    4|
|1467811594|           coZZ|@LOLTrish hey  lo...| Monday|2009| 06|22:20:03|    4|
|1467811795|2Hood4Hollywood|@Tatiana_K nope t...| Monday|2009| 06|22:20:05|    4|
|1467812025|        mimismo|@twittera que me ...| Monday|2009| 06|22:20:09|    4|
|1467812416| eri

                                                                                

# Sentiment analysis

Text processing for analysis 

In [28]:
from pyspark.ml.feature import Tokenizer

# tokenize tweets (breaking a sequence of text into smaller units, such as individual words or characters)
tokenizer = Tokenizer(inputCol  = "tweet",
                      outputCol = "token")

df3 = tokenizer.transform(df2)

In [29]:
df3.limit(2).toPandas()

                                                                                

Unnamed: 0,_id,user,tweet,DayWeek,year,day,hour,month,token
0,1467810672,scotthamilton,is upset that he can't update his Facebook by ...,Monday,2009,6,22:19:49,4,"[is, upset, that, he, can't, update, his, face..."
1,1467810917,mattycus,@Kenichan I dived many times for the ball. Man...,Monday,2009,6,22:19:53,4,"[@kenichan, i, dived, many, times, for, the, b..."


Removes special characters, call outs and web addresses from tokens

In [30]:
import re

def removeRegex(tokens: list) -> list:
    
    expr    = '(@[A-Za-z0-a9_]+)|(#[A-Za-z0-9_]+)|'+\
              '(https?://[^\s<>"]+|www\.[^\s<>"]+)'
        
    regex   = re.compile(expr)

    cleaned = [t for t in tokens if not(regex.search(t)) if len(t) > 0]

    return list(filter(None, cleaned))

In [31]:
removeWEBUDF = F.udf(removeRegex, ArrayType(StringType()))

Removing non-english characters and returns lower case versions of words.

In [32]:
def normalize(tokens : list) -> list:
    
    subbed   = [re.sub("[^a-zA-Z]+", "", s).lower() for s in tokens]
    
    filtered = filter(None, subbed)
    
    return list(filtered)


normalizeUDF = F.udf(normalize, ArrayType(StringType()))

In [33]:
df3 = df3.withColumn("tokens_re", removeWEBUDF(df3["token"]))

df3 = df3.withColumn("tokens_clean", normalizeUDF(df3["tokens_re"]))

# rename columns
df4 = df3.drop("token","tokens_re")
df4 = df4.withColumnRenamed("tokens_clean", "tokens")

# remove tweets where the tokens array is empty
df5= df4.where(F.size(F.col("tokens")) > 0)

In [34]:
df5.limit(2).toPandas()

                                                                                

Unnamed: 0,_id,user,tweet,DayWeek,year,day,hour,month,tokens
0,1467810672,scotthamilton,is upset that he can't update his Facebook by ...,Monday,2009,6,22:19:49,4,"[is, upset, that, he, cant, update, his, faceb..."
1,1467810917,mattycus,@Kenichan I dived many times for the ball. Man...,Monday,2009,6,22:19:53,4,"[i, dived, many, times, for, the, ball, manage..."


In [None]:
dfmongo= df5