In [None]:
!pip install 'apache-beam' --user
!pip install "apache-beam[gcp,interactive,dataframe]" --user

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import GlobalWindows
import text2emotion as te
import stanza
import time
from datetime import date

# Load the Stanza NLP pipeline exactly once. Constructing a Pipeline loads its
# models, which is slow and memory-hungry, so it must not be repeated.
# No arguments are passed, so Stanza's defaults apply (presumably English with
# the default processor set -- confirm). The classification code reads
# `sentence.sentiment` and `sentence.ents`, so those defaults must include the
# sentiment and NER processors.
# TODO(review): consider stanza.Pipeline(lang="en", processors="tokenize,ner,sentiment")
# to load only what is used; verify processor requirements before switching.
# text2emotion needs no initialisation: it is used via module functions
# (te.get_emotion).
nlp = stanza.Pipeline()

# Pipeline options are constructed in the pipeline cell, together with the
# GCP project/region/temp-location settings they need.

In [None]:
def classify_text(text):
    """Score one piece of text with the global Stanza pipeline and text2emotion.

    Args:
        text: the text to analyse (a str). It is passed unchanged to the
            module-level Stanza pipeline ``nlp`` and to ``te.get_emotion``.

    Returns:
        A three-element list ``[sentiment_score, topics, emotions]``:

        * ``sentiment_score`` -- the mean of ``sentence.sentiment`` over all
          sentences Stanza found. It is ``None`` when Stanza finds no
          sentences (e.g. empty text), instead of raising ``ZeroDivisionError``.
        * ``topics`` -- the text of every named entity, comma-joined in
          document order (``""`` if there are none).
        * ``emotions`` -- the value returned by ``te.get_emotion(text)``
          (indexed by emotion name, e.g. ``"Happy"``, in this notebook).
    """
    doc = nlp(text)
    sentences = doc.sentences

    # Averaging over zero sentences would divide by zero; report "no score" instead.
    if sentences:
        sentiment_score = sum(s.sentiment for s in sentences) / len(sentences)
    else:
        sentiment_score = None

    # Keep the defensive filter against 'O' (outside-entity) tags.
    topics = ",".join(
        entity.text
        for sentence in sentences
        for entity in sentence.ents
        if entity.type != 'O'
    )

    emotions = te.get_emotion(text)
    return [sentiment_score, topics, emotions]

class ClassifyTextDoFn(beam.DoFn):
    """Beam DoFn that turns one comment row into one row of NLP features.

    Each input element is a row mapping with a ``'text'`` field (the pipeline
    in this notebook reads these rows from BigQuery). The scoring itself is
    delegated to ``classify_text``, so the notebook keeps a single
    implementation of the sentiment/topic/emotion logic.
    """

    def process(self, element):
        """Classify a single comment row.

        Args:
            element: a row mapping whose ``'text'`` entry holds the comment.
                A missing ``'text'`` key raises ``KeyError`` deliberately, so a
                wrong column name fails loudly instead of silently dropping
                every row.

        Yields:
            At most one dict with the keys ``comment``, ``sentiment``,
            ``topic``, ``happiness``, ``anger``, ``sadness`` and ``fear``.
            Rows whose text is None (a BigQuery NULL), empty, or
            whitespace-only yield nothing. None cannot be fed to the NLP
            pipeline, and blank text presumably gives Stanza no sentences,
            which previously made the sentiment average divide by zero and
            fail the whole pipeline.
        """
        text = element['text']
        if text is None or not text.strip():
            return

        sentiment_score, topics, emotions = classify_text(text)

        yield {
            'comment': text,
            'sentiment': sentiment_score,
            'topic': topics,
            'happiness': emotions["Happy"],
            'anger': emotions["Angry"],
            'sadness': emotions["Sad"],
            'fear': emotions["Fear"],
        }

# GCS scratch space used both as the pipeline's temp_location and by the
# BigQuery sink; defined once so the two settings cannot drift apart.
TEMP_LOCATION = 'gs://reddit_raw_data_0184598608709384596/temp/'
OUTPUT_TABLE = 'ml-deployments-practice.reddit_comment_data.beam_test_output'
# CREATE_IF_NEEDED cannot create a missing table without a schema. These
# columns mirror the dicts yielded by ClassifyTextDoFn.
OUTPUT_SCHEMA = (
    'comment:STRING,sentiment:FLOAT,topic:STRING,'
    'happiness:FLOAT,anger:FLOAT,sadness:FLOAT,fear:FLOAT'
)

# No runner is set, so Beam uses its default (local DirectRunner). The
# project/region/temp settings are still needed for the BigQuery I/O.
pipeline_options = PipelineOptions(
    temp_location=TEMP_LOCATION,
    project='ml-deployments-practice',
    region="europe-west1",
)

start = time.perf_counter()
# date.today() uses the kernel's local timezone; str() gives YYYY-MM-DD.
today = str(date.today())
with beam.Pipeline(options=pipeline_options) as p:

    # The date must be a quoted literal. Unquoted, 2024-05-01 is parsed as the
    # integer expression 2024 - 5 - 1 and the comparison fails.
    # Only `text` is read by ClassifyTextDoFn, so only that column is selected.
    query = (
        "SELECT `text` FROM `reddit_comment_data.raw_daily_hot_comments` "
        f"WHERE `date` = '{today}'"
    )

    data = (
        p
        # ReadFromBigQuery is already a PTransform; it defaults to legacy SQL,
        # so standard SQL is requested explicitly.
        | 'Read from BigQuery' >> beam.io.ReadFromBigQuery(
            query=query, use_standard_sql=True)
        | 'Classify text' >> beam.ParDo(ClassifyTextDoFn())
        | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            table=OUTPUT_TABLE,
            schema=OUTPUT_SCHEMA,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            custom_gcs_temp_location=TEMP_LOCATION,
        )
    )

# Leaving the `with` block runs the pipeline to completion, so this measures it.
end = time.perf_counter()
print(f"Pipeline finished in {end - start:.1f} s")