In [2]:
import random
import time
from typing import List, Union

class properties_of:
    def __init__(self, name: str, engine: str = "pandas"):
        """
        Inicializuojama klasė. Palaikomi du varikliai: 'pandas' ir 'pyspark'.

        :param name: Objektų grupės pavadinimas.
        :param engine: Variklis ('pandas' arba 'pyspark').
        """
        self.name = name
        self.engine = engine

        if engine == "pandas":
            import pandas as pd
            self.pd = pd
            self.df_property = pd.DataFrame(columns=["id", "property_id", "value"])
            self.df_property_type = pd.DataFrame(columns=["property_id", "description"])
        elif engine == "pyspark":
            from pyspark.sql import SparkSession
            self.spark = SparkSession.builder.master("local").appName("PropertiesDB").getOrCreate()
            self.df_property = self.spark.createDataFrame([], schema="id STRING, property_id STRING, value STRING")
            self.df_property_type = self.spark.createDataFrame([], schema="property_id STRING, description STRING")
        else:
            raise ValueError("Nepalaikomas variklis: pasirinkite 'pandas' arba 'pyspark'.")

    def add_property_type(self, property_id: str, description: str) -> None:
        """
        Pridedamas naujas savybės tipas į tipų lentelę.

        :param property_id: Savybės ID.
        :param description: Savybės aprašymas.
        """
        if self.engine == "pandas":
            self.df_property_type = self.pd.concat([
                self.df_property_type,
                self.pd.DataFrame({"property_id": [property_id], "description": [description]})
            ], ignore_index=True).drop_duplicates(subset=["property_id"])
        elif self.engine == "pyspark":
            new_row = self.spark.createDataFrame([(property_id, description)], schema="property_id STRING, description STRING")
            self.df_property_type = self.df_property_type.union(new_row)

    def add_property(self, id: str, property_id: str, value: str, check_property_type: bool = False) -> None:
        """
        Pridedama savybė konkrečiam objektui.

        :param id: Objekto ID.
        :param property_id: Savybės ID.
        :param value: Savybės reikšmė.
        :param check_property_type: Tikrinti, ar savybės tipas egzistuoja.
        """
        if self.engine == "pandas":
            if check_property_type and property_id not in self.df_property_type["property_id"].values:
                raise ValueError(f"Savybės ID '{property_id}' nėra savybių tipų lentelėje.")
            self.df_property = self.pd.concat([
                self.df_property,
                self.pd.DataFrame({"id": [id], "property_id": [property_id], "value": [value]})
            ], ignore_index=True).drop_duplicates(subset=["id", "property_id"])
        elif self.engine == "pyspark":
            if check_property_type:
                existing = self.df_property_type.filter(f"property_id = '{property_id}'").count() > 0
                if not existing:
                    raise ValueError(f"Savybės ID '{property_id}' nėra savybių tipų lentelėje.")
            new_row = self.spark.createDataFrame([(id, property_id, value)], schema="id STRING, property_id STRING, value STRING")
            self.df_property = self.df_property.union(new_row)

    def import_from_wide(self, filepath: str, file_format: str, id_column: str = "id") -> None:
        """
        Importuoja duomenis iš plačios lentelės.

        :param filepath: Failo kelias.
        :param file_format: Failo formatas: 'csv', 'parquet', 'feather'.
        :param id_column: Objekto ID stulpelio pavadinimas.
        """
        start_time = time.time()
        if self.engine == "pandas":
            if file_format == "csv":
                wide_df = self.pd.read_csv(filepath)
            elif file_format == "parquet":
                wide_df = self.pd.read_parquet(filepath)
            elif file_format == "feather":
                wide_df = self.pd.read_feather(filepath)
            else:
                raise ValueError(f"Failo formatas '{file_format}' nepalaikomas su Pandas.")
            narrow_df = wide_df.melt(id_vars=id_column, var_name="property_id", value_name="value")
            self.df_property = self.pd.concat([self.df_property, narrow_df], ignore_index=True).drop_duplicates()
            rows = len(narrow_df)
        elif self.engine == "pyspark":
            if file_format in ["csv", "parquet"]:
                wide_df = self.spark.read.format(file_format).load(filepath)
            else:
                raise ValueError(f"Failo formatas '{file_format}' nepalaikomas su PySpark.")
            narrow_df = wide_df.selectExpr(f"{id_column} AS id", "stack(*) AS (property_id, value)")
            self.df_property = self.df_property.union(narrow_df)
            rows = narrow_df.count()
        print(f"Importuota {rows} eilučių iš {filepath}. Trukmė: {time.time() - start_time:.2f}s")

    def export_to_wide(self, filepath: str, file_format: str, id_column: str = "id") -> None:
        """
        Eksportuoja duomenis į plačią lentelę.

        :param filepath: Failo kelias.
        :param file_format: Failo formatas: 'csv', 'parquet', 'feather'.
        :param id_column: Objekto ID stulpelio pavadinimas.
        """
        start_time = time.time()
        if self.engine == "pandas":
            wide_df = self.df_property.pivot(index=id_column, columns="property_id", values="value").reset_index()
            if file_format == "csv":
                wide_df.to_csv(filepath, index=False)
            elif file_format == "parquet":
                wide_df.to_parquet(filepath, index=False)
            elif file_format == "feather":
                wide_df.to_feather(filepath)
            else:
                raise ValueError(f"Failo formatas '{file_format}' nepalaikomas su Pandas")
            rows = len(wide_df)
        elif self.engine == "pyspark":
            raise NotImplementedError("PySpark nepalaiko plačių lentelių eksporto.")
        print(f"Eksportuota {rows} eilučių į {filepath}. Trukmė: {time.time() - start_time:.2f}s")

    def import_from_narrow(self, filepath: str, file_format: str) -> None:
        """
        Importuoja duomenis iš siauros lentelės.

        :param filepath: Failo kelias.
        :param file_format: Failo formatas: 'csv', 'parquet', 'feather'.
        """
        start_time = time.time()
        if self.engine == "pandas":
            if file_format == "csv":
                new_data = self.pd.read_csv(filepath)
            elif file_format == "parquet":
                new_data = self.pd.read_parquet(filepath)
            elif file_format == "feather":
                new_data = self.pd.read_feather(filepath)
            else:
                raise ValueError(f"Failo formatas '{file_format}' nepalaikomas su Pandas")
            self.df_property = self.pd.concat([self.df_property, new_data], ignore_index=True).drop_duplicates()
            rows = len(new_data)
        elif self.engine == "pyspark":
            if file_format in ["csv", "parquet"]:
                new_data = self.spark.read.format(file_format).load(filepath)
            else:
                raise ValueError(f"Failo formatas '{file_format}' nepalaikomas su PySpark.")
            self.df_property = self.df_property.union(new_data)
            rows = new_data.count()
        print(f"Importuota {rows} eilučių iš {filepath}. Trukmė: {time.time() - start_time:.2f}s")

    def export_to_narrow(self, filepath: str, file_format: str) -> None:
        """
        Eksportuoja duomenis į siaurą lentelę.

        :param filepath: Failo kelias.
        :param file_format: Failo formatas: 'csv', 'parquet', 'feather'.
        """
        start_time = time.time()
        if self.engine == "pandas":
            if file_format == "csv":
                self.df_property.to_csv(filepath, index=False)
            elif file_format == "parquet":
                self.df_property.to_parquet(filepath, index=False)
            elif file_format == "feather":
                self.df_property.to_feather(filepath)
            else:
                raise ValueError(f"Failo formatas '{file_format}' nepalaikomas su Pandas.")
            rows = len(self.df_property)
        elif self.engine == "pyspark":
            if file_format in ["csv", "parquet"]:
                self.df_property.write.mode("overwrite").format(file_format).save(filepath)
            else:
                raise ValueError(f"Failo formatas '{file_format}' nepalaikomas su PySpark.")
            rows = self.df_property.count()
        print(f"Eksportuota {rows} eilučių į {filepath}. Trukmė: {time.time() - start_time:.2f}s")

    def import_from_file(self, filepath: str, file_format: str) -> None:
        """
        Importuoja duomenis iš failo.

        :param filepath: Failo kelias.
        :param file_format: Failo formatas: 'csv', 'parquet', 'feather', 'sqlite'.
        """
        start_time = time.time()
        if self.engine == "pandas":
            if file_format == "csv":
                self.df_property = self.pd.read_csv(filepath)
            elif file_format == "parquet":
                self.df_property = self.pd.read_parquet(filepath)
            elif file_format == "feather":
                self.df_property = self.pd.read_feather(filepath)
            elif file_format == "sqlite":
                import sqlite3
                with sqlite3.connect(filepath) as conn:
                    self.df_property = self.pd.read_sql(f"SELECT * FROM {self.name}_property", conn)
            else:
                raise ValueError(f"Failo formatas '{file_format}' nepalaikomas su Pandas.")
            rows = len(self.df_property)
        elif self.engine == "pyspark":
            if file_format in ["csv", "parquet"]:
                self.df_property = self.spark.read.format(file_format).load(filepath)
            else:
                raise ValueError(f"Failo formatas '{file_format}' nepalaikomas su PySpark.")
            rows = self.df_property.count()
        print(f"Importuota {rows} eilučių iš {filepath}. Trukmė: {time.time() - start_time:.2f}s")

    def export_to_file(self, filepath: str, file_format: str) -> None:
        """
        Eksportuoja duomenis į failą.

        :param filepath: Failo kelias.
        :param file_format: Failo formatas: 'csv', 'parquet', 'feather', 'sqlite'.
        """
        start_time = time.time()
        if self.engine == "pandas":
            if file_format == "csv":
                self.df_property.to_csv(filepath, index=False)
            elif file_format == "parquet":
                self.df_property.to_parquet(filepath, index=False)
            elif file_format == "feather":
                self.df_property.to_feather(filepath)
            elif file_format == "sqlite":
                import sqlite3
                with sqlite3.connect(filepath) as conn:
                    self.df_property.to_sql(f"{self.name}_property", conn, if_exists="replace", index=False)
            else:
                raise ValueError(f"Failo formatas '{file_format}' nepalaikomas su Pandas.")
            rows = len(self.df_property)
        elif self.engine == "pyspark":
            if file_format in ["csv", "parquet"]:
                self.df_property.write.mode("overwrite").format(file_format).save(filepath)
            else:
                raise ValueError(f"Failo formatas '{file_format}' nepalaikomas su PySpark.")
            rows = self.df_property.count()
        print(f"Eksportuota {rows} eilučių į {filepath}. Trukmė: {time.time()-start_time:.2f}s")

    def close(self) -> None:
        """Uždaroma PySpark sesija, jei naudojama."""
        if self.engine == "pyspark":
            self.spark.stop()

