In [1]:
import pandas as pd
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
path = 'C://Users/beile.yaaqob.aisin/Downloads/The_Reddit_Ethereum_Dataset/'

#initiating spark session

spark = SparkSession.builder \
    .master('local') \
    .appName('Flight Delay') \
    .getOrCreate()

In [3]:
#setting up the schema
productSchema_reddit = StructType().add("type", "string") \
                        .add("id", "string") \
                        .add("subreddit.id", "string") \
                        .add("subreddit.name", "string") \
                        .add("subreddit.nsfw", "string") \
                        .add("created_utc", "string") \
                        .add("permalink", "string") \
                        .add("body", "string") \
                        .add("sentiment", "float") \
                        .add("score", "long")

In [4]:
# load reddit data, and the price action data

df_reddit = spark.read.format("csv").option("header","true").option("multiline","true").option("quote", "\"").option("escape", "\"").schema(productSchema_reddit).load(path+"the-reddit-ethereum-dataset-comments.csv")
df_eth = spark.read.format("csv").option("header","true").load(path+"eth_price.csv")
df_btc = spark.read.format("csv").option("header","true").load(path+"btc_price.csv")

In [5]:
#length of the reddit dataset
df_reddit.count()

1132331

In [6]:
df_reddit = df_reddit[['created_utc','body']]
df_reddit = df_reddit.dropna()
df_reddit = df_reddit[df_reddit['created_utc'] >= '2016-05-15'] #to match the btc and eth price datasets


df_eth = df_eth[['date','Adj Close']]
df_eth = df_eth.select(col("date").alias("eth_date"), col("Adj Close").alias("eth_price"))

df_btc = df_btc[['date','Adj Close']]
df_btc = df_btc.select(col("date").alias("btc_date"), col("Adj Close").alias("btc_price"))

In [7]:
df_reddit.show()

+-------------------+--------------------+
|        created_utc|                body|
+-------------------+--------------------+
|2021-11-01 23:59:52|Surely you must b...|
|2021-11-01 23:59:14|For real man, the...|
|2021-11-01 23:58:58|What happens with...|
|2021-11-01 23:58:40|Not exactly.  Eth...|
|2021-11-01 23:57:16|Is the babydoge o...|
|2021-11-01 23:55:31|Loopring is not m...|
|2021-11-01 23:54:58|Did you listen to...|
|2021-11-01 23:53:14|            Ethereum|
|2021-11-01 23:52:34|It was a mistake ...|
|2021-11-01 23:52:08|            Ethereum|
|2021-11-01 23:51:03|Why does Curecoin...|
|2021-11-01 23:49:47|I'd love for some...|
|2021-11-01 23:49:36|Nowhere yet, they...|
|2021-11-01 23:48:22|So you’re saying ...|
|2021-11-01 23:47:27|I’m not sure I un...|
|2021-11-01 23:46:58|It's pure surviva...|
|2021-11-01 23:46:03|There are limits ...|
|2021-11-01 23:45:46|   Ethereum and $GME|
|2021-11-01 23:45:25|I think that was ...|
|2021-11-01 23:45:00|          Ethereum..|
+----------

In [8]:
#grouping rows by dates

df_reddit = df_reddit.withColumn("date", to_date(col("created_utc")))

w = Window.partitionBy('date').orderBy('created_utc')

df_reddit = df_reddit.withColumn("body",collect_list("body").over(w)).groupBy("date").agg(max("body").alias("body"))#return texts in list format
df_reddit = df_reddit.withColumn("body", concat_ws(" ", "body"))#swapping text data from list to strings

df_reddit = df_reddit[['date','body']]

In [9]:
# post grouping reddit data
df_reddit.show()

+----------+--------------------+
|      date|                body|
+----------+--------------------+
|2016-05-23|Why does ethereum...|
|2016-05-26|The 1.1 revision ...|
|2016-05-27|https://www.reddi...|
|2016-05-31|I'm still extreme...|
|2016-06-02|Not really a curr...|
|2016-06-16|Here was the scri...|
|2016-06-17|Ethereum can be u...|
|2016-07-02| \n\n \n\n **Auth...|
|2016-07-03|&gt;Ethereum was ...|
|2016-07-17|Isn't the well de...|
|2016-07-19|&gt; The communit...|
|2016-07-21|If you want to be...|
|2016-07-26|It's good to see ...|
|2016-08-01|Well it isn't "li...|
|2016-08-05|I've read what yo...|
|2016-08-06|Well apparently M...|
|2016-08-15|I've had too many...|
|2016-08-16|Progress with blo...|
|2016-08-23|Not by me. I thou...|
|2016-08-26|Maybe we can copy...|
+----------+--------------------+
only showing top 20 rows



