# Importing spark

In [1]:
# import findspark
# findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder.appName("Python Spark").getOrCreate()
sc = spark.sparkContext

sc.setSystemProperty('spark.executor.memory', '8g')
sc.setSystemProperty('spark.driver.memory', '45G')


# Preparing the data

In [2]:
df_transactions = spark.read.option("header", True)\
    .option("delimiter", "|")\
    .option("delimiter", ",")\
    .option("inferSchema", "true")\
    .csv('data_stream/train.csv')\
    .withColumnRenamed('default_payment_next_month', 'label')

In [3]:
len(df_transactions.columns)

25

In [4]:
from pyspark.ml.feature import SQLTransformer
sqlTrans = SQLTransformer(
    statement="SELECT label, MARRIAGE, EDUCATION, PAY_0, PAY_2, PAY_3 FROM __THIS__")

In [5]:
train, test = df_transactions.randomSplit([0.8,0.2])

In [6]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=["MARRIAGE", "EDUCATION","PAY_0", "PAY_2", "PAY_3"],
    outputCol="features")


In [7]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression()

In [8]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[sqlTrans, assembler, lr])

In [9]:
lrModel = pipeline.fit(train)

In [10]:
predictions = lrModel.transform(test)

In [11]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator


evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")

evaluator.evaluate(predictions)


0.6930594638042049

In [12]:
from pyspark.streaming import StreamingContext

In [13]:
stream = StreamingContext(sc, 30)

In [14]:
def process(time, rdd_stream):
    print("========== %s ========" % str(time))
    try:
        rdd_transac = rdd_stream.map(lambda x: x.split(","))
        df_transac = spark.createDataFrame(rdd_transac)
        
        # Cette boucle for est utilisé pour caster les entiers sous forme de '123' en 123
        for c, i in zip(df_transac.columns, df_transactions.schema):
            df_transac = df_transac.withColumn(i.name, df_transac[c].cast(i.dataType))
            
        if df_transac.count() > 0:
            predictions = lrModel.transform(df_transac)
            print("AUC: %s" % evaluator.evaluate(predictions))
            
    except Exception as e:
        print(e)

In [None]:
stream_transac = stream.textFileStream("./data_stream/output/")

stream_transac.foreachRDD(process)
stream.start()
stream.awaitTermination()

RDD is empty
AUC: 0.7178187034194977
AUC: 0.7368656823394498
AUC: 0.6645741924339265
