In [1]:
import requests
import pandas as pd
import time
from bs4 import BeautifulSoup
import datetime
import json
from pprint import pprint
from tqdm import tnrange
import warnings
warnings.filterwarnings("ignore")

# Функции

In [2]:
def getData(brands, start_year, finish_year, list_feature):
    '''
    Возвращает датафрейм со спарсенными данными
    brands (str/list) - список марок автомобилей для парсинга
    min_year (int) - минимальный год выпуска авто, который нас интересует
    list_feature (list) - список признаков для парсинга

    Ограничение avto.ru - 3700 ссылок за 1 раз, поэтому по каждому бренду и 
    году будем делать отдельный запрос
    
    Задаём начальный фильт выдачи:
    с пробегом, фото - не важно, таможня - не важно, состояние не важно
    ссылка копируется с сайта
    '''       
    # начальная страница - фильтр
    url = 'https://auto.ru/cars/used/?damage_group=ANY&customs_state_group\=DOESNT_MATTER&has_image=false'

    # если вдруг на входе одна марка, а не список 
    if type(brands)==str:
        brands = brands.split()

    # иницализируем датафрейм
    df = pd.DataFrame(columns=list_feature)
    
    # на случай, если парсить по возрастанию или убыванию даты выпуска
    if start_year > finish_year:
        step = -1
    else: 
        step = 1
    finish_year += step

    # визуализируем прогресс выполнения
    print(list_brands)
    with tnrange(len(list_brands)) as brands:
        for brand in brands:
            brands.set_description(f'Общий ход выполнения')
        
            # Собираем все ссылки по бренду в заданном интервале дат:
            all_links = getAllLinksBrand(url, start_year, finish_year, step, list_brands[brand])
    
            with tnrange(len(all_links), leave=True, unit=' url ') as links:
                for lks in links:
                    links.set_description(f'{list_brands[brand]}')
                    
                    # переходим по найденным на странице ссылкам собираем данные
                    data_row = getAutoData(all_links[lks], list_feature)
                    df = df.append(data_row, ignore_index=True)

                # Сохраняем каждую марку в отдельный файл
                now = datetime.datetime.now()
                t = f'{now:%Y_%m_%d_%H_%M}'
                file_name = f'parsing _{list_brands[brand]}_{start_year}_{finish_year}_{t}'
            
            # df.to_csv(file_name + '.csv', sep='\t', index=False, encoding='utf-8')
            SaveToExcel(df, file_name + '.xlsx')

    return df

In [3]:
def getAllLinksBrand(url, start_year, finish_year, step, brand):
    '''
    возвращает список ссылок по бренду в интервале дат
    '''
    all_links = [] # список для всех ссылок по бренду
    # из-за ограничения собираем все возможные ссылки по 1 бренду и году за раз
    

    with tnrange(start_year,finish_year, step, leave=False) as years:
        for year in years:
            years.set_description(f'{year}')
            # динамическая часть ссылки
            part = str(brand).lower() + '/' + str(year) + '-year'
            # собираем ссылку для поиска по одному бренду и году 
            link = url.replace('cars/used','cars/' + part + '/used')
            
            # собираем ссылки с каждой страницы выдачи (максимум - 99 страниц)
            for n_page in range(1,100):
                page_links = getPageLinks(link, n_page) # список ссылок с текущей страницы
                if len(page_links) == 0: # выходим если не нашли ссылок
                    break
                else:
                    all_links += page_links
    return (all_links)

In [4]:
def getAutoData(auto_page, list_feature):
    '''
    Запрашивает данные по странице, возвращает обработанный словарь с данными
    auto_page (string) ссылка на страницу с объявлением
    '''   
    # словарь, для храения значений спарсенных значений признаков
    dict_row = {}
    
    # запрашиваем данные по ссылке , headers={'User-Agent': UserAgent().chrome}
    response = requests.get(auto_page)
    
    if not response.ok: # если сервер нам отказал, вернем статус ошибки 
        print(f'Сервер: {response.status_code}, {auto_page}')
    else:  
        soup = BeautifulSoup(response.content,'html.parser')

        # динамический список признаков для парсинга
        list_snippet = set(list_feature.copy())

        # используем функции для парсинга и вручную добавляем еще 2
        for func in range(6):
            if func == 0:
                dict_part_row = {'color':getColor(soup)}
            if func == 1:
                dict_part_row = {'image':getFoto(soup)}
            if func == 2:
                dict_part_row = getSkript_initial_state(soup, list_snippet, auto_page)
            elif func == 3:
                dict_part_row = getSkript_application_ld_json(soup, list_snippet, auto_page)
            elif func == 4:
                dict_part_row = getСharacteristics(soup, list_snippet)
            else:
                timestamp = int(time.time())
                dict_part_row = {'car_url':auto_page,
                                 'parsing_unixtime':timestamp}

            # исключим из списка ту часть признаков, для которой получены значения
            llist = set(list(dict_part_row.keys()))
            list_snippet -= llist

            # объединяем словари
            dict_row = merge_two_dicts(dict_row, dict_part_row)
        
    return (dict_row)

In [5]:
def getPageLinks(link, n_page):
    '''
    Возвращает список ссылок на странице выдачи
    link - ссылка на страницу поисковой выдачи
    n_page: int/string - номер страницы для парсинга
    '''
    auto_links = []
    # составляем ссылку на страницу поиска
    page_link = link + '&page=' + str(n_page)

    response = requests.get(page_link)
    if not response.ok: # если сервер нам отказал, вернем статус ошибки 
        print(f'Сервер: {response.status_code}, {link}')
    else:
        # получаем содержимое страницы
        soup = BeautifulSoup(response.content,'html.parser')

        # ищем ссылки и очищаем их от ненужных тэгов
        auto_links = soup.find_all('a', class_='Link ListingItemTitle__link')
        auto_links = [link.attrs['href'] for link in auto_links]

    return (auto_links)

