In [1]:
# Seed homepages for the crawl. Entries that are commented out are skipped;
# only the uncommented URLs end up in the list.
url_list = [
#'http://www.kimtex.com',
'http://swamyweb.com',

'http://www.kurashiki.com.br',
# 'http://www.unitexbd.com',

# 'http://www.beddinghouse.com',
#'http://www.nineandco.com',
# 'http://www.carcemal.pt',
# 'http://www.silsa.pt',
]

In [2]:
import vertexai
from google.cloud import aiplatform
from langchain.llms.vertexai import VertexAI

# Google Cloud project and region handed to vertexai.init() below.
PROJECT_ID = "grainscanner"  # @param {type:"string"}
REGION = "asia-northeast3"  # @param {type:"string"}

# Initialize Vertex AI SDK
vertexai.init(project=PROJECT_ID, location=REGION)

# Text model instance integrated with langChain.
# This module-level `llm` is the object that run() passes to both
# get_firm_info_with_llm() and get_url_list_with_llm().
# The trailing "text-bison" comment is the previously used model name, kept for reference.
llm = VertexAI(
    model_name="gemini-pro",#"text-bison",
    max_output_tokens=4096,
    temperature=0.4,
    top_p=0.8,
    top_k=40,
    verbose=True,
)


  warn_deprecated(


In [3]:
from langchain.chains import LLMChain, SimpleSequentialChain
from langchain.prompts import PromptTemplate
from langchain.output_parsers.json import SimpleJsonOutputParser
import json

# Upper bound on chain.invoke() attempts in get_url_list_with_llm() and
# get_firm_info_with_llm(); each failed attempt is printed and the last error kept.
RETRY_COUNT = 3

def get_string_item(dict_item, key):
    """Return the scalar stored under ``key``, unwrapping nested lists.

    Args:
        dict_item: Expected to be a dict; anything else yields ``""``.
        key: Key to look up.

    Returns:
        The value under ``key``. If the value is a list, its first element is
        taken repeatedly until a non-list (or an empty list) is reached.
        Missing keys and falsy results (``None``, ``""``, ``[]``, also when
        they only appear after unwrapping, e.g. ``[[]]`` or ``[None]``) all
        yield ``""``. Non-string truthy values are returned unchanged.
    """
    if not isinstance(dict_item, dict):
        return ""

    value = dict_item.get(key, '')

    # Peel off list wrappers: [["a", "b"]] -> ["a", "b"] -> "a".
    while isinstance(value, list) and value:
        value = value[0]

    # Check emptiness *after* unwrapping so that [[]] / [None] do not leak
    # an empty list or None to callers expecting a string-like value.
    if not value:
        return ""
    return value

def get_list_item(dict_item, key):
    """Return the value stored under ``key`` as a list.

    Args:
        dict_item: A dict to look ``key`` up in, or a list (returned as-is).
        key: Key to look up when ``dict_item`` is a dict.

    Returns:
        Always a list:
        * ``dict_item`` itself when it already is a list;
        * the list stored under ``key``;
        * ``[]`` when the key is missing, its value is ``None``, or
          ``dict_item`` is neither a dict nor a list;
        * ``[value]`` when the stored value is a single non-list item, so
          callers never iterate over the characters of a bare string.
    """
    if isinstance(dict_item, list):
        return dict_item
    if not isinstance(dict_item, dict):
        return []

    value = dict_item.get(key, [])
    if value is None:
        # JSON null: the key exists but carries nothing.
        return []
    if isinstance(value, list):
        return value
    return [value]


def get_url_list_with_llm(llm, a_tag_list):
    """Ask the LLM to pick contact/product links out of a list of <a> tags.

    The prompt (written in Korean) instructs the model to return a JSON array
    of href values; the response is parsed with SimpleJsonOutputParser.

    Args:
        llm: LangChain-compatible model placed in the middle of the chain.
        a_tag_list: The anchors to choose from; interpolated into the prompt
            via the ``{a_tag_list}`` placeholder.

    Returns:
        The parsed JSON value on success (an empty list is a valid answer
        meaning "no relevant links"). If every one of the RETRY_COUNT
        attempts raised, or the parser produced ``None``, a dict
        ``{'error': <last error message or None>}`` is returned instead, so
        callers must check the type before iterating.
    """
    json_parser = SimpleJsonOutputParser()

    filter_template = """
        당신은 유능하고 경험 많은 웹 제작자입니다.
        다음 global 회사 홈페이지에 태그된  a태그 리스트를 보고 
        회사에 contact 할 수 있는 정보를 담은 a태그와 
        취급 product를 설명하는 a태그만 골라서 href attribute를 추출해주세요.
        판단은 엄격하게 진행해주세요.

        Format instructions:
        ["href attribute", "href attribute"]

        -----------------------
        content :
        {a_tag_list}
    """

    filter_prompt_template = PromptTemplate(
        input_variables=["a_tag_list"],
        template=filter_template,
        output_parser=json_parser,
        # NOTE(review): the template above has no {format_instructions}
        # placeholder, so this partial variable appears to be unused.
        partial_variables={
            "format_instructions": json_parser.get_format_instructions()
        }
    )

    chain = filter_prompt_template | llm | json_parser

    result_additional_filter_json = None
    last_error = None
    for attempt in range(1, RETRY_COUNT + 1):
        try:
            result_additional_filter_json = chain.invoke({"a_tag_list": a_tag_list})
            break
        except Exception as e:
            # Best-effort: log the failure and try again until attempts run out.
            print(f'    ** retry[{attempt}] - ', e)
            last_error = str(e)

    # Compare against None rather than truthiness: an empty list is a
    # legitimate "nothing relevant" answer, not a failure.
    if result_additional_filter_json is None:
        return {'error': last_error}

    print('   ', result_additional_filter_json)
    return result_additional_filter_json


def get_firm_info_with_llm(llm, page_text):
    """Extract contact details and product keywords from page text via the LLM.

    The prompt (written in Korean) asks for email, phone, fax, address and a
    keyword list of core products, returned as JSON; the response is parsed
    with SimpleJsonOutputParser.

    Args:
        llm: LangChain-compatible model placed in the middle of the chain.
        page_text: Text of the page, interpolated into the prompt via the
            ``{page_text}`` placeholder.

    Returns:
        The parsed JSON value on success (possibly empty when the page holds
        nothing relevant). If every one of the RETRY_COUNT attempts raised,
        or the parser produced ``None``, ``{'error': <last error message or
        None>}`` is returned instead.
    """
    json_parser = SimpleJsonOutputParser()
    html_template = """
        당신은 유능한 global html parsor 입니다. 
        주어진 html 은 global 회사의 웹페이지입니다. 
        회사에 contact 할 수 있는 email, phone, fax, address 을 추출해주세요. 
        더불어 취급 products가 있다면 핵심 물품만 keyword list로 추출해주세요.
        
        결과는 json 으로 구성해서 전달해주세요.

        Format instructions:
        {format_instructions}

        ------------
        content:
        {page_text}
    """

    html_prompt_template = PromptTemplate(
        input_variables=["page_text"],
        template=html_template,
        output_parser=json_parser,
        partial_variables={
            "format_instructions": json_parser.get_format_instructions()# + "\n" + "json key are (email, phone, address, items)"
        }
    )

    html_chain = html_prompt_template | llm | json_parser

    result_html_json = None
    last_error = None
    for attempt in range(1, RETRY_COUNT + 1):
        try:
            result_html_json = html_chain.invoke({"page_text": page_text})
            break
        except Exception as e:
            # Best-effort: log the failure and try again until attempts run out.
            print(f'    ** retry[{attempt}] - ', e)
            last_error = str(e)

    # Compare against None rather than truthiness so that a valid but empty
    # JSON answer is not reported as {'error': None}.
    if result_html_json is None:
        return {'error': last_error}

    # Pretty-print the parsed JSON, indented to line up with the other log lines.
    print('   ', '\n    '.join(json.dumps(result_html_json, indent=4).split('\n')))
    return result_html_json

In [4]:
import requests
from bs4 import BeautifulSoup
from google.cloud import aiplatform
import time

from requests.adapters import HTTPAdapter, Retry

# NOTE(review): HTTPAdapter's pool-size defaults are presumably bound when the
# class is defined, so this reassignment likely has no effect on adapters
# created below; the explicit pool_connections/pool_maxsize arguments are what
# size the pool. Kept for backward compatibility - confirm before removing.
requests.adapters.DEFAULT_POOLSIZE = 100
retries = Retry(total=20)
# Create a session and configure its connection-pool size.
session = requests.Session()
# Mount the pooled/retrying adapter for BOTH schemes: the seed URLs in
# url_list are http://, and change_protocol() can flip a URL either way, so
# an https-only mount would leave plain-http requests on the default adapter.
# NOTE(review): extract_content() calls requests.get() directly, so this
# session is not used for page fetches as the notebook stands.
session.mount('http://', requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100, max_retries=retries))
session.mount('https://', requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100, max_retries=retries))

