<a href="https://colab.research.google.com/github/dataliszt/collecting-data/blob/main/GooglePatentsCrawler.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Import Module

In [None]:
# Install chromedriver
!apt-get update
!apt install chromium-chromedriver
!cp /usr/lib/chromium-browser/chromedriver /usr/bin
!pip install selenium

# Module
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from typing import collections, Dict, Tuple, List

import datetime
import time
import sys
import re
import os
import requests
import platform
import calendar
import pickle
import random

import pandas as pd
import numpy as np

In [None]:
def make_date_range(start_date: int, end_date: int) -> List[Tuple[str, str]]:
    """Split the months from ``start_date`` to ``end_date`` into half-month windows.

    Only the year and month of each argument are used; the day digits are
    ignored, so the first window starts on the 1st of the start month and the
    last window ends on the last day of the end month.

    Args:
        start_date: First month to cover, written as ``YYYYMMDD``.
        end_date: Last month to cover (inclusive), written as ``YYYYMMDD``.

    Returns:
        A chronologically ordered list of ``(window_start, window_end)`` pairs
        of ``YYYYMMDD`` strings, two per month: the 1st to the 15th, and the
        16th to the last day of the month. The list is empty when the start
        month is later than the end month.

    Raises:
        ValueError: If either month is outside 1..12 or the digits cannot be
            parsed.
    """
    start_text, end_text = str(start_date), str(end_date)
    start_year, start_month = int(start_text[:4]), int(start_text[4:6])
    end_year, end_month = int(end_text[:4]), int(end_text[4:6])

    for month in (start_month, end_month):
        if not 1 <= month <= 12:
            raise ValueError(f'month must be in 1..12, got {month}')

    # Number the months linearly (year * 12 + zero-based month) so a range that
    # crosses a year boundary is walked month by month. Slicing one fixed
    # month table per year would repeat the same months in every year.
    first_index = start_year * 12 + (start_month - 1)
    last_index = end_year * 12 + (end_month - 1)

    ymd_list = []
    for index in range(first_index, last_index + 1):
        year, zero_based_month = divmod(index, 12)
        month = zero_based_month + 1
        prefix = f'{year}{month:02d}'
        last_day = calendar.monthrange(year, month)[1]
        ymd_list.append((prefix + '01', prefix + '15'))
        ymd_list.append((prefix + '16', f'{prefix}{last_day}'))

    return ymd_list

# Front page parsing functions


In [None]:
# Collect the search results shown on the page the driver is currently on.
def content_block_parser():
    """Return every search-result ``<article>`` element on the current page.

    Reads the page source from the module-level ``driver``. Callers treat an
    empty result as "nothing to collect on this page".
    """
    soup = BeautifulSoup(driver.page_source, 'html.parser')
    return soup.find_all('article', 'result style-scope search-result-item')

# parse patent ID
def get_id(article):
    """Return the text of the first span inside the result's metadata header.

    Args:
        article: One search-result element as returned by
            ``content_block_parser``.
    """
    metadata_header = article.find('h4', 'metadata style-scope search-result-item')
    id_span = metadata_header.find('span', 'style-scope search-result-item')
    return id_span.get_text()

# A search result lists its dates in one header, separated by bullets.
# The four public getters below each pick one labelled date out of it.
def _get_labelled_date(article, label):
    """Return the date tagged with ``label`` in a search result, or None.

    Args:
        article: One search-result element as returned by
            ``content_block_parser``.
        label: Capitalised label to look for, e.g. ``"Priority"``. The
            all-lower-case spelling is accepted as well.

    Returns:
        The last space-separated token of the first bullet-separated segment
        that contains the label. None when the dates header is missing or no
        segment carries the label.
    """
    dates_header = article.find('h4', 'dates style-scope search-result-item')
    if dates_header is None:
        # A result without a dates header has no date to report. Returning
        # None keeps one odd result from aborting the whole crawl.
        return None

    lowered = label.lower()
    for segment in dates_header.get_text().split('•'):
        if label in segment or lowered in segment:
            return segment.strip().split(' ')[-1]
    return None


# parse patent priority date
def get_priority(article):
    """Return the result's priority date, or None if it is not listed."""
    return _get_labelled_date(article, 'Priority')


# parse patent filed date
def get_filed(article):
    """Return the result's filed date, or None if it is not listed."""
    return _get_labelled_date(article, 'Filed')


