READER

In [1]:
import os
import pandas as pd
import regex as re
from datetime import datetime
import numpy as np
from azure.storage.blob import BlobServiceClient



In [2]:
class Reader():
    def __init__(self, file_path):
        self.__file_path = file_path
        self.__files = [ f for f in os.listdir(file_path) if os.path.isfile(os.path.join(file_path,f)) ]
        self.dataFrames = []

    # Returns a list with all the file names in a specific folder
    def listFile(self):
        return self.__files

    # Returns a list of all the dataframes in the folder from csv files
    def DfList(self):
        if not self.dataFrames:
            for file in self.__files:
                if file.endswith(".csv"):
                    self.dataFrames.append(pd.read_csv(os.path.join(self.__file_path, file)))
                else:
                    raise ValueError("File must be a CSV.")
        return self.dataFrames

    # Returns a specific dataframe by index 
    def getDfByIndex(self, index):
        self.DfList()
        index -= 1
        if index < 0 or index >= len(self.dataFrames):
            raise ValueError("Index out of range.")
        return self.dataFrames[index]
    
    # Returns the amount of files in the folder
    def getLength(self):
        return len(self.__files)
    

WRITER

In [3]:
class Writer():
    def __init__(self, dataFrame, fileName, folder_path):
        self.df = dataFrame
        self.fn = fileName
        self.folder_path = folder_path
        self.writeAzureBlobAndCsv()

    # Writes a dataframe to a CSV file
    def writeCsv(self):
        if not isinstance(self.df, pd.DataFrame):
            raise ValueError("Dataframe must be a pandas DataFrame")
        
        self.df.to_csv(os.path.join(self.folder_path, self.fn + '.csv'), index=False)
        return print("File saved successfully")
    
    def writeAzureBlobAndCsv(self):
        self.writeCsv()
        
        conn_str = "DefaultEndpointsProtocol=https;AccountName=batchprocessing94;AccountKey=aH9cA5Xwbv+fELTDQPG3BZaM1AvUAK7LQTX5A6PDYMHTw6EqcXlSvzlw5Aqfs7i3XdJSzgQQw3OZ+AStf56OCw==;EndpointSuffix=core.windows.net"
        container_name = "batchprocessingcontainer"
        blob_name = "batch_data.csv"
        file_path = "output/processed_data.csv"

        blob_service_client = BlobServiceClient.from_connection_string(conn_str)
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

        with open(file_path, "rb") as data:
            blob_client.upload_blob(data)
            print("File uploaded to Azure Blob Storage successfully")      
        




In [3]:
reader = Reader('airflow/dags/files/batch/')
# filenames = ['test1', 'test2']

# for filename, file in zip(filenames, reader.DfList()):
#     Writer(file, filename, 'output')

df = reader.getDfByIndex(1)



VALIDATOR

In [None]:
non_mandatory_columns = {
    'Suite/ Condo   #', 'Owner Name', 'Address', 'City', 'State',
    'Tax District', 'image', 'Foundation Type', 'Exterior Wall', 'Grade'
}

def validate_parcel_id(value):
    if not isinstance(value, str):
        value = str(value)
    value = value.strip()

    pattern = r'^\d{3} \d{2} \d[A-Z]? \d{3}\.\d{2}$'
    return re.match(pattern, value) is not None

def validate_land_use(value):
    if not isinstance(value, str):
        return False
    stripped = value.strip()
    return stripped == stripped.upper()

def validate_property_address(value):
    if not isinstance(value, str):
        return False
    value = value.strip()

    pattern = r'^[A-Z0-9\s\.\'\-]+$'
    return re.fullmatch(pattern, value) is not None and value == value.upper()


def validate_property_city(value):
    if not isinstance(value, str):
        return False
    value = value.strip()

    return bool(re.fullmatch(r"[A-Z\s]+", value))

def validate_date(value):
    try:
        date = pd.to_datetime(value, format='%Y-%m-%d', errors='raise')
        return date <= pd.Timestamp.now()
    except:
        return False

def validate_price(value):
    return isinstance(value, (int, float)) and value >= 0

def validate_legal_reference(value):
    if not isinstance(value, str):
        value = str(value)

    value = value.strip().replace(" ", "")

    match = re.fullmatch(r'-?(\d{7,8})-(\d{6,8})', value)

    if not match:
        return False
    return True

def validate_sold_as_vacant(value):
    if not isinstance(value, str):
        return False
    return value in ['Yes', 'No']

