In [None]:
#!/usr/bin/env python

import os
import sys
import subprocess
from datetime import datetime, timedelta

try:
    from pyspark import SparkContext, SQLContext, StorageLevel
    from pyspark.sql import SparkSession
except ImportError:
    import findspark
    findspark.init()
    from pyspark import SparkContext, SQLContext, StorageLevel
    from pyspark.sql import SparkSession

from pyspark.sql.functions import *
from pyspark.sql.types import StringType

from log_template import *


PIPO YAML version : 5.1.1
PIPO main config file: /shared/.pipo/main_conf.yml
PIPO version: 3.0.1


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    split, concat_ws, to_timestamp, col, broadcast,
    round as spark_round, lit, udf
)
from pyspark.sql.types import (
    IntegerType, TimestampType, DoubleType
)
import math


In [None]:
# Spark session for this notebook, running on YARN.
# Note: CSV options such as header/inferSchema are per-read options passed to
# spark.read.csv(...), not session configurations, so none are set here
# ("spark.sql.csv.*" keys of that kind are not recognised and would be ignored).
spark = (
    SparkSession.builder
    .appName("Fraud_detect")
    .master("yarn")
    .getOrCreate()
)


In [None]:
# Load the raw inputs; the first line of each file is a header row. No schema
# is supplied or inferred, so every column arrives as a string.
# NOTE(review): inputs are read from /user/markous/ while the outputs later in
# this notebook are written to /user/markousm/ — confirm which is intended.
customers = spark.read.option("header", True).csv("/user/markous/customer.csv")
transactions = spark.read.option("header", True).csv("/user/markous/transactions.csv")


In [None]:
# Preview five raw customer rows, then the column types Spark assigned.
customers.show(n=5)
customers.printSchema()


+----------------+-------+--------+------+--------------------+--------------+-----+-----+-------+---------+--------------------+--------------------+----+----+
|          cc_num|  first|    last|gender|              street|          city|state|  zip|    lat|     long|                 job|                 dob|_c12|_c13|
+----------------+-------+--------+------+--------------------+--------------+-----+-----+-------+---------+--------------------+--------------------+----+----+
|3526015186182660|   Carl|   Gomez|     M|204 Cohen Meadow ...|Hathaway Pines|   CA|95233|38.1919|-120.3644|Data processing m...|1958-10-12T00:00:...|null|null|
|4170242670039980|Rebecca|Trujillo|     F|       242 Cody Pass|      Colstrip|   MT|59323|45.9344|-106.6368|          Air broker|1983-08-09T00:00:...|null|null|
|   4006862159277| Cheryl|    Rice|     F|0771 Solis Road A...|      Brooklyn|   NY|11228|40.6174| -74.0121|     Tourism officer|1957-07-24T00:00:...|null|null|
|3593533875650650|  Cindy|     Ray

In [None]:
# Preview five raw transaction rows, then the column types Spark assigned.
transactions.show(n=5)
transactions.printSchema()

+----------------+-------+-------+--------------------+--------------------+----------+----------+-------------+--------------------+---+---------+-----------+--------+
|          cc_num|  first|   last|           trans_num|          trans_date|trans_time| unix_time|     category|            merchant|amt|merch_lat| merch_long|is_fraud|
+----------------+-------+-------+--------------------+--------------------+----------+----------+-------------+--------------------+---+---------+-----------+--------+
| 180094108369013|   John|Holland|80f5177be11f0bcd7...|2012-01-01T00:00:...|   0:12:15|1325376735|personal_care|         Hills-Boyer| 64|39.011566|-119.937831|       0|
|4368593032190500|  Carla|Fleming|7933d389bf8ef8a11...|2012-01-01T00:00:...|   0:16:58|1325377018|gas_transport|      Friesen-DAmore|133|40.149071| -75.589697|       0|
|   4361355512072|Matthew| Nelson|1467c318b5d73d22d...|2012-01-01T00:00:...|   0:36:42|1325378202|entertainment|         Larson-Moen|119|47.297797| -96.819

# Compute Age

In [None]:

# Age in completed years as of today's date.
# months_between(...)/12, floored, ticks over exactly on the birthday. The
# previous days/365 formula drifted ahead by one day per leap day lived, so it
# reported the new age early (about two weeks early for a 60-year-old).
# NOTE(review): age is measured against the current date, not the transaction
# date — confirm which reference date the fraud analysis expects.
customerAgeDF = customers.withColumn(
    "age",
    F.floor(
        F.months_between(F.current_date(), F.to_date(customers["dob"])) / 12
    ).cast(IntegerType()),
)

In [41]:
# Spot-check the derived `age` column on the first two rows.
customerAgeDF.show(n=2)

