In [None]:
# Mount Google Drive into the Colab file system; the data paths used by this
# notebook live below /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 41 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 44.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=093fabd948b97b9c88bd3ba268ae910ef877479bb6af9b4eb61154dba11f181e
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [None]:
import html
from pyspark.sql import functions as f
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame

# Reuse the active Spark session if there is one, otherwise start a new one
spark = SparkSession.builder.getOrCreate()

# Locations on the mounted Google Drive: RAW_PATH is read as CSV,
# CLEAN_PATH receives the cleaned data as Parquet.
RAW_PATH = "/content/drive/MyDrive/sentiment-140-training-data/RAW"
CLEAN_PATH = "/content/drive/MyDrive/sentiment-140-training-data/CLEAN"

In [None]:
# First look at the data: load the raw CSV data without declaring a schema
raw_data = spark.read.csv(RAW_PATH)

# Print the first 50 rows with untruncated column values
raw_data.show(n=50, truncate=False)

+----------+----------------------------+--------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------+--------+
|_c0       |_c1                         |_c2     |_c3            |_c4                                                                                                                                      |polarity|
+----------+----------------------------+--------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------+--------+
|2326691715|Thu Jun 25 07:23:01 PDT 2009|NO_QUERY|androslee      |aw snap, win7 discounts tomorrow?  goodbye checkbook   http://tr.im/pIT5                                                                 |0.0     |
|2050459563|Fri Jun 05 18:42:24 PDT 2009|NO_QUERY|ShannonRaeP    |not having a good day today feeling depressed and un motivated all alone all d

In [None]:
# DDL description of the tweet columns (name, type and a short comment each).
# NOTE(review): the reads in this notebook show `polarity` as the LAST column,
# while this DDL lists it first — confirm the column order before applying it.
schema_ddl = """
polarity STRING COMMENT "the polarity of the tweet (0 = negative, 2 = neutral, 4 = positive)",
id LONG COMMENT "the id of the tweet (2087)",
date TIMESTAMP COMMENT "the date of the tweet (Sat May 16 23:58:44 UTC 2009)",
query STRING COMMENT "the query (lyx). If there is no query, then this value is NO_QUERY.",
user string COMMENT "the user that tweeted (robotickilldozr)",
text string COMMENT "the text of the tweet (Lyx is cool)"
"""

# Not applied yet — a reader using this schema would be: spark.read.schema(schema_ddl)

In [None]:
# Presumably the pattern of the `_c1` date strings; it is not used in this cell.
simple_date_format = "EEE MMM dd HH:mm:ss zzz yyyy"

# Re-read the raw data with schema inference and give the tweet body (`_c4`)
# a meaningful name.
raw_data = (
    spark.read
    .csv(RAW_PATH, inferSchema=True)
    .withColumnRenamed("_c4", "text")
)

raw_data.show(n=5)

+----------+--------------------+--------+-----------+--------------------+--------+
|       _c0|                 _c1|     _c2|        _c3|                text|polarity|
+----------+--------------------+--------+-----------+--------------------+--------+
|2326691715|Thu Jun 25 07:23:...|NO_QUERY|  androslee|aw snap, win7 dis...|     0.0|
|2050459563|Fri Jun 05 18:42:...|NO_QUERY|ShannonRaeP|not having a good...|     0.0|
|2257676117|Sat Jun 20 14:42:...|NO_QUERY|    sweett8|watchinG paRis hi...|     0.0|
|2205642391|Wed Jun 17 05:09:...|NO_QUERY| nuttychris|@nelsonmaud fucki...|     0.0|
|2014908397|Wed Jun 03 03:05:...|NO_QUERY|   mishok13|huh, pylint doesn...|     0.0|
+----------+--------------------+--------+-----------+--------------------+--------+
only showing top 5 rows