def validate_acreage(value):
    try:
        return float(value) >= 0
    except:
        return False
    
def validate_neighborhood(value):
    if not isinstance(value, (int, float)) or value < 0:
        return False
    return len(str(int(value))) <= 5



def validate_year(value):
    try:
        year = int(float(value))
        return 100 <= year <= pd.Timestamp.now().year
    except:
        return False

def validate_numeric(value):
    return isinstance(value, (int, float)) and value >= 0

def validate_bed_bath(value):
    return isinstance(value, (int, float)) and value >= 0 and value <= 20

def validate_row(row):
    errors = []

    validations = {
        'Parcel ID': validate_parcel_id,
        'Land Use': validate_land_use,
        'Property Address': validate_property_address,
        'Property City': validate_property_city,
        'Sale Date': validate_date,
        'Sale Price': validate_price,
        'Legal Reference': validate_legal_reference,
        'Sold As Vacant': validate_sold_as_vacant,
        'Multiple Parcels Involved in Sale': validate_sold_as_vacant,
        'Acreage': validate_acreage,
        'Neighborhood': validate_neighborhood,
        'Land Value': validate_price,
        'Building Value': validate_price,
        'Total Value': validate_price,
        'Finished Area': validate_numeric,
        'Year Built': validate_year,
        'Bedrooms': validate_bed_bath,
        'Full Bath': validate_bed_bath,
        'Half Bath': validate_bed_bath,
    }

    for column, validator in validations.items():
        value = row.get(column)
        if pd.isna(value) and column in non_mandatory_columns:
            continue 
        if not pd.isna(value) and not validator(value):
            errors.append(f"Invalid value in column '{column}': {value}")

    return errors

def validate_dataset(df):
    all_errors = {}
    for index, row in df.iterrows():
        errors = validate_row(row)
        if errors:
            all_errors[index] = errors
    return all_errors

validation_errors = validate_dataset(df)


if validation_errors:
    for row_idx, errors in validation_errors.items():
        print(f"Row {row_idx}:")
        for error in errors:
            print(f"  - {error}")
else:
    print("Validation passed: No errors found.")

Row 1:
  - Invalid value in column 'Land Value': 32000.0
  - Invalid value in column 'Building Value': 134400.0
  - Invalid value in column 'Total Value': 168300.0
Row 2:
  - Invalid value in column 'Land Value': 34000.0
  - Invalid value in column 'Building Value': 157800.0
  - Invalid value in column 'Total Value': 191800.0
Row 3:
  - Invalid value in column 'Land Value': 25000.0
  - Invalid value in column 'Building Value': 243700.0
  - Invalid value in column 'Total Value': 268700.0
Row 4:
  - Invalid value in column 'Land Value': 25000.0
  - Invalid value in column 'Building Value': 138100.0
  - Invalid value in column 'Total Value': 164800.0
Row 5:
  - Invalid value in column 'Land Value': 25000.0
  - Invalid value in column 'Building Value': 86100.0
  - Invalid value in column 'Total Value': 113300.0
Row 7:
  - Invalid value in column 'Land Value': 16000.0
  - Invalid value in column 'Building Value': 68100.0
  - Invalid value in column 'Total Value': 84300.0
Row 8:
  - Invalid 

In [34]:
df

