In [1]:
import pyspark as spark
import pandas as pd
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql.functions import col,udf,monotonically_increasing_id,unix_timestamp,round,avg
import re
sc = spark.SparkContext()
sql = spark.SQLContext(sc)

In [2]:
df1=pd.read_csv('/home/manoj/ProjectBigData/Data/tweetsfinal.csv',error_bad_lines=False,engine = 'python',header = None) 
df2=pd.read_csv('/home/manoj/ProjectBigData/Data/BitCoinPrice.csv',error_bad_lines=False,engine = 'python',header = None) 
FullDataTw=sql.createDataFrame(df1)
FullDataBtc=sql.createDataFrame(df2) #creating pandas df and then changing it to pyspark df

Skipping line 845142: unexpected end of data


In [3]:
FullDataTw = FullDataTw.dropna() #getting rid of full empty rows
print(FullDataTw.count())
print(FullDataBtc.count())

845141
217


In [4]:
FullDataTw.select(monotonically_increasing_id().alias("rowId"),"*")
FullDataTw = FullDataTw.withColumnRenamed('0', 'DateTime') #setting column names of Twitter dataset
FullDataTw = FullDataTw.withColumnRenamed('1', 'Tweet')
FullDataBtc = FullDataBtc.withColumnRenamed('0', 'DateTime') #setting column names of Bitcoin price dataset
FullDataBtc = FullDataBtc.withColumnRenamed('1', 'Price')
FullDataBtc = FullDataBtc.filter(FullDataBtc.DateTime != 'Date') #to get rid of first row with the header

In [5]:
Tw_samp = FullDataTw #taking sample of 50 rows and working on it

In [6]:
import preprocessor as p #cleaning each tweet using tweet-preprocessor like removing hashtags,urls,emojis....
def function_udf(input_str):
    input_str = re.sub(r'RT', '', input_str)
    p.set_options(p.OPT.URL, p.OPT.EMOJI,p.OPT.MENTION)
    input_str = p.clean(input_str)
    return ' '.join(re.sub("(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)", " ", input_str).split())
func_udf = udf(function_udf, StringType())
CleanDF = Tw_samp.withColumn('CleanedTweets', func_udf(Tw_samp['Tweet']))
CleanDF.show(5)

+--------------------+--------------------+--------------------+
|            DateTime|               Tweet|       CleanedTweets|
+--------------------+--------------------+--------------------+
|Thu Nov 09 17:43:...|RT @Forbes: The F...|The Failure of Se...|
|Thu Nov 09 17:43:...|RT @mindstatex: L...|Lots of love from...|
|Thu Nov 09 17:43:...|RT @LevelNetwork:...|Join our telegram...|
|Thu Nov 09 17:43:...|RT @realsheepwolf...|DIGAF FLOAT 16M T...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



In [7]:
from textblob import TextBlob  #passing cleaned tweets and getting a sentiment score for each tweet
def senti_score_udf(input_str):
    analysis = TextBlob(input_str)
    return analysis.sentiment.polarity
func_udf2 = udf(senti_score_udf, FloatType())
CleanDF = CleanDF.withColumn('Sentiment_score', func_udf2(CleanDF['CleanedTweets']))
CleanDF.show(5)

+--------------------+--------------------+--------------------+---------------+
|            DateTime|               Tweet|       CleanedTweets|Sentiment_score|
+--------------------+--------------------+--------------------+---------------+
|Thu Nov 09 17:43:...|RT @Forbes: The F...|The Failure of Se...|    -0.18888889|
|Thu Nov 09 17:43:...|RT @mindstatex: L...|Lots of love from...|     0.25833333|
|Thu Nov 09 17:43:...|RT @LevelNetwork:...|Join our telegram...|            0.0|
|Thu Nov 09 17:43:...|RT @realsheepwolf...|DIGAF FLOAT 16M T...|          -0.05|
+--------------------+--------------------+--------------------+---------------+
only showing top 5 rows



In [8]:
def Tw_Time_format(stri):  #manipulating and casting the strings(DateTime) of tweets dataframe to timestamps
    dic = {'Nov':'11','Oct':'10'}
    ans = ''
    ans += stri[-4:]+'-'+ dic[stri[4:7]]+'-'+stri[8:19]
    return ans
func_udf3 = udf(Tw_Time_format,StringType())
CleanDF = CleanDF.withColumn('DateTime_c', func_udf3(CleanDF['DateTime']))
CleanDF = CleanDF.withColumn("DateTime_casted",CleanDF['DateTime_c'].cast(TimestampType()))
CleanDF.show(5)

