<a href="https://colab.research.google.com/github/Airmareing/data_engineering_practice_4/blob/main/data_engineering_practice_4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Task 1

In [19]:
import csv
import sqlite3
import json

file_path = "1/task_1_var_42_item.csv"
database_name = "1/database.db"
VAR = 10

class SQLiteConnection:
    def __init__(self, database_name):
        self.__database_name = database_name

    def __enter__(self):
        self.conn = sqlite3.connect(self.__database_name)
        self.conn.row_factory = sqlite3.Row
        self.cursor = self.conn.cursor()
        return self.cursor

    def __exit__(self, exc_type, exc_value, traceback):
        self.conn.commit()
        self.conn.close()

def create_table(cursor):
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS "Books" (
            "id" INTEGER PRIMARY KEY AUTOINCREMENT,
            "title" TEXT,
            "author" TEXT,
            "genre" TEXT,
            "pages" INTEGER,
            "published_year" INTEGER,
            "isbn" TEXT,
            "rating" REAL,
            "views" INTEGER
        );
        """
    )

def populate_db(cursor, file_path):
    with open(file_path, "r", encoding="utf-8") as csv_file:
        csv_reader = csv.DictReader(csv_file, delimiter=";")

        rows_to_insert = [
            {
                "title": row["title"],
                "author": row["author"],
                "genre": row["genre"],
                "pages": int(row["pages"]),
                "published_year": int(row["published_year"]),
                "isbn": row["isbn"],
                "rating": float(row["rating"]),
                "views": int(row["views"]),
            }
            for row in csv_reader
        ]

        cursor.executemany(
            """
            INSERT INTO Books (title, author, genre, pages, published_year, isbn, rating, views)
            VALUES  (:title, :author, :genre, :pages, :published_year, :isbn, :rating, :views)
        """,
            rows_to_insert,
        )

def query_and_export_to_json(cursor, query, params, output_file):
    cursor.execute(query, params)
    result = [dict(row) for row in cursor.fetchall()]

    with open(output_file, "w") as json_file:
        json_file.write(json.dumps(result, indent=2, ensure_ascii=False))

with SQLiteConnection(database_name) as cursor:
    create_table(cursor)
    populate_db(cursor, file_path)

    # Вывод первых (VAR+10) отсортированных по произвольному числовому полю строк в файл JSON
    query_and_export_to_json(
        cursor,
        """
        SELECT *
        FROM Books
        ORDER BY pages
        LIMIT ?
        """,
        (VAR + 10,),
        "1/10_int_sort.json"
    )

    # Вывод (сумма, мин, макс, среднее) по произвольному числовому полю
    query_and_export_to_json(
        cursor,
        """
        SELECT
            SUM(views) AS total_views,
            MIN(views) AS min_views,
            MAX(views) AS max_views,
            AVG(views) AS avg_views
        FROM Books
        """,
        (),
        "1/stats.json"
    )

    # Вывод частоты встречаемости для категориального поля
    query_and_export_to_json(
        cursor,
        """
        SELECT genre, COUNT(*) AS frequency
        FROM Books
        GROUP BY genre
        """,
        (),
        "1/freq.json"
    )

    # Вывод первых (VAR+10) отфильтрованных по произвольному предикату отсортированных по произвольному числовому полю строк
    query_and_export_to_json(
        cursor,
        """
        SELECT *
        FROM Books
        WHERE genre = 'роман'
        ORDER BY pages
        LIMIT ?
        """,
        (VAR + 10,),
        "1/10_sort.json"
    )


Task 2

In [26]:
import csv
import sqlite3
import json

file_path_books = "1/task_1_var_42_item.csv"
file_path_new_data = "2/task_2_var_42_subitem.csv"
database_name = "2/database.db"

class SQLiteConnection:
    def __init__(self, database_name):
        self.__database_name = database_name

    def __enter__(self):
        self.conn = sqlite3.connect(self.__database_name)
        self.conn.row_factory = sqlite3.Row
        self.cursor = self.conn.cursor()
        return self.cursor

    def __exit__(self, exc_type, exc_value, traceback):
        self.conn.commit()
        self.conn.close()

def create_books_table(cursor):
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS "Books" (
            "id" INTEGER PRIMARY KEY AUTOINCREMENT,
            "title" TEXT,
            "author" TEXT,
            "genre" TEXT,
            "pages" INTEGER,
            "published_year" INTEGER,
            "isbn" TEXT,
            "rating" REAL,
            "views" INTEGER
        );
        """
    )