In [None]:
# Sample of untruncated tweet texts to try the regular expressions on
tweet_texts = raw_data.select("text")
tweet_texts.show(n=50, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                                     |
+-----------------------------------------------------------------------------------------------------------------------------------------+
|aw snap, win7 discounts tomorrow?  goodbye checkbook   http://tr.im/pIT5                                                                 |
|not having a good day today feeling depressed and un motivated all alone all day today                                                   |
|watchinG paRis hiltOn my new BFF [wii] haha anD jOb                                                                                      |
|@nelsonmaud fucking amazing just got shit loadsa work to do  haha                                                                        |
|huh, pylint doesn't

In [None]:
# Regex for Twitter user mentions: "@" followed by 1 to 15 word characters,
# captured as one group. cleaning_process uses the same pattern to fill the
# `users_mentioned` column.

user_regex = r"(@\w{1,15})"


In [None]:
def regexp_extract_all(
    df: DataFrame,
    regex: str,
    no_of_extracts: int,
    input_column_name: str,
    output_column_name: str = "output",
    empty_array_replace: bool = True,
) -> DataFrame:
    """Collect the distinct matches of ``regex`` in a column into an array column.

    ``regexp_extract`` only returns one match, so the n-th match is obtained by
    repeating ``regex`` followed by ``".*?"`` n times and reading capture
    group n. ``regex`` is therefore expected to contain exactly one capturing
    group.

    The array is built as a single column expression. No interim columns are
    added to (or dropped from) ``df``, which keeps the query plan small and
    leaves any existing columns of the caller untouched.

    Args:
        df: DataFrame that holds ``input_column_name``.
        regex: Pattern with one capturing group around the text to extract.
        no_of_extracts: Maximum number of matches looked for per row.
        input_column_name: Column the pattern is applied to.
        output_column_name: Column that receives the array of matches.
        empty_array_replace: If true, rows without any match get ``None``
            instead of an empty array.

    Returns:
        ``df`` with ``output_column_name`` added (or replaced).
    """
    source = f.col(input_column_name)

    # One extraction per match position; capture groups are 1-indexed.
    # regexp_extract yields "" when the text has fewer than n matches.
    extracts = [
        f.regexp_extract(source, f"{regex}.*?" * n, n)
        for n in range(1, no_of_extracts + 1)
    ]

    # Distinct matches only, with the "" placeholders of missing matches removed
    matches = f.array_remove(f.array_distinct(f.array(*extracts)), "")

    # Replace an empty array with None if empty_array_replace was set
    if empty_array_replace:
        matches = f.when(f.size(matches) == 0, f.lit(None)).otherwise(matches)

    return df.withColumn(output_column_name, matches)




In [None]:
# Captures a hashtag including its leading "#". Raw string, so that "\w"
# reaches the regex engine as written instead of being an invalid Python escape.
hashtag_regex = r"(#\w{1,})"

# Keep tweets whose first "#" sits after the fourth character (instr is
# 1-based and returns 0 when "#" is absent) — presumably a sample for trying
# out the hashtag regex; TODO confirm.
temp_df = raw_data.filter(f.instr(f.col("text"), "#") > 4)


In [None]:
# Captures only the word behind the "#", so a hashtag can be replaced by its
# bare word. Raw string, so that "\w" is not an invalid Python escape.
hashtag_replace_regex = r"#(\w{1,})"



In [None]:
# URLs: either a scheme (http, https, ftp, file) followed by "://" or a
# leading "www.". The dot after "www" is escaped; unescaped it matches any
# character, so plain words such as "awww" were treated as URLs.
url_regex = r"((https?|ftp|file):\/{2,3})+([-\w+&@#/%=~|$?!:,.]*)|(www\.)+([-\w+&@#/%=~|$?!:,.]*)"
# E-mail addresses: local part, "@", domain and a final dot-separated label
email_regex = r"[\w.-]+@[\w.-]+\.[a-zA-Z]{1,}"

In [None]:
from pyspark.sql.functions import udf
import html

@udf
def html_unescape(s: str):
    """Convert HTML character references in ``s`` (e.g. ``&amp;``) to plain characters.

    Values that are not strings — in particular the ``None`` a UDF receives
    for a null cell — are returned unchanged, because ``html.unescape`` raises
    a ``TypeError`` for them.
    """
    if not isinstance(s, str):
        return s
    return html.unescape(s)


In [None]:
def regexp_extract_all(
    df: DataFrame,
    regex: str,
    no_of_extracts: int,
    input_column_name: str,
    output_column_name: str = "output",
    empty_array_replace: bool = True,
) -> DataFrame:
    """Extract up to ``no_of_extracts`` distinct matches of ``regex`` into an array column.

    The n-th match is found by repeating ``regex`` plus ``".*?"`` n times and
    reading capture group n of that pattern, so ``regex`` is expected to
    contain exactly one capturing group.

    Everything is computed in one column expression: no helper columns are
    created on ``df`` and dropped again. This avoids a long chain of
    projections in the query plan and cannot overwrite columns that already
    exist on the input.

    Args:
        df: Input DataFrame containing ``input_column_name``.
        regex: Pattern whose single capturing group is the text to extract.
        no_of_extracts: Upper bound on the matches extracted per row.
        input_column_name: Name of the column to search.
        output_column_name: Name of the resulting array column.
        empty_array_replace: When true, a row without matches gets ``None``
            rather than an empty array.

    Returns:
        ``df`` with ``output_column_name`` added (or replaced).
    """
    text = f.col(input_column_name)

    # Capture groups are 1-indexed, hence positions 1..no_of_extracts.
    # A position with no match comes back from regexp_extract as "".
    per_position = [
        f.regexp_extract(text, f"{regex}.*?" * position, position)
        for position in range(1, no_of_extracts + 1)
    ]

    # Create a distinct array, all empty strings removed
    extracted = f.array_remove(f.array_distinct(f.array(*per_position)), "")

    # Replace an empty array with None if empty_array_replace was set
    if empty_array_replace:
        extracted = f.when(f.size(extracted) == 0, f.lit(None)).otherwise(extracted)

    return df.withColumn(output_column_name, extracted)


@udf
def html_unescape(s: str):
    """Unescape HTML character references in ``s``; any non-``str`` value is passed through untouched."""
    return html.unescape(s) if isinstance(s, str) else s


def cleaning_process(df: DataFrame):
    """Clean the tweets in the ``text`` column and keep the raw tweet alongside.

    Steps, in order:
      1. extract up to six user mentions and six hashtags into the array
         columns ``users_mentioned`` and ``hashtags``;
      2. copy the untouched tweet to ``original_text``, then strip URLs,
         e-mail addresses and user mentions from ``text``;
      3. replace every hashtag by its bare word;
      4. unescape HTML character references;
      5. replace every character that is not an ASCII letter by a space;
      6. collapse runs of whitespace into one space;
      7. lower-case the text;
      8. trim leading and trailing whitespace;
      9. drop the rows whose ``text`` ended up empty or null.

    Args:
        df: DataFrame with a string column named ``text``.

    Returns:
        The DataFrame with ``text`` cleaned and the columns
        ``users_mentioned``, ``hashtags`` and ``original_text`` added.
    """
    # The dot in "www\." is escaped: unescaped it matches any character, so
    # plain words such as "awww" were removed as if they were URLs.
    url_regex = r"((https?|ftp|file):\/{2,3})+([-\w+&@#/%=~|$?!:,.]*)|(www\.)+([-\w+&@#/%=~|$?!:,.]*)"
    email_regex = r"[\w.-]+@[\w.-]+\.[a-zA-Z]{1,}"
    user_regex = r"(@\w{1,15})"
    # Raw strings, so that "\w" is not an invalid Python escape sequence
    hashtag_regex = r"(#\w{1,})"
    hashtag_replace_regex = r"#(\w{1,})"
    # "A-Za-z" instead of "A-z": the range A-z also contains [ \ ] ^ _ and `,
    # which therefore survived the cleaning (e.g. "[wii]").
    non_letter_regex = r"[^A-Za-z]"

    # Cleaning process:

    # 1. Extract user mentions and hashtags
    df = regexp_extract_all(df, user_regex, 6, "text", "users_mentioned", True)
    df = regexp_extract_all(df, hashtag_regex, 6, "text", "hashtags", True)

    # 2. Remove URLs, email addresses, and user mentions
    # Order is important here
    df = (
        df
        .withColumn("original_text", f.col("text"))
        .withColumn("text", f.regexp_replace(f.col("text"), url_regex, ""))
        .withColumn("text", f.regexp_replace(f.col("text"), email_regex, ""))
        .withColumn("text", f.regexp_replace(f.col("text"), user_regex, ""))
    )

    # 3. Extract words from hashtags
    df = df.withColumn(
        "text", f.regexp_replace(f.col("text"), hashtag_replace_regex, "$1")
    )

    # 4. Unescape HTML
    # NOTE(review): numeric references such as "&#39;" contain a "#", so steps
    # 1 and 3 treat them as hashtags before they are unescaped here — consider
    # unescaping first.
    df = df.withColumn("text", html_unescape("text"))

    # 5. Remove everything that is not an ASCII letter
    df = df.withColumn(
        "text", f.regexp_replace(f.col("text"), non_letter_regex, " ")
    )

    # 6. Remove multiple white spaces
    df = df.withColumn(
        "text", f.regexp_replace(f.col("text"), r"[\s]{2,}", " ")
    )

    # 7. To lowercase
    df = df.withColumn("text", f.lower(f.col("text")))

    # 8. Trim the text
    df = df.withColumn("text", f.trim(f.col("text")))

    # 9. Drop all rows with no data in text after cleaning steps were applied
    df = df.filter(f.col("text") != "").na.drop(subset="text")

    return df

# DDL description of the tweet columns. `id` is LONG: the ids in this data set
# (e.g. 2326691715) do not fit into a 32-bit INT.
schema = """
polarity STRING COMMENT "the polarity of the tweet (0 = negative, 2 = neutral, 4 = positive)",
id LONG COMMENT "the id of the tweet (2087)",
date STRING COMMENT "the date of the tweet (Sat May 16 23:58:44 UTC 2009)",
query STRING COMMENT "the query (lyx). If there is no query, then this value is NO_QUERY.",
user string COMMENT "the user that tweeted (robotickilldozr)",
text string COMMENT "the text of the tweet (Lyx is cool)"
"""

# Run the complete cleaning pipeline on the raw tweets
clean_data = cleaning_process(raw_data)

# Inspect 250 cleaned tweets without truncation
clean_data.select("text").show(n=250, truncate=False)



+--------------------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                                  |
+--------------------------------------------------------------------------------------------------------------------------------------+
|aw snap win discounts tomorrow goodbye checkbook                                                                                      |
|not having a good day today feeling depressed and un motivated all alone all day today                                                |
|watching paris hilton my new bff [wii] haha and job                                                                                   |
|fucking amazing just got shit loadsa work to do haha                                                                                  |
|huh pylint doesn t handle yield expressi

In [None]:
# Number of rows before cleaning
raw_data.count()

1600000

In [None]:
# Number of rows left after cleaning (rows with empty text were dropped)
clean_data.count()

1596236

In [None]:
# Sanity check: no row with an empty text may be left after cleaning
empty_text_rows = clean_data.filter(f.col("text") == "")
empty_text_rows.show(n=1000)

+---+---+---+---+----+--------+---------------+--------+-------------+
|_c0|_c1|_c2|_c3|text|polarity|users_mentioned|hashtags|original_text|
+---+---+---+---+----+--------+---------------+--------+-------------+
+---+---+---+---+----+--------+---------------+--------+-------------+



In [None]:
# Keep only the cleaned text and its polarity label
clean_data = clean_data.select("text", "polarity")

# Preview the first rows as a pandas DataFrame
clean_data.limit(10).toPandas()

Unnamed: 0,text,polarity
0,aw snap win discounts tomorrow goodbye checkbook,0.0
1,not having a good day today feeling depressed ...,0.0
2,watching paris hilton my new bff [wii] haha an...,0.0
3,fucking amazing just got shit loadsa work to d...,0.0
4,huh pylint doesn t handle yield expressions cr...,0.0
5,wants to get out but it s a little late,0.0
6,thought i noticed some chemistry,0.0
7,finally figured out how to make roller coaster...,0.0
8,watchin tv but it s alomst time to go to sleep...,0.0
9,can you wish happy birthday please stop reply ...,0.0


In [None]:
# Persist the cleaned data as Parquet. An earlier export is overwritten so the
# notebook can be re-run end to end; the default save mode raises an error
# when the target path already exists.
clean_data.write.parquet(CLEAN_PATH, mode="overwrite")

In [None]:
# Shut down the Spark session now that the cleaned data has been written
spark.stop()