##Building a PYSPARK connection

In [None]:
# Colab environment bootstrap: Java 8, a Spark 3.0.0 binary distribution, and findspark.

# install java (headless OpenJDK 8; -qq and the redirect keep apt output quiet)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# download spark (if the version number is changed, change it in the tar command
# and in SPARK_HOME below as well)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unpack the spark archive into the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# point JAVA_HOME and SPARK_HOME at the installations above
# NOTE(review): the SPARK_HOME path assumes the archive was unpacked in /content — confirm
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark

In [None]:
# Install the PySpark Python package from PyPI.
# NOTE(review): no version is pinned here, while SPARK_HOME points at a Spark 3.0.0
# distribution; the recorded output shows pyspark 3.3.2 being resolved — confirm which
# version is intended and pin it if the two must match.
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m14.1 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=af091e07820df96d578cbfab36b72237f84a7c54a27b287efe58630ed72f90dc
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [None]:
# Make the Spark installation importable from this Python kernel
# (presumably located through the SPARK_HOME variable set earlier — confirm).
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) a local-mode session named "Colab" with the Spark UI on port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [None]:
# Tiny three-row frame used as a smoke test for the session.
sample_rows = [(1, "John"), (2, "Jane"), (3, "Smith")]
df = spark.createDataFrame(sample_rows, ["id", "name"])
df.show()

+---+-----+
| id| name|
+---+-----+
|  1| John|
|  2| Jane|
|  3|Smith|
+---+-----+



In [None]:
# View the DataFrame as an RDD of Row objects and bring every row back to the driver.
rdd = df.rdd
rdd.collect()

[Row(id=1, name='John'), Row(id=2, name='Jane'), Row(id=3, name='Smith')]

In [None]:
# Number of rows in the RDD.
rdd.count()

3

##Creating a SparkSession

In [None]:
# Minimal RDD round trip: parallelize a list, square each element, collect, print.
spark = SparkSession.builder.appName("RDD Example").getOrCreate()

# source values and the RDD built from them
data = [1, 2, 3, 4, 5]
rdd = spark.sparkContext.parallelize(data)

# transformation (lazy): square every element
squared_rdd = rdd.map(lambda value: value ** 2)

# action: materialise the squares on the driver and show them
result = squared_rdd.collect()
print(result)

[1, 4, 9, 16, 25]


##Mounting Google Drive in a Colab notebook

In [None]:
# from google.colab import files
# uploaded = files.upload()

In [None]:
# Two small pair RDDs, collected to the driver and printed side by side.
rdd1 = spark.sparkContext.parallelize([(1, 2), (2, 1), (2, 2)])
rdd2 = spark.sparkContext.parallelize([(2, 5), (3, 1)])
a, b = rdd1.collect(), rdd2.collect()
print(a, b)

[(1, 2), (2, 1), (2, 2)] [(2, 5), (3, 1)]


##Creating a SparkContext object (sc)

In [None]:
# from pyspark import SparkContext, SparkConf
# conf = SparkConf().setAppName("Fraudulent Transactions Analysis")

# Create a SparkContext object
# sc = SparkContext(conf=conf)

In [None]:
# Obtain a SparkContext handle named `sc` for the RDD-based solutions.
# NOTE(review): getOrCreate presumably hands back the context already started by the
# SparkSession created earlier, in which case this app name would not take effect — confirm.
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("Fraudulent Transactions Analysis")
sc = SparkContext.getOrCreate(conf=conf)

##Loading a dataset into RDD

In [None]:
# Mount Google Drive at /content/drive so the dataset under MyDrive can be read by path.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Read the CSV as an RDD of raw text lines (the header line is still included).
dataset = sc.textFile("/content/drive/MyDrive/fraud_data.csv")

In [None]:
#eliminate=dataset.filter(lambda line: line != header )

In [None]:
# Split every line on commas into a list of string fields (no type conversion yet).
transactions = dataset.map(lambda line: line.split(","))

In [None]:
# Total number of lines, header row included.
transactions.count()

6362621

In [None]:
# Peek at the first five split rows; the first one is the header.
transactions.take(5)

[['step',
  'type',
  'amount',
  'nameOrig',
  'oldbalanceOrg',
  'newbalanceOrig',
  'nameDest',
  'oldbalanceDest',
  'newbalanceDest',
  'isFraud',
  'isFlaggedFraud'],
 ['1',
  'PAYMENT',
  '9839.64',
  'C1231006815',
  '170136.0',
  '160296.36',
  'M1979787155',
  '0.0',
  '0.0',
  '0',
  '0'],
 ['1',
  'PAYMENT',
  '1864.28',
  'C1666544295',
  '21249.0',
  '19384.72',
  'M2044282225',
  '0.0',
  '0.0',
  '0',
  '0'],
 ['1',
  'TRANSFER',
  '181.0',
  'C1305486145',
  '181.0',
  '0.0',
  'C553264065',
  '0.0',
  '0.0',
  '1',
  '0'],
 ['1',
  'CASH_OUT',
  '181.0',
  'C840083671',
  '181.0',
  '0.0',
  'C38997010',
  '21182.0',
  '0.0',
  '1',
  '0']]