+--------------------+--------------------+--------------------+---------------+-------------------+-------------------+
|            DateTime|               Tweet|       CleanedTweets|Sentiment_score|         DateTime_c|    DateTime_casted|
+--------------------+--------------------+--------------------+---------------+-------------------+-------------------+
|Thu Nov 09 17:43:...|RT @Forbes: The F...|The Failure of Se...|    -0.18888889|2017-11-09 17:43:41|2017-11-09 17:43:41|
|Thu Nov 09 17:43:...|RT @mindstatex: L...|Lots of love from...|     0.25833333|2017-11-09 17:43:40|2017-11-09 17:43:40|
|Thu Nov 09 17:43:...|RT @LevelNetwork:...|Join our telegram...|            0.0|2017-11-09 17:43:39|2017-11-09 17:43:39|
|Thu Nov 09 17:43:...|RT @realsheepwolf...|DIGAF FLOAT 16M T...|          -0.05|2017-11-09 17:43:39|2017-11-09 17:43:39|
+--------------------+--------------------+--------------------+---------------+-------------------+-------------------+
only showing top 5 rows



In [9]:
FinalTw = CleanDF.selectExpr("DateTime_casted as Date_Time", "CleanedTweets as Cleaned_Tweets","Sentiment_score")
FinalTw.show(5) #selecting necessary columns

+-------------------+--------------------+---------------+
|          Date_Time|      Cleaned_Tweets|Sentiment_score|
+-------------------+--------------------+---------------+
|2017-11-09 17:43:41|The Failure of Se...|    -0.18888889|
|2017-11-09 17:43:40|Lots of love from...|     0.25833333|
|2017-11-09 17:43:39|Join our telegram...|            0.0|
|2017-11-09 17:43:39|DIGAF FLOAT 16M T...|          -0.05|
+-------------------+--------------------+---------------+
only showing top 5 rows



In [10]:
FinalTw.printSchema()

root
 |-- Date_Time: timestamp (nullable = true)
 |-- Cleaned_Tweets: string (nullable = true)
 |-- Sentiment_score: float (nullable = true)



In [11]:
from datetime import datetime 
from dateutil import parser
def Btc_Time_format(input_str): #manipulating and casting the strings(DateTime) of BTC dataframe to timestamps
    input_str = re.sub(r'/17','', input_str)
    input_str = '2017-'+ input_str
    input_str = re.sub(r'/', '-', input_str)
    input_str += ':00'
    return input_str[:10]+""+input_str[10:]
func_udf = udf(Btc_Time_format, StringType())
FullDataBtc = FullDataBtc.withColumn('Cleaned_BTC_Time', func_udf(FullDataBtc['DateTime']))

In [12]:
CleandfBtc = FullDataBtc.withColumn("Cleaned_BTC_Time_New",FullDataBtc['Cleaned_BTC_Time'].cast(TimestampType()))
FinalBtc = CleandfBtc.selectExpr("Cleaned_BTC_Time_New as Date_Time", "Price")
FinalBtc = FinalBtc.withColumn("Price",FinalBtc['Price'].cast(DoubleType()))
FinalBtc.show(5)#In this cell, casting to timesstamp, changing col names and casting price type to double

+-------------------+-------+
|          Date_Time|  Price|
+-------------------+-------+
|2017-10-31 00:00:00|6142.46|
|2017-10-31 01:00:00|6139.47|
|2017-10-31 02:00:00| 6128.2|
|2017-10-31 03:00:00|6130.72|
|2017-10-31 04:00:00|6143.92|
+-------------------+-------+
only showing top 5 rows



In [13]:
FinalBtc.printSchema()

root
 |-- Date_Time: timestamp (nullable = true)
 |-- Price: double (nullable = true)



In [14]:
dt_truncated = ((round(unix_timestamp(col('Date_Time')) / 3600) * 3600).cast('timestamp'))
FinalTw = FinalTw.withColumn('dt_truncated', dt_truncated)
FinalTw = FinalTw.selectExpr("dt_truncated as Date_Time","Cleaned_Tweets","Sentiment_score")
UTC = ((unix_timestamp(col('Date_Time'))+ 5*60*60).cast('timestamp'))
FinalTw = FinalTw.withColumn('UTC', UTC)
FinalTw = FinalTw.selectExpr("UTC as Date_Time","Cleaned_Tweets","Sentiment_score")
FinalTw.show(10)

