In [6]:
##自傳內容
from google.colab import drive
drive.mount('/content/drive')
import pandas as pd
import pdfplumber
import os
from opencc import OpenCC
import numpy as np
import ast
import jieba
from datetime import datetime
import re
from dateutil.relativedelta import relativedelta
from tabula import read_pdf
import PyPDF2
import fitz  # PyMuPDF
import pytesseract
from PIL import Image
import io


def extract_text_with_keyword(pdf_path, keyword, keyword2):
    try:
        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                page_text = page.extract_text()
                if keyword in page_text:
                    index_keyword = page_text.find(keyword)
                    index_keyword2 = page_text.find(keyword2, index_keyword)
                    if index_keyword2 != -1:
                        extracted_text = page_text[index_keyword2 + len(keyword2):]
                        # Find the last period in the extracted text
                        last_period_index = extracted_text.rfind("。")
                        if last_period_index != -1:
                            extracted_text = extracted_text[:last_period_index+1]  # Include the last period
                        return extracted_text.strip()  # Remove leading/trailing whitespace
    except Exception as e:
        print(f"Error extracting text from {pdf_path}: {e}")
    return ""

def process_pdfs_in_folder(folder_path, keyword, keyword2):
    data = []

    for filename in os.listdir(folder_path):
        if filename.endswith(".pdf"):
            pdf_path = os.path.join(folder_path, filename)
            id_column = filename[:10]  # Extract first 10 characters of filename

            # Extract text containing keyword
            extracted_text = extract_text_with_keyword(pdf_path, keyword, keyword2)
            
            data.append({"id": id_column, "intro": extracted_text or "No text extracted"})

    return data

keyword = "自傳內容"
keyword2 = "---------------------------------------------------------------------"
output_csv = '自傳.csv'

# Specify the folder paths containing PDFs
folder_paths = [
    ""
]

# Process PDFs in the specified folders and combine the results
combined_data = []
for folder_path in folder_paths:
    data = process_pdfs_in_folder(folder_path, keyword,keyword2)
    combined_data.extend(data)

# Create DataFrame from combined data
df = pd.DataFrame(combined_data)
df = df.drop_duplicates(subset=df.columns[0], keep='first')
# Write DataFrame to CSV
df.to_csv(output_csv, index=False)


In [7]:


def is_standard_resume_page(text):
    # Add keywords or patterns related to "制式招募履歷表"
    keywords = ["甄才類別", "standard recruitment resume"]
    return any(keyword in text for keyword in keywords)

def contains_job_description(text):
    # Check if the text contains "職務與工作內容"
    return "職務與工作內容" in text

def extract_page_content(pdf_path, page_number, filename, output_folder):
    # Try to read tables from the PDF
    dfs = read_pdf(pdf_path, pages=page_number, lattice=True, multiple_tables=True)
    
    if dfs and not dfs[0].empty:  # If tables are found
        # Concatenate the dataframes
        concatenated_df = pd.concat(dfs, ignore_index=True)

        # Check the number of rows in the dataframe
        num_rows = len(concatenated_df)

        # Determine whether to save the CSV file or not based on the number of rows
        if num_rows < 10:
            print(f"{filename}: 第 {page_number} 頁 - 不儲存，因為行數小於10")
            return


        else:
           # Find the existing CSV file
            csv_file = os.path.join(output_folder, f"{filename}履歷.csv")

            # Check if the CSV file already exists
            if os.path.exists(csv_file):
                # Read the existing CSV file to compare the number of rows
                existing_df = pd.read_csv(csv_file)

                # Check the number of rows in the existing CSV file
                existing_num_rows = len(existing_df)

                # Check the number of rows in the current DataFrame
                current_num_rows = len(concatenated_df)

                if current_num_rows > existing_num_rows:
                    # Overwrite the existing CSV file
                    concatenated_df.to_csv(csv_file, index=False, header=False)
                    print(f"{filename}: 第 {page_number} 頁 - 覆蓋原本的 CSV 檔案 (履歷)")
                else:
                    print(f"{filename}: 第 {page_number} 頁 - 跳過 (行數不多於原本的 CSV 檔案)")

        # Save the CSV file
        concatenated_df.to_csv(csv_file, index=False, header=False)
        print(f"{filename}: 第 {page_number} 頁 - 儲存為 CSV 檔案 (履歷)")

    
    # If no content found
    return f"{filename}: 第 {page_number} 頁 - 跳過 (無內容)"

def process_first_five_pdfs(folder_path, output_folder):
    # Get the first five PDF files in the specified folder
    pdf_files = [f for f in os.listdir(folder_path) if f.endswith(".pdf")]

    # Process each of the first five PDFs
    for filename in pdf_files:
        pdf_path = os.path.join(folder_path, filename)
        reader = PyPDF2.PdfReader(pdf_path)
        total_pages = len(reader.pages)

        # Iterate through each page of the PDF
        for page_number in range(1, total_pages + 1):
            result = extract_page_content(pdf_path, page_number, os.path.splitext(filename)[0], output_folder)
            print(result)

