In [1]:
#importing the relevant packages
import matplotlib.pyplot as plt
from pyspark.sql import functions as F
from pyspark.sql.functions import date_format
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import col, year,month, to_date, col, isnan, when, count, udf, trim, ltrim, rtrim, lower
from pyspark.sql import Window
import time
from pyspark.sql.functions import to_timestamp
import re

import string

#nltk related package/modules
import nltk
from nltk.corpus import stopwords
nltk.download('stopwords')
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from nltk.stem.porter import PorterStemmer
from pyspark.sql.types import ArrayType, StringType

#supressing the warnings
import warnings
warnings.filterwarnings("ignore")

[nltk_data] Downloading package stopwords to /home/hduser/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [2]:
#may need to pip install nltk if not available

In [3]:
#!pip install nltk

In [4]:
#looking at the spark context, showing the connection to the spark cluster
sc

## Section 1: Loading data directly from Cassandra database
Using the Cassandra connector, it is possible to load the data directly from Cassandra using the spark.read.format command in the code below:

In [16]:
#load the project_tweets table of the twitter_sentiment keyspace into a pyspark dataframe
project_tweets_df = (
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(table="project_tweets", keyspace="twitter_sentiment")
    .load()
)

Spark programming model has two types of operations: **Actions** and **Transformations**.<br>

- **Transformations** are operations on RDDs, DataFrames, or Datasets that produce a new distributed dataset from an existing one. They are generally lazy, meaning they are not executed immediately but create a logical execution plan. These include functions such as withColumn, groupBy, filter, join etc. <br>

- **Actions** are operations that trigger the execution of transformations and return a value to the driver program or write data to an external storage system. They are the operations that actually initiate the computation. <br>

Both of these will be utilised in the below code, starting with the two Action operations .show() and .count() used immediately below.

In [17]:
#displaying the first 5 rows of the data
project_tweets_df.show(5)

+-------+--------------------+--------+----------+--------------------+-----------+
|   indx|                date|    flag|       ids|                text|       user|
+-------+--------------------+--------+----------+--------------------+-----------+
|1328206|Wed Jun 03 04:47:...|NO_QUERY|2015487701|hmmm i need to fi...|  xxlogannn|
|  84049|Sun May 10 01:57:...|NO_QUERY|1753501222|2 stressed about ...|  SonyCandy|
| 571099|Wed Jun 17 09:28:...|NO_QUERY|2208663921|And of course, pa...|   JMJ697MN|
| 296315|Mon Jun 01 16:14:...|NO_QUERY|1997095180|Hmm... Everything...|sillygirlkc|
| 761637|Tue Jun 23 10:20:...|NO_QUERY|2297349871|Oh gee i hate whe...|MLB_Roxanne|
+-------+--------------------+--------+----------+--------------------+-----------+
only showing top 5 rows



In [18]:
project_tweets_df.count()

                                                                                

1599671

- Note that there are fewer than the total 1,600,000 records because Cassandra does not keep duplicate keys when rows are inserted into a table. Any records with matching index (i.e. indx) values are therefore written to the same row (an upsert) rather than stored as separate records.

In [19]:
#taking a look at the data types
project_tweets_df.printSchema()

root
 |-- indx: string (nullable = false)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- ids: string (nullable = true)
 |-- text: string (nullable = true)
 |-- user: string (nullable = true)



## Section 2: cleaning the data

### recasting string columns to longs 

In [20]:
#the identifier columns are read from Cassandra as strings; recast them to longs
columns_to_integer = ['ids','indx']

#single projection: listed columns are cast in place, every other column passes through unchanged
project_tweets_df = project_tweets_df.select(
    [
        col(name).cast('long').alias(name) if name in columns_to_integer else col(name)
        for name in project_tweets_df.columns
    ]
)

### updating date column to a timestamp datatype to aid in analysis
Need to set timeParserPolicy to LEGACY to allow this conversion, as the date pattern used is not supported by the default parser in Spark 3.0 and later.

In [21]:
#set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0.
#this is needed so that to_timestamp in the next cell accepts the "EEE MMM dd HH:mm:ss zzz yyyy" pattern
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

