In [83]:
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import DateType

In [84]:
######################### Initialize SparkSession #########################
spark = SparkSession \
    .builder \
    .appName("SparkSQL") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

## Import data set

In [85]:
######################### Import data set  #########################
# cases has basic information about each case. (One row for each case)
# addresses has address of the properties that the case concerns. (Can be one or more entries for each case)
######################### Import oca_index data set  #########################

# load cases and addresses tables
eviction_cases = spark.read.format('csv').options(
    header='true', inferschema='true').load("data/eviction_cases.csv")
eviction_addresses = spark.read.format('csv').options(
    header='true', inferschema='true').load("data/eviction_addresses.csv")
covid_cases = spark.read.format('csv').options(
    header='true', inferschema='true').load("data/cases-by-day.csv")


nypd_data_2020 = spark.read.format('csv').options(
    header='true', inferschema='true').load("data/NYPD_Complaint_Data_2020.csv")
nypd_data_2021 = spark.read.format('csv').options(
    header='true', inferschema='true').load("data/NYPD_Complaint_Data_2021.csv")

covid_cases = covid_cases.withColumn("date_of_interest",to_timestamp("date_of_interest", "M/dd/yyyy"))
nypd_data_2020 = nypd_data_2020.withColumn("CMPLNT_FR_DT",to_timestamp("CMPLNT_FR_DT", "M/dd/yyyy"))
nypd_data_2021 = nypd_data_2021.withColumn("CMPLNT_FR_DT",to_timestamp("CMPLNT_FR_DT", "M/dd/yyyy"))




# create temp view for spark sql 
eviction_cases.createOrReplaceTempView("eviction_cases")
eviction_addresses.createOrReplaceTempView("eviction_addresses")
covid_cases.createOrReplaceTempView("covid_cases")
nypd_data_2020.createOrReplaceTempView("nypd_data_2020")
nypd_data_2021.createOrReplaceTempView("nypd_data_2021")

# print schema
eviction_cases.printSchema()
eviction_addresses.printSchema()
covid_cases.printSchema()
nypd_data_2020.printSchema()
nypd_data_2021.printSchema()






root
 |-- indexnumberid: string (nullable = true)
 |-- court: string (nullable = true)
 |-- fileddate: string (nullable = true)
 |-- propertytype: string (nullable = true)
 |-- classification: string (nullable = true)
 |-- specialtydesignationtypes: string (nullable = true)
 |-- status: string (nullable = true)
 |-- disposeddate: string (nullable = true)
 |-- disposedreason: string (nullable = true)
 |-- firstpaper: string (nullable = true)
 |-- primaryclaimtotal: string (nullable = true)
 |-- dateofjurydemand: string (nullable = true)

root
 |-- indexnumberid: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postalcode: string (nullable = true)

