In [1]:
import sqlite3
from sqlite3 import Error

from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql.functions import udf
from pyspark.sql.types import (StructField,StructType,StringType,FloatType)

from textblob import TextBlob
import re

In [2]:
def create_connection(db_file):
    """Open a SQLite connection to ``db_file``.

    Parameters
    ----------
    db_file : str
        Path handed to ``sqlite3.connect`` (``":memory:"`` also works).

    Returns
    -------
    sqlite3.Connection or None
        The open connection, or ``None`` when ``sqlite3.connect`` raised an
        ``sqlite3.Error``. The error is printed, not re-raised.
    """
    try:
        return sqlite3.connect(db_file)
    except Error as err:
        print(err)
    return None

In [3]:
def return_rows_from_sqlite(select_sql, connection=None):
    """Run a SELECT statement and return every result row.

    Parameters
    ----------
    select_sql : str
        SQL query executed verbatim (no bound parameters).
    connection : sqlite3.Connection, optional
        Connection to query. Defaults to the module-level ``conn`` opened in
        the main cell, so existing single-argument calls keep working.

    Returns
    -------
    list[tuple] or None
        All rows from ``cursor.fetchall()``, or ``None`` if SQLite raised an
        ``sqlite3.Error``. The error is printed, not re-raised.
    """
    if connection is None:
        connection = conn
    try:
        cursor = connection.cursor()
        try:
            cursor.execute(select_sql)
            return cursor.fetchall()
        finally:
            # Close the cursor on the failure path too; previously it was only
            # closed after a successful fetch.
            cursor.close()
    except Error as e:
        print(e)
        return None

In [4]:
def return_spark_df(select_sql, data_schema):
    """Load the result of ``select_sql`` into a Spark DataFrame.

    Parameters
    ----------
    select_sql : str
        Query passed to ``return_rows_from_sqlite``.
    data_schema : list of StructField
        Field definitions wrapped into a ``StructType``. The fetched rows are
        tuples, so fields are matched to columns by position: the field order
        must follow the query's column order.

    Returns
    -------
    pyspark.sql.DataFrame
        DataFrame built through the module-level ``sc`` and ``sqlContext``.
    """
    fetched_rows = return_rows_from_sqlite(select_sql)
    row_rdd = sc.parallelize(fetched_rows)
    schema = StructType(fields=data_schema)
    return sqlContext.createDataFrame(row_rdd, schema)

In [5]:
def return_sentiment(text):
    """Classify ``text`` as "Negative", "Neutral" or "Positive".

    The text is cleaned first:
    - @mentions are replaced by spaces.
    - URLs (``word://...``) are replaced by spaces.
    - Every character other than ASCII letters, digits, spaces and tabs is
      replaced by a space.
    - Runs of whitespace are collapsed to single spaces.

    The label is then taken from the sign of TextBlob's polarity score.

    Parameters
    ----------
    text : object
        Tweet text; coerced with ``str()`` (so ``None`` becomes ``"None"``).

    Returns
    -------
    str
        "Negative" if polarity < 0, "Neutral" if polarity == 0,
        otherwise "Positive".
    """
    text = str(text)

    # Raw string: "\w", "\/" and "\S" are invalid escapes in a plain string
    # literal. The original pattern also had a stray space after the
    # punctuation group, so punctuation was only stripped when followed by a
    # space (e.g. "great!" kept its "!"), making cleaning position-dependent.
    cleaned_text = ' '.join(
        re.sub(r"(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)", " ", text).split()
    )

    sentiment_score = TextBlob(cleaned_text).sentiment.polarity

    if sentiment_score < 0:
        return "Negative"
    if sentiment_score == 0:
        return "Neutral"
    return "Positive"

In [6]:
def update_table(update_sql, parameters, connection=None):
    """Execute one parameterized write statement and commit it.

    Parameters
    ----------
    update_sql : str
        SQL statement with ``?`` placeholders.
    parameters : sequence
        Values bound to the placeholders, in order.
    connection : sqlite3.Connection, optional
        Connection to write to. Defaults to the module-level ``conn`` opened
        in the main cell, so existing two-argument calls keep working.

    Any ``sqlite3.Error`` is printed rather than raised, so one bad row does
    not abort a batch of updates.
    """
    if connection is None:
        connection = conn
    try:
        cursor = connection.cursor()
        try:
            cursor.execute(update_sql, parameters)
            connection.commit()
        finally:
            # Close the cursor even when execute/commit fails; previously it
            # leaked on the error path.
            cursor.close()
    except Error as e:
        print(e)

In [7]:
def update_sentiment_table(updated_sentiment_df):
    """Write each row's computed sentiment back to ``sentiment_table``.

    Parameters
    ----------
    updated_sentiment_df : pyspark.sql.DataFrame
        Must contain ``sentiment`` and ``sentiment_id`` columns. Each row
        produces one ``UPDATE`` via ``update_table``, matched on
        ``sentiment_id``.
    """
    sql = ''' UPDATE sentiment_table
              SET sentiment = ? 
              WHERE sentiment_id = ?'''

    # Collect once, and only the two needed columns. The original called
    # .rdd.collect() twice. That executed the whole Spark job (including the
    # sentiment UDF) two times and then paired results from two separate
    # actions by position.
    rows = updated_sentiment_df.select("sentiment", "sentiment_id").collect()

    for row in rows:
        update_table(sql, (row["sentiment"], row["sentiment_id"]))

In [8]:
if __name__ == "__main__":
    # In a notebook kernel __name__ is always "__main__", so this guard only
    # matters if the code is exported to a script. `conn`, `sc` and
    # `sqlContext` are assigned at module level here and read as globals by
    # the helper functions above.

    database = "Twitter_Database.db"
    conn = create_connection(database)
    if conn is None:
        # create_connection already printed the sqlite3 error; fail early
        # instead of with an AttributeError on `None.cursor()` later.
        raise RuntimeError("Could not open SQLite database {!r}".format(database))

    try:
        # getOrCreate() reuses a running context, so re-running this cell in
        # the same kernel does not fail with "Cannot run multiple
        # SparkContexts at once".
        sc = SparkContext.getOrCreate()
        sqlContext = SQLContext(sc)

        # Fields are applied to the `select *` result tuples by position, so
        # this order must match sentiment_table's column order.
        data_schema = [
            StructField('sentiment_id', StringType(), False),
            StructField('user_name', StringType(), False),
            StructField('tweet', StringType(), False),
            StructField('longitude', FloatType(), False),
            StructField('latitude', FloatType(), False),
            StructField('sentiment', StringType(), True),
        ]

        # Only rows whose sentiment is the empty string are processed;
        # NOTE(review): rows with NULL sentiment are not selected — confirm intended.
        select_sql = "select * from sentiment_table where sentiment = ''"

        sentiment_df = return_spark_df(select_sql, data_schema)

        sentiment_udf = udf(return_sentiment, StringType())

        updated_sentiment_df = sentiment_df.withColumn('sentiment', sentiment_udf('tweet'))

        update_sentiment_table(updated_sentiment_df)
    finally:
        # Always release the SQLite connection, even if a Spark step fails.
        conn.close()