In [None]:
%pip install -r ../requirements.txt

In [132]:
import asyncio
import os
from asyncio import Queue
from math import ceil
from typing import Coroutine

import pandas as pd
from dotenv import load_dotenv
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo import InsertOne, UpdateOne
import subprocess
import time
from pymongo import MongoClient

load_dotenv()

True

In [133]:
CONCURRENT_TASKS = 4
QUEUE_SIZE = 1
BIXI_DATA_URL = "https://bixi.com/en/open-data"
BIXI_CDN = "https://s3.ca-central-1.amazonaws.com/cdn.bixi.com/"
MONGO_URI = os.getenv("MONGO_URI")
BIXI_DB_NAME = os.getenv("BIXI_DB_NAME")
BIXI_HISTORIC_URLS_COLLECTION = os.getenv("BIXI_HISTORIC_URLS_COLLECTION")
LOCATION_COLLECTION = os.getenv("LOCATION_COLLECTION")
TRIP_COLLECTION = os.getenv("TRIP_COLLECTION")
DEFAULT_ZIP_PATH = "data/file.zip"
DEFAULT_EXTRACT_PATH = "data/"
CHUNK_SIZE = 250000


## Trip data model

![csv](img/csv.png)

In UC06, we want to calculate the average trip duration over a given period of time.

Averaging trip durations requires the duration of each individual trip. These durations can be computed before loading the data into the database.
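A minimal sketch of that pre-computation, assuming `STARTTIMEMS` and `ENDTIMEMS` hold epoch timestamps in milliseconds (as their names suggest). The sample rows and the `average_duration_ms` helper are made up for illustration and are not part of the pipeline:

```python
import pandas as pd

# Hypothetical sample rows; the real CSV has more columns.
trips = pd.DataFrame(
    {
        "STARTTIMEMS": [1_000, 5_000, 20_000],
        "ENDTIMEMS": [4_000, 11_000, 50_000],
    }
)

# Duration of each trip, computed once before loading.
trips["DURATION"] = trips["ENDTIMEMS"] - trips["STARTTIMEMS"]


def average_duration_ms(df, period_start_ms, period_end_ms):
    """Mean DURATION of trips that started within [period_start_ms, period_end_ms)."""
    in_period = df[
        (df["STARTTIMEMS"] >= period_start_ms) & (df["STARTTIMEMS"] < period_end_ms)
    ]
    return in_period["DURATION"].mean()


print(average_duration_ms(trips, 0, 10_000))
```

Storing the duration with each trip means a later average only has to filter and aggregate, not recompute.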

Station names are in fact their locations. Because a station's name (location) might change over time, it's best to use locations in the model.

![model](img/model.png)

For simplicity, we'll use the location names as ids.
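To illustrate the model, the sketch below splits one CSV row into two location documents keyed by name and one trip document. The column names are those of the BIXI CSV; the `split_row` helper and the sample values are assumptions made for this example only:

```python
def split_row(row):
    """Split one CSV row (as a dict) into location documents and a trip document."""
    locations = [
        {
            "_id": row[f"{prefix}STATIONNAME"],  # location name used as id
            "arrondissement": row[f"{prefix}STATIONARRONDISSEMENT"],
            "latitude": row[f"{prefix}STATIONLATITUDE"],
            "longitude": row[f"{prefix}STATIONLONGITUDE"],
        }
        for prefix in ("START", "END")
    ]
    trip = {
        # Station names are kept on the trip as references to the locations.
        "STARTSTATIONNAME": row["STARTSTATIONNAME"],
        "ENDSTATIONNAME": row["ENDSTATIONNAME"],
        "STARTTIMEMS": row["STARTTIMEMS"],
        "ENDTIMEMS": row["ENDTIMEMS"],
        "DURATION": row["ENDTIMEMS"] - row["STARTTIMEMS"],
    }
    return locations, trip


# Made-up sample row for illustration only.
sample_row = {
    "STARTSTATIONNAME": "Station A",
    "STARTSTATIONARRONDISSEMENT": "Arrondissement 1",
    "STARTSTATIONLATITUDE": 45.50,
    "STARTSTATIONLONGITUDE": -73.57,
    "ENDSTATIONNAME": "Station B",
    "ENDSTATIONARRONDISSEMENT": "Arrondissement 2",
    "ENDSTATIONLATITUDE": 45.52,
    "ENDSTATIONLONGITUDE": -73.58,
    "STARTTIMEMS": 1_700_000_000_000,
    "ENDTIMEMS": 1_700_000_600_000,
}

locations, trip = split_row(sample_row)
print(locations)
print(trip)
```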

## Transformation and loading

There are several ways we can go about this.

We could use the `mongoimport` command-line tool, but it's not very flexible when it comes to transforming the data. Other tools could be used alongside `mongoimport` to help with that, but the learning curve becomes steep.

An alternative is to use `Pandas` and `PyMongo` to efficiently transform and load the data into the database.

### Testing `mongoimport`

In [124]:
def mongoimport_csv(csv_file, uri=MONGO_URI, db_name=BIXI_DB_NAME, collection="csv"):
    try:
        command = [
            'mongoimport',
            '--uri', uri+"?authSource=admin",
            '--db', db_name,
            '--collection', collection,
            '--type', 'csv',
            '--file', csv_file,
            '--headerline'
        ]
        process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = process.communicate()
        if process.returncode != 0:
            print(f"Error: {stderr.decode('utf-8')}")
        else:
            print(f"Output: {stdout.decode('utf-8')}")
    except Exception as e:
        print(f"An exception occurred: {e}")



In [125]:
csv_file = DEFAULT_EXTRACT_PATH + "DonneesOuvertes (1).csv"
t0 = time.time()
mongoimport_csv(csv_file)
t1 = time.time()
elapsed_time = t1 - t0
print("Elapsed time", elapsed_time, "seconds")

Output: 
Elapsed time 617.7822887897491 seconds


![mongoimport](img/mongoimport.png)

### `Pandas` and `PyMongo`

The CSV files are quite large (`1.5GB+`, `10M+` rows) and we need to prioritize efficiency to keep our resource consumption to a minimum.

Luckily, each row can be processed independently, but loading them all at once would require a lot of memory. One strategy is to read the data in chunks. We found by trial that `250000` rows per chunk doesn't overwhelm a modest device like a Raspberry Pi (4 cores and 4GB of memory) but underutilizes its CPU capacity. It follows naturally that we should process the chunks in parallel.
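As a small illustration of chunked reading, the sketch below uses the `chunksize` parameter of `pd.read_csv` on an in-memory stand-in for the real file. The data and the tiny chunk size are made up; the notebook itself uses `250000`:

```python
import io
from math import ceil

import pandas as pd

# Small in-memory stand-in for the large CSV file (made-up values).
n_rows = 10
csv_text = "STARTTIMEMS,ENDTIMEMS\n" + "\n".join(
    f"{i * 1000},{i * 1000 + 500}" for i in range(n_rows)
)

chunk_size = 4  # the notebook uses 250000
expected_chunks = ceil(n_rows / chunk_size)

chunk_lengths = []
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=chunk_size):
    # Each iteration yields a DataFrame of at most `chunk_size` rows.
    chunk_lengths.append(len(chunk))

print(chunk_lengths, expected_chunks)
```

The last chunk is usually shorter than the others, which is why the chunk count is rounded up.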

So on one hand, we need to pace the amount of data we load in memory, and on the other, we need to load enough chunks to process them in parallel. Fortunately, ready-made solutions exist, including [asyncio Queues and Workers](https://docs.python.org/3/library/asyncio-queue.html).
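The pattern can be sketched in isolation as follows. This is a simplified illustration, not the notebook's implementation: the `worker` and `main` names, the sleep that stands in for chunk processing, and the sizes are all assumptions:

```python
import asyncio


async def worker(name, queue, done):
    """Take items from the queue forever and record who processed what."""
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)  # stand-in for processing one chunk
            done.append((name, item))
        finally:
            queue.task_done()


async def main(items, n_workers=2, queue_size=1):
    queue = asyncio.Queue(maxsize=queue_size)
    done = []
    workers = [
        asyncio.create_task(worker(f"w{i}", queue, done)) for i in range(n_workers)
    ]
    for item in items:
        # put() waits while the queue is full, which paces the producer.
        await queue.put(item)
    await queue.join()  # wait until every queued item has been processed
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return done


results = asyncio.run(main(range(5)))
print(sorted(item for _, item in results))
```

Inside a notebook, where an event loop is already running, `await main(range(5))` would replace the `asyncio.run(...)` call.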

Our implementation works as follows:

- The `transform_load_csv` function iteratively reads `chunks` from the CSV file and schedules the processing tasks on a `queue` which has a limited size (1 by default). When the `queue` is full, the function must wait for a slot to open up before scheduling another task.

![main](img/mainfunction.png)

- The `workers` continuously poll the queue for available tasks and execute them. The `workers` work in parallel. We can adjust the number of `workers` and the `queue` size to control memory and processor usage to some extent.

![work](img/work.png)

- The location processing tasks extract location information from the received chunk according to the model and bulk write the generated documents to `MongoDB`. They only remove duplicates within their own chunk; duplicates across chunks are harmless because each write is an upsert with `$setOnInsert` keyed on `_id`, so an existing location is left untouched.

![locations](img/locations.png)

- The trip processing tasks remove information that is already extracted into location documents, keeping the station names for reference. They then compute the duration of each trip in a new column and bulk write the generated documents to `MongoDB`.

![trips](img/trips.png)



In [134]:
async def count_lines_async(filename):
    process = await asyncio.create_subprocess_exec(
        "wc", "-l", filename, stdout=asyncio.subprocess.PIPE
    )
    stdout, _ = await process.communicate()
    lines = int(stdout.split()[0])
    print(lines, "lines")
    return lines


async def process_locations(chunk, station_location_collection):
    stations = (
        pd.concat(
            [
                chunk[
                    [
                        "STARTSTATIONNAME",
                        "STARTSTATIONARRONDISSEMENT",
                        "STARTSTATIONLATITUDE",
                        "STARTSTATIONLONGITUDE",
                    ]
                ].rename(
                    columns={
                        "STARTSTATIONNAME": "name",
                        "STARTSTATIONARRONDISSEMENT": "arrondissement",
                        "STARTSTATIONLATITUDE": "latitude",
                        "STARTSTATIONLONGITUDE": "longitude",
                    }
                ),
                chunk[
                    [
                        "ENDSTATIONNAME",
                        "ENDSTATIONARRONDISSEMENT",
                        "ENDSTATIONLATITUDE",
                        "ENDSTATIONLONGITUDE",
                    ]
                ].rename(
                    columns={
                        "ENDSTATIONNAME": "name",
                        "ENDSTATIONARRONDISSEMENT": "arrondissement",
                        "ENDSTATIONLATITUDE": "latitude",
                        "ENDSTATIONLONGITUDE": "longitude",
                    }
                ),
            ]
        )
        .drop_duplicates(subset=["name"])
        .reset_index(drop=True)
    )

    operations = [
        UpdateOne(
            {"_id": row["name"]},
            {
                "$setOnInsert": {
                    "arrondissement": row["arrondissement"],
                    "latitude": row["latitude"],
                    "longitude": row["longitude"],
                }
            },
            upsert=True,
        )
        for _, row in stations.iterrows()
    ]
    if operations:
        await station_location_collection.bulk_write(operations, ordered=False)


async def process_trips(chunk, trip_collection):
    chunk["DURATION"] = chunk["ENDTIMEMS"] - chunk["STARTTIMEMS"]
    trip_docs = chunk[
        ["STARTSTATIONNAME", "ENDSTATIONNAME", "STARTTIMEMS", "ENDTIMEMS", "DURATION"]
    ].to_dict("records")
    if trip_docs:
        operations = [InsertOne(doc) for doc in trip_docs]
        await trip_collection.bulk_write(operations, ordered=False)


async def do(task: Coroutine, chunk, index, collection):
    print("🚀 started:", task.__name__, index)
    result = await task(chunk, collection)
    print(task.__name__, index, "✅")
    return result


async def queue_task(task: Coroutine, chunk, index, collection, queue: Queue):
    await queue.put(
        (
            do,
            (
                task,
                chunk,
                index,
                collection,
            ),
        )
    )
    print("➡️ queued:", task.__name__, index)


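async def worker(queue: Queue):
    while True:
        task_func, args = await queue.get()
        try:
            await task_func(*args)
        except Exception as e:
            print("❌ task failed:", e)
        finally:
            # Always mark the item done so queue.join() cannot hang on a failed task.
            queue.task_done()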


async def transform_load_csv(
    filename,
    path=DEFAULT_EXTRACT_PATH,
    mongo_uri=MONGO_URI,
    db_name=BIXI_DB_NAME,
    locations_collection=LOCATION_COLLECTION,
    trips_collection=TRIP_COLLECTION,
    chunk_size=CHUNK_SIZE,
    concurrent_tasks=CONCURRENT_TASKS,
    queue_size=QUEUE_SIZE,
):
    print(
        "transform_load_csv",
        filename,
        "chunk_size",
        chunk_size,
        "concurrency",
        concurrent_tasks,
        "queue_size",
        queue_size,
    )
    client = AsyncIOMotorClient(mongo_uri)
    db = client[db_name]
    location_collection = db[locations_collection]
    trip_collection = db[trips_collection]
    csv_file_path = os.path.join(path, filename)

    queue = Queue(maxsize=queue_size)
    workers = [asyncio.create_task(worker(queue)) for _ in range(concurrent_tasks)]

    nb_lines = await count_lines_async(csv_file_path)
    total_chunks = ceil(nb_lines / chunk_size)
    print(total_chunks, "chunks")

    for index, chunk in enumerate(pd.read_csv(csv_file_path, chunksize=chunk_size)):
        await queue_task(process_locations, chunk, index, location_collection, queue)
        await queue_task(process_trips, chunk, index, trip_collection, queue)

    # Wait for every queued task to finish, then stop the idle workers.
    await queue.join()
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    client.close()


In [135]:
csv_file = "DonneesOuvertes (1).csv"
t0 = time.time()
await transform_load_csv(csv_file)
t1 = time.time()
elapsed_time = t1 - t0
print("Elapsed time", elapsed_time, "seconds")

transform_load_csv DonneesOuvertes (1).csv chunk_size 250000 concurrency 4 queue_size 1
11790841 lines
48 chunks
➡️ queued: process_locations 0
🚀 started: process_locations 0
➡️ queued: process_trips 0
🚀 started: process_trips 0
➡️ queued: process_locations 1
process_locations 0 ✅
🚀 started: process_locations 1
➡️ queued: process_trips 1
🚀 started: process_trips 1
➡️ queued: process_locations 2
process_locations 1 ✅
🚀 started: process_locations 2
➡️ queued: process_trips 2
🚀 started: process_trips 2
➡️ queued: process_locations 3
process_locations 2 ✅
🚀 started: process_locations 3
➡️ queued: process_trips 3
process_locations 3 ✅
🚀 started: process_trips 3
➡️ queued: process_locations 4
process_trips 0 ✅
🚀 started: process_locations 4
➡️ queued: process_trips 4
process_locations 4 ✅
🚀 started: process_trips 4
➡️ queued: process_locations 5
process_trips 1 ✅
🚀 started: process_locations 5
➡️ queued: process_trips 5
process_locations 5 ✅
🚀 started: process_trips 5
➡️ queued: process_loca

![pandapymongo](img/pandaspymongo.png)

From the results, our `transform_load_csv` Python function is not only faster than `mongoimport` but also produces a smaller database:

| Metric                   | `mongoimport` | `transform_load_csv` |
|--------------------------|---------------|----------------------|
| Transform and load time  | ~10 min       | ~3 min               |
| Database size after load | 1.52 GB       | ~0.8 GB              |

⚠️ These tests were conducted on a high-performance machine with a local instance of MongoDB. They are not representative of our production environment, where we need to account for round-trip delays to `Atlas` and limit our resource consumption.

### Test on a Raspberry Pi

```bash
(myenv) ➜  pytransit-bixi-extract du -sh . && /usr/bin/time -v python transform_load.py "DonneesOuvertes.csv" && du -sh .
2.2G    .
transform_load_csv DonneesOuvertes.csv chunk_size 250000 concurrency 10 queue_size 1
11790841 lines
48 chunks
        Command being timed: "python transform_load.py DonneesOuvertes.csv"
        User time (seconds): 507.42
        System time (seconds): 47.30
        Percent of CPU this job got: 107%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 8:37.03
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 941552
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 0
        Minor (reclaiming a frame) page faults: 1134596
        Voluntary context switches: 596784
        Involuntary context switches: 42199
        Swaps: 0
        File system inputs: 2596768
        File system outputs: 0
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 0
        Page size (bytes): 4096
        Exit status: 0
2.2G    .
```
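To help read these figures, the short calculation below rederives the CPU percentage from the user, system and wall-clock times, and converts the peak resident set size to MiB. The numbers are copied from the output above; the interpretation is ours:

```python
# Figures copied from the `/usr/bin/time -v` output above.
user_s = 507.42
system_s = 47.30
wall_s = 8 * 60 + 37.03  # elapsed time 8:37.03 in seconds
max_rss_kb = 941552

# CPU percentage is CPU time (user + system) divided by wall-clock time.
cpu_percent = (user_s + system_s) / wall_s * 100
max_rss_mib = max_rss_kb / 1024

print(f"CPU: {cpu_percent:.0f}%")
print(f"Peak memory: {max_rss_mib:.0f} MiB")
```

A value close to 100% suggests that, on average, about one of the Pi's four cores was busy during the run, and peak memory stayed under 1GB of the 4GB available.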


## Drop the test collections

In [131]:
def drop_test_collections():
    client = MongoClient(MONGO_URI)
    db = client[BIXI_DB_NAME]
    db[TRIP_COLLECTION].drop()
    db[LOCATION_COLLECTION].drop()
    db["csv"].drop()

drop_test_collections()

## Encountered issues

- We use a free-tier Atlas database for tests. It has a size limit of `~500MB`. The free tier also runs on shared resources such as network and CPU, so writing to the database can be slow.