In [40]:
import findspark
findspark.init()
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, monotonically_increasing_id, col

In [2]:
# Attach to the already-running SparkContext when there is one; otherwise start
# a new one with the default configuration. Safe to re-run.
sc = SparkContext.getOrCreate(conf=None)

In [3]:
# SQLContext is deprecated as of Spark 3.0 in favour of SparkSession. The
# builder reuses the active SparkContext (`sc` above) and exposes the same
# `.read` / DataFrame API used in the rest of this notebook.
spark = SparkSession.builder.getOrCreate()
# Kept under the old name so later cells (`sql_context.read...`) keep working.
sql_context = spark

In [4]:
# Raw transaction log CSV. This is an absolute path to the author's local copy;
# point it at your own download of the dataset.
RAW_TRANSACTIONS_CSV = '/Users/namngo/personal/neptune/data/PS_20174392719_1491204439457_log.csv'

# header=True: the first row holds the column names.
# inferSchema=True: Spark scans the data to type the numeric columns instead of
# reading every column as a string.
df = sql_context.read.csv(RAW_TRANSACTIONS_CSV, header=True, inferSchema=True)

In [5]:
# Inspect the column types produced by schema inference before building the export.
df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [50]:
# Peek at a few raw rows with full (untruncated) cell contents.
df.show(n=5, truncate=False)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|type    |amount  |nameOrig   |oldbalanceOrg|newbalanceOrig|nameDest   |oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|1   |PAYMENT |9839.64 |C1231006815|170136.0     |160296.36     |M1979787155|0.0           |0.0           |0      |0             |
|1   |PAYMENT |1864.28 |C1666544295|21249.0      |19384.72      |M2044282225|0.0           |0.0           |0      |0             |
|1   |TRANSFER|181.0   |C1305486145|181.0        |0.0           |C553264065 |0.0           |0.0           |1      |0             |
|1   |CASH_OUT|181.0   |C840083671 |181.0        |0.0           |C38997010  |21182.0       |0.0           |1      |0             |
|1   |PAYMENT |11668.14|C2048537720|41554.0      |29885.86      |M1230701703|0.0   

In [12]:
# Every account id that appears as the originator of at least one row.
orig_account_df = df.select(df['nameOrig']).dropDuplicates()

In [13]:
# Every account id that appears as the destination of at least one row.
dest_account_df = df.select(df['nameDest']).dropDuplicates()

In [18]:
# DataFrame.union matches columns by position, so the single resulting column
# keeps the first frame's name, 'nameOrig', although it also holds destination
# ids. Deduplicate again because an account can appear on both sides.
all_account_df = orig_account_df.union(dest_account_df).dropDuplicates()

In [29]:
# Account vertices for the Neptune load: one row per distinct account id, all
# labelled ACCOUNT.
# The id column's name is whatever the union above happened to keep ('nameOrig'),
# and withColumnRenamed is a silent no-op when the name is absent. Select the
# single column by position instead, so the export cannot quietly end up with a
# wrong header.
assert len(all_account_df.columns) == 1, all_account_df.columns
accounts = all_account_df.select(
    col(all_account_df.columns[0]).alias('~id'),
    lit('ACCOUNT').alias('~label'),
)

In [31]:
# Sanity-check the vertex headers and a few ids.
accounts.show(n=4)

+-----------+-------+
|        ~id| ~label|
+-----------+-------+
| C100021340|ACCOUNT|
|C1000262126|ACCOUNT|
|C1000507192|ACCOUNT|
|C1000547990|ACCOUNT|
+-----------+-------+
only showing top 4 rows



In [38]:
# Output directory for the account vertex CSV. This is an absolute path to the
# author's machine; adjust locally. Spark writes part files under this directory,
# and mode='overwrite' replaces any previous export.
ACCOUNTS_OUTPUT_DIR = '/Users/namngo/Desktop/aml-accounts'

accounts.write.csv(ACCOUNTS_OUTPUT_DIR, mode='overwrite', header=True)

In [59]:
# Transaction edges for the Neptune load: one edge per input row, from the
# originating account (~from) to the destination account (~to), labelled with
# the transaction type. Property headers carry a `name:Type` suffix.
# Output directory is an absolute path to the author's machine; adjust locally.
TRANSACTIONS_OUTPUT_DIR = '/Users/namngo/Desktop/aml-transactions'

# (source column in df, header in the export), in output column order.
# Each header is spelled once here instead of twice (rename + select).
# NOTE(review): the 'step' column is not exported — confirm this is intentional.
TRANSACTION_COLUMNS = [
    ('type', '~label'),
    ('nameOrig', '~from'),
    ('nameDest', '~to'),
    ('amount', 'amount:Double'),
    ('oldbalanceOrg', 'oldbalanceOrg:Double'),
    ('newbalanceOrig', 'newbalanceOrig:Double'),
    ('oldbalanceDest', 'oldbalanceDest:Double'),
    ('newbalanceDest', 'newbalanceDest:Double'),
    ('isFraud', 'isFraud:Int'),
    ('isFlaggedFraud', 'isFlaggedFraud:Int'),
]

# monotonically_increasing_id() gives unique but non-consecutive ids that encode
# the partition index. NOTE(review): the values can change if the input is
# partitioned differently, so a re-run may assign different ~id values.
transactions = df.select(
    [monotonically_increasing_id().alias('~id')]
    + [col(source).alias(target) for source, target in TRANSACTION_COLUMNS]
)

transactions.write.csv(TRANSACTIONS_OUTPUT_DIR, mode='overwrite', header=True)