# Processing the CoreLogic Data using PySpark

In this Jupyter notebook, we demonstrate how to process the CoreLogic data using PySpark. We show how to import the data, explore rows and columns, filter for a given location, and write a subset of the data to a CSV file for later use. This [PySpark cheat sheet](http://datacamp-community-prod.s3.amazonaws.com/acfa4325-1d43-4542-8ce4-bea2d287db10) provides a useful overview of available PySpark functionality.

The notebook was created using a 'Jupyter + Spark Basic' session in [Open OnDemand](https://arc.umich.edu/open-ondemand/) (OOD) on [Great Lakes](https://arc.umich.edu/greatlakes/) (GL). This session type automatically initializes Spark in the background.

If you are not running the notebook using OOD on GL, you will most likely have to make sure PySpark is installed on your system and then initialize it yourself. The cells below rely on a session object named ```spark```, which you can create by typing

```from pyspark.sql import SparkSession```

```spark = SparkSession.builder.master('local[2]').getOrCreate()```

If you encounter any errors or have questions about this Jupyter notebook, or if you would like us to add another PySpark example using the CoreLogic data, feel free to reach out to [Armand Burks](mailto:arburks@umich.edu) and [Jule Krüger](mailto:julianek@umich.edu).

## Importing the CoreLogic data with PySpark

The CoreLogic data are stored in a Turbo volume on the ```/nfs``` drive. To get access, you will need to sign a Memorandum of Understanding (MOU) with the University of Michigan Library. For more information about this, see [here](https://github.com/arc-ts/corelogic-on-greatlakes/tree/main/intro-to-corelogic-data). To execute this notebook, you will have to be granted access to the CoreLogic data on Turbo.

The raw data come in three separate files: deeds (28 GB), foreclosures (6 GB), and taxes (24 GB). [CSCAR](https://cscar.research.umich.edu/) pre-processed the CoreLogic data and stored each raw file in 100 separate partitions. You can read more about this methodology in ```/nfs/turbo/lib-data-corelogic/Docs/cscar_data.txt```.

We are going to work with the pre-processed data to improve import speeds. Let's store the paths to the pre-processed partitioned files in three separate variables:

In [1]:
turbo_path_fcl = "/nfs/turbo/lib-data-corelogic/Data/fcl/*.gz"
turbo_path_deed = "/nfs/turbo/lib-data-corelogic/Data/deed/*.gz"
turbo_path_tax = "/nfs/turbo/lib-data-corelogic/Data/tax/*.gz"
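Before importing, a quick sanity check can confirm that each pattern matches the expected number of partition files. The sketch below uses Python's standard ```glob``` module. The helper name ```count_matches``` is our own, and the figure of 100 files per folder is taken from the CSCAR description above; it has not been verified here.

```python
import glob


def count_matches(pattern):
    """Return the number of files that match a glob pattern."""
    return len(glob.glob(pattern))


# Same patterns as in the cell above; each folder is expected to hold 100 partitions.
patterns = {
    "fcl": "/nfs/turbo/lib-data-corelogic/Data/fcl/*.gz",
    "deed": "/nfs/turbo/lib-data-corelogic/Data/deed/*.gz",
    "tax": "/nfs/turbo/lib-data-corelogic/Data/tax/*.gz",
}

for name, pattern in patterns.items():
    # A count of 0 usually means the path is wrong or access has not been granted.
    print(name, count_matches(pattern))
```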

The following command imports a set of partitioned CoreLogic data files into a dataframe (df). You can switch between foreclosures, deeds, and taxes by choosing the relevant path. In each text file, the pipe ```|``` is used as the delimiter, so we pass it in the ```sep``` argument.

__A note on import speeds__: Because of their size, it takes a little while to read in each partitioned data file collection (foreclosures about 50 seconds, taxes about 4.5 minutes, deeds about 3.5 minutes). Importing the raw data files takes much longer, e.g., foreclosures about 6.5 minutes. Using the pre-processed data from CSCAR greatly improves import speeds as Spark can spread computation across multiple cores.

Let's work with the deed data for now:

In [2]:
df = spark.read.csv(turbo_path_deed, inferSchema=True, header=True, sep="|")
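To see the pipe delimiter for yourself without starting a full Spark import, you can peek at the first rows of a single partition. The following is a minimal sketch using only the standard library. The function name ```peek_rows``` is hypothetical, and the UTF-8 encoding is an assumption about the files, not something documented here.

```python
import csv
import gzip
from itertools import islice


def peek_rows(path, n=2, sep="|"):
    """Return the first n rows (header included) of a gzipped, delimited text file."""
    # Assumption: the file is UTF-8 text; undecodable bytes are replaced, not fatal.
    with gzip.open(path, mode="rt", encoding="utf-8", errors="replace", newline="") as handle:
        reader = csv.reader(handle, delimiter=sep)
        return list(islice(reader, n))
```

On Great Lakes, you could pass it any single file matched by ```turbo_path_deed``` (for example, the first entry returned by ```glob.glob(turbo_path_deed)```) to inspect the header and one data row.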

Let's explore the data a bit. We want to know how many rows and columns there are.

In [3]:
print((df.count(), len(df.columns))) ## print number of rows and columns

(367782480, 97)


Let's get the column names.

In [4]:
df.columns

['FIPS',
 'APN (Parcel Number) (unformatted)',
 'PCL ID IRIS FORMATTED',
 'APN SEQUENCE NUMBER',
 'PENDING RECORD INDICATOR',
 'CORPORATE INDICATOR',
 'OWNER FULL NAME',
 'OWNER 1 LAST NAME',
 'OWNER 1 FIRST NAME & M I',
 'OWNER 2 LAST NAME',
 'OWNER 2 FIRST NAME & MI',
 'OWNER ETAL INDICATOR',
 'C/O NAME',
 'OWNER RELATIONSHIP RIGHTS CODE',
 'OWNER RELATIONSHIP TYPE',
 'PARTIAL INTEREST INDICATOR',
 'ABSENTEE OWNER STATUS',
 'PROPERTY LEVEL LATITUDE',
 'PROPERTY LEVEL LONGITUDE',
 'SITUS HOUSE NUMBER PREFIX',
 'SITUS HOUSE NUMBER',
 'SITUS HOUSE NUMBER SUFFIX',
 'SITUS DIRECTION',
 'SITUS STREET NAME',
 'SITUS MODE',
 'SITUS QUADRANT',
 'SITUS APARTMENT UNIT',
 'SITUS CITY',
 'SITUS STATE',
 'SITUS ZIP CODE',
 'SITUS CARRIER CODE',
 'MAILING HOUSE NUMBER PREFIX',
 'MAILING HOUSE NUMBER',
 'MAILING HOUSE NUMBER SUFFIX',
 'MAILING DIRECTION',
 'MAILING STREET NAME',
 'MAILING MODE',
 'MAILING QUADRANT',
 'MAILING APARTMENT UNIT',
 'MAILING PROPERTY CITY',
 'MAILING PROPERTY STATE',
 'MA

Let's print a select few rows and columns of the dataframe:

In [5]:
df.select('FIPS', 'SITUS CITY', 'SITUS STATE', 'SITUS ZIP CODE').show()

+-----+----------+-----------+--------------+
| FIPS|SITUS CITY|SITUS STATE|SITUS ZIP CODE|
+-----+----------+-----------+--------------+
|12099|      null|       null|          null|
|17097|      null|       null|          null|
|97200|      null|       null|          null|
|97200|      null|       null|          null|
|97199|      null|       null|          null|
|97199|      null|       null|          null|
|97199|      null|       null|          null|
|97199|      null|       null|          null|
|97199|      null|       null|          null|
|97199|      null|       null|          null|
|97199|      null|       null|          null|
|97199|      null|       null|          null|
|97199|      null|       null|          null|
|97199|      null|       null|          null|
|97199|      null|       null|          null|
|97199|      null|       null|          null|
|97199|      null|       null|          null|
|97199|      null|       null|          null|
|17097|      null|       null|    

Let's assume we want to restrict the CoreLogic data to Wayne County, MI. The FIPS code for Wayne County is 26163 ([source](https://mi.postcodebase.com/county/26163)).

In [6]:
WayneCsubset = df.filter(df.FIPS==26163)

Let's print a few rows and columns of the subset. As you can see, we are only dealing with Wayne County now.

In [7]:
WayneCsubset.select('FIPS', 'SITUS CITY', 'SITUS STATE', 'SITUS ZIP CODE').show()

+-----+----------+-----------+--------------+
| FIPS|SITUS CITY|SITUS STATE|SITUS ZIP CODE|
+-----+----------+-----------+--------------+
|26163|      null|       null|          null|
|26163|      null|       null|          null|
|26163|      null|       null|          null|
|26163|      null|       null|          null|
|26163|      null|       null|          null|
|26163|      null|       null|          null|
|26163|      null|       null|          null|
|26163|      null|       null|          null|
|26163|      null|       null|          null|
|26163|      null|       null|          null|
|26163|      null|       null|          null|
|26163|      null|       null|          null|
|26163|      null|       null|          null|
|26163|      null|       null|          null|
|26163|      null|       null|          null|
|26163|      null|       null|          null|
|26163|      null|       null|          null|
|26163|      null|       null|          null|
|26163|      null|       null|    

How many rows are in the Wayne County subset? Use the ```count()``` method.

In [8]:
WayneCsubset.count()

2190976

There are many different cities within Wayne County. Let's get a list of all of them:

In [9]:
WayneCsubset.select('SITUS CITY').distinct().collect()

[Row(SITUS CITY='LINCOLN PARK'),
 Row(SITUS CITY='MELVINDALE'),
 Row(SITUS CITY='NORTHVILLE'),
 Row(SITUS CITY='TRENTON'),
 Row(SITUS CITY='YPSILANTI'),
 Row(SITUS CITY='BELLEVILLE'),
 Row(SITUS CITY='OAK PARK'),
 Row(SITUS CITY=None),
 Row(SITUS CITY='RIVERVIEW'),
 Row(SITUS CITY='GROSSE POINTE WOODS'),
 Row(SITUS CITY='FLAT ROCK'),
 Row(SITUS CITY='GROSSE ILE'),
 Row(SITUS CITY='GROSSE POINTE PARK'),
 Row(SITUS CITY='WARREN'),
 Row(SITUS CITY='GIBRALTAR'),
 Row(SITUS CITY='GROSSE POINTE'),
 Row(SITUS CITY='MONROE'),
 Row(SITUS CITY='WYANDOTTE'),
 Row(SITUS CITY='HIGHLAND'),
 Row(SITUS CITY='WOODHAVEN'),
 Row(SITUS CITY='INKSTER'),
 Row(SITUS CITY='SOUTHGATE'),
 Row(SITUS CITY='SUMPTER TWP'),
 Row(SITUS CITY='GARDEN CITY'),
 Row(SITUS CITY='HIGHLAND PARK'),
 Row(SITUS CITY='DETROIT'),
 Row(SITUS CITY='FERNDALE'),
 Row(SITUS CITY='CHELSEA'),
 Row(SITUS CITY='BROWNSTOWN TWP'),
 Row(SITUS CITY='NEW HUDSON'),
 Row(SITUS CITY='LANSING'),
 Row(SITUS CITY='DEARBORN HEIGHTS'),
 Row(SITUS CITY

Let's create a subset for Detroit only. We will filter using the 'SITUS CITY' variable. This variable designates the city associated with the property address.

Filtering is straightforward when we start from the Wayne County subset we created previously.

In [10]:
detroit = WayneCsubset.filter(WayneCsubset["SITUS CITY"]=="DETROIT")

If you wanted to create a Detroit subset from the entire dataset, we suggest filtering on 'SITUS CITY' and 'SITUS STATE'. When using the entire dataframe, filtering on 'SITUS CITY' alone could run into city name ambiguity across states. 

In [11]:
detroit2 = df.filter((df["SITUS CITY"]=="DETROIT") & (df["SITUS STATE"]=="MI"))

How many rows are in the Detroit subset?

In [12]:
detroit.count()

1129420

Do the two filtering methods yield the same number of rows?

In [13]:
detroit.count()==detroit2.count()

False

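OK, this is a problem. The counts differ, so the two filters do not select the same rows. Possible causes include missing values in 'SITUS STATE' or Detroit addresses recorded under a FIPS code other than 26163. This needs to be explored further (TODO).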
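One way to start exploring the mismatch is to tabulate the values of a column within each subset. The helper below is a sketch, not a finished diagnosis. The name ```value_counts``` is our own, and it assumes ```frame``` is a PySpark DataFrame such as ```detroit``` or ```detroit2``` from the cells above. It uses only standard DataFrame methods (```groupBy```, ```count```, ```orderBy```).

```python
def value_counts(frame, column):
    """Count rows per distinct value of `column`, largest groups first.

    `frame` is expected to be a PySpark DataFrame. Null values form
    their own group, so missing entries show up in the result.
    """
    return frame.groupBy(column).count().orderBy("count", ascending=False)
```

For example, ```value_counts(detroit2, "FIPS").show()``` would list the FIPS codes found among rows labeled Detroit, MI, and ```value_counts(detroit, "SITUS STATE").show()``` would reveal whether some Detroit rows in the Wayne County subset have a missing or unexpected state.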

In [14]:
detroit.select('FIPS', 'SITUS CITY', 'SITUS STATE', 'SITUS ZIP CODE').show()

+-----+----------+-----------+--------------+
| FIPS|SITUS CITY|SITUS STATE|SITUS ZIP CODE|
+-----+----------+-----------+--------------+
|26163|   DETROIT|         MI|     482013148|
|26163|   DETROIT|         MI|     482013148|
|26163|   DETROIT|         MI|     482013148|
|26163|   DETROIT|         MI|     482013148|
|26163|   DETROIT|         MI|     482012463|
|26163|   DETROIT|         MI|         48202|
|26163|   DETROIT|         MI|         48202|
|26163|   DETROIT|         MI|         48202|
|26163|   DETROIT|         MI|     482022828|
|26163|   DETROIT|         MI|     482022828|
|26163|   DETROIT|         MI|     482022828|
|26163|   DETROIT|         MI|     482022828|
|26163|   DETROIT|         MI|     482022828|
|26163|   DETROIT|         MI|         48202|
|26163|   DETROIT|         MI|     482021302|
|26163|   DETROIT|         MI|     482021368|
|26163|   DETROIT|         MI|     482021368|
|26163|   DETROIT|         MI|     482021368|
|26163|   DETROIT|         MI|    

This looks like what we are interested in. Let's write the Detroit data to a new csv file in an ```output/``` folder in our ```home/[uniqname]/``` directory. (Note: You probably need to change this path, depending on where you store this notebook.) By default, Spark writes big data into multiple files to optimize computation times during import and export. This is recommended practice, especially if the data is big, i.e., if it has many rows and/or columns.

The command below writes 100 partitions of the Detroit data to the designated folder.

_Note_: If you saved data to this path before, you will need to add the argument ```mode="overwrite"``` to the function call.

In [15]:
detroit.write.csv("../../output/corelogic_data_deeds_Detroit_partitioned")

There is also a way to force the data into one single file. Sometimes, a single file is needed to continue processing the data with other research software. Saving the data into one file is a little risky and might not always work, so caution is advised. 

The following command collapses the data into a single partition before writing, so that Spark produces one output file (Caution: it is very slow). If the data does not fit, it will throw an error. _Note_: If you saved data to this path before, you will need to add the argument ```mode="overwrite"``` to the function call. This replaces whatever was previously written to that path.

Note: You only need to run this if you really need one single data file. Many research programs allow you to read in and combine multiple data files into one object. See for example the [glob module in Python](https://docs.python.org/3/library/glob.html).
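As an illustration of the multi-file approach, the sketch below reads every part file in a Spark output folder as one stream of rows, using only the standard library. The function name ```iter_rows``` is hypothetical. It assumes the files follow Spark's usual ```part-*.csv``` naming and were written without a header row, as in the partitioned write above.

```python
import csv
import glob
import os


def iter_rows(folder, pattern="part-*.csv"):
    """Yield rows from all part files in `folder`, in sorted file order."""
    for part in sorted(glob.glob(os.path.join(folder, pattern))):
        # Assumption: no header row, so every line is data.
        with open(part, newline="", encoding="utf-8") as handle:
            yield from csv.reader(handle)
```

Looping over ```iter_rows("../../output/corelogic_data_deeds_Detroit_partitioned")``` would then process the partitioned Detroit data row by row without ever creating a single large file.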

In [None]:
detroit.coalesce(1).write.csv("../../output/corelogic_data_deeds_Detroit_singlefile")

TODO: show how to draw and save just a sample of the Detroit data (n=1000) that is small enough to use for writing and testing code for analyzing the Detroit subset.

In [None]:
#sample and save
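Until this cell is filled in, here is one possible sketch of the sampling step. It is not the authors' method. It relies on the DataFrame ```sample``` method, which takes a fraction instead of a row count and returns only approximately that share of rows, so the result will be close to, but usually not exactly, the requested size. The helper names ```sample_fraction``` and ```draw_sample``` are our own.

```python
def sample_fraction(n_wanted, n_total):
    """Fraction of rows to sample so that about n_wanted rows are drawn."""
    if n_total <= 0:
        raise ValueError("n_total must be positive")
    return min(1.0, n_wanted / n_total)


def draw_sample(frame, n_wanted, n_total, seed=42):
    """Draw roughly n_wanted rows from a PySpark DataFrame without replacement.

    The fixed seed is there to make the draw repeatable on the same data.
    """
    return frame.sample(fraction=sample_fraction(n_wanted, n_total), seed=seed)
```

With the Detroit count reported above (1129420 rows), ```sample_fraction(1000, 1129420)``` is about 0.000885. In the notebook, ```draw_sample(detroit, 1000, detroit.count())``` would return the sample as a DataFrame, which can then be saved with ```write.csv``` to a path of your choice.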