In [None]:
# The first element of the split RDD is the header row (a list of column names).
header = transactions.first()

In [None]:
# Drop the header by excluding every row equal to the header list.
new_rdd = transactions.filter(lambda x: x != header)

In [None]:
# Confirm the header is gone: the first five rows are now all data rows.
new_rdd.take(5)

[['1',
  'PAYMENT',
  '9839.64',
  'C1231006815',
  '170136.0',
  '160296.36',
  'M1979787155',
  '0.0',
  '0.0',
  '0',
  '0'],
 ['1',
  'PAYMENT',
  '1864.28',
  'C1666544295',
  '21249.0',
  '19384.72',
  'M2044282225',
  '0.0',
  '0.0',
  '0',
  '0'],
 ['1',
  'TRANSFER',
  '181.0',
  'C1305486145',
  '181.0',
  '0.0',
  'C553264065',
  '0.0',
  '0.0',
  '1',
  '0'],
 ['1',
  'CASH_OUT',
  '181.0',
  'C840083671',
  '181.0',
  '0.0',
  'C38997010',
  '21182.0',
  '0.0',
  '1',
  '0'],
 ['1',
  'PAYMENT',
  '11668.14',
  'C2048537720',
  '41554.0',
  '29885.86',
  'M1230701703',
  '0.0',
  '0.0',
  '0',
  '0']]

In [None]:
# Peek at the first five raw (unsplit) lines of the file.
dataset.take(5)

['step,type,amount,nameOrig,oldbalanceOrg,newbalanceOrig,nameDest,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud',
 '1,PAYMENT,9839.64,C1231006815,170136.0,160296.36,M1979787155,0.0,0.0,0,0',
 '1,PAYMENT,1864.28,C1666544295,21249.0,19384.72,M2044282225,0.0,0.0,0,0',
 '1,TRANSFER,181.0,C1305486145,181.0,0.0,C553264065,0.0,0.0,1,0',
 '1,CASH_OUT,181.0,C840083671,181.0,0.0,C38997010,21182.0,0.0,1,0']

###Creating new RDD with datatype

In [None]:
# Header as the raw first line of the file (a single comma-separated string).
# NOTE(review): this rebinds `header`, which an earlier cell set to the split list from
# transactions.first(); the lambda used to build `new_rdd` refers to the same global
# name — confirm nothing evaluated later still expects the list form.
header = dataset.first()


In [None]:
# Drop the header line, then split each remaining CSV line into a list of string fields.
rdd = (
    dataset
    .filter(lambda line: line != header)
    .map(lambda line: line.split(","))
)

#SPARK-SQL 

In [None]:
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType, LongType

In [None]:
# Explicit schema for the fraud CSV: one nullable field per column, in file order.
# The account and balance columns get friendlier names than the CSV header uses.
schema2 = (
    StructType()
    .add("step", IntegerType(), True)
    .add("type", StringType(), True)
    .add("amount", DoubleType(), True)
    .add("origin", StringType(), True)
    .add("sender_old_balance", DoubleType(), True)
    .add("sender_new_balance", DoubleType(), True)
    .add("destination", StringType(), True)
    .add("receiver_old_balance", DoubleType(), True)
    .add("receiver_new_balance", DoubleType(), True)
    .add("isfraud", IntegerType(), True)
    .add("isFlaggedFraud", IntegerType(), True)
)

In [None]:
# Read the CSV through the DataFrame reader, treating the first line as a header and
# applying the explicit schema instead of the file's own column names/types.
df_with_schema2 = (
    spark.read
    .format("csv")
    .option("header", "True")
    .schema(schema2)
    .load("/content/drive/MyDrive/fraud_data.csv")
)

In [None]:
# Expose the typed DataFrame to Spark SQL under the name `Online_fraud`.
# createOrReplaceTempView replaces the deprecated registerTempTable and, because it
# replaces an existing view of the same name, this cell can be re-run safely.
df_with_schema2.createOrReplaceTempView("Online_fraud")

In [None]:
# Sanity check: show the first ten rows of the registered view.
q=spark.sql('select * from Online_fraud limit 10')
q.show()

