In [1]:
import requests
from bs4 import BeautifulSoup as BS
import re as re
import os
import math
from datetime import datetime
import csv

In [5]:
class Parsing(object):
    def __init__(self):
        # Initialize the web_sites and visuals attributes
        self.web_sites = {
            1: "Egemen.kz",
            2: "Azattyk.org",
            3: "Rus.Azzatyk.org",
            4: "Tengri-news.kz",
            4: "Karavan.kz",
            5: "Kaz.Karavan.kz"
            5: "Informburo",
            6: "Nur-Kz",
            7: "Sputnik.kz",
            8: "Kazakstanskaya pravda"
        }
        
        self.visuals = {
            1: "Line-Chart",
            2: "Bar",
            3: "Pie-Chart",
            4: "Correlation Heat-Map"
        }

        # Map website choices to corresponding analysis functions
        self.analysis_functions = {
            1: self.analyze_egemen,
            2: self.analyze_azattyk,
            3: self.analyze_tengri_news,
            4: self.analyze_karavan,
            5: self.analyze_informburo,
            6: self.analyze_nur_kz,
            7: self.analyze_sputnik,
            8: self.analyze_kazakstanskaya_pravda
        }
        
        self.key_word = input("Введите ключевое слово по которому вы хотите сделать анализ сайтов:\n")

    def choose_website(self):
        print("Доступны следующие сайты для анализа:\n")
        for key, value in self.web_sites.items():
            print(f"{key}: {value}")
    
        try:
            # Уведомить пользователя о формате ввода
            user_input = input("Выберите один или несколько сайтов для анализа, указав их номера через запятую: ")
        
            # Парсинг пользовательского ввода
            choices = [
                int(choice.strip()) 
                for choice in user_input.split(',') 
                if choice.strip().isdigit()
            ]
        
            valid_choices = []
            invalid_choices = []
        
            for web_choice in choices:
                if web_choice in self.web_sites:
                    valid_choices.append(web_choice)
                else:
                    invalid_choices.append(web_choice)
        
            # Обработка корректных и некорректных выборов
            if valid_choices:
                for web_choice in valid_choices:
                    selected_site = self.web_sites[web_choice]
                    print(f"\nВы выбрали: {selected_site}")
                    # Вызов соответствующей функции анализа
                    self.analysis_functions[web_choice]()
        
            if invalid_choices:
                print(f"\nНекорректные номера сайтов: {', '.join(map(str, invalid_choices))}. "
                      "Пожалуйста, проверьте доступные сайты.")
    
        except Exception as e:
            print(f"Ошибка: {e}. Попробуйте снова.")



    # Example functions for each website analysis
    def analyze_egemen(self):
        print("Analyzing Egemen.kz...")

    def analyze_azattyk(self):
        print("Analyzing Azattyk.org...")

    def analyze_tengri_news(self):
        print("Analyzing Tengri-news.kz...")

    def analyze_karavan(self):
        print("Analyzing Karavan.kz...")

    def analyze_informburo(self):
        print("Analyzing Informburo...")

    def analyze_nur_kz(self):
        print("Analyzing Nur-Kz...")

    def analyze_sputnik(self):
        print("Analyzing Sputnik.kz...")

    def analyze_kazakstanskaya_pravda(self):
        print("Analyzing Kazakstanskaya pravda...")

# Example Usage
parser = Parsing()
parser.choose_website()


Введите ключевое слово по которому вы хотите сделать анализ сайтов:
Ljyjh
Доступны следующие сайты для анализа:

1: Egemen.kz
2: Azattyk.org
3: Tengri-news.kz
4: Karavan.kz
5: Informburo
6: Nur-Kz
7: Sputnik.kz
8: Kazakstanskaya pravda
Выберите один или несколько сайтов для анализа, указав их номера через запятую: 1,2,3

Вы выбрали: Egemen.kz
Analyzing Egemen.kz...

Вы выбрали: Azattyk.org
Analyzing Azattyk.org...

Вы выбрали: Tengri-news.kz
Analyzing Tengri-news.kz...


In [28]:
menu = {
    1: "Egemen.kz",
    2: "Azattyk"
}

print(menu)
num = input("Some:")
choices = [int(choice.strip()) for choice in num.split(",") if choice.strip().isdigit()]
print(choices)

