In [18]:
from google.cloud import storage
import json
import pandas as pd

# Process and Transformation pipeline

fetch_files -> transform files -> load them 

## Env variables

In [5]:
json_creds_path = "/Users/alexandergirardet/projects/estatewise/real_estate_analytics/development/real-estate-dev-key.json"
bucket_name = "rightmove_storage_dev"
client = storage.Client.from_service_account_json(json_credentials_path=json_creds_path)

## Functions

### fetch files

In [6]:
 def fetch_all_files(delimiter=None):

    blobs = client.list_blobs(bucket_name, prefix=f"rightmove/raw_data/", delimiter=delimiter)

    all_files = []

    for blob in blobs:
        name = blob.name
        all_files.append(name)
    return all_files

In [7]:
files = fetch_all_files()

In [8]:
file_id_list = [file.split("/")[-1] for file in files]

In [9]:
file_id_list

['04fe45d3-a1dc-47b7-ab8a-7db1e393fa05.json',
 '140a0a08-6a85-4b66-8859-09008274e091.json',
 '1ec51a02-9729-4110-bff1-9b8bdc976306.json',
 '3094a12c-0e2c-410a-9b3e-ae91d3a9d94b.json',
 '35fd40b9-c165-4e92-8590-d54af6eb6d4b.json',
 '436a086b-819a-4cb8-a03a-3cc937323f0f.json',
 '4e9c6ed9-9030-49a0-bc27-8f948af55b65.json',
 '50d4d296-3adb-46fc-9fba-0f5deafba2a2.json',
 '596e7849-2eb2-4a5f-908b-ac59db290f75.json',
 '6d060173-9238-4d7f-b424-97c063fabe1d.json',
 '7e56578f-5991-4d3f-8a72-e7507501cdbc.json',
 '83fd30f0-c08c-4adb-8e6a-4e00292b40e7.json',
 '86a4e58a-8f68-4834-850b-7b5fe257abd5.json',
 '8cd8cee0-ea11-4b13-980c-625908c33dc9.json',
 '907c096f-79b7-4b71-a869-8f9a367d0fef.json',
 '94fbed03-016f-4079-90bf-d3231bbcc6da.json',
 '9c314476-fafa-448c-97c4-1ca09da569df.json',
 'a6c34e9c-df4b-4dd0-b23c-568c8979779c.json',
 'ac568ace-9f07-45d3-a3c8-c2e975444e1e.json',
 'ae9d43ce-f583-4418-b42c-ef8171d2e575.json',
 'bb5cdb41-2aa1-4370-af92-034721e2e810.json',
 'd2c7ddba-0932-4b05-867e-bf5e2c3c

### download files

In [10]:
def download_data(file_name: str) -> object:
    
    bucket = client.bucket(bucket_name)
    file_path = f"rightmove/raw_data/{file_name}"
    
    print(file_path)
    
    blob = bucket.blob(file_path)
    
    json_string = blob.download_as_string()
    json_data = json.loads(json_string)
    
    return json_data

In [11]:
data = download_data(file_id_list[0])

rightmove/raw_data/04fe45d3-a1dc-47b7-ab8a-7db1e393fa05.json


### transform data

In [3]:
import sys
sys.path.append("..")
from final_transformer import ParquetTransformer

In [4]:
transformer = ParquetTransformer()

In [13]:
valid_data, invalid_data = transformer.transform_data(data)
print(len(valid_data))
# transformed_data = transformer.enforce_columns(valid_data)

50


### upload files

In [14]:
def upload_data(data: object, file_name: str, invalid=False) -> object:
    
    bucket = client.bucket(bucket_name)
    
    if invalid:
        upload_file_path = f"rightmove/invalid_data/{file_name}"
        blob = bucket.blob(upload_file_path)
        json_data = json.dumps(data).encode('utf-8')
        blob.upload_from_string(json_data, content_type='application/json')
    else:
        parquet_file_name = file_name.replace(".json", ".snappy.parquet")
        upload_file_path = f"rightmove/processed_data/{parquet_file_name}"
        blob = bucket.blob(upload_file_path)
        blob.upload_from_string(data.to_parquet(compression='snappy'))
    
    success_flag = True

    return success_flag

## Combined

In [22]:
files = fetch_all_files()
file_names = [file.split("/")[-1] for file in files]

for file_name in file_names:
    data = download_data(file_name)

    valid_data, invalid_data = transformer.transform_data(data)
    
    if len(invalid_data):
        # invalid_df = pd.DataFrame(invalid_data)
        flag = upload_data(invalid_data, file_name, invalid=True)
        print("Invalid Data uploaded")
        
    valid_df = pd.DataFrame(valid_data)
    flag = upload_data(valid_df, file_name, invalid=False)
    print("Transformed Data uploaded")

rightmove/raw_data/04fe45d3-a1dc-47b7-ab8a-7db1e393fa05.json
Transformed Data uploaded
rightmove/raw_data/140a0a08-6a85-4b66-8859-09008274e091.json
Transformed Data uploaded
rightmove/raw_data/1ec51a02-9729-4110-bff1-9b8bdc976306.json
Transformed Data uploaded
rightmove/raw_data/3094a12c-0e2c-410a-9b3e-ae91d3a9d94b.json
Transformed Data uploaded
rightmove/raw_data/35fd40b9-c165-4e92-8590-d54af6eb6d4b.json
Transformed Data uploaded
rightmove/raw_data/436a086b-819a-4cb8-a03a-3cc937323f0f.json
Transformed Data uploaded
rightmove/raw_data/4e9c6ed9-9030-49a0-bc27-8f948af55b65.json
Invalid Data uploaded
Transformed Data uploaded
rightmove/raw_data/50d4d296-3adb-46fc-9fba-0f5deafba2a2.json
Transformed Data uploaded
rightmove/raw_data/596e7849-2eb2-4a5f-908b-ac59db290f75.json
Transformed Data uploaded
rightmove/raw_data/6d060173-9238-4d7f-b424-97c063fabe1d.json
Transformed Data uploaded
rightmove/raw_data/7e56578f-5991-4d3f-8a72-e7507501cdbc.json
Invalid Data uploaded
Transformed Data uploaded

# Testing

In [33]:
import json
import pandas as pd
import jsonschema
import os
import random

# transformer = RightmoveTransformer()

def load_file_names():
    file_names = os.listdir('data')
    return file_names

def load_data(file_name):
    with open(f'data/{file_name}', 'r') as f:
        data = json.load(f)
    return data

def get_random_numbers(number_of_randoms):
    return random.sample(range(0, 50), number_of_randoms)

def add_random_column(item):
    """
    Adds a random column to each dict in the given list of dicts.
    The column name is a random string of length 5, and the column
    value is a random integer between 1 and 100.
    """
    col_name = "".join(random.choices("abcdefghijklmnopqrstuvwxyz", k=5))
    col_value = random.randint(1, 100)
    item[col_name] = col_value

def remove_random_column(item):
    """
    Removes a random column from each dict in the given list of dicts.
    """
    if len(item) > 1:
        col_name = random.choice(list(item.keys()))
        del item[col_name]

def change_random_value_type(item):
    """
    Changes the data type of a random value in each dict in the given list of dicts.
    The new data type is chosen randomly from float, int, and str.
    """
    col_name = random.choice(list(item.keys()))
    col_value = item[col_name]
    new_type = random.choice([float, int, str])
    if new_type == float:
        item[col_name] = 666
    elif new_type == int:
        item[col_name] = 666
    else:
        item[col_name] = "Test"

def nullify_id(item):
    """
    Sets the value of the 'id' key to None in a random dict in the given list of dicts.
    """
    item['id'] = None
    
def randomize_data(data):

    random_numbers = get_random_numbers(10)
    random_functions = [add_random_column, remove_random_column, change_random_value_type, nullify_id]

    for i, item in enumerate(data):
        if i in random_numbers:
            func = random.choice(random_functions)
            func_name = func.__name__
            print(f"Item {i}, is being randomized by the {func_name} function")
            func(item)
            
    return data

### TEST


In [65]:
files = fetch_all_files()
file_names = [file.split("/")[-1] for file in files]

In [68]:
files = fetch_all_files()
file_names = [file.split("/")[-1] for file in files]

In [70]:
files = fetch_all_files()
file_names = [file.split("/")[-1] for file in files]

for file_name in file_names:
    data = download_data(file_name)
    randomized_data = randomize_data(data)

    valid_data, invalid_data = transformer.validate_data(randomized_data)
    transformed_data = transformer.enforce_columns(valid_data)

    if len(invalid_data) > 0:
        flag = upload_data(invalid_data, file_name, invalid=True)
        print("Invalid Data uploaded")

    flag = upload_data(transformed_data, file_name, invalid=False)
    print("Transformed Data uploaded")

rightmove/raw_data/04fe45d3-a1dc-47b7-ab8a-7db1e393fa05.json
Item 0, is being randomized by the change_random_value_type function
Item 11, is being randomized by the nullify_id function
Item 13, is being randomized by the remove_random_column function
Item 17, is being randomized by the remove_random_column function
Item 25, is being randomized by the change_random_value_type function
Item 32, is being randomized by the change_random_value_type function
Item 34, is being randomized by the nullify_id function
Item 36, is being randomized by the nullify_id function
Item 38, is being randomized by the change_random_value_type function
Item 40, is being randomized by the add_random_column function
Item 0 failed because 'Test' is not of type 'integer', 'null'
Item 11 failed because None is not of type 'integer'
Item 13 failed because 'lozengeModel' is a required property
Item 17 failed because 'auction' is a required property
Item 25 failed because 666 is not of type 'boolean', 'null'
Item 

In [58]:
data = download_data(file_names[0])
randomized_data = randomize_data(data)

valid_data, invalid_data = transformer.validate_data(randomized_data)
transformed_data = transformer.enforce_columns(valid_data)

if len(invalid_data) > 0:
    flag = upload_data(invalid_data, file_name, invalid=True)
    print("Invalid Data uploaded")

flag = upload_data(transformed_data, file_name, invalid=False)
print("Transformed Data uploaded")

rightmove/raw_data_json_test/14c71934-8f7d-40af-a161-abece209567e.json
Item 1, is being randomized by the remove_random_column function
Item 12, is being randomized by the change_random_value_type function
Item 16, is being randomized by the change_random_value_type function
Item 17, is being randomized by the remove_random_column function
Item 22, is being randomized by the nullify_id function
Item 25, is being randomized by the add_random_column function
Item 30, is being randomized by the nullify_id function
Item 39, is being randomized by the change_random_value_type function
Item 41, is being randomized by the remove_random_column function
Item 47, is being randomized by the add_random_column function
Item 16 failed because 666 is not of type 'string', 'null'
Item 17 failed because 'residential' is a required property
Item 22 failed because None is not of type 'integer'
Item 30 failed because None is not of type 'integer'
Item 39 failed because 'Test' is not of type 'object'
Item 