Unnamed: 0.2,Unnamed: 0.1,Unnamed: 0,Parcel ID,Land Use,Property Address,Suite/ Condo #,Property City,Sale Date,Sale Price,Legal Reference,...,Building Value,Total Value,Finished Area,Foundation Type,Year Built,Exterior Wall,Grade,Bedrooms,Full Bath,Half Bath
0,0,0,105 03 0D 008.00,RESIDENTIAL CONDO,1208 3RD AVE S,8,NASHVILLE,2013-01-24,132000,20130128-0008725,...,,,,,,,,,,
1,1,1,105 11 0 080.00,SINGLE FAMILY,1802 STEWART PL,,NASHVILLE,2013-01-11,191500,20130118-0006337,...,134400.0,168300.0,1149.00000,PT BSMT,1941.0,BRICK,C,2.0,1.0,0.0
2,2,2,118 03 0 130.00,SINGLE FAMILY,2761 ROSEDALE PL,,NASHVILLE,2013-01-18,202000,20130124-0008033,...,157800.0,191800.0,2090.82495,SLAB,2000.0,BRICK/FRAME,C,3.0,2.0,1.0
3,3,3,119 01 0 479.00,SINGLE FAMILY,224 PEACHTREE ST,,NASHVILLE,2013-01-18,32000,20130128-0008863,...,243700.0,268700.0,2145.60001,FULL BSMT,1948.0,BRICK/FRAME,B,4.0,2.0,0.0
4,4,4,119 05 0 186.00,SINGLE FAMILY,316 LUTIE ST,,NASHVILLE,2013-01-23,102000,20130131-0009929,...,138100.0,164800.0,1969.00000,CRAWL,1910.0,FRAME,C,2.0,1.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
56631,56631,56631,093 13 0B 274.00,RESIDENTIAL CONDO,320 11TH AVE S,274.0,NASHVILLE,2016-10-06,210000,20161007-0106599,...,,,,,,,,,,
56632,56632,56632,093 13 0D 044.00,RESIDENTIAL CONDO,700 12TH AVE S,608.0,NASHVILLE,2016-10-25,338000,20161101-0115186,...,,,,,,,,,,
56633,56633,56633,093 13 0D 048.00,RESIDENTIAL CONDO,700 12TH AVE S,613.0,NASHVILLE,2016-10-04,742000,20161010-0106889,...,,,,,,,,,,
56634,56634,56634,093 13 0D 056.00,RESIDENTIAL CONDO,700 12TH AVE S,708.0,NASHVILLE,2016-10-26,320000,20161031-0114730,...,,,,,,,,,,


PROCESSING

In [35]:
def process_data(df):

    # Remove NaN values from mandatory columns
    mandatory_columns = ['Parcel ID', 'Land Use', 'Property Address', 'Property City', 'Sale Date', 'Sale Price', 'Legal Reference', 'Sold As Vacant',
                        'Multiple Parcels Involved in Sale', 'Acreage', 'Neighborhood', 'Land Value', 'Building Value', 'Total Value', 'Finished Area', 'Year Built', 'Bedrooms', 'Full Bath', 'Half Bath']
    df = df.dropna(subset=mandatory_columns)
    
    # Drop certain columns
    columns_to_remove = ['image', 'Sold As Vacant', 'Multiple Parcels Involved in Sale', 'Unnamed: 0.1', 'Unnamed: 0' ]
    df = df.drop(columns=columns_to_remove)


    # Price per square foot
    df['Price per square foot'] = df['Sale Price'] / df['Finished Area']  
    
    # Age of property
    df['Age of property'] = datetime.today().year - df['Year Built']
    
    # Sale year and sale month
    df['Sale Year'] = pd.to_datetime(df['Sale Date']).dt.year
    df['Sale Month'] = pd.to_datetime(df['Sale Date']).dt.month
    
    # Land-to-building value ratio
    df['Land-to-Building Value Ratio'] = df['Land Value'] / df['Building Value']
    
    # Sale price category
    def categorize_sale_price(price):
        if price < 100000:
            return 'Low'
        elif 100000 <= price <= 300000:
            return 'Medium'
        else:
            return 'High'
    
    df['Sale Price Category'] = df['Sale Price'].apply(categorize_sale_price)
    
    # Family Name and First Name of owner
    def extract_name(owner_name):
        if pd.isna(owner_name):
            return np.nan, np.nan
        name_parts = owner_name.split(', ')
        if len(name_parts) == 2:
            return name_parts[0], name_parts[1]
        return owner_name, np.nan 

    df['Family Name'], df['First Name'] = zip(*df['Owner Name'].apply(extract_name))
    
    return df


df_processed = process_data(df)





In [36]:
df_processed

