In [0]:

from pyspark.sql.session import SparkSession
import time
import boto3, botocore
import pandas as pd
from pyspark.sql.functions import udf, explode, lower, regexp_replace,to_timestamp,from_unixtime ,pandas_udf, PandasUDFType,length,split

# List every file (object key) in the tweet source bucket.
def get_source_listing_df() -> pd.DataFrame:
    """Return a DataFrame listing all object keys in ``TWEET_BUCKET_NAME``.

    The bucket is read with unsigned (anonymous) S3 requests, so no AWS
    credentials are required.

    Returns:
        A DataFrame with a single column, ``'File Name'``, holding one object
        key per row in the order the S3 listing yields them.
    """
    unsigned_config = boto3.session.Config(signature_version=botocore.UNSIGNED)
    s3_resource = boto3.resource('s3', config=unsigned_config)
    source_bucket = s3_resource.Bucket(TWEET_BUCKET_NAME)

    # bucket.objects.all() pages through the whole listing transparently.
    keys = []
    for summary in source_bucket.objects.all():
        keys.append(summary.key)

    return pd.DataFrame(keys, columns=['File Name'])

# Fetch the raw contents of a file stored in S3.
def show_s3_file_contents(filename: str) -> bytes:
    """Download and return the full contents of *filename* from ``TWEET_BUCKET_NAME``.

    The request is unsigned (anonymous), so no AWS credentials are needed.
    The object body is returned as raw ``bytes`` exactly as read from S3;
    decode it yourself if text is required.

    Args:
        filename: the object key inside the bucket.

    Returns:
        The complete object body as ``bytes`` (the previous ``-> str``
        annotation did not match what ``StreamingBody.read()`` returns).
    """
    # Create a boto3 resource for S3 using anonymous credentials
    s3 = boto3.resource('s3', config=boto3.session.Config(signature_version=botocore.UNSIGNED))
    obj = s3.Object(TWEET_BUCKET_NAME, filename)
    # Reads the entire object into memory, not just the first record.
    data = obj.get()['Body'].read()
    return data

# Remove the outputs of a previous pipeline run.
def clear_previous_run() -> bool:
    """Recursively delete the checkpoint and Delta directories of a previous run.

    Requires ``dbutils`` and the ``BRONZE_*``, ``SILVER_*`` and ``GOLD_*``
    path constants, which are defined in the includes notebook rather than
    here. Paths are removed in a fixed order: bronze, then silver, then gold,
    with the checkpoint before the Delta directory at each level.

    Returns:
        Always ``True``.
    """
    remove = dbutils.fs.rm  # second positional argument True => recursive delete

    remove(BRONZE_CHECKPOINT, True)
    remove(BRONZE_DELTA, True)

    remove(SILVER_CHECKPOINT, True)
    remove(SILVER_DELTA, True)

    remove(GOLD_CHECKPOINT, True)
    remove(GOLD_DELTA, True)

    return True

def stop_all_streams() -> bool:
    """Stop every active streaming query on the global ``spark`` session.

    Returns:
        ``True`` if at least one query was active (and was stopped),
        ``False`` if there was nothing to stop.
    """
    running_queries = list(spark.streams.active)
    for query in running_queries:
        query.stop()
    return len(running_queries) > 0


def stop_named_stream(spark: SparkSession, namedStream: str) -> bool:
    """Stop the active streaming query (or queries) called *namedStream*.

    Args:
        spark: the session whose active streaming queries are inspected.
        namedStream: the query name to match exactly.

    Returns:
        ``True`` if at least one matching query was stopped, else ``False``.
    """
    targets = [query for query in spark.streams.active if query.name == namedStream]
    for query in targets:
        query.stop()
    return bool(targets)

