# Writing Core SQL

![Core Layer](./images/core_layer.png)

Before we can write any SQL, we need some data and some table definitions.

We have data from [Open Data DK](https://www.opendata.dk/city-of-copenhagen/parkeringstaelling-i-zoner) that counts the total number of parking spots available and how many were in use at given points in time.

## Creating tables
First we need to define the tables we will use. Remember that these aren't connected to a database yet - they're just definitions.

In [None]:
import sqlalchemy as sa

meta = sa.MetaData()

fkt_parking = sa.Table("fkt_parking", 
                       meta, 
                       sa.Column("id", sa.Integer, primary_key=True),
                       sa.Column("area_id", sa.Integer, sa.ForeignKey("dim_area.area_id")), 
                       sa.Column("year_month", sa.VARCHAR(20)),
                       sa.Column("count_type", sa.Integer, sa.ForeignKey("dim_parking_types.type_id")),
                       sa.Column("count", sa.Integer),
                       sa.Column("hour", sa.Integer)
                      )

dim_area = sa.Table("dim_area", 
                    meta,
                    sa.Column("area_id", sa.Integer, primary_key=True),
                    sa.Column("city", sa.VARCHAR(50)),
                    sa.Column("street_name", sa.VARCHAR(200)),
                    sa.Column("postnr", sa.VARCHAR(4)),
                    sa.Column("nr", sa.Integer)
                   )
                       
dim_parking_types = sa.Table("dim_parking_types",
                             meta,
                             sa.Column("type_id", sa.Integer, primary_key=True),
                             sa.Column("name", sa.VARCHAR(50), unique=True)
                            )

Given that we're starting a new database, we can use SQLAlchemy to create the tables for us. 

The `MetaData` catalogue lets SQLAlchemy know what dependencies each table has, so it can create and drop tables and constraints in the correct order.

In [None]:
# If you have Docker installed, uncomment the next line to start a local Postgres
# !docker run -e POSTGRES_PASSWORD=postgres -p 5432:5432 -d postgres
conn_string = "postgresql://postgres:postgres@localhost:5432"
# Otherwise, use the sqlite conn_string
# conn_string = "sqlite:///parking.db"

`MetaData` contains a listing of all the available tables, exposed through a dictionary-like interface

In [None]:
meta.tables

To actually interact with our database, we need our engine (and we want 2.0-style querying!)

In [None]:
engine = sa.create_engine(conn_string, future=True)

Because the `MetaData` object knows about all the tables, we can ask it to create them for us. It also knows the dependencies between tables, so it creates them in an order where every foreign key target already exists.

In [None]:
meta.create_all(engine)
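To see the dependency ordering for ourselves, here is a minimal sketch using two throwaway tables (`demo_parent` and `demo_child` are made-up names, not part of the parking schema) and an in-memory SQLite database. The child table is deliberately declared first; `MetaData.sorted_tables` should still list the parent first.

```python
import sqlalchemy as sa

demo_meta = sa.MetaData()

# Declared "out of order" on purpose: the table with the foreign key comes first
demo_child = sa.Table(
    "demo_child",
    demo_meta,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("parent_id", sa.Integer, sa.ForeignKey("demo_parent.id")),
)
demo_parent = sa.Table(
    "demo_parent",
    demo_meta,
    sa.Column("id", sa.Integer, primary_key=True),
)

# sorted_tables lists the tables in foreign key dependency order
creation_order = [table.name for table in demo_meta.sorted_tables]
print(creation_order)

# create_all works against any engine - here a throwaway in-memory SQLite
demo_engine = sa.create_engine("sqlite://")
demo_meta.create_all(demo_engine)
created_tables = sorted(sa.inspect(demo_engine).get_table_names())
print(created_tables)
```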

# Inserting data

To write the SQL, we use SQLAlchemy's constructs to map to our familiar SQL concepts

In [None]:
insert_sql = sa.insert(dim_parking_types).values(type_id=0, name="legal")
print(insert_sql)

(Look Ma, no SQL Injections!)
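The reason injection isn't a concern here is that values never become part of the SQL text - they travel separately as bound parameters. A small sketch, assuming a throwaway `demo_types` table and a deliberately hostile value (both invented for illustration):

```python
import sqlalchemy as sa

demo_meta = sa.MetaData()
demo_types = sa.Table(
    "demo_types",
    demo_meta,
    sa.Column("type_id", sa.Integer, primary_key=True),
    sa.Column("name", sa.VARCHAR(50)),
)

# A value that would be dangerous if it were pasted into the SQL string
hostile_name = "legal'); DROP TABLE demo_types; --"
stmt = sa.insert(demo_types).values(type_id=0, name=hostile_name)

compiled = stmt.compile()
sql_text = str(compiled)        # the SQL only contains placeholders
bound_params = compiled.params  # the values are kept separately

print(sql_text)
print(bound_params)
```

The printed SQL should only show placeholders such as `:name`; the hostile string appears in the parameter dictionary, never in the statement itself.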

The table class also has some helper methods

In [None]:
insert_sql = dim_parking_types.insert().values(type_id=0, name="legal")
print(insert_sql)

The SQL has been generated; now we need to send it to the database by connecting - our first "work-being-done". Always make sure your database connection is closed properly - the easiest way in Python is a `with` block:

In [None]:
with engine.connect() as conn:
    conn.execute(insert_sql)

If we go check the database - nothing has happened!

When altering data, the changes will be rolled back unless we actively commit

In [None]:
with engine.connect() as conn:
    conn.execute(insert_sql)
    conn.commit()

Let's delete the row with `delete()`

In [None]:
delete_sql = dim_parking_types.delete().where(dim_parking_types.c.type_id == 0)
print(delete_sql)

In [None]:
with engine.connect() as conn:
    conn.execute(delete_sql)
    conn.commit()

By default, SQLAlchemy requires you to **commit as you go** - every `execute` will be sent to the database, but it will only be persisted by calling `conn.commit()`. Calling `conn.rollback()` will roll back any non-committed changes.

In [None]:
with engine.connect() as conn:
    conn.execute(insert_sql)
    conn.commit()
    conn.execute(delete_sql)
    conn.rollback()
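If you'd rather verify the commit-as-you-go behaviour without touching the real database, the sketch below repeats the experiment against an in-memory SQLite engine. The `demo_types` table and the engine are stand-ins created only for this example.

```python
import sqlalchemy as sa

demo_engine = sa.create_engine("sqlite://", future=True)
demo_meta = sa.MetaData()
demo_types = sa.Table(
    "demo_types",
    demo_meta,
    sa.Column("type_id", sa.Integer, primary_key=True),
    sa.Column("name", sa.VARCHAR(50)),
)
demo_meta.create_all(demo_engine)

insert_stmt = sa.insert(demo_types).values(type_id=0, name="legal")
count_stmt = sa.select(sa.func.count()).select_from(demo_types)

# Execute without committing: the change is discarded when the block ends
with demo_engine.connect() as conn:
    conn.execute(insert_stmt)

with demo_engine.connect() as conn:
    rows_without_commit = conn.execute(count_stmt).scalar_one()

# Execute and commit: the change is persisted
with demo_engine.connect() as conn:
    conn.execute(insert_stmt)
    conn.commit()

with demo_engine.connect() as conn:
    rows_with_commit = conn.execute(count_stmt).scalar_one()

print(rows_without_commit, rows_with_commit)
```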

## Unit of Work

![deepalchemy](images/deepalchemy.png)

SQLAlchemy uses a **Unit of Work** pattern. 

SQLAlchemy expects us to prepare all of our changes and then send them to the database together. This allows SQLAlchemy to optimize the resulting communication with the database, as it knows what work needs to be done. The explicit commit is part of this pattern, giving us control over when the changes are persisted.

**Unit of Work** means that SQLAlchemy is usually working in a transaction. We can also explicitly manage our own transactions

In [None]:
with engine.connect() as conn:
    conn.execute(delete_sql)
    conn.commit()
    transaction = conn.begin()
    conn.execute(insert_sql)
    transaction.rollback()

Usually we don't need this level of control, so we can use the **Begin Once** style of commit

In [None]:
with engine.begin() as conn:
    conn.execute(insert_sql)
    conn.execute(delete_sql)

The transaction is automatically committed at the end of the `with` block

In [None]:
with engine.begin() as conn:
    conn.execute(insert_sql)
    raise Exception("Something bad happened")

If there's an exception, the transaction is automatically rolled back
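The same behaviour can be checked in isolation. This is a sketch against an in-memory SQLite engine with a made-up `demo_types` table; the exception is caught here only so the script can carry on and count the rows.

```python
import sqlalchemy as sa

demo_engine = sa.create_engine("sqlite://", future=True)
demo_meta = sa.MetaData()
demo_types = sa.Table(
    "demo_types",
    demo_meta,
    sa.Column("type_id", sa.Integer, primary_key=True),
    sa.Column("name", sa.VARCHAR(50)),
)
demo_meta.create_all(demo_engine)

insert_stmt = sa.insert(demo_types).values(type_id=0, name="legal")
count_stmt = sa.select(sa.func.count()).select_from(demo_types)

# An exception inside the block rolls the transaction back
try:
    with demo_engine.begin() as conn:
        conn.execute(insert_stmt)
        raise RuntimeError("Something bad happened")
except RuntimeError:
    pass

with demo_engine.connect() as conn:
    rows_after_failure = conn.execute(count_stmt).scalar_one()

# A block that finishes normally is committed automatically
with demo_engine.begin() as conn:
    conn.execute(insert_stmt)

with demo_engine.connect() as conn:
    rows_after_success = conn.execute(count_stmt).scalar_one()

print(rows_after_failure, rows_after_success)
```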

# Inserting many rows

Let's read our csv files and insert them into the database using SQLAlchemy

In [None]:
import csv
import pathlib

First we pair up the file paths and the corresponding table

In [None]:
data = [(pathlib.Path("../data/dim_parking_types.csv"), dim_parking_types),
        (pathlib.Path("../data/dim_area.csv"), dim_area),
        (pathlib.Path("../data/fkt_parking.csv"), fkt_parking)]

In [None]:
# Run everything inside one transaction - Unit of Work
with engine.begin() as conn:
    for data_file, table in data:
        # Read the CSV file as a list of dictionaries
        with data_file.open(encoding="utf-8") as f:
            rows = list(csv.DictReader(f))
        # Create the Insert statement
        sql = table.insert()
        
        conn.execute(sql, parameters=rows)

By passing a list of dictionaries as the parameters, SQLAlchemy knows to use the `.executemany()` method of the underlying DB-API library, which is optimized for inserting multiple rows.
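Here is a self-contained sketch of the same pattern that runs without the CSV files. The CSV text and the `demo_types` table are invented for illustration. Note that `csv.DictReader` yields every value as a string, so this sketch converts the integer column explicitly rather than relying on the database to coerce it.

```python
import csv
import io

import sqlalchemy as sa

demo_engine = sa.create_engine("sqlite://", future=True)
demo_meta = sa.MetaData()
demo_types = sa.Table(
    "demo_types",
    demo_meta,
    sa.Column("type_id", sa.Integer, primary_key=True),
    sa.Column("name", sa.VARCHAR(50)),
)
demo_meta.create_all(demo_engine)

# Hypothetical CSV content standing in for a file on disk
csv_text = "type_id,name\n0,legal\n1,counted\n2,illegal\n"
raw_rows = list(csv.DictReader(io.StringIO(csv_text)))

# DictReader returns strings only - convert the integer column ourselves
rows = [{"type_id": int(r["type_id"]), "name": r["name"]} for r in raw_rows]

# One statement, many parameter sets: executed as an "executemany"
with demo_engine.begin() as conn:
    conn.execute(sa.insert(demo_types), rows)

with demo_engine.connect() as conn:
    names = conn.execute(
        sa.select(demo_types.c.name).order_by(demo_types.c.type_id)
    ).scalars().all()

print(names)
```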

# Writing some logic

Now that we have some data in tables to work with, let's try to write some business logic.

## Task 1: Get only counts for Gilleleje

In [None]:
# 2.0-style select: columns are passed positionally, not as a list
sql = sa.select(fkt_parking.c.count, fkt_parking.c.hour, dim_area).join(dim_area).where(dim_area.c.city == "Gilleleje")
print(sql)

Because of the foreign key relationship, SQLAlchemy automatically infers the join condition - pretty handy!
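To make the inference visible, the sketch below builds the same kind of join twice on two throwaway tables (`demo_fact` and `demo_area` are made-up names): once letting SQLAlchemy find the `ON` clause from the foreign key, and once spelling it out with `join_from`. The explicit form is what you'd reach for when there is no foreign key, or more than one candidate.

```python
import sqlalchemy as sa

demo_meta = sa.MetaData()
demo_area = sa.Table(
    "demo_area",
    demo_meta,
    sa.Column("area_id", sa.Integer, primary_key=True),
    sa.Column("city", sa.VARCHAR(50)),
)
demo_fact = sa.Table(
    "demo_fact",
    demo_meta,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("area_id", sa.Integer, sa.ForeignKey("demo_area.area_id")),
    sa.Column("spots", sa.Integer),
)

# ON clause inferred from the foreign key
inferred = sa.select(demo_fact.c.spots, demo_area.c.city).join(demo_area)

# The same join with the ON clause written out by hand
explicit = sa.select(demo_fact.c.spots, demo_area.c.city).join_from(
    demo_fact, demo_area, demo_fact.c.area_id == demo_area.c.area_id
)

inferred_sql = str(inferred)
explicit_sql = str(explicit)
print(inferred_sql)
print(explicit_sql)
```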

In [None]:
with engine.connect() as conn:
    results = conn.execute(sql).all()

In [None]:
# Rows behave like named tuples; `_mapping` gives the dictionary view
[dict(row._mapping) for row in results[:5]]


We can also throw it into a DataFrame directly to get a nice representation

In [None]:
import pandas as pd

pd.DataFrame(results)

(In fact, pandas supports SQLAlchemy queries directly)

In [None]:
with engine.connect() as conn:
    df = pd.read_sql(sql, conn)
df

## Task 2: Get the sum of counts per hour per city

Any SQL function is available under the `func` namespace - SQLAlchemy generates the function call from whatever name we access, so it isn't limited to a fixed list. In this case, we want a `sum`

In [None]:
sql = (sa.select(sa.func.sum(fkt_parking.c.count).label("total_count"), 
                 dim_area.c.city, 
                 fkt_parking.c.hour)
       .join(dim_area)
       .group_by(dim_area.c.city, fkt_parking.c.hour))
print(sql)

In [None]:
with engine.connect() as conn:
    result = conn.execute(sql)
    df = pd.DataFrame(result)
df
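As a quick illustration of how `func` works, the sketch below renders two function calls without any database involved. `my_made_up_function` is, as the name suggests, not a real SQL function - it only shows that `func` turns any attribute name into a call.

```python
import sqlalchemy as sa

# A lightweight column that isn't attached to any table
n = sa.column("n")

# A well-known aggregate
sum_sql = str(sa.func.sum(n))

# Any other attribute name is rendered as a function call too;
# the literal 2 becomes a bound parameter
made_up_sql = str(sa.func.my_made_up_function(n, 2))

print(sum_sql)
print(made_up_sql)
```

Whether the database actually knows the function is only checked when the statement is executed.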

### Refactoring our SQL

The SQL is getting a bit longer and we have some repetition - good thing this is Python and not SQL! We can create variables and functions to manipulate the SQL

In [None]:
table = fkt_parking.join(dim_area)
total_count = sa.func.sum(fkt_parking.c.count).label("total_count")
dimension_cols = [dim_area.c.city, fkt_parking.c.hour]

In [None]:
sql = sa.select(total_count, *dimension_cols).select_from(table).group_by(*dimension_cols)

Previous code for comparison
```
sql = (sa.select(sa.func.sum(fkt_parking.c.count).label("total_count"), 
                 dim_area.c.city,
                 fkt_parking.c.hour)
         .join(dim_area)
         .group_by(dim_area.c.city, fkt_parking.c.hour))
```

### ASIDE: 
We can actually dynamically update the SQL on-the-fly given our requirements - so we could put any of these into a helper function or pull out common calculations into a module

In [None]:
# Add an additional column to the groupby
print(sql.group_by(fkt_parking.c.count_type))

In [None]:
# Add an additional column to the select
print(sql.add_columns(fkt_parking.c.count_type))

In [None]:
# Notice that this doesn't alter the SQL statement, it creates a new SQL statement
print(sql)
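Because every method returns a new statement, it's safe to wrap modifications in small helper functions. The sketch below uses a throwaway `demo_area` table and a hypothetical helper `only_city` (neither is part of the notebook's schema) to show that the base statement is left untouched.

```python
import sqlalchemy as sa

demo_meta = sa.MetaData()
demo_area = sa.Table(
    "demo_area",
    demo_meta,
    sa.Column("area_id", sa.Integer, primary_key=True),
    sa.Column("city", sa.VARCHAR(50)),
)

base = sa.select(demo_area.c.area_id, demo_area.c.city)


def only_city(stmt, city):
    """Hypothetical helper: return a copy of stmt filtered to one city."""
    return stmt.where(demo_area.c.city == city)


filtered = only_city(base, "Gilleleje")

base_sql = str(base)
filtered_sql = str(filtered)
print(base_sql)      # still has no WHERE clause
print(filtered_sql)  # the new statement carries the filter
```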

### END ASIDE

In [None]:
with engine.connect() as conn:
    # Dynamically add a limit statement
    result = conn.execute(sql.limit(10))
    df = pd.DataFrame(result)
df

## Task 3: Calculate occupancy rate

Now we're getting a bit more complicated - to solve this, we will create two CTEs and join them together to match up the two different parking types

### ASIDE
A CTE (Common Table Expression) is a way of defining a named intermediate result in SQL that the main query can refer to like a table - another option is to use a subquery
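In SQLAlchemy the two options are one method call apart. This is a small sketch on a made-up `demo_fact` table, comparing the SQL generated by `.cte()` and `.subquery()` for the same inner query:

```python
import sqlalchemy as sa

demo_meta = sa.MetaData()
demo_fact = sa.Table(
    "demo_fact",
    demo_meta,
    sa.Column("area_id", sa.Integer),
    sa.Column("spots", sa.Integer),
)

inner = sa.select(demo_fact.c.area_id, demo_fact.c.spots).where(demo_fact.c.spots > 0)

# Same inner query, two ways of embedding it
as_cte = inner.cte("busy")
as_subquery = inner.subquery("busy")

cte_sql = str(sa.select(as_cte.c.area_id))
subquery_sql = str(sa.select(as_subquery.c.area_id))

print(cte_sql)       # WITH busy AS (...) SELECT ... FROM busy
print(subquery_sql)  # SELECT ... FROM (...) AS busy
```

Both objects expose their columns through `.c`, so the rest of the query is written the same way whichever one you pick.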


In [None]:
# We can reuse our base select
base_select = sa.select(fkt_parking.c.count, fkt_parking.c.area_id, fkt_parking.c.year_month, fkt_parking.c.hour).join(dim_parking_types)

In [None]:
# Create a CTE named available_spaces filtered on parking_types
available_spaces = base_select.where(dim_parking_types.c.name == "legal").cte("available_spaces")

In [None]:
# Create another CTE named occupied_spaces filtered on parking_types
counted_spaces = base_select.where(dim_parking_types.c.name == "counted").cte("occupied_spaces")

In [None]:
# Define the metric of interest
occupancy_rate = (100 * sa.cast(counted_spaces.c.count, sa.Float) / available_spaces.c.count).label("occupancy_rate")

In [None]:
# Define the join condition - the two CTEs have no foreign key to each other that SQLAlchemy could use to infer it
cte_join_condition = sa.and_(counted_spaces.c.area_id == available_spaces.c.area_id,
                        counted_spaces.c.year_month == available_spaces.c.year_month,
                        counted_spaces.c.hour == available_spaces.c.hour
                        )

In [None]:
# Create a join
joined_ctes = available_spaces.join(counted_spaces, onclause=cte_join_condition)

In [None]:
sql = (sa.select(occupancy_rate, dim_area.c.city, available_spaces.c.year_month, available_spaces.c.hour)
       .select_from(joined_ctes)
       .join(dim_area) # SQLAlchemy can infer the ForeignKey here through the CTE
      )
print(sql)

In [None]:
with engine.connect() as conn:
    df = pd.read_sql(sql.order_by(occupancy_rate.desc()), conn)
df

As this example illustrates, SQLAlchemy Core stays close to SQL, with the added convenience of a full programming language behind it.