In [1]:
# setup environment
# import findspark
# findspark.init()

# from pyspark import SparkContext
# sc = SparkContext("local", "first app")
# sc = SparkContext.getOrCreate()

# import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 
spark

# from pyspark.sql import SQLContext
# sql = SQLContext(sc)

# data analysis
from pyspark.sql.functions import to_timestamp,col,lit,udf,monotonically_increasing_id,unix_timestamp,round,avg,split,size,isnan,when,count
from pyspark.sql.types import StringType, FloatType, ArrayType
from functools import reduce

# visualization
from IPython import display
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib import cm

#sentiment analysis
from textblob import TextBlob
import re
import nltk
from nltk.corpus import stopwords
stop = stopwords.words('english')
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

ModuleNotFoundError: No module named 'findspark'

## Read CSV

In [2]:
df = spark.read.csv('data/btc_tweets_1418244_unique.csv',header=True) 
#.withColumn('id',to_timestamp(col('id'),'MM/dd/yyyy hh:mm:ss a'))
df.show(5)

+----+----------------+--------+--------------------+--------------------+------------------+
|date|              id|    text|           permalink|       permalink_uid|          username|
+----+----------------+--------+--------------------+--------------------+------------------+
|   0|12/25/2017 11:00|9.45E+17|#Bitcoin Is Alrea...|https://twitter.c...|945323495235481600|
|   1|12/25/2017 11:00|9.45E+17|bitcoin $ BTCUSD ...|https://twitter.c...|945323493079617536|
|   2|12/25/2017 11:00|9.45E+17|Tire suas dúvidas...|https://twitter.c...|945323491817082881|
|   4|12/25/2017 11:00|9.45E+17|Five Reasons #Bit...|https://twitter.c...|945323481411047426|
|   5|12/25/2017 11:00|9.45E+17|#الفوركس #форекс ...|https://twitter.c...|945323480744112129|
+----+----------------+--------+--------------------+--------------------+------------------+
only showing top 5 rows



In [3]:
oldColumns = df.schema.names
newColumns = ['id_row_unique', 'date', 'id_user_short', 'tweet', 'permalink', 'id_user_long']

df = reduce(lambda df, idx: df.withColumnRenamed(oldColumns[idx], newColumns[idx]), 
            range(len(oldColumns)), df)
df.printSchema()
df.show()

root
 |-- id_row_unique: string (nullable = true)
 |-- date: string (nullable = true)
 |-- id_user_short: string (nullable = true)
 |-- tweet: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- id_user_long: string (nullable = true)

+-------------+----------------+-------------+--------------------+--------------------+------------------+
|id_row_unique|            date|id_user_short|               tweet|           permalink|      id_user_long|
+-------------+----------------+-------------+--------------------+--------------------+------------------+
|            0|12/25/2017 11:00|     9.45E+17|#Bitcoin Is Alrea...|https://twitter.c...|945323495235481600|
|            1|12/25/2017 11:00|     9.45E+17|bitcoin $ BTCUSD ...|https://twitter.c...|945323493079617536|
|            2|12/25/2017 11:00|     9.45E+17|Tire suas dúvidas...|https://twitter.c...|945323491817082881|
|            4|12/25/2017 11:00|     9.45E+17|Five Reasons #Bit...|https://twitter.c...|9453234814

In [4]:
df = df.drop('id_row_unique', 'id_user_short', 'permalink')

In [9]:
df = df.withColumnRenamed('id_user_long', 'id_user')
df.show()