In [3]:
def main() -> None:
    import string
    import os

    # NATO fonetinės abėcėlės sąrašas
    nato_phonetic_alphabet = [
        "Alpha", "Bravo", "Charlie", "Delta", "Echo", 
        "Foxtrot", "Golf", "Hotel", "India", "Juliett", 
        "Kilo", "Lima", "Mike", "November", "Oscar", "Papa", 
        "Quebec", "Romeo", "Sierra", "Tango", "Uniform", 
        "Victor", "Whiskey", "X-ray", "Yankee", "Zulu"
    ]

    # Testuojame Pandas variklį
    print("Testuojame Pandas variklį su wide ir narrow lentelėmis")
    pandas_obj = properties_of("pandas_test_objects", engine="pandas")

    # Įrašomos savybės
    for phonetic in nato_phonetic_alphabet:
        pandas_obj.add_property_type(property_id=phonetic.lower(), description=f"Savybė {phonetic}")

    # Generuojami 1000 objektų su atsitiktinėmis savybėmis
    num_objects = 1000
    for i in range(1, num_objects + 1):
        object_id = f"obj_{i}"
        for _ in range(3):  # Trijų savybių priskyrimas
            property_id = random.choice(nato_phonetic_alphabet).lower()
            value = ''.join(random.choices(string.ascii_letters + string.digits, k=10))
            pandas_obj.add_property(object_id, property_id, value)

    # Eksportuojami ir importuojami duomenys Pandas
    pandas_formats = ["csv", "parquet", "feather"]
    for fmt in pandas_formats:
        wide_filepath = f"pandas_test_wide.{fmt}"
        narrow_filepath = f"pandas_test_narrow.{fmt}"

        # Eksportuojame wide ir narrow
        pandas_obj.export_to_wide(wide_filepath, file_format=fmt)
        pandas_obj.export_to_narrow(narrow_filepath, file_format=fmt)

        # Importuojame wide ir narrow
        pandas_obj.import_from_wide(wide_filepath, file_format=fmt)
        pandas_obj.import_from_narrow(narrow_filepath, file_format=fmt)

    pandas_obj.close()

    # Testuojame PySpark variklį
    num_objects = 100
    print("Testuojame PySpark variklį tik su narrow lentelėmis")
    pyspark_obj = properties_of("pyspark_test_objects", engine="pyspark")

    # Įrašomos savybės
    for phonetic in nato_phonetic_alphabet:
        pyspark_obj.add_property_type(property_id=phonetic.lower(), description=f"Savybė {phonetic}")

    # Generuojami 1000 objektų su atsitiktinėmis savybėmis
    for i in range(1, num_objects + 1):
        object_id = f"obj_{i}"
        for _ in range(3):  # Trijų savybių priskyrimas
            property_id = random.choice(nato_phonetic_alphabet).lower()
            value = ''.join(random.choices(string.ascii_letters + string.digits, k=10))
            pyspark_obj.add_property(object_id, property_id, value)

    # Eksportuojami ir importuojami duomenys PySpark
    pyspark_formats = ["csv", "parquet"]
    for fmt in pyspark_formats:
        narrow_filepath = f"pyspark_test_narrow.{fmt}"

        # Eksportuojame narrow
        pyspark_obj.export_to_narrow(narrow_filepath, file_format=fmt)

        # Importuojame narrow
        pyspark_obj.import_from_narrow(narrow_filepath, file_format=fmt)

    pyspark_obj.close()