def create_new_data_table(cursor):
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS "NewData" (
            "id" INTEGER PRIMARY KEY AUTOINCREMENT,
            "title" TEXT,
            "price" INTEGER,
            "place" TEXT,
            "date" TEXT,
            FOREIGN KEY("title") REFERENCES "Books"("title")
        );
        """
    )

def populate_books_table(cursor, file_path_books):
    with open(file_path_books, "r", encoding="utf-8") as csv_file:
        csv_reader = csv.DictReader(csv_file, delimiter=";")

        rows_to_insert = [
            {
                "title": row["title"],
                "author": row["author"],
                "genre": row["genre"],
                "pages": int(row["pages"]),
                "published_year": int(row["published_year"]),
                "isbn": row["isbn"],
                "rating": float(row["rating"]),
                "views": int(row["views"]),
            }
            for row in csv_reader
        ]

        cursor.executemany(
            """
            INSERT INTO Books (title, author, genre, pages, published_year, isbn, rating, views)
            VALUES  (:title, :author, :genre, :pages, :published_year, :isbn, :rating, :views)
            """,
            rows_to_insert,
        )

def populate_new_data_table(cursor, file_path_new_data):
    with open(file_path_new_data, "r", encoding="utf-8") as csv_file:
        csv_reader = csv.DictReader(csv_file, delimiter=";")

        rows_to_insert = [
            {
                "title": row["title"],
                "price": int(row["price"]),
                "place": row["place"],
                "date": row["date"],
            }
            for row in csv_reader
        ]

        cursor.executemany(
            """
            INSERT INTO NewData (title, price, place, date)
            VALUES  (:title, :price, :place, :date)
            """,
            rows_to_insert,
        )

def query_average_price_by_genre(cursor):
    cursor.execute(
        """
        SELECT Books.genre, AVG(NewData.price) AS avg_price
        FROM Books
        JOIN NewData ON Books.title = NewData.title
        GROUP BY Books.genre
        """
    )
    result = [dict(row) for row in cursor.fetchall()]
    print(result)

def query_total_price_by_author(cursor):
    cursor.execute(
        """
        SELECT Books.author, SUM(NewData.price) AS total_price
        FROM Books
        JOIN NewData ON Books.title = NewData.title
        GROUP BY Books.author
        """
    )
    result = [dict(row) for row in cursor.fetchall()]
    print(result)

def query_books_above_average_price(cursor):
    cursor.execute(
        """
        SELECT Books.title, Books.author, NewData.price
        FROM Books
        JOIN NewData ON Books.title = NewData.title
        WHERE NewData.price > (
            SELECT AVG(price)
            FROM NewData
        )
        LIMIT 10;
        """
    )
    result = [dict(row) for row in cursor.fetchall()]
    print(result)

with SQLiteConnection(database_name) as cursor:
    create_books_table(cursor)
    populate_books_table(cursor, file_path_books)

    create_new_data_table(cursor)
    populate_new_data_table(cursor, file_path_new_data)
    print("1. Средняя цена книг каждого жанра")
    query_average_price_by_genre(cursor)
    print("2. Суммарная стоимость книг для каждого автора")
    query_total_price_by_author(cursor)
    print("3.  Книги, цена которых превышает среднюю цену всех книг")
    query_books_above_average_price(cursor)


1. Средняя цена книг каждого жанра
[{'genre': 'биография', 'avg_price': 2866.7659574468084}, {'genre': 'детектив', 'avg_price': 2825.088992974239}, {'genre': 'детская литература', 'avg_price': 2801.1659836065573}, {'genre': 'исторический роман', 'avg_price': 2702.0668058455117}, {'genre': 'любовный роман', 'avg_price': 2844.1072727272726}, {'genre': 'научная фантастика', 'avg_price': 2646.8210227272725}, {'genre': 'приключения', 'avg_price': 2806.7624466571833}, {'genre': 'роман', 'avg_price': 2785.042750929368}, {'genre': 'триллер', 'avg_price': 2816.9365798414497}, {'genre': 'фэнтези', 'avg_price': 2739.659021406728}]
2. Суммарная стоимость книг для каждого автора
[{'author': 'Алан Александр Милн', 'total_price': 14793576}, {'author': 'Александр Пушкин', 'total_price': 9823296}, {'author': 'Алексей Толстой', 'total_price': 59930598}, {'author': 'Антон Чехов', 'total_price': 13067502}, {'author': 'Артур Конан Дойл', 'total_price': 14963298}, {'author': 'Борис Пастернак', 'total_price'

Task 3

In [36]:
import msgpack
import pickle
import sqlite3
import pandas as pd

database_name = "3/database.db"
msgpack_file_path = "3/task_3_var_42_part_1.msgpack"
pkl_file_path = "3/task_3_var_42_part_2.pkl"
VAR = 10

class SQLiteConnection:
    def __init__(self, database_name):
        self.__database_name = database_name

    def __enter__(self):
        self.conn = sqlite3.connect(self.__database_name)
        self.conn.row_factory = sqlite3.Row
        self.cursor = self.conn.cursor()
        return self.cursor

    def __exit__(self, exc_type, exc_value, traceback):
        self.conn.commit()
        self.conn.close()

def create_music_table(cursor):
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS "Music" (
            "id" INTEGER PRIMARY KEY AUTOINCREMENT,
            "artist" TEXT,
            "song" TEXT,
            "duration_ms" INTEGER,
            "year" INTEGER,
            "tempo" REAL,
            "genre" TEXT,
            "acousticness" REAL,
            "mode" INTEGER, -- Только в формате Msgpack
            "speechiness" REAL, -- Только в формате Msgpack
            "instrumentalness" REAL -- Только в формате Msgpack
        );
        """
    )

