## **Proyecto - Base de datos de Big Data**

## **Descripción de la Base de datos**

Base de datos seleccionada: IBM Transactions for Anti-Money Laundering (AML), versión LI-Medium

La base de datos está compuesta por aproximadamente 31 millones de transacciones, contiene una menor proporción de casos positivos de lavado de dinero respecto a las otras versiones.

Se cuenta con información como: momento de la transacción, bancos y cuentas involucradas, montos ($) y monedas, formato y si la transacción se considera o no lavado de dinero.

# **Descripción de las reglas de particionamiento**

Variables de caracterización seleccionadas: Payment Currency y Payment Format.

Agrupación previa al particionamiento:
Payment Format:
* Online: ACH, Credit Card, Wire y Bitcoin
* Offline: Cheque, Cash y Reinvestment

Payment Currency:
* America: MXN, USD, BRL y CAD
* Others: se excluyen las monedas antes listadas

Reglas:
* *Part 1*: Payment Format = Online & Payment Currency = America
* *Part 2*: Payment Format = Online & Payment Currency = Others
* *Part 3*: Payment Format = Offline & Payment Currency = America
* *Part 4*: Payment Format = Offline & Payment Currency = Others

# **Cargando el entorno de PySpark en Google Colab**

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
!ls

sample_data  spark-3.1.1-bin-hadoop3.2	spark-3.1.1-bin-hadoop3.2.tgz


In [None]:
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession

#spark = SparkSession.builder.master("local[*]").getOrCreate()
spark = SparkSession.builder \
                    .master("local[*]") \
                    .appName("Tarea_Equipo46") \
                    .config("spark.driver.memory", "8g") \
                    .getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark
spark

In [None]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [None]:
import numpy as np
import pandas as pd
from pyspark.sql.functions import count, when, isnan, col
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql.functions import mean, stddev, min, max

In [None]:
df = spark.read.csv("/content/drive/MyDrive/BigData/LI-Medium_Trans.csv",
                    header=True, sep=",", inferSchema=True)

# **Particionamiento**

In [None]:
Payment_Format_Online = ["ACH","Credit Card","Wire","Bitcoin"]
Payment_Format_Offline = ["Cheque","Cash","Reinvestment"]

Payment_Currency_America = ["Mexican Peso", "US Dollar", "Brazil Real", "Canadian Dollar"]
Payment_Currency_Others = ["Yen", "UK Pound", "Australian Dollar","Saudi Riyal", "Shekel","Yuan","Euro","Rupee","Swiss Franc","Bitcoin","Ruble"]

In [None]:
from pyspark.sql.functions import col

# Payment Format = Online y Payment Currency = America
part_1 = df.filter((col("Payment Format").isin(Payment_Format_Online)) & (col("Payment Currency").isin(Payment_Currency_America)))
print("Partición 1: Online y America")
part_1.show()

# Payment Format = Online y Payment Currency = Others
part_2 = df.filter((col("Payment Format").isin(Payment_Format_Online)) & (col("Payment Currency").isin(Payment_Currency_Others)))
print("Partición 2: Online y Resto del Mundo")
part_2.show()

# Payment Format = Offline y Payment Currency = America
part_3 = df.filter((col("Payment Format").isin(Payment_Format_Offline)) & (col("Payment Currency").isin(Payment_Currency_America)))
print("Partición 3: Offline y America")
part_3.show()

# Payment Format = Offline y Payment Currency = Others
part_4 = df.filter((col("Payment Format").isin(Payment_Format_Offline)) & (col("Payment Currency").isin(Payment_Currency_Others)))
print("Partición 4: Offline y Resto del Mundo")
part_4.show()


# Verificación de Submuestras
count_part_1 = part_1.count()
print(f"Número de filas en part_1: {count_part_1}")

count_part_2 = part_2.count()
print(f"Número de filas en part_2: {count_part_2}")

count_part_3 = part_3.count()
print(f"Número de filas en part_3: {count_part_3}")

count_part_4 = part_4.count()
print(f"Número de filas en part_4: {count_part_4}")

Partición 1: Online y America
+----------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+-------------+
|       Timestamp|From Bank| Account2|To Bank| Account4|Amount Received|Receiving Currency|Amount Paid|Payment Currency|Payment Format|Is Laundering|
+----------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+-------------+
|2022/09/01 00:15|       20|80010E6F0|  27365|8084250A0|         137.72|         US Dollar|     137.72|       US Dollar|   Credit Card|            0|
|2022/09/01 00:05|       20|80010EA30|  18234|82BC4CE30|           1.24|         US Dollar|       1.24|       US Dollar|   Credit Card|            0|
|2022/09/01 00:21|       11|8003289F0| 237264|810BA3240|           5.28|         US Dollar|       5.28|       US Dollar|   Credit Card|            0|
|2022/09/01 00:11|     2776|800816450|1190218|8486D8D10|          13.2

# **Submuestreo**

In [None]:
from pyspark.sql.functions import rand

