# Psycopg

Let's import `psycopg` and make a simple query. This requires:
- preparing a connection string (see [Postgres docs](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING))
- making a connection
- creating a cursor
- running a query on the cursor
- parsing results

See the code and comments below. Our two queries will read:
- the total number of trains stopping at Amsterdam Centraal (station code `ASD`)
- all trains that departed late from Amsterdam Centraal

```python
import psycopg

# build the connection string
host = "localhost"
username = "postgres"
password = "postgres"  # NEVER hardcode passwords in real code; read them from a secure secrets file
db_name = "postgres"

db_conn_string = f"host={host} user={username} password={password} dbname={db_name}"
print(db_conn_string)
```

```
host=localhost user=postgres password=postgres dbname=postgres
```


```python
query_total_trains = """
SELECT COUNT(*)
FROM services
WHERE "Stop:Station code" = 'ASD'
"""

query_late_departure_trains = """
SELECT *
FROM services
WHERE "Stop:Station code" = 'ASD' AND "Stop:Departure delay" > 0
"""

# connect to the database (context manager will automatically close connection)
with psycopg.connect(db_conn_string) as conn:
    # create a cursor to execute commands
    with conn.cursor() as cur:
        # send query and fetch result
        cur.execute(query_total_trains)
        total_trains = cur.fetchone()

        # send query and fetch results
        cur.execute(query_late_departure_trains)
        late_trains = cur.fetchall()

print("Total trains:", total_trains)

print("Late trains:")
for record in late_trains[:3]:
    print(record)
```

```
Total trains: (404915,)
Late trains:
(15063522, datetime.date(2024, 12, 27), 'Intercity', 'NS', 1573, False, False, 1, 135937936, 'ASD', 'Amsterdam Centraal', None, None, None, datetime.datetime(2024, 12, 27, 20, 0), 1, False, True, '8a', '8')
(15063589, datetime.date(2024, 12, 27), 'Sprinter', 'NS', 8375, False, False, 2, 135938564, 'ASD', 'Amsterdam Centraal', None, None, None, datetime.datetime(2024, 12, 27, 20, 13), 2, False, False, '10a', '10a')
(15063590, datetime.date(2024, 12, 27), 'Intercity', 'NS', 3072, False, False, 0, 135938576, 'ASD', 'Amsterdam Centraal', datetime.datetime(2024, 12, 27, 21, 35), 7, False, datetime.datetime(2024, 12, 27, 21, 39), 4, False, False, '7a', '7a')
```


**Connection** manages the database session and the network connection. Always use it as a context manager so that it is closed after use, since "dangling" open connections waste resources on both the client and the server. Opening and closing a connection has a cost, so for a batch of tasks it's better to keep one connection open and reuse it.

**Connection string** specifies the connection parameters (host, user, password, database name, etc.). We can either build a string or pass the same parameters as keyword arguments to `psycopg.connect()`.
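Building the string with an f-string, as in the first cell, breaks as soon as a value contains a space or a quote. Below is a minimal sketch of a hypothetical `build_conninfo()` helper that applies the libpq quoting rules (wrap empty values or values with whitespace in single quotes, escape `'` and `\` with a backslash). In real code, prefer psycopg's own `psycopg.conninfo.make_conninfo()` or keyword arguments such as `psycopg.connect(host="localhost", user="postgres", dbname="postgres")`; if the password is omitted, libpq can also pick it up from the `PGPASSWORD` environment variable or a `~/.pgpass` file.

```python
def build_conninfo(**params):
    """Hypothetical helper: turn keyword arguments into a libpq connection string.

    libpq rules: a value that is empty or contains whitespace must be wrapped in
    single quotes, and single quotes and backslashes inside a value must be
    escaped with a backslash. Here any value that needs escaping is quoted too.
    """
    parts = []
    for key, value in params.items():
        value = str(value)
        if value == "" or any(ch.isspace() or ch in "'\\" for ch in value):
            # escape backslashes first, then single quotes
            escaped = value.replace("\\", "\\\\").replace("'", "\\'")
            value = f"'{escaped}'"
        parts.append(f"{key}={value}")
    return " ".join(parts)


conninfo = build_conninfo(host="localhost", user="postgres", password="it's secret", dbname="postgres")
print(conninfo)  # host=localhost user=postgres password='it\'s secret' dbname=postgres
```

Plain values pass through unchanged, so for the parameters in the first cell the result is the same string as the f-string version.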

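**Cursor** executes commands on the database. In psycopg, commands always run inside a transaction unless the connection is in autocommit mode. When the connection is used as a context manager, the transaction is committed when the `with` block exits normally and rolled back if an exception is raised; if you instead close the connection with `conn.close()` outside a `with` block, uncommitted changes are discarded. You can also commit explicitly with `conn.commit()`. Here we only read data, so this doesn't matter.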

**Results** are returned by default as a tuple from `.fetchone()` (even for a single column, as in `(404915,)` above), or as a list of tuples from `.fetchall()`; `.fetchone()` returns `None` when there are no more rows. Types are converted automatically, e.g. Postgres `date` values become `datetime.date` objects, with the exact rules [in the documentation](https://www.psycopg.org/psycopg3/docs/basic/adapt.html).

Tuples are not always convenient, and we have two main options:
- use a [row factory](https://www.psycopg.org/psycopg3/docs/advanced/rows.html#row-factories) to return dictionaries, named tuples, or a custom dataclass (which you need to define first)
- use an ORM like SQLAlchemy or peewee

Which one makes the most sense depends on the use case. If you use a custom dataclass, you are really close to using an ORM, so the line is often blurry.
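A row factory can also bridge the gap to a dataclass. psycopg's built-in `class_row(cls)` passes column names as keyword arguments, so names like `Stop:Station code` would need SQL aliases first. The sketch below is a hypothetical custom row factory instead: following the psycopg protocol, it is a callable that receives the cursor and returns a function turning one sequence of values into a row. The snake_case naming scheme, the `Stop` dataclass and `dataclass_row()` are assumptions for illustration, and a `SimpleNamespace` stands in for a real cursor so the code runs without a database.

```python
import re
from dataclasses import dataclass, fields
from types import SimpleNamespace
from typing import Optional


def to_identifier(column_name):
    """Hypothetical naming scheme: 'Stop:Station code' -> 'stop_station_code'."""
    return re.sub(r"\W+", "_", column_name).strip("_").lower()


@dataclass
class Stop:
    """Hypothetical dataclass holding a few columns of the `services` table."""
    stop_station_code: str
    stop_departure_delay: Optional[int]


def dataclass_row(cls):
    """Build a psycopg-style row factory that creates `cls` instances.

    Columns without a matching dataclass field are ignored.
    """
    field_names = {f.name for f in fields(cls)}

    def factory(cursor):
        # cursor.description is None when the last command returned no columns
        names = [to_identifier(col.name) for col in (cursor.description or [])]

        def make_row(values):
            return cls(**{n: v for n, v in zip(names, values) if n in field_names})

        return make_row

    return factory


# stand-in for a cursor after a SELECT on `services`, so the sketch runs without Postgres
fake_cursor = SimpleNamespace(description=[
    SimpleNamespace(name="Stop:Station code"),
    SimpleNamespace(name="Stop:Station name"),
    SimpleNamespace(name="Stop:Departure delay"),
])
make_row = dataclass_row(Stop)(fake_cursor)
print(make_row(("ASD", "Amsterdam Centraal", 4)))
# Stop(stop_station_code='ASD', stop_departure_delay=4)
```

With a real connection this would be used as `conn.cursor(row_factory=dataclass_row(Stop))`. Ignoring unknown columns keeps `SELECT *` working, at the cost of silently dropping data you might have wanted.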

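**Efficiency** involves trade-offs when data is large. Processing data in batches is usually necessary, since in general we can't load everything into memory as we did here (a regular, client-side psycopg cursor receives the whole result set during `execute()`, while a named, server-side cursor created with `conn.cursor(name=...)` fetches rows in smaller chunks). Smaller queries are sometimes more efficient inside the database, particularly with proper indexes; for example, you may want to query a single month at a time. However, batches that are too small waste resources on network round trips.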
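Splitting a query by month could look like the sketch below: a hypothetical `month_ranges()` generator produces half-open `[month_start, next_month_start)` date ranges, and each range is passed as parameters to a hypothetical per-month query filtering on the `Service:Date` column. Whether this is actually faster depends on the indexes available on `services`, which we haven't checked here.

```python
import datetime


def month_ranges(start, end):
    """Yield (month_start, next_month_start) pairs from start's month until end."""
    current = start.replace(day=1)
    while current < end:
        # first day of the next month, rolling over the year after December
        if current.month == 12:
            following = current.replace(year=current.year + 1, month=1)
        else:
            following = current.replace(month=current.month + 1)
        yield current, following
        current = following


# hypothetical per-month query; psycopg sends the values as query parameters
MONTHLY_QUERY = """
SELECT *
FROM services
WHERE "Service:Date" >= %s AND "Service:Date" < %s
"""

for month_start, next_month_start in month_ranges(datetime.date(2024, 11, 1), datetime.date(2025, 2, 1)):
    # with psycopg: cur.execute(MONTHLY_QUERY, (month_start, next_month_start))
    print(month_start, next_month_start)
```

Half-open ranges (`>=` and `<`) make sure no day is read twice or skipped at month boundaries.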

**Column names** are often useful for automation, and we can get them without the memory cost of storing a dictionary for every row. Creating pandas DataFrames from the default tuples plus a list of column names is also faster and easier. We can get the column names from the cursor's `description` attribute:

```python
with psycopg.connect(db_conn_string) as conn:
    with conn.cursor() as cur:
        # LIMIT 0 for efficiency - we don't want rows, just column names
        cur.execute("SELECT * FROM services LIMIT 0")
        colnames = [desc[0] for desc in cur.description]
        print(colnames)
```

```
['Service:RDT-ID', 'Service:Date', 'Service:Type', 'Service:Company', 'Service:Train number', 'Service:Completely cancelled', 'Service:Partly cancelled', 'Service:Maximum delay', 'Stop:RDT-ID', 'Stop:Station code', 'Stop:Station name', 'Stop:Arrival time', 'Stop:Arrival delay', 'Stop:Arrival cancelled', 'Stop:Departure time', 'Stop:Departure delay', 'Stop:Departure cancelled', 'Stop:Platform change', 'Stop:Planned platform', 'Stop:Actual platform']
```
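Having the column names once is enough to work with plain tuple rows by name. A small sketch using the column list and the first late-departure row printed earlier: look up a column index once, and build a dictionary only when one is actually needed. For a DataFrame, `pandas.DataFrame(rows, columns=colnames)` takes the same two ingredients.

```python
import datetime

# column names as printed above from cur.description
colnames = [
    "Service:RDT-ID", "Service:Date", "Service:Type", "Service:Company",
    "Service:Train number", "Service:Completely cancelled", "Service:Partly cancelled",
    "Service:Maximum delay", "Stop:RDT-ID", "Stop:Station code", "Stop:Station name",
    "Stop:Arrival time", "Stop:Arrival delay", "Stop:Arrival cancelled",
    "Stop:Departure time", "Stop:Departure delay", "Stop:Departure cancelled",
    "Stop:Platform change", "Stop:Planned platform", "Stop:Actual platform",
]

# first late-departure row printed earlier (a plain tuple, the default row type)
row = (15063522, datetime.date(2024, 12, 27), "Intercity", "NS", 1573, False, False, 1,
       135937936, "ASD", "Amsterdam Centraal", None, None, None,
       datetime.datetime(2024, 12, 27, 20, 0), 1, False, True, "8a", "8")

# look up a column by name once, then index every tuple with it
delay_idx = colnames.index("Stop:Departure delay")
print(row[delay_idx])  # 1

# build a dict only when you actually need one (e.g. for JSON output)
record = dict(zip(colnames, row))
print(record["Stop:Station name"])  # Amsterdam Centraal
```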


**Exercise**


1. Write a query to get all trains that both arrived and departed late.

2. Use a row factory to return dictionaries (hint: `from psycopg.rows import dict_row`).

3. Save the result to the file `data/late_train.jsonl` in JSON Lines format. The `json` module will be useful.

JSON has no native date or datetime type, so we need an explicit encoder that converts such values, e.g. to ISO 8601 strings. Pass the class below to `json.dump()` as the `cls` argument.

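```python
import datetime
import json


class DateEncoder(json.JSONEncoder):
    """JSON encoder that serializes dates and datetimes as ISO 8601 strings."""

    def default(self, obj):
        # datetime.datetime is a subclass of datetime.date, so both are covered
        if isinstance(obj, datetime.date):
            return obj.isoformat()
        # defer to the base class, which raises TypeError for unsupported types
        return super().default(obj)
```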
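`json.dump()` and `json.dumps()` also accept a plain function through their `default` parameter, which `json` calls for every object it cannot serialize on its own; it does the same job as `DateEncoder.default()` without a subclass. A minimal sketch of writing dict rows as JSON Lines this way, with an in-memory `io.StringIO` buffer standing in for the output file; `encode_dates()` and `write_jsonl()` are hypothetical helper names.

```python
import datetime
import io
import json


def encode_dates(obj):
    """`default=` hook: called only for objects json cannot serialize itself."""
    if isinstance(obj, datetime.date):  # also covers datetime.datetime
        return obj.isoformat()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def write_jsonl(rows, fp):
    """Write each dict as one JSON object per line (JSON Lines)."""
    for row in rows:
        fp.write(json.dumps(row, default=encode_dates))
        fp.write("\n")


buffer = io.StringIO()  # stands in for open(RESULT_FILE, "w", encoding="utf-8")
write_jsonl(
    [
        {"Stop:Station code": "ASD", "Service:Date": datetime.date(2024, 12, 27)},
        {"Stop:Station code": "ASD", "Stop:Departure time": datetime.datetime(2024, 12, 27, 20, 0)},
    ],
    buffer,
)
print(buffer.getvalue())
# {"Stop:Station code": "ASD", "Service:Date": "2024-12-27"}
# {"Stop:Station code": "ASD", "Stop:Departure time": "2024-12-27T20:00:00"}
```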

```python
from psycopg.rows import dict_row
import os
import json
import time

# exercise 1: stops where the train both arrived and departed late
QUERY = 'SELECT * FROM services s WHERE s."Stop:Arrival delay" > 0 AND s."Stop:Departure delay" > 0'

RESULT_FILE = "labs/mlops_lab2/data/late_train.jsonl"
# create the output directory if it doesn't exist yet
os.makedirs(os.path.dirname(RESULT_FILE), exist_ok=True)
```

### Naive try

```python
naive_start_time = time.perf_counter()
count = 0

with psycopg.connect(db_conn_string, row_factory=dict_row) as conn:
    with conn.cursor() as cur:
        # a regular (client-side) cursor receives the whole result set
        # from the server during execute(), before the loop below starts
        cur.execute(QUERY)

        with open(RESULT_FILE, "w", encoding="utf-8") as f:
            for row in cur:
                # one JSON object per line; DateEncoder handles date/datetime values
                json.dump(row, fp=f, cls=DateEncoder)
                f.write("\n")

                count += 1

                # report progress every million rows
                if count % 1_000_000 == 0:
                    print(
                        f"Written {count} rows in {time.perf_counter() - naive_start_time:.2f}"
                    )

naive_end_time = time.perf_counter()
duration = naive_end_time - naive_start_time
print(f"Duration: {duration:.2f}, count: {count}")
```


```
Written 1000000 rows in 124.40
Written 2000000 rows in 247.80
Written 3000000 rows in 373.31
Written 4000000 rows in 496.98
Written 5000000 rows in 604.99
Duration: 616.44, count: 5098078
```
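The loop above serializes and writes one row at a time, and because it uses a regular client-side cursor, psycopg has already received the whole result set (about 5.1 million rows) during `execute()`. Below is a minimal sketch of an alternative, not measured here: drain a cursor with the DB-API `fetchmany()` method and write each batch with a single `write()` call. With psycopg you would pass a server-side cursor, e.g. `conn.cursor(name="late_trains", row_factory=dict_row)`, so that only one batch is held in client memory at a time. `export_jsonl_batched()`, `encode_dates()` and `FakeCursor` are hypothetical names; `FakeCursor` only stands in for a database cursor so the sketch runs without Postgres.

```python
import datetime
import io
import json


def encode_dates(obj):
    """`default=` hook for json: dates and datetimes become ISO 8601 strings."""
    if isinstance(obj, datetime.date):
        return obj.isoformat()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def export_jsonl_batched(cursor, fp, batch_size=10_000):
    """Drain a DB-API cursor with fetchmany() and write one batch per write() call."""
    count = 0
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:  # an empty list means the result set is exhausted
            break
        fp.write("".join(json.dumps(row, default=encode_dates) + "\n" for row in rows))
        count += len(rows)
    return count


class FakeCursor:
    """Hypothetical in-memory stand-in for a cursor that returns dict rows."""

    def __init__(self, rows):
        self._rows = list(rows)
        self._pos = 0

    def fetchmany(self, size):
        batch = self._rows[self._pos:self._pos + size]
        self._pos += len(batch)
        return batch


rows = [{"id": i, "date": datetime.date(2024, 12, 27)} for i in range(25)]
out = io.StringIO()  # stands in for open(RESULT_FILE, "w", encoding="utf-8")
written = export_jsonl_batched(FakeCursor(rows), out, batch_size=10)
print(written)  # 25
```

The batch size plays the same role as `chunksize` in the pandas solution: larger batches mean fewer round trips and write calls, smaller ones mean less memory.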


### Pandas solution

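```python
import pandas

CHUNK_SIZE = 250_000
pandas_start_time = time.perf_counter()
count = 0

# to_json(mode="a") appends, so truncate the file first; otherwise these rows
# would be added after the ones written by the naive run
open(RESULT_FILE, "w", encoding="utf-8").close()

with psycopg.connect(db_conn_string) as conn:
    # read the result as a sequence of DataFrames of at most CHUNK_SIZE rows
    for chunk_df in pandas.read_sql_query(QUERY, con=conn, chunksize=CHUNK_SIZE):
        # append the chunk as JSON Lines; note that datetime columns are written as
        # epoch milliseconds by default (pass date_format="iso" for ISO 8601 strings)
        chunk_df.to_json(RESULT_FILE, orient="records", lines=True, mode="a")
        count += chunk_df.shape[0]

        print(f"Loaded {count} rows")


pandas_end_time = time.perf_counter()
pandas_duration = pandas_end_time - pandas_start_time
print(f"Duration: {pandas_duration:.2f}, count: {count}")
```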


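This cell emits a pandas `UserWarning` pointing at the `read_sql_query(...)` line: pandas officially supports only SQLAlchemy connectables, database URI strings and `sqlite3` connections, so passing a raw psycopg connection works but is not tested by pandas.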


```
Loaded 250000 rows
Loaded 500000 rows
Loaded 750000 rows
Loaded 1000000 rows
Loaded 1250000 rows
Loaded 1500000 rows
Loaded 1750000 rows
Loaded 2000000 rows
Loaded 2250000 rows
Loaded 2500000 rows
Loaded 2750000 rows
Loaded 3000000 rows
Loaded 3250000 rows
Loaded 3500000 rows
Loaded 3750000 rows
Loaded 4000000 rows
Loaded 4250000 rows
Loaded 4500000 rows
Loaded 4750000 rows
Loaded 5000000 rows
Loaded 5098078 rows
Duration: 402.06, count: 5098078
```


### Results

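The pandas solution is faster :) It took 402.06 s instead of 616.44 s for the same 5,098,078 rows, presumably because it converts and writes rows in 250,000-row chunks instead of calling `json.dump()` once per row.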
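To put the two timings on the same scale, we can turn them into throughput. The numbers below are copied from the single runs above, so treat the result as a rough comparison rather than a benchmark.

```python
# row count and durations (seconds) copied from the two runs above
rows = 5_098_078
naive_seconds = 616.44
pandas_seconds = 402.06

naive_rate = rows / naive_seconds    # rows per second, one json.dump() per row
pandas_rate = rows / pandas_seconds  # rows per second, 250,000-row pandas chunks
speedup = naive_seconds / pandas_seconds

print(f"naive:   {naive_rate:,.0f} rows/s")   # naive:   8,270 rows/s
print(f"pandas:  {pandas_rate:,.0f} rows/s")  # pandas:  12,680 rows/s
print(f"speedup: {speedup:.2f}x")             # speedup: 1.53x
```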