Устанавливаем зависимости

In [11]:
!pip3 install requests beautifulsoup4

Defaulting to user installation because normal site-packages is not writeable
Collecting ischedule
  Downloading ischedule-1.2.7-py3-none-any.whl (5.3 kB)
Installing collected packages: ischedule
Successfully installed ischedule-1.2.7


Создаем базу и применяем миграции

In [28]:
import sqlite3, os

create_table_statement = [
    """
    CREATE TABLE IF NOT EXISTS parse_iteration_info(
        id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
        started_at DATETIME DEFAULT CURRENT_TIMESTAMP,
        ended_at DATETIME DEFAULT NULL,
        successfully BOOL DEFAULT FALSE,
        errors TEXT DEAFULT ""
    );
    """,
    """
    CREATE TABLE IF NOT EXISTS phishing_urls (
        id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
        url TEXT,
        brand VARCHAR(256),
        time DATETIME,
        iterationId INTEGER NOT NULL,
        FOREIGN KEY(iterationId) REFERENCES parse_iteration_info(id)
    );
    """
]

def create_table(conn):
    cur = conn.cursor()
    for st in create_table_statement:
        res = cur.execute(st)
    cur.close()
    conn.commit()


def create_sqlite_database(filename):
    conn = None
    try:
        conn = sqlite3.connect(filename)
    except sqlite3.Error as e:
        print(e)
    return conn

db_name = "openfish_parsing_data.db"

conn = create_sqlite_database(db_name)
if conn == None:
    print(f"Error: Can't connect to db {db_name}")
    os.exit(1)

print(f"Successfuly connect to db {db_name}")

create_table(conn)
print(f"Successfuly create tables if not exists in {db_name}")

conn.close()


Successfuly connect to db openfish_parsing_data.db
Successfuly create tables if not exists in openfish_parsing_data.db


Запускаем парсер

In [18]:
import sqlite3, time, traceback, pytz
from datetime import datetime, timezone, timedelta
import requests as re
from bs4 import BeautifulSoup as bs

db_name = "openfish_parsing_data.db"

def create_parse_iteration_info(conn, start_time):
    try:
        query = "INSERT INTO parse_iteration_info (started_at)  VALUES (?)"
        cur = conn.cursor()
        cur.execute(query , (start_time,))
        cur.close()
        conn.commit()
    except sqlite3.Error as e:
        print(traceback.format_exc())
        print(f"Failed when try to deal sql query: {query}")
        return e

    return cur.lastrowid


def update_parse_iteration_info(conn, id, ended_time = datetime.now(timezone.utc), succesfully = True, error_text = ""):
    try:
        query = "UPDATE parse_iteration_info SET ended_at = ?, successfully = ?, errors = ? where id = ?"
        cur = conn.cursor()
        cur.execute(query , (ended_time, succesfully, error_text, id))
        cur.close()
        conn.commit()
    except sqlite3.Error as e:
        print(traceback.format_exc())
        print(f"Failed when try to deal sql query: {query}")
        return e

    return cur.lastrowid
    

def get_parse_iteration_info_last_timestamp(conn):
    try:
        query = "SELECT ended_at FROM parse_iteration_info WHERE successfully ORDER BY id DESC LIMIT 1"
        cur = conn.cursor()
        res = cur.execute(query)
        ts = res.fetchall()
        cur.close()
        conn.commit()
    except sqlite3.Error as e:
        print(traceback.format_exc())
        print(f"Failed when try to deal sql query: {query}")
        return e

    if len(ts) == 0:
        return None
    
    ts = datetime.strptime(ts[0][0], "%Y-%m-%d %H:%M:%S.%f%z").replace(tzinfo=pytz.utc)
    return ts

def create_phishing_urls(conn, data):
    try:
        query = "INSERT INTO phishing_urls (url, brand, time, iterationId)  VALUES (?, ?, ?, ?)"
        cur = conn.cursor()
        cur.executemany(query , data)
        cur.close()
        conn.commit()
    except sqlite3.Error as e:
        print(traceback.format_exc())
        print(f"Failed when try to deal sql query: {query}")
        return e
    
    return cur.lastrowid

def select_phising_urls(conn, url, brand, time=None):
    try:
        cur = conn.cursor()
        if time == None:
            query = "SELECT * FROM phishing_urls WHERE url=? AND brand=?"
            res = cur.execute(query, (url, brand,)).fetchall()
        else:
            query = "SELECT * FROM phishing_urls WHERE url=? AND brand=? AND time=?"
            res = cur.execute(query, (url, brand, time)).fetchall()
        cur.close()
        conn.commit()
    except sqlite3.Error as e:
        print(traceback.format_exc())
        print(f"Failed when try to deal sql query: {query}")
        return e
    
    return res
    
def get_html(url):
    headers = {
        "Accept": "text/html",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 12_3_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.4 Safari/605.1.15"
    }
    req = re.get(url, headers)
    if req.status_code != 200:
        print(f"Status code != 200. Url: {url}, status_code: {req.status_code}")
        return ""
    return req.text

def parse_data(html):
    url_data = []

    try:
        soup = bs(html)
        table_rows = soup.find("table", class_="pure-table pure-table-striped").find("tbody").find_all("tr")
        # tbody = table.find("tbody")
        for table_row in table_rows:
            tds = table_row.find_all("td")
            url, brand, time = tds[0].string, tds[1].string, tds[2].string
            time_formated = datetime.strptime(time, "%H:%M:%S")
            time_formated = datetime.now(timezone.utc).replace(hour=time_formated.hour, minute=time_formated.minute, second=time_formated.second, microsecond=0)
            
            # на случай если запуск придется на переходный период между сутками
            if time_formated > datetime.now(timezone.utc):
                time_formated = time_formated - timedelta(days=1)

            url_data.append({"url": url, "brand": brand, "time_formated": time_formated})
    except Exception as e:
        print(traceback.format_exc())
        return e

    return url_data

