In [1]:
import os
pyspark_submit_args = '--packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0 pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier

## setup
conf = SparkConf().setAppName("final_project")
sc = SparkContext.getOrCreate()
ss = SparkSession.builder.getOrCreate()

def toFloatSafe(v):
    try:
        return float(v)
    except ValueError:
        return str(v) #if it is not a float type return as a string

# Mongo Import

In [3]:
spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://52.53.176.185/msds697.case_clean")\
    .getOrCreate()

In [4]:
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df = df.drop("_id")

In [5]:
df.head()

Row(CaseID=2012194, Category='Abandoned Vehicle', Closing Time=3, Neighborhood='Outer Richmond', Police District='RICHMOND', Responsible Agency='DPT Abandoned Vehicles Work Queue', Source='Web')

### 1) Load Cleaned Data

In [6]:
# load and convert data
#filename = "./../data/311_Cases_small.csv"

#data_raw = sc.textFile(filename)\
#             .map(lambda x: x.split(","))

#data_raw = data_raw.map(lambda row:  [toFloatSafe(x) for x in row])

### 2) Create Data Frame

In [7]:
# define schema
#from pyspark.sql import Row
#from pyspark.sql.types import *
#from pyspark.sql import Row

#schema = StructType([
#    StructField("closing_time", FloatType(),False),
#     StructField("neighborhood", StringType(),False),
#     StructField("category", StringType(),False),
#     StructField("police_district", StringType(),False),
#    StructField("responsible_agency", StringType(), False),
#    StructField("source", StringType(), False)
#])

#df = ss.createDataFrame(data_raw.map(lambda x : Row(x[0],x[1],x[2],x[3],x[4], x[5])), schema)

In [8]:
#df.show()
#df.dtypes

### 3) Numericalize Categorical Variables

In [9]:
# String to Numbers
from pyspark.ml.feature import StringIndexer

def indexStringColumns(df, cols):
    #variable newdf will be updated several times
    newdf = df
    
    for c in cols:
        #For each given colum, fits StringIndexerModel.
        si = StringIndexer(inputCol=c, outputCol=c+"-num")
        sm = si.fit(newdf)
        #Creates a DataFame by putting the transformed values in the new colum with suffix "-num" 
        #and then drops the original columns.
        #and drop the "-num" suffix. 
        newdf = sm.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-num", c)
    return newdf

In [10]:
dfnumeric = indexStringColumns(df, ['Neighborhood', 'Category', 'Police District', 'Responsible Agency', 'Source']) 

In [11]:
dfnumeric.filter("Neighborhood < 0.0").show() # no negatives, which is expected. This was a sanity check.

+------+------------+------------+--------+---------------+------------------+------+
|CaseID|Closing Time|Neighborhood|Category|Police District|Responsible Agency|Source|
+------+------------+------------+--------+---------------+------------------+------+
+------+------------+------------+--------+---------------+------------------+------+



In [12]:
dfnumeric = dfnumeric\
            .drop("CaseID")\
            .withColumnRenamed('Closing Time','Closing_Time')\
            .withColumnRenamed('Police District','Police_District')\
            .withColumnRenamed('Responsible Agency', 'Responsible_Agency')

In [13]:
dfnumeric.show()

+------------+------------+--------+---------------+------------------+------+
|Closing_Time|Neighborhood|Category|Police_District|Responsible_Agency|Source|
+------------+------------+--------+---------------+------------------+------+
|           3|         7.0|     2.0|            7.0|               2.0|   2.0|
|          31|        55.0|     1.0|            5.0|               0.0|   3.0|
|           1|        11.0|     3.0|            5.0|               0.0|   0.0|
|           0|         0.0|     0.0|            0.0|               0.0|   1.0|
|        1253|        25.0|     9.0|            9.0|              14.0|   1.0|
|           1|        11.0|     0.0|            5.0|               0.0|   0.0|
|          11|         0.0|     1.0|            0.0|               0.0|   1.0|
|           2|         5.0|     1.0|            7.0|               0.0|   1.0|
|           0|         0.0|     3.0|            0.0|               0.0|   0.0|
|           5|         2.0|    12.0|            9.0|

