# Loading the dataset

In [58]:
import pandas as pd
import os
import boto3

In [59]:
s3 = boto3.resource('s3')
s3.Bucket('aws-logs-862438390833-us-east-1').download_file('elasticmapreduce/tp2-dataset.zip','tp2-dataset.zip')

In [60]:
import zipfile
with zipfile.ZipFile("tp2-dataset.zip","r") as zip_ref:
    zip_ref.extractall('tp2-dataset')


In [61]:
files = os.listdir("tp2-dataset")

In [62]:
data = []
for file in files:
    data.append(pd.read_csv("tp2-dataset/" + file)) 

In [63]:
data[5].head()

Unnamed: 0,No,year,month,day,hour,PM2.5,PM10,SO2,NO2,CO,O3,TEMP,PRES,DEWP,RAIN,wd,WSPM,station
0,1,2013,3,1,0,5.0,14.0,4.0,12.0,200.0,85.0,-0.5,1024.5,-21.4,0.0,NNW,5.7,Nongzhanguan
1,2,2013,3,1,1,8.0,12.0,6.0,14.0,200.0,84.0,-0.7,1025.1,-22.1,0.0,NW,3.9,Nongzhanguan
2,3,2013,3,1,2,3.0,6.0,5.0,14.0,200.0,83.0,-1.2,1025.3,-24.6,0.0,NNW,5.3,Nongzhanguan
3,4,2013,3,1,3,5.0,5.0,5.0,14.0,200.0,84.0,-1.4,1026.2,-25.5,0.0,N,4.9,Nongzhanguan
4,5,2013,3,1,4,5.0,5.0,6.0,21.0,200.0,77.0,-1.9,1027.1,-24.5,0.0,NNW,3.2,Nongzhanguan


In [64]:
data[4].shape

(35064, 18)

In [65]:
df = pd.concat(data)

In [66]:
df.shape

(385704, 18)

# Data Preprocessing

### Dropping the rows where the temperature (label) is null 

In [67]:
df = df[df['TEMP'].notna()]

In [68]:
df.shape

(385325, 18)

In [69]:
df.describe()

Unnamed: 0,No,year,month,day,hour,PM2.5,PM10,SO2,NO2,CO,O3,TEMP,PRES,DEWP,RAIN,WSPM
count,385325.0,385325.0,385325.0,385325.0,385325.0,377282.0,379362.0,376973.0,373964.0,365922.0,373133.0,385325.0,385323.0,385320.0,385319.0,385321.0
mean,17524.858732,2014.661431,6.526161,15.726086,11.499804,79.319774,103.912157,15.711225,50.200872,1217.891556,57.480654,13.516648,1010.677216,2.47112,0.064492,1.728138
std,10120.701659,1.176838,3.447381,8.800716,6.922627,80.338681,91.18568,21.430556,35.036553,1153.626908,56.634546,11.440518,10.462417,13.801826,0.82317,1.249822
min,1.0,2013.0,1.0,1.0,0.0,2.0,2.0,0.2856,1.0265,100.0,0.2142,-19.9,982.4,-43.4,0.0,0.0
25%,8759.0,2014.0,4.0,8.0,5.0,20.0,36.0,3.0,23.0,500.0,11.0,3.1,1002.2,-9.0,0.0,0.9
50%,17531.0,2015.0,7.0,16.0,11.0,55.0,81.0,7.0,43.0,900.0,45.0,14.5,1010.3,3.0,0.0,1.4
75%,26289.0,2016.0,10.0,23.0,18.0,110.0,145.0,19.0,71.0,1500.0,82.0,23.2,1019.0,15.1,0.0,2.2
max,35064.0,2017.0,12.0,31.0,23.0,957.0,999.0,500.0,290.0,10000.0,1071.0,41.6,1042.8,29.1,72.5,12.9


### Counting the null values for each explanatory variable

In [70]:
df['PM2.5'].isna().sum()

8043

In [71]:
df['PM10'].isna().sum()

5963

In [72]:
df['SO2'].isna().sum()

8352

In [73]:
df['NO2'].isna().sum()

11361

In [74]:
df['CO'].isna().sum()

19403

In [75]:
df['O3'].isna().sum()

12192

In [76]:
df['PRES'].isna().sum()

2

In [77]:
df['DEWP'].isna().sum()

5

In [78]:
df['RAIN'].isna().sum()

6

In [79]:
df['WSPM'].isna().sum()