# Specify the folder paths
folder_path = ''
output_folder = ''

# Process the first five PDFs in the specified folder and save CSVs in the output folder
process_first_five_pdfs(folder_path, output_folder)



A123456789: 第 1 頁 - 覆蓋原本的 CSV 檔案 (履歷)
A123456789: 第 1 頁 - 儲存為 CSV 檔案 (履歷)
A123456789: 第 1 頁 - 跳過 (無內容)
A123456789: 第 2 頁 - 跳過 (無內容)
B123456789: 第 1 頁 - 覆蓋原本的 CSV 檔案 (履歷)
B123456789: 第 1 頁 - 儲存為 CSV 檔案 (履歷)
B123456789: 第 1 頁 - 跳過 (無內容)
B123456789: 第 2 頁 - 跳過 (無內容)


In [8]:


############################ 撈出資料 ###################### 
def clean_items(items):
    clean_items = [item.replace(' ', '').replace('\r', '').replace('\n', '').replace('(最高)', '').replace('(次高)', '') for item in items if isinstance(item, str) and '專業證照' not in item]
    clean_items = [x if x != "" else None for x in clean_items]
    return clean_items

def extract_and_clean_data(data, resume_row, keyword_1, keyword_2, length):
    data_section = []  
    while keyword_1 not in str(resume_row):
        resume_row = next(resume_tuples)
    resume_row = next(resume_tuples)  # 获取 '學歷背景' 下一行的数据

    while keyword_2 not in str(resume_row):
        data_section.append(resume_row)
        resume_row = next(resume_tuples)

    for s in data_section:
    # Check if s[0] is a number and not NaN
        if isinstance(s[1], str):
            data.extend(clean_items(s))

    data.extend([''] * (length - len(data)))
    return data, resume_row


all_data = []
directory_path_name = ['']
for path in directory_path_name:
    file_names = os.listdir(f'{path}')

    resume_names = [i for i in file_names if "履歷" in i]

    for name in resume_names:
        resume_df = pd.read_csv(f'{path}/{name}', header=None)
        resume_tuples = resume_df.itertuples(index=False, name=None)

        data = [f'{name[:10]}']
        data.append(f'{path}')
        licience = []
        try:
            resume_row = None
            data, resume_row = extract_and_clean_data(data, resume_row, '學歷背景', '工作經驗', 10)
            data, resume_row = extract_and_clean_data(data, resume_row, '工作經驗', '外語能力', 26)

            data.append(resume_row[1]) # 外語能力

            while '專業證照' not in str(resume_row):
                resume_row = next(resume_tuples)
            
            licience = []

            while True:
                licience.extend(resume_row)
                resume_row = next(resume_tuples)


        except StopIteration:
            pass
        data.append(clean_items(licience))
        all_data.append(data)

# Convert the list of lists to a DataFrame
result_df = pd.DataFrame(all_data)


# 定义转换函数，将所有的民国年转换为公元年
def convert_minguo_to_ad(date_range_str):
    import re
    # 确保输入是字符串类型
    if not isinstance(date_range_str, str):
        return date_range_str  # 如果不是字符串，直接返回原值
    
    # 查找所有的年份
    matches = re.findall(r'(\d+)年', date_range_str)
    
    # 对每一个匹配的年份进行检查和可能的转换
    for match in matches:
        if len(match) < 4:  # 如果年份长度小于4位数，视为民国年份
            ad_year = str(int(match) + 1911)  # 民国年转公元年
            date_range_str = date_range_str.replace(match + '年', ad_year + '年', 1)
    return date_range_str

# 应用转换函数到DataFrame的指定列
result_df.iloc[:, 5] = result_df.iloc[:, 5].apply(convert_minguo_to_ad)
result_df.iloc[:, 9] = result_df.iloc[:, 9].apply(convert_minguo_to_ad)
result_df.iloc[:, 13] = result_df.iloc[:, 13].apply(convert_minguo_to_ad)
result_df.iloc[:, 17] = result_df.iloc[:, 17].apply(convert_minguo_to_ad)
result_df.iloc[:, 21] = result_df.iloc[:, 21].apply(convert_minguo_to_ad)
result_df.iloc[:, 25] = result_df.iloc[:, 25].apply(convert_minguo_to_ad)

####################################################################


#####################   轉成繁體中文  ########################

# 創建簡繁轉換器
cc = OpenCC('s2twp')  # 簡體中文轉換為繁體中文（台灣標準）

# 將 merged_df 中的所有欄位的每個值進行簡繁轉換
def convert_cell(x):
    if isinstance(x, list):
        return [cc.convert(str(item)) for item in x]
    elif isinstance(x, str):
        return cc.convert(x)
    else:
        return x