In [14]:
dfnumeric.filter("Neighborhood == 1").show()

+------------+------------+--------+---------------+------------------+------+
|Closing_Time|Neighborhood|Category|Police_District|Responsible_Agency|Source|
+------------+------------+--------+---------------+------------------+------+
|           6|         1.0|    22.0|            4.0|              23.0|   0.0|
|          14|         1.0|    19.0|            4.0|              73.0|   2.0|
|         324|         1.0|     5.0|            4.0|               4.0|   0.0|
|           3|         1.0|     5.0|            4.0|               4.0|   0.0|
|           0|         1.0|     0.0|            4.0|               0.0|   1.0|
|           2|         1.0|     5.0|            4.0|               4.0|   0.0|
|           0|         1.0|     4.0|            9.0|               0.0|   0.0|
|           4|         1.0|     3.0|            0.0|               0.0|   2.0|
|         170|         1.0|     1.0|            9.0|               0.0|   0.0|
|           0|         1.0|     4.0|            4.0|

In [15]:
# One-hot Encoding

from pyspark.ml.feature import OneHotEncoder
def oneHotEncodeColumns(df, cols):
    newdf = df
    for c in cols:
        print(c + '-onehot')
        #For each given colum, create OneHotEncoder. 
        #dropLast : Whether to drop the last category in the encoded vector (default: true)
        onehotenc = OneHotEncoder(inputCol = c, outputCol= c + '-onehot', dropLast = False)
        #Creates a DataFame by putting the transformed values in the new colum with suffix "-onehot" 
        #and then drops the original columns.
        #and drop the "-onehot" suffix. 
        #newdf.show()
        newdf = onehotenc.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c + '-onehot', c)
    return newdf

dfhot = oneHotEncodeColumns(dfnumeric, ['Source', 'Responsible_Agency']) 

Source-onehot
Responsible_Agency-onehot


In [16]:
, 'Category', 'Police_District', 'Responsible_Agency', 'Source'
# not police district, category, police district, neighborhood

("'Category',", "'Police_District',", "'Responsible_Agency',", "'Source'")

In [17]:
dfhot.show()