for choice in choices:
    if choice in menu:
        selected = menu[choice]
        print(f"Selected:{selected}")
    else:
        print(f"{choice} does not exist")


{1: 'Egemen.kz', 2: 'Azattyk'}
Some:1,2
[1, 2]
Selected:Egemen.kz
<class 'int'>
Selected:Azattyk
<class 'int'>


In [None]:
from __future__ import print_function
import PyPEC
from serial import SerialException
import misc.pecutil as pecutil
import datetime, time
import os, traceback
import _winreg as winreg
import re

_raw_input = pecutil._raw_input
script_name = "CFR_EBC_testing"
version = (0, 2, "2024", "07", "16")  # (major, minor, "yyyy", "mm", "dd")
dir_top = os.path.dirname(os.path.realpath(__file__))

logtxt_dirname = "Log"  # the folder name where the separate sn logs will be stored
logtxt_dir = dir_top + "\\" + logtxt_dirname

# Need to initialize the connection 

def serial_ports():
    """ Uses the Win32 registry to return an
        iterator of serial (COM) ports
        existing on this computer.
    """
    path = 'HARDWARE\\DEVICEMAP\\SERIALCOMM'
    try:
        key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, path)
    except WindowsError:
        raise NameError("Registry folder HKEY_LOCAL_MACHINE\\HARDWARE\\DEVICEMAP\\SERIALCOMM is not present.")
    result = []
    for i in xrange(1000):
        try:
            val = winreg.EnumValue(key, i)
            result.append(str(val[1]))
        except EnvironmentError:
            break
    return result
    
class TestError(Exception):
    def __init__(self, txt=None):
        self.txt = txt

    def __str__(self):
        return (self.txt)


