In [None]:
import logging
# NOTE(review): names used further down (os, SparkSession, BlobServiceClient,
# openai, pymongo, hashlib, datetime, sleep, get_env_vars, ...) are never
# imported explicitly in this notebook, so they presumably arrive through this
# star import — confirm against utils and prefer explicit imports.
from utils import *
import sys

# A None entry in sys.modules makes any later `import pkg_resources` raise
# ImportError instead of loading the package.
# NOTE(review): the reason for blocking pkg_resources is not visible here — confirm it is still needed.
sys.modules['pkg_resources'] = None
# Show INFO-level messages from the logging.info() calls made by the helpers below.
logging.basicConfig(level=logging.INFO)

In [None]:
# Publish the OpenAI key and the Azure resource names as process environment
# variables; the helpers further down read them back through get_env_vars().
print("set environment variables")
os.environ.update({
    'OPENAI_API_KEY': get_secret("chatKeys", "openaiKey"),
    'storage_account_name': 'chatgptv2stn',
    'container_name': 'chatgpt-ctn',
    'resource_group_name': 'chatgptGp',
    'cosmosdb_acc': 'chatgptdb-acn',
    'database_name': 'chatgptdb-dbn',
    'collection_name': 'chatgptdb-cln',
})
pinecone_dict = get_pinecone_keys()
# pinecone_jar is passed to spark.jars.packages when the Spark session is built.
pinecone_jar, cosmos_jar = set_spark_liraries()

In [None]:
# Build the Azure Blob Storage clients used to browse the PDF container.
storage_account_name = get_env_vars()['storage_account_name']
container_name = get_env_vars()['container_name']
resource_group_name = get_env_vars()['resource_group_name']
# The connection string is read from the Azure CLI. With the CLI's default JSON
# output the value comes back wrapped in double quotes, so strip them before
# handing the string to the SDK (the Cosmos DB helper does the same with
# .replace('"', '')). Stripping is a no-op when the output is already unquoted.
storage_connection_string = os.popen(
    f"az storage account show-connection-string -g {resource_group_name} -n {storage_account_name} --query connectionString"
).read().strip().strip('"')
blob_service_client = BlobServiceClient.from_connection_string(storage_connection_string)
container_client = blob_service_client.get_container_client(container_name)

In [None]:
# Display the names of the blobs in the container whose name ends with '.pdf'.
[blob_name for blob_name in (blob.name for blob in container_client.list_blobs()) if blob_name.endswith('.pdf')]

In [None]:
# Create (or reuse) the Spark session, loading the jar coordinates held in
# pinecone_jar through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("chatgpt")
    .config("spark.jars.packages", f"{pinecone_jar}")
    .getOrCreate()
)

# Only ERROR-level Spark log messages are kept from here on.
spark.sparkContext.setLogLevel("ERROR")

In [None]:
def extract_text_from_container(list_of_pdf_uploaded=None):
    """Extract the text of every not-yet-uploaded PDF in the blob container.

    Args:
        list_of_pdf_uploaded: blob names to skip. When omitted (or None) the
            list is fetched with list_filepaths_in_cosmosdb_container() at
            call time, so every call sees the current state instead of a
            snapshot taken when the function was defined.

    Returns:
        A list of ``(blob_name, tika_parser(pdf_bytes))`` tuples, one per blob
        whose name ends with '.pdf' and is not in ``list_of_pdf_uploaded``.
        A blob that cannot be downloaded or parsed is logged and skipped.
        If listing the container fails, the error is logged and ``[]`` is
        returned.
    """
    logging.info(f"extract list of pdf and the extracted text in the blob container")

    # Resolve the default here rather than in the signature: a call used as a
    # default value runs only once, when the `def` statement executes.
    if list_of_pdf_uploaded is None:
        list_of_pdf_uploaded = list_filepaths_in_cosmosdb_container()

    print("get environment variables")
    env_vars = get_env_vars()
    storage_account_name = env_vars['storage_account_name']
    container_name = env_vars['container_name']
    resource_group_name = env_vars['resource_group_name']
    # The Azure CLI prints the value as a JSON string by default; drop the
    # surrounding double quotes before giving it to the SDK.
    storage_connection_string = os.popen(
        f"az storage account show-connection-string -g {resource_group_name} -n {storage_account_name} --query connectionString"
    ).read().strip().strip('"')

    # One service client and one container client are reused for every blob.
    blob_service_client = BlobServiceClient.from_connection_string(storage_connection_string)
    container_client = blob_service_client.get_container_client(container_name)

    extracted_list = []
    try:
        for item in container_client.list_blobs():
            if not item.name.endswith('.pdf') or item.name in list_of_pdf_uploaded:
                continue
            try:
                pdf_bytes = blob_service_client.get_blob_client(
                    container=container_name, blob=item.name
                ).download_blob().content_as_bytes()
                extracted_list.append((item.name, tika_parser(pdf_bytes)))
            except Exception as e:
                # A single unreadable PDF must not discard the other documents.
                logging.error(f"error occured while extracting text from pdf {item.name}: {e}")
    except Exception as e:
        logging.error(f"error occured while extracting text from pdf {e}")
        return []
    return extracted_list
    

