In [2]:
# Polars and Polars types
import polars
from polars.datatypes import Datetime

# Pandera types
import pandera.polars as pa
from pandera.typing import Series

# Base Python Dependencies
from datetime import datetime
import json

# Import DuckDb
import duckdb

# Custom Code
from src.pyUtils.apis import phillyOpenData

# Workflow:
1) Ingest Data from OpenDataPhilly
2) Run a quick set of validations against the data, checking for the following:
    - Unique identifier:
    - Date range is appropriate for the following:
    - Data Types are correct for:
      - `cartodb_id` is int
    - 
3) Load to a DuckDB Database
4) Use DBT to create reporting aggregates.

## Work:

### Ingest Data 

In [4]:
# Build a date-filtered PERMITS query whose window ends on today's date.
query = phillyOpenData.writeDateTimeFilter(
    end_date=datetime.today().strftime("%m/%d/%Y"),
    interval=12,
    base_query="SELECT * FROM PERMITS",
    date_field="permitissuedate",
)

# Run the query through the Carto API client, then parse the issue-date
# strings into a Date column using the timestamp layout given below.
data = (
    phillyOpenData.cartoApi(query)
    .queryDataframe()
    .with_columns(
        polars.col("permitissuedate").str.to_date("%Y-%m-%d %H:%M:%S%#z")
    )
)


### Validate with Pandera

#### Validation

In [5]:
# Schema for the raw permits extract:
#   - cartodb_id      : integer, unique across the frame
#   - systemofrecord  : string, only 'ECLIPSE' is accepted
#   - permitissuedate : polars Date
# coerce=True asks pandera to cast each column to its declared type
# before the checks run.
schema = pa.DataFrameSchema(
    {
        "cartodb_id": pa.Column(int),
        "systemofrecord": pa.Column(str, pa.Check.isin(['ECLIPSE'])),
        "permitissuedate": pa.Column(polars.Date),
    },
    unique=['cartodb_id'],
    coerce=True,
)

try:
    # lazy=True gathers every failed check instead of stopping at the first;
    # the parquet file is only written when validation passes.
    (
        schema.validate(data, lazy=True)
              .write_parquet('data/raw/raw__permits.parquet')
    )
except pa.errors.SchemaErrors as exc:
    # Lazy validation reports failures as SchemaErrors (plural); the previous
    # handler only caught SchemaError, so these escaped as a traceback.
    print(json.dumps(exc.message, indent=2))
except pa.errors.SchemaError as exc:
    # Kept for any single-error failure raised outside the lazy path.
    print(exc)

### DBT for Reporting

#### Reporting we need to capture:
- Exploratory Values:
  - Words
- Metrics:


# Deprecated

## Ingest Data

In [2]:
# Build the same date-filtered PERMITS query, ending on today's date.
query = phillyOpenData.writeDateTimeFilter(
    end_date=datetime.today().strftime("%m/%d/%Y"),
    interval=12,
    base_query="SELECT * FROM PERMITS",
    date_field="permitissuedate",
)

# Fetch the result as a dataframe; no column parsing is applied here.
data = (
    phillyOpenData.cartoApi(query)
    .queryDataframe()
)

In [9]:
# Quick summary of the ingested frame: row count plus the smallest and
# largest values found in the permitissuedate column.
print__ingest_stats = f"""
Permits Returned: {data.height}
Earliest Permit Issue Date: {data.select(polars.col('permitissuedate').min()).item()}
Latest Permit Issue Date: {data.select(polars.col('permitissuedate').max()).item()}
"""

print(print__ingest_stats)


Permits Returned: 94051
Earliest Permit Issue Date: 2022-06-12 01:30:51+00
Latest Permit Issue Date: 2024-06-11 21:00:58+00



## Pandera

## Demo: Working Example.
- In this example, we create a schema, evaluate it against the dataframe, and return the dataframe if it validates without error -- which it should.


In [10]:
# Demo schema: three columns with their expected types. unit_type and
# geocode_x are allowed to contain nulls; cartodb_id is not.
schema = pa.DataFrameSchema(
    columns={
        "cartodb_id": pa.Column(int),
        "unit_type": pa.Column(str, nullable=True),
        "geocode_x": pa.Column(float, nullable=True),
    },
)

In [11]:
# Lazy validation raises all failed checks against the dataframe at once,
# as a single SchemaErrors exception.
try:
    z = schema.validate(data, lazy=True)
except pa.errors.SchemaErrors as exc:
    print(json.dumps(exc.message, indent=2))