DataFrame[key: string, value: string]

In [22]:
# Parse the raw "date" string into a timestamp column called "datetime".
# NOTE(review): the parsed values are displayed in the Spark session time zone,
# which is not necessarily UTC (the recorded output is shifted 8 hours from the
# source strings) - confirm spark.sql.session.timeZone if UTC is required.
project_tweets_df = project_tweets_df.withColumn(
    "datetime",
    to_timestamp(col("date"), "EEE MMM dd HH:mm:ss zzz yyyy"),
)

In [23]:
project_tweets_df.show()

+-------+--------------------+--------+----------+--------------------+---------------+-------------------+
|   indx|                date|    flag|       ids|                text|           user|           datetime|
+-------+--------------------+--------+----------+--------------------+---------------+-------------------+
| 738991|Sun Jun 21 06:57:...|NO_QUERY|2265614063|is church, celebr...|  buffsnowangel|2009-06-21 14:57:49|
| 766924|Tue Jun 23 13:50:...|NO_QUERY|2300192267|//myspace message...|         jroyyy|2009-06-23 21:50:40|
|1079727|Fri May 29 20:47:...|NO_QUERY|1968117864|Had a super long ...|marryme_invegas|2009-05-30 04:47:34|
| 563328|Wed Jun 17 05:38:...|NO_QUERY|2205889101|@JustJenzz well i...|     keisyaarya|2009-06-17 13:38:06|
|1406060|Sat Jun 06 09:03:...|NO_QUERY|2055291244|@NoBSGamers i jus...|        craven_|2009-06-06 17:03:00|
| 731496|Sun Jun 21 01:57:...|NO_QUERY|2263839151|says this is gett...|   heidelicious|2009-06-21 09:57:58|
| 469332|Mon Jun 15 03:10:..

In [15]:
#updating the datetime 
#df = project_tweets_df.withColumn('date', to_date(project_tweets_df['datetime']))
#df = df.sort('datetime',ascending=True)
#distinct_dates_df = df.select('datetime').distinct().sort('datetime',ascending=True)#
#distinct_dates_df.tail(10)
#df.show(5)
#distinct_dates_list = [row.datetime for row in distinct_dates_df.collect()]
#distinct_dates_list

In [28]:
project_tweets_df.select("flag").distinct().show() 



+--------+
|    flag|
+--------+
|NO_QUERY|
+--------+



                                                                                

### Removing the flag, date and ids columns as these are not required for analysis

In [29]:
#the raw date string, the single-valued flag and the ids column are not used from here on
project_tweets_df = project_tweets_df.drop('date', 'flag', 'ids')

### Restructuring layout of columns

In [30]:
project_tweets_df = project_tweets_df.select(["indx","datetime","user","text"])

### Sorting based on index value

In [31]:
#sorting based on indx
project_tweets_df = project_tweets_df.sort('indx',ascending=True)

In [32]:
project_tweets_df.show()



+----+-------------------+---------------+--------------------+
|indx|           datetime|           user|                text|
+----+-------------------+---------------+--------------------+
|   0|2009-04-07 06:19:45|_TheSpecialOne_|@switchfoot http:...|
|   1|2009-04-07 06:19:49|  scotthamilton|is upset that he ...|
|   2|2009-04-07 06:19:53|       mattycus|@Kenichan I dived...|
|   3|2009-04-07 06:19:57|        ElleCTF|my whole body fee...|
|   4|2009-04-07 06:19:57|         Karoli|@nationwideclass ...|
|   5|2009-04-07 06:20:00|       joy_wolf|@Kwesidei not the...|
|   6|2009-04-07 06:20:03|        mybirch|         Need a hug |
|   7|2009-04-07 06:20:03|           coZZ|@LOLTrish hey  lo...|
|   8|2009-04-07 06:20:05|2Hood4Hollywood|@Tatiana_K nope t...|
|   9|2009-04-07 06:20:09|        mimismo|@twittera que me ...|
|  10|2009-04-07 06:20:16| erinx3leannexo|spring break in p...|
|  11|2009-04-07 06:20:17|   pardonlauren|I just re-pierced...|
|  12|2009-04-07 06:20:19|           TLe

                                                                                