+----+--------+--------+-----------+------------------+------------------+-----------+--------------------+--------------------+-------+--------------+
|step|    type|  amount|     origin|sender_old_balance|sender_new_balance|destination|receiver_old_balance|receiver_new_balance|isfraud|isFlaggedFraud|
+----+--------+--------+-----------+------------------+------------------+-----------+--------------------+--------------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|          170136.0|         160296.36|M1979787155|                 0.0|                 0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|           21249.0|          19384.72|M2044282225|                 0.0|                 0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|             181.0|               0.0| C553264065|                 0.0|                 0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|             181.0|               0.0|  C38997010|  

In [None]:
# Convert each split row (list of strings) into a typed tuple, keeping column order.
rdd = rdd.map(
    lambda row: (
        int(row[0]),     # step
        row[1],          # type
        float(row[2]),   # amount
        row[3],          # nameOrig
        float(row[4]),   # oldbalanceOrg
        float(row[5]),   # newbalanceOrig
        row[6],          # nameDest
        float(row[7]),   # oldbalanceDest
        float(row[8]),   # newbalanceDest
        int(row[9]),     # isFraud
        int(row[10]),    # isFlaggedFraud
    )
)

In [None]:
# Collect the distinct Python types of the RDD's elements (checks the container type
# of each row, not the types of the individual fields).
data_types = rdd.map(lambda x: type(x)).distinct().collect()
print(data_types)

[<class 'tuple'>]


##SPARK-SQL---trial queries

In [None]:
# Read the same CSV without a schema: column names come from the header line and,
# as the schema printed below shows, every column is loaded as a string.
df = spark.read.format("csv").option("header", "true").load("/content/drive/MyDrive/fraud_data.csv")

In [None]:
# Show column names and inferred types.
df.printSchema()

