Dependencies

In [3]:
!pip install openpyxl
!pip install requests
!pip install pandas-gbq --user

Code

In [None]:
from google.cloud import storage
import pandas as pd
import numpy as np
from google.cloud import aiplatform
from google.cloud.aiplatform.gapic.schema import predict
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value
from typing import Sequence, Union



""" This function performs a batch prediction job using the specified AI Platform model,
    source and destination.Returns: output_files (List[str]): The names of the files in 
    the GCS destination location that contain the prediction results. """


def batch_prediction_job(
    project: str,
    location: str,
    model_resource_name: str,
    job_display_name: str,
    gcs_source: Union[str, Sequence[str]],
    gcs_destination: str,
    sync: bool = True,
):
    """Submit a batch prediction job and list the objects under its destination.

    Args:
        project: Project passed to ``aiplatform.init`` and ``storage.Client``.
        location: Region passed to ``aiplatform.init``.
        model_resource_name: Identifier handed to ``aiplatform.Model``.
        job_display_name: Display name given to the batch prediction job.
        gcs_source: One GCS URI, or a sequence of them, holding the input.
        gcs_destination: ``gs://bucket[/prefix]`` URI used as the output prefix.
        sync: Forwarded unchanged to ``Model.batch_predict``.

    Returns:
        list: Bucket-relative names of every object found under the
        destination prefix once ``batch_predict`` has returned.

    Raises:
        ValueError: If ``gcs_destination`` is not a ``gs://`` URI with a
            bucket name.
    """
    # Parse the destination before submitting anything, so a malformed URI
    # fails immediately instead of after a job has already been started.
    scheme = "gs://"
    if not gcs_destination.startswith(scheme):
        raise ValueError(
            "gcs_destination must start with 'gs://', got: " + repr(gcs_destination)
        )
    bucket_name, _, blob_prefix = gcs_destination[len(scheme):].partition("/")
    if not bucket_name:
        raise ValueError(
            "gcs_destination has no bucket name: " + repr(gcs_destination)
        )

    # Initialize the AI Platform client for the given project and location
    aiplatform.init(project=project, location=location)

    # Get the AI Platform model to use for prediction
    my_model = aiplatform.Model(model_resource_name)

    # Submit the batch prediction job using the model, source, and destination.
    # NOTE(review): with sync=False the listing below presumably runs before
    # the job has written its results - confirm against the SDK behaviour.
    my_model.batch_predict(
        job_display_name=job_display_name,
        gcs_source=gcs_source,
        gcs_destination_prefix=gcs_destination,
        sync=sync,
    )

    # List everything stored under the destination prefix. This is not limited
    # to objects created by the job submitted above.
    client = storage.Client(project=project)
    blobs = client.bucket(bucket_name).list_blobs(prefix=blob_prefix)
    return [blob.name for blob in blobs]


    
    
class Param:
    """Hard-coded settings for the pipeline: input file, model ids and GCS paths."""

    def __init__(self):
        # path to the input file
        # NOTE(review): the literal was unterminated in the source; the quote
        # is closed right after ".xls". Confirm the real extension (the file
        # may actually be ".xlsx").
        self.file_input = 'gs://data_sentiment_v5/SSS/sss_onsite/on_site_2023/SSS DB Power BI - Sentimiento OnSite  1-5 Febrero 2023.xls'
        # project ID (the value appears masked with '#' characters)
        self.project_id = "28########04"
        # model ID for off-site sentiment analysis
        self.model_id_off_site ="58############88"
        # model ID for on-site sentiment analysis
        self.model_id_on_site = "35############28"
        # input URI for batch predictions
        self.input_uri = "gs://batch_predictions_sentiment/Prediction_Masive_Sentiment/Main_Control_Batch/Prediction_Main_Batch.jsonl"
        # output URI for batch predictions
        self.output_uri = "gs://batch_predictions_sentiment/Prediction_Masive_Sentiment/Output_batch/"
        # location for cloud services
        self.location="us-central1"
        # display name for the batch prediction job
        self.job_display_name = "new_job"
        # path for processing batch data
        self.Processing_Batch = 'gs://batch_predictions_sentiment/Prediction_Masive_Sentiment/Prosecing_Masive_Data/'
        # path for main control batch data
        self.Path_Control = 'gs://batch_predictions_sentiment/Prediction_Masive_Sentiment/Main_Control_Batch/'
        # initial string for constructing the content URI
        # NOTE(review): together with complement_end this produces
        # single-quoted text, which is not strict JSON - confirm the batch
        # service accepts it.
        self.complement_ini = "{\'content\' :\'gs://batch_predictions_sentiment/Prediction_Masive_Sentiment/Prosecing_Masive_Data/"
        # end string for constructing the content URI
        self.complement_end = ".txt\', \'mimeType\': \'text/plain\'}"
                               