### Checking for null values

In [33]:
#one output column per input column, each holding the number of null entries in that column
project_tweets_df.select(
    [
        count(when(col(column).isNull(), column)).alias(column)
        for column in project_tweets_df.columns
    ]
).show()



+----+--------+----+----+
|indx|datetime|user|text|
+----+--------+----+----+
|   0|       0|   0|   0|
+----+--------+----+----+



                                                                                

### Checking for duplicates

In [35]:
project_tweets_df.exceptAll(project_tweets_df.dropDuplicates()).show()

24/05/13 09:20:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/05/13 09:20:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                

+----+--------+----+----+
|indx|datetime|user|text|
+----+--------+----+----+
+----+--------+----+----+





In [36]:
project_tweets_df.select('text').show(5,truncate=False)



+-------------------------------------------------------------------------------------------------------------------+
|text                                                                                                               |
+-------------------------------------------------------------------------------------------------------------------+
|@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D|
|is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!    |
|@Kenichan I dived many times for the ball. Managed to save 50%  The rest go out of bounds                          |
|my whole body feels itchy and like its on fire                                                                     |
|@nationwideclass no, it's not behaving at all. i'm mad. why am i here? because I can't see you all over there.     |
+-------------------------------------------------------

                                                                                

## Section 3: Text Processing

This section will involve processing the text column in preparation for sentiment analysis. In order to perform sentiment analysis correctly, the text needs to be cleaned:
- Twitter handles or usernames which add no value to the sentiment removed.
- URLs or http/https links removed from the text.
- Special characters such as #,",@ etc. removed.
- Stopwords (which are commonly appearing words in the english language such as "the", "a", "an" etc.) are removed.
- Digits are removed.
- All text made lower case.
- Any leading or trailing whitespace removed.
- Lemmatization is applied to reduce each word in the text to its base (dictionary) form.

Several of these processing steps are carried out using **User Defined Functions (UDFs)**, a Spark feature that allows a user-written function to be applied to each row through the withColumn transformation; the remaining steps (digit removal, trimming and lower-casing) use built-in PySpark functions.<br>
UDFs can be used to perform data transformation operations which are not already available as built-in PySpark functionality.


### Removing Username handles from text

In [37]:
#defining UDF for removing usernames
def remove_usernames(text):
    """Remove Twitter username handles from a tweet.

    A handle is an ``@`` followed by one or more letters, digits or
    underscores. Each handle is replaced with an empty string, so any
    whitespace around it is left in place.

    Parameters
    ----------
    text : str or None
        The tweet text. ``None`` (a null value in the Spark column) is
        returned unchanged instead of raising ``TypeError`` in ``re.sub``.

    Returns
    -------
    str or None
        The text without username handles, or ``None`` if ``text`` was ``None``.
    """
    #a null row reaches a Python UDF as None, which re.sub cannot process
    if text is None:
        return None
    #the matched handle is never reused, so no capture group is needed
    return re.sub(r'@[A-Za-z0-9_]+', '', text)

In [38]:
remove_usernames_udf = udf(remove_usernames)

In [39]:
project_tweets_df = project_tweets_df.withColumn('text', remove_usernames_udf('text'))

In [40]:
project_tweets_df.select('text').show(5,truncate=False)

[Stage 58:>                                                         (0 + 1) / 1]

+---------------------------------------------------------------------------------------------------------------+
|text                                                                                                           |
+---------------------------------------------------------------------------------------------------------------+
| http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D       |
|is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!|
| I dived many times for the ball. Managed to save 50%  The rest go out of bounds                               |
|my whole body feels itchy and like its on fire                                                                 |
| no, it's not behaving at all. i'm mad. why am i here? because I can't see you all over there.                 |
+---------------------------------------------------------------------------------------

                                                                                

In [41]:
project_tweets_df.filter(project_tweets_df.text.like("%http%")).select('text').show(truncate=False)