else:
    # Only report the shape when validation succeeded: on failure `z` is
    # either unbound (fresh kernel) or left over from an earlier run.
    print(z.shape)

(94051, 34)

## Demo: Throw an error if a column doesn't exist in the schema
- In this example, we raise an exception because the dataset includes fields which are not included in the schema.

In [12]:
# Same three columns as the working demo, but with strict=True so that
# dataframe columns missing from the schema are reported as failures.
schema = pa.DataFrameSchema(
    columns={
        "cartodb_id": pa.Column(int),
        "unit_type": pa.Column(str, nullable=True),
        "geocode_x": pa.Column(float, nullable=True),
    },
    strict=True,
)

In [13]:
# Validate against the strict schema; lazy=True collects every failure and
# the handler prints the collected report as indented JSON.
try:
    z = schema.validate(data, lazy=True)
except pa.errors.SchemaErrors as schema_errors:
    print(
        json.dumps(schema_errors.message, indent=2)
    )


{
  "SCHEMA": {
    "COLUMN_NOT_IN_SCHEMA": [
      {
        "schema": null,
        "column": null,
        "check": "column_in_schema",
        "error": "column 'the_geom' not in DataFrameSchema {'cartodb_id': <Schema Column(name=cartodb_id, type=DataType(Int64))>, 'unit_type': <Schema Column(name=unit_type, type=DataType(String))>, 'geocode_x': <Schema Column(name=geocode_x, type=DataType(Float64))>}"
      }
    ]
  }
}


## Demo: Failed row checks in a lazy evaluation, formatted as an object.

- Create a validation object as a class, not a schema
- The validation will have a set of checks that the rows in the dataframe will fail.
- We'll return the failed rows

As a note -- it does look like this function isn't available for polars:

ModuleNotFoundError: No module named 'pyarrow'

## Write Data to DUCKDB

In [3]:
import duckdb

In [8]:
# Show the request URL the Carto API client builds for the current query
# (useful for checking the encoded date filter and output format).
phillyOpenData.cartoApi(query).request_url

'https://phl.carto.com/api/v2/sql?q=SELECT+%2A+FROM+PERMITS+WHERE+permitissuedate+%3C%3D+%2706%2F19%2F2024%27+AND+permitissuedate+%3E%3D+%2706%2F19%2F2023%27&format=CSV'

In [15]:
# Count permits per OPA_OWNER, most frequent owners first.
# NOTE(review): `data` in the FROM clause is presumably resolved by DuckDB to
# the in-memory `data` dataframe defined earlier in the notebook -- confirm.
duckdb.sql(
    '''
    SELECT
        OPA_OWNER,
        COUNT(*)
    FROM
        data
    GROUP BY
        OPA_OWNER
    ORDER BY
        COUNT(*) DESC
    '''
)

┌──────────────────────────────────────────┬──────────────┐
│                opa_owner                 │ count_star() │
│                 varchar                  │    int64     │
├──────────────────────────────────────────┼──────────────┤
│ NULL                                     │         1222 │
│ PHILADELPHIA HOUSING AUTH                │          710 │
│ PHILADELPHIA LAND BANK                   │          364 │
│ CIVETTA 2 LLC                            │          311 │
│ CITY OF PHILA                            │          248 │
│ TRS UNIV OF PENN                         │          172 │
│ CITY OF PHILA, DEPT OF PUBLIC PROP       │          166 │
│ REDEVELOPMENT AUTHORITY, OF PHILADELPHIA │          160 │
│ TRUSTEES OF THE UNIVERSIT                │          133 │
│ FAIRMOUNT MANOR REALTY CO                │          111 │
│             ·                            │            · │
│             ·                            │            · │
│             ·                         

In [11]:
# List the column names available in the ingested permits dataframe.
data.columns

['the_geom',
 'cartodb_id',
 'the_geom_webmercator',
 'objectid',
 'permitnumber',
 'addressobjectid',
 'parcel_id_num',
 'permittype',
 'permitdescription',
 'commercialorresidential',
 'typeofwork',
 'approvedscopeofwork',
 'permitissuedate',
 'status',
 'applicanttype',
 'contractorname',
 'contractoraddress1',
 'contractoraddress2',
 'contractorcity',
 'contractorstate',
 'contractorzip',
 'mostrecentinsp',
 'opa_account_num',
 'address',
 'unit_type',
 'unit_num',
 'zip',
 'censustract',
 'council_district',
 'opa_owner',
 'systemofrecord',
 'geocode_x',
 'geocode_y',
 'posse_jobid']