+----------------+-------+--------+------+--------------------+--------------+-----+-----+-------+---------+--------------------+--------------------+----+----+---+
|          cc_num|  first|    last|gender|              street|          city|state|  zip|    lat|     long|                 job|                 dob|_c12|_c13|age|
+----------------+-------+--------+------+--------------------+--------------+-----+-----+-------+---------+--------------------+--------------------+----+----+---+
|3526015186182660|   Carl|   Gomez|     M|204 Cohen Meadow ...|Hathaway Pines|   CA|95233|38.1919|-120.3644|Data processing m...|1958-10-12T00:00:...|null|null| 66|
|4170242670039980|Rebecca|Trujillo|     F|       242 Cody Pass|      Colstrip|   MT|59323|45.9344|-106.6368|          Air broker|1983-08-09T00:00:...|null|null| 41|
+----------------+-------+--------+------+--------------------+--------------+-----+-----+-------+---------+--------------------+--------------------+----+----+---+
only showi

# Compute Distance (haversine UDF) and Parse Transaction Timestamps

In [None]:
def get_distance(lat1, lon1, lat2, lon2, radius_km=6371.0):
    """Great-circle (haversine) distance between two points.

    This function is wrapped as a Spark UDF below, so it must accept the raw
    values Spark passes in. The CSV columns are read as strings, and nulls
    arrive as ``None``.

    Args:
        lat1, lon1: latitude/longitude of the first point, in degrees.
        lat2, lon2: latitude/longitude of the second point, in degrees.
            Any value accepted by ``float()`` (e.g. ``"38.1919"``) is allowed.
        radius_km: sphere radius. The default 6371.0 is the mean Earth radius
            in kilometres, so the result is in kilometres.

    Returns:
        The distance as a float in the unit of ``radius_km``, or ``None`` if
        any coordinate is ``None``.
    """
    coords = (lat1, lon1, lat2, lon2)
    if None in coords:
        # Propagate SQL NULL instead of failing the whole Spark task.
        return None
    phi1, lam1, phi2, lam2 = (math.radians(float(v)) for v in coords)
    dlat = phi2 - phi1
    dlon = lam2 - lam1
    a = math.sin(dlat / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlon / 2) ** 2
    # Floating-point rounding can push `a` marginally outside [0, 1] for
    # (near-)antipodal points, which would make sqrt/asin raise ValueError.
    # Clamp with plain comparisons: `from pyspark.sql.functions import *`
    # in this notebook can shadow the builtin min/max.
    if a > 1.0:
        a = 1.0
    elif a < 0.0:
        a = 0.0
    return radius_km * 2 * math.asin(math.sqrt(a))

# Expose the haversine helper as a Spark UDF returning DoubleType.
# TODO(review): distance_udf is not applied anywhere yet. The transactions
# only carry merchant coordinates, so a customer-to-merchant distance needs
# the customer lat/long joined in (presumably on cc_num) first.
distance_udf = udf(get_distance, DoubleType())

# Normalise the transaction timestamp columns:
#   trans_date "2012-01-01T00:00:..." -> "2012-01-01"
#   trans_time "0:12:15"              -> timestamp "2012-01-01 00:12:15"
# Hours are not zero-padded in the source data (e.g. "0:12:15"), so the hour
# is parsed with "H" (one or more digits). "HH" expects two digits under
# Spark 3's datetime parser.
transactionDF = (
    transactions
    .withColumn("trans_date", split(col("trans_date"), "T").getItem(0))
    .withColumn("trans_time", concat_ws(" ", col("trans_date"), col("trans_time")))
    .withColumn(
        "trans_time",
        to_timestamp(col("trans_time"), "yyyy-MM-dd H:mm:ss").cast(TimestampType()),
    )
)


In [62]:
# Persist the customers together with the derived `age` column, replacing any
# previous output. No header row is written (header is not set), so readers
# must know the column order.
# NOTE(review): this writes under /user/markousm/ while the inputs were read
# from /user/markous/ — confirm the directory; consider header=True.
customerAgeDF.write.csv(
    "/user/markousm/customerAgeDF.csv",
    mode="overwrite",
    sep=",",
)


In [63]:
# Persist the transactions with normalised date/time columns, replacing any
# previous output. No header row is written (header is not set), so readers
# must know the column order.
# NOTE(review): this writes under /user/markousm/ while the inputs were read
# from /user/markous/ — confirm the directory; consider header=True.
transactionDF.write.csv(
    "/user/markousm/transactionDF.csv",
    mode="overwrite",
    sep=",",
)


In [61]:
# Check the derived schema: `age` is an integer; the CSV columns are all strings.
customerAgeDF.printSchema()

root
 |-- cc_num: string (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- age: integer (nullable = true)



In [60]:
# Check the derived schema: `trans_time` is now a timestamp; the other columns are strings.
transactionDF.printSchema()

root
 |-- cc_num: string (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- trans_date: string (nullable = true)
 |-- trans_time: timestamp (nullable = true)
 |-- unix_time: string (nullable = true)
 |-- category: string (nullable = true)
 |-- merchant: string (nullable = true)
 |-- amt: string (nullable = true)
 |-- merch_lat: string (nullable = true)
 |-- merch_long: string (nullable = true)
 |-- is_fraud: string (nullable = true)



In [64]:
# Stop the Spark session (releasing its YARN application) once the outputs are written.
spark.stop()