In [16]:
from pyspark.sql import SQLContext
import sys
import os
sqlContext = SQLContext(sc)
sqlContext

<pyspark.sql.context.SQLContext at 0xb1ee36cc>

In [17]:
# Convert csv file to Spark DataFrame (Databricks version)
def loadDataFrame(fileName, fileSchema):
    return (sc.read.format("csv")
            .schema(fileSchema)
            .option("header", "true")
            .option("mode", "DROPMALFORMED")
            .csv("s3a://%s:%s@%s/%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME, fileName)))

In [18]:
from pyspark.sql.types import *

paymentSchema = StructType([
    StructField("step", IntegerType(), True),
    StructField("type", StringType(), True),
    StructField("amount", FloatType(), True),
    StructField("nameOrig", StringType(), True),
    StructField("oldbalanceOrg", FloatType(), True),
    StructField("newbalanceOrig", FloatType(), True),
    StructField("nameDest", StringType(), True),
    StructField("oldbalanceDest", FloatType(), True),
    StructField("newbalanceDest", FloatType(), True),
    StructField("isFraud", IntegerType(), True),
    StructField("isFlaggedFraud", IntegerType(), True)])

1. Read from s3
2. split columns
3. filter header
4. filter empty rows

In [19]:
def convertToPaymentSchema(arr):
    res = arr
    res[0] = int(arr[0]) # step
    res[2] = float(arr[2]) # amount
    res[4] = float(arr[4]) # old balance
    res[5] = float(arr[5]) # new balance
    res[7] = float(arr[7]) # old balance destination
    res[8] = float(arr[8]) # new balance destination
    res[9] = int(arr[9]) # is fraud
    res[10] = int(arr[10]) # is flagged fraud

    return res

In [20]:
filename = '/vagrant/PS_20174392719_1491204439457_log.csv'

rdd = sc.textFile(filename) \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: line[0] != "step") \
    .filter(lambda line: len(line)>1) \
    .map(convertToPaymentSchema)
    
print rdd.take(5)


[[1, u'PAYMENT', 9839.64, u'C1231006815', 170136.0, 160296.36, u'M1979787155', 0.0, 0.0, 0, 0], [1, u'PAYMENT', 1864.28, u'C1666544295', 21249.0, 19384.72, u'M2044282225', 0.0, 0.0, 0, 0], [1, u'TRANSFER', 181.0, u'C1305486145', 181.0, 0.0, u'C553264065', 0.0, 0.0, 1, 0], [1, u'CASH_OUT', 181.0, u'C840083671', 181.0, 0.0, u'C38997010', 21182.0, 0.0, 1, 0], [1, u'PAYMENT', 11668.14, u'C2048537720', 41554.0, 29885.86, u'M1230701703', 0.0, 0.0, 0, 0]]


In [21]:
df = sqlContext.createDataFrame(rdd, paymentSchema)
print df.take(3)

[Row(step=1, type=u'PAYMENT', amount=9839.6396484375, nameOrig=u'C1231006815', oldbalanceOrg=170136.0, newbalanceOrig=160296.359375, nameDest=u'M1979787155', oldbalanceDest=0.0, newbalanceDest=0.0, isFraud=0, isFlaggedFraud=0), Row(step=1, type=u'PAYMENT', amount=1864.280029296875, nameOrig=u'C1666544295', oldbalanceOrg=21249.0, newbalanceOrig=19384.720703125, nameDest=u'M2044282225', oldbalanceDest=0.0, newbalanceDest=0.0, isFraud=0, isFlaggedFraud=0), Row(step=1, type=u'TRANSFER', amount=181.0, nameOrig=u'C1305486145', oldbalanceOrg=181.0, newbalanceOrig=0.0, nameDest=u'C553264065', oldbalanceDest=0.0, newbalanceDest=0.0, isFraud=1, isFlaggedFraud=0)]