In [None]:
def chatgpt3(userinput, temperature=0.7, frequency_penalty=0, presence_penalty=0):
    """Summarize ``userinput`` with the gpt-3.5-turbo chat model.

    The text is sent as the user message with a "Tl;dr" suffix appended,
    together with a system message that asks for a concise summary.

    Args:
        userinput: text to summarize. It is concatenated with a str before the
            try block, so a non-str value raises TypeError to the caller.
        temperature, frequency_penalty, presence_penalty: forwarded unchanged
            to ``openai.ChatCompletion.create``.

    Returns:
        The content of the first choice of the response, or None when the API
        call (or reading the response) raises; the error is logged.
    """
    suffix = "\n\nTl;dr"
    prompt = userinput + suffix
    # Built from adjacent literals: the previous backslash continuations inside
    # a single literal embedded the source indentation (runs of spaces) in the
    # prompt sent to the model.
    system_instruction = (
        "you are a helpful distinguished scholarly assistant that uses efficient "
        "communication to help finish the task of concisely summarizing an article by summarizing the most pertinent essence of the text as part of a paragraph. "
        "use the fewest words as possible in english"
    )
    # NOTE(review): the system message is sent after the user message; the
    # usual convention is system first — confirm this order is intended.
    message = [
        {"role": "user", "content": prompt},
        {"role": "system", "content": system_instruction},
    ]
    try:
        openai.api_key = get_env_vars()['OPENAI_API_KEY']
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            temperature=temperature,
            frequency_penalty=frequency_penalty,
            presence_penalty=presence_penalty,
            messages=message
        )
        return response['choices'][0]['message']['content']
    except Exception as e:
        logging.error(f"error occured while chatting with gpt-3.5-turbo {e}")
        return None



In [None]:
def cheaper_summarizer(text, title, temperature=0.7, frequency_penalty=0, presence_penalty=0, api_key=None):
    """Summarize ``text`` chunk by chunk with chatgpt3 and join the results.

    Args:
        text: text to summarize; None means there is nothing to do.
        title: label used only in the progress messages printed here.
        temperature, frequency_penalty, presence_penalty: forwarded to
            chatgpt3 for every chunk.
        api_key: accepted for backward compatibility; not used by this function.

    Returns:
        The chunk summaries joined with ' \\n', or '' when ``text`` is None,
        when any chunk still has no summary after three attempts, or when an
        unexpected error occurs.
    """
    if text is None:
        print(f"there is no text to summarize - Skipping {title}")
        return ''

    max_retry = 3
    try:
        print(f"Summarizing {title} for {len(text)} characters")
        #split text into chunks
        chunks = split_text(text)
        chunk_summaries = []
        for chunk in chunks:
            chunk_summary = None
            # Retry only the chunk that failed; chunks that already succeeded
            # are not sent to the API again.
            for attempt in range(max_retry):
                try:
                    chunk_summary = chatgpt3(chunk, temperature=temperature, frequency_penalty=frequency_penalty, presence_penalty=presence_penalty)
                    if chunk_summary is None:
                        # chatgpt3 reports a failed call by returning None.
                        print(f"Exception: no summary returned - Retrying {title}")
                except Exception as e:
                    chunk_summary = None
                    print(f"Exception: {e} - Retrying {title}")
                if chunk_summary is not None:
                    break
                if attempt < max_retry - 1:
                    sleep(5)
            if chunk_summary is None:
                # An incomplete summary is not returned; the caller gets ''.
                print(f"Exception: no summary after {max_retry} attempts - Skipping {title}")
                return ''
            chunk_summaries.append(chunk_summary)
        return ' \n'.join(chunk_summaries)
    except Exception as e:
        print(f"Exception: {e} - Skipping {title}")
        return ''
   

