In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
from preproc_functions import preprocess_all
from pyspark.sql import functions as F
import pandas as pd
from preproc_functions import emoji_to_words

spark = SparkSession \
        .builder \
        .master('spark://10.10.28.172:7077') \
        .appName('pre_processing') \
        .enableHiveSupport() \
        .config("spark.pyfiles", "preproc_functions.py") \
        .getOrCreate()

# Change to 'OFF' if it produces too much output
spark.sparkContext.setLogLevel('OFF')  
# files used for preprocessing
spark.sparkContext.addPyFile("/home/ubuntu/twitter_sentiment/code/preproc_functions.py")

In [8]:
# name of database
spark.sql('use twitter_data')

query = '''
    SELECT *
    FROM raw_data
    WHERE text IS NOT NULL
    AND created_at IS NOT NULL
    AND id IS NOT NULL
'''

raw_data = spark.sql(query)

In [9]:
udf_preprocessing = udf(lambda text: preprocess_all(text), StringType())
raw_data = raw_data.withColumn('text', udf_preprocessing(col('text')))

In [10]:
data = raw_data.select(['id', 'text'])

In [11]:
spark.sql('select text from raw_data where id=1458961863723700231').show(5)



+--------------------+
|                text|
+--------------------+
|Here's the play: ...|
+--------------------+



                                                                                

### 
1. translates emojies
2. removes user mentions
3. removes urls
4. removes all non-numerical characters
5. all text to lowercase

In [12]:
udf_translate_emojies = udf(lambda text: emoji_to_words(text))
user_regex = r'(@\w{1,15})'
url_regex = r'(https?:\/\/)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)'
neg_alphanumeric_regex = r'([^ a-zA-Z])'

data.\
    withColumn('text', udf_translate_emojies(col('text'))). \
    withColumn('text', F.regexp_replace(col('text'), url_regex, '')). \
    withColumn('text', F.regexp_replace(col('text'), user_regex, '')). \
    withColumn('text', F.regexp_replace(col('text'), neg_alphanumeric_regex, '')). \
    withColumn('text', F.lower(col('text'))
).show(5)


+-------------------+--------------------+
|                 id|                text|
+-------------------+--------------------+
|1486390083485843460|ive deleted my tw...|
|1486390081246085125|oil hits highest ...|
|1486390080956944385| did tucker carls...|
|1486390059498713099| west supports la...|
|1486390057896456200|russia wont invad...|
+-------------------+--------------------+
only showing top 5 rows



----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 52052)
Traceback (most recent call last):
  File "/usr/lib/python3.8/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.8/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.8/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.8/socketserver.py", line 747, in __init__
    self.handle()
  File "/home/ubuntu/.local/lib/python3.8/site-packages/pyspark/accumulators.py", line 262, in handle
    poll(accum_updates)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/pyspark/accumulators.py", line 235, in poll
    if func():
  File "/home/ubuntu/.local/lib/python3.8/site-packages/pyspark/accumulators.py", line 239, in accum_updates
    num_updates = read_int(