root
 |-- step: string (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: string (nullable = true)
 |-- newbalanceOrig: string (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: string (nullable = true)
 |-- newbalanceDest: string (nullable = true)
 |-- isFraud: string (nullable = true)
 |-- isFlaggedFraud: string (nullable = true)



In [None]:
# Preview the first five rows.
df.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

#PANDAS

In [None]:
# Load the same CSV with pandas for the pandas-based solutions.
# NOTE: this rebinds `df`, which until now referred to the Spark DataFrame read earlier;
# from here on `df` is a pandas DataFrame.
import pandas as pd
df = pd.read_csv('/content/drive/MyDrive/fraud_data.csv')

In [None]:
# Preview the first five rows of the pandas frame.
df.head(5)

Unnamed: 0,step,type,amount,nameOrig,oldbalanceOrg,newbalanceOrig,nameDest,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud
0,1,PAYMENT,9839.64,C1231006815,170136.0,160296.36,M1979787155,0.0,0.0,0,0
1,1,PAYMENT,1864.28,C1666544295,21249.0,19384.72,M2044282225,0.0,0.0,0,0
2,1,TRANSFER,181.0,C1305486145,181.0,0.0,C553264065,0.0,0.0,1,0
3,1,CASH_OUT,181.0,C840083671,181.0,0.0,C38997010,21182.0,0.0,1,0
4,1,PAYMENT,11668.14,C2048537720,41554.0,29885.86,M1230701703,0.0,0.0,0,0


In [None]:
# Column names as read from the CSV header.
df.columns

Index(['step', 'type', 'amount', 'nameOrig', 'oldbalanceOrg', 'newbalanceOrig',
       'nameDest', 'oldbalanceDest', 'newbalanceDest', 'isFraud',
       'isFlaggedFraud'],
      dtype='object')

#QUERIES AND THEIR SOLUTIONS IN 3 WAYS

In [None]:
# 1. What is the maximum amount transferred in a single transaction, and between which accounts (origin and destination)?
# 2. What is the maximum amount transferred in a single transaction marked as fraud, and between which accounts?
# 3. How many customers initiated transactions that were flagged as fraudulent?
# 4. What is the total amount of money lost due to fraudulent activity?
# 5. Which type of transaction has the highest percentage of fraudulent and non-fraudulent activity?
# 6. How many transactions involved customers with negative balances before the transaction?
# 7. What is the maximum amount of money transferred in a single transaction that was not flagged as fraudulent?
# 8. How many transactions were processed in each step?
# 9. Which destination account (nameDest) is credited with the largest amount in case of fraud?
# 10. Which top 10 destination accounts have the largest amount in case of fraud?

##Q 1. What is the maximum amount transferred in a single transaction, and between which accounts (origin and destination)?

####Using SparkSQL

In [None]:
# Q1 (Spark SQL): order all transactions by amount, largest first, and keep the top row.
q1=spark.sql('select origin,destination,amount as maximum_amt_in_single_trans from Online_fraud order by amount desc limit 1')
q1.show()

+-----------+-----------+---------------------------+
|     origin|destination|maximum_amt_in_single_trans|
+-----------+-----------+---------------------------+
|C1715283297| C439737079|              9.244551664E7|
+-----------+-----------+---------------------------+



#### Using RDD

In [None]:
# Q1 (RDD): takeOrdered with a negated key returns the row with the largest amount.
top_rows = rdd.takeOrdered(1, key=lambda record: -float(record[2]))
max_amount_row = top_rows[0]

# Pull out nameOrig (index 3), nameDest (index 6) and amount (index 2).
origin, destination, max_amount = max_amount_row[3], max_amount_row[6], max_amount_row[2]

# Report the result
print(f"Origin: {origin}, Destination: {destination}, Maximum Amount Transferred: {max_amount}")

Origin: C1715283297, Destination: C439737079, Maximum Amount Transferred: 92445516.64


####Using Pandas

In [None]:
# Q1 (pandas): sort by amount descending and keep the first row.
q1 = (
    df[['nameOrig', 'nameDest', 'amount']]
    .sort_values('amount', ascending=False)
    .head(1)
)
# Rename the three columns for display.
q1.columns = ['Origin', 'Destination', 'Maximum Amount Transferred']
print(q1)

              Origin Destination  Maximum Amount Transferred
3686583  C1715283297  C439737079                 92445516.64


##Q2.What is the maximum amount that was transferred in a single transaction in case of fraud?

####USING SPARK-SQL

In [None]:
# Q2 (Spark SQL): largest single fraudulent transaction with its origin and destination.
# The alias maximum_amt_in_single_trans belongs to `amount` (as in the Q1 query);
# previously it was attached to `destination`, so the account id was shown under the
# "maximum amount" column header.
q1=spark.sql('select origin,destination,amount as maximum_amt_in_single_trans from Online_fraud where isFraud=1 order by amount desc limit 1')
q1.show()

+------+--------+---------------------------+
|amount|  origin|maximum_amt_in_single_trans|
+------+--------+---------------------------+
| 1.0E7|C7162498|                 C945327594|
+------+--------+---------------------------+



####USING PYSPARK

In [None]:
# Note: applying max() to the amount field of the 'transactions' RDD, which still contains the header row,
# returned the header string 'amount' as the maximum. To avoid this, the header row is excluded first
# (see 'new_rdd'); the cells below work on the header-free 'rdd'.

In [None]:
# Keep only rows whose isFraud field (index 9) equals 1; the integer comparison relies
# on `rdd` having been converted to typed tuples earlier.
fraudulent_transactions = rdd.filter(lambda x: x[9] == 1)

In [None]:
# Largest amount (index 2) among the fraudulent rows.
max_amount_transferred = fraudulent_transactions.map(lambda x: x[2]).max()

In [None]:
# Number of fraudulent rows whose amount equals that maximum.
count_of_max_amount_fraudulent_transactions = fraudulent_transactions.filter(lambda x: x[2] == max_amount_transferred).count()

In [None]:
# Report the maximum found by the RDD computation.
print("Maximum amount transferred in a single transaction:", max_amount_transferred)

Maximum amount transferred in a single transaction: 10000000.0


In [None]:
# Report how many fraudulent transactions share that maximum amount.
print("Number of fraudulent transactions with the maximum amount transferred:", count_of_max_amount_fraudulent_transactions)

Number of fraudulent transactions with the maximum amount transferred: 287


####USING PANDAS

In [None]:
# Q2 (pandas): largest amount among rows with isFraud == 1.
fraud_amounts = df.loc[df['isFraud'] == 1, 'amount']
max_amount_transferred = fraud_amounts.max()
max_amount_transferred

10000000.0

In [None]:
# Q2 (pandas): the fraudulent transaction with the largest amount, with its accounts.
# The row is stored under its own name so that the scalar `max_amount_transferred`
# computed in the previous cell is not overwritten by a DataFrame; the labelled print
# that follows expects the amount itself.
max_fraud_transaction = (
    df.loc[df['isFraud'] == 1, ['nameOrig', 'nameDest', 'amount']]
    .sort_values('amount', ascending=False)
    .head(1)
)
max_fraud_transaction

Unnamed: 0,nameOrig,nameDest,amount
3760283,C1214015158,C2110157840,10000000.0


In [None]:
# Print the value bound to `max_amount_transferred` with a label.
# NOTE(review): the recorded output shows a one-row DataFrame printed here rather than
# a scalar — check what `max_amount_transferred` is bound to when this cell runs.
print("Maximum amount transferred in a single transaction:", max_amount_transferred)

Maximum amount transferred in a single transaction:             nameOrig     nameDest      amount
3760283  C1214015158  C2110157840  10000000.0


In [None]:
# num_fraudulent_max = df[df['amount'] == max_amount_transferred]
# num_fraudulent_max

In [None]:
# print("Number of transactions flagged as fraudulent with maximum amount:", num_fraudulent_max)

##Q3. How many customers initiated transactions that were flagged as fraudulent?

####USING SPARK-SQL

In [None]:
# Q3 (Spark SQL): how many customers initiated transactions marked as fraud?
# DISTINCT makes this a count of customers rather than of transactions, matching the
# RDD (.distinct()) and pandas (groupby nameOrig) solutions for the same question.
q2=spark.sql('select count(distinct origin) as flaged_fraud from online_fraud where isfraud=1')
q2.show()

+------------+
|flaged_fraud|
+------------+
|        8213|
+------------+



####USING PYSPARK

In [None]:
# Fraud rows taken from the string-valued `transactions` RDD: field 9 (isFraud) is
# compared with the string '1' because this RDD was never type-converted.
fraud_rdd= transactions.filter(lambda x: x[9] == '1')

In [None]:
# Distinct originating accounts (index 3) among the fraud rows.
customer_count = fraud_rdd.map(lambda x: x[3]).distinct().count()
customer_count

8213

In [None]:
# Report the distinct-customer count.
print("Number of customers who initiated fraudulent transactions:", customer_count)

Number of customers who initiated fraudulent transactions: 8213


####USING PANDAS

In [None]:
# Q3 (pandas): number of fraudulent transactions per originating customer
# (one group per distinct nameOrig).
fraud_transactions_by_customer = df[df['isFraud'] == 1].groupby('nameOrig')['isFraud'].count()
fraud_transactions_by_customer

nameOrig
C1000036340    1
C1000086512    1
C1000331499    1
C1000484178    1
C1000513158    1
              ..
C998715487     1
C998785780     1
C998822926     1
C999561448     1
C99979309      1
Name: isFraud, Length: 8213, dtype: int64

In [None]:
# The number of groups equals the number of distinct originating customers.
num_fraud_customers =len(fraud_transactions_by_customer)
num_fraud_customers

8213

##Q4. What is the total amount of money lost due to fraudulent activity?

####USING SPARK-SQL

In [None]:

# Q4 (Spark SQL): total amount over all rows with isFraud = 1.
q3=spark.sql('SELECT SUM(amount) as total_lost FROM online_fraud WHERE isFraud = 1') # about 12.06 billion, i.e. roughly 1,205.6 crore
q3.show()

+-----------------+
|       total_lost|
+-----------------+
|1.205641542784E10|
+-----------------+



####USING PYSPARK

In [None]:
# Q4 (RDD): fraud rows from the typed `rdd` (isFraud at index 9 is an int here).
# This rebinds `fraud_rdd`, previously built from the string-valued `transactions`.
fraud_rdd = rdd.filter(lambda row: row[9] == 1)

In [None]:
# Keep only the amount (index 2) of each fraud row.
amount_rdd = fraud_rdd.map(lambda row: row[2])

In [None]:
# Add up all fraudulent amounts.
total_amount = amount_rdd.sum()

In [None]:
# Display the total.
total_amount

12056415427.839998

####USING PANDAS

In [None]:
# Q4 (pandas): keep only the rows where isFraud equals 1.
fraud_df = df[df['isFraud'] == 1]

In [None]:
# Take the amount column of the fraud rows as a Series.
amount_series = fraud_df['amount']

In [None]:
# Sum the fraudulent amounts and report the total.
total_amount = amount_series.sum()
print("Total amount of money lost due to fraudulent activity: ", total_amount)

Total amount of money lost due to fraudulent activity:  12056415427.839998


##Q 5.Which type of transaction has the highest percentage of fraudulent activity?

####USING SPARK-SQL

In [None]:

# Q5 (Spark SQL): per transaction type, count all rows and the rows with isFraud = 1,
# express the ratio as a percentage, and list the types by that percentage, highest first.
q4=spark.sql('SELECT type, COUNT(*) as num_transactions, COUNT(CASE WHEN isFraud = 1 THEN 1 END) as num_fraudulent_transactions, (COUNT(CASE WHEN isFraud = 1 THEN 1 END) / COUNT(*))*100 as fraud_percentage FROM online_fraud GROUP BY type ORDER BY fraud_percentage DESC ;')
q4.show()

+--------+----------------+---------------------------+-------------------+
|    type|num_transactions|num_fraudulent_transactions|   fraud_percentage|
+--------+----------------+---------------------------+-------------------+
|TRANSFER|          532909|                       4097| 0.7687991758442811|
|CASH_OUT|         2237500|                       4116|0.18395530726256984|
| CASH_IN|         1399284|                          0|                0.0|
| PAYMENT|         2151495|                          0|                0.0|
|   DEBIT|           41432|                          0|                0.0|
+--------+----------------+---------------------------+-------------------+



####USING PYSPARK

In [None]:
# Q5 (RDD): key each row by transaction type (index 1) with the pair (isFraud, 1).
grouped_rdd = rdd.map(lambda row: (row[1], (row[9], 1)))

In [None]:
# Per type, add the pairs element-wise: (sum of isFraud values, number of rows).
fraud_counts = grouped_rdd.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))