+----------------+--------------------+------------------+---------+
|            date|               tweet|           id_user|wordCount|
+----------------+--------------------+------------------+---------+
|12/25/2017 11:00|#Bitcoin Is Alrea...|945323495235481600|       20|
|12/25/2017 11:00|bitcoin $ BTCUSD ...|945323493079617536|        8|
|12/25/2017 11:00|Tire suas dúvidas...|945323491817082881|        7|
|12/25/2017 11:00|Five Reasons #Bit...|945323481411047426|       14|
|12/25/2017 11:00|#الفوركس #форекс ...|945323480744112129|       36|
|12/25/2017 11:00|https:// box.netl...|945323478852521984|       14|
|12/25/2017 11:00|Opera 50 Web Brow...|945323470010826753|       20|
|12/25/2017 11:00|Bitcoin Price Dro...|945323469440503808|       21|
|12/25/2017 11:00|"RT AlertTrade ""...|              null|        8|
|12/25/2017 11:00|http:// ow.ly/xqM...|945323467909541889|       16|
|12/25/2017 11:00|5 things that sta...|945323466273812480|       34|
|12/25/2017 11:00|"RT AlertTrade "

+-------------+----------------+--------------------+------------------+
|id_row_unique|            date|               tweet|      id_user_long|
+-------------+----------------+--------------------+------------------+
|            0|12/25/2017 11:00|#Bitcoin Is Alrea...|945323495235481600|
|            1|12/25/2017 11:00|bitcoin $ BTCUSD ...|945323493079617536|
|            2|12/25/2017 11:00|Tire suas dúvidas...|945323491817082881|
|            4|12/25/2017 11:00|Five Reasons #Bit...|945323481411047426|
|            5|12/25/2017 11:00|#الفوركس #форекс ...|945323480744112129|
|            6|12/25/2017 11:00|https:// box.netl...|945323478852521984|
|            7|12/25/2017 11:00|Opera 50 Web Brow...|945323470010826753|
|            8|12/25/2017 11:00|Bitcoin Price Dro...|945323469440503808|
|           10|12/25/2017 11:00|"RT AlertTrade ""...|              null|
|           11|12/25/2017 11:00|http:// ow.ly/xqM...|945323467909541889|
|           12|12/25/2017 11:00|5 things that sta..

## Check for Nulls

In [10]:
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+----+-----+-------+---------+
|date|tweet|id_user|wordCount|
+----+-----+-------+---------+
|   1|    0|   4598|        0|
+----+-----+-------+---------+



In [11]:
df = df.filter(df.tweet.isNotNull())

In [12]:
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+----+-----+-------+---------+
|date|tweet|id_user|wordCount|
+----+-----+-------+---------+
|   1|    0|   4598|        0|
+----+-----+-------+---------+



## Word Count

In [8]:
df = df.withColumn('wordCount', size(split(col('tweet'), ' ')))
df.show(3)

+----------------+--------------------+------------------+---------+
|            date|               tweet|      id_user_long|wordCount|
+----------------+--------------------+------------------+---------+
|12/25/2017 11:00|#Bitcoin Is Alrea...|945323495235481600|       20|
|12/25/2017 11:00|bitcoin $ BTCUSD ...|945323493079617536|        8|
|12/25/2017 11:00|Tire suas dúvidas...|945323491817082881|        7|
+----------------+--------------------+------------------+---------+
only showing top 3 rows



## Clean tweets

In [13]:
def clean_tweet(tweet):
    '''
        Utility function to clean the text in a tweet by removing: stop words, links, special characters using regex.
    Args:
        tweet: DataFrame column 'text'
    Returns:
        DataFrame
    '''
    tweet = str(tweet)
    tweet = tweet.lower()
    tweet = [word for word in tweet.split() if word not in stop]
    return ' '.join(re.sub("(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)", " ", str(tweet)).split())

func_udf =  udf(clean_tweet, StringType())
df = df.withColumn('cleaned_tweets', func_udf(df['tweet']))
df.show(3)

+----------------+--------------------+------------------+---------+--------------------+
|            date|               tweet|           id_user|wordCount|      cleaned_tweets|
+----------------+--------------------+------------------+---------+--------------------+
|12/25/2017 11:00|#Bitcoin Is Alrea...|945323495235481600|       20|bitcoin already b...|
|12/25/2017 11:00|bitcoin $ BTCUSD ...|945323493079617536|        8|bitcoin btcusd tr...|
|12/25/2017 11:00|Tire suas dúvidas...|945323491817082881|        7|tire suas d vidas...|
+----------------+--------------------+------------------+---------+--------------------+
only showing top 3 rows