In [6]:
def getColor(soup):
    try:
        block = soup.find('li',class_='CardInfoRow CardInfoRow_color')
        color = block.find('a').text
        # color = block.find('a', class_='Link_color_black').text
    except:
        color = ''
    return (color)

In [7]:
def getFoto(soup):
    try:
        images = soup.find_all('img',
                               {'class': 'ImageGalleryDesktop__image ImageGalleryDesktop__image_hidden'})
        foto_links = [img.get('src') for img in images]
    except:
        foto_links=[]
    return (foto_links)

In [8]:
def getSkript_initial_state(soup, list_features, url):
    '''
    Возвращает словарь значений признаков
    soup - html-страница с объявлением
    list_features - список названий признаков
    '''
    my_dict ={}
    try:
        json_data = json.loads(soup.find('script', attrs={'id': 'initial-state'}).string)
        for i in list_features:
            for t in find_key(json_data, i):
                my_dict[i] = t[1]
                if i == 'model_info':# берём  model_name из словаря model_info
                    my_dict['model_name'] = t[1]['code']
    except AttributeError:
        print(f'Ошибка: {url}')
        
    return (my_dict)

In [9]:
def getSkript_application_ld_json(soup,list_features, url):
    '''
    Возвращает словарь значений признаков
    soup - html-страница с объявлением
    list_features - список названий признаков
    '''
    my_dict ={}
    try:
        all_blocks = soup.find_all('script',type='application/ld+json')
        for block in all_blocks:
            json_data = json.loads(block.string)
            for i in list_features:
                for t in find_key(json_data, i):
                    my_dict[i] = t[1]
    except AttributeError:
        print(f'Ошибка: {url}')
                
    return (my_dict)

In [10]:
def getСharacteristics(soup, list_features):
    '''
    Возвращает со страницы автомобиля значение заданного параметра:
    soup - веб-страница
    sign - изменяемая часть наименования класса списка
    '''
    # словарь подстановок для парсинга
    dict_param = {'Привод':'drive','Руль':'wheel','Состояние':'state','Владельцы':'ownersCount',
                  'ПТС':'pts','Таможня':'customs','Владение':'owningTime'}
    my_dict ={}
    for feature in list_features:
        if feature in dict_param:
            cl = 'CardInfoRow CardInfoRow_' + str(dict_param[feature])
            try:
                value = soup.find('li', class_=cl).contents[1].text
            except:
                value = ''
            my_dict[feature] = value
            
    return (my_dict)

In [11]:
def merge_two_dicts(x, y):
    '''
    Объединение словарей
    '''
    z = x.copy()
    z.update(y)
    return z

In [12]:
def SaveToExcel(df, name):
    with pd.ExcelWriter(name) as writer:
        df.to_excel(writer, sheet_name='Лист 1', index=False)

In [13]:
# Рекурсивные генераторы для поиска по объекту, состоящему из словарей и списков
def find_key(obj, key):
    if isinstance(obj, dict):
        yield from iter_dict(obj, key, [])
    elif isinstance(obj, list):
        yield from iter_list(obj, key, [])

def iter_dict(d, key, indices):
    for k, v in d.items():
        if k == key:
            yield indices + [k], v
        if isinstance(v, dict):
            yield from iter_dict(v, key, indices + [k])
        elif isinstance(v, list):
            yield from iter_list(v, key, indices + [k])

def iter_list(seq, key, indices):
    for k, v in enumerate(seq):
        if isinstance(v, dict):
            yield from iter_dict(v, key, indices + [k])
        elif isinstance(v, list):
            yield from iter_list(v, key, indices + [k])

# Списки

In [14]:
original_columns = ['bodyType', 'brand', 'car_url', 'color', 'complectation_dict', 'description', 'engineDisplacement', 
                    'enginePower', 'equipment_dict', 'fuelType', 'image', 'mileage', 'modelDate', 'model_info', 
                    'model_name', 'name', 'numberOfDoors', 'parsing_unixtime', 'priceCurrency', 'productionDate', 
                    'sell_id', 'super_gen', 'vehicleConfiguration', 'vehicleTransmission', 'vendor', 'Владельцы', 
                    'Владение', 'ПТС', 'Привод', 'Руль', 'Состояние', 'Таможня']
original_columns = original_columns + ['price']

Некоторые признаки из тестового датафрейма (по ошибке, либо в силу времени) в структуре сайта называются иначе, либо отсутсвуют. Создадим словарь для переименованных признаков, так будет проще работать и вернуться к первоначальному состоянию

In [15]:
# словарь переименований
dict_rename = {
    'complectation_dict':'complectation',
    'equipment_dict':'equipment',
    'name':'human_name',
    'sell_id':'sale_id',
    'super_gen':'tech_param'
}

In [16]:
# Теперь создадим список параметров, с которым будем работать
work_list = [dict_rename[key] if dict_rename.get(key) else key for key in original_columns]

# Парсер
Лучше запускать по 1 марке параллельно в разных ноутбуках.

In [17]:
# # Список марок для парсинга
# list_brands = ['LEXUS','MERCEDES','BMW','MITSUBISHI',
#                'TOYOTA','SKODA','HONDA','VOLVO','AUDI','VOLKSWAGEN','INFINITI','NISSAN']

list_brands = ['INFINITI']
# Задаём начальные параметры для парсинга (бренды, интервал дат, признаки)
df = getData(list_brands, 2021, 2021, work_list)

# вернём названия в прежнее состояние
for key in dict_rename:
    if dict_rename[key] in df.columns:
        df.rename({dict_rename[key]: key}, axis=1, inplace=True)
df = df[original_columns]

df.head()