# Projekt MAGN

### Autorzy:

- Szymon Jurecki
- Dominik Breksa

In [None]:
# Install the project located in the current directory, followed by the
# pandas, numpy and kaggle packages, into the environment of the running kernel.
%pip install .
%pip install pandas
%pip install numpy
%pip install kaggle

Pobranie oraz przygotowanie danych

In [None]:
from pathlib import Path
from typing import Final

# Filesystem locations shared by the download, extraction and database cells.
resources_dir: Final[Path] = Path('resources/data/')
zip_data_file: Final[Path] = resources_dir / 'pitchfork-data.zip'
database_path: Final[Path] = resources_dir / 'database.sqlite'

Pobranie danych z platformy Kaggle

[Link do danych z platformy Kaggle](https://www.kaggle.com/datasets/nolanbconaway/pitchfork-data)

In [None]:
from os.path import exists

# Download the dataset archive with the Kaggle command-line tool, but only when
# the archive file is not present yet. The `-p` target directory is written out
# literally here and has to stay in sync with `resources_dir`.
if not exists(zip_data_file):
    !kaggle datasets download -d nolanbconaway/pitchfork-data -p resources/data

Rozpakowanie danych do odpowiedniego katalogu

In [None]:
from zipfile import ZipFile

# Unpack the downloaded archive into the resources directory, unless the
# SQLite database file is already there.
if not exists(database_path):
    with ZipFile(zip_data_file) as archive:
        archive.extractall(path=resources_dir)

Dodanie kluczy głównych i obcych do bazy danych. Zbiór udostępniony na platformie Kaggle nie definiuje tych kluczy, a są one ważne podczas tworzenia MAGN.

In [None]:
from typing import List, Tuple
import sqlite3

def get_table_columns_types(db_cursor: sqlite3.Cursor, table_name: str) -> List[Tuple[str, str]]:
    """Return ``(column name, declared type)`` pairs for every column of a table.

    The information comes from ``PRAGMA table_info``; the pairs are returned in
    the order in which the pragma reports the columns.

    Args:
        db_cursor: Cursor of the database that contains the table.
        table_name: Name of the table to inspect. It is interpolated into the
            statement as-is, so it must be a trusted identifier.

    Returns:
        One ``(name, type)`` tuple per reported column.
    """

    table_info_rows = db_cursor.execute(f"""
            PRAGMA
                table_info({table_name});
        """).fetchall()

    # In every reported row, index 1 is the column name and index 2 its declared type.
    return [(row[1], row[2]) for row in table_info_rows]

def delete_duplicates(db_cursor: sqlite3.Cursor, table_name: str, column_name: str) -> None:
    """Delete surplus rows so that every non-NULL value of a column occurs once.

    For each value that appears more than once in ``column_name`` the row with
    the lowest ``rowid`` is kept and all other rows carrying that value are
    deleted. Previously every row of such a group was deleted, which removed
    the record altogether instead of just its duplicates. Rows whose value is
    NULL are never deleted.

    Args:
        db_cursor: Cursor of the database that contains the table.
        table_name: Name of the table to clean up. Interpolated as-is, so it
            must be a trusted identifier. The statement relies on ``rowid``,
            i.e. the table must not be declared ``WITHOUT ROWID``.
        column_name: Name of the column whose values have to become unique.
            Interpolated as-is as well.
    """

    db_cursor.execute(f"""
        DELETE FROM
            {table_name}
        WHERE
            {column_name} IS NOT NULL
            AND rowid NOT IN (
                SELECT
                    MIN(rowid)
                FROM
                    {table_name}
                GROUP BY
                    {column_name}
            );
    """)

def create_table_with_primary_key(db_cursor: sqlite3.Cursor, table_name: str, temporary_table_name: str, column_name: str) -> None:
    """Create an empty table that mirrors another table and declares a primary key.

    The new table gets the column names and declared types reported for
    ``table_name``; the column called ``column_name`` is additionally marked
    ``PRIMARY KEY``. No rows are copied here.

    Args:
        db_cursor: Cursor of the database to modify.
        table_name: Existing table whose columns are mirrored.
        temporary_table_name: Name of the table to create.
        column_name: Column that becomes the primary key of the new table.
    """

    column_definitions = []
    for column, data_type in get_table_columns_types(db_cursor, table_name):
        definition = f'{column} {data_type}'
        if column == column_name:
            definition += ' PRIMARY KEY'
        column_definitions.append(definition)
    joined_definitions = ', '.join(column_definitions)

    db_cursor.execute(f"""
        CREATE TABLE {temporary_table_name} (
            {joined_definitions}
        );
    """)
    
def move_data_and_switch_tables(db_cursor: sqlite3.Cursor, table_name: str, temporary_table_name: str) -> None:
    """Copy all rows into the temporary table and let it replace the original.

    Three statements run in order: every row of ``table_name`` is inserted into
    ``temporary_table_name``, ``table_name`` is dropped, and the temporary
    table is renamed to ``table_name``.

    The statements are issued one by one through ``execute``. ``executescript``
    is deliberately avoided because it first commits any pending transaction,
    which would end a transaction opened by the caller and make a later
    ``rollback()`` unable to undo the switch. As with any ``execute``-based
    write, the caller is responsible for committing.

    Args:
        db_cursor: Cursor of the database to modify.
        table_name: Table to be replaced. Interpolated as-is, so it must be a
            trusted identifier.
        temporary_table_name: Already created table that takes over the name
            ``table_name``. Its columns must be compatible with ``SELECT *``
            from the original table. Interpolated as-is as well.
    """

    # Move the data.
    db_cursor.execute(f"""
        INSERT INTO
            {temporary_table_name}
        SELECT
            *
        FROM
            {table_name};
    """)

    # Drop the old table.
    db_cursor.execute(f"""
        DROP TABLE
            {table_name};
    """)

    # Rename the new table so it takes the place of the old one.
    db_cursor.execute(f"""
        ALTER TABLE
            {temporary_table_name}
        RENAME TO
            {table_name};
    """)

def add_primary_key(db_cursor: sqlite3.Cursor, table_name: str, column_name: str) -> None:
    """Rebuild a table so that one of its columns is declared as primary key.

    The work is delegated to three helpers, in this order: ``delete_duplicates``
    cleans the column, ``create_table_with_primary_key`` creates a staging
    table named ``<table_name>_``, and ``move_data_and_switch_tables`` copies
    the rows over and swaps the staging table in for the original.

    Args:
        db_cursor: Cursor of the database to modify.
        table_name: Table that receives the primary key.
        column_name: Column that becomes the primary key.
    """
    staging_table_name = table_name + '_'

    delete_duplicates(db_cursor, table_name, column_name)
    create_table_with_primary_key(
        db_cursor,
        table_name,
        staging_table_name,
        column_name,
    )
    move_data_and_switch_tables(db_cursor, table_name, staging_table_name)

def create_table_with_foreign_key(db_cursor: sqlite3.Cursor, table_name: str, column_name: str, foreign_table_name: str, foreign_column_name: str, temporary_foreign_table_name: str) -> None:
    """Create an empty table that mirrors a table and adds a foreign key constraint.

    The new table gets the column names and declared types reported for
    ``foreign_table_name`` plus a constraint named
    ``fk_<foreign_table_name>_<foreign_column_name>`` that makes
    ``foreign_column_name`` reference ``table_name (column_name)``. No rows are
    copied here.

    Args:
        db_cursor: Cursor of the database to modify.
        table_name: Referenced table.
        column_name: Referenced column of ``table_name``.
        foreign_table_name: Existing referencing table whose columns are mirrored.
        foreign_column_name: Column of the new table that holds the reference.
        temporary_foreign_table_name: Name of the table to create.
    """

    column_definitions = ', '.join(
        f'{column} {data_type}'
        for column, data_type in get_table_columns_types(db_cursor, foreign_table_name)
    )
    constraint_name = f'fk_{foreign_table_name}_{foreign_column_name}'

    db_cursor.execute(f"""
        CREATE TABLE {temporary_foreign_table_name} (
            {column_definitions},
            CONSTRAINT {constraint_name}
                FOREIGN KEY
                    ({foreign_column_name})
                REFERENCES
                    {table_name} ({column_name})
        );
    """)

def add_foreign_key(db_cursor: sqlite3.Cursor, table_name: str, column_name: str, foreign_table_name: str, foreign_column_name: str) -> None:
    """Rebuild a table so that one of its columns references another table.

    A staging table named ``<foreign_table_name>_`` is created with the foreign
    key constraint, the rows of ``foreign_table_name`` are copied into it, and
    it then replaces ``foreign_table_name``. Foreign key enforcement is
    switched off for the duration of the rebuild and switched on afterwards,
    also when the rebuild raises.

    Args:
        db_cursor: Cursor of the database to modify.
        table_name: Referenced table.
        column_name: Referenced column of ``table_name``.
        foreign_table_name: Referencing table that is rebuilt.
        foreign_column_name: Column of ``foreign_table_name`` that holds the reference.
    """

    temporary_foreign_table_name = f'{foreign_table_name}_'

    disable_enforcement = """
        PRAGMA
            foreign_keys = OFF;
    """
    enable_enforcement = """
        PRAGMA
            foreign_keys = ON;
    """

    # NOTE(review): SQLite documents `PRAGMA foreign_keys` as a no-op while a
    # transaction is open - confirm the transaction state expected by callers.
    db_cursor.execute(disable_enforcement)
    try:
        create_table_with_foreign_key(db_cursor, table_name, column_name, foreign_table_name, foreign_column_name, temporary_foreign_table_name)
        move_data_and_switch_tables(db_cursor, foreign_table_name, temporary_foreign_table_name)
    finally:
        # Do not leave enforcement disabled on the connection if the rebuild failed.
        db_cursor.execute(enable_enforcement)

In [None]:
from contextlib import closing

# Add the primary key to `reviews` and make `reviewid` in every other table
# reference it. The connection's own context manager only commits or rolls
# back; `closing` makes sure the connection is also closed when the cell is done.
with closing(sqlite3.connect(database_path)) as conn:
    cursor = conn.cursor()

    cursor.execute("""BEGIN;""")
    try:
        add_primary_key(cursor, 'reviews', 'reviewid')

        for child_table in ('artists', 'content', 'genres', 'labels', 'years'):
            add_foreign_key(cursor, 'reviews', 'reviewid', child_table, 'reviewid')
    except Exception:
        conn.rollback()
        # Bare `raise` re-raises the active exception with its original traceback.
        raise
    else:
        conn.commit()

![ERD Diagram for the database schema](docs/images/database_erd.png)

In [None]:
from magn.database.sqlite3 import get_table_names

# Store what get_table_names returns for the database file (presumably the
# names of its tables - confirm in magn.database.sqlite3); the bare name at the
# end of the cell makes the notebook display the value.
all_tables = get_table_names(database_path)

all_tables

In [None]:
from magn.database.sqlite3 import SQLite3KeysReader

# Build a keys reader for the database and the collected tables, then display
# the result of read() (presumably the key definitions - confirm in
# magn.database.sqlite3).
keys_reader = SQLite3KeysReader(database_path, all_tables)
keys_reader.read()

In [None]:
from magn.database.sqlite3 import SQLite3DataReader

# Build a data reader from the database path, the collected tables and the
# output of the keys reader, then display the result of read().
data_reader = SQLite3DataReader(database_path, all_tables, keys_reader.read())
data_reader.read()

In [None]:
# Keep the reader output for the following cells; it is indexed by table name
# below (presumably one DataFrame per table - TODO confirm).
dfs = data_reader.read()

dfs['artists']

In [None]:
# Display the entry stored under the 'reviews' key.
dfs['reviews']

In [None]:
from magn.database.database import Database

# Build the project's Database object straight from the SQLite file.
dataframe_database = Database.from_sqlite3(database_path)

In [None]:
from magn.database.topological_sort import TopologicalSorter

# Small stand-alone input for TopologicalSorter: every key is a node label
# mapped to a list of other node labels.
# NOTE(review): whether a list holds the node's dependencies or its successors
# is defined by TopologicalSorter - confirm in magn.database.topological_sort.
sort_data = {
    '0': ['1'],
    '1': ['2'],
    '2': [],
    '3': ['1', '2']
}

topological_sort = TopologicalSorter()
# Print every item produced by sort(), one per line.
for x in topological_sort.sort(sort_data):
    print(x)

In [None]:
# Collect everything produced by Database.sort() into a list so the notebook
# displays it.
list(dataframe_database.sort())

In [None]:
# Display the notebook representation of the Database object.
dataframe_database

In [None]:
# Display what the Database object returns for the 'artists' key.
dataframe_database['artists']