def wait_stream_start(spark: SparkSession, namedStream: str,
                      max_checks: int = 4, poll_interval: float = 10) -> bool:
    """Wait until a streaming query named *namedStream* is active.

    The session's active queries are checked up to *max_checks* times,
    sleeping *poll_interval* seconds between unsuccessful checks. The
    function returns as soon as a match is seen, so it never sleeps once
    the stream is found. The defaults (4 checks, 10 s apart) match the
    original ``count <= 3`` / ``time.sleep(10)`` settings. The original
    used ``if`` instead of ``while``, so it made a single check followed
    by an unconditional sleep.

    Args:
        spark: the session whose active streaming queries are polled.
        namedStream: the query name to wait for.
        max_checks: maximum number of times to look for the query.
        poll_interval: seconds to sleep between checks.

    Returns:
        ``True`` if the query became active within the allotted checks,
        otherwise ``False``.
    """
    for attempt in range(max_checks):
        if any(stream.name == namedStream for stream in spark.streams.active):
            return True
        # No point sleeping after the final failed check.
        if attempt < max_checks - 1:
            time.sleep(poll_interval)
    return False


def untilStreamIsReady(namedStream: str, progressions: int = 3,
                       timeout: "float | None" = None, poll_interval: float = 5) -> bool:
    """Block until the stream *namedStream* is active and has made progress.

    Polls the global ``spark`` session's active streaming queries until the
    first query named *namedStream* has at least *progressions* entries in
    its ``recentProgress`` list.

    Args:
        namedStream: name of the streaming query to wait for.
        progressions: minimum number of progress reports required.
        timeout: optional upper bound in seconds. ``None`` (the default)
            waits indefinitely. Otherwise the wait gives up once the deadline
            passes, so a stream that never starts, or fails early, cannot
            hang the notebook forever.
        poll_interval: seconds to sleep between checks.

    Returns:
        ``True`` once the stream is ready; ``False`` if *timeout* expired first.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        queries = [query for query in spark.streams.active if query.name == namedStream]
        if queries and len(queries[0].recentProgress) >= progressions:
            print("The stream {} is active and ready.".format(namedStream))
            return True
        if deadline is not None and time.monotonic() >= deadline:
            print("The stream {} was not ready within {} seconds.".format(namedStream, timeout))
            return False
        time.sleep(poll_interval)


# Pandas UDF that scores each text with the global `sentiment_model` and
# returns a struct column: sentiment_label, sentiment_score, sentiment_id.
# NOTE(review): PandasUDFType is deprecated since Spark 3.0 in favour of Python
# type hints, which this function already declares — consider dropping it.
@pandas_udf("struct<sentiment_label:string, sentiment_score:double, sentiment_id:int>", PandasUDFType.SCALAR)
def predict_sentiment(texts: pd.Series) -> pd.DataFrame:
    """Predict a sentiment label, score and binary id for every text in *texts*.

    Args:
        texts: one batch of the input column (presumably strings).

    Returns:
        A DataFrame with one row per input text and the columns
        ``sentiment_label``, ``sentiment_score`` and ``sentiment_id``. These
        match the struct fields declared in the decorator.

    ``sentiment_model`` is a global defined outside this section.
    NOTE(review): presumably ``sentiment_model.predict([text])`` returns an
    object that can be indexed as ``result['label'][0]`` and
    ``result['score'][0]`` (e.g. a dict of lists); confirm against the model
    wrapper. If it returned a pandas Series, ``texts.apply`` would expand
    into a DataFrame instead of a Series of results.
    """
    # Apply prediction to each text entry. The model is called once per
    # row with a one-element list, not once for the whole batch.
    results = texts.apply(lambda text: sentiment_model.predict([text]))
 
    # Extract data into separate series (first element of each prediction)
    labels = results.apply(lambda result: result['label'][0])
    scores = results.apply(lambda result: result['score'][0])
    # Binary id: 1 only for the 'POS' label, 0 for every other label.
    sentiment_ids = labels.apply(lambda label: 1 if label == 'POS' else 0)
 
    return pd.DataFrame({
        'sentiment_label': labels,
        'sentiment_score': scores,
        'sentiment_id': sentiment_ids
    })
 
def _ensure_nltk_resources():
    """Make sure the NLTK data used by ``clean_text`` is available locally.

    Each resource is looked up with ``nltk.data.find`` first and downloaded
    (quietly) only when it is missing. Repeated calls, e.g. once per row
    inside a Spark UDF, therefore do not re-run the downloader every time.
    """
    import nltk

    # (path inside the NLTK data directory, downloader package id)
    required = (
        ("tokenizers/punkt", "punkt"),
        ("corpora/wordnet", "wordnet"),
        ("corpora/stopwords", "stopwords"),
    )
    for resource_path, package in required:
        try:
            nltk.data.find(resource_path)
        except LookupError:
            nltk.download(package, quiet=True)


# Clean/preprocess text: remove punctuation and stop words, then lemmatize.
def clean_text(text):
    """Normalise free text for downstream NLP.

    Steps:
    1. Strip ASCII punctuation.
    2. Tokenize with ``nltk.word_tokenize``.
    3. Lowercase each token.
    4. Drop English stopwords.
    5. Lemmatize the remaining tokens with WordNet.
    6. Re-join the tokens with single spaces.

    Args:
        text: the raw text. ``None`` and ``""`` are returned unchanged, so the
            function is null-safe when used as a Spark UDF.

    Returns:
        The cleaned, lowercase, space-separated string.
    """
    if not text:
        return text

    import string

    import nltk
    from nltk.corpus import stopwords
    from nltk.stem import WordNetLemmatizer

    _ensure_nltk_resources()
    lemmatizer = WordNetLemmatizer()
    stop_words = set(stopwords.words('english'))

    # string.punctuation is ASCII-only; note this also strips '@', '#' and '_'.
    text = text.translate(str.maketrans('', '', string.punctuation))

    # Lowercase BEFORE lemmatizing: WordNet lemma lookups are case-sensitive,
    # so "Cars" would otherwise come out as "cars" instead of "car".
    tokens = (word.lower() for word in nltk.word_tokenize(text))
    return ' '.join(lemmatizer.lemmatize(word) for word in tokens if word not in stop_words)

# Extract the @mentions from the text.
def extract_mentions(text):
    """Return the distinct @mentions in *text* as one space-separated string.

    A mention is a whitespace-separated token that starts with '@'. The
    handle is the run of word characters right after the '@', which means:
    - trailing punctuation is dropped ("@bob," -> "bob");
    - a bare "@" yields nothing;
    - a token such as "a@b.com" yields nothing, because it does not start
      with '@'.
    Duplicates are removed keeping first-occurrence order, so the output is
    deterministic. The string order of a plain ``set`` can differ between
    Python processes because of hash randomisation.

    Args:
        text: the raw text; ``None`` or ``""`` gives ``""``.

    Returns:
        The handles without '@', joined by single spaces ("" if none).
    """
    import re

    if not text:
        return ""
    matches = (re.match(r"@(\w+)", token) for token in text.split())
    unique_handles = dict.fromkeys(match.group(1) for match in matches if match)
    return " ".join(unique_handles)

# Remove mention handles from (already cleaned) text.
def remove_mentions_from_text(cleaned_text, mention):
    """Delete every occurrence of the given handle(s) from *cleaned_text*.

    *mention* may hold one handle or several space-separated handles (the
    format produced by ``extract_mentions``). Each handle is removed only as
    a whole word, so removing "al" leaves "also" intact. Matching is
    case-insensitive, because cleaned text is typically lowercased while
    extracted handles keep their original case. Surrounding whitespace is
    left as-is.

    Args:
        cleaned_text: the text to strip handles from; ``None``/``""`` is
            returned unchanged.
        mention: handle(s) to remove; ``None`` or blank means "nothing to
            remove".

    Returns:
        *cleaned_text* with the handles removed.
    """
    import re

    if not cleaned_text or not mention or not mention.strip():
        return cleaned_text
    for handle in mention.split():
        # Lookarounds rather than \b so handles ending in punctuation still match.
        pattern = r"(?<!\w)" + re.escape(handle) + r"(?!\w)"
        cleaned_text = re.sub(pattern, "", cleaned_text, flags=re.IGNORECASE)
    return cleaned_text

