In [43]:
import pyspark as ps
import pandas as pd

In [44]:
pdvalues = pd.read_csv('data/csv_to_sql/completed_directory/EXTR_ValueHistory_V.csv',nrows=10)

In [48]:
pdvalues.columns

Index(['Major', 'Minor', 'TaxYr', 'OmitYr', 'ApprLandVal', 'ApprImpsVal',
       'ApprImpIncr', 'LandVal', 'ImpsVal', 'TaxValReason', 'TaxStatus',
       'LevyCode', 'ChangeDate', 'ChangeDocId', 'Reason', 'SplitCode'],
      dtype='object')

In [57]:
a = pdvalues.dtypes
a

Major            int64
Minor            int64
TaxYr            int64
OmitYr           int64
ApprLandVal      int64
ApprImpsVal      int64
ApprImpIncr      int64
LandVal          int64
ImpsVal          int64
TaxValReason    object
TaxStatus       object
LevyCode         int64
ChangeDate      object
ChangeDocId     object
Reason          object
SplitCode        int64
dtype: object

In [2]:
spark = (ps.sql.SparkSession
         .builder
         .master('local[4]')
         .appName('lecture')
         .getOrCreate()
        )
sc = spark.sparkContext

In [113]:
values = spark.read.csv('data/EXTR_ValueHistory_V.csv', header=True, inferSchema=True )


In [77]:
values.take(5)

