In [1]:
#!pip install findspark

In [2]:
# findspark.init() has to run before pyspark is imported (next cell): it makes
# the local Spark installation's Python packages importable from this kernel
# (the installation is presumably located via SPARK_HOME — verify for your environment).
import findspark

findspark.init()

In [3]:
from pyspark.sql import SparkSession

# Create the Spark session for this notebook, or reuse one that is already running.
# Resources are left at the cluster defaults. Optional tuning settings
# (currently disabled — add them to the builder chain to apply):
#   .config("spark.driver.memory", "4g")
#   .config("spark.executor.memory", "8g")     # 8 GB per executor
#   .config("spark.executor.cores", "2")       # 2 cores per executor
#   .config("spark.executor.instances", "6")   # 6 executors
spark = SparkSession.builder.appName("mlops_hw03").getOrCreate()

# Eager evaluation: a DataFrame left as the last expression of a cell is rendered
# as a table of its first rows instead of only its schema string.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [4]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType

# Explicit schema for the transaction files; every field is nullable, so with
# mode='PERMISSIVE' values that cannot be parsed into the declared type are
# loaded as NULL instead of failing the read.
schema = StructType([
    StructField(field_name, field_type, nullable=True)
    for field_name, field_type in [
        ("transaction_id",    IntegerType()),
        ("tx_datetime",       TimestampType()),
        ("customer_id",       IntegerType()),
        ("terminal_id",       IntegerType()),
        ("tx_amount",         DoubleType()),
        ("tx_time_seconds",   IntegerType()),
        ("tx_time_days",      IntegerType()),
        ("tx_fraud",          IntegerType()),
        ("tx_fraud_scenario", IntegerType()),
    ]
])

# Read every *.txt file under /dataset as headerless CSV; lines starting with
# '#' are skipped as comments.
df = spark.read.csv(
    path="/dataset",
    schema=schema,
    header=False,
    comment="#",
    mode="PERMISSIVE",
    pathGlobFilter="*.txt",
)

df

transaction_id,tx_datetime,customer_id,terminal_id,tx_amount,tx_time_seconds,tx_time_days,tx_fraud,tx_fraud_scenario
0,2019-08-22 06:51:03,0,711,70.91,24663,0,0,0
1,2019-08-22 05:10:37,0,0,90.55,18637,0,0,0
2,2019-08-22 19:05:33,0,753,35.38,68733,0,0,0
3,2019-08-22 07:21:33,0,0,80.41,26493,0,0,0
4,2019-08-22 09:06:17,1,981,102.83,32777,0,0,0
5,2019-08-22 18:41:25,3,205,34.2,67285,0,0,0
6,2019-08-22 03:12:21,3,0,47.2,11541,0,0,0
7,2019-08-22 22:36:40,6,809,139.39,81400,0,0,0
8,2019-08-22 17:23:29,7,184,87.24,62609,0,0,0
9,2019-08-22 21:09:37,8,931,61.7,76177,0,0,0


In [5]:
# One line per column: the name left-aligned in 20 characters, then its Spark SQL type.
for column_name, spark_type in df.dtypes:
    print("{:<20}{}".format(column_name, spark_type))

transaction_id      int
tx_datetime         timestamp
customer_id         int
terminal_id         int
tx_amount           double
tx_time_seconds     int
tx_time_days        int
tx_fraud            int
tx_fraud_scenario   int


In [6]:
from pyspark.sql.functions import isnan, when, count, col
import pandas as pd

# Per-column data-quality profile of `df` (timestamp columns are skipped).
#
# For each column a single-column projection is cached and scanned by exactly
# two Spark jobs: one aggregation computing the total row count plus every
# conditional count in CHECKS, and one exact distinct count.
#
# count(when(cond, True)) counts the rows where `cond` is true: when() yields
# NULL otherwise and count() skips NULLs, so each value equals
# filter(cond).count() (rows where `cond` is NULL are not counted).
CHECKS = {
    "nan_count"  : lambda c: isnan(col(c)),
    "null_count" : lambda c: col(c).isNull(),
    "eq_zero"    : lambda c: col(c) == 0,
    "lt_zero"    : lambda c: col(c) < 0,
    "gt_zero"    : lambda c: col(c) > 0,
}

# Output columns, in display order.
summary = {
    "column_name"  : [],
    "column_type"  : [],
    "total_count"  : [],
    "unique_count" : [],
    **{name: [] for name in CHECKS},
}

for c, t in df.dtypes:
    # isnan() and the sign comparisons are numeric checks, so timestamps are skipped.
    # NOTE(review): NULLs in timestamp columns (e.g. values that failed to parse
    # under mode='PERMISSIVE') are therefore not reported here.
    if t == "timestamp":
        continue

    column_cached = df.select(c).cache()
    try:
        # Job 1: total + all conditional counts in a single pass (also fills the cache).
        stats = column_cached.agg(
            count("*").alias("total_count"),
            *[count(when(check(c), True)).alias(name) for name, check in CHECKS.items()],
        ).first()
        # Job 2: exact number of distinct values over the cached column
        # (distinct() keeps NULL, so NULL counts as one value).
        unique_count = column_cached.distinct().count()
    finally:
        # Release the cached column even if a job fails or the cell is interrupted.
        column_cached.unpersist()

    summary["column_name"].append(c)
    summary["column_type"].append(t)
    summary["total_count"].append(stats["total_count"])
    summary["unique_count"].append(unique_count)
    for name in CHECKS:
        summary[name].append(stats[name])

pd.DataFrame.from_dict(summary)

Unnamed: 0,column_name,column_type,total_count,unique_count,nan_count,null_count,eq_zero,lt_zero,gt_zero
0,transaction_id,int,1879794138,1879791585,0,0,1,0,1879794137
1,customer_id,int,1879794138,996611,0,0,2688,14623,1879776827
2,terminal_id,int,1879794138,1008,0,40312,81177681,0,1798576145
3,tx_amount,double,1879794138,102544,0,0,35260,0,1879758878
4,tx_time_seconds,int,1879794138,103420351,0,0,3,0,1879794135
5,tx_time_days,int,1879794138,1200,0,0,1565894,0,1878228244
6,tx_fraud,int,1879794138,2,0,0,1768455409,0,111338729
7,tx_fraud_scenario,int,1879794138,4,0,0,1768455409,0,111338729