# parse patent granted date
def get_granted(article):
    """Return the result's granted date, or None if it is not listed."""
    return _get_labelled_date(article, 'Granted')


# parse patent published date
def get_published(article):
    """Return the result's published date, or None if it is not listed."""
    return _get_labelled_date(article, 'Published')

# Inner page parsing functions

In [None]:
# parse patent title
def get_title(soup):
    """Return the patent title taken from the page's ``<title>``, or None.

    The title text is split on ``' - '`` and the second piece is used.

    Args:
        soup: Parsed detail page.

    Returns:
        The stripped second piece, or None when the page has no title element,
        the text has no second piece, or that piece is blank.
    """
    try:
        title = soup.title.text.split(' - ')[1].strip()
    except (AttributeError, IndexError):
        # AttributeError: no <title> element; IndexError: no ' - ' separator.
        # Only these are caught, so Ctrl-C / SystemExit still stop the crawl.
        return None
    return title or None

# parse patent abstract
def get_abstract(soup):
    """Return the text of the page's ``div.abstract`` element, or None.

    Args:
        soup: Parsed detail page.

    Returns:
        The element's text, or None when the element is missing or its text
        is empty.
    """
    try:
        abstract = soup.find('div', class_='abstract').get_text()
    except AttributeError:
        # find() returned None: the page has no abstract block. Only this is
        # caught, so Ctrl-C / SystemExit still stop the crawl.
        return None
    return abstract or None


# parse patent inventor
def get_inventor(soup):
    """Return the page's inventor names as one comma-separated string.

    Names are read from every ``dd[itemprop=inventor]`` element, stripped, and
    de-duplicated by exact match with the first occurrence kept. A name that
    is merely a substring of an earlier one ("Lee" after "Kim Lee") is still
    reported.

    Args:
        soup: Parsed detail page.

    Returns:
        The joined names, an empty string when the page lists no inventor, or
        None when the page object cannot be queried.
    """
    try:
        names = [node.text.strip() for node in soup.select('dd[itemprop=inventor]')]
    except AttributeError:
        print('Inventor이 존재하지 않습니다.')
        return None

    # dict.fromkeys drops exact duplicates while keeping first-seen order.
    unique_names = dict.fromkeys(name for name in names if name)
    return ','.join(unique_names)

# parse patent assignee
def get_assignee(soup):
    """Return the current assignee, falling back to the original assignee.

    Looks at ``dd[itemprop=assigneeCurrent]`` first and
    ``dd[itemprop=assigneeOriginal]`` second. The fallback is used both when
    the first element is blank and when it is absent altogether.

    Args:
        soup: Parsed detail page.

    Returns:
        The first non-blank stripped text found, or None when neither element
        yields one.
    """
    for itemprop in ('assigneeCurrent', 'assigneeOriginal'):
        try:
            assignee = soup.select_one(f'dd[itemprop={itemprop}]').text.strip()
        except AttributeError:
            # select_one() returned None: this element is not on the page.
            continue
        if assignee:
            return assignee
    return None

# parse patent country
def get_country(soup):
    """Return the text of ``dd[itemprop=countryName]``, or None.

    Args:
        soup: Parsed detail page.

    Returns:
        The stripped text, or None when the element is missing or blank.
    """
    try:
        country = soup.select_one('dd[itemprop=countryName]').text.strip()
    except AttributeError:
        # select_one() returned None: the page has no country element. Only
        # this is caught, so Ctrl-C / SystemExit still stop the crawl.
        return None
    return country or None

# CPC codes, citations and cited-by entries are all read the same way:
# select the rows, take one span from each row, join the texts with commas.
def _join_selected(soup, row_selector, item_selector, item_index):
    """Join one span's text from every selected row into a comma-separated string.

    Args:
        soup: Parsed detail page.
        row_selector: CSS selector for the rows (or lists) to visit.
        item_selector: CSS selector applied inside each row.
        item_index: Which of the row's matches to take (``0`` first, ``-1`` last).

    Returns:
        The stripped texts joined with ``','``, or None when no row matches,
        no row contains a matching span, or the page object cannot be queried.
    """
    try:
        rows = soup.select(row_selector)
    except AttributeError:
        return None

    values = []
    for row in rows:
        items = row.select(item_selector)
        if not items:
            # Skip a row without the expected span instead of throwing away
            # every value collected from the other rows.
            continue
        values.append(items[item_index].text.strip())

    return ','.join(values) if values else None