4

In [80]:
df['wd'].isna().sum()

1439

In [81]:
df.iloc[0].values

array([1, 2013, 3, 1, 0, 6.0, 18.0, 5.0, nan, 800.0, 88.0, 0.1, 1021.1,
       -18.6, 0.0, 'NW', 4.4, 'Gucheng'], dtype=object)

In [82]:
from sklearn.impute import SimpleImputer
import numpy as np

imputer = SimpleImputer(missing_values=np.nan, strategy='mean')
df['PM2.5'] = imputer.fit_transform(df['PM2.5'].values.reshape(-1, 1))
df['PM10'] = imputer.fit_transform(df['PM10'].values.reshape(-1, 1))
df['SO2'] = imputer.fit_transform(df['SO2'].values.reshape(-1, 1))
df['NO2'] = imputer.fit_transform(df['NO2'].values.reshape(-1, 1))
df['CO'] = imputer.fit_transform(df['CO'].values.reshape(-1, 1))
df['O3'] = imputer.fit_transform(df['O3'].values.reshape(-1, 1))
df['PRES'] = imputer.fit_transform(df['PRES'].values.reshape(-1, 1))
df['DEWP'] = imputer.fit_transform(df['DEWP'].values.reshape(-1, 1))
df['RAIN'] = imputer.fit_transform(df['RAIN'].values.reshape(-1, 1))
df['WSPM'] = imputer.fit_transform(df['WSPM'].values.reshape(-1, 1))


In [83]:
imputerString = SimpleImputer(strategy="most_frequent")
df['wd'] = imputerString.fit_transform(df['wd'].values.reshape(-1, 1))

### Convert temperature into categories

In [84]:
def convertToCustomCategories(x):
    if x < 0:
        return 0
    if x < 10 and x >= 0:
        return 1
    if x < 20 and x >= 10:
        return 2
    if x < 30 and x >= 20:
        return 3
    if x >= 30:
        return 4
    else:
        print('Value not in range')
        print(x)
        return 2 #returning the halfway value (this gives less weight to an incorrect value)

In [85]:
df.TEMP = df.TEMP.apply(lambda x : convertToCustomCategories(x))

In [86]:
df.head()

Unnamed: 0,No,year,month,day,hour,PM2.5,PM10,SO2,NO2,CO,O3,TEMP,PRES,DEWP,RAIN,wd,WSPM,station
0,1,2013,3,1,0,6.0,18.0,5.0,50.200872,800.0,88.0,1,1021.1,-18.6,0.0,NW,4.4,Gucheng
1,2,2013,3,1,1,6.0,15.0,5.0,50.200872,800.0,88.0,0,1021.5,-19.0,0.0,NW,4.0,Gucheng
2,3,2013,3,1,2,5.0,18.0,15.711225,50.200872,700.0,52.0,0,1021.5,-19.8,0.0,WNW,4.6,Gucheng
3,4,2013,3,1,3,6.0,20.0,6.0,50.200872,1217.891556,57.480654,0,1022.7,-21.2,0.0,W,2.8,Gucheng
4,5,2013,3,1,4,5.0,17.0,5.0,50.200872,600.0,73.0,0,1023.0,-21.4,0.0,WNW,3.6,Gucheng


### Convert Wind into One hot encoded variable

In [87]:
df_wind = pd.get_dummies(df['wd'])

In [88]:
df_wind.head()

Unnamed: 0,E,ENE,ESE,N,NE,NNE,NNW,NW,S,SE,SSE,SSW,SW,W,WNW,WSW
0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0
1,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0
2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0
3,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0
4,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0


In [89]:
df = pd.concat([df,df_wind], axis=1)

In [90]:
df.head()

Unnamed: 0,No,year,month,day,hour,PM2.5,PM10,SO2,NO2,CO,...,NNW,NW,S,SE,SSE,SSW,SW,W,WNW,WSW
0,1,2013,3,1,0,6.0,18.0,5.0,50.200872,800.0,...,0,1,0,0,0,0,0,0,0,0
1,2,2013,3,1,1,6.0,15.0,5.0,50.200872,800.0,...,0,1,0,0,0,0,0,0,0,0
2,3,2013,3,1,2,5.0,18.0,15.711225,50.200872,700.0,...,0,0,0,0,0,0,0,0,1,0
3,4,2013,3,1,3,6.0,20.0,6.0,50.200872,1217.891556,...,0,0,0,0,0,0,0,1,0,0
4,5,2013,3,1,4,5.0,17.0,5.0,50.200872,600.0,...,0,0,0,0,0,0,0,0,1,0


