In [1]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel

# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext(appName="LogParser-py-Streaming Mod12")
ssc = StreamingContext(sc, 120)

In [2]:
from pyspark.ml import PipelineModel

In [3]:
from pyspark.sql import SQLContext
from pyspark.sql import Row
import sys
from datetime import datetime
import pyspark.sql.functions as F

In [4]:
now = datetime.now()
print(now)

2019-07-30 17:01:01.221873


In [5]:
filepath = '/user/edureka_524533/Flume_Fraud/' + now.strftime('%Y-%m-%d') 
print(filepath)

/user/edureka_524533/Flume_Fraud/2019-07-30


In [6]:
def getSparkSessionInstance(sparkConf):
        if ("sparkSessionSingletonInstance" not in globals()):
            globals()["sparkSessionSingletonInstance"] = SparkSession \
                .builder \
                .config(conf=sparkConf) \
                .getOrCreate()
        return globals()["sparkSessionSingletonInstance"]

In [7]:
#validList=['CASH_OUT','TRANSFER']

In [8]:
lines = ssc.textFileStream(filepath)

In [9]:
def process(t, rdd):
    if rdd.isEmpty():
            print("==== EMPTY ====")
            return
    print("=== RDD Found ===")
    rowRdd = rdd.map(lambda x: Row(line=x))
    spark = getSparkSessionInstance(rdd.context.getConf())
    df = spark.createDataFrame(rowRdd)
    df.printSchema()
    split_col = F.split(df['line'], ',')
    df = df.withColumn('step', split_col.getItem(0))
    df = df.withColumn('type', split_col.getItem(1))
    df = df.withColumn('amount', split_col.getItem(2))
    df = df.withColumn('nameOrig', split_col.getItem(3))
    df = df.withColumn('oldbalanceOrg', split_col.getItem(4))
    df = df.withColumn('newbalanceOrig', split_col.getItem(5))
    df = df.withColumn('nameDest', split_col.getItem(6))
    df = df.withColumn('oldbalanceDest', split_col.getItem(7))
    df = df.withColumn('newbalanceDest', split_col.getItem(8))
    df = df.withColumn('isFraud', split_col.getItem(9))
    df = df.withColumn('isFlaggedFraud', split_col.getItem(10))
    #Set the datatypes
    df = df.withColumn('step',df.type.cast("int"))
    df = df.withColumn('type', df.type.cast("string"))
    df = df.withColumn('amount', df.amount.cast("double"))
    df = df.withColumn('nameOrig', df.nameOrig.cast("string"))
    df = df.withColumn('oldbalanceOrg', df.oldbalanceOrg.cast("double"))
    df = df.withColumn('newbalanceOrig', df.newbalanceOrig.cast("double"))
    df = df.withColumn('nameDest', df.nameDest.cast("string"))
    df = df.withColumn('oldbalanceDest', df.oldbalanceDest.cast("double"))
    df = df.withColumn('newbalanceDest', df.newbalanceDest.cast("double"))
    df = df.withColumn('isFraud', df.isFraud.cast("int"))
    df = df.withColumn('isFlaggedFraud', df.isFlaggedFraud.cast("int"))
    #delete the unwanted column
    df.drop('line')
    
    df.printSchema()

    col1=['newbalanceOrig','oldbalanceOrg','amount']
    col2=['newbalanceDest','oldbalanceDest','amount']
    df = df.withColumn('adjustedBalanceOrg',sum(df[cols] for cols in col1))
    df = df.withColumn('adjustedBalanceDest',sum(df[cols] for cols in col2))
    pipeline = PipelineModel.load("use_cases/fraud_model1.1")
    predictions = pipeline.transform(df)
    predictions.select('isFraud','prediction').show()
    

In [10]:
lines.pprint()
lines.foreachRDD(process)

In [11]:
ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

-------------------------------------------
Time: 2019-07-30 17:02:00
-------------------------------------------
1,CASH_OUT,181.0,C840083671,181.0,0.0,C38997010,21182.0,0.0,1,0
1,TRANSFER,215310.3,C1670993182,705.0,0.0,C1100439041,22425.0,0.0,0,0

=== RDD Found ===
root
 |-- line: string (nullable = true)

root
 |-- line: string (nullable = true)
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)

+-------+----------+
|isFraud|prediction|
+-------+----------+
|      1|       0.0|
|      0|       1.0|
+-------+----------+

-------------------------------------------
Time: 2019-07-30 

KeyboardInterrupt: 

-------------------------------------------
Time: 2019-07-30 17:06:00
-------------------------------------------

==== EMPTY ====
