In [42]:
import re
from PIL import Image
import pytesseract
import pdfplumber
from pdf2image import convert_from_path
from fpdf import FPDF
import openai
import os
from datetime import datetime
import requests
import streamlit as st
import io

#hardcoded variables
OUTPUT_PATH = '/output'
EXAMPLE_DOC = 'contract.pdf'
SIGNATURE_PATH = 'signature.png'

In [36]:
# Get the OpenAI API key from the environment variable
api_key = ""
if not api_key:
    st.error("API key not found. Please set the OPENAI_API_KEY environment variable.")
else:
    openai.api_key = api_key

In [4]:
def extract_text_from_image(image_path):
    try:
        image = Image.open(image_path)
        text = pytesseract.image_to_string(image)
        return text
    except Exception as e:
        print(f"Error extracting text from image: {e}")
        return None

In [5]:
def extract_text_from_pdf(file_path):
    try:
        text = ""
        with pdfplumber.open(file_path) as pdf:
            for page in pdf.pages:
                text += page.extract_text()
        return text
    except Exception as e:
        print(f"Error extracting text from PDF: {e}")
        return None

In [6]:
def extract_contract_info(text):
    try:
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": f"Analyze the following contract text and extract the name of the person as a quitting party, company with whom we want to cancel the contract, contract number, date of birth:\n\n{text}\n\n"}
        ]
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=messages,
            max_tokens=500
        )
        analysis_result = response.choices[0].message["content"].strip()

        # Initialize the dictionary to store the extracted information
        extracted_info = {
            "company": [],
            "contract_number": [],
            "date_of_birth": [],
            "quitting_party": []
        }

        # Here, you can implement a parsing logic based on the expected format of analysis_result
        lines = analysis_result.split('\n')
        for line in lines:
            if "Company:" in line:
                extracted_info["company"].append(line.split("Company:")[1].strip())
            if "Contract Number:" in line:
                extracted_info["contract_number"].append(line.split("Contract Number:")[1].strip())
            if "Date of Birth:" in line:
                extracted_info["date_of_birth"].append(line.split("Date of Birth:")[1].strip())
            if "Quitting Party:" in line:
                extracted_info["quitting_party"].append(line.split("Quitting Party:")[1].strip())

        return extracted_info

    except Exception as e:
        print(f"Error analyzing contract: {e}")
        return None

In [7]:
def analyze_contract(text):
    try:
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": f"Analyze the following contract text and extract the contract party, contract number, date of birth, and quitting party:\n\n{text}\n\n"}
        ]
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=messages,
            max_tokens=500
        )
        analysis_result = response.choices[0].message["content"].strip()
        return analysis_result
    except Exception as e:
        print(f"Error analyzing contract: {e}")
        return None

def extract_contract_info(text):
    try:
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": f"Analyze the following contract text and extract the name of the person as a quitting party, company with whom we want to cancel the contract, contract number, date of birth:\n\n{text}\n\n"}
        ]
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=messages,
            max_tokens=500
        )
        analysis_result = response.choices[0].message["content"].strip()

        # Initialize the dictionary to store the extracted information
        extracted_info = {
            "company": [],
            "contract_number": [],
            "date_of_birth": [],
            "quitting_party": []
        }

        # Here, you can implement a parsing logic based on the expected format of analysis_result
        lines = analysis_result.split('\n')
        for line in lines:
            if "Company:" in line:
                extracted_info["company"].append(line.split("Company:")[1].strip())
            if "Contract Number:" in line:
                extracted_info["contract_number"].append(line.split("Contract Number:")[1].strip())
            if "Date of Birth:" in line:
                extracted_info["date_of_birth"].append(line.split("Date of Birth:")[1].strip())
            if "Quitting Party:" in line:
                extracted_info["quitting_party"].append(line.split("Quitting Party:")[1].strip())

        return extracted_info

    except Exception as e:
        print(f"Error analyzing contract: {e}")
        return None

In [37]:
def analyze_and_extract_contract_info(text):
    try:
        # Define the messages for the API request
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": f"Analyze the following contract text and extract the company, contract number, date of birth, and quitting party as a name of the person:\n\n{text}\n\n"}
        ]

        # Call the OpenAI API to analyze the contract text
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=messages,
            max_tokens=500
        )

        # Extract the response content
        analysis_result = response.choices[0].message["content"].strip()

        # Initialize the dictionary to store the extracted information
        extracted_info = {
            "company": [],
            "contract_number": [],
            "date_of_birth": [],
            "quitting_party": []
        }

        # Parsing logic based on the expected format of analysis_result
        lines = analysis_result.split('\n')
        for line in lines:
            if "Company:" in line:
                extracted_info["company"].append(line.split("Company:")[1].strip())
            elif "Contract Number:" in line:
                extracted_info["contract_number"].append(line.split("Contract Number:")[1].strip())
            elif "Date of Birth:" in line:
                extracted_info["date_of_birth"].append(line.split("Date of Birth:")[1].strip())
            elif "Quitting Party:" in line:
                extracted_info["quitting_party"].append(line.split("Quitting Party:")[1].strip())

        return extracted_info

    except Exception as e:
        print(f"Error analyzing contract: {e}")
        return None