root
 |-- date_of_interest: timestamp (nullable = true)
 |-- CASE_COUNT: integer (nullable = true)
 |-- PROBABLE_CASE_COUNT: integer (nullable = true)
 |-- CASE_COUNT_7DAY_AVG: integer (nullable = true)
 |-- ALL_CASE_COUNT_7DAY_AVG: integer (nullable = true)
 |-- BX_CASE_COUNT: integer (nul

## Total filling of eviction in NYC after NYC lockdown (03/20/2020)

In [86]:
######################### Total filling of eviction in NYC after NYC lockdown (03/20/2020) #########################

# These classifications are consider eviction fillings: Holdover, Non-Payment

# These court are in the NYC: Bronx County Civil Court, Kings County Civil Court, New York County Civil Court, Queens County Civil Court, 
# Richmond County Civil Court, Redhook Community Justice Center and Harlem Community Justice Center

# We only retreive the cases after 03/20/2020 when NYC declared a city lockdown 



######################### Total filling of eviction in NYC after NYC lockdown (03/20/2020) #########################
query = """
select count(*)
from eviction_cases
where fileddate > '2020-03-20'
  and classification in ('Holdover','Non-Payment')
  and court in (
					'Bronx County Civil Court',
					'Kings County Civil Court',
					'New York County Civil Court',
					'Queens County Civil Court',
					'Richmond County Civil Court',
					'Redhook Community Justice Center',
					'Harlem Community Justice Center'
				)

"""
total_eviction_filling_after_lockdown = spark.sql(query).show()

+--------+
|count(1)|
+--------+
|   47949|
+--------+



## Per case filed date and disposed date

In [87]:
######################### case file date and disposed date #########################

# Each row represents each eviction case's fileddate, disposeddate, week_filed and week_disposed
# This is used for later data cleanning and integration

######################### case file date and disposed date #########################
query_after_lockdown_eviction_cases = """
select fileddate,
       disposeddate,
       cast(date_trunc('week', fileddate) as date)    as week_filed,
       cast(date_trunc('week', disposeddate) as date) as week_disposed
from eviction_cases
where classification in ('Holdover', 'Non-Payment')
  and court in ('Bronx County Civil Court',
                'Kings County Civil Court',
                'New York County Civil Court',
                'Queens County Civil Court',
                'Richmond County Civil Court',
                'Redhook Community Justice Center',
                'Harlem Community Justice Center')
  and fileddate > '2020-03-20'
  --and propertytype = 'Residential' # commented out to show Statewide evictions, which includes commercial
order by fileddate asc
"""
after_lockdown_eviction_cases = spark.sql(query_after_lockdown_eviction_cases)
after_lockdown_eviction_cases.show()

+----------+------------+----------+-------------+
| fileddate|disposeddate|week_filed|week_disposed|
+----------+------------+----------+-------------+
|2020-03-24|        null|2020-03-23|         null|
|2020-04-14|  2021-03-08|2020-04-13|   2021-03-08|
|2020-04-14|        null|2020-04-13|         null|
|2020-04-17|        null|2020-04-13|         null|
|2020-05-12|        null|2020-05-11|         null|
|2020-05-12|        null|2020-05-11|         null|
|2020-05-12|        null|2020-05-11|         null|
|2020-05-12|        null|2020-05-11|         null|
|2020-06-15|        null|2020-06-15|         null|
|2020-06-23|        null|2020-06-22|         null|
|2020-06-23|        null|2020-06-22|         null|
|2020-06-25|        null|2020-06-22|         null|
|2020-06-25|        null|2020-06-22|         null|
|2020-06-26|        null|2020-06-22|         null|
|2020-06-26|        null|2020-06-22|         null|
|2020-06-29|        null|2020-06-29|         null|
|2020-06-30|        null|2020-0

## Per week case filed date and disposed date with running sum and total active cases

In [88]:
######################### case file date and disposed date with running sum and total active cases #########################

# Each row represents each week (starting from 03/20/2020), the number of cases has been filed, the number of cases has been disposed,
# the cumulative cases for each one
# and the total active cases (filed_cases - disposed_cases)

# Will draw a time series graph using this data

######################### case file date and disposed date with running sum and total active cases #########################
query_eviction_cases_time_and_summary = """
with after_lockdown as (
    select fileddate,
           cast(date_trunc('week', fileddate) as date)    as week_filed,
           disposeddate,
           cast(date_trunc('week', disposeddate) as date) as week_disposed
    from eviction_cases
    where classification in ('Holdover', 'Non-Payment')
      and court in ('Bronx County Civil Court',
                    'Kings County Civil Court',
                    'New York County Civil Court',
                    'Queens County Civil Court',
                    'Richmond County Civil Court',
                    'Redhook Community Justice Center',
                    'Harlem Community Justice Center')
      and fileddate > '2020-01-01'
    order by fileddate asc),

     group_by_week as (
         select week_filed                                        as first_day_of_week,
                count(*) filter (where week_filed is not null)    as cases_filed,
                count(*) filter (where week_disposed is not null) as cases_disposed
         from after_lockdown
         group by week_filed
         order by week_filed)

select first_day_of_week,
       cases_filed,
       cases_disposed,
       sum(cases_filed) over (order by first_day_of_week)      as cumulative_cases_filed,
       sum(cases_disposed) over (order by first_day_of_week)   as cumulative_cases_disposed,
       (sum(cases_filed) over (order by first_day_of_week) -
        sum(cases_disposed) over (order by first_day_of_week)) as active_cases
from group_by_week
"""

eviction_cases_time_and_summary = spark.sql(query_eviction_cases_time_and_summary)
eviction_cases_time_and_summary.show()

+-----------------+-----------+--------------+----------------------+-------------------------+------------+
|first_day_of_week|cases_filed|cases_disposed|cumulative_cases_filed|cumulative_cases_disposed|active_cases|
+-----------------+-----------+--------------+----------------------+-------------------------+------------+
|       2019-12-30|       1137|           577|                  1137|                      577|         560|
|       2020-01-06|       3789|          1808|                  4926|                     2385|        2541|
|       2020-01-13|       3955|          1765|                  8881|                     4150|        4731|
|       2020-01-20|       3377|          1370|                 12258|                     5520|        6738|
|       2020-01-27|       2998|          1111|                 15256|                     6631|        8625|
|       2020-02-03|       3866|          1313|                 19122|                     7944|       11178|
|       2020-02-10|

## Covid case by week

In [89]:
query = """
select first_day_of_week, sum(CASE_COUNT) as covid_case_count
from
(select date_of_interest, cast(date_trunc('week', date_of_interest) as date) as first_day_of_week, CASE_COUNT
from covid_cases)
group by first_day_of_week
order by first_day_of_week
"""

covid_cases_time_and_summary = spark.sql(query)
covid_cases_time_and_summary.show()


+-----------------+----------------+
|first_day_of_week|covid_case_count|
+-----------------+----------------+
|       2020-02-24|               1|
|       2020-03-02|              45|
|       2020-03-09|            2932|
|       2020-03-16|           20476|
|       2020-03-23|           30148|
|       2020-03-30|           36098|
|       2020-04-06|           34177|
|       2020-04-13|           22964|
|       2020-04-20|           18276|
|       2020-04-27|           13109|
|       2020-05-04|            7894|
|       2020-05-11|            6678|
|       2020-05-18|            6004|
|       2020-05-25|            4128|
|       2020-06-01|            3142|
|       2020-06-08|            2387|
|       2020-06-15|            2272|
|       2020-06-22|            2188|
|       2020-06-29|            2201|
|       2020-07-06|            2514|
+-----------------+----------------+
only showing top 20 rows



In [90]:
covid_cases.printSchema()

root
 |-- date_of_interest: timestamp (nullable = true)
 |-- CASE_COUNT: integer (nullable = true)
 |-- PROBABLE_CASE_COUNT: integer (nullable = true)
 |-- CASE_COUNT_7DAY_AVG: integer (nullable = true)
 |-- ALL_CASE_COUNT_7DAY_AVG: integer (nullable = true)
 |-- BX_CASE_COUNT: integer (nullable = true)
 |-- BX_PROBABLE_CASE_COUNT: integer (nullable = true)
 |-- BX_CASE_COUNT_7DAY_AVG: integer (nullable = true)
 |-- BX_ALL_CASE_COUNT_7DAY_AVG: integer (nullable = true)
 |-- BK_CASE_COUNT: integer (nullable = true)
 |-- BK_PROBABLE_CASE_COUNT: integer (nullable = true)
 |-- BK_CASE_COUNT_7DAY_AVG: integer (nullable = true)
 |-- BK_ALL_CASE_COUNT_7DAY_AVG: integer (nullable = true)
 |-- MN_CASE_COUNT: integer (nullable = true)
 |-- MN_PROBABLE_CASE_COUNT: integer (nullable = true)
 |-- MN_CASE_COUNT_7DAY_AVG: integer (nullable = true)
 |-- MN_ALL_CASE_COUNT_7DAY_AVG: integer (nullable = true)
 |-- QN_CASE_COUNT: integer (nullable = true)
 |-- QN_PROBABLE_CASE_COUNT: integer (nullable = t

In [91]:
# type(covid_cases)
# covid_cases.printSchema()

## Case by zip code

In [92]:
######################### Case by zip code  #########################
query_eviction_cases_by_zipcode = """
with cases_zip as (select substr(postalcode, 1, 5) as zip_code
                   from eviction_cases
                            left join eviction_addresses on eviction_cases.indexnumberid = eviction_addresses.indexnumberid
                   where classification in ('Holdover', 'Non-Payment')
                     and court in ('Bronx County Civil Court',
                                   'Kings County Civil Court',
                                   'New York County Civil Court',
                                   'Queens County Civil Court',
                                   'Richmond County Civil Court',
                                   'Redhook Community Justice Center',
                                   'Harlem Community Justice Center')
                     and fileddate > '2020-03-20'
                     and postalcode is not null
                     and cast(substr(postalcode, 1, 5) as int) > 1
                     and cast(substr(postalcode, 1, 5) as int) < 20000
                   order by fileddate asc)

select zip_code,
       count(*) as total
from cases_zip
group by zip_code
order by zip_code
"""

eviction_cases_by_zipcode = spark.sql(query_eviction_cases_by_zipcode)
eviction_cases_by_zipcode.show()

+--------+-----+
|zip_code|total|
+--------+-----+
|   10001|  253|
|   10002|  221|
|   10003|  162|
|   10004|   24|
|   10005|   58|
|   10006|   18|
|   10007|   14|
|   10009|  119|
|   10010|   52|
|   10011|  219|
|   10012|  104|
|   10013|   64|
|   10014|  150|
|   10015|    1|
|   10016|  206|
|   10017|   86|
|   10018|  140|
|   10019|  372|
|   10020|    1|
|   10021|  117|
+--------+-----+
only showing top 20 rows



## NYPD Crime Data

In [93]:
query = """
select first_day_of_week, count(*) as case_count
from
(select CMPLNT_FR_DT, cast(date_trunc('week', CMPLNT_FR_DT) as date) as first_day_of_week, BORO_NM
from nypd_data_2020
union all
select CMPLNT_FR_DT, cast(date_trunc('week', CMPLNT_FR_DT) as date) as first_day_of_week, BORO_NM
from nypd_data_2021
)
where CMPLNT_FR_DT > '2020-01-01' and CMPLNT_FR_DT is not null
group by first_day_of_week
order by first_day_of_week asc
"""


nypd_data_result = spark.sql(query)
nypd_data_result.show()

+-----------------+----------+
|first_day_of_week|case_count|
+-----------------+----------+
|       2019-12-30|      4815|
|       2020-01-06|      8513|
|       2020-01-13|      8792|
|       2020-01-20|      8471|
|       2020-01-27|      8708|
|       2020-02-03|      8738|
|       2020-02-10|      8524|
|       2020-02-17|      8561|
|       2020-02-24|      8456|
|       2020-03-02|      8647|
|       2020-03-09|      8731|
|       2020-03-16|      6766|
|       2020-03-23|      5537|
|       2020-03-30|      5491|
|       2020-04-06|      5489|
|       2020-04-13|      5766|
|       2020-04-20|      6112|
|       2020-04-27|      6702|
|       2020-05-04|      6659|
|       2020-05-11|      7041|
+-----------------+----------+
only showing top 20 rows



In [94]:
query = """
select BORO_NM, count(*)
from
(select CMPLNT_FR_DT, BORO_NM
from nypd_data_2020
union all
select CMPLNT_FR_DT, BORO_NM
from nypd_data_2021
)
where CMPLNT_FR_DT > '2020-03-20' and CMPLNT_FR_DT is not null 
group by BORO_NM
"""


nypd_data_result_with_BORO_NM = spark.sql(query)
nypd_data_result_with_BORO_NM.show()

+-------------+--------+
|      BORO_NM|count(1)|
+-------------+--------+
|         null|     496|
|       QUEENS|   87823|
|     BROOKLYN|  116201|
|        BRONX|   88485|
|    MANHATTAN|   93416|
|STATEN ISLAND|   16692|
+-------------+--------+



## Export cleaned data

In [95]:
after_lockdown_eviction_cases
eviction_cases_by_zipcode.coalesce(1).write.format('com.databricks.spark.csv').option('header', 'true').mode('overwrite').save('data/cleaned_data/eviction_cases_by_zipcode')
eviction_cases_time_and_summary.coalesce(1).write.format('com.databricks.spark.csv').option('header', 'true').mode('overwrite').save('data/cleaned_data/eviction_cases_time_and_summary')
covid_cases_time_and_summary.coalesce(1).write.format('com.databricks.spark.csv').option('header', 'true').mode('overwrite').save('data/cleaned_data/covid_cases_time_and_summary')
nypd_data_result.coalesce(1).write.format('com.databricks.spark.csv').option('header', 'true').mode('overwrite').save('data/cleaned_data/nypd_data_cleaned')
nypd_data_result_with_BORO_NM.coalesce(1).write.format('com.databricks.spark.csv').option('header', 'true').mode('overwrite').save('data/cleaned_data/nypd_datawith_BORO_NM_cleaned')



In [96]:
# spark.stop()