def insert_data_from_msgpack(cursor, msgpack_file_path):
    with open(msgpack_file_path, "rb") as file:
        data = msgpack.unpack(file, raw=False)
        df = pd.DataFrame(data)
        df = df.drop(columns=["energy", "popularity"], errors="ignore")
        df.to_sql("Music", con=cursor.connection, if_exists="append", index=False)

def insert_data_from_pkl(cursor, pkl_file_path):
    with open(pkl_file_path, "rb") as file:
        data = pickle.load(file)
        df = pd.DataFrame(data)
        df = df.drop(columns=["energy", "popularity"], errors="ignore")
        df.to_sql("Music", con=cursor.connection, if_exists="append", index=False)


def query_and_export_to_json(cursor, query, params, output_file):
    cursor.execute(query, params)
    result = [dict(row) for row in cursor.fetchall()]

    with open(output_file, "w") as json_file:
        json_file.write(pd.Series(result).to_json(orient="records", indent=2))

# Основной блок кода
with SQLiteConnection(database_name) as cursor:
    create_music_table(cursor)

    insert_data_from_msgpack(cursor, msgpack_file_path)
    insert_data_from_pkl(cursor, pkl_file_path)

    query_and_export_to_json(
        cursor,
        """
        SELECT *
        FROM Music
        ORDER BY duration_ms
        LIMIT ?
        """,
        (VAR + 10,),
        "3/10_int_sort.json"
    )

    query_and_export_to_json(
        cursor,
        """
        SELECT
            SUM(duration_ms) AS total_duration,
            MIN(duration_ms) AS min_duration,
            MAX(duration_ms) AS max_duration,
            AVG(duration_ms) AS avg_duration
        FROM Music
        """,
        (),
        "3/stats.json"
    )

    query_and_export_to_json(
        cursor,
        """
        SELECT genre, COUNT(*) AS frequency
        FROM Music
        GROUP BY genre
        """,
        (),
        "3/freq.json"
    )

    query_and_export_to_json(
        cursor,
        """
        SELECT *
        FROM Music
        WHERE year > 2000
        ORDER BY tempo
        LIMIT ?
        """,
        (VAR + 15,),
        "3/15_sort.json"
    )