class Input_Data:
    """Loads the Excel workbook located at ``Route``."""

    def __init__(self, Route):
        # Location of the workbook; handed to pandas unchanged.
        self.Route = Route

    def Data_Sent_Filt(self):
        """Read the workbook and keep only the "Id_Model" and "Message" columns."""
        wanted_columns = ['Id_Model', 'Message']
        workbook = pd.read_excel(self.Route)
        return workbook.loc[:, wanted_columns]

    def Data_Sent_All(self):
        """Read the workbook and return every column."""
        return pd.read_excel(self.Route)



    
class Data_Frame:
    """Wraps a DataFrame and exposes a filter on its "Message" column."""

    def __init__(self, Data):
        # Keep a reference to the frame that Transform will filter.
        self.Data = Data

    def Transform(self):
        """Return only the rows whose "Message" value is not null."""
        has_message = self.Data.Message.notnull()
        return self.Data[has_message]
       

class Generator:
    """Writes every message of a frame to its own numbered ``.txt`` file."""

    def __init__(self, Sentiment,Path):
        # Frame holding a "Message" column, and the prefix the file names start with.
        self.Sentiment = Sentiment
        self.Path = Path

    def Iterator(self):
        """Write one file per message and return the messages as a new frame.

        Files are named ``<Path><n>.txt`` where ``n`` counts from 0 in row
        order. Each file is produced with ``DataFrame.to_csv``, so CSV quoting
        applies to the text that is written.

        Returns:
            DataFrame: A single "Sentiment" column with a fresh 0..n-1 index.
        """
        messages = self.Sentiment['Message'].to_numpy().tolist()
        New_df = pd.DataFrame(messages, columns=['Sentiment'])

        for position, text in New_df['Sentiment'].items():
            single_row = pd.DataFrame([text], columns=[''])
            target = f"{self.Path}{position}.txt"
            single_row.to_csv(target, index=False, header=False)

        return New_df
    
    
class Generate_path:
    """Builds one path entry per row of a frame and writes them to a control file."""

    def __init__(self, Senti_df,Path):
        # Frame whose row count decides how many entries are generated, and the
        # prefix under which the control file is written.
        self.Senti_df = Senti_df
        self.Path = Path

    def paths_jsl(self, a, b):
        """Generate one entry per row and save them to ``Prediction_Main_Batch.jsonl``.

        Entry ``i`` is ``a + str(i) + b`` for ``i`` in ``0 .. len(Senti_df) - 1``.

        Args:
            a (str): The beginning of each entry.
            b (str): The end of each entry.

        Returns:
            DataFrame: A single "Paths_off_Sentiment" column with the entries.
        """
        row_count = len(self.Senti_df)
        # Report the number of rows (previously the range object was printed).
        print("number of registres is: " + str(row_count))

        list_paths = [a + str(i) + b for i in range(row_count)]

        New_df = pd.DataFrame(list_paths, columns=['Paths_off_Sentiment'])
        # NOTE(review): quotechar=' ' presumably keeps to_csv from wrapping the
        # entries in double quotes; confirm the resulting lines are what the
        # batch service expects.
        New_df.to_csv(f'{self.Path}Prediction_Main_Batch.jsonl',index=False,header=False,quotechar=' ')
        return New_df
        
        

class generate_list:
    """Builds zero-padded counters as strings."""

    def create_list(self,files_user):
        """Return the numbers 1..files_user as strings padded to at least two digits."""
        return [f"{number:02d}" for number in range(1, files_user + 1)]
            


