# Подготовка для работы

In [1]:
import os
os.environ["SPARK_VERSION"] = "3.3"

import pydeequ
import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType
from pyspark.sql.functions import *


In [2]:
spark = SparkSession.builder \
.config("spark.executor.instances", 7) \
.config("spark.driver.memory", "4g") \
.config("spark.driver.cores", "1") \
.config("spark.executor.cores", "4") \
.config("spark.executor.memory", "10g") \
.config("spark.jars.packages", pydeequ.deequ_maven_coord) \
.config("spark.jars.excludes", pydeequ.f2j_maven_coord) \
.getOrCreate()

hadoopConf=spark.sparkContext._jsc.hadoopConfiguration()
hadoopConf.set("fs.s3a.endpoint", "https://storage.yandexcloud.net")
hadoopConf.set("fs.s3a.access.key", "***")
hadoopConf.set("fs.s3a.secret.key", "***")
hadoopConf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.0')
hadoopConf.set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider')


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]


:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
com.amazon.deequ#deequ added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1e24638b-4b7e-4ad0-a348-e4f96b187ddb;1.0
	confs: [default]
	found com.amazon.deequ#deequ;2.0.7-spark-3.3 in central
	found org.scala-lang#scala-reflect;2.12.10 in central
	found org.scalanlp#breeze_2.12;0.13.2 in central
	found org.scalanlp#breeze-macros_2.12;0.13.2 in central
	found com.github.fommil.netlib#core;1.1.2 in central
	found net.sf.opencsv#opencsv;2.3 in central
	found com.github.rwl#jtransforms;2.4.0 in central
	found junit#junit;4.8.2 in central
	found org.apache.commons#commons-math3;3.2 in central
	found org.spire-math#spire_2.12;0.13.0 in central
	found org.spire-math#spire-macros_2.12;0.13.0 in central
	found org.typelevel#machinist_2.12;0.6.1 in central
	found com.chuusai#shapeless_2.12;2.3.2 in central
	found org.typelevel#macro-compat_2.12;1.1.1 

24/06/15 11:07:29 WARN Client: Same path resource file:///home/ubuntu/.ivy2/jars/com.amazon.deequ_deequ-2.0.7-spark-3.3.jar added multiple times to distributed cache.
24/06/15 11:07:29 WARN Client: Same path resource file:///home/ubuntu/.ivy2/jars/org.scala-lang_scala-reflect-2.12.10.jar added multiple times to distributed cache.
24/06/15 11:07:29 WARN Client: Same path resource file:///home/ubuntu/.ivy2/jars/org.scalanlp_breeze_2.12-0.13.2.jar added multiple times to distributed cache.
24/06/15 11:07:29 WARN Client: Same path resource file:///home/ubuntu/.ivy2/jars/org.scalanlp_breeze-macros_2.12-0.13.2.jar added multiple times to distributed cache.
24/06/15 11:07:29 WARN Client: Same path resource file:///home/ubuntu/.ivy2/jars/com.github.fommil.netlib_core-1.1.2.jar added multiple times to distributed cache.
24/06/15 11:07:29 WARN Client: Same path resource file:///home/ubuntu/.ivy2/jars/net.sf.opencsv_opencsv-2.3.jar added multiple times to distributed cache.
24/06/15 11:07:29 WARN

In [3]:
def get_schema():
      schema = StructType() \
            .add("tranaction_id",IntegerType()) \
            .add("tx_datetime",StringType()) \
            .add("customer_id",IntegerType()) \
            .add("terminal_id",IntegerType()) \
            .add("tx_amount",DoubleType()) \
            .add("tx_time_seconds",IntegerType()) \
            .add("tx_time_days",IntegerType()) \
            .add("tx_fraud",IntegerType()) \
            .add("tx_fraud_scenario",IntegerType())
      return schema

def drop_nulls(df):
    clear_df = df.filter(df.tranaction_id.isNotNull()) \
    .filter(df.tx_datetime.isNotNull()) \
    .filter(df.customer_id.isNotNull()) \
    .filter(df.terminal_id.isNotNull()) \
    .filter(df.tx_amount.isNotNull()) \
    .filter(df.tx_time_seconds.isNotNull()) \
    .filter(df.tx_time_days.isNotNull()) \
    .filter(df.tx_fraud.isNotNull()) \
    .filter(df.tx_fraud_scenario.isNotNull())
    return clear_df

def drop_dublicates_by_tranaction_id(df):
    return df.dropDuplicates(['tranaction_id'])

def filter_zero_value_tranzation(df):
    return df.filter(df.tx_amount != 0.0)

def filter_negative_customer_id(df):
    return df.filter(df.customer_id > 0)

# Реализация

In [4]:
### Читаем файл по предварительно подготовленной схеме

schema = get_schema()
original_df = spark.read.options(delimiter=",").schema(schema).csv("s3a://mykochecnyuk-bucket/fraud-data/*.txt")
print("Original DF row count ", original_df.count())

Original DF row count 1879751273


In [5]:
### Фильтруем грязные данные 
df = drop_dublicates_by_tranaction_id(original_df)
df = drop_nulls(df)
print("DF No nulls row count ",df.count())
df = filter_zero_value_tranzation(df)
print("Df No zero Transatctions row count ",df.count())
df = filter_negative_customer_id(df)
print("DF No negtive customet IDS row count ",df.count())

                                                                                

DF No nulls row count  1879753826


                                                                                

Df No zero Transatctions row count  1879718566




DF No negtive customet IDS row count  1879701255


                                                                                

In [10]:
### Делим DF на 200 частей и сохраняем

df.coalesce(200).write.parquet("s3a://mykochecnyuk-bucket/cleaned_data.parquet", mode="overwrite")

                                                                                

# Опровергнутые гипотезы

In [9]:
### Расхождение формата строк с датами

df.count() - df.filter(df.tx_datetime.rlike("\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}")).count()

                                                                                

0

In [8]:
### Расхождение мошеннического сценария и факта мошенничества

df.select(["tx_fraud_scenario", "tx_fraud"]).filter(df.tx_fraud_scenario > 0).filter(df.tx_fraud == 0).count()

                                                                                

0