result_df = result_df.applymap(convert_cell)
#############################################################




############################ 整理履歷欄位 ###################### 

# Drop duplicate rows based on the first column (index 0)
result_df = result_df.drop_duplicates(subset=result_df.columns[0], keep='first')
# Create a new column containing the lists from the first four columns
result_df['33'] = result_df.apply(lambda row: row[2:6].tolist(), axis=1)
result_df['34'] = result_df.apply(lambda row: row[6:10].tolist(), axis=1)
result_df['35'] = result_df.apply(lambda row: row[10:14].tolist(), axis=1)
result_df['36'] = result_df.apply(lambda row: row[14:18].tolist(), axis=1)
result_df['37'] = result_df.apply(lambda row: row[18:22].tolist(), axis=1)
result_df['38'] = result_df.apply(lambda row: row[22:25].tolist(), axis=1)
result_df['edu'] = result_df.apply(lambda row: row[28:30].tolist(), axis=1)
result_df['experience'] = result_df.apply(lambda row: row[30:34].tolist(), axis=1)
result_df = result_df.drop(result_df.columns[2:26], axis=1)
result_df = result_df.drop(result_df.columns[4:10], axis=1)


# Rename columns
result_df = result_df.rename(columns={result_df.columns[0]: 'id',
                                      result_df.columns[1]: 'test',
                                      result_df.columns[2]: 'language_ability',
                                      result_df.columns[3]: 'license'})

# Reorder the columns to have the desired order
result_df = result_df[['test','id', 'edu', 'language_ability', 'experience', 'license']]
print(result_df)
####################################################################





#########################加入額外寫的工作經歷#############################

def work_exp_data(resume_row):
    work_exp = []
    data_section = []  
    while '服務機構' not in str(resume_row):
        resume_row = next(resume_tuples)
    resume_row = next(resume_tuples)  # 获取 '學歷背景' 下一行的数据

    while '合計年資' not in str(resume_row):
        data_section.append(resume_row)
        resume_row = next(resume_tuples)
    data_section.append(resume_row)

    for w in data_section:
        # Check if the elements in w is not NaN
        w = [str(x).replace(' ', '').replace('\r', '').replace('\n', '') for x in w if x == x]
        if len(w) == 7:
            work_exp.extend(w[1: 8])
        elif '合計年資' in w:
            desired_length = 60
            if len(work_exp) < desired_length:
                # 如果长度小于60，添加空字符串
                work_exp.extend([''] * (desired_length - len(work_exp)))
            elif len(work_exp) > desired_length:
                # 如果长度大于42，截断到前20个元素
                work_exp = work_exp[:desired_length]

            work_exp.append(w[1])
        else:
            work_exp.extend([''] * 6)

    return work_exp

all_data = []
directory_path_name = ['']
for path in directory_path_name:
    file_names = os.listdir(f'{path}')

    resume_names = [i for i in file_names if "工作經歷" in i]
    
    for name in resume_names:

        data = [f'{name[:10]}']
        work_exp_df = pd.read_csv(f'{path}/{name}', header=None)
        resume_tuples = work_exp_df.itertuples(index=False, name=None)

        resume_row = next(resume_tuples)

        try:
            data.extend(work_exp_data(resume_row))
        except StopIteration:
            pass

        all_data.append(data)
        





##########################加入自傳##########################
df3 = pd.read_csv('自傳.csv')
merged_df = pd.merge(result_df , df3[['id', "intro"]], on='id', how='left')
print(merged_df)

  test          id                                                edu  \
0   cv  A123456789  [[X大學, 財金系(日間部), 畢業, 自2014年9月至2018年6月], [T高中, ...   
1   cv  B123456789  [[T大學, 企管/夜, 畢業, 自1998年9月至2001年7月], [Y商工, 電機/日...   

                                   language_ability  \
0  測驗名稱:___多益______________測驗成績:_____600___________   
1                測驗名稱:                        測驗成績:   

                                          experience  \
0  [[Y銀行, 初辦, 存匯/放款/支存, 自2022年9月至2023年9月], [S銀行, ...   
1  [[Y銀行, 初辦, 存匯/放款, 自2018年10月至2018年11月], [T銀行, 中...   

                                             license  
0  [人身保險業務員, 銀行內部控制與內部稽核, 投資型保險商品概要、金融體系概述, 財產保險業...  
1                                                 []  
  test          id                                                edu  \
0   cv  A123456789  [[X大學, 財金系(日間部), 畢業, 自2014年9月至2018年6月], [T高中, ...   
1   cv  B123456789  [[T大學, 企管/夜, 畢業, 自1998年9月至2001年7月], [Y商工, 電機/日...   

                                   language_ability  \
0  測驗