In [None]:
# Turn each pair into a percentage: isFraud sum / row count * 100.
fraud_percentages = fraud_counts.mapValues(lambda x: (x[0] / x[1]) * 100)

In [None]:
# Pick the (type, percentage) pair with the largest percentage.
max_fraud_type = fraud_percentages.max(lambda x: x[1])

In [None]:
# Display the winning (type, percentage) pair.
max_fraud_type

('TRANSFER', 0.7687991758442811)

####USING PANDAS

In [None]:
# Q5 (pandas): per transaction type, 'count' is the number of rows and 'sum' adds up
# isFraud (this equals the number of fraudulent rows assuming isFraud is only 0 or 1 — confirm).
grouped_df = df.groupby('type')['isFraud'].agg(['count', 'sum'])

In [None]:
# Percentage of fraudulent transactions for each type: sum / count * 100,
# stored as a new column and displayed.
grouped_df['fraud_percentage'] = (grouped_df['sum'] / grouped_df['count']) * 100
grouped_df['fraud_percentage']

type
CASH_IN     0.000000
CASH_OUT    0.183955
DEBIT       0.000000
PAYMENT     0.000000
TRANSFER    0.768799
Name: fraud_percentage, dtype: float64

In [None]:
# idxmax returns the index label (the transaction type) with the highest percentage.
max_fraud_type = grouped_df['fraud_percentage'].idxmax()
max_fraud_type

