In [1]:
import os
import json
from io import StringIO
from dotenv import load_dotenv
from pdfminer.high_level import extract_text
from pdfminer.converter import TextConverter
from pdfminer.layout import LAParams
from pdfminer.pdfdocument import PDFDocument
from pdfminer.pdfpage import PDFPage
from pdfminer.pdfparser import PDFParser
from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
import csv
import pandas as pd
import docx
import pptx

from groq import Groq

In [2]:
def load_environment_variables():
    """Load the renamer's settings from a ``.env`` file / the process environment.

    ``CATEGORIES`` is read as a comma-separated list and ``GROQ_API_KEY`` as
    the API key.

    Returns:
        tuple: ``(categories, groq_key)``. ``categories`` is a list of
        whitespace-stripped, non-empty category names (an empty list when
        ``CATEGORIES`` is unset or blank). ``groq_key`` is the value of
        ``GROQ_API_KEY``, or ``None`` when that variable is unset.
    """
    load_dotenv()
    # os.getenv returns None for a missing variable; fall back to "" so that a
    # missing CATEGORIES entry yields an empty list instead of AttributeError.
    raw_categories = os.getenv('CATEGORIES') or ''
    # "a, b,,c" -> ["a", "b", "c"]: strip padding and drop empty entries so the
    # category names handed to the prompt are clean.
    categories = [item.strip() for item in raw_categories.split(',') if item.strip()]
    GROQ_KEY = os.getenv("GROQ_API_KEY")
    return categories, GROQ_KEY

In [3]:
def get_prompts(categories):
    """Build the two system prompts sent to the language model.

    Args:
        categories: The allowed categories. The value is interpolated into the
            rename prompt with ``str.format``, so a list appears in its
            ``repr`` form (e.g. ``['a', 'b']``).

    Returns:
        tuple: ``(summary_prompt, rename_prompt)`` as strings.
    """
    # Each prompt is spelled out line by line so that the leading indentation
    # and the trailing spaces that are part of the prompt text stay visible.
    summary_prompt = "\n".join((
        "",
        "    Please read the content of the provided file and provide a concise but informative summary. ",
        "    The purpose of the summary is to rename files based on their content. ",
        "    Ensure the summary is as specific to the file as possible and write your response as a JSON object with the following schema:",
        "",
        "    RESPOND WITH JUST THE JSON OBJECT AND NOTHING ELSE (NO ADDITIONAL TEXT OR EXPLANATION).",
        "    MAKE SURE TO FOLLOW THE SCHEMA AND DON'T MAKE ANY MISTAKES.",
        "",
        "    {",
        '        "summary": "summary of the content"',
        "    }",
        "    ",
    ))

    # "{}" is the slot for the categories; "{{" and "}}" are escaped so that
    # str.format emits literal braces around the JSON schema.
    rename_template = "\n".join((
        "",
        "    Please read the content of the provided file and provide a concise but informative new name for the file without the extension.",
        "    Also provide the category that best fits the file based on the summary from the following category list: {}.",
        "",
        "    ONLY CHOOSE A CATEGORY FROM THE LIST PROVIDED. DO NOT MAKE UP CATEGORIES.",
        "    The purpose of the new name is to rename files based on their content.",
        "    Ensure the new name is as specific to the file as possible and write your response as a JSON object with the following schema:",
        "",
        "    RESPOND WITH JUST THE JSON OBJECT AND NOTHING ELSE (NO ADDITIONAL TEXT OR EXPLANATION).",
        "    MAKE SURE TO FOLLOW THE SCHEMA AND DON'T MAKE ANY MISTAKES.",
        "",
        "    {{",
        '        "new_name": "new name for the file",',
        '        "category": "category of the file"',
        "    }}",
        "    ",
    ))

    return summary_prompt, rename_template.format(categories)

In [4]:
def read_file_contents(file_path):
    """Read a file with the reader that matches its extension.

    The extension is whatever follows the last ``.`` in ``file_path``,
    compared case-insensitively.

    Args:
        file_path: Path of the file to read.

    Returns:
        Whatever the selected reader returns, or the string
        ``"Unsupported file type"`` when no reader matches.
    """
    extension = file_path.rsplit('.', 1)[-1].lower()

    spreadsheet_types = ('xls', 'xlsx')
    source_code_types = (
        'py', 'java', 'cpp', 'c', 'js', 'ts', 'go', 'rb', 'swift', 'kt',
        'scala', 'php', 'perl', 'ruby', 'bash', 'sh', 'zsh', 'html',
    )

    # Guard clauses: the first matching extension wins and returns at once.
    if extension == 'csv':
        return read_csv(file_path)
    if extension in spreadsheet_types:
        return read_excel(file_path)
    if extension in source_code_types:
        return read_code_file(file_path)
    if extension == 'docx':
        return read_docx(file_path)
    if extension == 'pptx':
        return read_pptx(file_path)
    if extension == 'pdf':
        return read_pdf(file_path)
    if extension == 'json':
        return read_json(file_path)
    if extension == 'txt':
        return read_txt(file_path)

    return "Unsupported file type"