In [91]:
df.shape

(385325, 34)

### Convert station into One hot encoded variable

In [92]:
df = pd.concat([df,pd.get_dummies(df['station'])], axis=1)

In [93]:
df.shape

(385325, 45)

In [94]:
df.head()

Unnamed: 0,No,year,month,day,hour,PM2.5,PM10,SO2,NO2,CO,...,Changping,Dingling,Dongsi,Guanyuan,Gucheng,Huairou,Nongzhanguan,Shunyi,Tiantan,Wanliu
0,1,2013,3,1,0,6.0,18.0,5.0,50.200872,800.0,...,0,0,0,0,1,0,0,0,0,0
1,2,2013,3,1,1,6.0,15.0,5.0,50.200872,800.0,...,0,0,0,0,1,0,0,0,0,0
2,3,2013,3,1,2,5.0,18.0,15.711225,50.200872,700.0,...,0,0,0,0,1,0,0,0,0,0
3,4,2013,3,1,3,6.0,20.0,6.0,50.200872,1217.891556,...,0,0,0,0,1,0,0,0,0,0
4,5,2013,3,1,4,5.0,17.0,5.0,50.200872,600.0,...,0,0,0,0,1,0,0,0,0,0


# Separating dataset into train and test

In [95]:
print(df.columns.values)

['No' 'year' 'month' 'day' 'hour' 'PM2.5' 'PM10' 'SO2' 'NO2' 'CO' 'O3'
 'TEMP' 'PRES' 'DEWP' 'RAIN' 'wd' 'WSPM' 'station' 'E' 'ENE' 'ESE' 'N'
 'NE' 'NNE' 'NNW' 'NW' 'S' 'SE' 'SSE' 'SSW' 'SW' 'W' 'WNW' 'WSW'
 'Aotizhongxin' 'Changping' 'Dingling' 'Dongsi' 'Guanyuan' 'Gucheng'
 'Huairou' 'Nongzhanguan' 'Shunyi' 'Tiantan' 'Wanliu']


In [96]:
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA

X = df[[ 'DEWP',  'No', 'PRES', 'RAIN','WSPM',
         'hour', 'month', 'year',
      'E', 'ENE', 'ESE', 'N', 'NE', 'NNE', 'NNW', 'NW', 'S', 'SE', 'SSE', 'SSW', 'SW', 'W',
 'WNW', 'WSW', 'E', 'ENE', 'ESE', 'N', 'NE','NNE', 'NNW', 'NW', 'S', 'SE', 'SSE',
 'SSW', 'SW', 'W', 'WNW', 'WSW', 'Aotizhongxin', 'Changping', 'Dingling', 'Dongsi',
 'Guanyuan', 'Gucheng', 'Huairou', 'Nongzhanguan', 'Shunyi', 'Tiantan','Wanliu']].values


PCA = PCA(n_components=10)

X = PCA.fit_transform(X)

Y = df['TEMP'].values

X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.99)

In [97]:
Y_train[0]

1

# Other library test (other than SKlearn)

In [98]:
'''
from pyspark.ml.linalg import Vectors

data_train = np.concatenate((X_train, Y_train.reshape(-1,1)),axis=1)

print(X_train.shape)
print(data_train.shape)

dff = map(lambda x: ( Vectors.dense(x[:-1]), int(x[-1])), data_train)
spark_DF = spark.createDataFrame(dff,schema=["features", "label"])
spark_DF.show(5)
'''

'\nfrom pyspark.ml.linalg import Vectors\n\ndata_train = np.concatenate((X_train, Y_train.reshape(-1,1)),axis=1)\n\nprint(X_train.shape)\nprint(data_train.shape)\n\ndff = map(lambda x: ( Vectors.dense(x[:-1]), int(x[-1])), data_train)\nspark_DF = spark.createDataFrame(dff,schema=["features", "label"])\nspark_DF.show(5)\n'

In [99]:
'''
data_test = np.concatenate((X_test, Y_test.reshape(-1,1)),axis=1)


dff_test = map(lambda x: ( Vectors.dense(x[:-1]), int(x[-1])), data_test)
spark_DF_test = spark.createDataFrame(dff_test,schema=["features", "label"])
spark_DF_test.show(5)
'''

