https://docs.ozon.ru/api/performance/

https://requests.readthedocs.io/en/latest/

https://en.wikipedia.org/wiki/List_of_file_signatures

In [None]:
import requests
import json
from datetime import datetime
from datetime import timedelta
from datetime import date
import time

In [None]:
class Ozon_performance:
    """Client for the Ozon Performance advertising statistics API.

    On construction the client requests an OAuth token, loads the list of
    campaigns and, for every campaign, the list of advertised objects.
    Construction is best-effort: network/API failures are printed, not
    raised, and leave ``auth`` as ``None`` and ``campaigns``/``objects``
    empty (or partially filled).

    Parameters:
        client_id: API client id.
        client_secret: API client secret.
        methods: mapping of report name -> endpoint URL with the keys
            'statistics', 'phrases', 'attribution', 'media', 'product'
            and 'daily'. ``None`` selects ``DEFAULT_METHODS``.
        day_lim: maximum report period in days; 7 days are subtracted from
            it before it is stored in ``self.day_lim``.
        camp_lim: maximum number of campaigns per request.
        date_from: first day of the collected period, 'YYYY-MM-DD'.
        date_to: last day of the collected period, 'YYYY-MM-DD';
            ``None`` means today.
        timeout: timeout in seconds passed to every ``requests`` call;
            ``None`` waits indefinitely.

    Attributes filled by ``collect_data``:
        st_camp, st_attr: lists of ``[UUID, extension]`` pairs (or None).
        st_ph: list of lists of ``[UUID, extension]`` pairs.
        st_med, st_pr, st_dai: lists of ``requests`` responses (or None).
    """

    # Endpoints used when the caller does not pass ``methods``.
    DEFAULT_METHODS = {
        'statistics': 'https://performance.ozon.ru:443/api/client/statistics',
        'phrases': 'https://performance.ozon.ru:443/api/client/statistics/phrases',
        'attribution': 'https://performance.ozon.ru:443/api/client/statistics/attribution',
        'media': 'https://performance.ozon.ru:443/api/client/statistics/campaign/media',
        'product': 'https://performance.ozon.ru:443/api/client/statistics/campaign/product',
        'daily': 'https://performance.ozon.ru:443/api/client/statistics/daily',
    }

    # Class-level fallback so that instances created without __init__
    # still have a timeout attribute.
    timeout = None

    def __init__(self, client_id, client_secret,
                 methods=None,
                 day_lim=62,
                 camp_lim=10,
                 date_from='2022-02-10',
                 date_to=None,
                 timeout=None):
        self.client_id = client_id
        self.client_secret = client_secret
        # Copy the defaults so that instances never share one mutable dict.
        self.methods = dict(self.DEFAULT_METHODS) if methods is None else methods
        self.day_lim = day_lim - 7
        self.camp_lim = camp_lim
        self.timeout = timeout

        self.date_to = str(date.today()) if date_to is None else date_to
        self.date_from = date_from

        # Always define these attributes, even when the API is unreachable,
        # so later calls fail with a clear message instead of AttributeError.
        self.auth = None
        self.campaigns = []
        self.objects = {}

        self.st_camp = []
        self.st_ph = []
        self.st_attr = []
        self.st_med = []
        self.st_pr = []
        self.st_dai = []

        # Deliberately best-effort: the constructor reports problems
        # instead of raising.
        try:
            self.auth = self.get_token()
        except Exception as err:
            print('Нет доступа к серверу', err)

        try:
            self._load_campaigns()
        except Exception as err:
            print('Ошибка при получении кампаний', err)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _load_campaigns(self):
        """Fill ``self.campaigns`` and ``self.objects`` from the API."""
        found = self.get_campaigns()
        if found is None:
            print('Ошибка при получении кампаний')
            return
        self.campaigns = [camp['id'] for camp in found]
        for camp in self.campaigns:
            # A failed objects request (None) is treated as "no objects".
            listed = self.get_objects(campaign_id=camp) or []
            self.objects[camp] = [obj['id'] for obj in listed]

    def _headers(self, content_type=True, accept=True):
        """Build request headers carrying the bearer token.

        Raises:
            RuntimeError: when no token has been obtained.
        """
        if not self.auth:
            raise RuntimeError('Токен не получен: авторизация не выполнена')
        head = {"Authorization": self.auth['token_type'] + ' ' + self.auth['access_token']}
        if content_type:
            head["Content-Type"] = "application/json"
        if accept:
            head["Accept"] = "application/json"
        return head

    def _send(self, verb, url, label, n_attempts, delay, **kwargs):
        """Send a request, retrying after HTTP 429.

        If the first answer is 429, the request is repeated up to
        ``n_attempts`` times with ``delay`` seconds between attempts until
        a 200 arrives. Returns the successful response, or ``None`` after
        printing the body of the last unsuccessful one.
        """
        response = requests.request(verb, url, timeout=self.timeout, **kwargs)
        if response.status_code == 429:
            for _ in range(n_attempts):
                time.sleep(delay)
                response = requests.request(verb, url, timeout=self.timeout, **kwargs)
                print(f'{label}, статус', response.status_code)
                if response.status_code == 200:
                    break
        if response.status_code == 200:
            return response
        print(response.text)
        return None

    def _order_report(self, method_key, label, done_message, campaigns,
                      t_date_from, t_date_to, group_by, n_attempts, delay):
        """Order an asynchronous report; return ``[UUID, extension]`` or None."""
        body = {"campaigns": campaigns,
                "dateFrom": t_date_from,
                "dateTo": t_date_to,
                "groupBy": group_by
               }
        response = self._send('post', self.methods[method_key], label, n_attempts, delay,
                              headers=self._headers(), data=json.dumps(body))
        if response is None:
            return None
        print(done_message)
        # One campaign is saved as csv, several as zip.
        extension = 'csv' if len(campaigns) == 1 else 'zip'
        return [response.json()['UUID'], extension]

    def _fetch_stats(self, method_key, label, done_message, campaigns,
                     t_date_from, t_date_to, n_attempts, delay, accept):
        """GET a statistics endpoint; return the response or None."""
        params = {"campaigns": campaigns,
                  "dateFrom": t_date_from,
                  "dateTo": t_date_to
                 }
        response = self._send('get', self.methods[method_key], label, n_attempts, delay,
                              headers=self._headers(accept=accept), params=params)
        if response is not None:
            print(done_message)
        return response

    def _campaign_batches(self):
        """Return the campaign/object mapping as a list of dict batches."""
        data = self.split_data(camp_lim=self.camp_lim)
        if isinstance(data, dict):
            # split_data returns the bare mapping when no split is needed.
            return [data] if data else []
        return data

    @staticmethod
    def _write_file(filename, content):
        """Write ``content`` (bytes) to ``filename``; report the outcome."""
        try:
            with open(filename, 'wb') as file:
                file.write(content)
        except OSError as err:
            print('Не удалось сохранить', filename, err)
            return False
        print('Сохранен', filename)
        return True

    def _save_report(self, entry, stem):
        """Download the report described by ``[UUID, extension]`` and save it."""
        if not entry:
            return
        uuid, extension = entry
        try:
            report = self.get_report(uuid=uuid)
        except requests.exceptions.RequestException as err:
            print(err)
            return
        if report is None:
            return
        self._write_file(f"{stem}.{extension}", report.content)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def get_token(self):
        """Request an access token using the client-credentials grant.

        Returns the decoded JSON answer on HTTP 200, otherwise prints the
        response body and returns ``None``.
        """
        url = 'https://performance.ozon.ru/api/client/token'
        head = {"Content-Type": "application/json",
                "Accept": "application/json"
               }
        body = {"client_id": self.client_id,
                "client_secret": self.client_secret,
                "grant_type": "client_credentials"
               }
        response = requests.post(url, headers=head, data=json.dumps(body),
                                 timeout=self.timeout)
        if response.status_code == 200:
            print('Подключение успешно, токен получен')
            return response.json()
        print(response.text)
        return None

    def get_campaigns(self):
        """Return the list of campaigns, or ``None`` on a non-200 answer."""
        url = 'https://performance.ozon.ru:443/api/client/campaign'
        response = requests.get(url, headers=self._headers(), timeout=self.timeout)
        if response.status_code != 200:
            print(response.text)
            return None
        campaigns = response.json()['list']
        print(f"Найдено {len(campaigns)} кампаний")
        return campaigns

    def get_objects(self, campaign_id):
        """Return the advertised objects of a campaign, or ``None`` on a non-200 answer."""
        url = f"https://performance.ozon.ru:443/api/client/campaign/{campaign_id}/objects"
        response = requests.get(url, headers=self._headers(), timeout=self.timeout)
        if response.status_code != 200:
            print(response.text)
            return None
        return response.json()['list']

    def split_data(self, camp_lim):
        """Split ``self.objects`` into batches of at most ``camp_lim`` campaigns.

        Returns ``self.objects`` itself (a dict) when it already fits into
        one batch, otherwise a list of dicts that preserve the original
        campaign order.
        """
        items = list(self.objects.items())
        if len(items) <= camp_lim:
            return self.objects
        return [dict(items[start:start + camp_lim])
                for start in range(0, len(items), camp_lim)]

    def split_time(self, date_from, date_to, day_lim):
        """Split an inclusive date range into chunks of at most ``day_lim`` days.

        Parameters:
            date_from, date_to: 'YYYY-MM-DD' strings.
            day_lim: maximum number of days in one chunk.

        Returns a list of ``[start, end]`` string pairs. When the two dates
        are no more than ``day_lim`` days apart the range is returned
        unchanged as a single pair.

        Raises:
            ValueError: if a split is needed and ``day_lim`` is below 1.
        """
        pattern = '%Y-%m-%d'
        first = datetime.strptime(date_from, pattern).date()
        last = datetime.strptime(date_to, pattern).date()
        span = (last - first).days
        if span <= day_lim:
            return [[date_from, date_to]]
        if day_lim < 1:
            raise ValueError('day_lim must be at least 1')

        periods = []
        # span + 1: the final day is part of the range and must get a chunk
        # even when span is an exact multiple of day_lim.
        for offset in range(0, span + 1, day_lim):
            chunk_start = first + timedelta(days=offset)
            chunk_end = min(chunk_start + timedelta(days=day_lim - 1), last)
            periods.append([str(chunk_start), str(chunk_end)])
        return periods

    def get_statistics(self, campaigns,
                       t_date_from=None,
                       t_date_to=None,
                       group_by="NO_GROUP_BY",
                       n_attempts=20,
                       delay=3):
        """Order a campaign statistics report.

        ``group_by`` values documented here: NO_GROUP_BY (default),
        DATE (by day), START_OF_WEEK (by week), START_OF_MONTH (by month).

        Returns ``[UUID, 'csv']`` for one campaign, ``[UUID, 'zip']`` for
        several, or ``None`` when the request did not succeed.
        """
        return self._order_report('statistics', 'кампании',
                                  'Статистика по кампаниям получена',
                                  campaigns, t_date_from, t_date_to,
                                  group_by, n_attempts, delay)

    def get_phrases(self, objects,
                    t_date_from=None,
                    t_date_to=None,
                    group_by="NO_GROUP_BY",
                    n_attempts=20,
                    delay=3):
        """Order one phrases report per campaign that has objects.

        Parameters:
            objects: mapping of campaign id -> list of object ids.

        Returns a list of ``[UUID, 'csv']`` pairs for the requests that
        succeeded; campaigns without objects are skipped.
        """
        url = self.methods['phrases']
        head = self._headers()
        res = []
        for camp, obj in objects.items():
            if not obj:
                continue
            body = {"campaigns": [camp],
                    "objects": obj,
                    "dateFrom": t_date_from,
                    "dateTo": t_date_to,
                    "groupBy": group_by
                   }
            response = self._send('post', url, 'фразы', n_attempts, delay,
                                  headers=head, data=json.dumps(body))
            if response is None:
                continue
            print('Статистика по фразам получена')
            res.append([response.json()['UUID'], 'csv'])
        return res

    def get_attribution(self, campaigns,
                        t_date_from=None,
                        t_date_to=None,
                        group_by="NO_GROUP_BY",
                        n_attempts=20,
                        delay=3):
        """Order an orders (attribution) report.

        Returns ``[UUID, 'csv']`` for one campaign, ``[UUID, 'zip']`` for
        several, or ``None`` when the request did not succeed.
        """
        return self._order_report('attribution', 'баннеры',
                                  'Статистика по баннерам получена',
                                  campaigns, t_date_from, t_date_to,
                                  group_by, n_attempts, delay)

    def get_media(self, campaigns,
                  t_date_from=None,
                  t_date_to=None,
                  n_attempts=10,
                  delay=3):
        """Return the media-campaign statistics response, or ``None`` on failure."""
        return self._fetch_stats('media', 'медиа', 'Статистика по медиа получена',
                                 campaigns, t_date_from, t_date_to,
                                 n_attempts, delay, accept=True)

    def get_product(self, campaigns,
                    t_date_from=None,
                    t_date_to=None,
                    n_attempts=10,
                    delay=3):
        """Return the product-campaign statistics response, or ``None`` on failure."""
        return self._fetch_stats('product', 'продуктовая', 'Статистика продуктовая получена',
                                 campaigns, t_date_from, t_date_to,
                                 n_attempts, delay, accept=False)

    def get_daily(self, campaigns,
                  t_date_from=None,
                  t_date_to=None,
                  n_attempts=10,
                  delay=3):
        """Return the daily campaign statistics response, or ``None`` on failure."""
        return self._fetch_stats('daily', 'дневная', 'Статистика дневная получена',
                                 campaigns, t_date_from, t_date_to,
                                 n_attempts, delay, accept=False)

    def get_report(self, uuid):
        """Download a previously ordered report by UUID.

        Returns the response on HTTP 200, otherwise prints the response
        body and returns ``None``.
        """
        url = 'https://performance.ozon.ru:443/api/client/statistics/report?UUID=' + uuid
        head = self._headers(content_type=False, accept=False)
        response = requests.get(url, headers=head, timeout=self.timeout)
        if response.status_code == 200:
            return response
        print(response.text)
        return None

    def collect_data(self):
        """Request every report type for every campaign batch and period.

        Results are appended to ``st_camp``, ``st_ph``, ``st_attr``,
        ``st_med``, ``st_pr`` and ``st_dai``. A network failure stops the
        collection and is reported with a printed message.
        """
        batches = self._campaign_batches()
        periods = self.split_time(date_from=self.date_from,
                                  date_to=self.date_to,
                                  day_lim=self.day_lim)
        try:
            for batch in batches:
                ids = list(batch)
                for start, end in periods:
                    self.st_camp.append(self.get_statistics(ids, t_date_from=start, t_date_to=end))
                    self.st_ph.append(self.get_phrases(batch, t_date_from=start, t_date_to=end))
                    self.st_attr.append(self.get_attribution(ids, t_date_from=start, t_date_to=end))
                    self.st_med.append(self.get_media(ids, t_date_from=start, t_date_to=end))
                    self.st_pr.append(self.get_product(ids, t_date_from=start, t_date_to=end))
                    self.st_dai.append(self.get_daily(ids, t_date_from=start, t_date_to=end))
        except (TimeoutError, requests.exceptions.RequestException):
            # requests signals connection/timeout problems through its own
            # exception hierarchy, not the builtin TimeoutError.
            print('Нет ответа от сервера')

    def save_data(self, path='./data/'):
        """Write all collected data to files whose names start with ``path``.

        Media/product/daily responses are written directly; campaign,
        phrases and attribution reports are downloaded by UUID first.
        Entries that are ``None`` (failed requests) are skipped, and a file
        that cannot be written is reported without stopping the rest.

        Parameters:
            path: prefix prepended to every file name; its directory part
                is created if it does not exist.
        """
        import os

        folder = os.path.dirname(path)
        if folder:
            os.makedirs(folder, exist_ok=True)

        direct = (("media", self.st_med),
                  ("product", self.st_pr),
                  ("daily", self.st_dai))
        for prefix, responses in direct:
            for num, response in enumerate(responses):
                if response is None:
                    continue
                self._write_file(path + f"{prefix}_{num}.csv", response.content)

        for num, camp in enumerate(self.st_camp):
            self._save_report(camp, path + f"campaigns_{num}")

        for num, ph in enumerate(self.st_ph):
            for n_camp, phrases in enumerate(ph or []):
                self._save_report(phrases, path + f"phrases_{num}_{n_camp}")

        for num, attr in enumerate(self.st_attr):
            self._save_report(attr, path + f"attr_{num}")

