# Psycopg


Let's import `psycopg` and make a simple query. This requires:

- preparing connection string (see [Postgres docs](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING))
- making a connection
- creating a cursor
- running a query on the cursor
- parsing results

See the code below and comments. As our query, we will read:

- total number of trains stopping in Amsterdam
- all trains that were late in Amsterdam


In [2]:
import psycopg

# build the connection string
host = "localhost"
username = "postgres"
password = "postgres"  # NEVER do this in real code, read from secure secrets file
db_name = "postgres"

db_conn_string = f"host={host} user={username} password={password} dbname={db_name}"
print(db_conn_string)

host=localhost user=postgres password=postgres dbname=postgres


In [5]:
query_total_trains = """
SELECT COUNT(*)
FROM services
WHERE "Stop:Station code" = 'ASD'
"""

query_late_departure_trains = """
SELECT *
FROM services
WHERE "Stop:Station code" = 'ASD' AND "Stop:Departure delay" > 0
"""

# connect to the database (context manager will automatically close connection)
with psycopg.connect(db_conn_string) as conn:
    # create a cursor to execute commands
    with conn.cursor() as cur:
        # send query and fetch result
        cur.execute(query_total_trains)
        total_trains = cur.fetchone()

        # send query and fetch results
        cur.execute(query_late_departure_trains)
        late_trains = cur.fetchall()

print("Total trains:", total_trains)

print("Late trains:")
for record in late_trains[:3]:
    print(record)

Total trains: (404915,)
Late trains:
('12690865', datetime.date(2024, 1, 1), 'Intercity', 'NS', '1410', False, False, 0, '114307598', 'ASD', 'Amsterdam Centraal', datetime.datetime(2024, 1, 1, 3, 18), 1, False, datetime.datetime(2024, 1, 1, 3, 20), 2, False, False, '4', '4')
('12690867', datetime.date(2024, 1, 1), 'Intercity', 'NS', '1409', False, False, 0, '114307612', 'ASD', 'Amsterdam Centraal', datetime.datetime(2024, 1, 1, 2, 44), 4, False, datetime.datetime(2024, 1, 1, 2, 46), 8, False, True, '5', '7')
('12690870', datetime.date(2024, 1, 1), 'Intercity', 'NS', '1414', False, False, 0, '114307633', 'ASD', 'Amsterdam Centraal', datetime.datetime(2024, 1, 1, 4, 17), 3, False, datetime.datetime(2024, 1, 1, 4, 19), 4, False, False, '4', '4')


**Connection** manages database session and network connection. It should always be used as a context manager, to close it after usage, since keeping "dangling" open connections wastes system resources. Opening and closing connections has some cost, so for batches of tasks it's good to keep the connection open.

**Connection string** defines the Postgres connection. We can either build a string, or pass keyword arguments to `.connect()`.

**Cursor** executes commands to the database. It will always use a transaction in psycopg. If you don't explicitly commit by calling `conn.commit()` inside the cursor block, it will roll back all changes. However, here we only read data, so this doesn't matter.

