# Ingestion Experimentation notebook

This notebook largely exists to experiment with different configuration methods for the ingestion pipeline.

## Configuration

These fields control how the data is ingested; adjust them before running the rest of the notebook.

In [1]:
from lib.service.docker.defaults import INSTANCE_2_IMAGE_CONF, INSTANCE_2_CONTAINER_CONF
from lib.service.database.defaults import DB_INSTANCE_2_CONFIG

# Notebook-wide switches.
#
# drop_raw_nsw_valuer_general_entries:
#   When True, the table `nsw_valuer_general.raw_entries` will be dropped.
#   If you have space limitations and no desire to debug the data, then
#   dropping it makes sense; keep it around if you want to debug some values.
#   NOTE(review): no cell in this notebook reads this flag — confirm it is
#   actually wired into the ingestion step.
# reinitialise_container:
#   When True, the "Create Docker Container and Database" cell rebuilds the
#   image/container and initialises the target schema. When False, both steps
#   are skipped and that cell stops the notebook with an exception.
GLOBAL_FLAGS = dict(
    drop_raw_nsw_valuer_general_entries=True,
    reinitialise_container=True,
)

# Use the "instance 2" defaults for the database service and its Docker
# image/container.
db_service_config, docker_image_conf, docker_container_conf = (
    DB_INSTANCE_2_CONFIG,
    INSTANCE_2_IMAGE_CONF,
    INSTANCE_2_CONTAINER_CONF,
)

Exception: 

## Download Static Files

In [None]:
import logging
from lib.service.io import IoService
from lib.tasks.fetch_static_files import initialise, get_session

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

io_service = IoService.create(None)
async with get_session(io_service) as session:
    environment = await initialise(io_service, session)

land_value_dis = environment.land_value
w_sale_price = environment.sale_price_weekly
a_sale_price = environment.sale_price_annual
gnaf_dis = environment.gnaf

## Create Docker Container and Database

In [None]:
from lib.pipeline.gnaf.init_schema import init_target_schema
from lib.service.docker import DockerService
from lib.service.database import DatabaseService

docker_service = DockerService.create()

if GLOBAL_FLAGS['reinitialise_container']:
    image = docker_service.create_image(docker_image_conf)
    image.prepare()

    container = docker_service.create_container(image, docker_container_conf)
    container.clean()
    container.prepare(db_service_config)
    container.start()
else:
    print('skipping container initialisation')

db_service = DatabaseService.create(db_service_config, 32)
await db_service.wait_till_running()
await db_service.open()

if GLOBAL_FLAGS['reinitialise_container']:
    await init_target_schema(gnaf_dis.publication, io_service, db_service)
else:
    print('skipping DB initialisation')
    raise Exception()

## Init DB Schema

In [2]:
from lib.tasks.update_schema import update_schema, UpdateSchemaConfig
from lib.tooling.schema.config import ns_dependency_order

await update_schema(
    UpdateSchemaConfig(
        packages=ns_dependency_order,
        range=None,
        apply=True,
    ),
    db_service,
    io_service,
)

TypeError: UpdateSchemaConfig.__init__() missing 2 required positional arguments: 'packages' and 'range'

## Ingest ABS Data

In [None]:
from lib.tasks.ingest_abs import ingest_all
from lib.pipeline.abs.defaults import ABS_MAIN_STRUCTURES, NON_ABS_MAIN_STRUCTURES
from lib.pipeline.abs.config import IngestionConfig, WorkerConfig, WorkerLogConfig

await ingest_all(
    IngestionConfig(
        ingest_sources=[ABS_MAIN_STRUCTURES, NON_ABS_MAIN_STRUCTURES],
        worker_count=4,
        worker_config=WorkerConfig(
            db_config=db_service_config,
            db_connections=2,
            log_config=WorkerLogConfig(
                level=logging.INFO,
                format='%(asctime)s - %(levelname)s - %(message)s',
                datefmt=None,
            ),
        ),
    ),
    db_service,
    io_service,
)

## Ingest NSW Valuer General Land Values and Property Sales

In [None]:
from lib.service.clock import ClockService
from lib.pipeline.nsw_vg.property_sales.ingestion import NSW_VG_PS_INGESTION_CONFIG
from lib.tasks.nsw_vg.ingest import ingest_nswvg
from lib.tasks.nsw_vg.ingest import NswVgIngestionConfig, NswVgIngestionDedupConfig, NswVgLandValueIngestionConfig
from lib.tasks.nsw_vg.ingest_property_sales import PropertySaleIngestionConfig, ChildConfig, ParentConfig

await ingest_nswvg(
    environment,
    ClockService(),
    db_service,
    io_service,
    NswVgIngestionConfig(
        load_raw_land_values=NswVgLandValueIngestionConfig(
            truncate_raw_earlier=False,
        ),
        load_raw_property_sales=PropertySaleIngestionConfig(
            worker_count=6,
            worker_config=ChildConfig(
                db_config=db_service_config,
                db_pool_size=4,
                db_batch_size=1000,
                file_limit=None,
                ingestion_config=NSW_VG_PS_INGESTION_CONFIG,
                parser_chunk_size=8 * 2 ** 10,
                log_config=None,
            ),
            parent_config=ParentConfig(
                target_root_dir='./_out_zip',
                publish_min=None,
                publish_max=None,
                download_min=None,
                download_max=None,
            ),
        ),
        deduplicate=NswVgIngestionDedupConfig(
            run_from=1,
            run_till=6,
        ),
        load_parcels=False,
    ),
)

## Ingest Gnaf

In [None]:
from lib.tasks.ingest_gnaf import ingest_gnaf
await ingest_gnaf(gnaf_dis.publication, db_service)

## Done: Check Table Row Counts

In [None]:
async with (
    await db_service.async_connect() as c,
    c.cursor() as cursor
):
    for schema in ['nsw_valuer_general', 'gnaf', 'abs_main_structures', 'non_abs_main_structures']:
        # Get the list of all tables
        cursor.execute(f"""
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = '{schema}'
        """)
        tables = cursor.fetchall()
    
        # Get row count for each table
        for table in tables:
            await cursor.execute(f'SELECT COUNT(*) FROM {schema}.{table[0]}')
            count = cursor.fetchone()[0]
            print(f"Table {schema}.{table[0]} has {count} rows")