***According to the Secret Service, ATM skimming is responsible for about $350,000 in monetary losses each day in the United States and is considered the number one ATM-related crime. The trade group Global ATM Security Alliance estimates that skimming costs the U.S. banking industry about $60 million a year.***

The easiest way for companies to identify ATM fraud is to recognize a break in spending patterns. For example, if you live in Wichita, KS and your card is suddenly used to buy something in Bend, OR, that may tip the scales toward possible fraud, and your credit card company might decline the charges and ask you to verify them.
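
The location check described above can be sketched in a few lines of plain Python. This is only an illustration of the idea, not how any card issuer actually scores transactions; the `Transaction` class, the `is_location_anomaly` function, and the sample values are hypothetical names and data introduced here.

```python
from dataclasses import dataclass


@dataclass
class Transaction:
    card_number: str
    state: str  # two-letter state where the card was used
    amount: float


def is_location_anomaly(txn: Transaction, home_state: str) -> bool:
    """Flag a transaction that occurs outside the cardholder's home state."""
    return txn.state.upper() != home_state.upper()


# A Kansas cardholder whose card is suddenly used in Oregon (made-up values).
local = Transaction("4111111111111111", "KS", 40.0)
remote = Transaction("4111111111111111", "OR", 400.0)

print(is_location_anomaly(local, "KS"))
print(is_location_anomaly(remote, "KS"))
```

A single rule like this would flag every traveling customer, which is one reason to combine several features in a trained model rather than rely on location alone.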

![Databricks for Credit Card Fraud](https://thephp.cc/images/news/credit_card_with_padlock.jpg "Databricks for Credit Card Fraud")  
* [**ATM Fraud Analytics**](https://www.csoonline.com/article/2124891/fraud-prevention/atm-skimming--how-to-recognize-card-fraud.html) is the use of data analytics and machine learning to detect ATM fraud and is...  
  * Built on top of the Databricks Platform
  * Uses a machine learning implementation to detect ATM fraud   
* This demo...  
  * demonstrates an ATM fraud detection workflow. The dataset we use is internally mocked-up data.

# Pipeline from Event Hubs / Kafka / Azure Storage 

Apache Spark 2.0 introduced Structured Streaming, a higher-level stream processing API. This notebook takes a quick look at how to use the DataFrame API to build Structured Streaming applications. At large organizations it is typical for different teams to read and write streaming data. In this example, we load JSON data that has been partitioned by timestamp into a streaming DataFrame. We then compute real-time metrics such as running counts and windowed counts on a stream of timestamped actions.
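
To make "running counts" and "windowed counts" concrete, here is a plain-Python sketch of what those two metrics compute over timestamped actions. It does not use Spark; the sample events, the action names, and the 10-minute window size are made up for illustration. In Structured Streaming the same grouping would typically be expressed with `groupBy` and the `window` function.

```python
from collections import Counter
from datetime import datetime, timedelta

# Hypothetical timestamped actions (illustrative values only).
events = [
    (datetime(2016, 7, 1, 9, 1), "withdrawal"),
    (datetime(2016, 7, 1, 9, 7), "deposit"),
    (datetime(2016, 7, 1, 9, 12), "withdrawal"),
    (datetime(2016, 7, 1, 9, 25), "withdrawal"),
]

WINDOW = timedelta(minutes=10)
EPOCH = datetime(1970, 1, 1)


def window_start(ts: datetime, size: timedelta) -> datetime:
    """Return the start of the tumbling window that contains ts."""
    return EPOCH + ((ts - EPOCH) // size) * size


running_counts = Counter()   # total per action since the stream started
windowed_counts = Counter()  # count per (window start, action)

for ts, action in events:
    running_counts[action] += 1
    windowed_counts[(window_start(ts, WINDOW), action)] += 1

print(dict(running_counts))
for (start, action), n in sorted(windowed_counts.items()):
    print(start, action, n)
```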

![stream](https://s3-us-west-2.amazonaws.com/db-jodwyer/images/atmFraudAws.png)

### Step 1: Explore Data

In [4]:
%sql use fraud_demo;
cache table atm_visits;

In [5]:
%sql select * from atm_visits limit 100

In [6]:
%sql select count(*) from atm_visits 

In [7]:
%sql select sum(amount), month, fraud_report from atm_visits where year = $year group by month, fraud_report order by month, fraud_report

In [8]:
%sql select sum(amount), month, fraud_report from atm_visits where year = $year group by month, fraud_report order by month, fraud_report

### Steps 2 and 3: Enrich Data, Mask Card Numbers and Visualize

In [10]:
%sql create or replace view atm_dataset as 
  select concat(substr(card_number,0,4), '********',substr(card_number,-4)) masked_card_number, c.checking_savings, c.first_name, c.last_name, c.customer_since_date, c.customer_id, v.*, l.* 
    from atm_customers c 
      inner join  
        atm_visits v using (customer_id) 
      inner join 
        atm_locations l using (atm_id)
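
For readers less familiar with SQL string functions, the masking expression in the view can be mirrored in plain Python. This sketch assumes the card number is available as a string of at least eight characters; `mask_card_number` is a hypothetical helper introduced here, not part of the notebook.

```python
def mask_card_number(card_number: str) -> str:
    """Keep the first and last four characters and replace the middle with a fixed mask."""
    if len(card_number) < 8:
        raise ValueError("card number too short to mask safely")
    # Mirrors concat(substr(card_number, 0, 4), '********', substr(card_number, -4)).
    return card_number[:4] + "********" + card_number[-4:]


print(mask_card_number("4111111111111111"))  # 4111********1111
```

As in the SQL, the mask is always eight asterisks, so the masked value does not reveal the length of the original number.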

In [11]:
%sql select * from atm_dataset

In [12]:
%sql select count(1), city_state_zip.state state from atm_dataset where year = 2016 and fraud_report = 'Y' group by city_state_zip.state

### Steps 4 and 5: Create & Use Spark ML Model

### Workflows with PySpark ML Pipelines
<img src="https://s3-us-west-2.amazonaws.com/pub-tc/ML-workflow.png" width="800">

## Model Export / Import

Databricks ML Model Export allows you to export models and full ML pipelines from Apache Spark. These exported models and pipelines can be imported into other platforms, both Spark and non-Spark, for scoring and prediction. Model Export is targeted at low-latency, lightweight ML-powered applications.

Choose the best model and export it:
```python
model.bestModel.write().overwrite().save("/models/atm_fraud")
```

Documentation can be found [here](https://docs.azuredatabricks.net/spark/latest/mllib/index.html)

In [16]:
%scala

import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
val model = PipelineModel.load("/mnt/jodwyer/atm/model/")

In [17]:
%scala
val dtModel = model.stages.last.asInstanceOf[DecisionTreeClassificationModel]
display(dtModel)

### Step 6: Stream incoming data and score fraud in near real-time

In [19]:
%run ./helpers

In [20]:
from pyspark.sql.types import DateType, LongType, StringType, StructField, StructType

# Schema of the incoming JSON records; every field is nullable.
json_schema = StructType([
    StructField("atm_id", LongType(), True),
    StructField("customer_id", LongType(), True),
    StructField("visit_id", LongType(), True),
    StructField("withdrawl_or_deposit", StringType(), True),
    StructField("amount", LongType(), True),
    StructField("fraud_report", StringType(), True),
    StructField("day", LongType(), True),
    StructField("month", LongType(), True),
    StructField("year", LongType(), True),
    StructField("hour", LongType(), True),
    StructField("min", LongType(), True),
    StructField("sec", LongType(), True),
    StructField("card_number", LongType(), True),
    StructField("checking_savings", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("customer_since_date", DateType(), True),
    StructField("city_state_zip", StructType([
        StructField("city", StringType(), True),
        StructField("state", StringType(), True),
        StructField("zip", StringType(), True),
    ]), True),
    StructField("pos_capability", StringType(), True),
    StructField("offsite_or_onsite", StringType(), True),
    StructField("bank", StringType(), True),
])

In [21]:
%fs ls /mnt/jodwyer/atm/json

In [22]:
from pyspark.ml import PipelineModel

# Read the JSON files as a stream, one file per trigger.
streamingInputDF = (
    spark.readStream
    .schema(json_schema)
    .option("maxFilesPerTrigger", 1)
    .json("/mnt/jodwyer/atm/json")
)

# Build a single timestamp column and keep the columns used for scoring.
streamingFraudDataset = streamingInputDF.selectExpr(
    "to_sql_timestamp(year, month, day, hour, min, sec) as timestamp",
    "withdrawl_or_deposit", "amount", "day", "month", "year", "hour", "min",
    "checking_savings", "pos_capability", "offsite_or_onsite", "bank", "fraud_report",
)
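
The `to_sql_timestamp` function used above is not defined in this notebook; it presumably comes from `./helpers`. As an assumption about what it does, the sketch below assembles the six integer columns into a single timestamp in plain Python. The name `to_timestamp_from_parts` is hypothetical, and the real helper may differ, for example in how it handles time zones or invalid dates.

```python
from datetime import datetime
from typing import Optional


def to_timestamp_from_parts(
    year: Optional[int],
    month: Optional[int],
    day: Optional[int],
    hour: Optional[int],
    minute: Optional[int],
    sec: Optional[int],
) -> Optional[datetime]:
    """Combine separate date and time parts into one timestamp.

    Returns None when a part is missing or the parts do not form a valid
    date (assumed behavior; the real helper may raise instead).
    """
    try:
        return datetime(year, month, day, hour, minute, sec)
    except (TypeError, ValueError):
        return None


print(to_timestamp_from_parts(2016, 7, 1, 9, 5, 30))
```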

In [23]:
display(streamingFraudDataset)

In [24]:
model = PipelineModel.load("/mnt/jodwyer/atm/model/")

In [25]:
model.transform(streamingFraudDataset).createOrReplaceTempView("predictions")

In [26]:
%sql select timestamp, amount, bank, withdrawl_or_deposit, prediction, getFraudProbability(probability) fraud_probability from predictions 
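
`getFraudProbability` is not defined in this notebook and is presumably registered by `./helpers`. Spark ML classifiers emit a `probability` vector with one entry per class, so a plausible reading is that the helper returns the entry for the fraud class. The sketch below illustrates that idea in plain Python; `fraud_probability` is a hypothetical name, and it assumes fraud is class index 1, which depends on how the label was indexed in the pipeline.

```python
from typing import Sequence


def fraud_probability(probability: Sequence[float], fraud_index: int = 1) -> float:
    """Return the probability assigned to the fraud class.

    Assumes the vector holds one probability per class and that fraud is
    at fraud_index (an assumption, not confirmed by the notebook).
    """
    return float(probability[fraud_index])


# A made-up probability vector: 92% not fraud, 8% fraud.
print(fraud_probability([0.92, 0.08]))  # 0.08
```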

In [27]:
%sql select sum(amount), withdrawl_or_deposit, prediction from predictions group by withdrawl_or_deposit, prediction