+------------+------------+--------+---------------+-------------+------------------+
|Closing_Time|Neighborhood|Category|Police_District|       Source|Responsible_Agency|
+------------+------------+--------+---------------+-------------+------------------+
|           3|         7.0|     2.0|            7.0|(8,[2],[1.0])|   (355,[2],[1.0])|
|          31|        55.0|     1.0|            5.0|(8,[3],[1.0])|   (355,[0],[1.0])|
|           1|        11.0|     3.0|            5.0|(8,[0],[1.0])|   (355,[0],[1.0])|
|           0|         0.0|     0.0|            0.0|(8,[1],[1.0])|   (355,[0],[1.0])|
|        1253|        25.0|     9.0|            9.0|(8,[1],[1.0])|  (355,[14],[1.0])|
|           1|        11.0|     0.0|            5.0|(8,[0],[1.0])|   (355,[0],[1.0])|
|          11|         0.0|     1.0|            0.0|(8,[1],[1.0])|   (355,[0],[1.0])|
|           2|         5.0|     1.0|            7.0|(8,[1],[1.0])|   (355,[0],[1.0])|
|           0|         0.0|     3.0|            0.0|(8

### 4) Create a Feature Vector

In [18]:
# Merging the data with Vector Assembler.
from pyspark.ml.feature import VectorAssembler
input_cols= ["Responsible_Agency", "Source"]

#VectorAssembler takes a number of collumn names(inputCols) and output column name (outputCol)
#and transforms a DataFrame to assemble the values in inputCols into one single vector with outputCol.
va = VectorAssembler(outputCol="features", inputCols=input_cols)
#lpoints - labeled data.
lpoints = va.transform(dfhot).select("features", "Closing_Time").withColumnRenamed("Closing_Time", "label")

In [19]:
lpoints.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(363,[2,357],[1.0...|    3|
|(363,[0,358],[1.0...|   31|
|(363,[0,355],[1.0...|    1|
|(363,[0,356],[1.0...|    0|
|(363,[14,356],[1....| 1253|
|(363,[0,355],[1.0...|    1|
|(363,[0,356],[1.0...|   11|
|(363,[0,356],[1.0...|    2|
|(363,[0,355],[1.0...|    0|
|(363,[20,355],[1....|    5|
|(363,[23,355],[1....|    6|
|(363,[9,356],[1.0...|    0|
|(363,[5,355],[1.0...|   18|
|(363,[0,355],[1.0...|    1|
|(363,[8,355],[1.0...|    0|
|(363,[0,355],[1.0...|    0|
|(363,[5,355],[1.0...|    0|
|(363,[73,357],[1....|   14|
|(363,[0,357],[1.0...|  119|
|(363,[5,358],[1.0...|  137|
+--------------------+-----+
only showing top 20 rows



### 5) Train Model

In [20]:
# ******************* TEMP ******************* # (currently under construction, waiting for data)
# (later, divide the dataset by time, and have two files reading in,
#  instead of a train test split)

# Train test split

splits = lpoints.randomSplit([0.8, 0.2])

adulttrain = splits[0].cache()
adultvalid = splits[1].cache()

In [21]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
lrmodel = lr.fit(adulttrain)

# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lrmodel.coefficients))
print("Intercept: %s" % str(lrmodel.intercept))

Coefficients: [-10.7119504508,-28.3294341603,-23.5739942193,127.080235426,0.0,22.1572039055,278.961646106,-19.0929013107,-11.6339191296,-23.538966951,-24.2161838353,-2.31169684702,-12.4211901431,-6.19601633375,37.189028387,-18.6731767588,-28.4006515988,84.4621621395,-17.7999064714,-25.2247246365,-3.90288353562,36.9037183844,129.718137095,24.959102422,93.9994468839,103.26713806,-21.4648341788,149.850988176,-22.0001269349,-23.5446698111,18.5843260821,37.2914337894,-1.51569755173,31.0309456175,-13.9135823848,-13.0737573786,81.5581142761,-21.8108399544,0.0,-14.8781855474,-21.8229726414,239.322516164,70.014200497,38.2116281336,60.9429493955,21.676829433,110.928321541,-16.8732250017,0.0,0.0,-12.5338321371,0.0,62.9937192095,8.70183899322,102.012787882,-10.5536462768,177.007289459,39.928513869,-23.6484177387,44.1939707332,13.0934385455,2.8415836651,25.1991267015,0.0,70.7312837791,-0.619270958526,39.1322515282,105.007366789,194.103527019,29.5500188373,60.5060987146,31.8085616352,149.728732478,0

### 6) Evaluate Model

In [27]:
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
from pyspark.mllib.evaluation import RegressionMetrics

# convert to RDD
parsedData = adultvalid.rdd

parsedData.take(1)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 32.0 failed 4 times, most recent failure: Lost task 0.3 in stage 32.0 (TID 185, ip-172-31-25-153.us-west-2.compute.internal, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/root/appcache/application_1547758323016_0021/container_1547758323016_0021_01_000006/pyspark.zip/pyspark/worker.py", line 181, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.5, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:330)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:470)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:453)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:284)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:152)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1803)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1791)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1790)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1790)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:871)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:871)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:871)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2024)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1962)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:682)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/root/appcache/application_1547758323016_0021/container_1547758323016_0021_01_000006/pyspark.zip/pyspark/worker.py", line 181, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.5, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:330)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:470)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:453)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:284)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:152)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [23]:

# Instantiate metrics object
metrics = RegressionMetrics(valuesAndPreds)

# Squared Error
print("MSE = %s" % metrics.meanSquaredError)
print("RMSE = %s" % metrics.rootMeanSquaredError)

# R-squared
print("R-squared = %s" % metrics.r2)

# Mean absolute error
print("MAE = %s" % metrics.meanAbsoluteError)

# Explained variance
print("Explained variance = %s" % metrics.explainedVariance)

NameError: name 'valuesAndPreds' is not defined

In [None]:
predictions = lrmodel.transform(adultvalid)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1 = evaluator.evaluate(predictions)
print("F1 = {:.4f}".format(f1)) # F1 = 0.3843

In [None]:
adulttrain.show()