from urllib.parse import urlparse

def get_domain(url):
    """Return the network location of ``url`` without a leading ``www.``.

    Args:
        url: URL string. Relative URLs (no scheme/host) have an empty
            network location and therefore yield ``""``.

    Returns:
        ``urlparse(url).netloc`` with one leading ``"www."`` removed. Only the
        prefix is stripped, so hosts that merely contain the substring
        (e.g. ``awww.example.com``) are left intact.
    """
    netloc = urlparse(url).netloc
    prefix = 'www.'
    if netloc.startswith(prefix):
        return netloc[len(prefix):]
    return netloc

def change_protocol(url):
    """Swap the scheme of ``url`` between http and https.

    An ``http`` URL comes back as ``https`` and vice versa; a URL with any
    other scheme (or none at all) is returned unchanged.
    """
    opposite_scheme = {'http': 'https', 'https': 'http'}

    # Parse the URL once and look up the scheme to switch to, if any.
    parts = urlparse(url)
    new_scheme = opposite_scheme.get(parts.scheme)
    if new_scheme is None:
        return url

    return parts._replace(scheme=new_scheme).geturl()

# Request headers sent by extract_content() with every page fetch; the
# User-Agent is a desktop Chrome-on-Windows browser string.
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
}

def extract_content(url, is_recursive=False):
    """Fetch ``url`` and parse the response body with BeautifulSoup.

    Args:
        url: Absolute URL of the page to fetch.
        is_recursive: True when this call is the one-off retry after a 403;
            prevents the http/https swap from repeating.

    Returns:
        A ``(status, soup)`` tuple:
        * ``(200, BeautifulSoup)`` when the page was fetched;
        * ``(<status>, None)`` for any other HTTP status. A 403 first
          triggers a single retry with the scheme swapped via
          change_protocol(), and that retry's result is returned;
        * ``(None, None)`` when the request itself failed (timeout,
          connection/DNS error, invalid URL), so one unreachable page does
          not abort the whole crawl.
    """
    # Fixed half-second pause before every request (presumably to throttle
    # the crawl).
    time.sleep(0.5)
    try:
        response = requests.get(url, headers=headers, timeout=5)
    except requests.RequestException as e:
        print(f'    ** page 조회 실패 - {e}')
        return None, None

    if response.status_code == 200:
        return response.status_code, BeautifulSoup(response.content, 'html.parser')

    if response.status_code == 403 and not is_recursive:
        # Retry once on the other scheme (http <-> https).
        print(f'    ** page 조회 - retry')
        return extract_content(change_protocol(url), is_recursive=True)

    return response.status_code, None


