In [2]:
import re
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# Function to clean tweet
def clean_tweet(text):
    """Normalise a raw tweet into lowercase text without punctuation.

    The input is converted with ``str()`` and then, in order: escaped
    newlines become spaces, URLs, @mentions and ``#`` markers are removed,
    a leading bytes-literal prefix is dropped, the reserved words RT / FAV
    and possessive ``'s`` suffixes are removed, then punctuation,
    underscores, digits and emoji are stripped. Whitespace is collapsed and
    the result is lowercased.

    Args:
        text: The raw tweet. Any falsy value (None, empty string) is treated
            as "no tweet".

    Returns:
        The cleaned tweet as a ``str``; ``""`` when ``text`` is falsy.
    """
    if not text:
        return ""
    text = str(text)

    # Turn escaped newlines (a literal backslash followed by "n") into spaces.
    # This has to happen first: the URL pattern below would otherwise swallow
    # the word after the line break, and the punctuation strip would delete
    # the backslash and leave a stray "n" glued to the next word.
    text = re.sub(r"\\n", ' ', text)

    # Remove URLs
    text = re.sub(r'http\S+|www\S+|https\S+', '', text)

    # Remove user mentions and the '#' marker (the hashtag word is kept)
    text = re.sub(r'\@\w+|\#', '', text)

    # Remove leading 'b' from bytes literal strings
    text = re.sub(r"^b'", "", text)
    text = re.sub(r'^b"', "", text)

    # Remove reserved words (RT, FAV); case-sensitive, so do it before lower()
    text = re.sub(r'\bRT\b|\bFAV\b', '', text)

    # Drop possessive 's while the apostrophe is still present; after the
    # punctuation strip it could no longer be recognised.
    text = re.sub(r"'s\b", '', text)

    # Remove punctuation and other non-word characters. '_' counts as a word
    # character for \w, so it is listed explicitly.
    text = re.sub(r'[^\w\s]|_', '', text)

    # Remove numbers
    text = re.sub(r'\d+', '', text)

    # Remove emojis
    emoji_pattern = re.compile("["
                               u"\U0001F600-\U0001F64F"  # emoticons
                               u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                               u"\U0001F680-\U0001F6FF"  # transport & map symbols
                               u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
                               "]+", flags=re.UNICODE)
    text = emoji_pattern.sub(r'', text)

    # NOTE(review): other escaped byte sequences (backslash-x pairs) are not
    # handled here; the steps above reduce them to letter residue such as
    # "xexxa" - confirm whether they should be stripped at this stage instead.

    # Collapse runs of whitespace left by the removals, trim, and lowercase
    text = re.sub(r'\s+', ' ', text).strip()
    return text.lower()

# Register the cleaner as a Spark UDF that returns strings
clean_tweet_udf = udf(clean_tweet, StringType())

# Load the raw tweets, clean the text, and keep only usable rows/columns.
# 'xexxa' is presumably what an escaped ellipsis (\xe2\x80\xa6) is reduced to
# once clean_tweet has removed backslashes and digits - TODO confirm.
df = (
    spark.sql("SELECT * FROM twitterdata.twitter_raw")
    .withColumn('Tweet', clean_tweet_udf(col('Tweet')))
    .filter((col('Tweet') != 'xexxa') & (col('Tweet') != ''))
    .select('id', 'Created_At', 'Tweet')
)

# Show the cleaned DataFrame (optional)
display(df)

# Save the de-duplicated tweets. The table name is qualified with the same
# database the pipeline reads from, so the next stage
# (SELECT ... FROM twitterdata.clean_tweets) finds this output regardless of
# the session's current database.
df = df.drop_duplicates()
df.write.mode('overwrite').saveAsTable('twitterdata.clean_tweets')


StatementMeta(, bd52cef3-7254-4560-8bbb-863935c14c41, 4, Finished, Available)

SynapseWidget(Synapse.DataFrame, 64d436bf-e64d-475d-921d-f8fa6384b83d)

To prepare for tokenization, import NLTK and download the NLTK resource it needs (here, the `stopwords` corpus).
- Remove stopwords from each tweet and save the result as a Delta table named `rem_stopwords`.

In [8]:
import nltk
from nltk.corpus import stopwords

# Make the NLTK stopword corpus available, then hold the English list as a
# set so membership checks are cheap
nltk.download('stopwords')
stop_words = {word for word in stopwords.words('english')}

StatementMeta(, bd52cef3-7254-4560-8bbb-863935c14c41, 10, Finished, Available)

[nltk_data] Downloading package stopwords to /home/trusted-service-
[nltk_data]     user/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [9]:
# Remove stopwords and short words from 'Tweet'