In [9]:
import io

def generate_termination_pdf(data):
    try:
        pdf = FPDF()
        pdf.add_page()
        pdf.set_font("Arial", size=12)

        # Add contract party information
        pdf.cell(200, 10, txt="Max Mustermann · Kaulbachstraße 60 · 80539 München", ln=True, align='C')
        pdf.cell(200, 10, txt="", ln=True)  # Empty line

        # Add contract party and contract number information
        pdf.cell(200, 10, txt=f"{data['parties']}", ln=True)
        pdf.cell(200, 10, txt="40875 Ratingen", ln=True)
        pdf.cell(200, 10, txt="", ln=True)  # Empty line

        # Add contract number and date of birth information
        pdf.cell(200, 10, txt=f"Vodafone-Handynummer: {data['contract_number']}  Geburtsdatum: {data['date_of_birth']}", ln=True)
        pdf.cell(200, 10, txt="", ln=True)  # Empty line

        # Add termination information
        pdf.set_font("Arial", size=14, style='B')
        pdf.cell(200, 10, txt="Kündigung zum nächstmöglichen Zeitpunkt", ln=True)
        pdf.set_font("Arial", size=12)
        pdf.cell(200, 10, txt="28.06.2024", ln=True, align='R')
        pdf.cell(200, 10, txt="", ln=True)  # Empty line

        # Add termination letter content
        pdf.multi_cell(200, 10, txt="Sehr geehrte Damen und Herren,\n\n"
                                    "hiermit kündige ich meinen Vertrag fristgerecht zum nächstmöglichen Zeitpunkt. "
                                    "Bitte senden Sie mir eine schriftliche Bestätigung der Kündigung unter Angabe des Beendigungszeitpunktes zu.\n\n"
                                    "Mit freundlichen Grüßen\n\n")
        pdf.set_font("Arial", size=16, style='B')
        pdf.cell(200, 10, txt=f"{data['quitting_party']}", ln=True)
        pdf.set_font("Arial", size=12)
        pdf.cell(200, 10, txt=f"{data['quitting_party']}", ln=True)

        # Save PDF to a binary stream instead of a file
        pdf_output = io.BytesIO()
        pdf.output(pdf_output)
        pdf_output.seek(0)
        
        return pdf_output.read()

    except Exception as e:
        print(f"Error generating PDF: {e}")
        return None

In [10]:
def generate_signature_with_dalle(prompt):
    try:
        response = openai.Image.create(
            prompt=prompt,
            n=1,
            size="256x256"
        )
        image_url = response['data'][0]['url']
        return image_url
    except Exception as e:
        print(f"Error generating image with DALL-E: {e}")
        return None

In [11]:
#from google.colab import drive
#drive.mount('/content/drive')

# 确保文件路径正确
pdf_path = EXAMPLE_DOC
print(f"PDF path: {EXAMPLE_DOC}")


PDF path: contract.pdf


In [15]:
pdf_text = extract_text_from_pdf(pdf_path)

if pdf_text:
    print("Extracted text from PDF:")
    print(pdf_text)
    analysis_result = analyze_contract(pdf_text)
    if analysis_result:
        print("Analysis result from GPT-4:")
        print(analysis_result)
        contract_data = extract_text_from_pdf(analysis_result)
        if contract_data:
            print("Contract data extracted:")
            print(contract_data)
            generate_termination_pdf(contract_data)
            # List files to check if the output PDF is generated
            print(os.listdir(OUTPUT_PATH))
else:
    print("Failed to extract text from PDF.")