def run(main_url):
    """Crawl one company homepage plus the sub pages the LLM deems relevant.

    Steps:
      1. Fetch ``main_url`` and extract contact/product info from its text.
      2. Collect the page's same-site <a> tags (relative hrefs are resolved
         against ``main_url`` first) and let the LLM pick contact/product
         links.
      3. Fetch every selected link and extract info from it as well.

    Uses the module-level ``llm`` for all LLM calls.

    Args:
        main_url: Homepage URL of the company.

    Returns:
        A list of dicts, one per successfully fetched page. Each dict has
        ``homepage`` and ``extract_url`` plus whatever the LLM returned (or
        an ``error`` key when extraction raised). Empty when the main page
        could not be fetched.
    """
    from urllib.parse import urljoin

    print(f'** [main page] 조회 - {main_url}')
    status, soup = extract_content(main_url)

    result = []
    if status != 200:
        print(f'    [{status}] : 조회 실패')
        return result

    result.append(_extract_firm_record(main_url, main_url, soup))

    print(f'  ** a tag 추출')
    distinct_a_list = _collect_same_site_links(main_url, soup)
    if not distinct_a_list:
        print(f'    [empty]')
        return result

    print(f'  ** [llm] 회사 정보와 관련있는 a tag 선별')
    # Pass a plain list of tags; wrapping dict.values() in a list literal
    # would put "dict_values([...])" noise into the prompt.
    page_list = get_url_list_with_llm(llm, list(distinct_a_list.values()))

    if not isinstance(page_list, list):
        # get_url_list_with_llm() reports failure as {'error': ...}; iterating
        # that dict would treat the key 'error' as a URL.
        print(f'    ** sub page 선별 실패 - {page_list}')
        return result

    # Resolve relative hrefs and drop duplicates while keeping the LLM's order.
    sub_urls = dict.fromkeys(
        urljoin(main_url, href.strip())
        for href in page_list
        if isinstance(href, str) and href.strip()
    )

    for sub_url in sub_urls:
        print(f'  ** [sub page] 조회 - {sub_url}')
        sub_status, sub_soup = extract_content(sub_url)
        if sub_status == 200:
            result.append(_extract_firm_record(main_url, sub_url, sub_soup))
        else:
            print(f'    [{sub_status}] : 조회 실패')

    return result


