In [1]:
pip install openclean

Collecting openclean
  Downloading openclean-0.2.1-py3-none-any.whl (5.2 kB)
Collecting openclean-core==0.4.1
  Downloading openclean_core-0.4.1-py3-none-any.whl (267 kB)
[K     |████████████████████████████████| 267 kB 26.8 MB/s 
Collecting flowserv-core>=0.8.0
  Downloading flowserv_core-0.9.2-py3-none-any.whl (260 kB)
[K     |████████████████████████████████| 260 kB 45.8 MB/s 
Collecting jellyfish
  Downloading jellyfish-0.8.9.tar.gz (137 kB)
[K     |████████████████████████████████| 137 kB 41.0 MB/s 
Collecting jsonschema>=3.2.0
  Downloading jsonschema-4.2.1-py3-none-any.whl (69 kB)
[K     |████████████████████████████████| 69 kB 4.9 MB/s 
Collecting histore>=0.4.0
  Downloading histore-0.4.1-py3-none-any.whl (109 kB)
[K     |████████████████████████████████| 109 kB 35.1 MB/s 
[?25hCollecting refdata>=0.2.0
  Downloading refdata-0.2.0-py3-none-any.whl (37 kB)
Collecting passlib
  Downloading passlib-1.7.4-py2.py3-none-any.whl (525 kB)
[K     |███████████████████████████████

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import openclean
import os

Download Dataset from https://data.cityofnewyork.us/Housing-Development/DOB-Violations/3h2n-5cm9

Then upload it to google drive

In [4]:
fileName = '/content/drive/My Drive/Big Data/DOB_Violations.csv'

In [5]:
from openclean.pipeline import stream
ds = stream(os.path.join('data', fileName))

In [6]:
ds.columns

['ISN_DOB_BIS_VIOL',
 'BORO',
 'BIN',
 'BLOCK',
 'LOT',
 'ISSUE_DATE',
 'VIOLATION_TYPE_CODE',
 'VIOLATION_NUMBER',
 'HOUSE_NUMBER',
 'STREET',
 'DISPOSITION_DATE',
 'DISPOSITION_COMMENTS',
 'DEVICE_NUMBER',
 'DESCRIPTION',
 'ECB_NUMBER',
 'NUMBER',
 'VIOLATION_CATEGORY',
 'VIOLATION_TYPE']

In [7]:
from openclean.function.eval.base import Col

In [8]:
columns=[
    'HOUSE_NUMBER',
    'STREET',
    'VIOLATION_TYPE_CODE',
    'VIOLATION_NUMBER',
    'VIOLATION_CATEGORY',
    'VIOLATION_TYPE'
]
ds = ds.select(columns)

Data profiling with openclean

In [9]:
profiles = ds.profile()

In [10]:
profiles

[{'column': 'HOUSE_NUMBER',
  'stats': {'datatypes': defaultdict(collections.Counter,
               {'total': Counter({'date': 3689,
                         'float': 119,
                         'int': 1846055,
                         'str': 382542})}),
   'emptyValueCount': 367,
   'minmaxValues': {'date': {'maximum': datetime.datetime(4572, 12, 13, 0, 0),
     'minimum': datetime.datetime(100, 1, 13, 0, 0)},
    'float': {'maximum': 91018.0, 'minimum': 7.5},
    'int': {'maximum': 8152008, 'minimum': -28},
    'str': {'maximum': '¦04', 'minimum': '#G25FP'}},
   'totalValueCount': 2232772}},
 {'column': 'STREET',
  'stats': {'datatypes': defaultdict(collections.Counter,
               {'total': Counter({'date': 32, 'int': 32, 'str': 2215309})}),
   'emptyValueCount': 17399,
   'minmaxValues': {'date': {'maximum': datetime.datetime(2067, 12, 13, 0, 0),
     'minimum': datetime.datetime(180, 12, 13, 0, 0)},
    'int': {'maximum': 63, 'minimum': 0},
    'str': {'maximum': '\xa0MAIN S

In [11]:
profiles.stats()['empty']

HOUSE_NUMBER             367
STREET                 17399
VIOLATION_TYPE_CODE        0
VIOLATION_NUMBER           5
VIOLATION_CATEGORY      3757
VIOLATION_TYPE            11
Name: empty, dtype: int64

In [12]:
profiles.types()

Unnamed: 0,date,float,int,str
HOUSE_NUMBER,3689,119,1846055,382542
STREET,32,0,32,2215309
VIOLATION_TYPE_CODE,0,0,0,2232772
VIOLATION_NUMBER,64,278,1280467,951958
VIOLATION_CATEGORY,0,0,0,2229015
VIOLATION_TYPE,0,0,0,2232761


In [13]:
from openclean.function.eval.datatype import IsDatetime

ds.select('HOUSE_NUMBER').filter(IsDatetime('HOUSE_NUMBER')).head()

Unnamed: 0,HOUSE_NUMBER
505,360
807,263
811,503
972,503
1082,1301
1085,498
1259,108
1261,154
1284,615
1542,89-06


In [14]:
ds.select('STREET').filter(IsDatetime('STREET')).distinct()

Counter({'180  ST': 16,
         '31 ST.': 1,
         '41  ST': 9,
         '47  ST': 1,
         '47 ST.': 1,
         '67  ST': 3,
         '78. ST': 1})

In [15]:
ds.filter(Col('HOUSE_NUMBER')=='#G25FP').distinct()

Counter({('#G25FP',
          'J F K AIRPORT',
          'LL6291',
          '29796',
          'V-DOB VIOLATION - ACTIVE',
          'LL6291-LOCAL LAW 62/91 - BOILERS'): 1,
         ('#G25FP',
          'J F K AIRPORT',
          'LL6291',
          '29797',
          'V-DOB VIOLATION - ACTIVE',
          'LL6291-LOCAL LAW 62/91 - BOILERS'): 1,
         ('#G25FP',
          'J F K AIRPORT',
          'LL6291',
          '30384',
          'V-DOB VIOLATION - ACTIVE',
          'LL6291-LOCAL LAW 62/91 - BOILERS'): 1,
         ('#G25FP',
          'J F K AIRPORT',
          'LL6291',
          '30385',
          'V-DOB VIOLATION - ACTIVE',
          'LL6291-LOCAL LAW 62/91 - BOILERS'): 1,
         ('#G25FP',
          'J F K AIRPORT',
          'LL6291',
          '30651',
          'V-DOB VIOLATION - ACTIVE',
          'LL6291-LOCAL LAW 62/91 - BOILERS'): 1,
         ('#G25FP',
          'J F K AIRPORT',
          'LL6291',
          '30652',
          'V-DOB VIOLATION - ACTIVE',
      

In [16]:
ds.select('STREET').head()

Unnamed: 0,STREET
0,23 AVENUE
1,BROADWAY
2,BATHGATE AVENUE
3,NORTH 6 STREET
4,PARK PL
5,BEDFORD AVENUE
6,DEAN STREET
7,97 STREET
8,WEST 82 STREET
9,134 STREET


In [17]:
ds.select('VIOLATION_NUMBER').filter(IsDatetime('VIOLATION_NUMBER')).distinct()

Counter({'-10/89': 1,
         '-10/90': 1,
         '-11/89': 1,
         '-11/90': 1,
         '-11/94': 1,
         '-12/89': 1,
         '-12/90': 1,
         '-12/94': 1,
         '....1798': 1,
         '...1525': 1,
         '...7026': 1,
         '02/14/90': 1,
         '030100..-': 1,
         '04/04/94': 1,
         '04/05/2005': 1,
         '06 28 93': 1,
         '060100..-': 1,
         '060100...-': 1,
         '060200...-': 1,
         '07 14 93': 1,
         '1/9/91': 1,
         '104/01-02A': 1,
         '110100 -': 1,
         '110100 .-': 1,
         '110100.. -': 1,
         '110100..-': 1,
         '110100...-': 1,
         '1105 ./01': 1,
         '1105 /02': 1,
         '1105../01': 1,
         '1160./1': 1,
         '1160./2': 1,
         '1176 /1': 1,
         '1179  /4': 1,
         '1255-1-1': 2,
         '1275..../01': 1,
         '1275./01': 1,
         '1300 11/05': 1,
         '1308 4/01': 1,
         '132/02-04A': 1,
         '1320-1-1': 1,
         '133

In [18]:
ds.select('STREET').filter(IsDatetime('STREET')).distinct()

Counter({'180  ST': 16,
         '31 ST.': 1,
         '41  ST': 9,
         '47  ST': 1,
         '47 ST.': 1,
         '67  ST': 3,
         '78. ST': 1})

In [19]:
ds.count()

2232772

Data clean with spark

In [22]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 37 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 64.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=766302f418953fad6f5a421407d19c1a673d14d8a9c1559ae3690df83b3864b3
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [23]:
import sys
from operator import add
from pyspark import SparkContext
from csv import reader

In [24]:
sc = SparkContext()
data_full = sc.textFile(fileName, 1)
data_full = data_full.mapPartitions(lambda x: reader(x))
data_full = data_full.filter(lambda x: x[0] != "ISN_DOB_BIS_VIOL")

data = data_full.map(lambda x: [str(x[8]), str(x[9]), x[6], x[7], x[16], x[17]])

data = data.filter(lambda x: 
                        x[0] != "" and
                        x[1] != "" and
                        x[2] != "" and
                        x[3] != "" and
                        x[4] != "" and
                        x[5] != "")

def repair(x):
    x[0] = x[0].replace(' ','')
    x[0] = x[0].replace('\xa0','')
    x[0] = x[0].replace("'",'')
    x[0] = x[0].replace("\\",'')
    x[0] = x[0].replace("`",'')
    x[0] = x[0].replace("¦",'')
    x[0] = x[0].replace('+','')
    x[0] = x[0].replace('-','')
    
    x[1] = x[1].replace('  ',' ')
    x[1] = x[1].replace("'",'')
    x[1] = x[1].replace('`','')
    x[1] = x[1].replace('¦','')
    x[1] = x[1].replace('+','')
    x[1] = x[1].replace('-','')
    while len(x[0]) > 0 and x[0][0] == '0':
        x[0] = x[0][1:]
    while len(x[1]) > 0 and x[1][0] == '.':
        x[1] = x[1][1:]
    return x
    
data_repaired = data.map(repair)
data_repaired = data_repaired.filter(lambda x: x[0] != "").sortBy(lambda x:x[0]).sortBy(lambda x:x[1])

data_result = data_repaired.map(lambda x: str(x[0]) + " " + str(x[1]) + "\t" + str(x[2]) + "\t" + str(x[3]) + "\t" + str(x[4]) + "\t" + str(x[5]))

In [25]:
data_result.take(10)

['128 \tFISPNRF\t00065\tV*-DOB VIOLATION - DISMISSED\tFISPNRF-NO REPORT AND / OR LATE FILING (FACADE)',
 '3802 \t15 AVENUE\tBENCH\t01247\tV-DOB VIOLATION - ACTIVE\tBENCH-FAILURE TO BENCHMARK',
 '677 \tVAN SINDEREN AVENUE\tBENCH\t01178\tV*-DOB VIOLATION - DISMISSED\tBENCH-FAILURE TO BENCHMARK',
 '3802  15 AVENUE\tBENCH\t00570\tV-DOB VIOLATION - ACTIVE\tBENCH-FAILURE TO BENCHMARK',
 '3802  15 AVENUE\tBENCH\t00816\tV-DOB VIOLATION - ACTIVE\tBENCH-FAILURE TO BENCHMARK',
 '16304 0\tBENCH\t00640\tV-DOB VIOLATION - ACTIVE\tBENCH-FAILURE TO BENCHMARK',
 '32A 0\tBENCH\t00691\tV-DOB VIOLATION - ACTIVE\tBENCH-FAILURE TO BENCHMARK',
 '606 0\tBENCH\t00372\tV-DOB VIOLATION - ACTIVE\tBENCH-FAILURE TO BENCHMARK',
 '624 0\tBENCH\t00373\tV*-DOB VIOLATION - DISMISSED\tBENCH-FAILURE TO BENCHMARK',
 '100 1  STREET\tC\t06SC02\tVP*-VIOLATION UNSERVED ECB- DISMISSED\tC-CONSTRUCTION']

In [26]:
sc.stop()