'TRANSFER'

##Q 6. How many transactions involved customers with negative balances before the transaction?

####USING SPARK-SQL

In [None]:

# Q6 (Spark SQL): count rows where the sender's or the receiver's pre-transaction
# balance is negative.
q5=spark.sql('SELECT COUNT(*) as num_transactions FROM online_fraud WHERE sender_old_balance < 0 OR receiver_old_balance < 0')
q5.show()

+----------------+
|num_transactions|
+----------------+
|               0|
+----------------+



####USING PYSPARK

In [None]:
# Q6 (RDD): keep rows where oldbalanceOrg (index 4) or oldbalanceDest (index 7) is negative.
filtered_rdd = rdd.filter(lambda row: float(row[4]) < 0 or float(row[7]) < 0)

# Count the number of rows
num_transactions = filtered_rdd.count()
print(num_transactions)

0


####USING PANDAS

In [None]:
# Q6 (pandas): keep only the rows where the oldbalanceOrg column is negative.
# NOTE(review): only the sender's balance is tested here, whereas the Spark SQL and RDD
# solutions to the same question also test the receiver's old balance — confirm which
# definition is intended.
negative_bal_df = df[df['oldbalanceOrg'] < 0]

In [None]:
# Unique originating customer names among the negative-balance rows.
unique_customers = negative_bal_df['nameOrig'].unique()

In [None]:
# All transactions (from the full frame) whose nameOrig is one of those customers.
negative_bal_transactions_df = df[df['nameOrig'].isin(unique_customers)]

In [None]:
# Number of such transactions = number of rows in the filtered frame.
total_count = negative_bal_transactions_df.shape[0]

print("Number of transactions involving customers with negative balances before the transaction: ", total_count)

Number of transactions involving customers with negative balances before the transaction:  0


##Q 7. What is the maximum amount of money transferred in a single transaction that was not flagged as fraudulent?

####USING SPARK-SQL

In [None]:
# Q7 (Spark SQL): largest amount among rows with isFraud = 0.
q8=spark.sql("SELECT MAX(amount) AS max_amount FROM online_fraud WHERE isFraud = 0")
q8.show()

+-------------+
|   max_amount|
+-------------+
|9.244551664E7|
+-------------+



####USING Pandas

In [None]:
# Q7 (pandas): largest amount among rows with isFraud == 0.
max_amount = df.loc[df['isFraud'] == 0, 'amount'].max()

print(max_amount)

92445516.64


####USING SPARK-RDD

In [None]:
# Q7 (RDD): amounts (index 2) of rows whose isFraud (index 9) is 0, then their maximum.
non_fraud_amounts = rdd.filter(lambda record: int(record[9]) == 0).map(lambda record: float(record[2]))
max_amount = non_fraud_amounts.max()
print(max_amount)

92445516.64


## Q8. How many transactions were processed in each step?

####SparkSql

In [None]:
# Q8 (Spark SQL): number of transactions per step, busiest steps first; show the top 20.
q9=spark.sql("SELECT step, COUNT(*) AS num_transaction FROM online_fraud GROUP BY step order by count(*) desc")
q9.show(20)