def _extract_firm_record(homepage, page_url, soup):
    """Build one result row for ``page_url`` by running the LLM on the page text."""
    print(f'  ** [llm] contact 정보 & products 추출')
    record = {'homepage': homepage,
              'extract_url': page_url}
    try:
        info = get_firm_info_with_llm(llm, soup.get_text())
        if info:
            record.update(info)
    except Exception as e:
        # Keep the row so the failure is visible in the exported table.
        record['error'] = str(e)
    return record


def _collect_same_site_links(main_url, soup):
    """Map resolved URL -> <a> tag for every link that stays on the main site's domain."""
    from urllib.parse import urljoin

    main_domain = get_domain(main_url)
    links = {}
    for link in soup.find_all('a', href=True):
        try:
            # Relative hrefs ("/contact", "about.html") have no host of their
            # own; resolve them against the homepage before the domain check.
            absolute_url = urljoin(main_url, link.get('href').strip())
            link_domain = get_domain(absolute_url)
        except ValueError:
            # Malformed href that urllib cannot parse - skip it.
            continue
        if link_domain.startswith(main_domain):
            # Trailing/leading slashes are stripped so ".../x" and ".../x/"
            # collapse into one entry.
            links[absolute_url.strip('/')] = link
    return links


In [7]:
# Crawl every seed URL in turn and pool the per-page records into one flat list.
result = []
for url in url_list:
    sub_result = run(url)
    # An empty/None outcome contributes nothing.
    result += sub_result or []
print('[finish!]')

** [main page] 조회 - http://swamyweb.com
  ** [llm] contact 정보 & products 추출
    {
        "contact": {
            "email": "info@swamycottonmill.co.in",
            "phone": "(91) 421 2344042",
            "fax": "(91) 421 2345750",
            "address": "S.F.No.407/2, Peerchangadu, Tirupur-641663."
        },
        "products": [
            "Cotton",
            "Viscose",
            "Linen",
            "Woven Fabrics"
        ]
    }
  ** a tag 추출
  ** [llm] 회사 정보와 관련있는 a tag 선별
    ['https://swamyweb.com/contact-us/', 'http://swamyweb.com/demo/contact-us/']
  ** [sub page] 조회 - https://swamyweb.com/contact-us/
  ** [llm] contact 정보 & products 추출
    {
        "contact": {
            "email": "info@swamycottonmill.co.in",
            "phone": "+ 91 421 2344042",
            "fax": "+ 91 421 2345750",
            "address": "S.F.No.407/2,\nPeerchangadu,\nMangalam,\nTirupur \u2013 641663."
        },
        "products": []
    }
  ** [sub page] 조회 - http://swamyweb.com/demo/cont

In [None]:
import pandas as pd

# The crawl records collected above (provided JSON data), flattened so that
# nested keys become their own columns.
json_data = result
normalized_df = pd.json_normalize(json_data)

target_column_list = ['homepage', 'extract_url', 'company_name', 'mail', 'phone', 'address', 'fax', 'products', 'error']
total_column_list = list(normalized_df.columns)


def merge_row(row):
    """Join all non-null cells of ``row`` into one ';\\n'-separated string.

    Dicts are serialized as JSON, lists contribute one entry per element, and
    every other value is converted with ``str`` so that numeric cells cannot
    break the final join. Returns ``''`` when the row has nothing to merge.
    """
    merged_data = []
    for item in row.dropna():
        if isinstance(item, dict):
            merged_data.append(json.dumps(item, ensure_ascii=False))
        elif isinstance(item, list):
            merged_data.extend(str(x) for x in item)
        elif not pd.isnull(item):
            merged_data.append(str(item))
    return ';\n'.join(merged_data)


merged_df = pd.DataFrame(columns=target_column_list)

for target_col in target_column_list:
    # Substring match: every flattened column whose name contains the target
    # name is folded into it (e.g. 'mail' also picks up 'contact.email').
    related_cols = [col for col in total_column_list if target_col in col]

    if related_cols:
        merged_df[target_col] = normalized_df[related_cols].apply(merge_row, axis=1)

In [None]:
import os
file_path = './output/beautifulsoup_sample.csv'
# DataFrame.to_csv does not create missing directories, so make sure the
# output folder exists before writing.
os.makedirs(os.path.dirname(file_path), exist_ok=True)
merged_df.to_csv(file_path, index=False)

# Open the CSV file in the Numbers application (uses the macOS `open` command).
os.system(f'open -a Numbers {file_path}')