In [5]:
def read_csv(file_path):
    """Read a CSV file and return one tab-joined string per row.

    Args:
        file_path: Path of the CSV file.

    Returns:
        list[str]: Each row's fields joined with a tab character.
    """
    # newline='' lets the csv module do its own newline handling, so line
    # breaks embedded in quoted fields are kept as written.
    # 'utf-8-sig' decodes UTF-8 with or without a BOM independent of the
    # platform locale; errors='replace' turns undecodable bytes into U+FFFD
    # instead of raising, so one odd byte does not make the file unreadable.
    with open(file_path, 'r', newline='', encoding='utf-8-sig', errors='replace') as file:
        return ['\t'.join(row) for row in csv.reader(file)]

In [6]:
def read_excel(file_path):
    """Load a spreadsheet with ``pd.read_excel`` and return its data rows.

    Args:
        file_path: Path of the Excel workbook.

    Returns:
        list[list]: The values of the resulting DataFrame, one inner list per
        row (column labels are not part of the result).
    """
    frame = pd.read_excel(file_path)
    rows = frame.to_numpy().tolist()
    return rows

In [7]:
def read_code_file(file_path):
    """Return the text of a source-code file.

    Args:
        file_path: Path of the file to read.

    Returns:
        str: The decoded file contents.
    """
    # 'utf-8-sig' decodes UTF-8 with or without a BOM regardless of the
    # platform's default encoding; errors='replace' substitutes U+FFFD for
    # undecodable bytes instead of raising UnicodeDecodeError.
    with open(file_path, 'r', encoding='utf-8-sig', errors='replace') as file:
        return file.read()

In [8]:
def read_docx(file_path):
    """Collect the text of every paragraph in a Word document.

    Args:
        file_path: Path of the ``.docx`` file.

    Returns:
        list[str]: ``paragraph.text`` for each entry of the document's
        ``paragraphs``, in order.
    """
    document = docx.Document(file_path)
    paragraph_texts = []
    for paragraph in document.paragraphs:
        paragraph_texts.append(paragraph.text)
    return paragraph_texts

In [9]:
def read_pptx(file_path):
    """Collect the text of every text-bearing shape in a presentation.

    Args:
        file_path: Path of the ``.pptx`` file.

    Returns:
        list[str]: ``shape.text`` for each shape that has a ``text``
        attribute, visiting slides and their shapes in order.
    """
    deck = pptx.Presentation(file_path)
    shape_texts = []
    for slide in deck.slides:
        for shape in slide.shapes:
            # Shapes without a ``text`` attribute are skipped.
            if not hasattr(shape, "text"):
                continue
            shape_texts.append(shape.text)
    return shape_texts

In [10]:
def read_pdf(file_path):
    """Extract the text of a PDF file with pdfminer and split it into lines.

    Args:
        file_path: Path of the PDF file.

    Returns:
        list[str]: The text written by the ``TextConverter`` for all pages,
        split with ``str.splitlines``.
    """
    text_buffer = StringIO()
    with open(file_path, 'rb') as pdf_stream:
        document = PDFDocument(PDFParser(pdf_stream))
        resource_manager = PDFResourceManager()
        # The converter writes the text it extracts into ``text_buffer``.
        converter = TextConverter(resource_manager, text_buffer, laparams=LAParams())
        page_interpreter = PDFPageInterpreter(resource_manager, converter)
        for pdf_page in PDFPage.create_pages(document):
            page_interpreter.process_page(pdf_page)
    return text_buffer.getvalue().splitlines()

