In [1]:
import pandas as pd
import numpy as np
import pyspark

# Establish Spark Context
`local[*]` tells PySpark to run in local mode on this one machine, rather than on a cluster of machines.

The `*` tells Spark to use as many worker threads as there are logical cores on your machine.

Run this cell only once. Only one SparkContext can be active at a time, so running it again raises an error, and you'd have to stop the old SparkContext (`sc.stop()`) before starting a new one.

In [2]:
sc = pyspark.SparkContext('local[*]')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/16 16:25:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Wrap the existing SparkContext in a SparkSession, the entry point for the DataFrame API
spark = pyspark.sql.SparkSession(sc)

# Spark DataFrames
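Spark DataFrames are quite similar to pandas DataFrames, but they're *distributed*, which means the rows are split into pieces called partitions. Transformations are lazy: the data is only processed when you call an action such as `collect()` or `show()`. At that point the results are brought back to the driver program. `collect()` copies every row into the driver's memory, while `show()` fetches and prints only the first few rows.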

The general idea behind Spark's distributed data is that operations keep data in RAM as much as possible and write as little as possible to disk.

Combined with the fact that the data is split into partitions, this makes Spark fast: each partition can be processed independently, so multiple threads (one per core in `local[*]` mode) can work on different partitions at the same time.

In [4]:
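# header=True takes column names from the first row; without inferSchema, every column is read as a string
df = spark.read.csv('../data/01_raw/NFIP/nfip-flood-policies.csv', header=True)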

# Operating on data with Spark
When working with data in Spark, almost anything you can do with a pandas DataFrame can also be done with a Spark DataFrame; only the syntax is different.

We are using DataFrames instead of RDDs (Resilient Distributed Datasets, Spark's lower-level API). I honestly don't know much about RDDs, but DataFrames seem to play better with .csv files and I'm already comfortable with them, so we're relying on PySpark SQL.

This means that instead of pandas-style indexing, we use SQL-like methods: `select` picks columns, `where` (an alias of `filter`) keeps the rows that match a condition, and a column's `like` method does SQL pattern matching. The resulting syntax is a mix of SQL and pandas.

It's not too difficult to pick up (at least, I don't think so).

In [5]:
# Getting column names from the schema
schema = df.schema
col_names = [field.name for field in schema]  # equivalent to df.columns

# Finding which columns will be most useful
Given that the total number of rows is 50,406,943, we will find which columns have the most non-null values so that those columns can be used for the model.

In [6]:
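# Count the non-null values in each column (each count() is a separate Spark job that rescans the CSV)
col_dict = {}
for col in col_names:
    col_count = df.select(col).na.drop().count()
    col_dict[col] = col_count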

23/04/16 16:33:39 ERROR Executor: Exception in task 54.0 in stage 37.0 (TID 1231)
java.io.FileNotFoundException: 
File file:/home/jerome/learning/school/MACHINE LEARNING CSE4309/final project/forest-for-flood/data/NFIP/nfip-flood-policies.csv does not exist

It is possible the underlying files have been updated. You can explicitly invalidate
the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
recreating the Dataset/DataFrame involved.
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:661)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:212)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at scala.collection.Iterator$$anon$10.hasNe

Py4JJavaError: An error occurred while calling o147.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 54 in stage 37.0 failed 1 times, most recent failure: Lost task 54.0 in stage 37.0 (TID 1231) (Starscream executor driver): java.io.FileNotFoundException: 
File file:/home/jerome/learning/school/MACHINE LEARNING CSE4309/final project/forest-for-flood/data/NFIP/nfip-flood-policies.csv does not exist

It is possible the underlying files have been updated. You can explicitly invalidate
the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
recreating the Dataset/DataFrame involved.
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:661)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:212)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1589)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.io.FileNotFoundException: 
File file:/home/jerome/learning/school/MACHINE LEARNING CSE4309/final project/forest-for-flood/data/NFIP/nfip-flood-policies.csv does not exist

It is possible the underlying files have been updated. You can explicitly invalidate
the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
recreating the Dataset/DataFrame involved.
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:661)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:212)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1589)


In [91]:
total_rows = 50406943  # total number of rows in the dataset
for col in col_names:
    print(f"{col} percentage: {col_dict[col] * 100 / total_rows}")

agriculturestructureindicator percentage: 22.781841779216805
basefloodelevation percentage: 33.26959145290759
basementenclosurecrawlspacetype percentage: 99.99840894933858
cancellationdateoffloodpolicy percentage: 13.476091974075873
censustract percentage: 99.07330424699629
condominiumindicator percentage: 99.99998809687784
construction percentage: 99.99997420990199
countycode percentage: 99.90279315291943
crsdiscount percentage: 100.0
deductibleamountinbuildingcoverage percentage: 98.68670274251704
deductibleamountincontentscoverage percentage: 88.96663104525105
elevatedbuildingindicator percentage: 99.99948816574732
elevationcertificateindicator percentage: 35.31367891125633
elevationdifference percentage: 100.0
federalpolicyfee percentage: 100.0
floodzone percentage: 99.664441067176
hfiaasurcharge percentage: 100.0
houseofworshipindicator percentage: 31.604162148853185
latitude percentage: 99.32807073819176
locationofcontents percentage: 69.46895390978183
longitude percentage: 99.32