In [1]:
import urllib
import urllib.parse
import urllib.request
from pathlib import Path

import duckdb
# from fastcore.net import urlsave
from loguru import logger

In [2]:
# Location of the Chinook sample database (SQLite file) in the chinook-database GitHub repository.
url = "https://github.com/lerocha/chinook-database/raw/master/ChinookDatabase/DataSources/Chinook_Sqlite.sqlite"
# Earlier manual download via fastcore's urlsave, kept commented out for reference:
# path = Path("chinook.sqlite")
# if not path.exists():
#     urlsave(url, path)

In [3]:
# path = Path("Chinook_Sqlite.sqlite")
# con = duckdb.connect()
# con.execute(f"ATTACH '{path}' AS sqlite_db (TYPE sqlite);")
# con.execute(f"USE sqlite_db")

# NOTE: the data source could also be DuckDB, CSV, Parquet, JSON, etc. (not just SQLite)

In [4]:
# con.sql("SELECT * FROM Artist")

In [5]:
class TableInfo:
    """Name of a single table together with the names of its columns."""

    def __init__(self, name, columns):
        """Remember the table ``name`` and its iterable of column-name strings."""
        self.name, self.columns = name, columns

    def __repr__(self):
        """Return a two-line summary: the table name, then its comma-separated columns."""
        column_list = ", ".join(self.columns)
        return f"Table: {self.name}\nColumns: {column_list}"

class Tables:
    """Catalogue of the tables reachable through a DuckDB connection.

    The column names of every table are looked up once, at construction time,
    and kept as ``TableInfo`` objects. A table can be reached as an attribute
    (``tables.Artist``) or summarised by indexing (``tables['Artist']`` or
    ``tables['Artist', 'Album']``).

    Table metadata lives in a private dict rather than in instance attributes,
    so a table that happens to be called ``con``, ``source_type`` or
    ``table_names`` can neither overwrite this object's own state nor be
    confused with it. Such a table is still available through indexing;
    attribute access keeps returning the real attribute.

    Args:
        con: Connection object exposing ``execute(sql, params).fetchall()``.
        tables: Iterable of table names to describe.
        source_type: ``'sqlite'`` selects ``PRAGMA table_info``; any other
            value queries ``system.information_schema.columns``.
    """

    def __init__(self, con, tables, source_type):
        self.con = con
        self.source_type = source_type
        self.table_names = tables
        self._table_info = {}
        for table in tables:
            self._table_info[table] = TableInfo(table, self._column_names(table))

    def _column_names(self, table):
        """Return the column names of ``table`` as a list of strings."""
        if self.source_type == 'sqlite':
            # The name is inlined in a SQL string literal, so embedded single
            # quotes are doubled to keep the statement well-formed.
            quoted = table.replace("'", "''")
            rows = self.con.execute(f"PRAGMA table_info('{quoted}');").fetchall()
            return [row[1] for row in rows]  # PRAGMA table_info returns column names in the second position
        # Bind the table name as a parameter and ask for the declared column
        # order explicitly instead of relying on the scan order.
        rows = self.con.execute(
            "SELECT column_name FROM system.information_schema.columns "
            "WHERE table_name = ? ORDER BY ordinal_position",
            [table],
        ).fetchall()
        return [row[0] for row in rows]

    def __getattr__(self, name):
        """Resolve ``tables.<TableName>``; only called when normal lookup fails."""
        # Read through __dict__ so a lookup on a half-initialised instance
        # cannot recurse back into __getattr__.
        catalogue = self.__dict__.get('_table_info', {})
        try:
            return catalogue[name]
        except KeyError:
            raise AttributeError(
                f"{type(self).__name__!r} object has no table or attribute {name!r}"
            ) from None

    def __dir__(self):
        """Include table names so interactive tab-completion still offers them."""
        catalogue = self.__dict__.get('_table_info', {})
        return sorted(set(super().__dir__()) | {name for name in catalogue if isinstance(name, str)})

    def __repr__(self):
        return ", ".join(self.table_names)

    def __getitem__(self, tables):
        """Return a text summary of one table name or of an iterable of names.

        Summaries are separated by a blank line. Unknown names produce a
        "does not exist" line instead of raising.
        """
        if isinstance(tables, str):
            tables = [tables]
        summaries = []
        for table in tables:
            info = self._table_info.get(table)
            if info is not None:
                summaries.append(repr(info))
            else:
                summaries.append(f"Table '{table}' does not exist.")
        return "\n\n".join(summaries)