class Input_Result:
    """Reads a result file by splitting every line on '/'."""

    def __init__(self, Result):
        # Result (str): Path to the result file.
        self.Result = Result

    def Data_Out(self):
        """Parse the result file into a seven-column DataFrame.

        The file is read without a header row and with '/' as separator. The
        columns are named '1'..'5', 'Order' and 'Sentiment'.
        """
        column_names = ['1', '2', '3', '4', '5', 'Order', 'Sentiment']
        return pd.read_csv(self.Result, sep='/', header=None, names=column_names)

    
    
    

    
def generator_0(count_files):
    """Lazily read each result file and yield its 'Order' and 'Sentiment' columns.

    Args:
        count_files: Iterable of result-file paths, read one per iteration.

    Yields:
        DataFrame: The 'Order' and 'Sentiment' columns of one parsed file.
    """
    wanted_columns = ['Order', 'Sentiment']
    for result_path in count_files:
        parsed = Input_Result(result_path).Data_Out()
        yield parsed.loc[:, wanted_columns]
    
    

    
def generator_1(Data_Recl):
    """Yield, once, a frame holding the first number found in every cell.

    Each column of ``Data_Recl`` is converted to text and the first integer or
    decimal number in each cell is extracted as a string. Cells without a
    number become NaN.

    Args:
        Data_Recl: DataFrame whose cells contain a number embedded in text.

    Yields:
        DataFrame: Same columns and index as ``Data_Recl``, with the extracted
        numbers as strings.
    """
    # Raw string: "\d" in a normal literal is an invalid escape sequence.
    number_pattern = r'(\d+(?:\.\d+)?)'
    # astype(str) keeps the .str accessor usable when pandas parsed a column as
    # numeric; NaN turns into the text "nan", which has no digits and so
    # remains NaN after extraction.
    extracted = {
        column: Data_Recl[column].astype(str).str.extract(number_pattern, expand=False)
        for column in Data_Recl.columns
    }
    yield pd.DataFrame(extracted)


    
def cycle_1(primer_g,a,row_max,df_final):
    """Pull frames from ``primer_g`` until at least ``row_max`` rows are collected.

    Every frame taken from ``primer_g`` is passed through ``generator_1`` and
    appended to ``df_final``.

    Args:
        primer_g: Iterator yielding DataFrames (one per result file).
        a: Row count used for the first loop check.
        row_max: Number of rows to collect before stopping.
        df_final: Frame the collected rows are appended to.

    Returns:
        DataFrame: ``df_final`` followed by all collected rows, re-indexed from
        0. ``df_final`` itself is returned when nothing was collected.
    """
    import warnings

    collected_frames = [df_final]
    collected_rows = len(df_final)

    # "<" instead of "!=": if a file pushes the total past row_max, the loop
    # must still stop rather than keep draining the iterator.
    while a < row_max:
        state_gene_1 = next(primer_g, None)
        if state_gene_1 is None:
            # The source ran dry before row_max rows were seen; report it and
            # return what was gathered instead of leaking StopIteration.
            warnings.warn(
                "prediction results exhausted after %d of %d rows"
                % (collected_rows, row_max),
                RuntimeWarning,
            )
            break
        state_gene_2 = next(generator_1(state_gene_1))
        collected_frames.append(state_gene_2)
        collected_rows += len(state_gene_2)
        a = collected_rows

    if len(collected_frames) == 1:
        return df_final
    # One concat at the end avoids re-copying the growing frame on every step.
    return pd.concat(collected_frames, ignore_index=True)
    
    
    
    
        
"""This is the main code to start the program in this part we call the different objects"""