+----+---------------+
|step|num_transaction|
+----+---------------+
|  19|          51352|
|  18|          49579|
| 187|          49083|
| 235|          47491|
| 307|          46968|
| 163|          46352|
| 139|          46054|
| 403|          45155|
|  43|          45060|
| 355|          44787|
|  15|          44609|
| 186|          43747|
| 306|          43615|
|  17|          43361|
| 259|          43328|
|  16|          42471|
| 379|          41759|
|  14|          41485|
|  42|          41304|
| 354|          40696|
+----+---------------+
only showing top 20 rows



####Using Pandas

In [None]:
# Q8 (pandas): count rows per step, name the count column, and sort busiest first.
step_counts = (
    df.groupby('step')['step']
    .count()
    .reset_index(name='num_transaction')
    .sort_values(by=['num_transaction'], ascending=False)
)

# Show the 20 busiest steps.
print(step_counts.head(20))

     step  num_transaction
18     19            51352
17     18            49579
186   187            49083
234   235            47491
306   307            46968
162   163            46352
138   139            46054
402   403            45155
42     43            45060
354   355            44787
14     15            44609
185   186            43747
305   306            43615
16     17            43361
258   259            43328
15     16            42471
378   379            41759
13     14            41485
41     42            41304
353   354            40696


####Using RDD

In [None]:
 # Create an RDD with (step, 1) tuples
# Q8 (RDD): emit (step, 1) for every row.
step_rdd = rdd.map(lambda record: (int(record[0]), 1))

# Sum the ones per step, flip each pair to (count, step) so the count becomes the key,
# and sort by that key in descending order.
step_counts_rdd = (
    step_rdd
    .reduceByKey(lambda left, right: left + right)
    .map(lambda pair: (pair[1], pair[0]))
    .sortByKey(False)
)

# The first 20 pairs are the 20 busiest steps, as (count, step).
step_counts = step_counts_rdd.take(20)

# Show the result
print(step_counts)

[(51352, 19), (49579, 18), (49083, 187), (47491, 235), (46968, 307), (46352, 163), (46054, 139), (45155, 403), (45060, 43), (44787, 355), (44609, 15), (43747, 186), (43615, 306), (43361, 17), (43328, 259), (42471, 16), (41759, 379), (41485, 14), (41304, 42), (40696, 354)]


####Using Sparksql

In [None]:
# Destination account with the most fraudulent transactions (Spark SQL).
# NOTE(review): several destinations appear to tie on the top count; with LIMIT 1 and no
# tie-breaker the row returned is not fixed — the pandas and RDD versions below report
# different accounts with the same count of 2.
q10=spark.sql("SELECT destination, COUNT(*) AS num_fraudulent FROM online_fraud WHERE isFraud = 1 GROUP BY destination ORDER BY num_fraudulent DESC LIMIT 1")
q10.show()

+-----------+--------------+
|destination|num_fraudulent|
+-----------+--------------+
| C964377943|             2|
+-----------+--------------+



####Using Pandas

In [None]:
# Destination account with the most fraudulent transactions (pandas):
# count fraud rows per nameDest and sort by that count, highest first.
fraud_rows = df.loc[df['isFraud'] == 1]
fraudulent_destinations = (
    fraud_rows.groupby('nameDest')['nameDest']
    .count()
    .reset_index(name='num_fraudulent')
    .sort_values(by=['num_fraudulent'], ascending=False)
)

# First row after sorting = the top destination.
max_fraudulent_destination = fraudulent_destinations.iloc[0]

print(max_fraudulent_destination)

nameDest          C2020337583
num_fraudulent              2
Name: 4244, dtype: object


####Using PYSPARK

In [None]:
# Destination account with the most fraudulent transactions (RDD).
# Keep rows whose isFraud field (index 9) is 1.
fraudulent_rdd = rdd.filter(lambda record: int(record[9]) == 1)

# Emit (nameDest, 1) per fraud row and add the ones up per destination.
destination_ones = fraudulent_rdd.map(lambda record: (record[6], 1))
grouped_rdd = destination_ones.reduceByKey(lambda left, right: left + right)

# Order by count, highest first, and keep the first pair.
result_rdd = grouped_rdd.sortBy(lambda pair: pair[1], ascending=False).take(1)

# Show the result
print(result_rdd)

[('C1325541393', 2)]


## Q9. Which account is credited with the largest total amount in case of fraud?


####Using Spark SQl

In [None]:
# Q9 (Spark SQL): total fraudulent amount per destination, keep the largest.
# Note that the alias num_fraudulent holds a summed amount here, not a count.
q11=spark.sql("SELECT destination, sum(amount) AS num_fraudulent FROM online_fraud WHERE isFraud = 1 GROUP BY destination ORDER BY num_fraudulent DESC LIMIT 1")
q11.show()

+-----------+--------------+
|destination|num_fraudulent|
+-----------+--------------+
| C668046170| 1.016008868E7|
+-----------+--------------+



####Using RDD