In [None]:
def create_id(folder, typeofDoc, subject, author, title):
    """Derive the Cosmos DB ``id`` of a document from its path metadata.

    The five values are formatted and concatenated with no separator, in the
    order folder, typeofDoc, subject, author, title; the result is encoded
    with ``str.encode()`` and hashed with SHA-256.

    Returns:
        The hexadecimal digest string.
    """
    fields = (folder, typeofDoc, subject, author, title)
    key_material = "".join(f"{field}" for field in fields)
    return hashlib.sha256(key_material.encode()).hexdigest()

In [None]:
def extract_title(pdf_path):
    """Split a blob path into its components, dropping the first one.

    Every '..' occurrence is removed from ``pdf_path``, the result is split on
    '/', and all components after the first are returned as a list.
    """
    logging.info(f"extract metatdata from a pdf path")
    cleaned_path = pdf_path.replace('..', '')
    _first_component, *remaining_components = cleaned_path.split('/')
    return remaining_components

In [None]:
def write_to_cosmosdb(items):
    """Upsert documents into the Cosmos DB collection through pymongo.

    The connection string is read with the Azure CLI (``az cosmosdb keys
    list`` piped through ``jq``); database and collection names come from
    get_env_vars().

    Args:
        items: iterable of dicts, each holding at least the keys 'id' and
            'summary'. Each dict is upserted with ``$set``, matched on 'id'.
    """
    env_vars = get_env_vars()
    resource_group_name = env_vars['resource_group_name']
    cosmosdb_acc = env_vars['cosmosdb_acc']
    database_name = env_vars['database_name']
    collection_name = env_vars['collection_name']
    # jq prints the value as a JSON string, hence the removal of the double quotes.
    connecting_string = os.popen(f"az cosmosdb keys list --type connection-strings --resource-group {resource_group_name}\
                              --name {cosmosdb_acc} | jq '.connectionStrings[0].connectionString' ").read().strip().replace('"','')

    mongo_client = pymongo.MongoClient(connecting_string)
    try:
        collection = mongo_client[database_name][collection_name]
        for item in items:
            print(item['summary'])
            collection.update_one({"id": item["id"]}, {"$set": item}, upsert=True)
    finally:
        # This function is called once per partition; release the client's
        # connections even when an upsert fails.
        mongo_client.close()

In [None]:
# extract_text_from_container() already returns a plain list of
# (blob_name, text) tuples, so it is used directly: distributing it with
# parallelize() only to collect() it straight back added a Spark round trip
# without changing the data.
preprocess_text_list = list(extract_text_from_container())


In [None]:
# Clean every extracted text, build one Cosmos DB document per non-empty text
# and upsert the documents partition by partition.
# Each record needs at least five path components from extract_title();
# a shorter blob path raises IndexError inside the job.
(
    spark.sparkContext.parallelize(preprocess_text_list)
    .map(lambda x: (x[0], preprocess_text(x[1])))
    .filter(lambda x: x[1] != '')
    # Parse the blob path once per record (x[2]) instead of calling
    # extract_title() — and logging — for every field that needs it.
    .map(lambda x: (x[0], x[1], extract_title(x[0])))
    .map(lambda x: {
        "Filepath": x[0],
        "Metadata": {
            "folder": x[2][0],
            "typeofDoc": x[2][1],
            "subject": x[2][2],
            "author": x[2][3],
            # Everything after the first '.' of the file name is dropped.
            "title": x[2][4].split('.')[0],
        },
        "text": x[1],
        # The full list of path components is passed as the label that
        # cheaper_summarizer prints in its progress messages.
        "summary": cheaper_summarizer(x[1], x[2]),
        "id": create_id(
            x[2][0],  # folder
            x[2][1],  # typeofDoc
            x[2][2],  # subject
            x[2][3],  # author
            x[2][4].split('.')[0],  # title
        ),
        "uploadDate": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    })
    .foreachPartition(write_to_cosmosdb)
)

In [None]:
# Run the same upload pipeline on the first extracted document only.
# The slice [:1] keeps a list holding that one (blob_name, text) tuple.
# Indexing with [0] handed the tuple itself to parallelize(), which
# distributed its two strings as separate records, so x[0] and x[1] became
# single characters. The slice also tolerates an empty list.
load_to_cosmosdb_rdd = (
    spark.sparkContext.parallelize(preprocess_text_list[:1])
    .map(lambda x: (x[0], preprocess_text(x[1])))
    # Parse the blob path once per record (x[2]); at least five components
    # are required, a shorter path raises IndexError inside the job.
    .map(lambda x: (x[0], x[1], extract_title(x[0])))
    .map(lambda x: {
        "Filepath": x[0],
        "Metadata": {
            "folder": x[2][0],
            "typeofDoc": x[2][1],
            "subject": x[2][2],
            "author": x[2][3],
            "title": x[2][4].split('.')[0],
        },
        "text": x[1],
        "summary": cheaper_summarizer(x[1], x[2]),
        "id": create_id(
            x[2][0],  # folder
            x[2][1],  # typeofDoc
            x[2][2],  # subject
            x[2][3],  # author
            x[2][4].split('.')[0],  # title
        ),
        "uploadDate": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    })
)
# foreachPartition() returns None, so it is called on the named RDD rather
# than having its result stored under the *_rdd name.
load_to_cosmosdb_rdd.foreachPartition(write_to_cosmosdb)

