# Writing Core SQL

![Core Layer](./images/core_layer.png)

Before we can write some SQL, we need to have some data and we need to define some tables. 

We have some data from [Open Data DK](https://www.opendata.dk/city-of-copenhagen/parkeringstaelling-i-zoner) counting parking spots available and how many were used at given points in time. 

First we need to define the tables we will use - remember these aren't connected to a database, they're just definitions

In [1]:
import sqlalchemy as sa

meta = sa.MetaData()

fkt_parking = sa.Table("fkt_parking", 
                       meta, 
                       sa.Column("id", sa.Integer, primary_key=True),
                       sa.Column("area_id", sa.Integer, sa.ForeignKey("dim_area.area_id")), 
                       sa.Column("year_month", sa.VARCHAR(20)),
                       sa.Column("count_type", sa.Integer, sa.ForeignKey("dim_parking_types.type_id")),
                       sa.Column("count", sa.Integer),
                       sa.Column("hour", sa.Integer)
                      )

dim_area = sa.Table("dim_area", 
                    meta,
                    sa.Column("area_id", sa.Integer, primary_key=True),
                    sa.Column("city", sa.VARCHAR(50)),
                    sa.Column("street_name", sa.VARCHAR(200)),
                    sa.Column("postnr", sa.VARCHAR(4)),
                    sa.Column("nr", sa.Integer)
                   )
                       
dim_parking_types = sa.Table("dim_parking_types",
                             meta,
                             sa.Column("type_id", sa.Integer, primary_key=True),
                             sa.Column("name", sa.VARCHAR(50), unique=True)
                            )

Given that we're starting a new database, we can use SQLAlchemy to create the tables for us. 

The `MetaData` catalogue let's SQLAlchemy know what dependencies each table has, so it can create and drop constraints in the correct order

In [2]:
# If you have docker installed - uncomment these lines
# !docker run -e POSTGRES_PASSWORD=postgres -p 5432:5432 -d postgres
conn_string = "postgresql://postgres:postgres@localhost:5432"
# Otherwise, use the sqlite conn_string
# conn_string = "sqlite:///parking.db"

In [3]:
meta.tables

FacadeDict({'fkt_parking': Table('fkt_parking', MetaData(), Column('id', Integer(), table=<fkt_parking>, primary_key=True, nullable=False), Column('area_id', Integer(), ForeignKey('dim_area.area_id'), table=<fkt_parking>), Column('year_month', VARCHAR(length=20), table=<fkt_parking>), Column('count_type', Integer(), ForeignKey('dim_parking_types.type_id'), table=<fkt_parking>), Column('count', Integer(), table=<fkt_parking>), Column('hour', Integer(), table=<fkt_parking>), schema=None), 'dim_area': Table('dim_area', MetaData(), Column('area_id', Integer(), table=<dim_area>, primary_key=True, nullable=False), Column('city', VARCHAR(length=50), table=<dim_area>), Column('street_name', VARCHAR(length=200), table=<dim_area>), Column('postnr', VARCHAR(length=4), table=<dim_area>), Column('nr', Integer(), table=<dim_area>), schema=None), 'dim_parking_types': Table('dim_parking_types', MetaData(), Column('type_id', Integer(), table=<dim_parking_types>, primary_key=True, nullable=False), Colum

In [4]:
engine = sa.create_engine(conn_string, future=True)

In [5]:
meta.create_all(engine)

# Inserting data

To write the SQL, we use SQLAlchemy's constructs to map to our familiar SQL concepts

In [6]:
insert_sql = sa.insert(dim_parking_types).values(type_id=0, name="legal")
print(insert_sql)

INSERT INTO dim_parking_types (type_id, name) VALUES (:type_id, :name)


The table class also has some helper methods

In [7]:
insert_sql = dim_parking_types.insert().values(type_id=0, name="legal")
print(insert_sql)

INSERT INTO dim_parking_types (type_id, name) VALUES (:type_id, :name)


The SQL has been generated, now we need to send it to the database by connecting - our first "work-being-done". Always make sure your database connection is closed properly - the easiest way in python is using the `with` block:

In [8]:
with engine.connect() as conn:
    conn.execute(insert_sql)

If we go check the database - nothing has happened!

When altering data, the changes will be rolled back unless we actively commit

In [9]:
with engine.connect() as conn:
    conn.execute(insert_sql)
    conn.commit()

Let's delete the row with `delete()`

In [10]:
delete_sql = dim_parking_types.delete().where(dim_parking_types.c.type_id == 0)
print(delete_sql)

DELETE FROM dim_parking_types WHERE dim_parking_types.type_id = :type_id_1


In [11]:
with engine.connect() as conn:
    conn.execute(delete_sql)
    conn.commit()

By default, SQLAlchemy requires you to **commit as you go** - every `execute` will be sent to the database, but it will only be persisted by calling `conn.commit()`. Calling `conn.rollback()` will roll back any non-committed changes.

In [12]:
with engine.connect() as conn:
    conn.execute(insert_sql)
    conn.commit()
    conn.execute(delete_sql)
    conn.rollback()

## Unit of Work

![deepalchemy](images/deepalchemy.png)

SQLAlchemy uses a **Unit of Work** pattern. 

SQLAlchemy expects us to prepare all of our changes, and then send it all to the database at the same time. This allows SQLAlchemy to optimize the resulting communication with the database, as it knows what work needs to be done. The active commit is part of this pattern, allowing greater control of when the changes are flushed to the database.

**Unit of Work** means that SQLAlchemy is usually working in a transaction. We can also explicitly manage our own transactions

In [13]:
with engine.connect() as conn:
    conn.execute(delete_sql)
    conn.commit()
    transaction = conn.begin()
    conn.execute(insert_sql)
    transaction.rollback()

Usually we don't need this level of control, so we can use the **Begin Once** style of commit

In [14]:
with engine.begin() as conn:
    conn.execute(insert_sql)
    conn.execute(delete_sql)

The transaction is automatically committed at the end of the `with` block

In [15]:
with engine.begin() as conn:
    conn.execute(insert_sql)
    raise Exception("Something bad happened")

Exception: Something bad happened

If there's an exception, the transaction is automatically rolled back

# Inserting many rows

Let's read our csv files and insert them into the database using SQLAlchemy

In [16]:
import csv
import pathlib

First we pair up the file paths and the corresponding table

In [17]:
data = [(pathlib.Path("data/dim_parking_types.csv"), dim_parking_types),
        (pathlib.Path("data/dim_area.csv"), dim_area),
        (pathlib.Path("data/fkt_parking.csv"), fkt_parking)]

In [18]:
# Run everything inside one transaction - Unit of Work
with engine.begin() as conn:
    for data_file, table in data:
        # Read the CSV file as a list of dictionaries
        with data_file.open(encoding="utf-8") as f:
            rows = list(csv.DictReader(f))
        # Create the Insert statement
        sql = table.insert()
        
        conn.execute(sql, parameters=rows)

By passing a list of dictionaries, SQLAlchemy knows to use the `.executemany()` method of the underlying DB-API library, which optimizes for inserting multiple rows

# Writing some logic

Now that we have some data in tables to work with, let's try to write some business logic.

## Task 1: Get only counts for Gilleleje

In [19]:
sql = sa.select([fkt_parking.c.count, fkt_parking.c.hour, dim_area]).join(dim_area).where(dim_area.c.city == "Gilleleje")
print(sql)

SELECT fkt_parking.count, fkt_parking.hour, dim_area.area_id, dim_area.city, dim_area.street_name, dim_area.postnr, dim_area.nr 
FROM fkt_parking JOIN dim_area ON dim_area.area_id = fkt_parking.area_id 
WHERE dim_area.city = :city_1


Because of the foreign key relationship, SQLAlchemy automatically infers the join condition

In [20]:
with engine.connect() as conn:
    results = conn.execute(sql).all()

In [21]:
[dict(row) for row in results[:5]]

[{'count': 1004,
  'hour': 12,
  'area_id': 12,
  'city': 'Gilleleje',
  'street_name': 'Australiens',
  'postnr': '5549',
  'nr': 5},
 {'count': 1018,
  'hour': 12,
  'area_id': 12,
  'city': 'Gilleleje',
  'street_name': 'Australiens',
  'postnr': '5549',
  'nr': 5},
 {'count': 1019,
  'hour': 12,
  'area_id': 12,
  'city': 'Gilleleje',
  'street_name': 'Australiens',
  'postnr': '5549',
  'nr': 5},
 {'count': 1028,
  'hour': 12,
  'area_id': 12,
  'city': 'Gilleleje',
  'street_name': 'Australiens',
  'postnr': '5549',
  'nr': 5},
 {'count': 1032,
  'hour': 12,
  'area_id': 12,
  'city': 'Gilleleje',
  'street_name': 'Australiens',
  'postnr': '5549',
  'nr': 5}]


We can also throw it into a Dataframe directly to get a nice representation

In [22]:
import pandas as pd

pd.DataFrame(results)

Unnamed: 0,count,hour,area_id,city,street_name,postnr,nr
0,1004,12,12,Gilleleje,Australiens,5549,5
1,1018,12,12,Gilleleje,Australiens,5549,5
2,1019,12,12,Gilleleje,Australiens,5549,5
3,1028,12,12,Gilleleje,Australiens,5549,5
4,1032,12,12,Gilleleje,Australiens,5549,5
...,...,...,...,...,...,...,...
109,769,22,12,Gilleleje,Australiens,5549,5
110,766,22,12,Gilleleje,Australiens,5549,5
111,737,22,12,Gilleleje,Australiens,5549,5
112,743,22,12,Gilleleje,Australiens,5549,5


(In fact, Pandas supports SQLAlchemy queries directly)

In [23]:
with engine.connect() as conn:
    df = pd.read_sql(sql, conn)
df

Unnamed: 0,count,hour,area_id,city,street_name,postnr,nr
0,1004,12,12,Gilleleje,Australiens,5549,5
1,1018,12,12,Gilleleje,Australiens,5549,5
2,1019,12,12,Gilleleje,Australiens,5549,5
3,1028,12,12,Gilleleje,Australiens,5549,5
4,1032,12,12,Gilleleje,Australiens,5549,5
...,...,...,...,...,...,...,...
109,769,22,12,Gilleleje,Australiens,5549,5
110,766,22,12,Gilleleje,Australiens,5549,5
111,737,22,12,Gilleleje,Australiens,5549,5
112,743,22,12,Gilleleje,Australiens,5549,5


## Task 2: Get sum of counts per hour per city

Any SQL function is available under the `func` namespace - these will be dynamically generated based on the backend. In this case, we want at sum

In [24]:
sql = (sa.select([sa.func.sum(fkt_parking.c.count).label("total_count"), 
                  dim_area.c.city, 
                  fkt_parking.c.hour])
       .join(dim_area)
       .group_by(dim_area.c.city, fkt_parking.c.hour))
print(sql)

SELECT sum(fkt_parking.count) AS total_count, dim_area.city, fkt_parking.hour 
FROM fkt_parking JOIN dim_area ON dim_area.area_id = fkt_parking.area_id GROUP BY dim_area.city, fkt_parking.hour


In [25]:
with engine.connect() as conn:
    result = conn.execute(sql.limit(10))
    df = pd.DataFrame(result)
df

Unnamed: 0,total_count,city,hour
0,29225,Sunds,17
1,29857,Værløse,12
2,39881,Bøvlingbjerg,17
3,2104,Hundested,22
4,20620,Maribo,12
5,2433,Ryomgård,17
6,51211,Høng,17
7,35831,Ansager,17
8,4601,Svinninge,12
9,37421,Tarm,22


The SQL is getting a bit longer and we have some repetition - good thing this is Python and not SQL! We can create variables and functions to manipulate the SQL

In [26]:
table = fkt_parking.join(dim_area)
total_count = sa.func.sum(fkt_parking.c.count).label("total_count")
groupby_cols = [dim_area.c.city, fkt_parking.c.hour]

In [27]:
sql = sa.select([total_count, *groupby_cols]).select_from(table).group_by(*groupby_cols)

We can dynamically update the SQL on-the-fly given our requirements

In [28]:
# Add an additional column to the groupby
print(sql.group_by(fkt_parking.c.count_type))

SELECT sum(fkt_parking.count) AS total_count, dim_area.city, fkt_parking.hour 
FROM fkt_parking JOIN dim_area ON dim_area.area_id = fkt_parking.area_id GROUP BY dim_area.city, fkt_parking.hour, fkt_parking.count_type


In [29]:
# Add an additional colum to the select
print(sql.add_columns(fkt_parking.c.count_type))

SELECT sum(fkt_parking.count) AS total_count, dim_area.city, fkt_parking.hour, fkt_parking.count_type 
FROM fkt_parking JOIN dim_area ON dim_area.area_id = fkt_parking.area_id GROUP BY dim_area.city, fkt_parking.hour


In [30]:
# Notice that this doesn't alter the SQL statement, it creates a new SQL statement
print(sql)

SELECT sum(fkt_parking.count) AS total_count, dim_area.city, fkt_parking.hour 
FROM fkt_parking JOIN dim_area ON dim_area.area_id = fkt_parking.area_id GROUP BY dim_area.city, fkt_parking.hour


In [31]:
with engine.connect() as conn:
    # Dynamically add a limit statement
    result = conn.execute(sql.limit(10))
    df = pd.DataFrame(result)
df

Unnamed: 0,total_count,city,hour
0,29225,Sunds,17
1,29857,Værløse,12
2,39881,Bøvlingbjerg,17
3,2104,Hundested,22
4,20620,Maribo,12
5,2433,Ryomgård,17
6,51211,Høng,17
7,35831,Ansager,17
8,4601,Svinninge,12
9,37421,Tarm,22


# Task 3: Calculate Occupancy rate

Now we're getting a bit more complicated - to solve this, we will create two CTEs and join them together to match up the two different parking types

In [32]:
# We can reuse our base select
base_select = sa.select([fkt_parking.c.count, fkt_parking.c.area_id, fkt_parking.c.year_month, fkt_parking.c.hour]).join(dim_parking_types)

In [32]:
# Create a CTE named available_spaces filtered on parking_types
available_spaces = base_select.where(dim_parking_types.c.name == "legal").cte("available_spaces")

In [32]:
# Create another CTE named occupied spaces filtered on parking_types
counted_spaces = base_select.where(dim_parking_types.c.name == "counted").cte("occupied_spaces")

In [32]:
# Define the metric of interest
occupancy_rate = (100 * counted_spaces.c.count / available_spaces.c.count).label("occupancy_rate")

In [32]:
# Define the join - the CTE's don't have Foreignkeys SQLAlchemy can use to infer the relationship
cte_join_condition = sa.and_(counted_spaces.c.area_id == available_spaces.c.area_id,
                        counted_spaces.c.year_month == available_spaces.c.year_month,
                        counted_spaces.c.hour == available_spaces.c.hour
                        )

In [32]:
# Create a join
joined_ctes = available_spaces.join(counted_spaces, onclause=cte_join_condition)

In [32]:
sql = (sa.select([occupancy_rate, dim_area.c.city, available_spaces.c.year_month, available_spaces.c.hour])
       .select_from(joined_ctes)
       .join(dim_area) # SQLAlchemy can infer the ForeignKey here through the CTE
      )
print(sql)

WITH available_spaces AS 
(SELECT fkt_parking.count AS count, fkt_parking.area_id AS area_id, fkt_parking.year_month AS year_month, fkt_parking.hour AS hour 
FROM fkt_parking JOIN dim_parking_types ON dim_parking_types.type_id = fkt_parking.count_type 
WHERE dim_parking_types.name = :name_1), 
occupied_spaces AS 
(SELECT fkt_parking.count AS count, fkt_parking.area_id AS area_id, fkt_parking.year_month AS year_month, fkt_parking.hour AS hour 
FROM fkt_parking JOIN dim_parking_types ON dim_parking_types.type_id = fkt_parking.count_type 
WHERE dim_parking_types.name = :name_2)
 SELECT (:count_1 * occupied_spaces.count) / available_spaces.count AS occupancy_rate, dim_area.city, available_spaces.year_month, available_spaces.hour 
FROM available_spaces JOIN occupied_spaces ON occupied_spaces.area_id = available_spaces.area_id AND occupied_spaces.year_month = available_spaces.year_month AND occupied_spaces.hour = available_spaces.hour JOIN dim_area ON dim_area.area_id = occupied_spaces.area_

In [33]:
with engine.connect() as conn:
    df = pd.read_sql(sql, conn)
df

Unnamed: 0,occupancy_rate,city,year_month,hour
0,96,Løgumkloster,2021-09,12
1,115,Løgumkloster,2021-03,12
2,91,Løgumkloster,2020-09,12
3,94,Løgumkloster,2019-09,12
4,110,Løgumkloster,2019-03,12
...,...,...,...,...
4453,64,Tikøb,2021-03,22
4454,76,Horslunde,2021-09,22
4455,74,Horslunde,2021-03,22
4456,72,Vemb,2021-09,22


As this example illustrates, SQLAlchemy Core is close to SQL, but with the added convenience of a full programming language behind it

# Exercise

Find all rows where occupancy rate is above 100