<a href="https://colab.research.google.com/github/ADLTU/Hawi/blob/main/StreetGroupTask.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup

In [None]:
## Install apache-beam (the only third-party dependency used by the pipeline below).
!pip install --quiet apache-beam

## Create the working directory and switch into it so the downloads land there.
## NOTE(review): later cells read from /content/data, which matches this
## directory only when the notebook starts in /content (as on Colab) - confirm.
!mkdir -p data
%cd data

## Download the input data
  # transactions for 2022 only
!wget http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-2022.csv
  # complete data set
!wget http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-complete.csv

# Development

In [None]:
import csv
import json

import apache_beam as beam


class GroupTransactions(beam.DoFn):
    """Beam DoFn that pairs a transaction with an identifier for its property."""

    def process(self, transaction):
        """Key one transaction by its property ID.

        :param dict transaction: a transaction with 'PAON', 'SAON' and 'Postcode' entries
        :return: a one-element list holding the tuple (property_id, transaction)
        """
        ## The property ID is the PAON, SAON and Postcode values joined with '_'
        id_parts = (transaction['PAON'], transaction['SAON'], transaction['Postcode'])
        keyed_transaction = ('_'.join(id_parts), transaction)

        ## A list is returned (rather than yielded) so callers can compare the result directly
        return [keyed_transaction]


def run(csvPath, outputPath):
    """Group the transactions of a CSV file by property using Apache Beam.

    Each CSV line becomes a transaction dictionary, is keyed by
    ``GroupTransactions`` and grouped on that key; one JSON object per
    property is then written to ``outputPath`` (newline-delimited JSON in a
    single, unsharded file).

    :param str csvPath: the path to the input csv file
    :param str outputPath: the path where you would like the resulted json file
    :raises IndexError: if a non-blank line has fewer than 15 columns
    """

    with beam.Pipeline() as p:
        ## Transform for reading the CSV file line by line
        read_file = beam.io.ReadFromText(csvPath)
        ## Transform for parsing a line with the csv module, so that a quoted
        ## field containing a comma (e.g. "12, 14") stays in a single column
        ## instead of shifting every following column
        parse_file = beam.Map(lambda row: next(csv.reader([row])))
        ## Transform for dropping blank lines (the csv module parses them to [])
        drop_blank = beam.Filter(lambda cols: cols)
        ## Transform for creating dictionaries
        create_dictionary = beam.Map(lambda cols: {
            'TransactionID': cols[0].replace('{', '').replace('}', ''),
            'Price': cols[1],
            'TransferDate': cols[2],
            'Postcode': cols[3],
            'PropertyType': cols[4],
            'Old/New': cols[5],
            'Duration': cols[6],
            'PAON': cols[7],
            'SAON': cols[8],
            'Street': cols[9],
            'Locality': cols[10],
            'Town/City': cols[11],
            'District': cols[12],
            'County': cols[13],
            'PPDCategory': cols[14]
        })

        ## Loading transactions data
        transactions = (
            p
            | 'Read CSV' >> read_file
            | 'Parse CSV' >> parse_file
            | 'Drop Blank Lines' >> drop_blank
            | 'Create Dictionary' >> create_dictionary
        )

        ## Transform for creating a property ID
        add_key = beam.ParDo(GroupTransactions())
        ## Transform for grouping transactions by property ID
        group_by_key = beam.GroupByKey()
        ## Transform for creating json; the grouped values are an iterable that
        ## is not guaranteed to be a list, so materialise it before serialising
        json_format = beam.Map(lambda x:
            json.dumps({'property_id': x[0], 'transactions': list(x[1])})
        )

        ## Creating output
        properties = (
            transactions
            | 'Add Property ID' >> add_key
            | 'Group By Property' >> group_by_key
            | 'Format JSON' >> json_format
        )

        ## Echo the destination so the cell output shows where the result goes
        print(outputPath)

        # Write the output to a file in newline delimited JSON format;
        # the empty shard template keeps everything in one file named outputPath
        properties | 'Write Output' >> beam.io.WriteToText(outputPath, shard_name_template='')


# Testing


In [None]:
## Unit tests

## Checks that GroupTransactions.process keys a transaction as PAON_SAON_Postcode
example1 = {
    'TransactionID': 'EC7AD09A-8B44-9200-E053-6C04A8C0E306',
    'Price': '9999950',
    'TransferDate': '2022-06-14 00:00',
    'Postcode': 'W12 9BL',
    'PropertyType': 'O',
    'Old/New': 'N',
    'Duration': 'F',
    'PAON': '98',
    'SAON': 'FLAT',
    'Street': 'ASKEW ROAD',
    'Locality': '',
    'Town/City': 'LONDON',
    'District': 'HAMMERSMITH AND FULHAM',
    'County': 'GREATER LONDON',
    'PPDCategory': 'B',
}
## Expected output: the same transaction (as an independent copy) under its property ID
out1 = [('98_FLAT_W12 9BL', dict(example1))]