**Results** are by default returned as a tuple for `.fetchone()` (even a single element), or list of tuples for `.fetchall()`. Types are automatically parsed, e.g. dates are `datetime.date` objects, with exact rules [in the documentation](https://www.psycopg.org/psycopg3/docs/basic/adapt.html).

Tuples are not always convenient, and we have two main options:

- use [row factory](https://www.psycopg.org/psycopg3/docs/advanced/rows.html#row-factories) to return dictionaries, named tuples, or a custom dataclass (you need to write it first)
- use an ORM like SQLAlchemy or peewee

Which one makes the most sense depends on a use case. If you use a custom dataclass, you are really close to using an ORM, so the line is often blurry.

**Efficiency** typically has tradeoffs for large data. Processing data in sizes is typically necessary, as we can't easily load all data into memory like here. Smaller batches are sometimes more efficient within the database, particularly when we have proper indexes. For example, you may want to query for a single month at a time. However, too small batches will waste resources on network requests.

**Column names** are often useful for automation, and we can get them without wasting a lot of memory for dictionaries. In fact, creating Pandas DataFrames from default tuples is actually faster and easier. We can get the list of column names from the cursor `description` attribute:


In [6]:
with psycopg.connect(db_conn_string) as conn:
    with conn.cursor() as cur:
        # LIMIT 0 for efficiency - we don't want rows, just column names
        cur.execute("SELECT * FROM services LIMIT 0")
        colnames = [desc[0] for desc in cur.description]
        print(colnames)

['Service:RDT-ID', 'Service:Date', 'Service:Type', 'Service:Company', 'Service:Train number', 'Service:Completely cancelled', 'Service:Partly cancelled', 'Service:Maximum delay', 'Stop:RDT-ID', 'Stop:Station code', 'Stop:Station name', 'Stop:Arrival time', 'Stop:Arrival delay', 'Stop:Arrival cancelled', 'Stop:Departure time', 'Stop:Departure delay', 'Stop:Departure cancelled', 'Stop:Platform change', 'Stop:Planned platform', 'Stop:Actual platform']


**Exercise**

1. Write a query to get all trains that both arrived and departured late.

2. Use a row factory to return dictionaries.

```python
    (from psycopg.rows import dict_row)
```

3. Save the result to file `data/late_train.jsonl` in JSON Lines format. `json` module will be useful.

Since JSON does not have a default date or datetime format, we need to provide an explicit encoder for that. Pass this class to `json.dump()` as `cls` argument.


In [7]:
import datetime
import json


class DateEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.isoformat()
        return json.JSONEncoder.default(self, obj)

Exercise 1


In [13]:
with psycopg.connect(db_conn_string) as conn:
    with conn.cursor() as cur:
        cur.execute(
            'SELECT * FROM services WHERE "Stop:Arrival delay" > 0 AND "Stop:Departure delay" > 0 LIMIT 100'
        )

        for record in cur:
            print(record)

('12690865', datetime.date(2024, 1, 1), 'Intercity', 'NS', '1410', False, False, 0, '114307598', 'ASD', 'Amsterdam Centraal', datetime.datetime(2024, 1, 1, 3, 18), 1, False, datetime.datetime(2024, 1, 1, 3, 20), 2, False, False, '4', '4')
('12690865', datetime.date(2024, 1, 1), 'Intercity', 'NS', '1410', False, False, 0, '114307599', 'ASB', 'Amsterdam Bijlmer ArenA', datetime.datetime(2024, 1, 1, 3, 31), 2, False, datetime.datetime(2024, 1, 1, 3, 31), 2, False, False, '8', '8')
('12690866', datetime.date(2024, 1, 1), 'Nightjet', 'NS Int', '420', False, False, 0, '114307605', 'DUSSEL', 'Düsseldorf Hbf', datetime.datetime(2024, 1, 1, 7, 23), 1, False, datetime.datetime(2024, 1, 1, 7, 25), 6, False, False, None, None)
('12690866', datetime.date(2024, 1, 1), 'Nightjet', 'NS Int', '420', False, False, 0, '114307606', 'DUISB', 'Duisburg Hbf', datetime.datetime(2024, 1, 1, 7, 39), 4, False, datetime.datetime(2024, 1, 1, 7, 41), 4, False, False, None, None)
('12690867', datetime.date(2024, 1, 

Exercise 2


In [4]:
from psycopg.rows import dict_row

with psycopg.connect(db_conn_string) as conn:
    with conn.cursor(row_factory=dict_row) as cur:
        cur.execute("SELECT * FROM services LIMIT 10")

        for row in cur:
            print(row)

{'Service:RDT-ID': '12690865', 'Service:Date': datetime.date(2024, 1, 1), 'Service:Type': 'Intercity', 'Service:Company': 'NS', 'Service:Train number': '1410', 'Service:Completely cancelled': False, 'Service:Partly cancelled': False, 'Service:Maximum delay': 2, 'Stop:RDT-ID': '114307592', 'Stop:Station code': 'RTD', 'Stop:Station name': 'Rotterdam Centraal', 'Stop:Arrival time': None, 'Stop:Arrival delay': None, 'Stop:Arrival cancelled': None, 'Stop:Departure time': datetime.datetime(2024, 1, 1, 2, 0), 'Stop:Departure delay': 0, 'Stop:Departure cancelled': False, 'Stop:Platform change': False, 'Stop:Planned platform': '9', 'Stop:Actual platform': '9'}
{'Service:RDT-ID': '12690865', 'Service:Date': datetime.date(2024, 1, 1), 'Service:Type': 'Intercity', 'Service:Company': 'NS', 'Service:Train number': '1410', 'Service:Completely cancelled': False, 'Service:Partly cancelled': False, 'Service:Maximum delay': 0, 'Stop:RDT-ID': '114307593', 'Stop:Station code': 'DT', 'Stop:Station name': 'D

Exercise 3


In [7]:
import json

json_path = "data/results.jsonl"

with open(json_path, "w", encoding="utf-8") as f:
    with psycopg.connect(db_conn_string) as conn:
        with conn.cursor(row_factory=dict_row) as cur:
            cur.execute("SELECT * FROM services LIMIT 100")

            for row in cur:
                json.dump(row, f, ensure_ascii=False, default=str)
                f.write("\n")