def filter_words(text, stopword_set=None, min_length=5):
    """Drop stopwords and short words from a whitespace-separated string.

    Args:
        text: The text to filter. Falsy values (None, empty string) yield
            an empty string instead of raising, so null cells do not fail
            the Spark job.
        stopword_set: Optional collection of words to remove. When omitted,
            the module-level ``stop_words`` set is used.
        min_length: Minimum number of characters a word needs to be kept.
            The default of 5 matches the previous ``len(word) > 4`` rule.

    Returns:
        The surviving words joined by single spaces (possibly ``""``).
    """
    if not text:
        return ""
    # Resolve the global lazily so an explicit set works without it.
    excluded = stop_words if stopword_set is None else stopword_set
    return ' '.join(
        word for word in text.split()
        if len(word) >= min_length and word not in excluded
    )

# Wrap filter_words as a Spark UDF that returns strings
filter_words_udf = udf(filter_words, StringType())

# Load clean_tweets and strip stopwords / short words from each tweet
df = (
    spark.sql("SELECT * FROM twitterdata.clean_tweets")
    .withColumn('Tweet', filter_words_udf(col('Tweet')))
)

# Save data. The write is qualified with the database this pipeline reads
# from, so it does not depend on the session's current database.
table_name = "rem_stopwords"
df.write.format('delta').mode('overwrite').saveAsTable('twitterdata.' + table_name)

StatementMeta(, bd52cef3-7254-4560-8bbb-863935c14c41, 11, Finished, Available)

In [10]:
# Function to remove banned words
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, udf, regexp_replace, lower,to_date,date_format
def remove_banned_words():
    """Strip leftover encoding junk from tweets and write the final table.

    Reads ``twitterdata.rem_stopwords``, removes the banned fragments from
    the ``Tweet`` column, normalises whitespace, reformats ``Created_At`` as
    ``dd-MMM-yyyy``, drops rows whose tweet is empty afterwards, and
    overwrites the Delta table ``twitterdata.final_tweets``.

    Takes no arguments and returns nothing; the result is the written table.
    """
    df = spark.sql("SELECT * FROM twitterdata.rem_stopwords")

    # Fragments that are removed wherever they occur, even inside a longer
    # token. The empty string is deliberately not listed: an empty regex
    # alternative matches at every position and, because alternation is
    # ordered, disables every alternative that follows it.
    junk_fragments = {
        'xe2', 'x80', 'xa6', 'xf0', 'x9f', 'x99', 'x99t', 'xa3', 'xa4', 'xc2',
        'xe7x8fxe5x88xa9xe5xa7xac', 'xc2xa1nfidel', 'xc2xa1dxc2xa10t',
        'x98x94x98x94', "'", 'x94', "''", 'xa3i', "'m", 'x98x81', "n't",
        'x87xacx87xa7', '\\\\\\', 'foreve\\\\\\', '\\xc2\\xa1d\\xc2\\xa10t',
        '\\xc2\\xa1nfidel', 'th\\\\\\', '\\\\\\\\xa3I',
        '\\xe7\\\\x8f\\xe5\\x88\\xa9\\xe5\\xa7\\xac',
        '\\\\\\x98\\x94\\\\\\x98\\x94', 'xexxa', '...',
    }

    # Letter-only leftovers. Matching these as substrings would delete the
    # letters from inside ordinary words (every "b" and "m"), so they are
    # only removed when they stand alone as a whole word.
    junk_words = {'b', 'm', 'esn'}

    # Longest fragment first so a short entry cannot pre-empt a longer one
    # that starts the same way; sorting also makes the pattern deterministic
    # (set iteration order is not).
    fragment_alternatives = '|'.join(
        re.escape(fragment)
        for fragment in sorted(junk_fragments, key=lambda item: (-len(item), item))
    )
    word_alternatives = '|'.join(re.escape(word) for word in sorted(junk_words))
    pattern = fragment_alternatives + r'|\b(?:' + word_alternatives + r')\b'

    # Remove banned fragments/words from the 'Tweet' column
    tweet = regexp_replace(col('Tweet'), pattern, '')
    tweet = regexp_replace(tweet, r'xexxa\b', '')

    # Collapse the gaps left by the removals and trim, so a tweet that is now
    # only whitespace is caught by the empty-string filter below.
    tweet = regexp_replace(tweet, r'\s+', ' ')
    tweet = regexp_replace(tweet, r'^ | $', '')
    df = df.withColumn('Tweet', tweet)

    # NOTE(review): to_date is called without a format, so this assumes
    # Created_At is in a form Spark parses by default - confirm.
    df = df.withColumn('Created_At', date_format(to_date('Created_At'), 'dd-MMM-yyyy'))
    df = df.filter(col('Tweet') != '')

    # Save final cleaned tweets, qualified with the database the pipeline
    # reads from so the write does not depend on the current database.
    df.write.format('delta').mode('overwrite').saveAsTable('twitterdata.final_tweets')

# Run the banned-word cleanup: reads twitterdata.rem_stopwords and overwrites
# the final_tweets table.
remove_banned_words() 

StatementMeta(, bd52cef3-7254-4560-8bbb-863935c14c41, 12, Finished, Available)