if __name__ == "__main__":
    main()


Testuojame Pandas variklį su wide ir narrow lentelėmis
Eksportuota 1000 eilučių į pandas_test_wide.csv. Trukmė: 0.02s
Eksportuota 2874 eilučių į pandas_test_narrow.csv. Trukmė: 0.00s
Importuota 26000 eilučių iš pandas_test_wide.csv. Trukmė: 0.03s
Importuota 2874 eilučių iš pandas_test_narrow.csv. Trukmė: 0.02s
Eksportuota 1000 eilučių į pandas_test_wide.parquet. Trukmė: 0.05s
Eksportuota 26000 eilučių į pandas_test_narrow.parquet. Trukmė: 0.01s
Importuota 26000 eilučių iš pandas_test_wide.parquet. Trukmė: 0.05s
Importuota 26000 eilučių iš pandas_test_narrow.parquet. Trukmė: 0.02s
Eksportuota 1000 eilučių į pandas_test_wide.feather. Trukmė: 0.03s
Eksportuota 26000 eilučių į pandas_test_narrow.feather. Trukmė: 0.01s
Importuota 26000 eilučių iš pandas_test_wide.feather. Trukmė: 0.04s
Importuota 26000 eilučių iš pandas_test_narrow.feather. Trukmė: 0.02s
Testuojame PySpark variklį tik su narrow lentelėmis


24/12/07 19:06:53 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.1.91 instead (on interface wlo1)
24/12/07 19:06:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/07 19:07:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/07 19:07:39 WARN DAGScheduler: Broadcasting large task binary with size 1575.3 KiB
24/12/07 19:09:03 WARN DAGScheduler: Broadcasting large task binary with size 1168.4 KiB
                                                                                

Eksportuota 300 eilučių į pyspark_test_narrow.csv. Trukmė: 144.32s


                                                                                

Importuota 300 eilučių iš pyspark_test_narrow.csv. Trukmė: 9.07s


24/12/07 19:10:06 WARN DAGScheduler: Broadcasting large task binary with size 1581.0 KiB
24/12/07 19:11:08 WARN DAGScheduler: Broadcasting large task binary with size 1174.6 KiB
                                                                                

Eksportuota 600 eilučių į pyspark_test_narrow.parquet. Trukmė: 111.78s


                                                                                

Importuota 600 eilučių iš pyspark_test_narrow.parquet. Trukmė: 1.97s
