In [1]:
import io
import pandas
import dotenv

import boto3
import botocore

from sqlalchemy import Engine
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from lib_land_registry_data.lib_db import PPMonthlyUpdateArchiveFileLog
from lib_land_registry_data.lib_env import EnvironmentVariables

In [2]:
dotenv.load_dotenv('.envrc')

True

In [8]:
environment_variables = EnvironmentVariables()

In [3]:
pp_complete_file = f'/data-land-registry/pp-complete/archive/pp-complete-2024-07-27.txt'

In [4]:
df = pandas.read_csv(
    pp_complete_file,
    header=None,
)

In [5]:
df_columns = [
    'transaction_unique_id',
    'price',
    'transaction_date',
    'postcode',
    'property_type',
    'new_tag',
    'lease',
    'primary_address_object_name',
    'secondary_address_object_name',
    'street',
    'locality',
    'town_city',
    'district',
    'county',
    'ppd_cat',
    'record_status',
]

In [6]:
df.columns = df_columns

In [22]:
df

Unnamed: 0,transaction_unique_id,price,transaction_date,postcode,property_type,new_tag,lease,primary_address_object_name,secondary_address_object_name,street,locality,town_city,district,county,ppd_cat,record_status
0,{5BBE9CB3-6332-4EB0-9CD3-8737CEA4A65A},42000,1995-12-21 00:00,NE4 9DN,S,N,F,8,,MATFEN PLACE,FENHAM,NEWCASTLE UPON TYNE,NEWCASTLE UPON TYNE,TYNE AND WEAR,A,A
1,{20E2441A-0F16-49AB-97D4-8737E62A5D93},95000,1995-03-03 00:00,RM16 4UR,S,N,F,30,,HEATH ROAD,GRAYS,GRAYS,THURROCK,THURROCK,A,A
2,{D893EE64-4464-44B5-B01B-8E62403ED83C},74950,1995-10-03 00:00,CW10 9ES,D,Y,F,15,,SHROPSHIRE CLOSE,MIDDLEWICH,MIDDLEWICH,CONGLETON,CHESHIRE,A,A
3,{F9F753A8-E56A-4ECC-9927-8E626A471A92},43500,1995-11-14 00:00,TS23 3LA,S,N,F,19,,SLEDMERE CLOSE,BILLINGHAM,BILLINGHAM,STOCKTON-ON-TEES,STOCKTON-ON-TEES,A,A
4,{E166398A-A19E-470E-BB5A-83B4C254CF6D},63000,1995-09-08 00:00,CA25 5QH,S,N,F,8,,CROSSINGS CLOSE,CLEATOR MOOR,CLEATOR MOOR,COPELAND,CUMBRIA,A,A
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
29299981,{12A8BAB6-EB70-2125-E063-4804A8C08CC1},50000,2024-01-19 00:00,TQ2 5PL,O,N,F,10,,UNION STREET,,TORQUAY,TORBAY,TORBAY,B,A
29299982,{12A8BAB6-EB71-2125-E063-4804A8C08CC1},246000,2024-02-19 00:00,EX32 7RL,T,N,F,56,,SILVERWOOD HEIGHTS,,BARNSTAPLE,NORTH DEVON,DEVON,B,A
29299983,{12A8BAB6-EB74-2125-E063-4804A8C08CC1},250000,2024-02-01 00:00,PL16 0LD,O,N,L,LYD VALE,GARAGE 2,OAK RIDGE,,LIFTON,WEST DEVON,DEVON,B,A
29299984,{12A8BAB6-EB7A-2125-E063-4804A8C08CC1},775000,2024-01-31 00:00,EX39 2RL,O,N,F,DYERS LOOKOUT,,DURRANT LANE,NORTHAM,BIDEFORD,TORRIDGE,DEVON,B,A


In [9]:
postgres_connection_string = environment_variables.get_postgres_connection_string()
postgres_engine = create_engine(postgres_connection_string)

In [10]:
with Session(postgres_engine) as session:
    rows = (
        session
        .query(PPMonthlyUpdateArchiveFileLog)
        .order_by(PPMonthlyUpdateArchiveFileLog.pp_monthly_update_archive_file_log_id)
        .all()
    )

print(f'number of rows loaded: {len(rows)}')

number of rows loaded: 114