class Database:
    """DuckDB-backed view over a local or remote data file.

    ``source`` is either a file path or an ``http(s)`` URL. The file suffix
    (matched case-insensitively) must be one of ``.sqlite``, ``.csv``,
    ``.duckdb``, ``.json`` or ``.parquet``. Remote files are downloaded into
    the current working directory on first use and reused afterwards.

    SQLite and DuckDB files are attached and made the default database. CSV,
    JSON and Parquet files are loaded into a table named ``attached_db``.

    Attributes:
        source: The path or URL given to the constructor.
        source_type: ``'remote'`` for URLs, otherwise the detected file type.
        con: In-memory DuckDB connection with the source attached or loaded.
        t: ``Tables`` catalogue describing every table and its columns.

    Raises:
        ValueError: If the file suffix is not supported.
        FileNotFoundError: If a local ``source`` does not exist.
    """

    # Lower-cased file suffix -> source type.
    _SUFFIX_TYPES = {
        '.sqlite': 'sqlite',
        '.csv': 'csv',
        '.duckdb': 'duckdb',
        '.json': 'json',
        '.parquet': 'parquet',
    }

    def __init__(self, source):
        self.source = source
        self.source_type = self._determine_source_type()
        self.con = self._connect()
        self._tables = self._get_tables()
        self.t = Tables(self.con, self._tables, self.source_type)

    @classmethod
    def _file_type(cls, path):
        """Map the suffix of ``path`` to a source type or raise ``ValueError``."""
        try:
            return cls._SUFFIX_TYPES[path.suffix.lower()]
        except KeyError:
            raise ValueError("Unsupported file type") from None

    def _determine_source_type(self):
        """Return ``'remote'`` for URLs, otherwise the type implied by the file suffix."""
        if self.source.startswith('http'):
            return 'remote'
        return self._file_type(Path(self.source))

    def _connect(self):
        """Open an in-memory DuckDB connection and attach or load the source file.

        Returns:
            The DuckDB connection.
        """
        if self.source_type == 'remote':
            # Name the local copy after the last segment of the URL *path* so a
            # query string or fragment never ends up in the file name.
            filename = Path(urllib.parse.urlparse(self.source).path).name
            path = Path(filename)
            # Reject unsupported files before spending time on a download.
            file_type = self._file_type(path)
            if not path.exists():
                print(f"Downloading {self.source}...")
                path.parent.mkdir(parents=True, exist_ok=True)  # Ensure the directory exists
                urllib.request.urlretrieve(self.source, path)
                print("Download complete.")
        else:
            path = Path(self.source)
            if not path.exists():
                raise FileNotFoundError(f"File {self.source} not found")
            file_type = self._file_type(path)

        con = duckdb.connect(database=':memory:')

        # The path is inlined in a SQL string literal; double any single quotes
        # so unusual file names cannot break the statement.
        quoted = str(path).replace("'", "''")

        if file_type == 'sqlite':
            con.execute(f"ATTACH '{quoted}' AS sqlite_db (TYPE sqlite);")
            con.execute("USE sqlite_db;")
        elif file_type == 'csv':
            con.execute(f"CREATE TABLE attached_db AS SELECT * FROM read_csv_auto('{quoted}');")
        elif file_type == 'duckdb':
            con.execute(f"ATTACH DATABASE '{quoted}' AS attached_db;")
            con.execute("USE attached_db;")
        elif file_type == 'json':
            con.execute(f"CREATE TABLE attached_db AS SELECT * FROM read_json_auto('{quoted}');")
        elif file_type == 'parquet':
            con.execute(f"CREATE TABLE attached_db AS SELECT * FROM read_parquet('{quoted}');")

        print(f"Connected to database: {path}")
        return con

    def _get_tables(self):
        """Return the table names reported by ``SHOW TABLES`` as a list of strings."""
        tables = self.con.execute("SHOW TABLES;").fetchall()
        print(f"Raw tables data: {tables}")
        tables = [row[0] for row in tables]
        print(f"Tables found: {tables}")
        return tables

    def tables(self):
        """Return the table names found at construction time as one comma-separated string."""
        return ", ".join(self._tables)

    def show_tables(self):
        """Run ``SHOW TABLES`` again and return the raw rows (a list of 1-tuples)."""
        return self.con.execute("SHOW TABLES;").fetchall()