+----------------------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                                    |
+----------------------------------------------------------------------------------------------------------------------------------------+
| http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D                                |
| sorry! bed time came here (GMT+1)   http://is.gd/fNge                                                                                  |
|Broadband plan 'a massive broken promise' http://tinyurl.com/dcuc33 via www.diigo.com/~tautao Still waiting for broadband we are        |
|Why won't you show my location?!   http://twitpic.com/2y2es                                                                             |
|Strider is a sick little p

                                                                                

### Removing urls within the text

In [42]:
def remove_URL(text):
    """Remove links from a tweet.

    Two kinds of link are removed, each up to the next whitespace character:

    * anything starting with ``http`` (covers ``http://`` and ``https://``),
    * scheme-less links starting with ``www.`` at a word boundary, followed
      by a letter or digit (e.g. ``www.diigo.com/~tautao``). The word
      boundary keeps words such as ``awww...`` intact.

    Parameters
    ----------
    text : str or None
        The tweet text. ``None`` (a null value in the Spark column) is
        returned unchanged.

    Returns
    -------
    str or None
        The text with links replaced by an empty string, or ``None`` if
        ``text`` was ``None``.
    """
    #a null row reaches a Python UDF as None, which re.sub cannot process
    if text is None:
        return None
    return re.sub(r"http\S+|\bwww\.[A-Za-z0-9]\S*", "", text)

In [43]:
remove_URL_udf = udf(remove_URL)

In [44]:
project_tweets_df = project_tweets_df.withColumn('text', remove_URL_udf('text'))

In [45]:
project_tweets_df.select('text').show(5,truncate=False)



+---------------------------------------------------------------------------------------------------------------+
|text                                                                                                           |
+---------------------------------------------------------------------------------------------------------------+
|  - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D                               |
|is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!|
| I dived many times for the ball. Managed to save 50%  The rest go out of bounds                               |
|my whole body feels itchy and like its on fire                                                                 |
| no, it's not behaving at all. i'm mad. why am i here? because I can't see you all over there.                 |
+---------------------------------------------------------------------------------------

                                                                                

In [46]:
project_tweets_df.filter(project_tweets_df.text.like("%http://%")).select('text').show(truncate=False)



+----+
|text|
+----+
+----+



### Removing special characters (i.e punctuation, hashtags etc.)

In [47]:
project_tweets_df.show(5)



+----+-------------------+---------------+--------------------+
|indx|           datetime|           user|                text|
+----+-------------------+---------------+--------------------+
|   0|2009-04-07 06:19:45|_TheSpecialOne_|  - Awww, that's ...|
|   1|2009-04-07 06:19:49|  scotthamilton|is upset that he ...|
|   2|2009-04-07 06:19:53|       mattycus| I dived many tim...|
|   3|2009-04-07 06:19:57|        ElleCTF|my whole body fee...|
|   4|2009-04-07 06:19:57|         Karoli| no, it's not beh...|
+----+-------------------+---------------+--------------------+
only showing top 5 rows



                                                                                

In [48]:
string.punctuation

'!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'

In [49]:
def remove_punctuation(text):
    """Delete every character that is not an ASCII letter, a digit or whitespace.

    Matching characters are removed outright (not replaced with a space), so
    the characters on either side of a removed symbol end up adjacent.
    """
    #negated character class: anything outside a-z, A-Z, 0-9 and whitespace
    disallowed = re.compile(r'[^a-zA-Z0-9\s]')
    return disallowed.sub("", text)

In [50]:
remove_punctuation_udf = udf(remove_punctuation)

In [51]:
project_tweets_df = project_tweets_df.withColumn('text', remove_punctuation_udf('text'))

In [52]:
project_tweets_df.select('text').show(20, truncate=False)



+---------------------------------------------------------------------------------------------------------+
|text                                                                                                     |
+---------------------------------------------------------------------------------------------------------+
|   Awww thats a bummer  You shoulda got David Carr of Third Day to do it D                               |
|is upset that he cant update his Facebook by texting it and might cry as a result  School today also Blah|
| I dived many times for the ball Managed to save 50  The rest go out of bounds                           |
|my whole body feels itchy and like its on fire                                                           |
| no its not behaving at all im mad why am i here because I cant see you all over there                   |
| not the whole crew                                                                                      |
|Need a hug                 

                                                                                