Extracted text from PDF:
Ein Unternehmen
der Stadtwerke München
Münchner Verkehrsgesellschaft mbH (MVG) . MVG AboCenter 09. April 2024
80287 München
MVG AboService:
0800 344 22 66 11
Johann-Leopold von Gerlach Mo. bis Do.8 17 Uhr
Kaulbachstrasse 59 Fr. 8 15 Uhr
gebührenfreie Servicenummer
80539 München
abocenter@mvg.de
MVG Kundencenter
Hauptbahnhof:
Zwischengeschoss
Mo. bis Fr. 8 20 Uhr
Sa. 9 16 Uhr
MVG Kundencenter
Marienplatz:
Zwischengeschoss
Mo. bis Fr. 8 20 Uhr
Sa. 9 16 Uhr
Münchner
Verkehrsgesellschaft mbH (MVG)
Postanschrift
80287 München
Hausanschrift
Emmy-Noether-Straße 2
80992 München
Telefon: +49 89 2191-0
www.mvg.de
Geschäftsführung
Ingo Wortmann (Vorsitzender)
Veit Bodenschatz
Alexandra Diessner
Oliver Glaser
Dr. Gabriele Jahn
Aufsichtsratsvorsitzender
Oberbürgermeister Dieter Reiter
Handelsregister
Amtsgericht München,
HRB 140658
USt-IdNr.
DE813357346
Gläubiger-ID
DE1077000000034030
Bankverbindung
HypoVereinsbank München
IBAN DE33 7002 0270 0000 0916 00
BIC HYVEDEMMXXX
96

In [16]:
#result1 = analyze_contract(pdf_text)
result2 = analyze_and_extract_contract_info(pdf_text)
result2

{'company': ['Münchner Verkehrsgesellschaft mbH (MVG)'],
 'contract_number': ['74159918-2'],
 'date_of_birth': ['Not specified in the contract text.'],
 'quitting_party': ['Johann-Leopold von Gerlach']}

In [39]:
def generate_signature(name):
    prompt = (
        f"Generate a realistic handwritten signature for the name '{name}'. "
        "The signature should be elegant, clear, and written in a cursive style. "
        "The background should be transparent, and the signature should be centered "
        "with no additional text or decorations. Ensure that the handwriting appears "
        "natural and fluid, resembling an authentic signature."
    )

    response = openai.Image.create(
        prompt=prompt,
        n=1,
        size="256x256"
    )

    image_url = response['data'][0]['url']
    
    # Download the image
    image_response = requests.get(image_url)
    image = Image.open(io.BytesIO(image_response.content))
    
    # Convert image to binary format
    img_byte_arr = io.BytesIO()
    image.save(img_byte_arr, format='PNG')
    img_byte_arr = img_byte_arr.getvalue()
    
    return img_byte_arr


# Example usage
signature_image = generate_signature(result2['quitting_party'][0])

# Save the signature image to a file for demonstration purposes
with open("signature.png", "wb") as f:
    f.write(signature_image)


In [41]:
def generate_termination_pdf(data, SIGNATURE_PATH):
    # Extract values from data
    contract_number = data['contract_number'][0] if isinstance(data['contract_number'], list) and data['contract_number'] else ''
    company = data['company'][0] if isinstance(data['company'], list) and data['company'] else ''
    quitting_party = data['quitting_party'][0] if isinstance(data['quitting_party'], list) and data['quitting_party'] else ''
    
    date_pattern = re.compile(r'\b\d{2}\.\d{2}\.\d{4}\b')
    if isinstance(data['date_of_birth'], list) and data['date_of_birth']:
        candidate = data['date_of_birth'][0]
        if date_pattern.match(candidate):
            date_of_birth = candidate
        else:
            date_of_birth = None
    else:
        date_of_birth = None

    try:
        pdf = FPDF()
        pdf.add_page()
        pdf.set_font("Arial", size=12)

        # Add contract party information
        pdf.cell(200, 10, txt="", ln=True)  # Empty line

        # Add company and contract number information
        pdf.cell(200, 10, txt=f"{company}", ln=True)
        pdf.cell(200, 10, txt=" ", ln=True)
        pdf.cell(200, 10, txt=" ", ln=True)
        pdf.cell(200, 10, txt=" ", ln=True)
        pdf.cell(200, 10, txt="", ln=True)  # Empty line

        # Add contract number and date of birth information
        pdf.cell(200, 10, txt=f"Contract Number: {contract_number}", ln=True) 
        if date_of_birth is not None:
            pdf.cell(200, 10, txt=f"Date of Birth: {date_of_birth}", ln=True) 
        pdf.cell(200, 10, txt="", ln=True)  # Empty line

        # Add termination information
        pdf.set_font("Arial", size=14, style='B')
        pdf.cell(200, 10, txt="Termination at the next possible date", ln=True)
        pdf.set_font("Arial", size=12)
        today_date = datetime.today().strftime('%d.%m.%Y')
        pdf.cell(200, 10, txt=today_date, ln=True, align='R')
        pdf.cell(200, 10, txt="", ln=True)  # Empty line
        pdf.cell(200, 10, txt="", ln=True)  # Empty line

        # Add termination letter content
        pdf.multi_cell(200, 10, txt="Dear Sir or Madam,\n\n"
                                    "I hereby give notice of termination of my contract with effect from the next possible date. \n"
                                    "Please send me a written confirmation of the termination stating the date of termination.\n\n"
                                    "Never text here again.\n\n")
        pdf.set_font("Arial", size=16, style='B')
        pdf.cell(200, 10, txt=f"{quitting_party}", ln=True)
        pdf.set_font("Arial", size=12)
        pdf.cell(200, 10, txt=f"{quitting_party}", ln=True)

        # Add signature image
        pdf.image(SIGNATURE_PATH)

        # Save PDF to a binary stream instead of a file
        pdf_output = io.BytesIO()
        pdf_content = pdf.output(dest='S').encode('latin1')
        pdf_output.write(pdf_content)
        pdf_output.seek(0)
        
        return pdf_output.read()

    except Exception as e:
        print(f"Error generating PDF: {e}")
        return None


