# Credit Card Fraud Analysis

## Project purpose

This project analyses all the credit card transactions of the day in order to detect fraudulent ones, which may in turn prompt further fraud investigations. The transactions are generated and then injected into a pipeline that uses a Logistic Regression model to predict which of them are possibly fraudulent.

## What's the point of a credit card analyser? 

According to the most recent data available, the global financial impact of credit card fraud remains significant, amounting to tens of billions of dollars. Card fraud is also a prevalent concern in Europe: a report by the European Central Bank (ECB) found that losses due to card fraud in the Single Euro Payments Area (SEPA) reached €1.8 billion in 2018, underscoring the substantial financial impact of fraud on electronic payments. The issue extends beyond national borders and affects a considerable number of individuals: recent statistics suggest that one in ten Europeans may have fallen victim to credit card fraud, with a median loss of €399. These figures highlight the ongoing challenge posed by credit card fraud and the importance of robust measures to safeguard financial transactions in Europe.

## How to read prediction results?

The `prediction` column produced by the model contains one of two values:

- `0`: no fraud detected
- `1`: fraud detected

![1 or 0](pics/1_0.png)

## Data pipeline

![Data Pipeline](pics/pipeline.png)

## Transactions generator

All the code that generates the transactions is stored in the transactions generator directory. It is made of modules that generate customers and terminals, inject frauds, and finally produce the dataset. Frauds can be generated from a number of scenarios; for this project a few standard ones were chosen, for instance: when the transaction amount exceeds a limit of 120, the transaction is considered fraudulent. To emulate a more realistic situation, the features have been censored. Once the censored dataset has been generated, the dataframe is sent as JSON data through an HTTP POST request to Logstash, which listens on port 5044. The generator produces the transactions of the current day and sends them to Logstash every day at 23:59.
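A minimal sketch of two of the steps described above, assuming plain Python with only the standard library: labelling transactions with the amount-above-120 scenario, and building the HTTP POST that carries them to Logstash as JSON. The host (`localhost`), the helper names and the JSON-array body are assumptions for illustration, not the project's actual generator code; the censoring step is not shown.

```python
import json
import urllib.request

# Hypothetical endpoint: host assumed, port 5044 as described above
LOGSTASH_URL = "http://localhost:5044"
AMOUNT_LIMIT = 120  # threshold of the "amount above limit" fraud scenario


def label_frauds(transactions):
    """Return copies of the transactions with TX_FRAUD set by the amount rule."""
    labelled = []
    for tx in transactions:
        tx = dict(tx)  # do not mutate the caller's records
        tx["TX_FRAUD"] = 1 if tx["TX_AMOUNT"] > AMOUNT_LIMIT else 0
        labelled.append(tx)
    return labelled


def build_logstash_request(transactions, url=LOGSTASH_URL):
    """Build (but do not send) a POST request carrying the rows as a JSON array."""
    body = json.dumps(transactions).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


txs = [
    {"TRANSACTION_ID": 0, "TX_AMOUNT": 57.16},
    {"TRANSACTION_ID": 1, "TX_AMOUNT": 146.0},
]
req = build_logstash_request(label_frauds(txs))
print(req.get_method(), req.full_url)  # POST http://localhost:5044
```

Passing `req` to `urllib.request.urlopen` would actually send it; that call is left out so the sketch runs without a Logstash instance.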

## Logstash

Logstash is the first stage of the pipeline and serves as the interface towards Kafka and the Elastic suite. It ingests the data coming from the transaction generator: to accommodate this, `logstash.conf` defines an HTTP input listening on port 5044. Logstash then checks whether the received data is JSON, or CSV data that has been converted into JSON. Finally, it writes the received data to a CSV file, renaming the columns so that each feature gets its proper label, and this CSV file is then sent to Kafka.

To prevent Logstash from entering a loop in which it keeps re-reading and duplicating the data, a workaround based on a `.log` file was used: the CSV file is read by a `file` input in `read` mode, and once the file has been fully processed, Logstash appends its path to a `.log` file, signalling that the process has ended. This is the section just described:

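```
input {
  file {
    path => "/usr/share/logstash/transactions.csv"
    start_position => "beginning"
    # Do not persist read positions across restarts
    sincedb_path => "/dev/null"
    # Read the file as a whole instead of tailing it
    mode => "read"
    # When the file is done, append its path to the completion log
    file_completed_action => "log"
    file_completed_log_path => "/usr/share/logstash/completed.log"
  }
}
```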
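The JSON-to-CSV step with column renaming can be emulated in plain Python to show what it does to each record. This is a hypothetical illustration, not the project's Logstash filter: the incoming field names `id` and `amount`, the renaming map and the `json_to_csv` helper are made up for the example.

```python
import csv
import io
import json


def json_to_csv(payload, columns, renames=None):
    """Parse a JSON array of records and render it as CSV text.

    renames maps incoming field names to the labels expected downstream;
    fields missing from a record are written as empty values.
    """
    renames = renames or {}
    records = json.loads(payload)
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=columns, lineterminator="\n")
    writer.writeheader()
    for record in records:
        # Relabel each field before writing the row
        row = {renames.get(key, key): value for key, value in record.items()}
        writer.writerow(row)
    return out.getvalue()


csv_text = json_to_csv('[{"id": 7, "amount": 146.0}]',
                       columns=["TRANSACTION_ID", "TX_AMOUNT"],
                       renames={"id": "TRANSACTION_ID", "amount": "TX_AMOUNT"})
print(csv_text)
```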


## Kafka

Apache Kafka is an open-source distributed event streaming platform, designed for high-performance data pipelines, streaming analytics, data integration and mission-critical applications. In this project, the cluster hosts a topic named "transactions", which stores the data received from Logstash and delivers it to the consumers.
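Kafka stores each record value as an opaque array of bytes, which is why the Spark job later casts `value` to a string before parsing it with `from_json`. Assuming each record carries one transaction encoded as JSON (which is what the Spark schema implies), the round trip can be sketched with the standard library; the helper names are hypothetical and no Kafka client is involved.

```python
import json


def to_kafka_value(transaction):
    """Serialize a transaction dict to the bytes stored as a Kafka record value."""
    return json.dumps(transaction, separators=(",", ":")).encode("utf-8")


def from_kafka_value(value):
    """Decode a record value back into a dict (the role CAST + from_json play in Spark)."""
    return json.loads(value.decode("utf-8"))


record_value = to_kafka_value({"TRANSACTION_ID": 1, "TX_AMOUNT": 146.0})
print(record_value)                                 # b'{"TRANSACTION_ID":1,"TX_AMOUNT":146.0}'
print(from_kafka_value(record_value)["TX_AMOUNT"])  # 146.0
```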

## Spark

Apache Spark is an open-source unified analytics engine for large-scale data processing, suited to analytics, machine learning and big data workloads. Here it is used to train a Logistic Regression model whose `prediction` column tells whether a transaction is fraudulent or not:

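```python
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

# TX_FRAUD is the label: using it as an input feature as well would leak
# the answer into the model, so only TX_AMOUNT is used as a feature
input_features = ['TX_AMOUNT']

# Combine the input columns into the single vector column Spark ML expects
assembler = VectorAssembler(inputCols=input_features, outputCol='features')

# Binary classifier; adds 'rawPrediction', 'probability' and 'prediction' columns
regression = LogisticRegression(featuresCol='features', labelCol='TX_FRAUD')

pipeline = Pipeline(stages=[assembler, regression])
# train_set: labelled DataFrame with TX_AMOUNT and TX_FRAUD (defined elsewhere)
pipelineFit = pipeline.fit(train_set)
updated_train_set = pipelineFit.transform(train_set)
```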
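To make clear what the `LogisticRegression` stage learns, here is a small pure-Python sketch of the same technique: a single-feature logistic model on the transaction amount, trained by batch gradient descent on synthetic amounts labelled with the amount-above-120 rule. It is an illustration under these assumptions only; Spark's own implementation uses a different optimizer and supports regularization, and the helper names are hypothetical.

```python
import math


def sigmoid(z):
    """Numerically stable logistic function."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


def fit_logistic(amounts, labels, lr=0.5, epochs=2000):
    """Fit p(fraud | amount) = sigmoid(w * x + b) by batch gradient descent,
    where x is the standardized amount."""
    n = len(amounts)
    mean = sum(amounts) / n
    std = math.sqrt(sum((a - mean) ** 2 for a in amounts) / n)
    xs = [(a - mean) / std for a in amounts]
    w = b = 0.0
    for _ in range(epochs):
        grad_w = grad_b = 0.0
        for x, y in zip(xs, labels):
            err = sigmoid(w * x + b) - y  # gradient of the log-loss w.r.t. the logit
            grad_w += err * x
            grad_b += err
        w -= lr * grad_w / n
        b -= lr * grad_b / n
    return w, b, mean, std


def predict(model, amount, threshold=0.5):
    """Return 1.0 (fraud) or 0.0 (no fraud), like the 'prediction' column."""
    w, b, mean, std = model
    probability = sigmoid(w * (amount - mean) / std + b)
    return 1.0 if probability > threshold else 0.0


# Synthetic amounts labelled with the "amount above 120" scenario
amounts = list(range(10, 250, 10))
labels = [1 if a > 120 else 0 for a in amounts]
model = fit_logistic(amounts, labels)
print(predict(model, 50), predict(model, 200))  # 0.0 1.0
```

Standardizing the amount keeps gradient descent well conditioned; Spark's `LogisticRegression` likewise standardizes features by default (`standardization=True`) before fitting.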

## Spark streaming

Spark streaming processes the data received from Kafka and sends it to Elasticsearch. The first step is to read the data from Kafka by subscribing to the "transactions" topic:

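```python
# Requires the spark-sql-kafka-0-10 connector on the Spark classpath.
# kafkaServer ("host:port" of the broker) and topic ("transactions")
# are defined elsewhere in the script.
# The resulting DataFrame has binary key/value columns plus
# topic, partition, offset and timestamp metadata.
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafkaServer) \
    .option("subscribe", topic) \
    .load()
```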

The received records are parsed against a schema, flattened into columns, and handed to the `process` function one micro-batch at a time:

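```python
from pyspark.sql.functions import from_json

# Kafka values are bytes: cast them to string, parse the JSON with the
# dataKafka schema (a StructType defined elsewhere), then flatten the struct.
# `process` must already be defined when this statement runs.
df.selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", dataKafka).alias("data")) \
    .select("data.*") \
    .writeStream \
    .foreachBatch(process) \
    .start() \
    .awaitTermination()
```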

The `process` function applies the fitted model to each micro-batch, converts `TX_DATETIME` from a Unix timestamp in milliseconds to a standard datetime, and sends the data to Elasticsearch:

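```python
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, from_unixtime


def process(batch_df: DataFrame, batch_id: int):
    # Skip empty micro-batches
    if not batch_df.rdd.isEmpty():
        print("----------------- \n Processing\n")
        batch_df.show()

        # TX_DATETIME arrives in milliseconds; from_unixtime expects seconds
        batch_df = batch_df.withColumn('TX_DATETIME',
                                       from_unixtime(col('TX_DATETIME') / 1000))

        # Apply the fitted pipeline (VectorAssembler + LogisticRegression)
        data2 = pipelineFit.transform(batch_df)
        # summary() only builds a DataFrame; show() is needed to print it
        data2.summary().show()
        data2.show()

        print("--------------------- \nSending data to ES \n")
        # elastic_host and elastic_index are defined elsewhere in the script.
        # Using TRANSACTION_ID as the document id makes re-sent rows replace
        # the existing document instead of creating duplicates.
        data2.select("TRANSACTION_ID", "TX_DATETIME", "CUSTOMER_ID",
                     "TERMINAL_ID", "TX_AMOUNT",
                     "TX_TIME_SECONDS", "TX_DURING_NIGHT",
                     "@timestamp", "prediction") \
            .write \
            .format("org.elasticsearch.spark.sql") \
            .mode('append') \
            .option("es.mapping.id", "TRANSACTION_ID") \
            .option("es.nodes", elastic_host) \
            .save(elastic_index)
```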
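`TX_DATETIME` is divided by 1000 in `process` because `from_unixtime` expects seconds, which indicates that the field arrives in milliseconds. A plain-Python equivalent of that conversion is sketched below, fixed to UTC for reproducibility; Spark's `from_unixtime` instead formats the result in the session time zone, with the default pattern `yyyy-MM-dd HH:mm:ss`. The helper name is hypothetical.

```python
from datetime import datetime, timezone


def ms_to_datetime_string(ms):
    """Format a Unix timestamp in milliseconds as 'yyyy-MM-dd HH:mm:ss' in UTC."""
    seconds = ms / 1000  # same scaling as col('TX_DATETIME') / 1000
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


print(ms_to_datetime_string(1700000000000))  # 2023-11-14 22:13:20
```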

## Elasticsearch and Kibana

Elasticsearch is a distributed, open-source search and analytics engine that can store and query many types of data. The Spark streaming job writes each processed batch to the Elasticsearch index configured in the script, using `TRANSACTION_ID` as the document ID. Kibana is then used to organize and visualize the indexed data:

![Graphs](pics/graphs.png)
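Writing with `es.mapping.id` set to `TRANSACTION_ID` roughly corresponds, at the REST level, to indexing each document under an explicit `_id`. The sketch below builds the equivalent body for Elasticsearch's `_bulk` API, for instance to index a few test documents by hand and check the Kibana visualizations; the index name `transactions` is a hypothetical placeholder for `elastic_index`, and the helper name is made up. The body would be sent with `POST /_bulk` and the header `Content-Type: application/x-ndjson`.

```python
import json


def build_bulk_body(index, docs, id_field="TRANSACTION_ID"):
    """Build an Elasticsearch _bulk request body (newline-delimited JSON).

    Each document is preceded by an "index" action whose _id comes from
    id_field, so re-sending the same transaction replaces the stored
    document instead of duplicating it.
    """
    lines = []
    for doc in docs:
        lines.append(json.dumps({"index": {"_index": index, "_id": str(doc[id_field])}}))
        lines.append(json.dumps(doc))
    return "\n".join(lines) + "\n"  # the bulk API requires a trailing newline


body = build_bulk_body("transactions",
                       [{"TRANSACTION_ID": 1, "TX_AMOUNT": 146.0, "prediction": 1.0}])
print(body)
```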

![Meme](pics/meme1.png)