In [None]:
# Q9 (RDD): keep rows whose isFraud field (index 9) is 1.
fraudulent_rdd = rdd.filter(lambda record: int(record[9]) == 1)

# Emit (nameDest, amount) per fraud row and total the amounts per destination.
fraudulent_destinations_rdd = fraudulent_rdd.map(lambda record: (record[6], float(record[2])))
fraudulent_amounts_by_destination_rdd = fraudulent_destinations_rdd.reduceByKey(
    lambda total, amount: total + amount
)

# Negating the total makes takeOrdered return the largest one first.
destination_with_highest_fraudulent_amount = fraudulent_amounts_by_destination_rdd.takeOrdered(
    1, lambda pair: -pair[1]
)

# Show the result
print(destination_with_highest_fraudulent_amount)

[('C668046170', 10160088.68)]


####Using Pandas

In [None]:
# Q9 (pandas): total fraudulent amount per nameDest, sorted largest first.
# The column keeps the name 'num_fraudulent' although it holds a summed amount.
fraud_rows = df.loc[df['isFraud'] == 1]
fraudulent_destinations = (
    fraud_rows.groupby('nameDest')['amount']
    .sum()
    .reset_index(name='num_fraudulent')
    .sort_values(by=['num_fraudulent'], ascending=False)
)

# First row after sorting = destination with the largest total.
max_fraudulent_destination = fraudulent_destinations.iloc[0]

print(max_fraudulent_destination)

nameDest           C668046170
num_fraudulent    10160088.68
Name: 6773, dtype: object


## Q10. Which top 10 destination accounts have the largest total amount in case of fraud?


####Using SparkSQL

In [None]:
# Q10 (Spark SQL): total fraudulent amount per destination, ten largest totals.
# Note that the alias num_fraudulent holds a summed amount here, not a count.
q12=spark.sql("SELECT destination, sum(amount) AS num_fraudulent FROM online_fraud WHERE isFraud = 1 GROUP BY destination ORDER BY num_fraudulent DESC LIMIT 10")
q12.show()

+-----------+--------------+
|destination|num_fraudulent|
+-----------+--------------+
| C668046170| 1.016008868E7|
| C704752055|         1.0E7|
| C242007270|         1.0E7|
|C1622860679|         1.0E7|
| C709815552|         1.0E7|
|C1423246212|         1.0E7|
|C1865083017|         1.0E7|
|C1806199534|         1.0E7|
|C1614528665|         1.0E7|
|C2065262017|         1.0E7|
+-----------+--------------+



#### Using RDD

In [None]:
# Q10 (RDD): keep rows whose isFraud field (index 9) is 1.
fraudulent_rdd = rdd.filter(lambda record: int(record[9]) == 1)

# Emit (nameDest, amount) per fraud row and total the amounts per destination.
fraudulent_destinations_rdd = fraudulent_rdd.map(lambda record: (record[6], float(record[2])))
fraudulent_amounts_by_destination_rdd = fraudulent_destinations_rdd.reduceByKey(
    lambda total, amount: total + amount
)

# Negating the total makes takeOrdered return the ten largest totals, largest first.
top_10_fraudulent_destinations = fraudulent_amounts_by_destination_rdd.takeOrdered(
    10, lambda pair: -pair[1]
)

# One "destination: total" line per account.
for dest, total_fraudulent_amount in top_10_fraudulent_destinations:
    print(f"{dest}: {total_fraudulent_amount}")

C668046170: 10160088.68
C1917849910: 10000000.0
C2055653883: 10000000.0
C1853789265: 10000000.0
C2029456603: 10000000.0
C709815552: 10000000.0
C120368346: 10000000.0
C1036214683: 10000000.0
C839316822: 10000000.0
C606914329: 10000000.0


####Using Pandas

In [None]:
# Q10 (pandas): total fraudulent amount per destination account, largest first.
# The column keeps the name 'num_fraudulent' although it holds a summed amount.
Top_fraudulent_destinations = (
    df.loc[df['isFraud'] == 1]
    .groupby('nameDest')['amount']
    .sum()
    .reset_index(name='num_fraudulent')
    .sort_values(by=['num_fraudulent'], ascending=False)
)

# head(10) keeps ten rows; the earlier slice iloc[0:9] stopped after nine, so the
# "top 10" list was one account short.
Top_10_fraudulent_destinations = Top_fraudulent_destinations.head(10)

print(Top_10_fraudulent_destinations)

         nameDest  num_fraudulent
6773   C668046170     10160088.68
2602  C1630566944     10000000.00
6764   C666339947     10000000.00
2436  C1590217660     10000000.00
2453  C1595458981     10000000.00
252   C1056895901     10000000.00
6694   C650095152     10000000.00
2539  C1614528665     10000000.00
2570  C1622860679     10000000.00
