In [10]:
import dask.dataframe as dd
import pandas as pd

folders = ["ZAsmt", "ZTrans"]


ZAsmt = ["Main", "AdditionalPropertyAddress", "BKManagedSpecific", "Building", "BuildingAreas",
         "MailAddress", "Name", "SaleData", "TaxDistrict", "TaxExemption", "TypeConstruction", "Value"]
ZTrans = ["Main", "BKManagedSpecific", "BuyerMailAddress", "BuyerName",
          "ForeclosureNameAddress", "SellerMailAddress", "SellerName", "SellerNameDescriptionCode"]

file_layout = {"ZAsmt": ZAsmt, "ZTrans": ZTrans}

vars_interest = {"Main": ["RowID", "TransId", "BatchID", "ImportParcelID", "AssessorParcelNumber", "State", "County", "PropertyCity", "PropertyZip", "PropertyZip4", "PropertyAddressCensusTrackAndBlock",
                          "OriginalPropertyFullStreetAddress", "PropertyAddressLatitude", "PropertyAddressLongitude", "PropertyZoningSourceCode", "TaxIDNumber", "TaxAmount", "TaxYear", "TaxDelinquencyFlag",
                          "TaxDelinquencyAmount", "TaxDelinquencyYear", "LotSizeSquareFeet", "ValueCertDate", "DocumentDate", "DocumentTypeStndCode", "LoanAmount", "LoanAmountStndCode", "MaximumLoanAmount",
                          "LoanTypeClosedOpenEndStndCode", "LoanTypeFutureAdvancedFlag", "LoanTypeProgramStndCode", "LoanRateTypeStndCode", "LoanDueDate", "LoanTermMonths", "LoanTermYears"],
                 "Building": ["RowID", "TransId", "PropertyCountyLandUseCode", "YearBuilt", "ArchitecturalStyleStndCode", "Number of bedroom", "Number of Bathroom", "Number of stories", "Number of Rooms", "Number of units NoOfUnits"],
                 "BuildingArea": ["RowID", "TransId", "BuildingAreaSqft"],
                 "SaleData": ["RowID", "TransId", "SalePriceAmount, BuyerFullName", "DocumentDate"],
                 "Value": ["RowID", "TransId", "LandAssessedValue", "ImprovementAssessedValue", "TotalAssessedValue", "AssessmentYear", "TotalMarketValue", "LandAppraisalData", "TotalAppraisalValue", "AppraisalValueYear", "SalesPriceAmount"],
                 "BuyerName": ["RowID", "TransId", "BuyerIndividualFullName","BuyerNonIndividualName"],
                 "BuyerMailAddress": ["RowID", "TransId", "BuyerMailFullStreetAddress", "BuyerMailCity", "BuyerMailState", "BuyerMailZip", "BuyerMailZip4", "BuyerMailAddressCensusTrackAndBlock"],
                 "SellerMailAddress": ["RowID", "TransId", "SellerIndividualFullName", "SellerNonIndividualName", "SellerMailFullStreetAddress", "SellerMailCity", "SellerMailState", "SellerMailZip", "SellerMailZip4", "SellerMailAddressLatitude", "SellerMailAddressLongitude", "SellerMailAddressCensusTrackAndBlock"], 
                 "BKManagedSpecific": ["RowID", "TransId", "DeedTransType"], 
                 "ForeClosureNameAddress": ["RowID", "TransID", "FCMailIndividualFullName","FCMailNonIndividualName", "FCMailFullStreetAddress", "FCMailCity", "FCMailState", "FCMailZip", "FCMailZip4"]}


def insert_headers(folder, file, columns):
    # Removes the 'ut' at the beginning of the file name, which was the format given in the layout file
    curr_file = file[2:]
    print(curr_file)

    if curr_file not in vars_interest.keys():
        print(curr_file, " is not of interest (file)")
        return dd.DataFrame

    # Creates a DF of the current file
    try:
        file_df = dd.read_csv('' + folder + '\\' + curr_file + '.txt', sep='|', on_bad_lines='skip',
                              low_memory=False, encoding='latin-1', index_col=False, header=None)
        print(curr_file, " opened successfully.")
    except Exception as e:
        print(curr_file, " cannot be accessed. Skipping...")
        print(e)
        print("")
        return dd.DataFrame

    # Adds column names to the DF
    for i,column in enumerate(columns):
        columns[i] = column.lower()

    file_df.columns = columns
    print(columns, " : Added (lowercase) column names")

    for column in columns:
        curr_vars_interest = vars_interest[curr_file]
        for i,var in enumerate(curr_vars_interest):
            curr_vars_interest[i] = var.lower()

        if column.lower() not in curr_vars_interest:
            print(column, " is not of interest. Dropping...")
            file_df.drop(column, axis="columns", inplace=True)

    print("")
    return file_df


def add_headers(files_df, folder):
    # Determines which separate excel layout file to use. Layout files contain a list of headers for each file within the folder.
    if folder == "ZAsmt":
        layout_file = pd.read_csv('asmt_layout.xlsx')
    else:
        layout_file = pd.read_csv('trans_layout.xlsx')

    # List of file names in given folder; file names are repeated once in the resultant list for each variable they have.
    # Data is taken from the layout file.
    # Example: if BuyerName has 6 variables, it will be repeated in file_names 6 times. This will make a later operation easier.
    file_names = (layout_file["TableName"].to_numpy()).tolist()

    # List of column names taken from the "FieldName" column of the layout file.
    column_headers = (layout_file["FieldName"].to_numpy()).tolist()

    # Initialized a dictionary with each file name being a key
    file_col_headers = dict.fromkeys(file_names)

    for key in file_col_headers:
        file_col_headers[key] = []

    # Determines the file's associated column and places it in a list on the associated dict key
    total_var_count = 0
    for file in file_names:
        file_col_headers[file].append(column_headers[total_var_count])
        total_var_count += 1

    # file_col_headers now contains a complete dict with key 'ut' + Filename (how the layout file formats the name)
    # and values equal to a list of column names.

    # log
    print(folder, total_var_count,
          ": This number should be equal to the number of rows in the layout file (for the given folder) minus 1")
    print("")

    #
    for file in file_col_headers.keys():
        curr_df = insert_headers(folder, file, file_col_headers[file])
        if not curr_df.empty:
            files_df.append(curr_df)
    return files_df


# ----------------
# MAIN
# ----------------
folder = "ZAsmt"

# for folder in folders:
# DF containing all working data files, will be combined at ends
asmt_df = dd.DataFrame
trans_df = dd.DataFrame

for folder in folders:
    files_df = []
    files_df = add_headers(files_df, folder)

    # Combine files_df (merge files on key)
    # -----------------
    final_df = files_df[0]
    index = 0

    for file in files_df:
        if index > 0:
            if folder == "ZAsmt":
                final_df = final_df.merge(file, on="rowid", how="outer")
            else:
                print(files_df)
                for file in files_df:
                    print (file.head)
                final_df = final_df.merge(file, on="transid", how="outer")
        index = index + 1

    if folder == "ZAsmt":
        asmt_df = final_df
    else:
        trans_df = final_df

final_df = asmt_df.merge(trans_df, on="batchid", how="outer").dropna()

print("Writing to file...")
final_df.to_csv("out.csv")

UnicodeDecodeError: 'utf-8' codec can't decode bytes in position 15-16: invalid continuation byte