In [12]:
#eth daily prices
df_eth.show()

+----------+------------------+
|  eth_date|         eth_price|
+----------+------------------+
|2016-05-15| 9.962349891662598|
|2016-05-16|11.171299934387207|
|2016-05-17|12.198800086975098|
|2016-05-18|13.558600425720215|
|2016-05-19|14.769700050354004|
|2016-05-20|13.635600090026855|
|2016-05-21|14.015000343322754|
|2016-05-22|14.286100387573242|
|2016-05-23| 13.46150016784668|
|2016-05-24|  12.7322998046875|
|2016-05-25|12.526000022888184|
|2016-05-26|12.430399894714355|
|2016-05-27| 11.29580020904541|
|2016-05-28|11.892900466918945|
|2016-05-29|  12.3503999710083|
|2016-05-30|12.725299835205078|
|2016-05-31|14.077300071716309|
|2016-06-01|14.001500129699707|
|2016-06-02| 13.73799991607666|
|2016-06-03|13.846699714660645|
+----------+------------------+
only showing top 20 rows



In [13]:
#btc daily prices
df_btc.show()

+----------+-----------------+
|  btc_date|        btc_price|
+----------+-----------------+
|2016-05-15|457.5679931640625|
|2016-05-16|454.1629943847656|
|2016-05-17|453.7829895019531|
|2016-05-18|454.6189880371094|
|2016-05-19|438.7149963378906|
|2016-05-20|442.6759948730469|
|2016-05-21|  443.18798828125|
|2016-05-22| 439.322998046875|
|2016-05-23|444.1549987792969|
|2016-05-24|445.9809875488281|
|2016-05-25|449.5989990234375|
|2016-05-26|453.3840026855469|
|2016-05-27|473.4639892578125|
|2016-05-28|530.0399780273438|
|2016-05-29|526.2329711914062|
|2016-05-30| 533.864013671875|
|2016-05-31| 531.385986328125|
|2016-06-01|536.9199829101562|
|2016-06-02|537.9719848632812|
|2016-06-03|569.1939697265625|
+----------+-----------------+
only showing top 20 rows



In [14]:
df = df_reddit.join(df_btc,df_reddit.date == df_btc.btc_date,"left_outer")
df = df.join(df_eth,df.date == df_eth.eth_date,"left_outer")
df = df[['date','body','btc_price','eth_price']]

In [15]:
df.show() #final form in data_manipulation

+----------+--------------------+-----------------+------------------+
|      date|                body|        btc_price|         eth_price|
+----------+--------------------+-----------------+------------------+
|2016-05-23|Why does ethereum...|444.1549987792969| 13.46150016784668|
|2016-05-26|The 1.1 revision ...|453.3840026855469|12.430399894714355|
|2016-05-27|https://www.reddi...|473.4639892578125| 11.29580020904541|
|2016-05-31|I'm still extreme...| 531.385986328125|14.077300071716309|
|2016-06-02|Not really a curr...|537.9719848632812| 13.73799991607666|
|2016-06-16|Here was the scri...|766.3079833984375|20.588600158691406|
|2016-06-17|Ethereum can be u...|748.9089965820312|15.376799583435059|
|2016-07-02| \n\n \n\n **Auth...|703.7020263671875|12.128399848937988|
|2016-07-03|&gt;Ethereum was ...|658.6640014648438|11.720199584960938|
|2016-07-17|Isn't the well de...|    679.458984375|11.158699989318848|
|2016-07-19|&gt; The communit...| 672.864013671875|11.619099617004395|
|2016-

In [17]:
pandas_df = df.toPandas() #converting to pandas

In [18]:
#pyspark's .write().format('csv') malfunctions. have to use pandas
pandas_df.to_csv(path+'data.csv',index=False)