In [1]:
import numpy as np
import pandas as pd
from pathlib import Path
from dataclasses import dataclass, field
from jh_interview.models import TransactionModel, PropertyModel, PostcodeModel

from hashlib import md5

In [2]:
# Constants

DATA_DIR = Path('../data/')
"""Path to the data directory."""

PRICE_PAID_FILE_FIRST = DATA_DIR / 'pp-2019.csv'
"""Path to the first price paid data file."""

PRICE_PAID_FILE_SECOND = DATA_DIR / 'pp-2020.csv'
"""Path to the second price paid data file."""

HOUSE_PRICE_INDEX_FILE = DATA_DIR / 'UK-HPI-full-file-2021-03.csv'
"""Path to the house price index data file."""

AVERAGE_PRICES_FILE = DATA_DIR / 'Average-prices-2021-03.csv'
"""Path to the average prices data file."""

'Path to the average prices data file.'

In [3]:
def load_price_paid_data(filepaths: list[Path]) -> pd.DataFrame:
    """
    Load price paid data from a CSV file.
    """
    column_names = [
        'transaction_id', 'price', 'date_of_transfer', 'postcode', 'property_type',
        'old_new', 'duration', 'paon', 'saon', 'street', 'locality', 'town_city',
        'district', 'country', 'ppd_category_type', 'record_status'
        ]

    df = pd.concat(
        [
            pd.read_csv(
                filename,
                names=column_names,
            )
            for filename in filepaths
        ]
    )
    df['date_of_transfer'] = pd.to_datetime(df['date_of_transfer'])
    return df

In [4]:
def load_price_index_data(filepath: Path) -> pd.DataFrame:
    """
    Load house price index data from a CSV file.
    """
    column_names = [
        "Date", "RegionName", "AreaCode", "AveragePrice", "Index", "IndexSA",
        "1m%Change", "12m%Change", "AveragePriceSA", "SalesVolume",
        "DetachedPrice", "DetachedIndex", "Detached1m%Change", "Detached12m%Change",
        "SemiDetachedPrice", "SemiDetachedIndex", "SemiDetached1m%Change", "SemiDetached12m%Change",
        "TerracedPrice", "TerracedIndex", "Terraced1m%Change", "Terraced12m%Change",
        "FlatPrice", "FlatIndex", "Flat1m%Change", "Flat12m%Change",
        "CashPrice", "CashIndex", "Cash1m%Change", "Cash12m%Change", "CashSalesVolume",
        "MortgagePrice", "MortgageIndex", "Mortgage1m%Change", "Mortgage12m%Change", "MortgageSalesVolume",
        "FTBPrice", "FTBIndex", "FTB1m%Change", "FTB12m%Change",
        "FOOPrice", "FOOIndex", "FOO1m%Change", "FOO12m%Change",
        "NewPrice", "NewIndex", "New1m%Change", "New12m%Change", "NewSalesVolume",
        "OldPrice", "OldIndex", "Old1m%Change", "Old12m%Change", "OldSalesVolume",
    ]
    df = pd.read_csv(filepath, names=column_names, header=0)
    df['Date'] = pd.to_datetime(df['Date'])
    return df

In [5]:
def load_average_prices_data(filepath: Path) -> pd.DataFrame:
    """
    Load average prices data from a CSV file.
    """
    column_names = [
        "Date", "Region_Name", "Area_Code", "Average_Price", "Monthly_Change", "Annual_Change", "Average_Price_SA",
    ]
    df = pd.read_csv(filepath, names=column_names, header=0)
    df['Date'] = pd.to_datetime(df['Date'])
    return df

In [6]:
transactions_pp = load_price_paid_data(
    [PRICE_PAID_FILE_FIRST,
    PRICE_PAID_FILE_SECOND]
)
price_index = load_price_index_data(HOUSE_PRICE_INDEX_FILE)
average_prices = load_average_prices_data(AVERAGE_PRICES_FILE)

In [7]:
from jh_interview.database.schemas import Property, Transaction, Postcode
from jh_interview.database.db_session import get_session, DB_init, MSSQL_LINK
from jh_interview.database.repository import PropertyRepository, TransactionRepository, PostcodeRepository
from sqlalchemy.orm import Session

print(MSSQL_LINK)
DB_init()

mssql+pyodbc://sa:YourStrong!Passw0rd@localhost:1433/property_db?driver=ODBC+Driver+17+for+SQL+Server


In [9]:
from collections import defaultdict

def chunked_data(data, chunk_size):
    return (data[i:i+chunk_size] for i in range(0, len(data), chunk_size))

with get_session() as session:
    transactions_pp['property_id'] = (transactions_pp[['postcode', 'paon', 'saon']].astype(str).apply(lambda x: md5(''.join(x).encode()).hexdigest(), axis=1))

    transaction_data = transactions_pp[['transaction_id', 'price', 'date_of_transfer', 'property_id']].to_dict('records')

    property_data = defaultdict(lambda: {'transactions': []})

    for _, row in transactions_pp.iterrows():
        property_id = row['property_id']

        property_data[property_id].update({
            'unique_id': property_id,
            'property_type': row['property_type'] if pd.notna(row['property_type']) else None,
            'postcode': row['postcode'] if pd.notna(row['postcode']) else None,
            'old_new': row['old_new'] if pd.notna(row['old_new']) else None,
            'paon': row['paon'] if pd.notna(row['paon']) else None,
            'saon': row['saon'] if pd.notna(row['saon']) else None,
            'street': row['street'] if pd.notna(row['street']) else None,
            'locality': row['locality'] if pd.notna(row['locality']) else None,
            'town_city': row['town_city'] if pd.notna(row['town_city']) else None,
            'district': row['district'] if pd.notna(row['district']) else None,
            'country': row['country'] if pd.notna(row['country']) else None,
        })

        property_data[property_id]['transactions'].append(row['transaction_id'])
        
    property_data = list(property_data.values())

    print("Starting to insert transaction data")
    for chunk in chunked_data(transaction_data, 1000):
        session.bulk_insert_mappings(Transaction, chunk)

    print('Starting to insert property data')
    for chunk in chunked_data(property_data, 1000):
        session.bulk_insert_mappings(Property, chunk)

    session.commit()

Starting to insert transaction data
Starting to insert property data