In [None]:




def main():
    """Run the blob-storage to Cosmos DB ETL from start to finish.

    In order: publish the configuration as environment variables, start the
    Spark session, extract the text of the PDFs that are not yet in Cosmos DB,
    then clean, summarize and upsert one document per PDF.
    """
    #set variables to be used in this ETL process
    print("set environment variables")
    os.environ['OPENAI_API_KEY'] = get_secret("chatKeys", "openaiKey")
    os.environ['storage_account_name'] = 'chatgptv2stn'
    os.environ['container_name'] = 'chatgpt-ctn'
    os.environ['resource_group_name'] = 'chatgptGp'
    os.environ['cosmosdb_acc'] = 'chatgptdb-acn'
    os.environ['database_name'] = 'chatgptdb-dbn'
    os.environ['collection_name'] = 'chatgptdb-cln'
    # The return value is not used in this function; the call is kept in case
    # it has side effects — TODO confirm and remove if it has none.
    get_pinecone_keys()
    # Only the first jar coordinate is needed for the Spark session below.
    pinecone_jar, _cosmos_jar = set_spark_liraries()

    #blob storage loading
    print("load blob storage")
    # NOTE(review): the [:1] slice keeps at most one path, so the count printed
    # below is 0 or 1 and pdf_paths is only printed — confirm this is intended.
    pdf_paths = [item for item in list_pdfblobs()][:1]
    print(f"number of pdf files: {len(pdf_paths)}")

    print("starting spark session")
    spark = (
        SparkSession.builder
        .appName("chatgpt")
        .config("spark.jars.packages", f"{pinecone_jar}")
        .getOrCreate()
    )

    spark.sparkContext.setLogLevel("ERROR")

    print(pdf_paths)

    print("create the cosmosdb rdd")

    # Fetch the already-uploaded file paths now and pass them explicitly, so
    # the skip list reflects the current Cosmos DB contents rather than a
    # default value computed when extract_text_from_container was defined.
    # The result is already a list, so no parallelize().collect() round trip.
    preprocess_text_list = extract_text_from_container(list_filepaths_in_cosmosdb_container())

    def _to_cosmos_document(record):
        # Build the Cosmos DB document for one (filepath, cleaned_text) pair.
        filepath, text = record
        # Parse the path once; at least five components are required
        # (a shorter path raises IndexError).
        path_parts = extract_title(filepath)
        folder = path_parts[0]
        type_of_doc = path_parts[1]
        subject = path_parts[2]
        author = path_parts[3]
        # Everything after the first '.' of the file name is dropped.
        title = path_parts[4].split('.')[0]
        return {
            "Filepath": filepath,
            "Metadata": {
                "folder": folder,
                "typeofDoc": type_of_doc,
                "subject": subject,
                "author": author,
                "title": title,
            },
            "text": text,
            # The full list of path components is the label printed by
            # cheaper_summarizer in its progress messages.
            "summary": cheaper_summarizer(text, path_parts),
            "id": create_id(folder, type_of_doc, subject, author, title),
            "uploadDate": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }

    load_to_cosmosdb_rdd = (
        spark.sparkContext.parallelize(preprocess_text_list)
        .map(lambda x: (x[0], preprocess_text(x[1])))
        .map(_to_cosmos_document)
    )
    # foreachPartition() returns None, so it is called on the named RDD.
    load_to_cosmosdb_rdd.foreachPartition(write_to_cosmosdb)

    # TODO: Pinecone upload stage, currently disabled.
    # print("create the pinecone rdd and write to pinecone")
    # load_data_to_pinecone_rdff = spark.sparkContext.parallelize(get_data_from_cosmosdb()).map(lambda x : ([x['text'],x['Metadata']])).\
    #                                             map(lambda x : get_pincone_pdfdata(x[0],x[1])).\
    #                                             foreach(lambda x : upsert_pinecone_data(x))
                                                

    
    
    
# Entry point: run the whole ETL when this module's __name__ is "__main__".
if __name__ == "__main__":
    main()

    