In [15]:
df = df.withColumn('word_count_tweet_cleaned', size(split(col('cleaned_tweets'), ' ')))
df.show(3)

+----------------+--------------------+------------------+---------+--------------------+------------------------+
|            date|               tweet|           id_user|wordCount|      cleaned_tweets|word_count_tweet_cleaned|
+----------------+--------------------+------------------+---------+--------------------+------------------------+
|12/25/2017 11:00|#Bitcoin Is Alrea...|945323495235481600|       20|bitcoin already b...|                      20|
|12/25/2017 11:00|bitcoin $ BTCUSD ...|945323493079617536|        8|bitcoin btcusd tr...|                      11|
|12/25/2017 11:00|Tire suas dúvidas...|945323491817082881|        7|tire suas d vidas...|                       9|
+----------------+--------------------+------------------+---------+--------------------+------------------------+
only showing top 3 rows



In [19]:
df = df.drop('wordCount', 'word_count_tweet_cleaned')


## Sentiment analysis:  Vader

In [20]:
analyser = SentimentIntensityAnalyzer()

def analyze_sentiment_v(sentence):
    snt = analyser.polarity_scores(sentence)
    return ([snt['neg'], snt['neu'], snt['pos'], snt['compound']])
    #return (snt['compound'])

func_udf2 = udf(analyze_sentiment_v, ArrayType(FloatType()))
df = df.withColumn('sentiment_vader', func_udf2(df['cleaned_tweets'])[0])
df.show(3)

+----------------+--------------------+------------------+--------------------+---------------+
|            date|               tweet|           id_user|      cleaned_tweets|sentiment_vader|
+----------------+--------------------+------------------+--------------------+---------------+
|12/25/2017 11:00|#Bitcoin Is Alrea...|945323495235481600|bitcoin already b...|           0.13|
|12/25/2017 11:00|bitcoin $ BTCUSD ...|945323493079617536|bitcoin btcusd tr...|            0.0|
|12/25/2017 11:00|Tire suas dúvidas...|945323491817082881|tire suas d vidas...|            0.0|
+----------------+--------------------+------------------+--------------------+---------------+
only showing top 3 rows



In [21]:
df.groupBy('date').count().orderBy('count', ascending=False).show(10)

+----------------+-----+
|            date|count|
+----------------+-----+
|12/22/2017 10:15|  342|
|12/22/2017 10:00|  305|
|12/22/2017 12:00|  279|
|12/22/2017 11:15|  276|
| 12/22/2017 9:30|  275|
| 12/22/2017 9:15|  255|
|12/22/2017 11:00|  253|
|12/22/2017 13:00|  252|
| 12/22/2017 9:00|  252|
|12/22/2017 10:30|  248|
+----------------+-----+
only showing top 10 rows



### archived

In [None]:
# https://datascience.stackexchange.com/questions/13123/import-csv-file-contents-into-pyspark-dataframes
# ps_df = sql.read.format("com.databricks.spark.csv")
#                 .options(header="true", inferschema='true')
#                 .load("data/output_2018-11-11-to-2014-12-31-1000-perdate.csv")

In [None]:
## Sentiment analysis: Text Blob 
#https://github.com/harishpuvvada/BitCoin-Value-Predictor/blob/master/Data_PreProcessing.ipynb
def analyze_sentiment_tb(tweet):
    '''
        Classify the polarity of a tweet using textblob.
    Args:
        tweet:
    Returns:
    
    '''
    analysis = TextBlob(tweet)
    polarity = analysis.sentiment.polarity
    return polarity
    
func_udf2 = udf(analyze_sentiment_tb, ArrayType(FloatType()))
df = df.withColumn('sentiment_txtblob', func_udf2(df['cleaned_tweets'])[0])
df.show(3)

In [None]:
#ps_df = ps_df.selectExpr('date','permalink','cleaned_tweets','sentiment_vader','wordCount')