def run():
    """Run the whole pipeline: read input, predict in batch, merge and export.

    Steps, in order: load the settings, read the workbook, drop rows without a
    message, write one text file per message, write the control file listing
    those files, run the batch prediction, read the result files back, join
    the predictions to the input rows, and save ``data/sentiment.xlsx``.

    Raises:
        RuntimeError: If no objects are found under the output location.
    """
    import os

    #Call Params to find read data
    Params = Param()
    Route_m = Params.file_input
    print("The Params is: Ok")


    #Extract the Data Filt
    All_Data = Input_Data(Route_m)
    Data_1 = All_Data.Data_Sent_Filt()
    print("The Data is: Ok")


    #Extract the Sentiment
    Extract = Data_Frame(Data_1)
    Data_Sentiment = Extract.Transform()
    print("The sentiment export to list is: Ok")


    #Generate Data files txt
    Generator(Data_Sentiment, Params.Processing_Batch).Iterator()
    print ("The sentiment already exported to txt : Ok ")


    #Generate list with paths of sentiments
    list_sent = Generate_path(Data_Sentiment, Params.Path_Control)
    list_sent.paths_jsl(Params.complement_ini, Params.complement_end)
    print ("The Paths  prediction exported : OK " )


    #Batch prediction of sentiments
    output_files= batch_prediction_job(Params.project_id,
                                        Params.location,
                                        #change depend of the model offsite - onsite
                                        Params.model_id_on_site,
                                        Params.job_display_name,
                                        Params.input_uri,
                                        Params.output_uri)
    print("Prediction Batch: ok")

    #fix files output batch prediction
    if not output_files:
        raise RuntimeError("no objects found under " + Params.output_uri)
    # The first listed object is skipped, presumably because it is the folder
    # placeholder - TODO confirm. The "gs://<bucket>/" prefix is taken from
    # output_uri so it cannot drift from the configured bucket.
    bucket_root = "/".join(Params.output_uri.split("/")[:3]) + "/"
    new_list = [bucket_root + str(name) for name in output_files[1:]]
    print('files of batch is : ok')


    #Extract the  ALL Data
    Data_2 = All_Data.Data_Sent_All()
    print("The Data Model is: Ok")


    #Extract the Sentiment
    Extract = Data_Frame(Data_2)
    # The text files were numbered by position among the non-null messages, so
    # the rows are renumbered 0..n-1 here. Without this, the join below would
    # pair predictions with the wrong rows whenever a null message was dropped.
    Data_Sentiment = Extract.Transform().reset_index(drop=True)
    print("The sentiment export to list is: Ok")

    #concat and organize the data of sentiment
    df_final = pd.DataFrame()
    primer_g = generator_0(new_list)
    a = 0
    row_max = len(Data_Sentiment)
    print("number of registres is: " + str(row_max))
    df_final = cycle_1(primer_g,a,row_max,df_final)

    #Order and index of dataframe
    print("Data_frame is complete: OK")
    # Convert before sorting so the order is numeric rather than lexicographic.
    df_order = df_final.assign(Order=df_final['Order'].astype(int))
    df_order = df_order.sort_values('Order',ascending=True).set_index('Order')
    key_df  = pd.merge(Data_Sentiment, df_order, left_index=True, right_index=True)

    #Rename columns
    key_df.rename(columns = {'Sentiment_y':'Sentiment_M', 'Sentiment_x':'Sentiment_H'}, inplace = True)

    #Export file
    os.makedirs("data", exist_ok=True)  # to_excel does not create the folder
    key_df.to_excel("data/sentiment.xlsx",index=False)
    print("The file is  : OK")
    
# Start the pipeline only when this file is executed as a script.
if __name__ == '__main__':
    run()

This code performs a batch prediction job on Google AI Platform, using an AI model and input data stored in Google Cloud Storage (GCS). The code uses the Google Cloud Storage and AI Platform APIs, as well as the Pandas and Numpy libraries.

The main function, batch_prediction_job, submits a batch prediction job using the specified AI Platform model, source and destination. The function returns a list of the names of the files in the GCS destination location that contain the prediction results. The function uses the AI Platform API to get a reference to the model and submit the batch prediction job. It also uses the Google Cloud Storage API to get a reference to the GCS destination bucket and retrieve the names of the prediction result files.

The Param class defines a set of parameters that are used by the code, including the project ID, model IDs, input and output URIs for the batch predictions, location for the cloud services, display name for the batch prediction job, and paths for processing and controlling batch data.

The Input_Data class reads the input file specified in the Route parameter and returns either a DataFrame with only the "Id_Model" and "Message" columns or a DataFrame with all columns.

The Data_Frame class transforms the input data by filtering it to include only the rows where the "Message" column is not empty. The JSONL file in the format required by the AI Platform API for batch predictions is constructed by the Generate_path class (paths_jsl), which writes it to the control path in GCS; the Generator class writes one text file per message to the processing path.