In [8]:
import numpy as np
import tweepy
import config
import pyspark
import emoji
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_unixtime, date_format, col, avg, sum, to_timestamp
from pyspark.sql.functions import concat, lit
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import udf
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.types import StringType
import findspark
findspark.init()
import yfinance as yf
import pandas as pd
from datetime import datetime, date, time, timedelta
import matplotlib.pyplot as plt
%matplotlib inline
plt.style.use('seaborn-darkgrid')
from textblob import TextBlob
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import pickle
from sklearn.feature_extraction.text import CountVectorizer
v = CountVectorizer(stop_words='english')
import csv
import timeit
import random


# Create a SparkSession
spark = SparkSession.builder.appName("MyApp").getOrCreate()

### part 1: get cryptocurrency

In [2]:
# Fetch BTC trade raw data
today = date.today()
BTC_raw = yf.download('DOGE-USD', start=today-timedelta(days=7), end=today) # BTC, ETH, XRP, ADA, DOGE
# BTC_raw.columns = ['date', 'open_price', 'high', 'low', 'close_price', 'adj close', 'volume']

# reset index and move Date column to the first column
BTC_raw = BTC_raw.reset_index().rename(columns={'Date': 'date'})
BTC_raw = BTC_raw.loc[:, ['date', 'Close']]
BTC_raw.head()

l = len(BTC_raw["date"])
a = []
for n in range(l):
    data = [str(BTC_raw["date"][n])[0:10], float(BTC_raw["Close"][n])]
    a.append(data)
    
print(a)

[*********************100%***********************]  1 of 1 completed
[['2023-04-24', 0.3839159905910492], ['2023-04-25', 0.3949190080165863], ['2023-04-26', 0.4014979898929596], ['2023-04-27', 0.40984299778938293], ['2023-04-28', 0.4049319922924042], ['2023-04-29', 0.40278398990631104], ['2023-04-30', 0.39735400676727295]]


In [3]:
day_price_data = spark.createDataFrame(a,["date", "close price"])

day_price_data.show()

+----------+-------------------+
|      date|        close price|
+----------+-------------------+
|2023-04-24| 0.3839159905910492|
|2023-04-25| 0.3949190080165863|
|2023-04-26| 0.4014979898929596|
|2023-04-27|0.40984299778938293|
|2023-04-28| 0.4049319922924042|
|2023-04-29|0.40278398990631104|
|2023-04-30|0.39735400676727295|
+----------+-------------------+



### part 2: get twitter

In [9]:
# Set up Twitter API credentials
consumer_key = config.API_KEY
consumer_secret= config.API_SECRET
access_token= config.ACCESS_TOKEN
access_token_secret = config.ACCESS_TOKEN_SECRET

# Authenticate with Twitter API
auth = tweepy.OAuthHandler(consumer_key,consumer_secret)
auth.set_access_token(access_token,access_token_secret)
api = tweepy.API(auth)


# Query Twitter API for tweets
data = tweepy.Cursor(api.search_tweets, q="Dogecoin OR DOGE", until="2023-04-30 00:00:00", lang="en", count=100).items(8000)

# Create an empty list to store the processed data
processed_data_list = []

print("fetching twitter data...")

while True:
    try:
        tweet = data.next()
            
        user_date = tweet.created_at
        user_follower = int(tweet.user.followers_count)
        user_id = int(tweet.id)
        user_text = tweet.text
        final_data = [user_date, user_text, user_follower, user_id]   
            
        processed_data_list.append(final_data)
      
    except StopIteration:
        break

print("total twitter data:", len(processed_data_list))

# Create a DataFrame from the processed data list
twitter_data = spark.createDataFrame(processed_data_list, ["date", "text", "follower", "id"])

twitter_data.show()

fetching twitter data...
total twitter data: 7209
+-------------------+--------------------+--------+-------------------+
|               date|                text|follower|                 id|
+-------------------+--------------------+--------+-------------------+
|2023-04-29 19:57:01|RT @TopCoinLinks:...|     338|1652462019734392834|
|2023-04-29 19:55:07|RT @cryptozup: Wh...|    1750|1652461542011535362|
|2023-04-29 19:53:46|RT @daddymonkey_e...|       0|1652461201174011906|
|2023-04-29 19:52:34|RT @SatoshiOwl: ?...|     150|1652460898957643776|
|2023-04-29 19:50:59|RT @daddymonkey_e...|       0|1652460501731876864|
|2023-04-29 19:50:32|RT @daddymonkey_e...|       1|1652460388200460289|
|2023-04-29 19:47:49|RT @daddymonkey_e...|       0|1652459703383867392|
|2023-04-29 19:44:28|1: Bitcoin price ...|      64|1652458860857241600|
|2023-04-29 19:38:39|1: Bitcoin price ...|     964|1652457397716230145|
|2023-04-29 19:27:53|💵 US$ 0.00000000...|    3337|1652454688183164930|
|2023-04-29 19:

In [10]:
# sentiment analysis model 

analyzer=SentimentIntensityAnalyzer()
def get_vader_score(sentence): 
    compound=analyzer.polarity_scores(sentence)['compound']
    if compound > 0.05: 
        return ['positive',compound]
    elif (compound >= -0.05) and (compound <=0.05): 
        return ['neutral',compound]
    else: 
        return ['negative',compound]

def vader_sentiment(X):
  vader_result=get_vader_score(X)[0]#X.apply(lambda x: get_vader_score(x)[0])
  vader_score = get_vader_score(X)[1]#X.apply(lambda x: get_vader_score(x)[1])
  #vader_score_total = np.mean(vader_score)
  return vader_score

#### data cleaning

In [12]:
# start PySpark transform #####################################################################

# load shedding
cleaned_twitter_data = twitter_data.filter(col("follower") >= 10)

# operator reordering, first use filter bc its selectivity is the highest

cleaned_twitter_data = cleaned_twitter_data.filter(col("id") % 10 != random.randint(0, 9))

# remove emoji
def remove_emoji(col):
  result = emoji.replace_emoji(col, replace="")
  return result

clean_udf = F.UserDefinedFunction(remove_emoji, T.StringType())
tweets_df_cleaned = twitter_data.withColumn("text", clean_udf("text"))


# remove mention 
cleaned_twitter_data = twitter_data.withColumn("text", regexp_replace('text', "@\s*[A-Za-z0-9_]+", ''))

cleaned_twitter_data = cleaned_twitter_data.withColumn("text", regexp_replace("text", "#\s*[A-Za-z0-9_]+", ""))

# remove retweet
cleaned_twitter_data = cleaned_twitter_data.withColumn("text", regexp_replace('text', "RT : ", ''))

# remove links
cleaned_twitter_data = cleaned_twitter_data.withColumn('text', regexp_replace('text', r"http\S+", ''))

cleaned_twitter_data = cleaned_twitter_data.withColumn('text', regexp_replace('text', r"www.\S+", ''))

# remove next line
cleaned_twitter_data = cleaned_twitter_data.withColumn("text", regexp_replace("text", r"\n", ""))

cleaned_twitter_data = cleaned_twitter_data.withColumn('text', regexp_replace('text', '\s+', ' '))

cleaned_twitter_data = cleaned_twitter_data.select(col('date'), col('text'))

# put sentiment analysis here

# sentiment model 

vader_sentiment_udf = F.UserDefinedFunction(vader_sentiment, T.FloatType())
vader_sentiment_twitter_data = cleaned_twitter_data.withColumn("text", vader_sentiment_udf("text"))

# calculate average

vader_sentiment_twitter_data = vader_sentiment_twitter_data.groupBy(date_format(col("date"), "yyyy-MM-dd").alias("date"))
vader_sentiment_twitter_data = vader_sentiment_twitter_data.agg(avg("text").alias("avg_value"))

In [13]:
vader_sentiment_twitter_data.show()

+----------+--------------------+
|      date|           avg_value|
+----------+--------------------+
|2023-04-29| 0.15902398995958367|
|2023-04-28| 0.03677221499046253|
|2023-04-27|0.002421561549821...|
|2023-04-26| 0.14959816590738975|
|2023-04-25| 0.25151037540430227|
|2023-04-24| 0.30741864358273785|
|2023-04-23|0.052318421157066065|
|2023-04-22| 0.12403524156917407|
|2023-04-21|  0.4070333242416382|
+----------+--------------------+



In [None]:
# save csv
final = vader_sentiment_twitter_data.collect()
with open("./DOGE_vader.csv", 'w', newline='') as f:
    # create the csv writer
    writer = csv.writer(f)
    
    for row in final:
        
        # write a row to the csv file
        writer.writerow(row)

### part 3: integrate two data

In [None]:
vader_sentiment_twitter_data.show()

In [None]:
merged_df = vader_sentiment_twitter_data.join(day_price_data,"date","left")

merged_df = merged_df.dropna()

merged_df.show()