# parse cpc/ipc
def get_cpc(soup):
    """Return the last ``Code`` span of every ``ul[itemprop=cpcs]`` list, comma-joined, or None."""
    return _join_selected(soup, 'ul[itemprop=cpcs]', 'span[itemprop=Code]', -1)


# parse patent citations
def get_citations(soup):
    """Return the publication numbers of the ``backwardReferencesOrig`` rows, comma-joined, or None."""
    return _join_selected(
        soup, 'tr[itemprop=backwardReferencesOrig]', 'span[itemprop=publicationNumber]', 0
    )


# parse cited by
def get_citedby(soup):
    """Return the publication numbers of the ``forwardReferencesFamily`` rows, comma-joined, or None."""
    return _join_selected(
        soup, 'tr[itemprop=forwardReferencesFamily]', 'span[itemprop=publicationNumber]', 0
    )

In [None]:
if __name__=="__main__":
    # `collections` otherwise only reaches this scope through the file's
    # `from typing import collections`, which leans on typing's internals.
    import collections

    # ---- query settings ----
    # Comma-separated search terms; blanks following each comma are dropped.
    query_list = [query.lstrip() for query in input('검색어를 입력하세요 :').split(',')]

    start_date = int(input('시작일을 입력하세요(예시 : 20160101)'))
    end_date = int(input('종료일을 입력하세요(예시 : 20160101)'))

    # The end date may not lie in the future.
    today = int(datetime.datetime.today().strftime('%Y%m%d'))
    if end_date > today:
        print('종료일이 오늘 날짜를 초과할 수 없습니다.')
        sys.exit()

    # Half-month (start, end) windows covering the requested period.
    ymd_list = make_date_range(start_date, end_date)
    if not ymd_list:
        # Without at least one window, the folder name below cannot be built.
        print('수집할 날짜 구간이 없습니다. 시작일과 종료일을 확인하세요.')
        sys.exit()

    # ---- platform-specific driver and output folder ----
    save_roots = {
        'Windows': 'C:/Users/user/Desktop/code blue/google_paytent 크롤링/',
        'Linux': '/content/drive/MyDrive/노석현 박사님',
        'Darwin': '/content/drive/MyDrive/노석현_박사님_데이터크롤링',
    }
    system = platform.system()
    if system not in save_roots:
        print('지원하지 않는 운영체제입니다: {}'.format(system))
        sys.exit()

    # Headless mode is required on Colab (Linux), which has no screen; the
    # other platforms use the same three flags.
    option = webdriver.ChromeOptions()
    option.add_argument('--headless')
    option.add_argument('--no-sandbox')
    option.add_argument('--disable-dev-shm-usage')

    # NOTE(review): newer Selenium releases may no longer accept
    # `executable_path` or a positional driver path — confirm against the
    # installed version.
    if system == 'Linux':
        driver = webdriver.Chrome('chromedriver',options = option)
    else:
        # NOTE(review): the macOS branch reuses the Windows chromedriver path,
        # exactly as before — confirm the intended location.
        path =  'C:/Users/user/Desktop/code blue/chromedriver/chromedriver'
        option.add_argument("user-agent=Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko")
        option.add_experimental_option("prefs", {"profile.default_content_setting_values.notifications": 1})
        driver = webdriver.Chrome(options = option, executable_path = path)
    driver.implicitly_wait(10)

    # Work inside "<first window start>_<last window end>_데이터" under the
    # platform's save directory, creating the folder on first use.
    os.chdir(save_roots[system])
    output_dir = './{0}_{1}_데이터'.format(ymd_list[0][0], ymd_list[-1][-1])
    os.makedirs(output_dir, exist_ok=True)
    os.chdir(output_dir)

    print(query_list)

    # setting url
    basic_url = 'https://patents.google.com/'

    # One record per patent ID: {patent_id: {field: value, ...}}
    data = collections.defaultdict(dict)

    ################### Front page Crawling ###########################
    try:
        for query in query_list:
            print("Ccollecting {}".format(query))
            for start_date, end_date in ymd_list:
                # At most ten result pages (page=0..9, num=100 each) are
                # requested per window; stop early at the first empty page.
                for page in range(10):
                    url = (basic_url + '?' + 'q=' + query
                           + '&' + 'before=priority:' + end_date
                           + '&' + 'after=priority:' + start_date
                           + '&' + 'num=100' + '&' + 'page=' + str(page))
                    driver.get(url)  # enter front page
                    time.sleep(5)  # fixed pause after loading (presumably lets the results render)

                    content_block = content_block_parser()
                    if not content_block:  # nothing (more) to collect for this window
                        break

                    for article in content_block:
                        uniq_id = get_id(article)
                        data[uniq_id].update({
                            'query': query,
                            'start_date': start_date,
                            'end_date': end_date,
                            'id': uniq_id,
                            'priority': get_priority(article),
                            'filed': get_filed(article),
                            'granted': get_granted(article),
                            'published': get_published(article),
                        })

            # Checkpoint after every finished query. %M is minutes; the former
            # %m repeated the month in the file name.
            timestamp = datetime.datetime.today().strftime('%Y%m%d_%H%M')
            with open(f'{timestamp}_frontdata.pickle', 'wb') as f:
                pickle.dump(data, f)
    finally:
        # The detail pages are fetched with `requests`, so the browser is no
        # longer needed; close it even when the crawl fails part-way.
        driver.quit()