In [6]:
# Build a Database from the remote URL; the file is downloaded into the working directory only if it is not already there.
db = Database(url)

Connected to database: Chinook_Sqlite.sqlite
Raw tables data: [('Album',), ('Artist',), ('Customer',), ('Employee',), ('Genre',), ('Invoice',), ('InvoiceLine',), ('MediaType',), ('Playlist',), ('PlaylistTrack',), ('Track',)]
Tables found: ['Album', 'Artist', 'Customer', 'Employee', 'Genre', 'Invoice', 'InvoiceLine', 'MediaType', 'Playlist', 'PlaylistTrack', 'Track']


In [7]:
type(db)

__main__.Database

In [8]:
db.show_tables()

[('Album',),
 ('Artist',),
 ('Customer',),
 ('Employee',),
 ('Genre',),
 ('Invoice',),
 ('InvoiceLine',),
 ('MediaType',),
 ('Playlist',),
 ('PlaylistTrack',),
 ('Track',)]

In [9]:
db.con.sql("SHOW TABLES;")

┌───────────────┐
│     name      │
│    varchar    │
├───────────────┤
│ Album         │
│ Artist        │
│ Customer      │
│ Employee      │
│ Genre         │
│ Invoice       │
│ InvoiceLine   │
│ MediaType     │
│ Playlist      │
│ PlaylistTrack │
│ Track         │
├───────────────┤
│    11 rows    │
└───────────────┘

In [10]:
db.tables()

'Album, Artist, Customer, Employee, Genre, Invoice, InvoiceLine, MediaType, Playlist, PlaylistTrack, Track'

In [11]:
db.t

Album, Artist, Customer, Employee, Genre, Invoice, InvoiceLine, MediaType, Playlist, PlaylistTrack, Track

In [12]:
db.t.Artist.name

'Artist'

In [13]:
db.t.table_names

['Album',
 'Artist',
 'Customer',
 'Employee',
 'Genre',
 'Invoice',
 'InvoiceLine',
 'MediaType',
 'Playlist',
 'PlaylistTrack',
 'Track']

In [14]:
# Short handle on the table catalogue; its repr is the comma-separated list of table names.
dt = db.t
dt

Album, Artist, Customer, Employee, Genre, Invoice, InvoiceLine, MediaType, Playlist, PlaylistTrack, Track

In [15]:
# Indexing with several table names returns ONE string of summaries separated by blank lines;
# wrap the expression in print() to render the newlines instead of the raw '\n' escapes.
dt['Artist','Album','Track','Genre','MediaType']

'Table: Artist\nColumns: ArtistId, Name\n\nTable: Album\nColumns: AlbumId, Title, ArtistId\n\nTable: Track\nColumns: TrackId, Name, AlbumId, MediaTypeId, GenreId, Composer, Milliseconds, Bytes, UnitPrice\n\nTable: Genre\nColumns: GenreId, Name\n\nTable: MediaType\nColumns: MediaTypeId, Name'

### Example: data held in Python dictionaries

In [16]:
# Sample records: a list of three dictionaries, one per person.
# NOTE: despite its name, data_dict is a list of dicts, not a single dict.
# Each record mixes scalar fields with a nested list (hobbies) and a nested dict (address).
data_dict = [
    {
        "name": "John Doe",
        "age": 30,
        "email": "john.doe@example.com",
        "is_employee": True,
        "hobbies": ["reading", "playing soccer", "traveling"],
        "address": {
            "street": "123 Main Street",
            "city": "New York",
            "state": "NY",
            "zip": "10001"
        }
    },
    {
        "name": "Jane Smith",
        "age": 25,
        "email": "jane.smith@example.com",
        "is_employee": False,
        "hobbies": ["painting", "cycling"],
        "address": {
            "street": "456 Elm Street",
            "city": "Los Angeles",
            "state": "CA",
            "zip": "90001"
        }
    },
    {
        "name": "Alice Johnson",
        "age": 35,
        "email": "alice.johnson@example.com",
        "is_employee": True,
        "hobbies": ["cooking", "hiking"],
        "address": {
            "street": "789 Oak Street",
            "city": "Chicago",
            "state": "IL",
            "zip": "60601"
        }
    }
]