In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Reuse the session's SparkContext (creating one if none exists) and wrap it in a GlueContext.
spark_context = SparkContext.getOrCreate()
glueContext = GlueContext(spark_context)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1578069659458_0004,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [83]:
# Set the old `mapred` API output committer to Hadoop's standard FileOutputCommitter.
# NOTE(review): presumably overrides an environment-default committer for S3 writes — confirm intent.
# On the recorded session this ran as In [83], i.e. after the S3 writes (In [39], In [41]).
hadoop_conf = glueContext._jsc.hadoopConfiguration()
hadoop_conf.set("mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 1. Create Glue crawler

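Create an AWS Glue crawler over the bank-marketing CSV file stored in the S3 bucket (dataset source: https://www.kaggle.com/rouseguy/bankbalanced/data). The crawler registers the table `vtr` in the Glue Data Catalog database `bank`, which the next cell loads.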

In [2]:
# Load the crawled table from the Glue Data Catalog, then report its row count and inferred schema.
CATALOG_DATABASE = "bank"
CATALOG_TABLE = "vtr"
data = glueContext.create_dynamic_frame.from_catalog(
    database=CATALOG_DATABASE,
    table_name=CATALOG_TABLE,
)
print("Count: ", data.count())
data.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Count:  11162
root
|-- age: long
|-- job: string
|-- marital: string
|-- education: string
|-- default: string
|-- balance: long
|-- housing: string
|-- loan: string
|-- contact: string
|-- day: long
|-- month: string
|-- duration: long
|-- campaign: long
|-- pdays: long
|-- previous: long
|-- poutcome: string
|-- deposit: string

### 2. Exploring the data


In [3]:
# Convert the Glue DynamicFrame into a Spark DataFrame; the Spark SQL and pyspark.ml steps below work on DataFrames.
df = data.toDF()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
import pandas as pd

# Peek at the first five rows, transposed so each column reads as one row of the table.
first_rows = df.take(5)
pd.DataFrame(first_rows, columns=df.columns).T


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                   0          1           2          3         4
age               59         56          41         55        54
job           admin.     admin.  technician   services    admin.
marital      married    married     married    married   married
education  secondary  secondary   secondary  secondary  tertiary
default           no         no          no         no        no
balance         2343         45        1270       2476       184
housing          yes         no         yes        yes        no
loan              no         no          no         no        no
contact      unknown    unknown     unknown    unknown   unknown
day                5          5           5          5         5
month            may        may         may        may       may
duration        1042       1467        1389        579       673
campaign           1          1           1          1         2
pdays             -1         -1          -1         -1        -1
previous           0     

In [5]:
# Summary statistics for every 64-bit integer column (Spark reports these as 'bigint').
numeric_features = [name for name, dtype in df.dtypes if dtype == 'bigint']
df.select(numeric_features).describe().toPandas().T

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

              0                   1                   2      3      4
summary   count                mean              stddev    min    max
age       11162  41.231947679627304  11.913369192215518     18     95
balance   11162  1528.5385235620856   3225.413325946149  -6847  81204
day       11162  15.658036194230425   8.420739541006462      1     31
duration  11162  371.99381831213043  347.12838571630687      2   3881
campaign  11162   2.508421429851281  2.7220771816614824      1     63
pdays     11162   51.33040673714388  108.75828197197717     -1    854
previous  11162  0.8325568894463358   2.292007218670508      0     58

In [6]:
# Keep the modelling columns; 'day' and 'month' from the crawled schema are dropped here.
selected_columns = [
    'age', 'job', 'marital', 'education', 'default', 'balance', 'housing', 'loan',
    'contact', 'duration', 'campaign', 'pdays', 'previous', 'poutcome', 'deposit',
]
df = df.select(*selected_columns)
cols = df.columns  # remembered so these columns can be re-selected after the pipeline transform
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- age: long (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: long (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- duration: long (nullable = true)
 |-- campaign: long (nullable = true)
 |-- pdays: long (nullable = true)
 |-- previous: long (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)

In [7]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# String-valued feature columns that will be index-encoded and then one-hot encoded.
categoricalColumns = [
    'job', 'marital', 'education', 'default',
    'housing', 'loan', 'contact', 'poutcome',
]
# Pipeline stages are appended to this list by the following cells, in execution order.
stages = list()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# For each categorical column: map string values to indices, then one-hot encode those indices.
# Both stages are queued on `stages` (re-running this cell would queue them a second time).
for column_name in categoricalColumns:
    indexer = StringIndexer(inputCol=column_name, outputCol=column_name + 'Index')
    one_hot = OneHotEncoderEstimator(
        inputCols=[indexer.getOutputCol()],
        outputCols=[column_name + "classVec"],
    )
    stages.extend((indexer, one_hot))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Index the target column 'deposit' into a numeric 'label' column and queue the stage.
label_stringIdx = StringIndexer(inputCol='deposit', outputCol='label')
stages.append(label_stringIdx)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Build a single 'features' vector: one-hot blocks for the categoricals first, then the raw numeric columns.
numericCols = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous']
assemblerInputs = [name + "classVec" for name in categoricalColumns]
assemblerInputs.extend(numericCols)
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
from pyspark.ml import Pipeline

# Fit every queued stage, then keep label + features followed by the original columns.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df)
selectedCols = ['label', 'features'] + cols
df = pipelineModel.transform(df).select(selectedCols)
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- age: long (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: long (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- duration: long (nullable = true)
 |-- campaign: long (nullable = true)
 |-- pdays: long (nullable = true)
 |-- previous: long (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)

In [13]:
# Peek at the transformed rows (label, features, original columns), transposed for readability.
transformed_preview = df.take(5)
pd.DataFrame(transformed_preview, columns=df.columns).T

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                           0                        ...                                                                          4
label                                                      1                        ...                                                                          1
features   (0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...                        ...                          (0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...
age                                                       59                        ...                                                                         54
job                                                   admin.                        ...                                                                     admin.
marital                                              married                        ...                                                                    married
education             

In [14]:
from pyspark.ml.linalg import Vectors
def extract(row):
    """Flatten one row into a plain tuple.

    The first element is ``row.label``; it is followed by every value of
    ``row.features`` (densified via ``toArray()``), in vector order.
    """
    feature_values = row.features.toArray().tolist()
    return (row.label, *feature_values)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
# Flatten each row with `extract`. Only the first column is named explicitly, so Spark
# auto-names the remaining vector values _2, _3, ...
flattened_rdd = df.rdd.map(extract)
df2 = flattened_rdd.toDF(["label"])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
# Class balance of the full labelled dataset, before the train/test split.
label_counts = df2.groupBy('label').count()
label_counts.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-----+
|label|count|
+-----+-----+
|  0.0| 5873|
|  1.0| 5289|
+-----+-----+

In [32]:
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as func

# StringIndexer produced 0.0/1.0 doubles; store the label as an integer column for the CSV export.
rounded_label = func.round(df2["label"])
df2 = df2.withColumn("label", rounded_label.cast(IntegerType()))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
# First ten flattened rows: label plus the auto-named feature columns _2, _3, ... (transposed).
flat_preview = df2.take(10)
pd.DataFrame(flat_preview, columns=df2.columns).T

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

            0       1       2       3   ...         6       7      8       9
label     1.0     1.0     1.0     1.0   ...       1.0     1.0    1.0     1.0
_2        0.0     0.0     0.0     0.0   ...       1.0     0.0    0.0     0.0
_3        0.0     0.0     0.0     0.0   ...       0.0     0.0    0.0     0.0
_4        0.0     0.0     1.0     0.0   ...       0.0     0.0    1.0     0.0
_5        1.0     1.0     0.0     0.0   ...       0.0     0.0    0.0     0.0
_6        0.0     0.0     0.0     1.0   ...       0.0     0.0    0.0     1.0
_7        0.0     0.0     0.0     0.0   ...       0.0     1.0    0.0     0.0
_8        0.0     0.0     0.0     0.0   ...       0.0     0.0    0.0     0.0
_9        0.0     0.0     0.0     0.0   ...       0.0     0.0    0.0     0.0
_10       0.0     0.0     0.0     0.0   ...       0.0     0.0    0.0     0.0
_11       0.0     0.0     0.0     0.0   ...       0.0     0.0    0.0     0.0
_12       0.0     0.0     0.0     0.0   ...       0.0     0.0    0.0     0.0

In [34]:
TRAIN_FRACTION, TEST_FRACTION = 0.7, 0.3
SPLIT_SEED = 42  # fixed seed so the split is repeatable (given the same input partitioning)
train, test = df2.randomSplit([TRAIN_FRACTION, TEST_FRACTION], seed=SPLIT_SEED)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [43]:
# Class balance of the held-out test split.
test_label_counts = test.groupBy('label').count()
test_label_counts.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-----+
|label|count|
+-----+-----+
|    1| 1607|
|    0| 1728|
+-----+-----+

In [36]:
from awsglue.dynamicframe import DynamicFrame

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
# Wrap the Spark training split in a Glue DynamicFrame so Glue's S3 writer can consume it.
dynamicDataFrame = DynamicFrame.fromDF(train, glueContext, "dynamicDataFrame")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [39]:
# Write the training split to S3 as header-less CSV; the label is the first column
# (it is the first field produced by `extract`).
# AWS Glue keys job-bookmark state on `transformation_ctx`, so every sink needs its own
# value. The train and test writers previously both used "datasink2".
glueContext.write_dynamic_frame.from_options(
    frame=dynamicDataFrame,
    connection_type="s3",
    connection_options={"path": "s3://lla-mggaska/train/"},
    format="csv",
    format_options={"writeHeader": False},
    transformation_ctx="datasink_train",
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<awsglue.dynamicframe.DynamicFrame object at 0x7fa1612be2e8>

In [40]:
# Wrap the Spark test split in a Glue DynamicFrame for the S3 writer.
# It gets its own frame name; reusing the training frame's name ("dynamicDataFrame")
# made the two frames indistinguishable.
dynamicDataFrameTest = DynamicFrame.fromDF(test, glueContext, "dynamicDataFrameTest")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [41]:
# Write the test split to S3 as header-less CSV; the label is the first column
# (it is the first field produced by `extract`).
# AWS Glue keys job-bookmark state on `transformation_ctx`, so this sink needs a value
# distinct from the training sink's. Both previously used "datasink2".
glueContext.write_dynamic_frame.from_options(
    frame=dynamicDataFrameTest,
    connection_type="s3",
    connection_options={"path": "s3://lla-mggaska/test/"},
    format="csv",
    format_options={"writeHeader": False},
    transformation_ctx="datasink_test",
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<awsglue.dynamicframe.DynamicFrame object at 0x7fa1612c4da0>