In [1]:
# pip install emoji

In [2]:
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.types import *
from pyspark.sql import functions 
from datetime import datetime
from pyspark.sql.functions import to_date
from operator import add
import re
import emoji
from pyspark import SparkContext
import pyspark.sql.functions as f
from pyspark import SparkContext
import string

In [3]:
customSchema = StructType([StructField("id", FloatType(), True), 
                          StructField("date", StringType(), True),
                          StructField("ticker", StringType(), True),
                          StructField("tweet", StringType(), True),
                          ])


In [4]:
df = spark.read.load('hdfs://localhost:9000/CA2/stocktweet.csv', format="csv", header="true", sep=',', schema=customSchema)

In [5]:
df.printSchema()

root
 |-- id: float (nullable = true)
 |-- date: string (nullable = true)
 |-- ticker: string (nullable = true)
 |-- tweet: string (nullable = true)



In [6]:
df.show(20)

                                                                                

+--------+----------+------+--------------------+
|      id|      date|ticker|               tweet|
+--------+----------+------+--------------------+
|100001.0|01/01/2020|  AMZN|$AMZN Dow futures...|
|100002.0|01/01/2020|  TSLA|$TSLA Daddy's dri...|
|100003.0|01/01/2020|  AAPL|$AAPL We’ll been ...|
|100004.0|01/01/2020|  TSLA|$TSLA happy new y...|
|100005.0|01/01/2020|  TSLA|"$TSLA haha just ...|
|100006.0|01/01/2020|  TSLA|$TSLA NOBODY: Gas...|
|    null|      null|  null|                null|
|100007.0|02/01/2020|  AAPL|$AAPL $300 calls ...|
|100008.0|02/01/2020|  AAPL|$AAPL Remember, i...|
|100009.0|02/01/2020|  AAPL|$AAPL called it, ...|
|100010.0|02/01/2020|    HD|$HD Bought more a...|
|100011.0|02/01/2020|  AAPL|Apple is taking t...|
|100012.0|02/01/2020|  AAPL|$AAPL not a bad d...|
|100013.0|02/01/2020|  AAPL|$AAPL where are a...|
|100014.0|03/01/2020|  NVDA|$NVDA This should...|
|100015.0|03/01/2020|  AAPL|$AAPL tomorrow bu...|
|100016.0|03/01/2020|  AAPL|$AAPL Thanks for ...|


In [7]:
# Coverting data type to date in the correct format
df = df.withColumn('date', to_date(df['date'], 'dd/MM/yyyy'))

In [8]:
df.show(20)

+--------+----------+------+--------------------+
|      id|      date|ticker|               tweet|
+--------+----------+------+--------------------+
|100001.0|2020-01-01|  AMZN|$AMZN Dow futures...|
|100002.0|2020-01-01|  TSLA|$TSLA Daddy's dri...|
|100003.0|2020-01-01|  AAPL|$AAPL We’ll been ...|
|100004.0|2020-01-01|  TSLA|$TSLA happy new y...|
|100005.0|2020-01-01|  TSLA|"$TSLA haha just ...|
|100006.0|2020-01-01|  TSLA|$TSLA NOBODY: Gas...|
|    null|      null|  null|                null|
|100007.0|2020-01-02|  AAPL|$AAPL $300 calls ...|
|100008.0|2020-01-02|  AAPL|$AAPL Remember, i...|
|100009.0|2020-01-02|  AAPL|$AAPL called it, ...|
|100010.0|2020-01-02|    HD|$HD Bought more a...|
|100011.0|2020-01-02|  AAPL|Apple is taking t...|
|100012.0|2020-01-02|  AAPL|$AAPL not a bad d...|
|100013.0|2020-01-02|  AAPL|$AAPL where are a...|
|100014.0|2020-01-03|  NVDA|$NVDA This should...|
|100015.0|2020-01-03|  AAPL|$AAPL tomorrow bu...|
|100016.0|2020-01-03|  AAPL|$AAPL Thanks for ...|


In [9]:
df = df.dropna(how='all')
df.show(20)

+--------+----------+------+--------------------+
|      id|      date|ticker|               tweet|
+--------+----------+------+--------------------+
|100001.0|2020-01-01|  AMZN|$AMZN Dow futures...|
|100002.0|2020-01-01|  TSLA|$TSLA Daddy's dri...|
|100003.0|2020-01-01|  AAPL|$AAPL We’ll been ...|
|100004.0|2020-01-01|  TSLA|$TSLA happy new y...|
|100005.0|2020-01-01|  TSLA|"$TSLA haha just ...|
|100006.0|2020-01-01|  TSLA|$TSLA NOBODY: Gas...|
|100007.0|2020-01-02|  AAPL|$AAPL $300 calls ...|
|100008.0|2020-01-02|  AAPL|$AAPL Remember, i...|
|100009.0|2020-01-02|  AAPL|$AAPL called it, ...|
|100010.0|2020-01-02|    HD|$HD Bought more a...|
|100011.0|2020-01-02|  AAPL|Apple is taking t...|
|100012.0|2020-01-02|  AAPL|$AAPL not a bad d...|
|100013.0|2020-01-02|  AAPL|$AAPL where are a...|
|100014.0|2020-01-03|  NVDA|$NVDA This should...|
|100015.0|2020-01-03|  AAPL|$AAPL tomorrow bu...|
|100016.0|2020-01-03|  AAPL|$AAPL Thanks for ...|
|100017.0|2020-01-03|  AAPL|$AAPL leave enoug...|