Unnamed: 0,Parcel ID,Land Use,Property Address,Suite/ Condo #,Property City,Sale Date,Sale Price,Legal Reference,Owner Name,Address,...,Full Bath,Half Bath,Price per square foot,Age of property,Sale Year,Sale Month,Land-to-Building Value Ratio,Sale Price Category,Family Name,First Name
1,105 11 0 080.00,SINGLE FAMILY,1802 STEWART PL,,NASHVILLE,2013-01-11,191500,20130118-0006337,"STINSON, LAURA M.",1802 STEWART PL,...,1.0,0.0,166.666667,84.0,2013,1,0.238095,Medium,STINSON,LAURA M.
2,118 03 0 130.00,SINGLE FAMILY,2761 ROSEDALE PL,,NASHVILLE,2013-01-18,202000,20130124-0008033,"NUNES, JARED R.",2761 ROSEDALE PL,...,2.0,1.0,96.612583,25.0,2013,1,0.215463,Medium,NUNES,JARED R.
3,119 01 0 479.00,SINGLE FAMILY,224 PEACHTREE ST,,NASHVILLE,2013-01-18,32000,20130128-0008863,"WHITFORD, KAREN",224 PEACHTREE ST,...,2.0,0.0,14.914243,77.0,2013,1,0.102585,Low,WHITFORD,KAREN
4,119 05 0 186.00,SINGLE FAMILY,316 LUTIE ST,,NASHVILLE,2013-01-23,102000,20130131-0009929,"HENDERSON, JAMES P. & LYNN P.",316 LUTIE ST,...,1.0,0.0,51.802946,115.0,2013,1,0.181028,Medium,HENDERSON,JAMES P. & LYNN P.
5,119 05 0 387.00,SINGLE FAMILY,2626 FOSTER AVE,,NASHVILLE,2013-01-04,93736,20130118-0006110,"MILLER, JORDAN",2626 FOSTER AVE,...,1.0,0.0,90.391514,80.0,2013,1,0.290360,Low,MILLER,JORDAN
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
56605,176 05 0 070.00,SINGLE FAMILY,5004 SUNSHINE DR,,ANTIOCH,2016-10-26,214000,20161102-0115842,"FREO TENNESSEE, LLC",5004 SUNSHINE DR,...,3.0,0.0,88.393226,29.0,2016,10,0.175562,Medium,FREO TENNESSEE,LLC
56607,176 09 0 003.00,SINGLE FAMILY,4964 HICKORY WOODS E,,ANTIOCH,2016-10-28,236000,20161031-0114817,"CHHAY, CHOWAN & NIM, PHALLY",4964 HICKORY WOODS E,...,3.0,0.0,75.713827,30.0,2016,10,0.156937,Medium,"CHHAY, CHOWAN & NIM, PHALLY",
56614,082 05 0 040.00,SINGLE FAMILY,1625 5TH AVE N,,NASHVILLE,2016-10-28,466000,20161102-0115988,"GLAUS, WILLIAM D. SR.",1625 5TH AVE N,...,2.0,1.0,284.667074,21.0,2016,10,0.195982,High,GLAUS,WILLIAM D. SR.
56615,082 05 0 058.00,SINGLE FAMILY,1614 5TH AVE N,,NASHVILLE,2016-10-26,685000,20161101-0115366,"DUNN, JEFFREY J. & HOWE, TRICIA L.",1614 5TH AVE N,...,3.0,1.0,276.432607,20.0,2016,10,0.135181,High,"DUNN, JEFFREY J. & HOWE, TRICIA L.",


In [18]:
Writer(df_processed, 'processed_data', 'output')

File saved successfully
File uploaded to Azure Blob Storage successfully


<__main__.Writer at 0x28f2bfe80d0>

In [4]:
df.dtypes

Unnamed: 0.1                           int64
Unnamed: 0                             int64
Parcel ID                             object
Land Use                              object
Property Address                      object
Suite/ Condo   #                      object
Property City                         object
Sale Date                             object
Sale Price                             int64
Legal Reference                       object
Sold As Vacant                        object
Multiple Parcels Involved in Sale     object
Owner Name                            object
Address                               object
City                                  object
State                                 object
Acreage                              float64
Tax District                          object
Neighborhood                         float64
image                                 object
Land Value                           float64
Building Value                       float64
Total Valu

In [5]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 56636 entries, 0 to 56635
Data columns (total 31 columns):
 #   Column                             Non-Null Count  Dtype  
---  ------                             --------------  -----  
 0   Unnamed: 0.1                       56636 non-null  int64  
 1   Unnamed: 0                         56636 non-null  int64  
 2   Parcel ID                          56636 non-null  object 
 3   Land Use                           56636 non-null  object 
 4   Property Address                   56477 non-null  object 
 5   Suite/ Condo   #                   6109 non-null   object 
 6   Property City                      56477 non-null  object 
 7   Sale Date                          56636 non-null  object 
 8   Sale Price                         56636 non-null  int64  
 9   Legal Reference                    56636 non-null  object 
 10  Sold As Vacant                     56636 non-null  object 
 11  Multiple Parcels Involved in Sale  56636 non-null  obj