In [45]:
def generate_termination_pdf(data, signature_image):
    contract_number = data['contract_number'][0] if isinstance(data['contract_number'], list) and data['contract_number'] else ''
    company = data['company'][0] if isinstance(data['company'], list) and data['company'] else ''
    quitting_party = data['quitting_party'][0] if isinstance(data['quitting_party'], list) and data['quitting_party'] else ''
    
    date_pattern = re.compile(r'\b\d{2}\.\d{2}\.\d{4}\b')
    if isinstance(data['date_of_birth'], list) and data['date_of_birth']:
        candidate = data['date_of_birth'][0]
        if date_pattern.match(candidate):
            date_of_birth = candidate
        else:
            date_of_birth = None
    else:
        date_of_birth = None

    try:
        pdf = FPDF()
        pdf.add_page()
        pdf.set_font("Arial", size=12)

        # Add contract party information
        pdf.cell(200, 10, txt="", ln=True)  # Empty line

        # Add company and contract number information
        pdf.cell(200, 10, txt=f"{company}", ln=True)
        pdf.cell(200, 10, txt=" ", ln=True)
        pdf.cell(200, 10, txt=" ", ln=True)
        pdf.cell(200, 10, txt=" ", ln=True)
        pdf.cell(200, 10, txt="", ln=True)  # Empty line

        # Add contract number and date of birth information
        pdf.cell(200, 10, txt=f"Contract Number: {contract_number}", ln=True) 
        if date_of_birth is not None:
            pdf.cell(200, 10, txt=f"Date of Birth: {date_of_birth}", ln=True) 
        pdf.cell(200, 10, txt="", ln=True)  # Empty line

        # Add termination information
        pdf.set_font("Arial", size=14, style='B')
        pdf.cell(200, 10, txt="Termination at the next possible date", ln=True)
        pdf.set_font("Arial", size=12)
        today_date = datetime.today().strftime('%d.%m.%Y')
        pdf.cell(200, 10, txt=today_date, ln=True, align='R')
        pdf.cell(200, 10, txt="", ln=True)  # Empty line
        pdf.cell(200, 10, txt="", ln=True)  # Empty line

        # Add termination letter content
        pdf.multi_cell(200, 10, txt="Dear Sir or Madam,\n\n"
                                    "I hereby give notice of termination of my contract with effect from the next possible date. \n"
                                    "Please send me a written confirmation of the termination stating the date of termination.\n\n"
                                    "Never text here again.\n\n")
        pdf.set_font("Arial", size=16, style='B')
        pdf.cell(200, 10, txt=f"{quitting_party}", ln=True)
        pdf.set_font("Arial", size=12)
        pdf.cell(200, 10, txt=f"{quitting_party}", ln=True)

        # Add signature image
        signature_io = io.BytesIO(signature_image)
        image = Image.open(signature_io)
        image_path = "signature_temp.png"
        image.save(image_path)
        pdf.ln(10)
        pdf.image(image_path, x=10, y=pdf.get_y(), w=50)

        # Save PDF to a binary stream instead of a file
        pdf_output = io.BytesIO()
        pdf_content = pdf.output(dest='S').encode('latin1')
        pdf_output.write(pdf_content)
        pdf_output.seek(0)
        
        return pdf_output.read()

    except Exception as e:
        print(f"Error generating PDF: {e}")
        return None

# Generate the PDF with the signature image
pdf_content = generate_termination_pdf(result2, SIGNATURE_PATH)

# Save to a file for demonstration purposes
if pdf_content:
    with open("termination_letter.pdf", "wb") as f:
        f.write(pdf_content)


Error generating PDF: a bytes-like object is required, not 'str'