fraccion_de_muestra = 0.2

# 'False' = el muestreo se realiza sin reemplazo.
submuestra_1 = part_1.sample(False, fraccion_de_muestra, seed=123)  # seed para reproducibilidad

submuestra_1.show()

tamaño_submuestra_1 = submuestra_1.count()
print("Tamaño de la submuestra para Online y America:", tamaño_submuestra_1)

+----------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+-------------+
|       Timestamp|From Bank| Account2|To Bank| Account4|Amount Received|Receiving Currency|Amount Paid|Payment Currency|Payment Format|Is Laundering|
+----------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+-------------+
|2022/09/01 00:15|       20|80010E6F0|  27365|8084250A0|         137.72|         US Dollar|     137.72|       US Dollar|   Credit Card|            0|
|2022/09/01 00:26|     1601|800970C80|  26669|80EE08950|         133.98|         US Dollar|     133.98|       US Dollar|   Credit Card|            0|
|2022/09/01 00:19|     1924|800AD8140|  17200|83F5A3DD0|         250.23|         US Dollar|     250.23|       US Dollar|   Credit Card|            0|
|2022/09/01 00:01|      741|800ADCDB0| 234135|80FA463E0|           7.64|         US Dollar|       7.

In [None]:
submuestra_2 = part_2.sample(False, fraccion_de_muestra, seed=123)
submuestra_2.show()
tamaño_submuestra_2 = submuestra_2.count()
print("Tamaño de la submuestra para Online y Resto del Mundo:", tamaño_submuestra_2)

+----------------+---------+---------+-------+---------+---------------+------------------+--------------+----------------+--------------+-------------+
|       Timestamp|From Bank| Account2|To Bank| Account4|Amount Received|Receiving Currency|   Amount Paid|Payment Currency|Payment Format|Is Laundering|
+----------------+---------+---------+-------+---------+---------------+------------------+--------------+----------------+--------------+-------------+
|2022/09/01 00:11|    27669|8043475E0| 119043|824C90D30|           5.82|              Euro|          5.82|            Euro|   Credit Card|            0|
|2022/09/01 00:25|     1601|800F66E21|   2589|801352771|       0.150633|           Bitcoin|      0.150633|         Bitcoin|       Bitcoin|            0|
|2022/09/01 00:06|    18328|8041E5C70| 292826|82BCBA740|          35.21|              Euro|         35.21|            Euro|   Credit Card|            0|
|2022/09/01 00:14|     1831|8011ED901| 113503|805B64B21|       3.425702|          

In [None]:
submuestra_3 = part_3.sample(False, fraccion_de_muestra, seed=123)
submuestra_3.show()
tamaño_submuestra_3 = submuestra_3.count()
print("Tamaño de la submuestra para Offline y America:", tamaño_submuestra_3)

+----------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+-------------+
|       Timestamp|From Bank| Account2|To Bank| Account4|Amount Received|Receiving Currency|Amount Paid|Payment Currency|Payment Format|Is Laundering|
+----------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+-------------+
|2022/09/01 00:15|       20|800104D70|     20|800104D70|        8095.07|         US Dollar|    8095.07|       US Dollar|  Reinvestment|            0|
|2022/09/01 00:27|       20|800104D20|     20|800104D20|           9.72|         US Dollar|       9.72|       US Dollar|  Reinvestment|            0|
|2022/09/01 00:22|     1924|8009B6B00|   1924|8009B6B00|        1704.89|         US Dollar|    1704.89|       US Dollar|  Reinvestment|            0|
|2022/09/01 00:22|       20|800AD58D0|     20|800AD58D0|     1573418.17|         US Dollar| 1573418.

In [None]:
submuestra_4 = part_4.sample(False, fraccion_de_muestra, seed=123)
submuestra_4.show()
tamaño_submuestra_4 = submuestra_4.count()
print("Tamaño de la submuestra para Offline y Resto del Mundo:", tamaño_submuestra_4)

+----------------+---------+---------+-------+---------+---------------+------------------+-----------+-----------------+--------------+-------------+
|       Timestamp|From Bank| Account2|To Bank| Account4|Amount Received|Receiving Currency|Amount Paid| Payment Currency|Payment Format|Is Laundering|
+----------------+---------+---------+-------+---------+---------------+------------------+-----------+-----------------+--------------+-------------+
|2022/09/01 00:21|    33605|80251F890|  33605|80251F890|      131510.97|              Yuan|  131510.97|             Yuan|  Reinvestment|            0|
|2022/09/01 00:07|    36214|807585840|  36214|807585840|       37746.85|              Yuan|   37746.85|             Yuan|  Reinvestment|            0|
|2022/09/01 00:01|   343558|8109A96A0| 343558|8109A96A0|        30278.1|             Rupee|    30278.1|            Rupee|  Reinvestment|            0|
|2022/09/01 00:22|   316869|815DFF590| 316869|815DFF590|         351.67|              Euro|   