+-------------------+--------------------+---------------+
|          Date_Time|      Cleaned_Tweets|Sentiment_score|
+-------------------+--------------------+---------------+
|2017-11-09 23:00:00|The Failure of Se...|    -0.18888889|
|2017-11-09 23:00:00|Lots of love from...|     0.25833333|
|2017-11-09 23:00:00|Join our telegram...|            0.0|
|2017-11-09 23:00:00|DIGAF FLOAT 16M T...|          -0.05|
|2017-11-09 23:00:00|My luggage likes ...|            0.0|
|2017-11-09 23:00:00|As Bitcoin become...|           0.55|
|2017-11-09 23:00:00|A crucial feature...|            0.1|
|2017-11-09 23:00:00|As Bitcoin become...|           0.55|
|2017-11-09 23:00:00|As Bitcoin become...|           0.55|
+-------------------+--------------------+---------------+
only showing top 10 rows



In [36]:
FinalTw_avg = FinalTw.select("Date_Time","Sentiment_score").groupBy("Date_Time").agg(avg(col("Sentiment_score")))
# FinalTw_avg.show()
# FinalTw_avg.printSchema()
FinalTw_avg = FinalTw_avg.selectExpr("Date_Time as date", "`avg(Sentiment_score)` as score")


+-------------------+--------------------+
|          Date_Time|avg(Sentiment_score)|
+-------------------+--------------------+
|2017-11-09 09:00:00| 0.09240741625240456|
|2017-11-04 14:00:00| 0.11845594881100904|
|2017-11-04 13:00:00|  0.1172484553339806|
|2017-11-02 12:00:00| 0.06527885267149504|
|2017-11-07 23:00:00| 0.10283782658623176|
|2017-11-04 09:00:00| 0.09418897322089734|
|2017-11-02 02:00:00| 0.09436981377657502|
|2017-11-03 03:00:00| 0.09344412833299827|
|2017-11-04 04:00:00| 0.10142201882228671|
|2017-11-05 00:00:00| 0.08632902932326081|
|2017-10-31 17:00:00| 0.09206832917391784|
|2017-11-06 16:00:00| 0.09368665402965974|
|2017-11-03 05:00:00| 0.07860593016294816|
|2017-11-02 21:00:00| 0.08349927669926008|
|2017-11-06 19:00:00| 0.10290178138846101|
|2017-11-02 16:00:00| 0.06061797001144715|
|2017-11-01 05:00:00| 0.11368575691864594|
|2017-10-31 16:00:00| 0.09301943334583278|
|2017-10-31 12:00:00| 0.10549754665746006|
|2017-11-04 19:00:00| 0.09140639081231433|
+----------

In [37]:
FinalTw_avg.printSchema()

# FinalTw.registerTempTable("temp")
# FinalTw_avg = sql.sql("SELECT Date_Time As DateTime, AVG(Sentiment_score) As Sentiment_score FROM temp GROUP BY Date_Time")
# FinalTw_avg.show(5)


root
 |-- date: timestamp (nullable = true)
 |-- score: double (nullable = true)



In [17]:
# from pyspark.sql import functions as f
# df_with_text = FinalTw.groupby("Date_Time").agg(f.concat_ws(" ", f.collect_list(FinalTw.Cleaned_Tweets)))
# df_with_text.show(5)

+-------------------+------------------------------------------+
|          Date_Time|concat_ws( , collect_list(Cleaned_Tweets))|
+-------------------+------------------------------------------+
|2017-11-09 09:00:00|                      Segwit2X died Thi...|
|2017-11-04 14:00:00|                      NEW Roger Ver CEO...|
|2017-11-04 13:00:00|                      Bitcoin prices me...|
|2017-11-02 12:00:00|                      Nice Ethersport a...|
|2017-11-07 23:00:00|                      CME Unveils Bitco...|
+-------------------+------------------------------------------+
only showing top 5 rows



In [38]:
FinalTw_avg.count()

236

In [40]:
# from pyspark.sql.functions import *
# df_sort = FinalTw_avg.sort(asc("Date_Time"))
# df_sort.show()

# temp1 = FinalTw_avg.alias('temp1')
# temp2 = FinalBtc.alias('temp2')
# FinalTwTemp = FinalTw_avg.limit(10)

FinalTw_avg = FinalTw_avg.join(FinalBtc, FinalTw_avg.date == FinalBtc.Date_Time)
FinalTw_avg = FinalTw_avg.selectExpr('Date_Time', 'score as Sentiment_score', 'Price')
FinalTw_avg.show()