class Test(object):
    header = ["9NC", "Firmware", "WritePecprom", "PwrCurr[mA]", "FlashCurr[mA]", "Flash","VM5V[V]", "VM3V3[V]", "VM12V[V]",
              "PowerSupplyTest", "RelayTest", "CONSAF1", "RPM[rpm]", "FAN1[rpm]", "FAN2[rpm]", "FAN3[rpm]", "FAN4[rpm]", "CANADDRESS", "BackupTest" ]
             
    
    menu = {0: ('Save data to CSV file and abort', 'UserAbort'),
            1: ('Flash', 'Flash'),
            2: ('USB service port test', 'UsbServicePort'),
            3: ('Write PECPROM', 'WritePecprom'),
            4: ('Power Supply Test', 'PowerSupplyTest'),        
            5: ('Relay Test', 'RelayTest'),
            6: ('FANs Test', 'FANsTest'),
            7: ('CAN+ADDRESS', 'CANADDRESS'),
            8: ('Backup test', 'BackupTest')}

    dict_pattern = {1 : [1, 2, 3, 4, 5, 6, 7, 8]}

    def __init__(self, cardSN, config):
        # Log
        logcsv = dir_top + "\\" + "{}_yyyymmdd.csv".format(script_name)
        self.LOGCSV = pecutil.Result(cardSN, self.header, logcsv)
        self.LOGCSV.add("Result", "FAILED")
        self.LOGCSV.add("ScriptName", script_name)
        self.LOGCSV.add("ScriptVersion", "{0[0]}.{0[1]}-{0[2]}{0[3]}{0[4]}".format(version))
        self.LOGCSV.add("SerialNumber", cardSN)
        # Configuration
        self.cardSN = cardSN
        self.cardNC = config.get("cardNC")
        self.LOGCSV.add("9NC", self.cardNC)
        # Devices
        self.SER = None
        # Measurement specifications
        self.limitdict = {}
        self.limitdict["VM5V[V]"] = [4, 6]           # [V]
        self.limitdict["VM3V3[V]"] = [3, 4]          # [V]
        self.limitdict["VM12V[V]"] = [11, 13]        # [V]
        self.limitdict["RPM[rpm]"] = [1850, 2200]         # [rpm]
        self.limitdict["FAN1[rpm]"] = [1850, 2200]         # [rpm]
        self.limitdict["FAN2[rpm]"] = [1850, 2200]         # [rpm]
        self.limitdict["FAN3[rpm]"] = [1850, 2200]         # [rpm]
        self.limitdict["FAN4[rpm]"] = [1850, 2200]         # [rpm]
        
    def WritePecprom(self):
        self.serial_init()
        print("\nWriting EEPROM. Please wait for 10sec ... ", end='')
        timeout_default =  self.SER.com.timeout
        self.SER.com.timeout = 1
        eep_fac = self.SER.writeserial("eep -fac -i")
        self.SER.writeserial("eep -fw -i")
        self.SER.writeserial("fac -a 4 -i")
        self.SER.writeserial("fac -a 4 -ser {}".format(self.cardSN))
        self.SER.writeserial("fac -a 4 -9nc {}".format(self.cardNC))
        lines = self.SER.writeserial("fac -a 4 -sd")
        self.SER.com.timeout = timeout_default
        info = self.SER.writeserial("facinfo -p")
        print(info)
        lines = lines.replace(",", ";").split(self.SER.EOL_READ)
        if (lines[3].split()[-1] == "{}".format(self.cardNC)) and (lines[4].split()[-1] == "{}".format(self.cardSN)):
            self.LOGCSV.add("WritePecprom", "OK")
            print("PASSED")
        else:
            print("FAILED (Serial number, 9NC cannot be stored in the EEPROM.)")
            self.stop_test("WritePecprom")
            
    def Flash(self):
        self.serial_init()
        if not pecutil.get_user_input("Does this device flutil.get_user_input("Does this device flashed before ?", type="boolean"):
            print("Need to flash board before testing ... FAILED")
            self.stop_test("Flash")
        
        pwr_curr = pecutil.get_user_input("Type in the current consumption before flashing [mA]:", type="integer", min=0)
        self.LOGCSV.add("PwrCurr[mA]",pwr_curr)
        flash_curr = pecutil.get_user_input("Type in the current consumption after flashing [mA]:", type="integer", min=0)
        self.LOGCSV.add("FlashCurr[mA]",flash_curr)
        print("Flash... PASSED")
        self.LOGCSV.add("Flash","OK")
    
    def PowerSupplyTest(self):
        self.serial_init()
        adc = self.SER.writeserial("adc")
        adc = adc.replace(",", ";").split(self.SER.EOL_READ)
        VM5V = adc[6].split()[-1]
        VM3V3 = adc[7].split()[-1]
        VM12V = adc[8].split()[-1]
        
        dict_text = "VM5V[V]"
        self.LOGCSV.add(dict_text, VM5V)
        if self.check_limit(dict_text):
            print("Measured {} V".format(VM5V))
        else:
            print("FAILED (measured {} V, accepted range is {} - {} V)".format( VM5V, *self.limitdict[dict_text]))
            self.stop_test(dict_text)
        dict_text = "VM3V3[V]"
        self.LOGCSV.add(dict_text, VM3V3)
        if self.check_limit(dict_text):
            print("Measured {} V".format(VM3V3))
        else:
            print("FAILED (measured {} V, accepted range is {} - {} V)".format( VM3V3, *self.limitdict[dict_text]))
            self.stop_test(dict_text)
        dict_text = "VM12V[V]"
        self.LOGCSV.add(dict_text, VM12V)
        if self.check_limit(dict_text):
            print("Measured {} V".format(VM12V))
            print("Power Supply test PASSED...")
            self.LOGCSV.add("PowerSupplyTest", "OK")
        else:
            print("FAILED (measured {} V, accepted range is {} - {} V)".format( VM12V, *self.limitdict[dict_text]))
            self.stop_test(dict_text)
            
    def RelayTest(self):
        self.serial_init()
        pecutil.get_user_input("Power 24V on CONSAF1 and short CONSAF2, press Enter")
        print("GPI on test ...", end='')
        lines = self.SER.writeserial("gpio -p")
        lines_conv = (re.findall("\d+", lines))
        lines_consaf_on = int(lines_conv[1])
        lines_off = int(lines_conv[2])
        print(lines_conv)
        print(lines_off)
        pecutil.get_user_input("Open CONSAF2 and press Enter")
        print("GPI on test ...", end='')
        print("Safety Relay is turned off...")
        lines = self.SER.writeserial("gpio -p")
        lines_conv = (re.findall("\d+", lines))
        print(lines_conv)
        lines_on = int(lines_conv[2])
        lines_consaf_off = int(lines_conv[1])
        if lines_off == 0 and lines_on == 1:
            if lines_consaf_off == 0 and lines_consaf_on == 1:
                print("CONSAF1... PASSED")
                print("Relay test.... PASSED ")
            else: 
                print("CONSAF1...FAILED")
                self.stop_test("RelayTest")
            print(lines_on)
            self.LOGCSV.add("RelayTest", "OK")
            self.LOGCSV.add("CONSAF1", "OK")
        else:
            print("FAILED")
            self.stop_test("RelayTest")
        
            
    def FANsTest(self):
        self.serial_init()
        pecutil.get_user_input("Power the fan with 24V power supply and press Enter")
        print("Fans on test ... wait 10 sec", end='')
        time.sleep(12)
        lines = self.SER.writeserial("fan -p")
        lines_conv = (re.findall("\d+",lines))
        fan = lines_conv[3]
        fan_int = int(fan)
        print (lines_conv)
        dict_text = "RPM[rpm]"
        self.LOGCSV.add(dict_text, fan_int)
        if self.check_limit(dict_text):
            print("\nSpeed of FANs {} rpm... PASSED".format(fan_int))
        else:
            print("\nFAILED ... Measured speed is {} rpm, accepted range is {} - {} rpm".format(fan_int, *self.limitdict[dict_text]))
            self.stop_test("FANsTest")
        pecutil.get_user_input("Switch position to FAN1 (CON9) and press Enter")
        print("Wait 3 sec....", end = '')
        time.sleep(3)
        dict_text = "FAN1[rpm]"
        lines = self.SER.writeserial("fan -p")
        print(lines)
        lines_conv = (re.findall("\d+",lines))
        fan1 = lines_conv[3]
        fan1_int = int(fan1)
        self.LOGCSV.add(dict_text, fan1_int)
        if self.check_limit(dict_text):
            print("\nSpeed of FAN1 is {} rpm... PASSED".format(fan1_int))
        else:
            print("\nFAILED ... Measured speed is {} rpm, accepted range is {} - {} rpm".format(fan1_int, *self.limitdict[dict_text]))
            self.stop_test("FANsTest")
        pecutil.get_user_input("Switch position to FAN2 (CON7) and press Enter")
        print("Wait 3 sec....", end = '')
        time.sleep(3)
        dict_text = "FAN2[rpm]"
        lines = self.SER.writeserial("fan -p")
        lines_conv = (re.findall("\d+",lines))
        fan2 = lines_conv[5]
        fan2_int = int(fan2)
        self.LOGCSV.add(dict_text, fan2_int)
        if self.check_limit(dict_text):
            print("\nSpeed of FAN2 is {} rpm... PASSED".format(fan2_int))
        else:
            print("\nFAILED ... Measured speed is {} rpm, accepted range is {} - {} rpm".format(fan2_int, *self.limitdict[dict_text]))
            self.stop_test("FANsTest")
        pecutil.get_user_input("Switch position to FAN3 (CON4) and press Enter")
        print("Wait 3 sec....", end = '')
        time.sleep(3)
        dict_text = "FAN3[rpm]"
        lines = self.SER.writeserial("fan -p")
        lines_conv = (re.findall("\d+",lines))
        fan3 = lines_conv[7]
        fan3_int = int(fan3)
        self.LOGCSV.add(dict_text, fan3_int)
        if self.check_limit(dict_text):
            print("\nSpeed of FAN3 is {} rpm... PASSED".format(fan3_int))
        else:
            print("\nFAILED ... Measured speed is {} rpm, accepted range is {} - {} rpm".format(fan3_int, *self.limitdict[dict_text]))
            self.stop_test("FANsTest")
        pecutil.get_user_input("Switch position to FAN4 (CON2) and press Enter")
        print("Wait 3 sec....", end = '')
        time.sleep(3)
        dict_text = "FAN4[rpm]"
        lines = self.SER.writeserial("fan -p")
        lines_conv = (re.findall("\d+",lines))
        fan4 = lines_conv[9]
        fan4_int = int(fan4)
        self.LOGCSV.add(dict_text, fan4_int)
        if self.check_limit(dict_text):
            print("\nSpeed of FAN4 is {} rpm... PASSED".format(fan4_int))
        else:
            print("\nFAILED ... Measured speed is {} rpm, accepted range is {} - {} rpm".format(fan4_int, *self.limitdict[dict_text]))
            self.stop_test("FANsTest")
        
    def CANADDRESS(self):
        self.serial_init()
        if not pecutil.get_user_input("Was the CAN+ADDRESS tested ?", type="boolean"):
            print("Need to test ... FAILED")
            self.stop_test("CANADDRESS")
        if not pecutil.get_user_input("Does the communication present on Address 1,2,3 ?", type="boolean"):
            print ("No communication .... FAILED")
        print("CAN+ADDRESS ... PASSED")
        self.LOGCSV.add("CANADDRESS", "OK")
   
    
    def BackupTest(self):
        """
        Back up  test
        """
        self.serial_init()
        print("\nBack-up test is started ...")
        pecutil.get_user_input("Turn on the backup 12V DC supply and press Enter")
        dict_text = "BackupTest"
        if not pecutil.get_user_input("Back-up is connected and D704 LED lights up?", type="boolean"):
            print("Back-up test ... FAILED")
            self.stop_test(dict_text + " (init)")
        pecutil.get_user_input("Turn off the 24V power supply and press Enter")
        lines = self.SER.writeserial()
        if lines.find("CFR>") < 0:
            print("No communication with the board")
            if not pecutil.get_user_input("RED LED next to MCU turned on and GREEN LED started blinking", type="boolean"):
                print("Back-up test ... FAILED")
                self.stop_test(dict_text + " (init)")
            pecutil.get_user_input("Press reset button and press Enter")
            if not pecutil.get_user_input("Relay switched off and are all LED's off?", type="boolean"):
                print("Back-up test ... FAILED")
                self.stop_test(dict_text + " (init)")
            print("Back-up test ... PASSED")
            self.LOGCSV.add(dict_text, "OK")
        else:
            print ("Still communication with the board")
            print ("Back-up test ...FAILED")
            self.stop_test(dict_text)
    
    def UsbServicePort(self):
        self.serial_init()
        lines = self.SER.writeserial()
        if lines.find("CFR>") > -1:
            version = self.SER.get_fw()
            self.LOGCSV.add("Firmware", version)
            if version.find("Built") > -1:
                print("\nChecking RS232 communication ... PASSED")
            else:
                print("\nChecking RS232 communication ... FAILED")
                self.stop_test("UsbServicePort (firmware)")
        else:
            print("\nChecking RS232 communication ... FAILED")
            print("Try restarting the program if the failure occurs many times.")
            self.stop_test("UsbServicePort")    
            
    def call(self, name):
        return self.__getattribute__(self.menu[name][1])

    def stop_test(self, error_message=""):
        self.LOGCSV.add("Error", error_message)
        raise TestError(txt=error_message)


    def serial_init(self):
        if self.SER is None:
            base_ports = serial_ports()
            print("Press the reset button (or turn on the power) and wait 10sec.")
            while True:
                new_ports = serial_ports()
                if len(new_ports) - len(base_ports) == 0:
                    pass
                elif len(new_ports) - len(base_ports) == -1:
                    base_ports = new_ports
                elif len(new_ports) - len(base_ports) == 1:
                    time.sleep(5)
                    comport = list(set(new_ports) - set(base_ports))[0]
                    break
                else:
                    raise NotImplementedError("Too many COM ports appeared.")
            while True:
                try:
                    self.SER = PyPEC.cfr.pb_serial_wa(comport)
                    #instead of 
                    #  pc.init(PyPEC.serial, comport)
                    #  pc.ser.init(PyPEC.cfr.pb)
                    #  self.SER = pc.ser.pb
                    break
                except AttributeError:
                    print("Serial line is not connected.")
                    comport = "COM{}".format(getCOM())
                    
    def UserAbort(self):
        '''
        When the user selects the required tests from the menu, UserAbort is used to terminate the tests
        of the current serial number and save the log into the CSV logfile.
        '''
        print("User Abort.")
        self.stop_test("User Abort")
        
    def check_limit(self, hdr):
        try:
            if self.limitdict[hdr][0] <= float(self.LOGCSV.get(hdr)) <= self.limitdict[hdr][1]:
                return True
            else:
                return False
        except (KeyError, ValueError, TypeError):
            return False

    
    def UsbServicePort(self):
        self.serial_init()
        lines = self.SER.writeserial()
        if lines.find("CFR>") > -1:
            version = self.SER.get_fw()
            self.LOGCSV.add("Firmware", version)
            if version.find("Built") > -1:
                print("\nChecking RS232 communication ... PASSED")
            else:
                print("\nChecking RS232 communication ... FAILED")
                self.stop_test("UsbServicePort (firmware)")
        else:
            print("\nChecking RS232 communication ... FAILED")
            print("Try restarting the program if the failure occurs many times.")
            self.stop_test("UsbServicePort")
            
        
    
                    
if __name__ == '__main__':
    try:
        print("ScriptVersion {0[0]}.{0[1]}-{0[2]}{0[3]}{0[4]}".format(version))
        CFG = pecutil.ConfigFile(script_name + ".cfg")
        pattern = CFG.test_pattern
        com = CFG.com

        # Make TXT log folder
        if not(os.path.exists(logtxt_dir)):
            os.makedirs(logtxt_dir)

        # Initializing Serial line
        # pc = PyPEC.pc.pc()  # serial line is initialized in a different way in case of the CFR boards

        while True:
            # Board specific commands
            try:
                TestRun.SER.com_close()
                TestRun.SER = None
            except (NameError, AttributeError):
                pass
            
            # Initial settings
            cardSN = pecutil.geFirmwaretSN("Scan in the SerialNumber of the board > ")
            runtime_start = time.time()
            TestRun = Test(cardSN, CFG)
            TestRun.LOGCSV.add("TestPattern", pattern)

            # Generate TXT log filename
            logtxt_name = "{}.txt".format(cardSN)
            logtxt = logtxt_dir + "\\" + logtxt_name

            # Run the desired tests
            retry = ['retry', 'r', 'R']
            abort = ['abort', 'a', 'A']
            skip = ['skip', 's', 'S']
            skipped = []
            while True:
                try:
                    if pattern == 0:  # manually selected tests by the user
                        tests_to_run = pecutil.usermenu(TestRun.menu)
                    else:  # tests in the predefined order
                        tests_to_run = TestRun.dict_pattern[pattern]
                    for x in tests_to_run:
                        if x == 0:
                            TestRun.UserAbort()
                        inp = 'retry'
                        while inp in retry:
                            try:
                                TestRun.call(x)()
                                inp = 'next'
                            except PyPEC.cfr.ReadSerialError:
                                # Saving results into the TXT logfile
                                try:
                                    TestRun.SER.LOG.save(logtxt)
                                except (AttributeError, NameError):
                                    pass
                                print("\nRS232 communication has been broken. The latest test will be restarted after the reset.")
                                TestRun.SER.com_close()
                                TestRun.SER = None
                            except (KeyboardInterrupt, TestError):  # exception during test run
                                while True:
                                    try:
                                        inp = _raw_input(">>> {} > Retry/Abort/Skip (r/a/s) > ".format(TestRun.menu[x][0]))
                                        if inp in retry + abort + skip:
                                            break
                                    except KeyboardInterrupt:
                                        print()
                                if inp in retry:
                                    TestRun.LOGCSV.remove("Error")
                                elif inp in skip:
                                    skipped.append(str(x))
                                    break
                                else:
                                    TestRun.UserAbort()
                    if pattern == 0:
                        TestRun.LOGCSV.add("Result", "n/a")
                    else:
                        if len(skipped) > 0:
                            print("\nA couple of tests were SKIPPED.")
                            TestRun.LOGCSV.add("Skipped", ",".join(skipped))
                        else:  
                            print("\nThe test has been finished SUCCESSFULLY. :)")
                            TestRun.LOGCSV.add("Result", "PASSED")
                        break
                except KeyboardInterrupt:  # exception during user menu selection
                    print("\nUser keyboard interrupt.")
                    TestRun.LOGCSV.add("Error", "KeyboardInterrupt")
                    break
                except TestError:
                    break
                except SerialException:
                    print("\n-----------------------------------------------------------------------------------")
                    print("RS232 communication has been broken and it cannot be recovered. Test is terminated.")
                    TestRun.LOGCSV.add("Error", traceback.format_exc())
                    break
                except Exception:  # catching all remaining exceptions
                    print("\n ----------------------------------\n Unexpected failure in the program.\n")
                    print(traceback.format_exc())
list[1]                    TestRun.LOGCSV.add("Error", traceback.format_exc())
                    break

            # Saving results into the TXT logfile
            try:
                TestRun.SER.LOG.save(logtxt)
            except (AttributeError, NameError):
                pass

            # Saving results into the CSV logfile
            TestRun.LOGCSV.add("Runtime", int(time.time() - runtime_start))
            while True:
                try:
                    TestRun.LOGCSV.save()
                    print("\nSN {} results are saved in {}.\n".format(cardSN, TestRun.LOGCSV.csvfile))
                    del TestRun
                    break
                except IOError:
                    try:
                        _raw_input("The CSV file is open in anopecutilther program. Close it and press Enter to continue saving the log.")
                    except KeyboardInterrupt:
                        print()
                except (NameError, AttributeError) as er:
                    print("\n ----- No data was saved. -----\n", er, "\n")
                    print(traceback.format_exc())
                    break
            

    except KeyboardInterrupt:  # exception during initialization and serial number input
        print("\n ----- No data was saved. -----\n User keyboard interrupt.\n")
        time.sleep(2)
    except Exception:  # catching all remaining exceptions
        print("\n ----- No data was saved. -----\n Unexpected failure in the program.\n")
        print(traceback.format_exc())
        _raw_input("\nPress Enter to close the session and terminate the SERIAL connection (if they were initialized).")
list[1]

In [6]:
import requests
from bs4 import BeautifulSoup as BS
import re
import os
import math
from datetime import datetime
import csv


class Parsing:
    def __init__(self):
        self.web_sites = {
            1: "Egemen.kz",
            2: "Azattyk.org",
            3: "Tengri-news.kz",
            4: "Karavan.kz",
            5: "Informburo",
            6: "Nur-Kz",
            7: "Sputnik.kz",
            8: "Kazakstanskaya pravda"
        }

        self.analysis_functions = {
            1: self.analyze_egemen,
            # Additional functions for other sites can be added here
        }

        self.key_word = input("Введите ключевое слово по которому вы хотите сделать анализ сайтов:\n")

    def choose_website(self):
        print("Доступны следующие сайты для анализа:\n")
        for key, value in self.web_sites.items():
            print(f"{key}: {value}")

        try:
            user_input = input("Выберите один или несколько сайтов для анализа, указав их номера через запятую: ")
            choices = [int(choice.strip()) for choice in user_input.split(',') if choice.strip().isdigit()]
            
            valid_choices = []
            invalid_choices = []

            for choice in choices:
                if choice in self.web_sites:
                    valid_choices.append(choice)
                else:
                    invalid_choices.append(choice)

            if valid_choices:
                for choice in valid_choices:
                    print(f"\nВы выбрали: {self.web_sites[choice]}")
                    if choice in self.analysis_functions:
                        self.analysis_functions[choice]()
                    else:
                        print(f"Функция анализа для {self.web_sites[choice]} еще не реализована.")
            
            if invalid_choices:
                print(f"Некорректные номера сайтов: {', '.join(map(str, invalid_choices))}.")
        
        except Exception as e:
            print(f"Ошибка: {e}. Попробуйте снова.")

    def analyze_egemen(self):
        print("Начинается анализ сайта Egemen.kz...")
        base_url = "https://egemen.kz"
        search_url = f"{base_url}/search?q={'+'.join(self.key_word.split())}"
        print(f"Ссылка по вашему запросу: {search_url}")

        storage_dir = os.path.join(os.getcwd(), self.key_word)
        os.makedirs(storage_dir, exist_ok=True)

        response = requests.get(search_url)
        soup = BS(response.text, "html.parser")
        article_founded = soup.find('small').text
        num_articles = int(re.search(r'\d+', article_founded).group())
        articles_per_page = 5
        pages = math.ceil(num_articles / articles_per_page)

        print(f"Число найденных статей: {num_articles}, страниц: {pages}")
        urls_list = []

        for page in range(1, pages + 1):
            full_url = f"{search_url}&page={page}"
            print(f"Обработка страницы: {full_url}")
            urls_list.extend(self.extract_urls(base_url, full_url))

        print(f"Всего URL для обработки: {len(urls_list)}")
        for url in urls_list:
            self.article_contents(url, storage_dir)

        self.save_to_csv(urls_list, storage_dir)

    @staticmethod
    def extract_urls(base_url, search_url):
        response = requests.get(search_url)
        soup = BS(response.text, "html.parser")
        divs = soup.find_all('div', class_='clearfix news-t flexBlock')
        return [base_url.rstrip('/') + div.a['href'] for div in divs if div.a]

    @staticmethod
    def article_contents(article_url, storage_dir):
        try:
            response = requests.get(article_url)
            soup = BS(response.text, "html.parser")

            title = soup.find('h1').get_text(strip=True) if soup.find('h1') else "No Title"
            author = soup.find('div', class_='name-auth').get_text(strip=True) if soup.find('div', class_='name-auth') else "No Author"
            date_tag = soup.find('meta', itemprop="datePublished")
            date_published = date_tag['content'] if date_tag and date_tag.has_attr('content') else "No Date"
            article_body = soup.find("div", itemprop="articleBody")
            article_text = article_body.get_text(separator="\n", strip=True) if article_body else "No Content"

            valid_title = re.sub(r'[\\/:"*?<>|]+', '', title)
            filename = os.path.join(storage_dir, f"{valid_title}.txt")
            with open(filename, "w", encoding="utf-8") as file:
                file.write(f"{title}\n\n")
                file.write(article_text)

            print(f"Статья '{title}' сохранена.")
        except Exception as e:
            print(f"Ошибка при обработке {article_url}: {e}")

    @staticmethod
    def save_to_csv(urls, storage_dir):
        csv_file = os.path.join(storage_dir, "articles.csv")
        headers = ["Title", "Date Published", "Author", "URL"]

        with open(csv_file, mode='w', newline='', encoding='utf-8') as file:
            writer = csv.DictWriter(file, fieldnames=headers)
            writer.writeheader()

            for url in urls:
                article_details = Parsing.extract_article_details(url)
                writer.writerow(article_details)

        print("CSV файл успешно сохранен.")

    @staticmethod
    def extract_article_details(article_url):
        try:
            response = requests.get(article_url)
            soup = BS(response.text, "html.parser")

            title = soup.find('h1').get_text(strip=True) if soup.find('h1') else "No Title"
            date_tag = soup.find('meta', itemprop="datePublished")
            date_published = date_tag['content'] if date_tag and date_tag.has_attr('content') else "No Date"
            author = soup.find('div', class_='name-auth').get_text(strip=True) if soup.find('div', class_='name-auth') else "No Author"

            return {"Title": title, "Date Published": date_published, "Author": author, "URL": article_url}
        except Exception as e:
            print(f"Ошибка при обработке {article_url}: {e}")
            return {"Title": "Error", "Date Published": "Error", "Author": "Error", "URL": article_url}


if __name__ == "__main__":
    parser = Parsing()
    parser.choose_website()


Введите ключевое слово по которому вы хотите сделать анализ сайтов:
Донор
Доступны следующие сайты для анализа:

1: Egemen.kz
2: Azattyk.org
3: Tengri-news.kz
4: Karavan.kz
5: Informburo
6: Nur-Kz
7: Sputnik.kz
8: Kazakstanskaya pravda
Выберите один или несколько сайтов для анализа, указав их номера через запятую: 1

Вы выбрали: Egemen.kz
Начинается анализ сайта Egemen.kz...
Ссылка по вашему запросу: https://egemen.kz/search?q=Донор
Число найденных статей: 73, страниц: 15
Обработка страницы: https://egemen.kz/search?q=Донор&page=1
Обработка страницы: https://egemen.kz/search?q=Донор&page=2
Обработка страницы: https://egemen.kz/search?q=Донор&page=3
Обработка страницы: https://egemen.kz/search?q=Донор&page=4
Обработка страницы: https://egemen.kz/search?q=Донор&page=5
Обработка страницы: https://egemen.kz/search?q=Донор&page=6
Обработка страницы: https://egemen.kz/search?q=Донор&page=7
Обработка страницы: https://egemen.kz/search?q=Донор&page=8
Обработка страницы: https://egemen.kz/searc