In [None]:
import os

# Credentials are read from the environment so that secrets never live in
# the notebook or in version control. Set OZON_CLIENT_ID and
# OZON_CLIENT_SECRET before running this cell.
# NOTE(review): the secret that used to be hard-coded here should be treated
# as compromised and rotated in the Ozon Performance account.
client_id = os.environ.get('OZON_CLIENT_ID', '')
client_secret = os.environ.get('OZON_CLIENT_SECRET', '')

In [None]:
# Build the API client; the constructor authenticates and loads campaigns.
ozon = Ozon_performance(client_id=client_id, client_secret=client_secret)

In [None]:
# Display the stored token response.
# NOTE: this puts the access token into the notebook output.
ozon.auth

In [None]:
# Display the campaign ids collected by the constructor.
ozon.campaigns

In [None]:
# Display the mapping of campaign id -> advertised object ids.
ozon.objects

In [None]:
# Preview how the campaigns are batched under the per-request campaign limit.
ozon.split_data(camp_lim=ozon.camp_lim)

In [None]:
# Preview how the collection period is cut into [start, end] date pairs.
ozon.split_time(date_from=ozon.date_from, date_to=ozon.date_to, day_lim=ozon.day_lim)

In [None]:
# Request all report types for every campaign batch and period;
# results accumulate in the ozon.st_* lists.
ozon.collect_data()

In [None]:
# Display the collected campaign statistics results ([UUID, extension] pairs).
ozon.st_camp

In [None]:
# Display the collected phrases results (one list of [UUID, extension] pairs per request round).
ozon.st_ph

In [None]:
# Display the collected attribution results ([UUID, extension] pairs).
ozon.st_attr

In [None]:
# Display the collected media statistics responses.
ozon.st_med

In [None]:
# Display the collected product statistics responses.
ozon.st_pr

In [None]:
# Display the collected daily statistics responses.
ozon.st_dai

In [None]:
# Write everything collected so far to files under the default './data/' prefix.
ozon.save_data()