In [6]:
!pip3 install transformers

Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com
Collecting transformers
  Downloading transformers-4.26.1-py3-none-any.whl (6.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.3/6.3 MB[0m [31m16.2 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Collecting regex!=2019.12.17
  Downloading regex-2022.10.31-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (757 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m757.1/757.1 kB[0m [31m41.3 MB/s[0m eta [36m0:00:00[0m
Collecting huggingface-hub<1.0,>=0.11.0
  Downloading huggingface_hub-0.12.1-py3-none-any.whl (190 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m190.3/190.3 kB[0m [31m123.3 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: regex, huggingface-hub, transformers
Successfully installed huggingface-hub-0.12.1 regex-2022.10.31 transformers-4.26.1


In [7]:
!pip install google.cloud.bigquery
!pip install google.cloud.storage

Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com
Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com


In [8]:
!pip3 install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cpu

Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com, https://download.pytorch.org/whl/cpu


In [9]:
!pip3 install google-cloud-logging
!pip3 install google-cloud-storage

Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com
Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com


In [10]:
from transformers import AutoModelForSequenceClassification
from transformers import AutoTokenizer
import numpy as np
from scipy.special import softmax
import csv
import urllib.request
import logging
import os
from pathlib import Path
import pandas as pd
import time 
import sys
from typing import Union
import google.cloud.aiplatform as vertex_ai
from google.cloud import bigquery
import google.cloud.logging_v2 as logging_v2
from google.cloud import storage

In [11]:
# BigQuery client pinned explicitly to the project that hosts the tweets dataset.
bq_client = bigquery.Client(
    project="fake-news-bears",
)

# Cloud Storage client; no project is passed, so the library's default
# project/credential resolution for this environment applies.
storage_client = storage.Client()

In [12]:

# Cloud Logging client; its default handler forwards Python log records to Google Cloud Logging.
client = logging_v2.client.Client()

# Format applied to records sent through the Cloud handler.
# (datefmt only matters if %(asctime)s is added to fmt; the day directive is %d, not $d.)
google_log_format= logging.Formatter(
    fmt='%(name)s | %(module)s | %(funcName)s | %(message)s',
    datefmt='%Y-%m-%dT%H:%M:%S')

handler = client.get_default_handler()
handler.setFormatter(google_log_format)

cloud_logger = logging.getLogger("scoring-model-logger")

# `log` is the logger every other cell writes to. Previously it had neither a
# handler nor a level, so its effective level came from the root logger
# (WARNING by default, unless something else reconfigures root) and all
# log.info() calls were discarded before reaching Cloud Logging.
# Give it the same handler and INFO level as cloud_logger.
log = logging.getLogger("vertex-ai-notebook-logger")

for configured_logger in (cloud_logger, log):
    configured_logger.setLevel("INFO")
    # Drop handlers attached by earlier executions of this cell so that
    # re-running it does not emit every record multiple times.
    for old_handler in list(configured_logger.handlers):
        configured_logger.removeHandler(old_handler)
    configured_logger.addHandler(handler)

log.info("This is a log from the scoring model notebook")
log.info("Finished Downloading all required files. Now getting dataframe from BQ and loading libraries")


In [13]:
def run_bq_query(sql: str) -> Union[str, pd.DataFrame]:
    """
    Execute ``sql`` in BigQuery (via the module-level ``bq_client``) and return
    the result set as a pandas DataFrame.

    A dry run with the query cache disabled is submitted first, so that an
    invalid query is rejected before the real job is started. Errors raised by
    the BigQuery client in either step are not caught here; they propagate to
    the caller. Despite the ``Union[str, ...]`` annotation, a string is never
    returned.

    Input: SQL query, as a string, to execute in BigQuery
    Returns: the query results as a pandas DataFrame
    """
    # Validation pass: nothing is executed, but a bad query raises here.
    bq_client.query(
        sql,
        job_config=bigquery.QueryJobConfig(dry_run=True, use_query_cache=False),
    )

    # Real run; .result() blocks until the job has finished.
    query_job = bq_client.query(sql, job_config=bigquery.QueryJobConfig())
    results_df = query_job.result().to_arrow().to_pandas()

    print(f"Finished job_id: {query_job.job_id}")
    return results_df

In [19]:
# Fetch only the columns the scoring code reads (author_id, id, text).
# BigQuery bills by bytes scanned per selected column, so SELECT * costs more for no benefit.
# Rows whose text is empty are excluded; NULL text is excluded as well, because
# `NULL != ""` does not evaluate to TRUE.
sql_query = """
SELECT author_id, id, text FROM `fake-news-bears.usa_congress_twitter.tweets`
WHERE text != ""
"""

df_input= run_bq_query(sql_query)

Finished job_id: 7fee4441-cd21-4e18-b57d-46b9e8fbd417


In [20]:
## Scoring works on per-user slices of the tweets dataframe and adds a score column to each slice.
## Pandas raises SettingWithCopyWarning because a slice is not the parent dataframe. That is expected
## here (results are collected into lists, not written back), so the warning is disabled.
pd.options.mode.chained_assignment = None

## Suppress scientific notation when displaying float values
pd.set_option('display.float_format', str)

# Tasks available in tweeteval:
# emoji, emotion, hate, irony, offensive, sentiment
# stance/abortion, stance/atheism, stance/climate, stance/feminist, stance/hillary

## Only 3 binary ML models for now: hate, irony and offensive ('all' runs all three). More can be added as needed.
potentialTasks = ['hate', 'irony', 'offensive', 'all']

## Base model for this script. Other models can live in separate scripts that import these functions.
defaultModel = "cardiffnlp/twitter-roberta-base"

task = "all"  ## which task to run; must be one of potentialTasks

## Unix time rounded to the nearest second, as a digit string; gives each run its own GCS folder.
runtime = str(round(time.time()))

bucket_name = "fake_news_bears_scoring_model"
folder = f'scoring_run_{runtime}/'
bucket = storage_client.get_bucket(bucket_name)

In [21]:
def inputValidation(task):
    """
    Validate that ``task`` names one of the supported scoring tasks.

    The check is case-insensitive against ``potentialTasks``.

    Args:
        task: task name supplied in the config cell (e.g. "hate", "ALL").

    Returns:
        The lower-cased task name. Callers should keep this value, since
        ``getTasks`` compares against lower-case names.

    Raises:
        ValueError: if ``task`` is not a string or is not in ``potentialTasks``.
            ValueError subclasses Exception, so existing broad handlers still
            catch it. (The old trailing ``sys.exit(1)`` after ``raise`` was
            unreachable and has been removed.)
    """
    # Non-string input used to crash with AttributeError on .lower(); treat it as invalid instead.
    normalized = task.lower() if isinstance(task, str) else None
    if normalized not in potentialTasks:
        raise ValueError(f"\t Task is not a valid Task. Choose one of the following: {potentialTasks}")
    log.info("\t Task is a valid choice. Moving on")
    return normalized

# Replace user mentions and links with generic placeholders before scoring.
def preprocess(text):
    """
    Mask Twitter user mentions and links in ``text``.

    The text is split on single spaces (not arbitrary whitespace), so the
    original spacing is preserved when the pieces are re-joined.

    - A token starting with '@' that has at least one more character becomes
      '@user' (a bare '@' is kept).
    - A token starting with 'http' becomes 'http'.

    Returns:
        The masked text as a single string.
    """
    def _mask(token):
        if token.startswith('@') and len(token) > 1:
            return '@user'
        if token.startswith('http'):
            return 'http'
        return token

    return " ".join(_mask(token) for token in text.split(" "))

def getTasks(task):
    """
    Expand a validated task name into the list of model tasks to run.

    Previously the irony branch compared ``task`` against the list
    ``["irony"]`` (never equal to a string) and assigned the bare string
    ``"irony"``. As a result, "irony" fell through to the default and ran all
    three tasks.

    Args:
        task: lower-cased task name, one of "hate", "irony", "offensive" or
            "all" (as returned by ``inputValidation``).

    Returns:
        A new list of task names. A single task maps to a one-element list;
        any other value (including "all") maps to all three supported tasks.
    """
    if task in ("hate", "irony", "offensive"):
        return [task]
    return ["hate", "irony", "offensive"]
## TODO: make the model directory configurable (e.g. via command-line arguments) instead of hard-coding ./model
## Default model choice: twitter-roberta-base
def get_Tokenizer(token_name,task):
    """
    Load the tokenizer for ``{token_name}-{task}``, caching it under ./model.

    If ``<cwd>/model/{token_name}-{task}/tokenizer_config.json`` exists, the
    tokenizer is loaded from that local directory. Otherwise it is fetched
    from the Hugging Face Hub with ``force_download=True`` and saved to
    ``./model/{token_name}-{task}`` so later runs can load it locally.

    Returns:
        The tokenizer returned by ``AutoTokenizer.from_pretrained``.
    """
    model_root = os.getcwd() + '/model'
    os.makedirs(model_root, exist_ok=True)

    hub_repo = f"{token_name}-{task}"
    local_dir = model_root + f'/{hub_repo}'
    config_path = local_dir + '/tokenizer_config.json'

    if not Path(config_path).is_file():
        log.info(f"\t Downloading Token files to {local_dir} directory")
        log.debug(f"\t Model Token file:{config_path}")
        fetched = AutoTokenizer.from_pretrained(hub_repo, force_download=True)
        # Keep a local copy so a containerised / API deployment can reuse the files.
        fetched.save_pretrained(f"./model/{hub_repo}")
        return fetched

    log.info("\t Tokenizer File is ready. Loading Token File")
    log.debug(f"\t Token config file:{config_path}")
    return AutoTokenizer.from_pretrained(local_dir)

def get_Model(model_name,task):
    """
    Load the sequence-classification model ``{model_name}-{task}``, caching it under ./model.

    If ``<cwd>/model/{model_name}-{task}/config.json`` exists, the model is
    loaded from that directory. Otherwise it is downloaded from the Hugging
    Face Hub (``force_download=True``) and saved to
    ``./model/{model_name}-{task}`` so later runs can load it locally.

    Returns:
        The model returned by ``AutoModelForSequenceClassification.from_pretrained``.
    """
    model_root = os.getcwd() + '/model'
    os.makedirs(model_root, exist_ok=True)

    hub_repo = f"{model_name}-{task}"
    local_dir = model_root + f'/{hub_repo}'

    if Path(local_dir + '/config.json').is_file():
        log.info("\t Model File is ready. Loading Model File")
        return AutoModelForSequenceClassification.from_pretrained(local_dir)

    log.info(f"\t Downloading Model files to {local_dir} directory")
    downloaded = AutoModelForSequenceClassification.from_pretrained(hub_repo, force_download=True)
    # Keep a local copy so a containerised / API deployment can reuse the files.
    downloaded.save_pretrained(f"./model/{hub_repo}")
    return downloaded

def get_Labels(task, timeout=30):
    """
    Fetch the class labels for a tweeteval ``task``.

    Downloads ``datasets/{task}/mapping.txt`` from the cardiffnlp/tweeteval
    GitHub repository and parses it as tab-separated rows. The second column
    of every row that has at least two columns is returned, in file order.
    (Presumably row order matches the model's output class index; TODO confirm
    against the mapping files.)

    Args:
        task: tweeteval task name, e.g. "hate", "irony" or "offensive".
        timeout: seconds to wait on the HTTP request. Without a timeout,
            ``urlopen`` can block indefinitely on a stalled connection.

    Returns:
        List of label strings.

    Raises:
        Network errors from ``urllib`` (e.g. URLError, or a timeout) are not caught.
    """
    mapping_link = f"https://raw.githubusercontent.com/cardiffnlp/tweeteval/main/datasets/{task}/mapping.txt"
    with urllib.request.urlopen(mapping_link, timeout=timeout) as response:
        lines = response.read().decode('utf-8').splitlines()
    labels = [row[1] for row in csv.reader(lines, delimiter='\t') if len(row) > 1]
    log.debug(f"\t Labels are {labels}")
    return labels

def writeListtoFile(fileList,directory):
    """
    Write each element of ``fileList`` to the file at path ``directory``, one per line.

    The file is opened in 'w+' mode, so any previous contents are replaced.
    Every element is formatted with ``"%s\\n" %`` and every line, including
    the last, ends with a newline. Despite its name, ``directory`` is a file
    path, not a folder.
    """
    log.debug(f"\t Writing data to file in {directory}")
    with open(f"{directory}", 'w+') as out_file:
        out_file.writelines("%s\n" % entry for entry in fileList)

def readFiletoList(directory):
    """
    Read the file at path ``directory`` and return its non-blank lines as strings.

    Counterpart of ``writeListtoFile``. Only the trailing newline is stripped
    from each line. The previous ``line[:-1]`` slicing also removed the last
    real character of a final line that had no newline. Blank lines are
    skipped so a stray empty line cannot become an empty user id.

    Despite its name, ``directory`` is a file path, not a folder.

    Returns:
        List of line strings in file order (empty list for an empty file).
    """
    with open(rf"{directory}", 'r') as fp:
        return [item for item in (line.rstrip("\n") for line in fp) if item]

def _score_texts(texts, tokenizer, model):
    """Return, for each text in ``texts``, the softmax probability of the model's
    first output class (index 0), as a Python float.

    Texts are tokenized and scored one at a time (a batch of one), as before.
    Inference runs under ``torch.no_grad()``: the scores are only read, never
    back-propagated, so building an autograd graph for every tweet only wastes
    memory and time.
    """
    import torch  # local import: the notebook's import cell does not import torch directly

    probabilities = []
    with torch.no_grad():
        for text in texts:
            encoded = tokenizer(text, return_tensors='pt')
            # Output [0] holds the classification logits; [0] again selects the single row.
            logits = model(**encoded)[0][0].detach().numpy()
            probabilities.append(float(softmax(logits)[0]))
    return probabilities


def get_scoring_csvs(dataframe,task):
    """
    Score every tweet in ``dataframe`` for one tweeteval ``task`` and write
    per-user and per-tweet CSV files.

    The ``f"{defaultModel}-{task}"`` model scores each tweet's ``text``. The
    score stored in column ``label`` (the first entry of ``get_Labels(task)``)
    is the softmax probability of class index 0. For each user
    (``author_id``), the mean score rounded to 4 decimals and the 3 tweets with
    the lowest score are recorded.

    Progress is checkpointed in ``./savestate/{task}_users.txt``. The file
    lists the user ids still to score, and each user is removed once their
    CSVs are written, so an interrupted run resumes where it stopped. A missing
    or empty save file (a completed run leaves an empty one) starts a fresh run
    over all users. Previously an empty file made every later run silently
    score nobody.

    Side effects:
        - ``validateDataFrame`` converts ``dataframe['author_id']`` to str in place.
        - Writes ``./users_score_{task}/user_<id>_<label>.csv`` and
          ``./tweets_score_{task}/tweets_<id>_<label>.csv`` per user
          ('-' in the label is replaced by '_').
        - Sleeps 2 seconds after each user.

    Args:
        dataframe: DataFrame with at least ``author_id``, ``id`` and ``text`` columns.
        task: tweeteval task name, e.g. "hate", "irony" or "offensive".

    Returns:
        (users_directory, tweets_directory, time_list, tweets_final_DF, users_final_DF).
        ``time_list`` holds ``[users_done, seconds_for_that_user]`` pairs. The
        DataFrames cover only users scored in this call.
    """
    validateDataFrame(dataframe)
    label = get_Labels(task)[0]
    label_slug = label.replace('-', '_')  # used in file names and the output column name
    tokenizer = get_Tokenizer(defaultModel,task)
    model = get_Model(defaultModel,task)
    user_output_label = "low_3_" + label_slug + "_tweets"

    saveDirectory = os.getcwd()+'/savestate'
    os.makedirs(saveDirectory, exist_ok=True)
    saveFile = saveDirectory + f"/{task}_users.txt"

    userPartition = readFiletoList(saveFile) if Path(saveFile).is_file() else []
    if userPartition:
        log.info(f"\t Saved File exists from previous run. Continuing from Saved File in {saveFile}")
    else:
        # No save file, or an empty one left behind by a finished run: start over.
        log.info(f"\t First time running job for {label}. Save file is in {saveFile}")
        userPartition = dataframe['author_id'].unique().tolist()
        writeListtoFile(userPartition,saveFile)
    totalUsers = len(userPartition)

    users_directory = os.getcwd()+f'/users_score_{task}/'
    tweets_directory = os.getcwd()+f'/tweets_score_{task}/'
    log.debug(f"\t Making Users directory:{users_directory}\n\t Making Tweets directory:{tweets_directory}")
    os.makedirs(users_directory, exist_ok=True)
    os.makedirs(tweets_directory, exist_ok=True)
    user_final_list = []
    tweet_final_list = []
    time_list=[]
    progress = 0

    for user in userPartition:
        userFile = users_directory + "user_" + str(user) + "_" + label_slug + ".csv"
        tweetFile = tweets_directory + "tweets_" + str(user) + "_" + label_slug + ".csv"
        startTime = time.time()

        dataframe_user=dataframe.loc[dataframe['author_id'] == user].astype(str) ##Prevent scientific notation
        log.debug(f"\t There are {len(dataframe_user)} text rows to go through for user_id:{user}")
        dataframe_user[label] = pd.Series(
            _score_texts(dataframe_user['text'], tokenizer, model),
            index=dataframe_user.index,
            dtype=float,
        )

        score_value = round(dataframe_user[label].mean(),4) ##For now using mean but can change to median easily
        top_3_btweets = dataframe_user[[label,'text']].sort_values(by=[label], ascending=True).head(3).values.tolist()
        user_row = [user,score_value,top_3_btweets]
        tweet_rows = dataframe_user[['author_id','id',label]].values.tolist()
        user_final_list.append(user_row)
        tweet_final_list.extend(tweet_rows)

        pd.DataFrame([user_row], columns = ['user_id',label,user_output_label]).to_csv(userFile, encoding='utf-8',index=False)
        pd.DataFrame(tweet_rows, columns = ['user_id','tweet_id',label]).to_csv(tweetFile,encoding='utf-8', index=False)

        # Checkpoint: drop this user from the save file so a restart skips them.
        with open(saveFile, "r") as fp:
            lines = fp.readlines()
        with open(saveFile, "w") as fp:
            for line in lines:
                if line.strip("\n") != str(user):
                    fp.write(line)
                else:
                    log.debug(f"\t removing user {user}")

        endTime = time.time()
        progress += 1
        time_list.append([progress,round((endTime-startTime),3)])
        log.debug(f"\t User Scoring List is \n\t {[user_row]}.  \n\t The size of the user scoring list is 1")
        log.debug(f"\t Tweet Scoring List is \n\t {tweet_rows}.  \n\t The size of the tweet scoring list is {len(tweet_rows)}")
        log.info(f"\t Finished {progress} users. {totalUsers-progress} users to go")
        log.debug(f"\t Duration of loop is {round((endTime-startTime),3)} seconds. Runs so far are {time_list}")
        time.sleep(2) ##Throttles CPU usage between users; encoding every tweet is CPU-heavy.
    tweets_final_DF = pd.DataFrame(tweet_final_list, columns = ['user_id','tweet_id',label])
    users_final_DF = pd.DataFrame(user_final_list, columns = ['user_id',label,user_output_label])
    return users_directory,tweets_directory,time_list,tweets_final_DF,users_final_DF

def validateDataFrame(dataframe):
    """
    Check that ``dataframe`` is a pandas DataFrame with the ``author_id`` and
    ``text`` columns, then normalise ``author_id`` to strings.

    Side effect: ``dataframe['author_id']`` is converted to ``str`` in place
    on the caller's object. The scoring loop compares it against user ids
    read back as text from the save file.

    Raises:
        TypeError: if ``dataframe`` is not a ``pd.DataFrame``.
        ValueError: if the ``author_id`` or ``text`` column is missing.
        Both subclass Exception, so existing broad handlers still catch them.
        (The old ``sys.exit(1)`` calls after ``raise`` were unreachable.)
    """
    if not isinstance(dataframe, pd.DataFrame):
        raise TypeError("Object is not DataFrame.  Please pass in valid DataFrame")
    if not {'author_id', 'text'}.issubset(dataframe.columns):
        raise ValueError("Dataframe doesn't have [author_id] or [text] columns. Verify dataframe.columns exist and rename if necessary")
    dataframe['author_id'] = dataframe['author_id'].astype(str)
    log.debug('\t Dataframe Validated')
        
def csv_to_gcs(folder,task,directory):
    """
    Upload every ``.csv`` file in local ``directory`` to the module-level GCS
    ``bucket`` under the prefix ``{folder}{task}/``.

    A zero-byte object named ``{folder}{task}/`` is written first as a folder
    placeholder. Uploads are best effort: a failure is logged for that file and
    the remaining files are still attempted. If any fail, a summary warning is
    logged. Errors now go through the notebook logger instead of ``print``.

    Returns:
        The GCS prefix the files were uploaded under (``{folder}{task}/``).
    """
    final_folder = folder+task+"/"
    bucket.blob(final_folder).upload_from_string('')

    failed = []
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith(".csv"):
            continue
        try:
            blob = bucket.blob(final_folder + filename)
            blob.upload_from_filename(os.path.join(directory, filename))
        except Exception as e:
            # Deliberately best effort: record the failure and keep uploading the rest.
            log.error(f"Error uploading {filename}: {str(e)}")
            failed.append(filename)
    if failed:
        log.warning(f"{len(failed)} file(s) failed to upload to {final_folder}: {failed}")
    return final_folder

In [None]:
# Validate the configured task and keep the normalised (lower-cased) value that getTasks expects.
# Previously the return value was discarded, so e.g. "HATE" passed validation but ran all tasks.
task = inputValidation(task)
tasks = getTasks(task)
timeMetrics=[]
# Use a distinct loop variable: reusing `task` overwrote the config value, so
# re-running this cell would only score the last task ("offensive").
for current_task in tasks:
    users_directory,tweet_directory,time_list,tweets_final_DF,users_final_DF = get_scoring_csvs(df_input,current_task)
    log.info(f"\t Finished {current_task}. {current_task} user files are in {users_directory} and {current_task} tweet files are in {tweet_directory}")
    timeMetrics.append([current_task,time_list])
    log.info("Uploading files to GCS")
    for directory in (users_directory, tweet_directory):
        gcs_folder = csv_to_gcs(folder,current_task,directory)
        log.info(f"Files from {directory} have been uploaded to gs://{bucket_name}/{gcs_folder}")
    ## TODO: add akiko's DF -> BQ function here
log.info(f"Finished All Tasks! Files are in {bucket_name} in folder {folder}")

