# PLSE ETL - Part 1: "13 years of data" to DB

In [1]:
import pandas as pd # used only to convert xlsx to csv

In [2]:
from functional import seq

In [3]:
from pyspark.sql import SparkSession

## 1.A

In [4]:
spark = SparkSession \
        .builder \
        .appName('plse') \
        .getOrCreate()

**While using this notebook as a development tool, if possible, AVOID redeclaration of `case_data`**

Loading the workbook (3M+ rows x N columns, about 0.5 GB) takes 20-30 minutes.

In [None]:
case_data = pd.ExcelFile('../tmp/PA2459713_Philadelphia_CaseData_Deliverable.xlsx')

In [5]:
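def extract(x):
    f = '../tmp/case_data_{}.csv'.format(x)
    # One-time conversion of sheet x to CSV; uncomment when the CSV does not exist yet
    #df = pd.read_excel(case_data, 'Case Data {}'.format(x))
    #df.to_csv(f)

    # No schema inference is requested, so every column is read as a string
    return spark.read.format('csv').option('header', 'true').load(f)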
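
The commented-out lines in `extract` suggest the Excel-to-CSV conversion was run once and then disabled by hand. As a sketch only, that step could be guarded by a file-existence check so it runs at most once per sheet. `ensure_csv` and its `convert` callback are hypothetical names, not part of this notebook; here the callback would wrap the `pd.read_excel(...)` / `to_csv(...)` pair.

```python
import os


def ensure_csv(path, convert):
    """Run convert(path) only when the CSV at `path` does not exist yet.

    `convert` is a hypothetical callback that writes the CSV; in this
    notebook it would wrap the commented-out pd.read_excel/to_csv lines.
    Returns True when the conversion ran, False when it was skipped.
    """
    if os.path.exists(path):
        return False  # already converted, skip the slow Excel read
    convert(path)
    return True
```

With a guard like this, re-running the cell would not trigger the slow workbook read again as long as the CSV files stay in `../tmp`.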

**While using this notebook as a development tool, if possible, AVOID redeclaration of `case_data_dfs`**

In [6]:
case_data_dfs = [extract(i) for i in seq(1, 2, 3, 4)]

___________________________________________

## 1.B

**Create "Wide" CaseData Spark Dataframe**

In [17]:
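df = spark.createDataFrame([], case_data_dfs[0].schema)

for _df in case_data_dfs:
    df = df.union(_df)
    # NOTE: toPandas() collects the whole sheet onto the driver, and each pass
    # overwrites the same file; this is the call that raised the error below.
    _df.toPandas().to_csv('../tmp/processed_case_data.csv', chunksize=1000000)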

Py4JJavaError: An error occurred while calling o30.collectToPython.
: java.lang.OutOfMemoryError: Java heap space
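
The `collectToPython` frame in the error points at `toPandas()`, which pulls every row onto the driver. One possible way around it, sketched here under assumptions, is to let Spark write the CSV itself through `DataFrameWriter`, so no single process has to hold the full dataset. The helper name `write_csv_with_spark` and the output directory are hypothetical. Note that Spark writes a directory of part files rather than one CSV file.

```python
def write_csv_with_spark(df, out_dir):
    """Write a Spark DataFrame as CSV without collecting it on the driver.

    `df` is assumed to be a pyspark.sql.DataFrame; `out_dir` is a
    hypothetical output directory. Spark creates one part file per partition.
    """
    (df.write
       .mode('overwrite')          # replace output from an earlier run
       .option('header', 'true')   # keep the column names in each part file
       .csv(out_dir))
```

A call such as `write_csv_with_spark(df, '../tmp/processed_case_data')` would replace the `toPandas().to_csv(...)` step; the path is illustrative only.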


In [14]:
df_count = df.count()

In [16]:
assert df_count == 3668286, 'CaseData Dataframe Corrupt'

**In-memory lazy analysis**

Note: at this point in the code, `df` is a Spark DataFrame. It contains ALL of the case data.

In [10]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- CountyName: string (nullable = true)
 |-- DocketNumber: string (nullable = true)
 |-- FiledDate: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- MiddleName: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ZipCode: string (nullable = true)
 |-- OffenseTrackingNumber: string (nullable = true)
 |-- GenderCode: string (nullable = true)
 |-- RaceCode: string (nullable = true)
 |-- BirthDate: string (nullable = true)
 |-- OriginatingOffenseSequence: string (nullable = true)
 |-- StatuteType: string (nullable = true)
 |-- StatuteTitle: string (nullable = true)
 |-- StatuteSection: string (nullable = true)
 |-- StatuteSubSection: string (nullable = true)
 |-- InchoateStatuteTitle: string (nullable = true)
 |-- InchoateStatuteSection: string (nullable = true)
 |-- InchoateStatuteSubSection: string (nullable = true)
 |-- OffenseDisposi

*Note: `df.OffenseTrackingNumber` is a string - see `df.printSchema()` stdout for more info*

In [11]:
null_otn = df.filter(df.OffenseTrackingNumber != "").count() # despite the name, this counts rows WITH an OTN: the filter keeps non-empty values and drops nulls

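Q: How many rows have no value for OTN?

A: Roughly 14.9% (of 3668286 rows). The cell below prints `df_count / null_otn`, and since the filter keeps only rows that have an OTN, the printed 1.175 is the ratio of all rows to rows with an OTN, not a percentage. The share without an OTN is 1 - 1/1.175.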

In [12]:
print(df_count / null_otn, '%')

1.1754866689760124 %
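
Because the filter `!= ""` keeps rows that have an OTN, the printed value is the ratio of all rows to rows with an OTN, so turning it into a share of missing rows takes one more step. A sketch of that arithmetic in plain Python, using only the two numbers shown in this notebook; `missing_share_pct` is a hypothetical helper name.

```python
def missing_share_pct(total_rows, rows_with_value):
    """Percentage of rows that lack a value."""
    return 100.0 * (total_rows - rows_with_value) / total_rows


total_rows = 3668286             # asserted row count of df
ratio = 1.1754866689760124       # printed value of df_count / null_otn
# Recover the count of rows with an OTN from the printed ratio.
rows_with_otn = round(total_rows / ratio)

print(round(missing_share_pct(total_rows, rows_with_otn), 1), '%')
```

In the notebook itself the same figure could be computed directly from `df_count` and `null_otn`, without going through the ratio.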


*Note: if you uncomment and run the block below, you'll see the first five rows of the in-memory DataFrame*

In [15]:
# df.show(5)  # show() prints the rows itself and returns None, so no print() is needed

The error below occurs when the Docker engine runs out of resources.

Py4JJavaError: An error occurred while calling o61.collectToPython.
: java.lang.OutOfMemoryError: GC overhead limit exceeded

You have to increase the resources your machine allocates to the Docker engine.

My OS is Ubuntu 18.04.

```
$ docker info
...
WARNING: No swap limit support

$ echo 'GRUB_CMDLINE_LINUX="cgroup_enable=memory swapaccount=1"' | sudo tee -a /etc/default/grub
# or edit the file by hand instead:
$ sudo vi /etc/default/grub

$ sudo update-grub
```

After restarting the system, the error is no longer thrown.


In [15]:
df.toPandas().to_csv('../tmp/processed_case_data.csv')

Py4JJavaError: An error occurred while calling o61.collectToPython.
: java.lang.IllegalStateException: SparkContext has been shutdown
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2053)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3263)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3260)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3260)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


**Load into PostgreSQL Database**

In [None]:
# ...TBD, where will this data persist?

In [9]:
df.show(5)

+---+----------+------------+---------+--------+---------+----------+----+-----+-------+---------------------+----------+--------+---------+--------------------------+-----------+------------+--------------+-----------------+--------------------+----------------------+-------------------------+------------------+-----------+----------------------+------------------+---------------+-------------------+------------+--------------+
|_c0|CountyName|DocketNumber|FiledDate|LastName|FirstName|MiddleName|City|State|ZipCode|OffenseTrackingNumber|GenderCode|RaceCode|BirthDate|OriginatingOffenseSequence|StatuteType|StatuteTitle|StatuteSection|StatuteSubSection|InchoateStatuteTitle|InchoateStatuteSection|InchoateStatuteSubSection|OffenseDisposition|OffenseDate|OffenseDispositionDate|OffenseDescription|CaseDisposition|CaseDispositionDate|OffenseGrade|DisposingJudge|
+---+----------+------------+---------+--------+---------+----------+----+-----+-------+---------------------+----------+--------+----