# Preprocessing using Dataflow

## 1. The dataset

### NYC Property Sales: A year's worth of properties sold on the NYC real estate market

The file nyc-rolling-sales.csv.zip (2 MB) records every building or building unit (apartment, etc.) sold in the New York City property market over a 12-month period from September 2016 to September 2017. It contains the following features:

* **Borough: (N)** A digit code for the borough in which the property is located; in order these are Manhattan (1), Bronx (2), Brooklyn (3), Queens (4), and Staten Island (5).
* **Neighborhood: (S)** Department of Finance assessors determine the neighborhood name in the course of valuing properties. The common name of the neighborhood is generally the same as the name Finance designates. However, there may be slight differences in neighborhood boundary lines and some sub-neighborhoods may not be included. 
* **Building Class Category: (S)** This is a field that we are including so that users of the Rolling Sales Files can easily identify similar properties by broad usage (e.g. One Family Homes) without looking up individual Building Classes. Files are sorted by Borough, Neighborhood, Building Class Category, Block and Lot.
* **Tax Class at Present: (N)** Every property in the city is assigned to one of four tax classes (Classes 1, 2, 3, and 4), based on the use of the property.
    * *Class 1:* Includes most residential property of up to three units (such as one-, two-, and three-family homes and small stores or offices with one or two attached apartments), vacant land that is zoned for residential use, and most condominiums that are not more than three stories.
    * *Class 2:* Includes all other property that is primarily residential, such as cooperatives and condominiums.
    * *Class 3:* Includes property with equipment owned by a gas, telephone or electric company.
    * *Class 4:* Includes all other properties not included in Classes 1, 2, and 3, such as offices, factories, warehouses, garage buildings, etc.
* **Block: (N)** A Tax Block is a sub-division of the borough on which real properties are located. The Department of Finance uses a Borough-Block-Lot classification to label all real property in the City. Whereas addresses describe the street location of a property, the block and lot distinguish one unit of real property from another, such as the different condominiums in a single building. Also, blocks and lots are not subject to name changes based on which side of the parcel the building puts its entrance on.
* **Lot: (N)** A Tax Lot is a subdivision of a Tax Block and represents the property's unique location. The combination of borough, block, and lot forms a unique key for property in New York City, commonly called a BBL.
* **Easement: (S)** An easement is a right, such as a right of way, which allows an entity to make limited use of another’s real property. For example: MTA railroad tracks that run across a portion of another property. 
* **Building Class at Present: (S)** The Building Classification is used to describe a property’s constructive use. The first position of the Building Class is a letter that is used to describe a general class of properties (for example, “A” signifies one-family homes, “O” signifies office buildings, and “R” signifies condominiums). The second position, a number, adds more specific information about the property’s use or construction style (using our previous examples, “A0” is a Cape Cod style one-family home, “O4” is a tower type office building, and “R5” is a commercial condominium unit). The term Building Class used by the Department of Finance is interchangeable with the term Building Code used by the Department of Buildings.
* **Address: (S)** The street address of the property as listed on the Sales File. Coop sales include the apartment number in the address field. 
* **Apartment Number: (S)**
* **Zip Code: (N)** The property’s postal code.
* **Residential Units: (N)** The number of residential units at the listed property.
* **Commercial Units: (N)** The number of commercial units at the listed property.
* **Total Units: (N)** The total number of units at the listed property.
* **Land Square Feet: (N)** The land area of the property listed in square feet.
* **Gross Square Feet: (N)** The total area of all the floors of a building as measured from the exterior surfaces of the outside walls of the building, including the land area and space within any building or structure on the property. 
* **Year Built: (N)** Year the structure on the property was built. 
* **Tax Class at Time of Sale: (N)**
* **Building Class at Time of Sale: (S)** 
* **Sales Price: (N)** Price paid for the property. A \$0 sale indicates that there was a transfer of ownership without a cash consideration. There can be a number of reasons for a \$0 sale, including transfers of ownership from parents to children.
* **Sale Date: (D)** Date the property sold.

Note: (N): number, (S): string, (D): date
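
The borough codes and the Borough-Block-Lot key described above can be sketched in a few lines of Python. This is an illustration only: the names `BOROUGH_NAMES` and `make_bbl` are hypothetical, and the zero-padded 10-character layout (1 digit for the borough, 5 for the block, 4 for the lot) is the commonly used BBL format, assumed here rather than taken from the dataset.

```python
# Borough codes as listed in the feature description above.
BOROUGH_NAMES = {
    1: 'Manhattan',
    2: 'Bronx',
    3: 'Brooklyn',
    4: 'Queens',
    5: 'Staten Island',
}


def make_bbl(borough, block, lot):
    """Build a Borough-Block-Lot key (hypothetical helper).

    Assumes the common 10-character layout: 1 digit for the borough,
    5 zero-padded digits for the block, 4 zero-padded digits for the lot.
    """
    if borough not in BOROUGH_NAMES:
        raise ValueError('unknown borough code: {!r}'.format(borough))
    return '{:d}{:05d}{:04d}'.format(borough, block, lot)


# Example values: borough 4, block 16181, lot 29.
print('{} {}'.format(BOROUGH_NAMES[4], make_bbl(4, 16181, 29)))
```

A string key of this kind keeps the leading zeros of the block and lot, which an integer column would drop.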

In [1]:
%%bash
conda update -y -n base -c defaults conda
source activate py2env
pip uninstall -y google-cloud-dataflow
conda install -y pytz
pip install apache-beam[gcp]==2.9.0

Solving environment: ...working... done

## Package Plan ##

  environment location: /usr/local

  added / updated specs: 
    - conda


The following packages will be UPDATED:

    ca-certificates: 2018.03.07-0      defaults --> 2019.1.23-0       defaults
    certifi:         2018.11.29-py27_0 defaults --> 2019.3.9-py27_0   defaults
    conda:           4.5.12-py27_0     defaults --> 4.6.12-py27_1     defaults
    openssl:         1.1.1a-h7b6447c_0 defaults --> 1.1.1b-h7b6447c_1 defaults

Preparing transaction: ...working... done
Verifying transaction: ...working... done
Executing transaction: ...working... done
Collecting package metadata: ...working... done
Solving environment: ...working... done

# All requested packages already installed.

Collecting pytz<=2018.4,>=2018.3 (from apache-beam[gcp]==2.9.0)
  Using cached https://files.pythonhosted.org/packages/dc/83/15f7833b70d3e067ca91467ca245bae0f6fe56ddc7451aa0dc5606b120f2/pytz-2018.4-py2.py3-none-any.whl
Installing collected packa

Skipping google-cloud-dataflow as it is not installed.
google-cloud-bigquery 1.6.1 has requirement google-api-core<2.0.0dev,>=1.0.0, but you'll have google-api-core 0.1.4 which is incompatible.


Don't forget to **Reset Session** after the install so that the notebook picks up the new packages.

In [1]:
# change these to try this notebook out
BUCKET = 'huiyi-sandbox'
PROJECT = 'huiyi-training'
REGION = 'us-central1'

In [2]:
import os
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION

In [3]:
%%bash
if ! gsutil ls | grep -q gs://${BUCKET}/; then
  gsutil mb -l ${REGION} gs://${BUCKET}
fi

Save the query from earlier

In [4]:
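# Create SQL query over the NYC rolling sales table.
# Note: SALE_PRICE is compared with string literals here, so the filter and
# the ORDER BY use string (lexicographic) ordering, not numeric ordering.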
import google.datalab.bigquery as bq
query = """
SELECT
    SALE_PRICE,
    BOROUGH,
    NEIGHBORHOOD,
    BUILDING_CLASS_CATEGORY,
    TAX_CLASS_AT_PRESENT,
    BLOCK,
    LOT,
    ZIP_CODE,
    RESIDENTIAL_UNITS,
    COMMERCIAL_UNITS, 
    TOTAL_UNITS,
    LAND_SQUARE_FEET,
    GROSS_SQUARE_FEET,
    YEAR_BUILT,
    TAX_CLASS_AT_TIME_OF_SALE,
    EXTRACT(YEAR FROM SALE_DATE) as S_YEAR,
    EXTRACT(MONTH FROM SALE_DATE) as S_MONTH,
    EXTRACT(DAY FROM SALE_DATE) as S_DAY
FROM 
  test.nycrollingsales
WHERE SALE_PRICE <> ' -  '
AND SALE_PRICE > '10000'
ORDER BY
  SALE_PRICE
"""

In [5]:
# Call BigQuery
df = bq.Query( query ).execute().result().to_dataframe()
df.head()

| | SALE_PRICE | BOROUGH | NEIGHBORHOOD | BUILDING_CLASS_CATEGORY | TAX_CLASS_AT_PRESENT | BLOCK | LOT | ZIP_CODE | RESIDENTIAL_UNITS | COMMERCIAL_UNITS | TOTAL_UNITS | LAND_SQUARE_FEET | GROSS_SQUARE_FEET | YEAR_BUILT | TAX_CLASS_AT_TIME_OF_SALE | S_YEAR | S_MONTH | S_DAY |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 100000 | 4 | ROCKAWAY PARK | 07 RENTALS - WALKUP APARTMENTS | 2B | 16181 | 29 | 11694 | 10 | 0 | 10 | 8000 | 3750 | 1930 | 2 | 2017 | 5 | 30 |
| 1 | 100000 | 3 | OLD MILL BASIN | 09 COOPS - WALKUP APARTMENTS | 2 | 8401 | 20 | 11234 | 0 | 0 | 0 | 0 | 0 | 1955 | 2 | 2017 | 4 | 7 |
| 2 | 100000 | 4 | OAKLAND GARDENS | 09 COOPS - WALKUP APARTMENTS | 2 | 7628 | 2 | 11364 | 0 | 0 | 0 | - | - | 1949 | 2 | 2016 | 11 | 7 |
| 3 | 100000 | 4 | QUEENS VILLAGE | 09 COOPS - WALKUP APARTMENTS | 2 | 10682 | 65 | 11427 | 0 | 0 | 0 | - | - | 1949 | 2 | 2017 | 2 | 17 |
| 4 | 100000 | 1 | HARLEM-UPPER | 10 COOPS - ELEVATOR APARTMENTS | 2 | 2072 | 55 | 10031 | 0 | 0 | 0 | - | - | 1910 | 2 | 2017 | 5 | 11 |
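
The query compares `SALE_PRICE` against string literals, so both the filter and the `ORDER BY` follow string ordering rather than numeric ordering. The sketch below illustrates the difference in plain Python and shows one possible way to convert the raw values to numbers. `parse_price` is a hypothetical helper, and treating any non-numeric value (such as the `' -  '` placeholder) as missing is an assumption.

```python
def parse_price(raw):
    """Convert a raw SALE_PRICE string to an int, or None if not numeric.

    Hypothetical helper: placeholder values such as ' -  ' become None.
    """
    text = raw.strip()
    return int(text) if text.isdigit() else None


# String comparison is character by character, so '2' sorts after '10000'.
print('2' > '10000')                             # True
print(parse_price('2') > parse_price('10000'))   # False

raw_prices = ['100000', ' -  ', '2', '650000', '0']
# Keep only real sales above the 10000 threshold used in the query.
prices = [parse_price(p) for p in raw_prices]
kept = sorted(p for p in prices if p is not None and p > 10000)
print(kept)   # [100000, 650000]
```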


Create ML dataset using Dataflow

In [28]:
def to_csv(rowdict):
  # Turn one BigQuery row (a dict keyed by column name) into a CSV line
  import hashlib
  # Adjacent string literals avoid stray spaces inside the column names
  CSV_COLUMNS = ('SALE_PRICE,BOROUGH,NEIGHBORHOOD,BUILDING_CLASS_CATEGORY,'
                 'TAX_CLASS_AT_PRESENT,BLOCK,LOT,ZIP_CODE,RESIDENTIAL_UNITS,'
                 'COMMERCIAL_UNITS,TOTAL_UNITS,LAND_SQUARE_FEET,GROSS_SQUARE_FEET,'
                 'YEAR_BUILT,TAX_CLASS_AT_TIME_OF_SALE').split(',')

  # Write out one line for the input row; missing columns become 'None'
  data = ','.join([str(rowdict[k]) if k in rowdict else 'None' for k in CSV_COLUMNS])
  key = hashlib.sha224(data.encode('utf-8')).hexdigest()  # hash the columns to form a key
  yield str('{},{}'.format(data, key))
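
To make the intent of `to_csv` concrete, here is a minimal standalone sketch of the same idea applied to a single row outside of Beam: join the selected columns into a CSV line and append a SHA-224 digest of that line as a key. The function name `row_to_line`, the shortened column list, and the sample row are illustrative assumptions, not part of the pipeline.

```python
import hashlib

# Shortened, illustrative column list (the pipeline uses more columns).
COLUMNS = ['SALE_PRICE', 'BOROUGH', 'NEIGHBORHOOD', 'ZIP_CODE']


def row_to_line(row, columns=COLUMNS):
    """Join the chosen columns of one row dict and append a hash key."""
    data = ','.join(str(row[k]) if k in row else 'None' for k in columns)
    # encode() so the digest works on both Python 2 and Python 3
    key = hashlib.sha224(data.encode('utf-8')).hexdigest()
    return '{},{}'.format(data, key)


sample = {'SALE_PRICE': '100000', 'BOROUGH': 4, 'NEIGHBORHOOD': 'ROCKAWAY PARK'}
line = row_to_line(sample)
fields = line.split(',')
print(fields[:4])       # ZIP_CODE is missing, so it is written as 'None'
print(len(fields[-1]))  # a SHA-224 hex digest has 56 characters
```

Because the key is derived only from the row's contents, identical rows always receive the same key.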

In [29]:
import apache_beam as beam
import datetime, os

In [30]:
def preprocess(in_test_mode):
  import shutil, os, subprocess
  job_name = 'preprocess-tuto-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')

  if in_test_mode:
      print('Launching local job ... hang on')
      OUTPUT_DIR = './preproc'
      shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
      os.makedirs(OUTPUT_DIR)
  else:
      print('Launching Dataflow job {} ... hang on'.format(job_name))
      OUTPUT_DIR = 'gs://{0}/preproc/'.format(BUCKET)
      try:
        subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
      except subprocess.CalledProcessError:
        pass  # nothing to delete yet

  options = {
      'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
      'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
      'job_name': job_name,
      'project': PROJECT,
      'region': REGION,
      'teardown_policy': 'TEARDOWN_ALWAYS',
      'no_save_main_session': True,
      'max_num_workers': 6
  }
  
  opts = beam.pipeline.PipelineOptions(flags = [], **options)
  if in_test_mode:
      RUNNER = 'DirectRunner'
  else:
      RUNNER = 'DataflowRunner'
      
  p = beam.Pipeline(RUNNER, options = opts)
  
  query = """
  SELECT
    SALE_PRICE,
    BOROUGH,
    NEIGHBORHOOD,
    BUILDING_CLASS_CATEGORY,
    TAX_CLASS_AT_PRESENT,
    BLOCK,
    LOT,
    ZIP_CODE,
    RESIDENTIAL_UNITS,
    COMMERCIAL_UNITS, 
    TOTAL_UNITS,
    LAND_SQUARE_FEET,
    GROSS_SQUARE_FEET,
    YEAR_BUILT,
    TAX_CLASS_AT_TIME_OF_SALE,
    EXTRACT(YEAR FROM SALE_DATE) as S_YEAR,
    EXTRACT(MONTH FROM SALE_DATE) as S_MONTH,
    EXTRACT(DAY FROM SALE_DATE) as S_DAY
  FROM 
    test.nycrollingsales
  WHERE SALE_PRICE <> ' -  '
  AND NEIGHBORHOOD <> ' -  '
  AND BUILDING_CLASS_CATEGORY <> ' -  '
  AND TAX_CLASS_AT_PRESENT <> ' -  '
  AND LAND_SQUARE_FEET <> ' -  '
  AND GROSS_SQUARE_FEET <> ' -  '
  AND SAFE_CAST(SALE_PRICE AS INT64) > 10000  -- SALE_PRICE is a string column; compare it numerically
  AND BOROUGH > 0
  AND BLOCK > 0
  AND LOT > 0
  AND ZIP_CODE > 0
  AND RESIDENTIAL_UNITS >= 0
  AND COMMERCIAL_UNITS >= 0
  AND TOTAL_UNITS >= 0
  AND YEAR_BUILT > 0
  AND TAX_CLASS_AT_TIME_OF_SALE > 0
  AND TOTAL_UNITS = RESIDENTIAL_UNITS + COMMERCIAL_UNITS
  AND TOTAL_UNITS > 0
  AND LAND_SQUARE_FEET > '0'
  AND GROSS_SQUARE_FEET > '0'
  AND 
  (
  TAX_CLASS_AT_PRESENT = '1'
  OR TAX_CLASS_AT_PRESENT = '2'
  OR TAX_CLASS_AT_PRESENT = '3'
  OR TAX_CLASS_AT_PRESENT = '4'
  )
  ORDER BY
    SALE_PRICE
    """
  
  if in_test_mode:
    query = query + ' LIMIT 100'
  
  for step in ['train', 'eval']:
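    # SALE_PRICE is a string column, so cast it before the numeric comparison
    if step == 'train':
      selquery = "SELECT * FROM ({}) WHERE SAFE_CAST(SALE_PRICE AS INT64) < 600000".format(query)
    else:
      selquery = "SELECT * FROM ({}) WHERE SAFE_CAST(SALE_PRICE AS INT64) >= 600000".format(query)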

    (p 
     | '{}_read'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query = selquery, use_standard_sql = True))
     | '{}_csv'.format(step) >> beam.FlatMap(to_csv)
     | '{}_out'.format(step) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(step))))
    )

  job = p.run()
  if in_test_mode:
    job.wait_until_finish()
    print("Done!")


In [31]:
preprocess(in_test_mode = False)

Launching Dataflow job preprocess-tuto-features-190418-134957 ... hang on