[Row(Major=701485, Minor=40, TaxYr=1995, OmitYr=0, ApprLandVal=0, ApprImpsVal=0, ApprImpIncr=0, LandVal=12300, ImpsVal=62500, TaxValReason='  ', TaxStatus='T', LevyCode=10, ChangeDate=datetime.datetime(1994, 8, 12, 0, 0), ChangeDocId=' ', Reason='REVALUE', SplitCode=0),
 Row(Major=701485, Minor=40, TaxYr=1995, OmitYr=0, ApprLandVal=0, ApprImpsVal=0, ApprImpIncr=0, LandVal=12315, ImpsVal=38480, TaxValReason='  ', TaxStatus='T', LevyCode=10, ChangeDate=datetime.datetime(1994, 5, 10, 0, 0), ChangeDocId='U051001', Reason='MAINTENANCE', SplitCode=0),
 Row(Major=701485, Minor=40, TaxYr=1994, OmitYr=0, ApprLandVal=0, ApprImpsVal=0, ApprImpIncr=0, LandVal=12315, ImpsVal=38480, TaxValReason='  ', TaxStatus='T', LevyCode=10, ChangeDate=datetime.datetime(1994, 5, 10, 0, 0), ChangeDocId='U051001', Reason='MAINTENANCE', SplitCode=0),
 Row(Major=701485, Minor=40, TaxYr=1997, OmitYr=0, ApprLandVal=0, ApprImpsVal=0, ApprImpIncr=0, LandVal=12300, ImpsVal=62500, TaxValReason='  ', TaxStatus='T', LevyCod

In [76]:
values.dtypes

[('Major', 'int'),
 ('Minor', 'int'),
 ('TaxYr', 'int'),
 ('OmitYr', 'int'),
 ('ApprLandVal', 'int'),
 ('ApprImpsVal', 'int'),
 ('ApprImpIncr', 'int'),
 ('LandVal', 'int'),
 ('ImpsVal', 'int'),
 ('TaxValReason', 'string'),
 ('TaxStatus', 'string'),
 ('LevyCode', 'int'),
 ('ChangeDate', 'timestamp'),
 ('ChangeDocId', 'string'),
 ('Reason', 'string'),
 ('SplitCode', 'int')]

In [67]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DataType, IntegerType, StringType

In [116]:
def create_PIN(major, minor):
    return str(major).zfill(6) + str(minor).zfill(4)
create_PIN_udf = udf(create_PIN, StringType())

In [117]:
values = values.withColumn("PIN", create_PIN_udf(values['Major'], values['Minor']))

In [118]:
values = values.withColumn("TotalVal", (values['LandVal'] + values['ImpsVal']))

In [92]:
values.head()

Row(Major=701485, Minor=40, TaxYr=1995, OmitYr=0, ApprLandVal=0, ApprImpsVal=0, ApprImpIncr=0, LandVal=12300, ImpsVal=62500, TaxValReason='  ', TaxStatus='T', LevyCode=10, ChangeDate=datetime.datetime(1994, 8, 12, 0, 0), ChangeDocId=' ', Reason='REVALUE', SplitCode=0, PIN='7014850040', TotalVal=74800)

In [93]:
values_pivot = values.groupBy('PIN').pivot('TaxYr').sum('TotalVal')

In [94]:
values_pivot.head()

Row(PIN='3352401605', 1953=None, 1966=None, 1973=None, 1975=None, 1976=None, 1977=None, 1978=None, 1979=None, 1980=None, 1981=None, 1982=None, 1983=60400, 1984=None, 1985=60400, 1986=None, 1987=55500, 1988=None, 1989=59200, 1990=None, 1991=78000, 1992=None, 1993=92800, 1994=None, 1995=95700, 1996=None, 1997=95700, 1998=103000, 1999=216000, 2000=126000, 2001=139000, 2002=152000, 2003=169000, 2004=180000, 2005=193000, 2006=224000, 2007=234000, 2008=263000, 2009=289000, 2010=269000, 2011=269000, 2012=239000, 2013=239000, 2014=168000, 2015=213000, 2016=248000, 2017=279000, 2018=324000, 2019=401000, 2020=420000, 2051=None)

In [120]:
values.printSchema()

root
 |-- Major: integer (nullable = true)
 |-- Minor: integer (nullable = true)
 |-- TaxYr: integer (nullable = true)
 |-- OmitYr: integer (nullable = true)
 |-- ApprLandVal: integer (nullable = true)
 |-- ApprImpsVal: integer (nullable = true)
 |-- ApprImpIncr: integer (nullable = true)
 |-- LandVal: integer (nullable = true)
 |-- ImpsVal: integer (nullable = true)
 |-- TaxValReason: string (nullable = true)
 |-- TaxStatus: string (nullable = true)
 |-- LevyCode: integer (nullable = true)
 |-- ChangeDate: timestamp (nullable = true)
 |-- ChangeDocId: string (nullable = true)
 |-- Reason: string (nullable = true)
 |-- SplitCode: integer (nullable = true)
 |-- PIN: string (nullable = true)
 |-- TotalVal: integer (nullable = true)



In [108]:
values.summary

<bound method DataFrame.summary of DataFrame[Major: int, Minor: int, TaxYr: int, OmitYr: int, ApprLandVal: int, ApprImpsVal: int, ApprImpIncr: int, LandVal: int, ImpsVal: int, TaxValReason: string, TaxStatus: string, LevyCode: int, ChangeDate: timestamp, ChangeDocId: string, Reason: string, SplitCode: int, PIN: string, TotalVal: int]>

In [105]:
dir(values)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattr__',
 '__getattribute__',
 '__getitem__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_collectAsArrow',
 '_jcols',
 '_jdf',
 '_jmap',
 '_jseq',
 '_lazy_rdd',
 '_repr_html_',
 '_sc',
 '_schema',
 '_sort_cols',
 '_support_repr_html',
 'agg',
 'alias',
 'approxQuantile',
 'cache',
 'checkpoint',
 'coalesce',
 'colRegex',
 'collect',
 'columns',
 'corr',
 'count',
 'cov',
 'createGlobalTempView',
 'createOrReplaceGlobalTempView',
 'createOrReplaceTempView',
 'createTempView',
 'crossJoin',
 'crosstab',
 'cube',
 'describe',
 'distinct',
 'drop',
 'dropDuplicates',
 'drop_duplicates',
 'dropna',
 'dtypes',
 'exceptAll',
 'explain',
 'fillna',
 'filter',
 'first',
 'foreach',
 'f

In [121]:
values.createOrReplaceTempView('values')

In [123]:
query = '''SELECT * 
            FROM values 
            WHERE PIN = '3352401605' 
            '''

this_house = spark.sql(query).show(50)

+------+-----+-----+------+-----------+-----------+-----------+-------+-------+------------+---------+--------+-------------------+-----------+---------+---------+----------+--------+
| Major|Minor|TaxYr|OmitYr|ApprLandVal|ApprImpsVal|ApprImpIncr|LandVal|ImpsVal|TaxValReason|TaxStatus|LevyCode|         ChangeDate|ChangeDocId|   Reason|SplitCode|       PIN|TotalVal|
+------+-----+-----+------+-----------+-----------+-----------+-------+-------+------------+---------+--------+-------------------+-----------+---------+---------+----------+--------+
|335240| 1605| 1995|     0|          0|          0|          0|  31600|  64100|            |        T|      10|1994-10-25 00:00:00|           |  REVALUE|        0|3352401605|   95700|
|335240| 1605| 1993|     0|          0|          0|          0|  29000|  63800|            |        T|      10|1992-03-27 00:00:00|           |  REVALUE|        0|3352401605|   92800|
|335240| 1605| 1991|     0|          0|          0|          0|  24400|  53600| 

In [124]:
query = '''SELECT DISTINCT Reason 
            FROM values 

            '''

this_house = spark.sql(query).show(50)

+------------------+
|            Reason|
+------------------+
|          NEW PLAT|
|    KILL BY CANCEL|
|   LEGAL/ABATEMENT|
|       COURT ORDER|
| MOBILE HOME X-FER|
| MERGE/STATUS CHNG|
|        REACTVATED|
|      LEGAL CHANGE|
|   BOARD EXTENSION|
|              null|
|           REVALUE|
|         AMENDMENT|
|  HOME IMP EXPIRED|
|  JUNE BOARD ORDER|
|        CORRECTION|
|   OMIT CORRECTION|
|   MERGE/CODE CHNG|
| 100% VALUE LAW 73|
|             MAINT|
|     OMIT REVISION|
|         EXTENSION|
|       FOREST LAND|
|   NOV BOARD ORDER|
|    LEVY CODE CHNG|
|            MERGER|
|   TAX STATUS CHNG|
|     SR CIT CHANGE|
|        NEW PARCEL|
| HISTORIC PROPERTY|
|     SEG/CODE CHNG|
| S.C. FROZEN VALUE|
|       SEGREGATION|
|        OPEN SPACE|
|       MAINTENANCE|
|  JULY BOARD ORDER|
|DESTROYED PROPERTY|
|     KILL BY MERGE|
|  CORRECTION BOARD|
|                  |
|   SEG/STATUS CHNG|
| STATE BOARD ORDER|
|   CO-OP SR CIT EX|
|      CONDEMNATION|
|     OMITTED ASSMT|
|HOME IMP EXE

In [126]:
query = '''SELECT * 
            FROM values 
            WHERE TaxYr = 2018 and Reason = 'NEW PARCEL'
            '''

this_house = spark.sql(query).show(9999999)

+------+-----+-----+------+-----------+-----------+-----------+-------+-------+------------+---------+--------+-------------------+-----------+----------+---------+----------+--------+
| Major|Minor|TaxYr|OmitYr|ApprLandVal|ApprImpsVal|ApprImpIncr|LandVal|ImpsVal|TaxValReason|TaxStatus|LevyCode|         ChangeDate|ChangeDocId|    Reason|SplitCode|       PIN|TotalVal|
+------+-----+-----+------+-----------+-----------+-----------+-------+-------+------------+---------+--------+-------------------+-----------+----------+---------+----------+--------+
|192205| 9466| 2018|     0|       1000|          0|          0|   1000|      0|            |        T|    1525|2017-05-02 00:00:00|    D001005|NEW PARCEL|        0|1922059466|    1000|
|105650|  350| 2018|     0|          0|          0|          0|      0|      0|            |        T|    1518|2017-04-10 00:00:00|    D000832|NEW PARCEL|        0|1056500350|       0|
| 82505| 9361| 2018|     0|     171000|          0|          0| 171000|    

In [127]:
cap_hill = query = '''SELECT * 
            FROM values 
            WHERE PIN = '6852700555' 
            '''

this_house = spark.sql(query).show(9999)

+------+-----+-----+------+-----------+-----------+-----------+-------+-------+------------+---------+--------+-------------------+-----------+-----------+---------+----------+--------+
| Major|Minor|TaxYr|OmitYr|ApprLandVal|ApprImpsVal|ApprImpIncr|LandVal|ImpsVal|TaxValReason|TaxStatus|LevyCode|         ChangeDate|ChangeDocId|     Reason|SplitCode|       PIN|TotalVal|
+------+-----+-----+------+-----------+-----------+-----------+-------+-------+------------+---------+--------+-------------------+-----------+-----------+---------+----------+--------+
|685270|  555| 1995|     0|          0|          0|          0|  98000|  28400|            |        T|      10|1994-11-01 00:00:00|           |    REVALUE|        0|6852700555|  126400|
|685270|  555| 1993|     0|          0|          0|          0|  89100|  48200|            |        T|      10|1992-05-08 00:00:00|           |    REVALUE|        0|6852700555|  137300|
|685270|  555| 1991|     0|          0|          0|          0|  65600

In [129]:
type(this_house)

NoneType

In [130]:
cap_hill = query = '''SELECT * 
            FROM values 
            WHERE PIN = '6852700559' 
            '''

this_house = spark.sql(query).show(9999)

+------+-----+-----+------+-----------+-----------+-----------+-------+-------+------------+---------+--------+-------------------+-----------+-----------+---------+----------+--------+
| Major|Minor|TaxYr|OmitYr|ApprLandVal|ApprImpsVal|ApprImpIncr|LandVal|ImpsVal|TaxValReason|TaxStatus|LevyCode|         ChangeDate|ChangeDocId|     Reason|SplitCode|       PIN|TotalVal|
+------+-----+-----+------+-----------+-----------+-----------+-------+-------+------------+---------+--------+-------------------+-----------+-----------+---------+----------+--------+
|685270|  559| 2020|     0|     160000|     349000|          0| 160000| 349000|            |        T|      10|2018-11-08 00:00:00|    E003464|SEGREGATION|        0|6852700559|  509000|
|685270|  559| 2019|     0|     160000|     349000|          0| 160000| 349000|            |        T|      10|2018-11-08 00:00:00|    E003464|SEGREGATION|        0|6852700559|  509000|
|685270|  559| 2018|     0|     141000|          0|          0| 141000