In [10]:
# Creating instance of the spark context object
sc = SparkContext.getOrCreate()
sc.master

'local[*]'

In [11]:
def clean_tweet(x):
  
  # Delete all the URLs in the tweets
  text00 = re.sub(r'www\S+', '', x)
  text01 = re.sub(r'http\S+', '', text00)
  
  # Delete all the numbers in the tweets
  text1 = ''.join([i for i in text01 if not i.isdigit()])
  
  # Delete all the punctuation marks in the tweets
  text2 = text1.translate(str.maketrans('','',string.punctuation))
  
  # Convert text to LOWERCASE
  text3 = text2.lower()

 # Remove emojis using regex
  text4 = emoji.replace_emoji(text3, replace='')  # Replaces all emojis with an empty string
    
  return text4

In [12]:
# Collecting tweets into a list 
tweet = df.select("tweet").rdd.flatMap(lambda x: x).collect()
type(tweet)

                                                                                

list

In [13]:
# Converting the tweets into an RDD 
tweet_rdd = sc.parallelize(tweet)

In [14]:
# Applying the clean_tweet function
clean_tweet_rdd = tweet_rdd.map(clean_tweet)
clean_tweet_rdd.take(10)

['amzn dow futures up by  points already ',
 'tsla daddys drinkin early tonight heres to a pt of ohhhhh  in  ',
 'aapl we’ll been riding since last december from  what to do decisions decisions hmm  i have  mins to decide any suggestions',
 'tsla happy new year  everyone',
 'tsla haha just a collection of greatsmars rofl  bork',
 'tsla nobody gas cars driven by humans killed s upon s in ',
 'aapl  calls first trade of  congrats to all bulls ',
 'aapl remember if you short every day one of those days you will be right ',
 'aapl called it the bear comment below makes me chuckle inside so sweeet ',
 'hd bought more at todays low she is turning stars aligned']

In [15]:
# The mapping function
# The words are picked out (split up everytime there is a space)
# Only words over three letters are retained
# Every appearance of a word is given a value of one
map = clean_tweet_rdd.flatMap(lambda line: line.split(" ")).filter(lambda x: len(x) > 3).map(lambda word: (word, 1))
map.take(10)

[('amzn', 1),
 ('futures', 1),
 ('points', 1),
 ('already', 1),
 ('tsla', 1),
 ('daddys', 1),
 ('drinkin', 1),
 ('early', 1),
 ('tonight', 1),
 ('heres', 1)]

In [16]:
# The reduce fucntion - the number of instances of each word is counted for all of the tweets
# This wouldn't work, tried multiple functions but kept getting the same error.
# counts = map.reduceByKey(lambda a, b: a + b)
counts = map.reduceByKey(add)

In [17]:
counts = counts.map(lambda x: (x[1],x[0])).sortByKey(False)

24/10/28 14:48:29 ERROR Executor: Exception in task 1.0 in stage 6.0 (TID 7)/ 2]
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 830, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 820, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/rdd.py", line 5405, in pipeline_func
    return func(split, prev_func(split, iterator))
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/rdd.py", line 5405, in pipeline_func
    return func(split, prev_func(split, iterator))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/rdd.py", line 828, in func
    return f(iterator)
           ^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/rdd.py", line 3964, in combineLocally
    merger.merge

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 6.0 failed 1 times, most recent failure: Lost task 1.0 in stage 6.0 (TID 7) (10.0.2.15 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 830, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 820, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/rdd.py", line 5405, in pipeline_func
    return func(split, prev_func(split, iterator))
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/rdd.py", line 5405, in pipeline_func
    return func(split, prev_func(split, iterator))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/rdd.py", line 828, in func
    return f(iterator)
           ^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/rdd.py", line 3964, in combineLocally
    merger.mergeValues(iterator)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 256, in mergeValues
    for k, v in iterator:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_15942/3041315386.py", line 4, in clean_tweet
  File "/usr/local/anaconda3/lib/python3.12/re/__init__.py", line 186, in sub
    return _compile(pattern, flags).sub(repl, string, count)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: expected string or bytes-like object, got 'NoneType'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:767)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:749)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2790)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2726)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2725)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2725)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1211)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1211)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1211)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2989)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2928)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2917)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:976)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1022)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:408)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1021)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 830, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 820, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/rdd.py", line 5405, in pipeline_func
    return func(split, prev_func(split, iterator))
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/rdd.py", line 5405, in pipeline_func
    return func(split, prev_func(split, iterator))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/rdd.py", line 828, in func
    return f(iterator)
           ^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/rdd.py", line 3964, in combineLocally
    merger.mergeValues(iterator)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 256, in mergeValues
    for k, v in iterator:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_15942/3041315386.py", line 4, in clean_tweet
  File "/usr/local/anaconda3/lib/python3.12/re/__init__.py", line 186, in sub
    return _compile(pattern, flags).sub(repl, string, count)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: expected string or bytes-like object, got 'NoneType'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:767)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:749)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
counts.take(10)