def start_parse_iteration():
    conn = sqlite3.connect(db_name)

    start_datetime = datetime.now(timezone.utc)
    print(f"New parse iteration started at {start_datetime}")

    last_time = get_parse_iteration_info_last_timestamp(conn)

    if last_time == sqlite3.Error:
        print(f"Can't start parsing due error: {last_time}")
        conn.close()
        return
    
    print(f"Last time of parse is {last_time}")
    
    id = create_parse_iteration_info(conn, start_datetime)
    if id == sqlite3.Error:
        last_id = update_parse_iteration_info(id, succesfully=False, error_text=id)
        if last_id == sqlite3.Error:
            print("Can't write failed status to db.")
        conn.close()
        return
    
    print(f"Parse iteration ID is {id}")
    
    html = get_html("https://openphish.com/")
    if html == "":
        print(f"Can't parse data from site. Error: Empty html")
        last_id = update_parse_iteration_info(id, succesfully=False, error_text="Empty html")
        if last_id == sqlite3.Error:
            print("Can't write failed status to db.")
        conn.close()
        return
    
    url_data = parse_data(html)
    if not isinstance(url_data, list):
        print(f"Can't parse data from site. Error: {url_data}")
        last_id = update_parse_iteration_info(id, succesfully=False, error_text=url_data)
        if last_id == sqlite3.Error:
            print("Can't write failed status to db.")
        conn.close()
        return
    
    print(f"Successfully got {len(url_data)} new urls from site")
    
    query_data = []

    for data in url_data:
        if last_time is not None and data['time_formated'] < last_time:
            continue

        existing_url = select_phising_urls(conn, data['url'], data['brand'])
        if existing_url == sqlite3.Error:
            print("Can't check unique.")
            last_id = update_parse_iteration_info(id, succesfully=False, error_text=existing_url)
            if last_id == sqlite3.Error:
                print("Can't write failed status to db.")
            conn.close()
            return
         
        if len(existing_url) != 0:
            continue

        query_data.append((data['url'], data['brand'], data['time_formated'], id))
    
    print(f"Keep {len(query_data)} after filter by time and unique")
    
    if len(query_data) > 0:
        last_id = create_phishing_urls(conn, query_data)
        if last_id == sqlite3.Error:
            print("Can't write urls to db.")
            last_id = update_parse_iteration_info(id, succesfully=False, error_text=last_id)
            if last_id == sqlite3.Error:
                print("Can't write failed status to db.")
            conn.close()
            return


    
    last_id = update_parse_iteration_info(conn, id, ended_time=datetime.now(timezone.utc))
    if last_id == sqlite3.Error:
        print("Can't write success status to db.")
    conn.close()

    print(f"Parse iteration finished successfully")

while True:
    start_parse_iteration()
    time.sleep(300)



New parse iteration started at 2024-09-15 21:41:01.365476+00:00
Last time of parse is 2024-09-15 21:16:07.607605+00:00
Parse iteration ID is 242
Successfully got 30 new urls from site
Keep 10 after filter by time and unique
Parse iteration finished successfully


KeyboardInterrupt: 

Получаем необходимую информацию

In [36]:
import sqlite3, time, traceback, pytz

conn = sqlite3.connect(db_name)

query_get_parsing_time = """
SELECT MIN(started_at) AS f,
       MAX(ended_at) AS l
FROM parse_iteration_info;
"""

unique_urls_query = """
SELECT COUNT(*) FROM phishing_urls;
"""

top_attacking_brand_query = """
SELECT brand, COUNT(brand) FROM phishing_urls
GROUP BY brand 
ORDER BY COUNT(brand) DESC
LIMIT 3
"""

cur = conn.cursor()
parsing_time_query_result = cur.execute(query_get_parsing_time).fetchall()
cur.close()

cur = conn.cursor()
unique_urls_query_result = cur.execute(unique_urls_query).fetchall()
cur.close()

cur = conn.cursor()
top_attacking_brand_query_result = cur.execute(top_attacking_brand_query).fetchall()
cur.close()

conn.close()

parsing_start_time = datetime.strptime(parsing_time_query_result[0][0], "%Y-%m-%d %H:%M:%S.%f%z").replace(tzinfo=pytz.utc)
parsing_end_time = datetime.strptime(parsing_time_query_result[0][1], "%Y-%m-%d %H:%M:%S.%f%z").replace(tzinfo=pytz.utc)
print("Результаты работы:")
print(f"Время начала парсинга: {parsing_start_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Время окончания парсинга: {parsing_end_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Количество уникальных URL сайтов за данный период: {unique_urls_query_result[0][0]}")
print("Топ 3 Наиболее часто атакуемых брендов:")
i = 1
for d in top_attacking_brand_query_result:
    print(f'{i}: {d[0]} - {d[1]}')
    i += 1


Результаты работы:
Время начала парсинга: 2024-09-14 22:18:48
Время окончания парсинга: 2024-09-15 21:41:01
Количество уникальных URL сайтов за данный период: 441
Топ 3 Наиболее часто атакуемых брендов:
1: Generic/Spear Phishing - 76
2: Crypto/Wallet - 48
3: Facebook, Inc. - 32