### Removing digits from the text

In [53]:
# Remove digits from the text column
project_tweets_df = project_tweets_df.withColumn("text", regexp_replace("text", "[0-9]", ""))

In [54]:
project_tweets_df.select('text').show(truncate=False)



+---------------------------------------------------------------------------------------------------------+
|text                                                                                                     |
+---------------------------------------------------------------------------------------------------------+
|   Awww thats a bummer  You shoulda got David Carr of Third Day to do it D                               |
|is upset that he cant update his Facebook by texting it and might cry as a result  School today also Blah|
| I dived many times for the ball Managed to save   The rest go out of bounds                             |
|my whole body feels itchy and like its on fire                                                           |
| no its not behaving at all im mad why am i here because I cant see you all over there                   |
| not the whole crew                                                                                      |
|Need a hug                 

                                                                                

### Removing leading and trailing whitespace from text

In [55]:
#trim removes the spaces from both ends of the string in one step,
#so applying ltrim and rtrim afterwards would have no further effect
project_tweets_df = project_tweets_df.withColumn('text', trim('text'))


project_tweets_df.select('text').show(truncate=False)



+---------------------------------------------------------------------------------------------------------+
|text                                                                                                     |
+---------------------------------------------------------------------------------------------------------+
|Awww thats a bummer  You shoulda got David Carr of Third Day to do it D                                  |
|is upset that he cant update his Facebook by texting it and might cry as a result  School today also Blah|
|I dived many times for the ball Managed to save   The rest go out of bounds                              |
|my whole body feels itchy and like its on fire                                                           |
|no its not behaving at all im mad why am i here because I cant see you all over there                    |
|not the whole crew                                                                                       |
|Need a hug                 

                                                                                

### Making lower case

In [56]:
project_tweets_df = project_tweets_df.withColumn('text', lower('text'))

project_tweets_df.select('text').show(truncate=False)



+---------------------------------------------------------------------------------------------------------+
|text                                                                                                     |
+---------------------------------------------------------------------------------------------------------+
|awww thats a bummer  you shoulda got david carr of third day to do it d                                  |
|is upset that he cant update his facebook by texting it and might cry as a result  school today also blah|
|i dived many times for the ball managed to save   the rest go out of bounds                              |
|my whole body feels itchy and like its on fire                                                           |
|no its not behaving at all im mad why am i here because i cant see you all over there                    |
|not the whole crew                                                                                       |
|need a hug                 

                                                                                

### Can drop the user column as this won't be required for sentiment forecasting

In [57]:
project_tweets_df = project_tweets_df.drop('user')

In [58]:
project_tweets_df.show()



+----+-------------------+--------------------+
|indx|           datetime|                text|
+----+-------------------+--------------------+
|   0|2009-04-07 06:19:45|awww thats a bumm...|
|   1|2009-04-07 06:19:49|is upset that he ...|
|   2|2009-04-07 06:19:53|i dived many time...|
|   3|2009-04-07 06:19:57|my whole body fee...|
|   4|2009-04-07 06:19:57|no its not behavi...|
|   5|2009-04-07 06:20:00|  not the whole crew|
|   6|2009-04-07 06:20:03|          need a hug|
|   7|2009-04-07 06:20:03|hey  long time no...|
|   8|2009-04-07 06:20:05|nope they didnt h...|
|   9|2009-04-07 06:20:09|        que me muera|
|  10|2009-04-07 06:20:16|spring break in p...|
|  11|2009-04-07 06:20:17|i just repierced ...|
|  12|2009-04-07 06:20:19|i couldnt bear to...|
|  13|2009-04-07 06:20:19|it it counts idk ...|
|  14|2009-04-07 06:20:20|i wouldve been th...|
|  15|2009-04-07 06:20:20|i wish i got to w...|
|  16|2009-04-07 06:20:22|hollis death scen...|
|  17|2009-04-07 06:20:25| about to file

                                                                                