################### Inner page Crawling ###########################
# For every patent collected from the result pages, fetch its detail page with
# `requests` and add the parsed fields to the same record in `data`.
bad_url = []  # detail URLs that could not be fetched (request error or non-200 status)
count = 0     # number of detail pages parsed so far
for value in list(data.values()):
    patent_id = value['id']
    query = value['query']
    start_date = value['start_date']
    end_date = value['end_date']

    inner_url = (basic_url + 'patent' + '/' + patent_id + '/' + 'en?' + 'q=' + query
                 + '&' + 'before=priority:' + end_date
                 + '&' + 'after=priority:' + start_date)

    # A timeout keeps one stalled connection from hanging the crawl, and a
    # network error is recorded instead of discarding the progress so far.
    try:
        response = requests.get(inner_url, timeout=30)
    except requests.RequestException:
        bad_url.append(inner_url)
        continue
    if response.status_code != 200:
        bad_url.append(inner_url)
        continue
    soup = BeautifulSoup(response.content, 'html.parser')

    data[patent_id].update({
        'title': get_title(soup),
        'abstract': get_abstract(soup),
        'inventor': get_inventor(soup),
        'assignee': get_assignee(soup),
        'country': get_country(soup),
        'cpc': get_cpc(soup),
        'citations': get_citations(soup),
        'citedby': get_citedby(soup),
    })
    count += 1

    # Every 100th page, pause for a random 1-2 seconds and report progress.
    # (random.randrange(1, 2) could only ever return 1.)
    if count % 100 == 0:
        time.sleep(random.uniform(1, 2))
        print(count)

    # Every 10000th page, write a checkpoint. %M is minutes; the former %m
    # repeated the month in the file name.
    if count % 10000 == 0:
        timestamp = datetime.datetime.today().strftime('%Y%m%d_%H%M')
        with open(f'{timestamp}_innerdata_{count}.pickle', 'wb') as f:
            pickle.dump(data, f)

# when finished, save final data
timestamp = datetime.datetime.today().strftime('%Y%m%d_%H%M')
with open(f'{timestamp}_innerdata_{count}.pickle', 'wb') as f:
    pickle.dump(data, f)


In [None]:
# When crawling is done, reduce each abstract to ASCII letters, digits and
# spaces (this removes any non-English text), then export the table.
df = pd.DataFrame(data.values())
if 'abstract' in df.columns:
    # Clean each row on its own: the former trailing `.loc[3]` assigned row 3's
    # text to the whole column. The digit 0 is kept now (the old class listed
    # 1-9), and missing abstracts stay missing instead of becoming the text
    # "None"/"nan".
    df['abstract'] = df['abstract'].apply(
        lambda x: re.sub(r'[^a-zA-Z0-9 ]+', '', x).strip() if isinstance(x, str) else x
    )

# save it as a csv file
df.to_csv('./{0}_{1}_데이터.csv'.format(ymd_list[0][0], ymd_list[-1][-1]), encoding='utf-8')

# save it as an Excel file
df.to_excel('./{0}_{1}_데이터.xlsx'.format(ymd_list[0][0], ymd_list[-1][-1]), engine='openpyxl')