# Final Project Pipeline

### Introduction
In the fast-paced world of real estate, the ability to continuously monitor and analyze housing data is crucial for setting optimal prices. Accurate insights and a competitive edge depend on clean, reliable, and consistent data. This project builds a robust data pipeline that automates the collection, processing, and analysis of housing data, illustrating how data engineering and analytics can support better decision-making.

### Dataset
The dataset used in this project comes from GeeksforGeeks. Although it is smaller than what would typically be encountered in a real-world scenario, it gives a clear view of the kind of data we aim to analyze. Its features are described below.


| **Feature**                      | **Description**                                                                 |
|-----------------------------------|---------------------------------------------------------------------------------|
| **parcelid**                      | Unique identifier for each property.                                            |
| **airconditioningtypeid**         | Identifier for the type of air conditioning system.                             |
| **architecturalstyletypeid**      | Identifier for the architectural style of the property.                         |
| **basementsqft**                  | Square footage of the basement.                                                  |
| **bathroomcnt**                   | Total number of bathrooms in the property.                                      |
| **bedroomcnt**                    | Total number of bedrooms in the property.                                       |
| **buildingclasstypeid**           | Identifier for the building type (e.g., wood frame, steel).                     |
| **buildingqualitytypeid**         | Identifier for the building's quality.                                          |
| **calculatedbathnbr**             | Number of bathrooms, including fractional bathrooms.                            |
| **decktypeid**                    | Identifier for the type of deck (e.g., wood, concrete).                         |
| **finishedfloor1squarefeet**      | Square footage of the first floor that is finished.                             |
| **calculatedfinishedsquarefeet**  | Total finished square footage of the property.                                  |
| **fips**                          | Federal Information Processing Standards (FIPS) code for region.                |
| **fireplacecnt**                  | Number of fireplaces in the property.                                           |
| **fullbathcnt**                   | Number of full bathrooms in the property.                                       |
| **garagecarcnt**                  | Number of cars the garage can accommodate.                                      |
| **garagetotalsqft**               | Total square footage of the garage.                                             |
| **hashottuborspa**                | Binary indicator if the property has a hot tub or spa.                          |
| **heatingorsystemtypeid**         | Identifier for the heating or climate control system.                           |
| **latitude**                      | Geographic latitude of the property.                                            |
| **longitude**                     | Geographic longitude of the property.                                           |
| **lotsizesquarefeet**             | Size of the property lot in square feet.                                        |
| **poolcnt**                       | Number of pools on the property.                                                |
| **poolsizesum**                   | Total size (square footage) of all pools.                                       |
| **propertycountylandusecode**     | Code for the type of land use in the county.                                    |
| **propertylandusetypeid**         | Identifier for the land use type (e.g., residential, commercial).               |
| **propertyzoningdesc**            | Description of the zoning classification (e.g., residential, commercial).        |
| **regionidcity**                  | Identifier for the city the property is located in.                             |
| **regionidcounty**                | Identifier for the county the property is located in.                           |
| **regionidzip**                   | ZIP code identifier for the property.                                           |
| **roomcnt**                       | Total number of rooms in the property.                                          |
| **storytypeid**                   | Identifier for the floor type of a multi-story home (e.g., split-level, attic). |
| **threequarterbathnbr**           | Number of three-quarter bathrooms (toilet, sink, shower).                       |
| **unitcnt**                       | Number of units in a multi-unit property.                                        |
| **yearbuilt**                     | The year the property was built.                                                |
| **structuretaxvaluedollarcnt**    | Tax-assessed value of the property structure.                                   |
| **taxvaluedollarcnt**             | Total tax-assessed value of the property (land + structure).                    |
| **assessmentyear**                | Year of the tax assessment.                                                     |
| **landtaxvaluedollarcnt**         | Tax-assessed value of the property land.                                        |
| **taxamount**                     | Total property taxes due.                                                       |
| **taxdelinquencyflag**            | Binary flag indicating if there is a tax delinquency.                           |
| **taxdelinquencyyear**            | Year in which tax delinquency occurred, if applicable.                          |
| **target**                        | Target variable for prediction models (e.g., sale status).                      |
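
Because the pipeline pulls the file from an external website, it can help to confirm that the download still has the columns documented above before it is loaded into HDFS and Hive. The sketch below is a hypothetical standard-library check (the `check_header` helper is not part of the original project); it assumes the file is a comma-separated CSV whose first row is the header.

```python
import csv
import io

# Column names exactly as listed in the feature table.
EXPECTED_COLUMNS = (
    "parcelid", "airconditioningtypeid", "architecturalstyletypeid", "basementsqft",
    "bathroomcnt", "bedroomcnt", "buildingclasstypeid", "buildingqualitytypeid",
    "calculatedbathnbr", "decktypeid", "finishedfloor1squarefeet",
    "calculatedfinishedsquarefeet", "fips", "fireplacecnt", "fullbathcnt",
    "garagecarcnt", "garagetotalsqft", "hashottuborspa", "heatingorsystemtypeid",
    "latitude", "longitude", "lotsizesquarefeet", "poolcnt", "poolsizesum",
    "propertycountylandusecode", "propertylandusetypeid", "propertyzoningdesc",
    "regionidcity", "regionidcounty", "regionidzip", "roomcnt", "storytypeid",
    "threequarterbathnbr", "unitcnt", "yearbuilt", "structuretaxvaluedollarcnt",
    "taxvaluedollarcnt", "assessmentyear", "landtaxvaluedollarcnt", "taxamount",
    "taxdelinquencyflag", "taxdelinquencyyear", "target",
)


def check_header(csv_text: str, expected=EXPECTED_COLUMNS) -> dict:
    """Compare a CSV header row with the documented columns, ignoring order."""
    header = next(csv.reader(io.StringIO(csv_text)), [])
    found = {name.strip() for name in header}
    wanted = set(expected)
    return {
        "missing": sorted(wanted - found),     # documented but absent
        "unexpected": sorted(found - wanted),  # present but undocumented
    }


# Example: a truncated header with one extra column.
report = check_header("parcelid,bedroomcnt,bathroomcnt,notes\n1,3,2.0,x\n")
print(report["unexpected"])  # ['notes']
```

The comparison is set-based, so it flags added or dropped columns but not reordering; if the Hive table maps fields by position, an order check would also be worth adding.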



### Pipeline Overview

The data pipeline is designed to automate the ingestion, processing, and analysis of the housing dataset. Below is a high-level overview of the pipeline architecture:





```mermaid
graph LR;
    subgraph Ingestion
        A[GeeksforGeeks Website] --> B[InvokeHTTP Processor]
    end
    subgraph Transformation
        B --> C[PutHDFS Processor]
        C --> D[Hadoop File Store]
        D --> E[Hive Table Upload]
        E --> F[Pyspark/Polars for Analysis]
        F --> G[Machine Learning Models/Analysis Output]
    end
    subgraph Loading
        G --> H[Loading to Final Storage/Reporting]
    end

    style Ingestion fill:#a2d3f7,stroke:#0061f2,stroke-width:2px
    style Transformation fill:#f7d2a2,stroke:#f2a600,stroke-width:2px
    style Loading fill:#d3f7a2,stroke:#60f200,stroke-width:2px
```


The data is ingested into the pipeline with Apache NiFi. NiFi's InvokeHTTP processor downloads the data from the GeeksforGeeks page that hosts it, and the PutHDFS processor then writes it to the HDFS directory `data/zillow`. From there, I use the Hive CLI to load the file into a Hive table. Once the data is in Hive, I can use PySpark for analysis and machine learning on the dataset.
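
The Hive CLI step can be scripted so that it runs the same way every time. The following is a minimal sketch, assuming the CSV is comma-delimited with a single header row and sits in the HDFS directory `data/zillow`; the table name `zillow`, the three-column subset, and the helpers `build_hive_load` and `run_hive` are hypothetical, not taken from the project.

```python
import subprocess
from typing import Iterable, Tuple

# Hypothetical subset of columns; a real table would list every column in the
# feature table, in the same order as the fields in the CSV file.
EXAMPLE_COLUMNS = [
    ("parcelid", "BIGINT"),
    ("bathroomcnt", "DOUBLE"),
    ("bedroomcnt", "DOUBLE"),
]


def build_hive_load(table: str, columns: Iterable[Tuple[str, str]],
                    hdfs_dir: str = "data/zillow") -> str:
    """Return a HiveQL script that creates a comma-delimited text table and
    loads the files in hdfs_dir into it."""
    cols = ",\n  ".join(f"{name} {dtype}" for name, dtype in columns)
    return (
        f"CREATE TABLE IF NOT EXISTS {table} (\n  {cols}\n)\n"
        "ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n"
        "STORED AS TEXTFILE\n"
        # Skip the header row of each file when the table is read
        "TBLPROPERTIES ('skip.header.line.count'='1');\n"
        # LOAD DATA INPATH moves (not copies) the HDFS files into the table
        f"LOAD DATA INPATH '{hdfs_dir}' INTO TABLE {table};\n"
    )


def run_hive(script: str) -> None:
    """Execute the script with the Hive CLI; assumes `hive` is on the PATH."""
    subprocess.run(["hive", "-e", script], check=True)


if __name__ == "__main__":
    print(build_hive_load("zillow", EXAMPLE_COLUMNS))
```

A relative path such as `data/zillow` is resolved against the user's HDFS home directory. The plain delimited format does not understand quoted fields, so if any value contains a comma, Hive's `OpenCSVSerde` would be a safer choice.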

### Issues Encountered

I did encounter quite a few issues. To be clear, none of them were in the core pipeline itself; they were all in the Hive integration. I was able to add the Hive processors to NiFi, but even after reconfiguring `hive-site.xml` and trying to connect NiFi directly to Hive, the connection still failed.

I tried several configurations, including pointing the connection at `localhost` and setting up port forwarding, but none of them worked.
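
When NiFi's Hive processors cannot connect, a useful first check is whether HiveServer2 is actually listening at the host and port NiFi is pointed at. Below is a hypothetical standard-library helper set (the function names are illustrative only). It assumes HiveServer2 runs in its default binary transport mode, where the port comes from `hive.server2.thrift.port` (default 10000), and that NiFi's `HiveConnectionPool` is given a `jdbc:hive2://host:port/database` URL.

```python
import socket
import xml.etree.ElementTree as ET


def read_hive_site(source) -> dict:
    """Return {name: value} for every <property> in a hive-site.xml path or file object."""
    root = ET.parse(source).getroot()
    props = {}
    for prop in root.iter("property"):
        name = prop.findtext("name")
        if name:
            props[name.strip()] = (prop.findtext("value") or "").strip()
    return props


def hiveserver2_url(props: dict, database: str = "default") -> str:
    """Build a jdbc:hive2:// URL, assuming HiveServer2 uses binary transport mode."""
    host = props.get("hive.server2.thrift.bind.host", "").strip()
    if host in ("", "0.0.0.0"):
        host = "localhost"  # unset or wildcard bind address: connect locally
    port = props.get("hive.server2.thrift.port", "").strip() or "10000"
    return f"jdbc:hive2://{host}:{port}/{database}"


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Example (the config path is hypothetical):
# props = read_hive_site("/etc/hive/conf/hive-site.xml")
# print(hiveserver2_url(props), port_open("localhost", 10000))
```

If the port is closed from the machine running NiFi, the problem lies in HiveServer2 or the network (for example, the port forwarding) rather than in the NiFi processor settings.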

### Successful Pipeline

#### NiFi Ingestion

![nifi.png](attachment:image.png)

#### Moving to HDFS

![hdfs.png](attachment:image-2.png)

### Code

### Improvements
These are improvements I will most likely make to this project: I would change quite a bit of it so that all of these components integrate much more easily.

#### Option 1: Batch Processing

I would switch my orchestrator to Prefect (I personally prefer its UI, and it is much cleaner to work with). I would create ECS and ECR resources so that I can deploy a VM using `prefect-deployment.yml` files and run the virtual environment on that instance (most likely managed with uv, since it is written in Rust and much faster). Once that is running, the pipeline would pull the data directly from the URL in Python, using a client such as HTTPX, and save it as a DataFrame. I would then use a library like Pandera to check that the data follows the expected schema, use PySpark or Polars (depending on the size of the DataFrame) to clean and analyze it, and use Prefect's dbt integration to set up a profile that pushes the results to whichever schema the profile defines (for example, test or prod). Finally, I would run the deployments on a schedule, which Prefect supports.
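
The schema-validation step could look roughly like the sketch below. It is a dependency-free stand-in for what a Pandera schema would do, not Pandera's API: each rule names a column, the type its values must parse as, whether blanks are allowed, and an optional range check. The `Column`, `SCHEMA`, and `validate` names and the range thresholds are hypothetical.

```python
import csv
import io
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Column:
    dtype: type                                       # type each value must parse as
    nullable: bool = True                             # whether blank values are allowed
    check: Optional[Callable[[object], bool]] = None  # extra per-value rule


# Hypothetical rules for a few documented columns; thresholds are illustrative.
# yearbuilt is parsed as float because values may be stored like "1987.0".
SCHEMA = {
    "parcelid": Column(int, nullable=False),
    "bedroomcnt": Column(float, check=lambda v: v >= 0),
    "yearbuilt": Column(float, check=lambda v: 1800 <= v <= 2100),
}


def validate(csv_text: str, schema: dict = SCHEMA) -> list:
    """Return (row_number, column, problem) tuples; an empty list means valid."""
    errors = []
    reader = csv.DictReader(io.StringIO(csv_text))
    for row_no, row in enumerate(reader, start=2):  # row 1 is the header
        for name, col in schema.items():
            raw = (row.get(name) or "").strip()
            if raw == "":
                if not col.nullable:
                    errors.append((row_no, name, "missing value"))
                continue
            try:
                value = col.dtype(raw)
            except ValueError:
                errors.append((row_no, name, f"not {col.dtype.__name__}: {raw!r}"))
                continue
            if col.check is not None and not col.check(value):
                errors.append((row_no, name, f"failed check: {raw!r}"))
    return errors


sample = "parcelid,bedroomcnt,yearbuilt\n1001,3,1987.0\n,-1,abc\n"
for err in validate(sample):
    print(err)  # three problems, all on row 3
```

In a Prefect flow, a task could call a check like this right after the download and raise if the list is non-empty, so that bad data stops before the dbt step.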


#### Option 2: Streaming

This option is essentially the same as Option 1, except that I would set up a Kafka topic so that data streams in continuously as it is pulled, and I would configure dbt to load data incrementally instead of in full batches.
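
The incremental idea can be illustrated in plain Python. This is an in-memory sketch of the concept behind a dbt incremental model with a `unique_key`, not dbt itself: each run processes only records newer than a stored high-water mark and upserts them by key. The `updated_at` column and the `incremental_merge` helper are hypothetical; the dataset as documented has no such timestamp, so a streaming version would need one (for example, the Kafka message timestamp).

```python
from typing import Dict, Iterable, List, Tuple

Record = Dict[str, object]


def incremental_merge(
    target: Dict[object, Record],
    batch: Iterable[Record],
    watermark: object,
    key: str = "parcelid",
    updated_col: str = "updated_at",
) -> Tuple[object, List[object]]:
    """Upsert records newer than the watermark; return (new_watermark, changed_keys)."""
    changed = []
    new_watermark = watermark
    for rec in batch:
        ts = rec[updated_col]
        if watermark is not None and ts <= watermark:
            continue  # already processed in an earlier run
        existing = target.get(rec[key])
        if existing is not None and existing[updated_col] >= ts:
            continue  # keep the newer version already stored
        target[rec[key]] = rec  # insert a new key or replace an older version
        changed.append(rec[key])
        if new_watermark is None or ts > new_watermark:
            new_watermark = ts
    return new_watermark, changed


table: Dict[object, Record] = {}
wm, changed = incremental_merge(table, [
    {"parcelid": 1, "taxamount": 100.0, "updated_at": 1},
    {"parcelid": 2, "taxamount": 200.0, "updated_at": 2},
], watermark=None)
# wm == 2, changed == [1, 2]
wm, changed = incremental_merge(table, [
    {"parcelid": 1, "taxamount": 100.0, "updated_at": 1},  # replayed: skipped
    {"parcelid": 1, "taxamount": 150.0, "updated_at": 3},  # newer: replaces row 1
], watermark=wm)
# wm == 3, changed == [1]
```

Compared with a full batch reload, each run touches only new or changed rows, which is what makes the approach a good fit for a continuous Kafka stream.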


### Conclusion



```python
import base64
from IPython.display import display, HTML

def mm(graph):
    # Encode the Mermaid graph definition as a URL-safe base64 string
    graphbytes = graph.encode("utf8")
    base64_bytes = base64.urlsafe_b64encode(graphbytes)
    base64_string = base64_bytes.decode("ascii")

    # Construct the mermaid.ink URL that renders the diagram as an image
    url = "https://mermaid.ink/img/" + base64_string
    print(url)
    # Embed the image in the notebook output using HTML
    display(HTML(f'<img src="{url}" width="100%" />'))

# Call the function to render the Mermaid diagram
mm("""
graph LR;
    subgraph Ingestion
        A[GeeksforGeeks Website] --> B[InvokeHTTP Processor]
    end
    subgraph Transformation
        B --> C[PutHDFS Processor]
        C --> D[Hadoop File Store]
        D --> E[Hive Table Upload]
        E --> F[Pyspark/Polars for Analysis]
        F --> G[Machine Learning Models/Analysis Output]
    end
    subgraph Loading
        G --> H[Loading to Final Storage/Reporting]
    end

    style Ingestion fill:#a2d3f7,stroke:#0061f2,stroke-width:2px
    style Transformation fill:#f7d2a2,stroke:#f2a600,stroke-width:2px
    style Loading fill:#d3f7a2,stroke:#60f200,stroke-width:2px
""")
```