### Removing stop words

In [59]:
stop_words = set(stopwords.words("english"))

In [60]:
#removing the stop words

def remove_stop_words(text):
    """Drop every whitespace-separated token that appears in ``stop_words``.

    The surviving tokens are re-joined with single spaces, so runs of
    whitespace in the input are collapsed as a side effect of ``split()``.
    """
    kept_tokens = []
    for token in text.split():
        if token in stop_words:
            continue
        kept_tokens.append(token)
    return ' '.join(kept_tokens)


In [61]:
remove_stop_words_udf = udf(remove_stop_words)

In [62]:
project_tweets_df = project_tweets_df.withColumn('text', remove_stop_words_udf(col('text')))

In [63]:
project_tweets_df.select('text').show(truncate=False)



+--------------------------------------------------------------------------+
|text                                                                      |
+--------------------------------------------------------------------------+
|awww thats bummer shoulda got david carr third day                        |
|upset cant update facebook texting might cry result school today also blah|
|dived many times ball managed save rest go bounds                         |
|whole body feels itchy like fire                                          |
|behaving im mad cant see                                                  |
|whole crew                                                                |
|need hug                                                                  |
|hey long time see yes rains bit bit lol im fine thanks hows               |
|nope didnt                                                                |
|que muera                                                                 |

                                                                                

### applying lemmatization to the text 

In [64]:
#nltk.download('wordnet')

In [65]:
#nltk.download('punkt')

In [66]:
#instantiate the WordNet lemmatizer once; the same instance is reused by lemmatize_text further down
lemmatizer = WordNetLemmatizer() 
 
#quick sanity check on two sample words: in the recorded output "rocks" becomes "rock" while "giving" is returned unchanged
#NOTE(review): lemmatize() is called without a part-of-speech argument, which presumably explains why the verb form is left untouched - confirm against the NLTK documentation
print("giving :", lemmatizer.lemmatize("giving")) 
print("rocks :", lemmatizer.lemmatize("rocks")) 

giving : giving
rocks : rock


In [67]:
#each tweet is split into word tokens first so that the tokens can be lemmatized one by one
def tokenize_text(text):
    """Split ``text`` into a list of tokens using NLTK's ``word_tokenize``."""
    tokens = word_tokenize(text)
    return tokens

tokenize_text_udf = udf(tokenize_text, ArrayType(StringType()))

In [68]:
project_tweets_df = project_tweets_df.withColumn("text", tokenize_text_udf(col("text")))

In [69]:
#function behind the lemmatizing udf; relies on the module-level WordNetLemmatizer instance
def lemmatize_text(words):
    """Return a new list holding ``lemmatizer.lemmatize(word)`` for each word, in order."""
    return list(map(lemmatizer.lemmatize, words))

lemmatize_text_udf = udf(lemmatize_text, ArrayType(StringType()))

project_tweets_df = project_tweets_df.withColumn("text", lemmatize_text_udf(col("text")))

In [70]:
#last step: turn the list of tokens back into a single string
def join_text(words):
    """Concatenate the tokens in ``words`` into one string, separated by single spaces."""
    separator = " "
    return separator.join(words)

join_text_udf = udf(join_text, StringType())
project_tweets_df = project_tweets_df.withColumn("text", join_text_udf(col("text")))

In [71]:
project_tweets_df.select('text').show(30, truncate=False)

[Stage 100:>                                                        (0 + 1) / 1]

+--------------------------------------------------------------------------+
|text                                                                      |
+--------------------------------------------------------------------------+
|awww thats bummer shoulda got david carr third day                        |
|upset cant update facebook texting might cry result school today also blah|
|dived many time ball managed save rest go bound                           |
|whole body feel itchy like fire                                           |
|behaving im mad cant see                                                  |
|whole crew                                                                |
|need hug                                                                  |
|hey long time see yes rain bit bit lol im fine thanks hows                |
|nope didnt                                                                |
|que muera                                                                 |

                                                                                

### checking for empty string values after filtering done
- These would have arisen where the text only contained content that was removed in the steps above (e.g. usernames, links, punctuation, digits or stop words), leaving empty strings:

