In [1]:
#!pip install pyspark

In [2]:
#!pip install findspark

In [1]:
%pylab inline
import pandas as pd
import numpy as np

Populating the interactive namespace from numpy and matplotlib


In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql.functions import min, max, count, when, isnan, col, countDistinct, hour, minute, second
from pyspark.sql.types import TimestampType, DoubleType, IntegerType, StringType, StructType, StructField

In [4]:
from pyspark.sql import SparkSession

# Session settings, applied to the builder one by one before the session is requested.
spark_settings = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.executor.memory": "2g",
    "spark.driver.memory": "1g",
}

session_builder = SparkSession.builder.appName("OTUS")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

# The single Spark entry point used by every cell below.
spark = session_builder.getOrCreate()

In [7]:
# list the contents of the HDFS directory
! hdfs dfs -ls

Found 41 items
drwxr-xr-x   - ubuntu hadoop          0 2023-05-28 11:37 .sparkStaging
-rw-r--r--   1 ubuntu hadoop 2807409271 2023-05-18 09:28 2019-08-22.txt
-rw-r--r--   1 ubuntu hadoop 2854479008 2023-05-18 09:28 2019-09-21.txt
-rw-r--r--   1 ubuntu hadoop 2895460543 2023-05-18 09:27 2019-10-21.txt
-rw-r--r--   1 ubuntu hadoop 2939120942 2023-05-18 09:28 2019-11-20.txt
-rw-r--r--   1 ubuntu hadoop 2995462277 2023-05-18 09:26 2019-12-20.txt
-rw-r--r--   1 ubuntu hadoop 2994906767 2023-05-18 09:28 2020-01-19.txt
-rw-r--r--   1 ubuntu hadoop 2995431240 2023-05-18 09:29 2020-02-18.txt
-rw-r--r--   1 ubuntu hadoop 2995176166 2023-05-18 09:26 2020-03-19.txt
-rw-r--r--   1 ubuntu hadoop 2996034632 2023-05-18 09:25 2020-04-18.txt
-rw-r--r--   1 ubuntu hadoop 2995666965 2023-05-18 09:29 2020-05-18.txt
-rw-r--r--   1 ubuntu hadoop 2994699401 2023-05-18 09:29 2020-06-17.txt
-rw-r--r--   1 ubuntu hadoop 2995810010 2023-05-18 09:28 2020-07-17.txt
-rw-r--r--   1 ubuntu hadoop 2995995

In [8]:
# Evaluate DataFrames eagerly so that a DataFrame left as the last expression of a
# cell is rendered as a table of rows in Jupyter.
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)  # to pretty print pyspark.DataFrame in jupyter

### EDA на выборке

In [9]:
# Read a single file as plain text: every line becomes one row in a string column named `value`.
data = spark.read.text("2019-09-21.txt")

In [10]:
data.show(10)

+--------------------+
|               value|
+--------------------+
|# tranaction_id |...|
|46988237,2019-09-...|
|46988238,2019-09-...|
|46988239,2019-09-...|
|46988240,2019-09-...|
|46988241,2019-09-...|
|46988242,2019-09-...|
|46988243,2019-09-...|
|46988244,2019-09-...|
|46988245,2019-09-...|
+--------------------+
only showing top 10 rows



In [11]:
# The first line of the file is the header ("# tranaction_id | tx_datetime | ..."), not a data row.
header=data.first()[0]
# Column names are separated by " | " in the header, so `schema` is a plain list of column names.
schema=header.split(" | ")

In [12]:
print(schema)

['# tranaction_id', 'tx_datetime', 'customer_id', 'terminal_id', 'tx_amount', 'tx_time_seconds', 'tx_time_days', 'tx_fraud', 'tx_fraud_scenario']


In [13]:
# Drop the header line, split every remaining line on commas and name the resulting columns.
body_lines = data.filter(data["value"] != header)
parsed_rows = body_lines.rdd.map(lambda line: line[0].split(","))
df_new = parsed_rows.toDF(schema)
df_new.show()

+---------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|# tranaction_id|        tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+---------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|       46988237|2019-09-21 09:45:59|          1|        178|    83.11|        2627159|          30|       0|                0|
|       46988238|2019-09-21 19:33:01|          2|        660|    22.15|        2662381|          30|       0|                0|
|       46988239|2019-09-21 18:06:19|          3|        732|    36.83|        2657179|          30|       0|                0|
|       46988240|2019-09-21 16:56:01|         10|        663|    19.30|        2652961|          30|       0|                0|
|       46988241|2019-09-21 05:34:26|         10|        145|   106.51|        2612066|          30|    

In [14]:
def _missing_count(column_name):
    """Build an aggregate that counts the rows where `column_name` looks missing.

    A value counts as missing when it contains 'None' or 'NULL', equals the
    empty string, is null, or is NaN. The aggregate is aliased to the column name.
    """
    looks_missing = (
        col(column_name).contains('None')
        | col(column_name).contains('NULL')
        | (col(column_name) == '')
        | col(column_name).isNull()
        | isnan(column_name)
    )
    return count(when(looks_missing, column_name)).alias(column_name)


# The first two columns (transaction id and tx_datetime) are not checked here.
df_new.select([_missing_count(c) for c in df_new.columns[2:]])

customer_id,terminal_id,tx_amount,tx_time_seconds,tx_time_days,tx_fraud,tx_fraud_scenario
0,778,0,0,0,0,0


#### 1. Data-Type Constraints – e.g., values in a particular column must be of a particular data type, e.g., Boolean, numeric (integer or real), date, etc.

In [15]:
# Target type of every column that should not stay a string.
column_types = {
    "tx_datetime": TimestampType(),
    "customer_id": IntegerType(),
    "terminal_id": IntegerType(),
    "tx_amount": DoubleType(),
    "tx_time_seconds": IntegerType(),
    "tx_time_days": IntegerType(),
    "tx_fraud": IntegerType(),
    "tx_fraud_scenario": IntegerType(),
}

# Replace each listed column by its cast version, one column at a time.
for column_name, target_type in column_types.items():
    df_new = df_new.withColumn(column_name, df_new[column_name].cast(target_type))

In [16]:
df_new.printSchema()

root
 |-- # tranaction_id: string (nullable = true)
 |-- tx_datetime: timestamp (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- terminal_id: integer (nullable = true)
 |-- tx_amount: double (nullable = true)
 |-- tx_time_seconds: integer (nullable = true)
 |-- tx_time_days: integer (nullable = true)
 |-- tx_fraud: integer (nullable = true)
 |-- tx_fraud_scenario: integer (nullable = true)



In [17]:
# Split the timestamp into its clock components so that each one can be range-checked on its own.
for part_name, extract_part in (("hour", hour), ("minute", minute), ("second", second)):
    df_new = df_new.withColumn(part_name, extract_part(col("tx_datetime")))

In [18]:
df_new

# tranaction_id,tx_datetime,customer_id,terminal_id,tx_amount,tx_time_seconds,tx_time_days,tx_fraud,tx_fraud_scenario,hour,minute,second
46988237,2019-09-21 09:45:59,1,178,83.11,2627159,30,0,0,9,45,59
46988238,2019-09-21 19:33:01,2,660,22.15,2662381,30,0,0,19,33,1
46988239,2019-09-21 18:06:19,3,732,36.83,2657179,30,0,0,18,6,19
46988240,2019-09-21 16:56:01,10,663,19.3,2652961,30,0,0,16,56,1
46988241,2019-09-21 05:34:26,10,145,106.51,2612066,30,0,0,5,34,26
46988242,2019-09-21 12:12:51,11,337,53.97,2635971,30,0,0,12,12,51
46988243,2019-09-21 11:05:32,11,973,29.3,2631932,30,0,0,11,5,32
46988244,2019-09-21 15:13:40,11,975,28.59,2646820,30,0,0,15,13,40
46988245,2019-09-21 16:47:20,12,522,88.02,2652440,30,0,0,16,47,20
46988246,2019-09-21 07:57:03,12,522,77.39,2620623,30,0,0,7,57,3


In [19]:
# Earliest and latest transaction timestamps in this file.
# `min`/`max` are the pyspark.sql.functions aggregates imported above; they shadow the Python built-ins.
df_new.select(min("tx_datetime"), max("tx_datetime")).first()

Row(min(tx_datetime)=datetime.datetime(2019, 9, 21, 0, 0), max(tx_datetime)=datetime.datetime(2019, 10, 20, 23, 59, 59))

In [20]:
# Range check of the clock components: hour should stay within 0-23, minute and second within 0-59.
df_new.select(min("hour"), max("hour"), min("minute"), max("minute"), min("second"), max("second")).first()

Row(min(hour)=0, max(hour)=23, min(minute)=0, max(minute)=59, min(second)=0, max(second)=59)

#### 2. Range Constraints: typically, numbers or dates should fall within a certain range. That is, they have minimum and/or maximum permissible values.

In [21]:
# Per-column count, mean, stddev, min, quartiles and max, used to spot out-of-range values.
df_new.summary()

summary,# tranaction_id,customer_id,terminal_id,tx_amount,tx_time_seconds,tx_time_days,tx_fraud,tx_fraud_scenario,hour,minute,second
count,46994586.0,46994586.0,46993808.0,46994586.0,46994586.0,46994586.0,46994586.0,46994586.0,46994483.0,46994483.0,46994483.0
mean,70485529.5,500408.7910296518,26131.415391065988,54.23517020109595,3887961.7464464735,44.4995699291829,0.0571271337511091,0.1150891934658175,11.499809371240447,29.499881720158516,29.503239539841303
stddev,13566168.583115077,288605.71890121,1514438.6991362155,41.309001673721376,748103.2080479839,8.656042958590001,0.2320853840435848,0.4695955994921507,5.059005945167615,17.317712944087045,17.318269042539246
min,46988237.0,-999999.0,0.0,0.0,2592000.0,30.0,0.0,0.0,0.0,0.0,0.0
25%,58734146.0,250719.0,197.0,21.94,3239912.0,37.0,0.0,0.0,8.0,15.0,15.0
50%,70487852.0,500719.0,511.0,45.42,3888462.0,44.0,0.0,0.0,12.0,30.0,30.0
75%,82232017.0,750218.0,797.0,77.81,4535930.0,52.0,0.0,0.0,15.0,45.0,45.0
max,93982822.0,999999.0,89518096.0,4303.07,5184000.0,59.0,1.0,3.0,23.0,59.0,59.0


Что обнаружено:

* customer_id = -999999; 999999

* tx_amount = 0.0

In [22]:
# Transactions with a zero amount.
df_new[df_new.tx_amount == 0.0]

# tranaction_id,tx_datetime,customer_id,terminal_id,tx_amount,tx_time_seconds,tx_time_days,tx_fraud,tx_fraud_scenario,hour,minute,second
47061238,2019-09-21 03:26:32,46539,707,0.0,2604392,30,0,0,3,26,32
47154330,2019-09-21 21:33:19,105987,870,0.0,2669599,30,0,0,21,33,19
47165461,2019-09-21 13:32:23,113020,100,0.0,2640743,30,1,2,13,32,23
47181299,2019-09-21 12:35:54,123313,251,0.0,2637354,30,0,0,12,35,54
47227073,2019-09-21 05:15:53,152548,891,0.0,2610953,30,0,0,5,15,53
47228403,2019-09-21 19:02:19,153348,160,0.0,2660539,30,0,0,19,2,19
47350288,2019-09-21 13:55:31,231601,435,0.0,2642131,30,0,0,13,55,31
47426689,2019-09-21 11:10:10,280295,816,0.0,2632210,30,0,0,11,10,10
47486538,2019-09-21 04:57:51,318725,337,0.0,2609871,30,0,0,4,57,51
47544251,2019-09-21 16:15:48,355184,516,0.0,2650548,30,0,0,16,15,48


In [23]:
# Transactions whose customer_id is -999999 (the column minimum reported by summary()).
df_new[df_new.customer_id == -999999]

# tranaction_id,tx_datetime,customer_id,terminal_id,tx_amount,tx_time_seconds,tx_time_days,tx_fraud,tx_fraud_scenario,hour,minute,second
47009347,2019-09-21 19:52:15,-999999,779,107.75,2663535,30,0,0,19,52,15
47079004,2019-09-21 09:35:34,-999999,972,11.88,2626534,30,0,0,9,35,34
47105774,2019-09-21 10:29:57,-999999,403,7.6,2629797,30,0,0,10,29,57
47158695,2019-09-21 13:56:17,-999999,710,5.57,2642177,30,0,0,13,56,17
47392780,2019-09-21 07:49:27,-999999,481,74.61,2620167,30,0,0,7,49,27
47440508,2019-09-21 13:53:24,-999999,754,63.82,2642004,30,0,0,13,53,24
47503116,2019-09-21 19:08:16,-999999,114,54.13,2660896,30,0,0,19,8,16
47549817,2019-09-21 16:43:46,-999999,132,26.74,2652226,30,0,0,16,43,46
47668930,2019-09-21 10:27:52,-999999,831,24.74,2629672,30,0,0,10,27,52
47673768,2019-09-21 11:38:18,-999999,580,63.21,2633898,30,0,0,11,38,18


In [24]:
# Transactions whose customer_id is 999999 (the column maximum reported by summary()).
df_new[df_new.customer_id == 999999]

# tranaction_id,tx_datetime,customer_id,terminal_id,tx_amount,tx_time_seconds,tx_time_days,tx_fraud,tx_fraud_scenario,hour,minute,second
50124028,2019-09-22 04:18:50,999999,488,25.47,2693930,31,0,0,4,18,50
50124029,2019-09-22 13:56:59,999999,920,37.56,2728619,31,0,0,13,56,59
50124030,2019-09-22 15:49:08,999999,374,42.99,2735348,31,0,0,15,49,8
56388538,2019-09-26 14:33:06,999999,374,31.56,3076386,35,0,0,14,33,6
61087255,2019-09-29 08:18:41,999999,548,34.81,3313121,38,0,0,8,18,41
61087256,2019-09-29 09:53:19,999999,354,32.94,3318799,38,0,0,9,53,19
62653437,2019-09-30 20:04:37,999999,498,49.47,3441877,39,0,0,20,4,37
64220672,2019-10-01 10:50:39,999999,735,28.45,3495039,40,0,0,10,50,39
64220673,2019-10-01 12:12:32,999999,881,34.04,3499952,40,0,0,12,12,32
65786156,2019-10-02 15:51:07,999999,4,27.72,3599467,41,0,0,15,51,7


#### 3. Mandatory Constraints: Certain columns cannot be empty

In [25]:
# Count missing-looking values per column: contains 'None' or 'NULL', empty string, null or NaN.
# Note: columns[2:] skips the first two columns (`# tranaction_id` and `tx_datetime`),
# so not every DataFrame column is covered by this check.
df_new.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in df_new.columns[2:]])

customer_id,terminal_id,tx_amount,tx_time_seconds,tx_time_days,tx_fraud,tx_fraud_scenario,hour,minute,second
0,778,0,0,0,0,0,103,103,103


#### 4. Unique Constraints: A field, or a combination of fields, must be unique across a dataset.

In [26]:
# Number of distinct rows, comparing all columns.
df_new.distinct().count()

46994586

In [27]:
# Number of distinct transaction ids.
df_new.select(countDistinct("# tranaction_id"))

count(DISTINCT # tranaction_id)
46994586


Что обнаружено:

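* в этой выборке количество уникальных строк и уникальных `tranaction_id` (46994586) совпадает с общим количеством строк, то есть дубликатов здесь нет; дубликаты строк обнаружены в других файлах датасета (см. раздел «Дубликаты строк» ниже)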

#### 5. Set-Membership constraints: The values for a column come from a set of discrete values or codes.

In [28]:
# Which values tx_fraud takes and how often each one occurs.
df_new.groupBy("tx_fraud").count()

tx_fraud,count
1,2684666
0,44309920


In [29]:
# Which values tx_fraud_scenario takes and how often each one occurs.
df_new.groupBy("tx_fraud_scenario").count()

tx_fraud_scenario,count
1,25433
3,64670
2,2594563
0,44309920


### EDA на всем датасете

In [5]:
import subprocess

# NOTE(review): dir_in is not used by this cell; kept in case other cells read it.
dir_in = "/data"
# Run `hdfs dfs -ls` directly (no shell, no awk) and pick the path column in Python.
args = ["hdfs", "dfs", "-ls"]
proc = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

s_output, s_err = proc.stdout, proc.stderr
if proc.returncode != 0:
    # Fail loudly instead of silently continuing with an empty file list.
    raise RuntimeError(
        "hdfs dfs -ls failed: " + s_err.decode("utf-8", errors="replace")
    )

# The path is the 8th whitespace-separated field of a listing line; shorter lines
# (such as the "Found N items" banner) carry no path and are skipped.
listed_paths = [
    fields[7]
    for fields in (line.split() for line in s_output.splitlines())
    if len(fields) >= 8
]

# Skip hidden entries such as .sparkStaging by name rather than by position, so a
# real data file is never dropped when no such entry is present.
# Entries are kept as `bytes`, exactly as before, so cells that expect bytes keep working.
all_dart_dirs = [path for path in listed_paths if not path.startswith(b".")]

In [6]:
def basic_info(path):
    """Print basic data-quality statistics for one transactions file.

    The file is read as plain text. Its first line is taken as the header
    (column names separated by " | "); every other line is split on commas.
    All columns except the transaction id are cast to typed columns, then
    three reports are printed:

    * the ``summary()`` table of the frame,
    * the minimum and maximum ``tx_datetime``,
    * the number of distinct ``# tranaction_id`` values.

    Parameters
    ----------
    path : str
        Path of the file to read, as accepted by ``spark.read.text``.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If the file contains no lines at all (there is no header to read).
    """
    data = spark.read.text(path)

    first_row = data.first()
    if first_row is None:
        # An empty file would otherwise fail with an opaque TypeError on first_row[0].
        raise ValueError("{!r} is empty: no header line found".format(path))
    header = first_row[0]
    schema = header.split(" | ")

    df_new = data.filter(data["value"] != header).rdd.map(lambda x: x[0].split(",")).toDF(schema)
    df_new = df_new.withColumn("tx_datetime", df_new.tx_datetime.cast(TimestampType()))\
        .withColumn("customer_id", df_new.customer_id.cast(IntegerType()))\
        .withColumn("terminal_id", df_new.terminal_id.cast(IntegerType()))\
        .withColumn("tx_amount", df_new.tx_amount.cast(DoubleType()))\
        .withColumn("tx_time_seconds", df_new.tx_time_seconds.cast(IntegerType()))\
        .withColumn("tx_time_days", df_new.tx_time_days.cast(IntegerType()))\
        .withColumn("tx_fraud", df_new.tx_fraud.cast(IntegerType()))\
        .withColumn("tx_fraud_scenario", df_new.tx_fraud_scenario.cast(IntegerType()))

    # Three separate actions follow; cache the parsed frame so the file is not
    # re-read and re-split by the Python workers for each of them.
    df_new = df_new.cache()
    try:
        # show() prints the rows regardless of spark.sql.repl.eagerEval.enabled,
        # whereas print(DataFrame) only does so when that option is switched on.
        df_new.summary().show()
        # `min`/`max` are the pyspark.sql.functions aggregates, not the built-ins.
        print(df_new.select(min("tx_datetime"), max("tx_datetime")).first())
        df_new.select(countDistinct("# tranaction_id")).show()
    finally:
        # Release the cached data before the next file is processed.
        df_new.unpersist()

In [47]:
for path in all_dart_dirs[:3]:
    # The listing entries are bytes; decode them instead of slicing the "b'...'"
    # repr, which mangles names with non-ASCII characters or backslashes.
    file_name = path.decode("utf-8") if isinstance(path, bytes) else str(path)
    print(file_name)
    basic_info(file_name)

2019-08-22.txt
+-------+--------------------+------------------+------------------+-----------------+------------------+-----------------+-------------------+-------------------+
|summary|     # tranaction_id|       customer_id|       terminal_id|        tx_amount|   tx_time_seconds|     tx_time_days|           tx_fraud|  tx_fraud_scenario|
+-------+--------------------+------------------+------------------+-----------------+------------------+-----------------+-------------------+-------------------+
|  count|            46988418|          46988418|          46988418|         46988418|          46988418|         46988418|           46988418|           46988418|
|   mean|2.3494115552178305E7|500433.83151635365|26597.231571533222|54.23395999456729|1296054.5003670265| 14.5006346883183|0.05377931642644364|0.10841507794537794|
| stddev|1.3564333711807257E7| 288539.1737685039|1528137.6620335372|41.25033514383643| 748049.5087349451|8.655415884394312|0.22558169835808653| 0.4568780500029875|
|

In [None]:
for path in all_dart_dirs[3:]:
    # The listing entries are bytes; decode them instead of slicing the "b'...'"
    # repr, which mangles names with non-ASCII characters or backslashes.
    file_name = path.decode("utf-8") if isinstance(path, bytes) else str(path)
    print(file_name)
    basic_info(file_name)

2019-11-20.txt
+-------+--------------------+------------------+------------------+-----------------+-----------------+------------------+-------------------+-------------------+
|summary|     # tranaction_id|       customer_id|       terminal_id|        tx_amount|  tx_time_seconds|      tx_time_days|           tx_fraud|  tx_fraud_scenario|
+-------+--------------------+------------------+------------------+-----------------+-----------------+------------------+-------------------+-------------------+
|  count|            46992239|          46992239|          46990303|         46992239|         46992239|          46992239|           46992239|           46992239|
|   mean|1.6447322476824638E8|500459.87731093215|27049.256139059158|54.23781666627963|9071959.531695394|104.49993331877631|0.05227031212537032|0.10539791474928445|
| stddev|1.3565467724042192E7|288546.72727632715| 1541322.190585709|41.30263273238161|748176.4890162547| 8.655355111644191| 0.2225716236402647| 0.4509415384972227|
|

In [None]:
# NOTE(review): this range is already covered by the all_dart_dirs[3:] loop;
# presumably a manual restart after an interrupted run. Confirm before Run All.
for path in all_dart_dirs[14:]:
    # The listing entries are bytes; decode them instead of slicing the "b'...'"
    # repr, which mangles names with non-ASCII characters or backslashes.
    file_name = path.decode("utf-8") if isinstance(path, bytes) else str(path)
    print(file_name)
    basic_info(file_name)

2020-10-15.txt
+-------+--------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+------------------+
|summary|     # tranaction_id|       customer_id|       terminal_id|         tx_amount|    tx_time_seconds|      tx_time_days|           tx_fraud| tx_fraud_scenario|
+-------+--------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+------------------+
|  count|            47001238|          47001238|          46998898|          47001238|           47001238|          47001238|           47001238|          47001238|
|   mean| 6.814209840966522E8|500457.73553637037| 27032.93110825705| 54.23049368167651|3.758398264230325E7|434.49979994143985|0.04991462139784488|  0.10067870978207|
| stddev|1.3568048763479581E7| 288569.9564065086|1540849.5479515449|41.246148088204606|  748096.8014401352| 8.655975066176538|0.21776857665313426|0.4414355

In [10]:
# NOTE(review): this range is already covered by the all_dart_dirs[3:] loop;
# presumably a manual restart after an interrupted run. Confirm before Run All.
for path in all_dart_dirs[20:]:
    # The listing entries are bytes; decode them instead of slicing the "b'...'"
    # repr, which mangles names with non-ASCII characters or backslashes.
    file_name = path.decode("utf-8") if isinstance(path, bytes) else str(path)
    print(file_name)
    basic_info(file_name)

2021-04-13.txt
+-------+--------------------+------------------+------------------+-----------------+-------------------+-----------------+-------------------+-------------------+
|summary|     # tranaction_id|       customer_id|       terminal_id|        tx_amount|    tx_time_seconds|     tx_time_days|           tx_fraud|  tx_fraud_scenario|
+-------+--------------------+------------------+------------------+-----------------+-------------------+-----------------+-------------------+-------------------+
|  count|            46994570|          46994570|          46992906|         46994570|           46994570|         46994570|           46994570|           46994570|
|   mean| 9.633788144999998E8|500377.17109921423| 26558.23356344466|54.23780637848129|5.313580339001889E7|614.4994623421387|0.05314043728881869|0.10711986512484314|
| stddev|1.3566163964312918E7| 288541.7384795898|1527004.8167277158|41.25614367717534|  751628.7974476136|8.656297510590337| 0.2243134687981527| 0.4542968358740

* tx_datetime: поиск значений, которые выходят за допустимые границы (часы: 0–23, минуты и секунды: 0–59)

In [49]:
def add_info(path):
    """Print the smallest and largest hour, minute and second of ``tx_datetime`` in one file.

    The file is read as plain text, its first line is used as the header
    (column names separated by " | ") and the other lines are split on commas.
    ``tx_datetime`` is cast to a timestamp, its hour, minute and second are
    extracted, and a single Row holding the min/max of each is printed.

    Parameters
    ----------
    path : str
        Path of the file to read, as accepted by ``spark.read.text``.
    """
    raw_lines = spark.read.text(path)
    header_line = raw_lines.first()[0]
    column_names = header_line.split(" | ")

    # Everything except the header line is a comma-separated data row.
    data_lines = raw_lines.filter(raw_lines["value"] != header_line)
    transactions = data_lines.rdd.map(lambda line: line[0].split(",")).toDF(column_names)

    with_timestamp = transactions.withColumn(
        "tx_datetime", transactions.tx_datetime.cast(TimestampType())
    )
    timestamp = col("tx_datetime")
    with_clock_parts = (
        with_timestamp
        .withColumn("hour", hour(timestamp))
        .withColumn("minute", minute(timestamp))
        .withColumn("second", second(timestamp))
    )

    # `min`/`max` are the pyspark.sql.functions aggregates, not the built-ins.
    clock_ranges = with_clock_parts.select(
        min("hour"), max("hour"),
        min("minute"), max("minute"),
        min("second"), max("second"),
    )
    print(clock_ranges.first())

In [50]:
for path in all_dart_dirs:
    # The listing entries are bytes; decode them instead of slicing the "b'...'"
    # repr, which mangles names with non-ASCII characters or backslashes.
    file_name = path.decode("utf-8") if isinstance(path, bytes) else str(path)
    print(file_name)
    add_info(file_name)

2019-08-22.txt
Row(min(hour)=0, max(hour)=23, min(minute)=0, max(minute)=59, min(second)=0, max(second)=59)
2019-09-21.txt
Row(min(hour)=0, max(hour)=23, min(minute)=0, max(minute)=59, min(second)=0, max(second)=59)
2019-10-21.txt
Row(min(hour)=0, max(hour)=23, min(minute)=0, max(minute)=59, min(second)=0, max(second)=59)
2019-11-20.txt
Row(min(hour)=0, max(hour)=23, min(minute)=0, max(minute)=59, min(second)=0, max(second)=59)
2019-12-20.txt
Row(min(hour)=0, max(hour)=23, min(minute)=0, max(minute)=59, min(second)=0, max(second)=59)
2020-01-19.txt
Row(min(hour)=0, max(hour)=23, min(minute)=0, max(minute)=59, min(second)=0, max(second)=59)
2020-02-18.txt
Row(min(hour)=0, max(hour)=23, min(minute)=0, max(minute)=59, min(second)=0, max(second)=59)
2020-03-19.txt
Row(min(hour)=0, max(hour)=23, min(minute)=0, max(minute)=59, min(second)=0, max(second)=59)
2020-04-18.txt
Row(min(hour)=0, max(hour)=23, min(minute)=0, max(minute)=59, min(second)=0, max(second)=59)
2020-05-18.txt
Row(min(hour)

Дубликаты строк

In [50]:
# Transaction id 3600373 (dated 2019-08-24) is not part of the 2019-09-21 sample
# held in `df_new`; judging by its date and id range it lives in 2019-08-22.txt.
# Load that file explicitly so the cell does not depend on a leftover `df_new`.
dup_data = spark.read.text("2019-08-22.txt")
dup_header = dup_data.first()[0]
dup_df = (
    dup_data.filter(dup_data["value"] != dup_header)
    .rdd.map(lambda row: row[0].split(","))
    .toDF(dup_header.split(" | "))
)
# Two identical rows for the same id demonstrate the duplicated records.
dup_df.filter(dup_df['# tranaction_id'] == 3600373).show(truncate=False)

+---------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|# tranaction_id|tx_datetime        |customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+---------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|3600373        |2019-08-24 19:52:51|299409     |539        |33.25    |244371         |2           |0       |0                |
|3600373        |2019-08-24 19:52:51|299409     |539        |33.25    |244371         |2           |0       |0                |
+---------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+



In [8]:
# Stop the Spark session once the analysis is finished.
spark.stop()