result1 = GroupTransactions().process(example1)

try:
    assert result1 == out1
    print("Passed unit test")
except AssertionError:
    print("ERROR: Failed unit test")

In [None]:
## Integration tests
import filecmp

## Run the Beam pipeline on the integration fixture
run(csvPath='/content/data/IntegrationTest1.csv', outputPath='/content/data/IntegrationTest1.json')

## IntegrationTest1.json is the file run() has just written (the actual result).
## IntegrationResult1.json is not produced anywhere in this notebook, so it is
## presumably the pre-generated reference output - confirm.
actualResults   = '/content/data/IntegrationTest1.json'
expectedResults = '/content/data/IntegrationResult1.json'


try:
  ## shallow=False forces a content comparison; the default may report two
  ## files as equal based on their os.stat() signatures alone
  assert filecmp.cmp(expectedResults, actualResults, shallow=False)
  print("Passed Integration test")
except AssertionError:
  print("ERROR: Failed Integration test")


In [None]:
## End-to-end tests
  # 2022 data file

run(csvPath='/content/data/pp-2022.csv', outputPath='/content/data/pp-2022.json')

## Print the first three lines of the generated file as a quick visual check
with open('/content/data/pp-2022.json') as f:
    for line_number in range(3):
        line = f.readline()
        print(line)

In [None]:
## End-to-end tests
  # full data file

run(csvPath='/content/data/pp-complete.csv', outputPath='/content/data/pp-complete.json')

## Print the first three lines of the generated file as a quick visual check
with open('/content/data/pp-complete.json') as f:
    for line_number in range(3):
        line = f.readline()
        print(line)

/content/data/pp-complete.json


# Solution 2

In [None]:
def run2(csvPath, outputPath):
    """Group the transactions of a CSV file by property without Apache Beam.

    All rows are read into an in-memory dictionary keyed by property ID
    (PAON, SAON and Postcode joined with '_'); one JSON object per property is
    then written to ``outputPath``, one per line, in order of first appearance.

    :param str csvPath: the path to the input csv file
    :param str outputPath: the path where you would like the resulted json file
    :raises IndexError: if a non-blank row has fewer than 15 columns
    """

    properties = {}

    ## newline='' leaves line-ending handling to the csv module, as its
    ## documentation recommends for files passed to csv.reader
    with open(csvPath, newline='') as data:
        ## csv.reader keeps a quoted field that contains a comma (e.g. "12, 14")
        ## in one column; splitting on ',' would shift every following column
        for transaction in csv.reader(data):
            if not transaction:
                ## Blank lines are parsed to an empty row; skip them
                continue

            transaction_info = {
                "TransactionID": transaction[0].replace('{', '').replace('}', ''),
                "Price": transaction[1],
                "TransferDate": transaction[2],
                "Postcode": transaction[3],
                "PropertyType": transaction[4],
                "Old/New": transaction[5],
                "Duration": transaction[6],
                "PAON": transaction[7],
                "SAON": transaction[8],
                "Street": transaction[9],
                "Locality": transaction[10],
                "Town/City": transaction[11],
                "District": transaction[12],
                "County": transaction[13],
                "PPDCategory": transaction[14],
            }

            property_id = transaction_info['PAON'] + '_' + transaction_info['SAON'] + '_' + transaction_info['Postcode']

            ## Start a new list the first time a property is seen, then append
            properties.setdefault(property_id, []).append(transaction_info)

    with open(outputPath, 'w') as f:
        for property_id, transactions in properties.items():
            json.dump({'property_id': property_id, 'transactions': transactions}, f)
            f.write('\n')

In [None]:
## Integration tests
import filecmp

## Run the plain-Python implementation on the integration fixture
run2(csvPath='/content/data/IntegrationTest1.csv', outputPath='/content/data/IntegrationTest2.json')

## IntegrationTest2.json is the file run2() has just written (the actual result).
## IntegrationResult1.json is not produced anywhere in this notebook, so it is
## presumably the pre-generated reference output - confirm.
actualResults   = '/content/data/IntegrationTest2.json'
expectedResults = '/content/data/IntegrationResult1.json'


try:
  ## shallow=False forces a content comparison; the default may report two
  ## files as equal based on their os.stat() signatures alone
  assert filecmp.cmp(expectedResults, actualResults, shallow=False)
  print("Passed Integration test")
except AssertionError:
  print("ERROR: Failed Integration test")


In [None]:
## Speed test
import time

## Time the Beam implementation first, then the plain-Python one, on the same
## input; both write to the same output path, so the second run overwrites the first
for pipeline in (run, run2):
    start_time = time.time()
    pipeline(csvPath='/content/data/pp-2022.csv', outputPath='/content/data/pp-2022.json')
    elapsed = time.time() - start_time
    print("--- %s seconds ---" % (elapsed,))