In [72]:
project_tweets_df.filter(project_tweets_df['text']=='').count()

                                                                                

7515

In [73]:
project_tweets_df = project_tweets_df.filter(project_tweets_df["text"]!='')

In [74]:
project_tweets_df.filter(project_tweets_df['text']=='').count()

0

### getting the number of tweets per day

In [60]:
project_tweets_df = project_tweets_df.withColumn("date", date_format(col("datetime"), "yyyy-MM-dd"))

In [61]:
#keep the aggregated dataframe in tweets_by_date; .show() returns None, so it is called
#as a separate statement rather than chained onto the assignment
tweets_by_date = project_tweets_df.groupBy('date').count()
tweets_by_date.show()



+----------+------+
|      date| count|
+----------+------+
|2009-06-23| 18447|
|2009-04-18| 16073|
|2009-05-04| 29672|
|2009-06-20| 45448|
|2009-05-27| 11554|
|2009-04-19| 33522|
|2009-05-25|   168|
|2009-06-18| 43077|
|2009-05-29| 55588|
|2009-06-04|  4612|
|2009-04-20| 18360|
|2009-06-03| 60970|
|2009-04-07| 20568|
|2009-04-21| 11067|
|2009-05-22| 40964|
|2009-06-22|  6480|
|2009-05-03| 24920|
|2009-06-17| 43818|
|2009-06-21| 32531|
|2009-06-06|103495|
+----------+------+
only showing top 20 rows



                                                                                

## Section 4: Saving back to Cassandra database

In [75]:
project_tweets_df.show()



+----+-------------------+--------------------+
|indx|           datetime|                text|
+----+-------------------+--------------------+
|   0|2009-04-07 06:19:45|awww thats bummer...|
|   1|2009-04-07 06:19:49|upset cant update...|
|   2|2009-04-07 06:19:53|dived many time b...|
|   3|2009-04-07 06:19:57|whole body feel i...|
|   4|2009-04-07 06:19:57|behaving im mad c...|
|   5|2009-04-07 06:20:00|          whole crew|
|   6|2009-04-07 06:20:03|            need hug|
|   7|2009-04-07 06:20:03|hey long time see...|
|   8|2009-04-07 06:20:05|          nope didnt|
|   9|2009-04-07 06:20:09|           que muera|
|  10|2009-04-07 06:20:16|spring break plai...|
|  11|2009-04-07 06:20:17|       repierced ear|
|  12|2009-04-07 06:20:19|couldnt bear watc...|
|  13|2009-04-07 06:20:19|count idk either ...|
|  14|2009-04-07 06:20:20|wouldve first did...|
|  15|2009-04-07 06:20:20|wish got watch mi...|
|  16|2009-04-07 06:20:22|hollis death scen...|
|  17|2009-04-07 06:20:25|            fi

                                                                                

In [76]:
project_tweets_df = project_tweets_df.drop('date')

In [77]:
project_tweets_df.printSchema()

root
 |-- indx: long (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- text: string (nullable = true)



In [78]:
#append the processed tweets to the project_tweets_processed table in the twitter_sentiment keyspace
(
    project_tweets_df.write
    .format("org.apache.spark.sql.cassandra")
    .options(table="project_tweets_processed", keyspace="twitter_sentiment")
    .mode('append')
    .save()
)

In [81]:
#save to local
#the default save mode raises an error when the target path already exists, which makes this
#cell fail on a re-run; overwrite mode replaces the previous export instead
project_tweets_df.write.format('csv').mode('overwrite').save('file:///home/hduser/Desktop/CA2/processed_tweets.csv')

In [82]:
#project_tweets_df_new.write\
#    .format("org.apache.spark.sql.cassandra")\
#    .options(table="project_tweets", keyspace="twitter_sentiment")\
#    .mode('append')\
#    .option("spark.cassandra.connection.host","127.0.0.1")\
#    .option("spark.cassandra.connection.port", "9042").save()

In [83]:
#saves to HDFS
#project_tweets_df.write.csv('processed_tweets.csv')