In [11]:
def read_json(file_path):
    """Parse a JSON file and return the decoded value.

    Args:
        file_path: Path of the JSON file.

    Returns:
        The Python object produced by ``json.load``.

    Raises:
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # 'utf-8-sig' decodes UTF-8 independent of the platform's default encoding
    # and drops a leading BOM, which json.load would otherwise reject.
    with open(file_path, 'r', encoding='utf-8-sig') as file:
        return json.load(file)

In [12]:
def read_txt(file_path):
    """Return the text of a plain-text file.

    Args:
        file_path: Path of the file to read.

    Returns:
        str: The decoded file contents.
    """
    # 'utf-8-sig' decodes UTF-8 with or without a BOM regardless of the
    # platform's default encoding; errors='replace' substitutes U+FFFD for
    # undecodable bytes instead of raising UnicodeDecodeError.
    with open(file_path, 'r', encoding='utf-8-sig', errors='replace') as file:
        return file.read()

In [13]:
def summarize_and_rename_file(contents, groq_key, summary_prompt, rename_prompt):
    """Ask the model for a summary of ``contents`` and then for a new name.

    Args:
        contents: Text of the file to summarize.
        groq_key: API key passed to the ``Groq`` client.
        summary_prompt: System prompt for the summary request.
        rename_prompt: System prompt for the rename request.

    Returns:
        tuple: ``(summary, name)`` as returned by ``get_summary`` and
        ``get_new_name``, or ``(None, None)`` when anything in the process
        raises (the error is printed, not propagated).
    """
    try:
        groq_client = Groq(api_key=groq_key)
        summary_result = get_summary(groq_client, summary_prompt, contents)
        # The rename request is fed the summary text, not the raw contents.
        rename_result = get_new_name(groq_client, rename_prompt, summary_result["summary"])
    except Exception as error:
        print(f"Error summarizing and renaming file: {error}")
        return None, None
    return summary_result, rename_result

In [14]:
def get_summary(client, summary_prompt, contents, model="llama3-8b-8192"):
    """Request a summary from the chat model and parse its JSON reply.

    Args:
        client: Object exposing ``chat.completions.create(messages=..., model=...)``.
        summary_prompt: System prompt describing the expected JSON reply.
        contents: File text, sent as the user message.
        model: Model identifier passed to the API; defaults to the value this
            function always used.

    Returns:
        dict: The first JSON object found in the model's reply.

    Raises:
        ValueError: If the reply contains no ``{`` at all, or if the text
            starting at the first ``{`` is not valid JSON even after a closing
            brace is appended (raised as ``json.JSONDecodeError``).
    """
    response = client.chat.completions.create(
        messages=[
            {"role": "system", "content": summary_prompt},
            {"role": "user", "content": contents},
        ],
        model=model,
    )
    reply = response.choices[0].message.content or ""

    start = reply.find('{')
    if start == -1:
        raise ValueError(f"No JSON object found in model response: {reply!r}")

    try:
        # raw_decode parses one complete JSON value and ignores whatever
        # follows it, so a '}' inside a string value or trailing chatter after
        # the object no longer cuts the object short.
        parsed, _ = json.JSONDecoder().raw_decode(reply, start)
    except json.JSONDecodeError:
        # The reply may have been cut off before the closing brace; retry with
        # one appended.
        parsed = json.loads(reply[start:] + '}')
    return parsed

In [15]:
def get_new_name(client, rename_prompt, summary, model="llama3-8b-8192"):
    """Request a new file name and category from the chat model.

    Args:
        client: Object exposing ``chat.completions.create(messages=..., model=...)``.
        rename_prompt: System prompt describing the expected JSON reply.
        summary: Summary text of the file, sent as the user message.
        model: Model identifier passed to the API; defaults to the value this
            function always used.

    Returns:
        dict: The first JSON object found in the model's reply.

    Raises:
        ValueError: If the reply contains no ``{`` at all, or if the text
            starting at the first ``{`` is not valid JSON even after a closing
            brace is appended (raised as ``json.JSONDecodeError``).
    """
    response = client.chat.completions.create(
        messages=[
            {"role": "system", "content": rename_prompt},
            {"role": "user", "content": summary},
        ],
        model=model,
    )
    reply = response.choices[0].message.content or ""

    start = reply.find('{')
    if start == -1:
        raise ValueError(f"No JSON object found in model response: {reply!r}")

    try:
        # raw_decode parses one complete JSON value and ignores whatever
        # follows it, so a '}' inside a string value or trailing chatter after
        # the object no longer cuts the object short.
        parsed, _ = json.JSONDecoder().raw_decode(reply, start)
    except json.JSONDecodeError:
        # The reply may have been cut off before the closing brace; retry with
        # one appended.
        parsed = json.loads(reply[start:] + '}')
    return parsed

In [16]:
def _safe_filename_part(value):
    """Return ``value`` as text that can be embedded in a single file name."""
    forbidden = '<>:"/\\|?*'
    # Path separators, characters that are invalid in file names on common
    # platforms and control characters are replaced so the value can neither
    # change the target directory nor produce an invalid name.
    cleaned = ''.join(
        '_' if (ch in forbidden or ord(ch) < 32) else ch
        for ch in str(value)
    )
    return cleaned.strip() or "unnamed"


def rename_file(file_path, new_name, category):
    """Rename a file to ``<category>_<new_name><ext>`` inside its own directory.

    ``new_name`` and ``category`` come from a language model, so both are
    sanitized before they are used in a path. An existing file is never
    overwritten: when the target name is taken, ``_1``, ``_2``, ... is
    appended until a free name is found.

    Args:
        file_path: Path of the file to rename.
        new_name: New base name (without extension).
        category: Category used as the file name prefix.

    Returns:
        str: The path the file now has.

    Raises:
        OSError: If ``os.rename`` fails.
    """
    directory = os.path.dirname(file_path)
    file_extension = os.path.splitext(file_path)[1]
    base_name = f"{_safe_filename_part(category)}_{_safe_filename_part(new_name)}"
    new_file_path = os.path.join(directory, base_name + file_extension)

    # The file already carries the requested name: nothing to do.
    if os.path.abspath(new_file_path) == os.path.abspath(file_path):
        return new_file_path

    # os.rename silently replaces an existing destination on POSIX, so pick a
    # name that is not in use yet.
    counter = 1
    while os.path.exists(new_file_path):
        new_file_path = os.path.join(directory, f"{base_name}_{counter}{file_extension}")
        counter += 1

    os.rename(file_path, new_file_path)
    return new_file_path

In [17]:
def read_and_process_all_files_in_directory(directory_path, groq_key, summary_prompt, rename_prompt, max_chars=15000):
    """Summarize and rename every supported file directly inside a directory.

    Sub-directories and entries whose name starts with ``.`` are skipped.
    A failure while reading or renaming one file is reported and the loop
    moves on to the next file, so a single bad file does not stop the batch.

    Args:
        directory_path: Directory whose files are processed (not recursive).
        groq_key: API key forwarded to ``summarize_and_rename_file``.
        summary_prompt: System prompt for the summary request.
        rename_prompt: System prompt for the rename request.
        max_chars: Maximum number of characters of the file's text sent to
            the model; longer text is truncated.

    Returns:
        None. Progress and errors are printed.
    """
    # os.listdir returns a snapshot, so renaming entries while looping is
    # safe; sorting makes the processing order deterministic.
    for filename in sorted(os.listdir(directory_path)):
        file_path = os.path.join(directory_path, filename)
        if filename.startswith('.') or not os.path.isfile(file_path):
            continue

        try:
            contents = read_file_contents(file_path)
        except Exception as e:
            # Readers can fail in many library-specific ways (corrupt
            # documents, invalid JSON, ...); report and keep going.
            print(f"Error processing file: {filename} ({e})")
            continue

        if contents == "Unsupported file type":
            print(f"Unsupported file type: {filename}")
            continue

        # Readers return strings, lists or parsed JSON; the model gets the
        # string form, truncated to max_chars.
        text = str(contents)[:max_chars]
        summary, name = summarize_and_rename_file(text, groq_key, summary_prompt, rename_prompt)
        if not (summary and name):
            print(f"Error processing file: {filename}")
            continue

        try:
            new_name = name['new_name']
            category = name['category']
            rename_file(file_path, new_name, category)
        except (KeyError, TypeError, OSError) as e:
            # The model reply lacked an expected key, or the rename failed.
            print(f"Error processing file: {filename} ({e})")
            continue

        print("------------------------")
        print(f"File: {filename}")
        print(f"Summary: {summary['summary']}")
        print(f"New Name: {new_name}")
        print(f"Category: {category}")

In [18]:
if __name__ == "__main__":
    # Folder (relative to the working directory) whose files get renamed.
    DIRECTORY_PATH = "data"

    # Settings come from the environment / .env file; the prompts are built
    # from the configured categories.
    categories, GROQ_KEY = load_environment_variables()
    SUMMARY_PROMPT, RENAME_PROMPT = get_prompts(categories)

    read_and_process_all_files_in_directory(
        directory_path=DIRECTORY_PATH,
        groq_key=GROQ_KEY,
        summary_prompt=SUMMARY_PROMPT,
        rename_prompt=RENAME_PROMPT,
    )