In [11]:
aws_access_key_id = environment_variables.get_aws_access_key_id()
aws_secret_access_key = environment_variables.get_aws_secret_access_key()
minio_url = environment_variables.get_minio_url()

boto3_session = boto3.Session(
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

In [12]:
print(f'current pp-complete dataframe contains {len(df)} rows')

current pp-complete dataframe contains 29299986 rows


In [13]:
df_copy = df.copy()

In [14]:
df['transaction_unique_id'].value_counts().max()

np.int64(1)

In [17]:
df_month_list = []

for row in rows:
    pp_monthly_update_archive_file_log_id = row.pp_monthly_update_archive_file_log_id
    data_timestamp = row.data_timestamp
    data_timestamp_str = data_timestamp.strftime('%Y-%m-%d')
    print(f'processing row {data_timestamp_str=}')

    s3_bucket = row.s3_bucket
    s3_object_key = row.s3_object_key

    boto3_client = (
        boto3_session.client(
            's3',
            endpoint_url=minio_url,
            config=botocore.config.Config(signature_version='s3v4'),
        )
    )

    print(f'loading {s3_bucket}/{s3_object_key}')

    s3_response_object = boto3_client.get_object(Bucket=s3_bucket, Key=s3_object_key)
    pp_monthly_update_data = s3_response_object['Body'].read()

    df_current_month = pandas.read_csv(
        io.BytesIO(pp_monthly_update_data),
        header=None,
    )
    print(f'current month df contains {len(df_current_month)} rows')

    if len(df_current_month.columns) == 16:
        pass
    elif len(df_current_month.columns) == 15:
        df_current_month['ppd_cat'] = None
    else:
        raise RuntimeError(f'invalid number of columns: {len(df_current_month.columns)}')

    df_current_month.columns = df_columns

    # check no duplicate transaction_unique_id in this months dataframe
    df_current_month_tuid_value_counts = df_current_month['transaction_unique_id'].value_counts()

    vc_gt = df_current_month_tuid_value_counts[df_current_month_tuid_value_counts > 1]
    if len(vc_gt) > 0:
        print(vc_gt)

    df_tuid_dup = df_current_month[df_current_month['transaction_unique_id'].isin(vc_gt.index)].copy()
    df_tuid_dup.reset_index(inplace=True)
    if len(df_tuid_dup) > 0:
        print(f'duplicate transaction_unique_id detected')
        print(df_tuid_dup)
        df_tuid_dup.sort_values(by='transaction_unique_id').to_csv(f'df_tuid_dup_{data_timestamp_str}.csv')

    df_month_list.append(df_current_month)

df_months = pandas.concat(df_month_list, ignore_index=True)

processing row data_timestamp_str='2015-01-31'
loading land-registry-data-archive/PPMS_update_2015_1_31.txt
current month df contains 83245 rows
Empty DataFrame
Columns: [transaction_unique_id, price, transaction_date, postcode, property_type, new_tag, lease, primary_address_object_name, secondary_address_object_name, street, locality, town_city, district, county, ppd_cat, record_status]
Index: []
processing row data_timestamp_str='2015-02-28'
loading land-registry-data-archive/PPMS_update_2015_2_28.txt
current month df contains 69326 rows
Empty DataFrame
Columns: [transaction_unique_id, price, transaction_date, postcode, property_type, new_tag, lease, primary_address_object_name, secondary_address_object_name, street, locality, town_city, district, county, ppd_cat, record_status]
Index: []
processing row data_timestamp_str='2015-03-31'
loading land-registry-data-archive/PPMS_update_2015_3_31.txt
current month df contains 78891 rows
Empty DataFrame
Columns: [transaction_unique_id, pric

In [18]:
# add column for version
# df_months['version'] = None

# for transaction_unique_id, df_ in df_months.groupby('transaction_unique_id'):
#     df_['version'] =
df_months['version'] = df_months.groupby('transaction_unique_id').cumcount()

In [19]:
vc = df_months['transaction_unique_id'].value_counts()

In [20]:
vc[vc > 1]

transaction_unique_id
{288DCE2A-15B1-E510-E050-A8C06205480E}    11
{75050A86-15F9-9A88-E053-6B04A8C02390}    10
{6DA0844A-D1FC-30F2-E053-6B04A8C05F3B}     9
{25EA59FA-779E-4D50-E050-A8C0630562D0}     9
{7011B10A-1EF1-8ED6-E053-6B04A8C075C1}     9
                                          ..
{93E6821F-3714-40FD-E053-6B04A8C0C1DF}     2
{F3B6C199-5089-6E40-E053-6C04A8C0B3B4}     2
{98C75471-B692-72E9-E053-6B04A8C042F0}     2
{C18F412A-E84B-81A6-E053-6B04A8C0AD18}     2
{8A78B2B0-5C51-5CB0-E053-6B04A8C0F504}     2
Name: count, Length: 749427, dtype: int64

In [21]:
vc.max()

np.int64(11)

In [46]:
# example of large number of changes
df_months[df_months['transaction_unique_id'] == '{75050A86-15F9-9A88-E053-6B04A8C02390}']

Unnamed: 0,transaction_unique_id,price,transaction_date,postcode,property_type,new_tag,lease,primary_address_object_name,secondary_address_object_name,street,locality,town_city,district,county,ppd_cat,record_status,version
3334,{75050A86-15F9-9A88-E053-6B04A8C02390},4699400,2017-03-24 00:00,CH49 4SB,O,N,F,3,,HAREFIELDS WAY,,WIRRAL,WIRRAL,MERSEYSIDE,B,A,0
2217,{75050A86-15F9-9A88-E053-6B04A8C02390},4699400,2017-03-24 00:00,CH49 4SD,O,N,F,21,,FAR HEY DRIVE,,WIRRAL,WIRRAL,MERSEYSIDE,B,C,1
1422,{75050A86-15F9-9A88-E053-6B04A8C02390},4699400,2017-03-24 00:00,CH49 4SD,O,N,F,9,,FAR HEY DRIVE,,WIRRAL,WIRRAL,MERSEYSIDE,B,C,2
1609,{75050A86-15F9-9A88-E053-6B04A8C02390},4699400,2017-03-24 00:00,CH49 4SE,O,N,F,2,,LITTLE MEADOW CLOSE,MORETON,WIRRAL,WIRRAL,MERSEYSIDE,B,C,3
1250,{75050A86-15F9-9A88-E053-6B04A8C02390},4699400,2017-03-24 00:00,CH49 4SE,O,N,F,5,,LITTLE MEADOW CLOSE,MORETON,WIRRAL,WIRRAL,MERSEYSIDE,B,C,4
1345,{75050A86-15F9-9A88-E053-6B04A8C02390},4699400,2017-03-24 00:00,CH49 4SE,O,N,F,10,,LITTLE MEADOW CLOSE,MORETON,WIRRAL,WIRRAL,MERSEYSIDE,B,C,5
1332,{75050A86-15F9-9A88-E053-6B04A8C02390},4699400,2017-03-24 00:00,CH49 4SE,O,N,F,12,,LITTLE MEADOW CLOSE,MORETON,WIRRAL,WIRRAL,MERSEYSIDE,B,C,6
912,{75050A86-15F9-9A88-E053-6B04A8C02390},4699400,2017-03-24 00:00,CH49 4SD,O,N,F,28,,FAR HEY DRIVE,,WIRRAL,WIRRAL,MERSEYSIDE,B,C,7
1041,{75050A86-15F9-9A88-E053-6B04A8C02390},4699400,2017-03-24 00:00,CH49 4SE,O,N,F,9,,LITTLE MEADOW CLOSE,MORETON,WIRRAL,WIRRAL,MERSEYSIDE,B,C,8
1256,{75050A86-15F9-9A88-E053-6B04A8C02390},4699400,2017-03-24 00:00,CH49 4SD,O,N,F,28,,FAR HEY DRIVE,,WIRRAL,WIRRAL,MERSEYSIDE,B,C,9


In [24]:
# TODO: build `pp-complete.txt` for each month by going back progressively in time

for index in reversed(df_months.index):
    row = df_months.loc[index]

    record_status = row['record_status']

    if record_status == 'A':
        # delete all rows with matching TUID
        transaction_unique_id = row['transaction_unique_id']
        df = df[df['transaction_unique_id'] != transaction_unique_id]

    elif record_status == 'C':
        # row must be changed to previous version
        transaction_unique_id = row['transaction_unique_id']
        version = row['version']

        if version > 0:
            target_version = version - 1
            target_row = df_months[(df_months['transaction_unique_id'] == transaction_unique_id) & (df_months['version'] == target_version)]

            for column in df.columns:
                if column == 'transaction_unique_id':
                    continue
                df.loc[df['transaction_unique_id'] == transaction_unique_id][column] = target_row[column]
        else:
            raise RuntimeError(f'{version=}')

    elif record_status == 'D':
        # row must be added using previous values
        transaction_unique_id = row['transaction_unique_id']
        version = row['version']

        if version > 0:
            target_version = version - 1
            target_row = df_months[(df_months['transaction_unique_id'] == transaction_unique_id) & (df_months['version'] == target_version)]

            for column in df.columns:
                if column == 'transaction_unique_id':
                    continue
                df.loc[df['transaction_unique_id'] == transaction_unique_id][column] = target_row[column]
        else:
            raise RuntimeError(f'{version=}')

    else:
        raise RuntimeError(f'unrecognized {record_status=}')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df.loc[df['transaction_unique_id'] == transaction_unique_id][column] = target_row[column]


KeyboardInterrupt: 

In [31]:
tuid_pending_change = []
tuid_pending_undelete = []

for row in rows:
    pp_monthly_update_archive_file_log_id = row.pp_monthly_update_archive_file_log_id
    data_timestamp = row.data_timestamp
    data_timestamp_str = data_timestamp.strftime('%Y-%m-%d')
    print(f'processing row {data_timestamp_str=}')

    s3_bucket = row.s3_bucket
    s3_object_key = row.s3_object_key

    boto3_client = (
        boto3_session.client(
            's3',
            endpoint_url=minio_url,
            config=botocore.config.Config(signature_version='s3v4'),
        )
    )

    s3_response_object = boto3_client.get_object(Bucket=s3_bucket, Key=s3_object_key)
    pp_monthly_update_data = s3_response_object['Body'].read()

    df_current_month = pandas.read_csv(
        io.BytesIO(pp_monthly_update_data),
        header=None,
    )
    print(f'current month df contains {len(df_current_month)} rows')

    if len(df_current_month.columns) == 16:
        pass
    elif len(df_current_month.columns) == 15:
        df_current_month['ppd_cat'] = None
    else:
        raise RuntimeError(f'invalid number of columns: {len(df_current_month.columns)}')

    df_current_month.columns = df_columns

    # check no duplicate transaction_unique_id in this months dataframe
    df_current_month_tuid_value_counts = df_current_month['transaction_unique_id'].value_counts()
    assert df_current_month_tuid_value_counts.max() == 1, f'duplicate transaction_unique_id detected'

    # start of df manipulation

    for index, row in df_current_month[df_current_month['transaction_unique_id'].isin(tuid_pending_change)].iterrows():
        for column in df.columns:
            if column == 'transaction_unique_id':
                continue
            df[df['transaction_unique_id']][column] = row[column]

    for index, row in df_current_month[df_current_month['transaction_unique_id'].isin(tuid_pending_undelete)].iterrows():
        new_row_dict = {
            column: value for column, value in row.items()
        }
        # df.append(new_row_dict)
        df_new = pandas.DataFrame.from_records(
            [
                new_row_dict
            ]
        )
        df = pandas.concat(
            [df, df_new]
        )

    for index, row in reversed(df_current_month.iterrows()):
        record_status = row['record_status']

        if record_status == 'A':
            # delete all rows with matching TUID
            transaction_unique_id = row['transaction_unique_id']
            df = df[df['transaction_unique_id'] != transaction_unique_id]

        elif record_status == 'C':
            # row must be changed to previous version
            transaction_unique_id = row['transaction_unique_id']
            if not transaction_unique_id in tuid_pending_undelete:
                tuid_pending_change.append(transaction_unique_id)
            else:
                print(f'cannot add {transaction_unique_id=} to pending change, because exists in pending undelete')

        elif record_status == 'D':
            # row must be added using previous values
            transaction_unique_id = row['transaction_unique_id']
            if not transaction_unique_id in tuid_pending_change:
                tuid_pending_undelete.append(transaction_unique_id)
            else:
                print(f'cannot add {transaction_unique_id=} to pending undelete, because exists in pending change')

        else:
            raise RuntimeError(f'unrecognized {record_status=}')



processing row data_timestamp_str='2015-01-31'
current month df contains 83245 rows
1