'\ndata_test = np.concatenate((X_test, Y_test.reshape(-1,1)),axis=1)\n\n\ndff_test = map(lambda x: ( Vectors.dense(x[:-1]), int(x[-1])), data_test)\nspark_DF_test = spark.createDataFrame(dff_test,schema=["features", "label"])\nspark_DF_test.show(5)\n'

In [100]:
'''
from pyspark.ml.classification import LogisticRegression
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder.master("local").config(conf=SparkConf()).getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")


#sc = SparkContext()

#data = [X_train[0], Y_train[0]]
#data = spark.createDataFrame(X_train)


#assem = assembler.fit(X_train)
#train_set = assem.transform(X_train)
df_train = sc.parallelize(map(lambda x: ( Vectors.dense(x[:-1]), int(x[-1])), data_train)).toDF(["features", "label"])

df_test = sc.parallelize(map(lambda x: ( Vectors.dense(x[:-1]), int(x[-1])), data_test)).toDF(["features", "label"])


logisticRegression = LogisticRegression(maxIter=5, regParam=0.01)
model = logisticRegression.fit(df_train)
'''

'\nfrom pyspark.ml.classification import LogisticRegression\nfrom pyspark import SparkContext\nfrom pyspark import SparkConf\nfrom pyspark.sql import SparkSession\nfrom pyspark.ml.feature import VectorAssembler\n\nspark = SparkSession.builder.master("local").config(conf=SparkConf()).getOrCreate()\nspark.conf.set("spark.sql.execution.arrow.enabled", "true")\n\n\n#sc = SparkContext()\n\n#data = [X_train[0], Y_train[0]]\n#data = spark.createDataFrame(X_train)\n\n\n#assem = assembler.fit(X_train)\n#train_set = assem.transform(X_train)\ndf_train = sc.parallelize(map(lambda x: ( Vectors.dense(x[:-1]), int(x[-1])), data_train)).toDF(["features", "label"])\n\ndf_test = sc.parallelize(map(lambda x: ( Vectors.dense(x[:-1]), int(x[-1])), data_test)).toDF(["features", "label"])\n\n\nlogisticRegression = LogisticRegression(maxIter=5, regParam=0.01)\nmodel = logisticRegression.fit(df_train)\n'

In [101]:
#evaluation = model.evaluate(df_test)

In [102]:
#print(evaluation.roc())

# Training

In [103]:
import pickle
from joblibspark import register_spark


from sklearn import svm
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.utils import parallel_backend
from sklearn.model_selection import cross_val_score
#from spark_sklearn import GridSearchCV
from spark_sklearn.util import createLocalSparkSession
from sklearn.model_selection import GridSearchCV


os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'

register_spark()

#sc = createLocalSparkSession().sparkContext



SVM_Model = svm.SVC(kernel='linear', C=1)

Random_Forest_Model = RandomForestClassifier()

Gradient_Boosting_Model = GradientBoostingClassifier()

print("Starting parallel tasks :")

with parallel_backend('spark', n_jobs=4):
    scoresSVM = cross_val_score(SVM_Model, X_train, Y_train, cv=5)
    scoresRandomForest = cross_val_score(Random_Forest_Model, X_train, Y_train, cv=5)
    scoresGradientBoosting = cross_val_score(Gradient_Boosting_Model, X_train, Y_train, cv=5)


print("Random_Forest score : ")
print(scoresRandomForest)
print( "SVM score : ")
print(scoresSVM)
print("Gradient_Boosting score : ")
print(scoresGradientBoosting)
    
fileName = "SVM" + string(np.average(scoresSVM)) 
f = open(fileName + '.pckl', 'wb') 
pickle.dump(SVM_Model , f) 
f.close()

fileName = "Random_Forest" + string(np.average(scoresRandomForest)) 
f = open(fileName + '.pckl', 'wb') 
pickle.dump(randomForest , f) 
f.close()

fileName = "Gradient_Boosting" + string(np.average(scoresGradientBoosting)) 
f = open(fileName + '.pckl', 'wb') 
pickle.dump(Gradient_Boosting_Model , f) 
f.close()


Starting parallel tasks :




Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 24.0 failed 1 times, most recent failure: Lost task 0.0 in stage 24.0 (TID 24, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/charles-olivierfavreau/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 267, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 3.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor45.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/charles-olivierfavreau/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 267, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 3.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