+-------------------+-------------------+-------------------+-------+
|               date|              score|          Date_Time|  Price|
+-------------------+-------------------+-------------------+-------+
|2017-11-02 12:00:00|0.06527885267149504|2017-11-02 12:00:00|6975.91|
|2017-11-04 13:00:00| 0.1172484553339806|2017-11-04 13:00:00|7313.96|
|2017-11-04 14:00:00|0.11845594881100904|2017-11-04 14:00:00|7300.69|
|2017-11-02 02:00:00|0.09436981377657502|2017-11-02 02:00:00|6814.19|
|2017-11-04 09:00:00|0.09418897322089734|2017-11-04 09:00:00|7143.41|
|2017-11-07 23:00:00|0.10283782658623176|2017-11-07 23:00:00|7112.74|
|2017-11-03 03:00:00|0.09344412833299827|2017-11-03 03:00:00|7195.12|
|2017-11-04 04:00:00|0.10142201882228671|2017-11-04 04:00:00|7255.67|
|2017-10-31 17:00:00|0.09206832917391784|2017-10-31 17:00:00|6361.79|
|2017-11-05 00:00:00|0.08632902932326081|2017-11-05 00:00:00|7343.74|
|2017-11-03 05:00:00|0.07860593016294816|2017-11-03 05:00:00|7233.02|
|2017-11-06 16:00:00

In [20]:
# FinalTw_avg.registerTempTable("avgs")
# FinalBtc.registerTempTable("prices")
# results = sql.sql("SELECT * FROM avgs JOIN prices ON avgs.DateTime = prices.Date_Time order by avgs.DateTime")
# results.show(5)

AnalysisException: "cannot resolve '`avgs.DateTime`' given input columns: [Date_Time, avg(Sentiment_score), Date_Time, Price]; line 1 pos 34;\n'Sort ['avgs.DateTime ASC NULLS FIRST], true\n+- 'Project [*]\n   +- 'Join Inner, ('avgs.DateTime = Date_Time#131)\n      :- SubqueryAlias avgs\n      :  +- Aggregate [Date_Time#161], [Date_Time#161, avg(cast(Sentiment_score#65 as double)) AS avg(Sentiment_score)#181]\n      :     +- Project [Date_Time#161, Sentiment_score#65]\n      :        +- Project [UTC#155 AS Date_Time#161, Cleaned_Tweets#107, Sentiment_score#65]\n      :           +- Project [Date_Time#150, Cleaned_Tweets#107, Sentiment_score#65, cast((unix_timestamp(Date_Time#150, yyyy-MM-dd HH:mm:ss, Some(America/New_York)) + cast(18000 as bigint)) as timestamp) AS UTC#155]\n      :              +- Project [dt_truncated#144 AS Date_Time#150, Cleaned_Tweets#107, Sentiment_score#65]\n      :                 +- Project [Date_Time#106, Cleaned_Tweets#107, Sentiment_score#65, cast((round((cast(unix_timestamp(Date_Time#106, yyyy-MM-dd HH:mm:ss, Some(America/New_York)) as double) / cast(3600 as double)), 0) * cast(3600 as double)) as timestamp) AS dt_truncated#144]\n      :                    +- Project [DateTime_casted#86 AS Date_Time#106, CleanedTweets#54 AS Cleaned_Tweets#107, Sentiment_score#65]\n      :                       +- Project [DateTime#37, Tweet#41, CleanedTweets#54, Sentiment_score#65, DateTime_c#79, cast(DateTime_c#79 as timestamp) AS DateTime_casted#86]\n      :                          +- Project [DateTime#37, Tweet#41, CleanedTweets#54, Sentiment_score#65, Tw_Time_format(DateTime#37) AS DateTime_c#79]\n      :                             +- Project [DateTime#37, Tweet#41, CleanedTweets#54, senti_score_udf(CleanedTweets#54) AS Sentiment_score#65]\n      :                                +- Project [DateTime#37, Tweet#41, function_udf(Tweet#41) AS CleanedTweets#54]\n      :                                   +- Project [DateTime#37, 1#1 AS Tweet#41]\n      :                                      +- Project [0#0 AS DateTime#37, 1#1]\n      :                                         +- Filter AtLeastNNulls(n, 0#0,1#1)\n      :                                            +- LogicalRDD [0#0, 1#1]\n      +- SubqueryAlias prices\n         +- Project [Date_Time#131, cast(Price#49 as double) AS Price#135]\n            +- Project [Cleaned_BTC_Time_New#125 AS Date_Time#131, Price#49]\n               +- Project [DateTime#45, Price#49, Cleaned_BTC_Time#120, cast(Cleaned_BTC_Time#120 as timestamp) AS Cleaned_BTC_Time_New#125]\n                  +- Project [DateTime#45, Price#49, Btc_Time_format(DateTime#45) AS Cleaned_BTC_Time#120]\n                     +- Filter NOT (DateTime#45 = Date)\n                        +- Project [DateTime#45, 1#6 AS Price#49]\n                           +- Project [0#5 AS DateTime#45, 1#6]\n                              +- LogicalRDD [0#5, 1#6]\n"

In [None]:
# results.count()

In [None]:
FinalTw_avg.repartition(1).write.csv("One.csv") #this will write df to single csv instead of writing diff csv acc to partitions 