In [2]:
# List the HDFS directory that the CSV reader later in this notebook loads from.
!hdfs dfs -ls /user/ubuntu/data

Found 40 items
-rw-r--r--   1 ubuntu hadoop 2807409271 2025-12-07 08:44 /user/ubuntu/data/2019-08-22.txt
-rw-r--r--   1 ubuntu hadoop 2854479008 2025-12-07 08:26 /user/ubuntu/data/2019-09-21.txt
-rw-r--r--   1 ubuntu hadoop 2895460543 2025-12-07 08:38 /user/ubuntu/data/2019-10-21.txt
-rw-r--r--   1 ubuntu hadoop 2939120942 2025-12-07 08:35 /user/ubuntu/data/2019-11-20.txt
-rw-r--r--   1 ubuntu hadoop 2995462277 2025-12-07 08:23 /user/ubuntu/data/2019-12-20.txt
-rw-r--r--   1 ubuntu hadoop 2994906767 2025-12-07 08:27 /user/ubuntu/data/2020-01-19.txt
-rw-r--r--   1 ubuntu hadoop 2995431240 2025-12-07 08:30 /user/ubuntu/data/2020-02-18.txt
-rw-r--r--   1 ubuntu hadoop 2995176166 2025-12-07 08:40 /user/ubuntu/data/2020-03-19.txt
-rw-r--r--   1 ubuntu hadoop 2996034632 2025-12-07 08:15 /user/ubuntu/data/2020-04-18.txt
-rw-r--r--   1 ubuntu hadoop 2995666965 2025-12-07 08:16 /user/ubuntu/data/2020-05-18.txt
-rw-r--r--   1 ubuntu hadoop 2994699401 2025-12-07 08:17 /user/ubuntu/data

In [3]:
# %pip installs into the environment of the running kernel; the version is
# pinned so that re-running the notebook installs the same findspark release.
%pip install findspark==2.0.1

Defaulting to user installation because normal site-packages is not writeable
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


После установки findspark перезапустите kernel, прежде чем выполнять следующие ячейки!

In [6]:
import findspark
# NOTE(review): presumably this locates the local Spark installation and makes
# the pyspark package importable for the cells below — confirm against the
# findspark documentation.
findspark.init()

In [8]:
from pyspark.sql import SparkSession

# Options handed to the session builder, applied in the order listed.
builder_options = [
    # S3A output committer settings
    ("spark.hadoop.fs.s3a.committer.name", "directory"),
    ("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a", "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory"),
    ("spark.sql.sources.commitProtocolClass", "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"),
    # Driver / executor sizing
    ("spark.driver.memory", "4g"),
    ("spark.executor.memory", "4g"),      # 4 GB per executor
    ("spark.executor.cores", "2"),        # 2 cores per executor
    ("spark.executor.instances", "6"),    # 6 executors
]

session_builder = SparkSession.builder.appName("hw03")
for option_key, option_value in builder_options:
    session_builder = session_builder.config(option_key, option_value)
spark = session_builder.getOrCreate()

# Options set on the session once it exists.
MIB = 1024 * 1024
runtime_options = [
    ("spark.sql.repl.eagerEval.enabled", True),
    ("spark.hadoop.parquet.block.size", 128 * MIB),     # 128 MB
    ("spark.sql.files.maxPartitionBytes", 512 * MIB),   # 512 MB
]
for option_key, option_value in runtime_options:
    spark.conf.set(option_key, option_value)

In [9]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType

# Column name -> Spark type, in the order the fields appear in each CSV line.
column_types = [
    ("transaction_id", IntegerType()),
    ("tx_datetime", TimestampType()),
    ("customer_id", IntegerType()),
    ("terminal_id", IntegerType()),
    ("tx_amount", DoubleType()),
    ("tx_time_seconds", IntegerType()),
    ("tx_time_days", IntegerType()),
    ("tx_fraud", IntegerType()),
    ("tx_fraud_scenario", IntegerType()),
]

# Every field is declared nullable.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in column_types
])

# Reader options: no header row, only the file matching the glob is read,
# lines starting with '#' are treated as comments, PERMISSIVE parse mode.
csv_options = dict(
    header=False,
    pathGlobFilter="2022-11-04.txt",
    schema=schema,
    comment='#',
    mode='PERMISSIVE',
)

df = spark.read.csv("/user/ubuntu/data", **csv_options)

# Bare last expression so the notebook displays the frame.
df

transaction_id,tx_datetime,customer_id,terminal_id,tx_amount,tx_time_seconds,tx_time_days,tx_fraud,tx_fraud_scenario
1832792610,2022-11-04 14:22:18,0,53,63.58,101139738,1170,0,0
1832792611,2022-11-04 02:12:24,0,53,92.95,101095944,1170,0,0
1832792612,2022-11-04 12:49:35,3,205,48.88,101134175,1170,0,0
1832792613,2022-11-04 02:40:01,5,383,24.69,101097601,1170,0,0
1832792614,2022-11-04 08:02:05,6,858,95.48,101116925,1170,0,0
1832792615,2022-11-04 05:45:04,8,931,60.98,101108704,1170,0,0
1832792616,2022-11-04 20:01:50,8,931,28.48,101160110,1170,0,0
1832792617,2022-11-04 15:11:42,9,450,7.89,101142702,1170,0,0
1832792618,2022-11-04 11:20:49,10,549,63.37,101128849,1170,0,0
1832792619,2022-11-04 23:11:46,10,549,78.02,101171506,1170,0,0


In [4]:
from pyspark.sql.functions import isnan, when, count, col
import pandas as pd

# Per-column data-quality profile; each key becomes a column of the final table.
summary = {
    "column_name"  : [],
    "column_type"  : [],
    "total_count"  : [],
    "unique_count" : [],
    "nan_count"    : [],
    "null_count"   : [],
    "eq_zero"      : [],
    "lt_zero"      : [],
    "gt_zero"      : [],
}

# Metrics that are computed together in one aggregation per column.
aggregated_metrics = ("total_count", "nan_count", "null_count", "eq_zero", "lt_zero", "gt_zero")

for c, t in df.dtypes:
    # Timestamp columns are left out of the profile.
    if t == "timestamp" :
        continue

    value = col(c)
    column_cached = df.select(c).cache()
    try:
        # count(when(cond, 1)) counts exactly the rows where cond is true, i.e.
        # the same number filter(cond).count() yields, but all six figures
        # come out of a single job instead of one job per figure.
        stats = column_cached.agg(
            count("*").alias("total_count"),
            count(when(isnan(value), 1)).alias("nan_count"),
            count(when(value.isNull(), 1)).alias("null_count"),
            count(when(value == 0, 1)).alias("eq_zero"),
            count(when(value < 0, 1)).alias("lt_zero"),
            count(when(value > 0, 1)).alias("gt_zero"),
        ).first()
        # Kept as distinct().count() so that NULL still counts as one distinct value.
        unique_count = column_cached.distinct().count()
    finally:
        # Release the cached column even when one of the jobs above fails.
        column_cached.unpersist()

    summary["column_name"].append(c)
    summary["column_type"].append(t)
    summary["unique_count"].append(unique_count)
    for metric in aggregated_metrics:
        summary[metric].append(stats[metric])

# Bare last expression so the notebook renders the profile as a table.
pd.DataFrame.from_dict(summary)

Unnamed: 0,column_name,column_type,total_count,unique_count,nan_count,null_count,eq_zero,lt_zero,gt_zero
0,transaction_id,int,46998983,46998975,0,0,0,0,46998983
1,customer_id,int,46998983,988635,0,0,63,96,46998824
2,terminal_id,int,46998983,1006,0,2298,2075990,0,44920695
3,tx_amount,double,46998983,43751,0,0,936,0,46998047
4,tx_time_seconds,int,46998983,2585565,0,0,0,0,46998983
5,tx_time_days,int,46998983,30,0,0,0,0,46998983
6,tx_fraud,int,46998983,2,0,0,45592832,0,1406151
7,tx_fraud_scenario,int,46998983,4,0,0,45592832,0,1406151


In [10]:
# Row-level validity checks; a row is kept only when every one of them holds.
# NOTE(review): the strict `> 0` test also drops rows with customer_id == 0,
# and such rows appear in the sample output of the load cell — confirm that
# this is intended.
validity_checks = [
    df["terminal_id"].isNotNull(),
    df["customer_id"] > 0,
    df["tx_amount"] > 0,
    df["tx_time_seconds"] > 0,
    df["transaction_id"] > 0,
    df["tx_fraud"].isin([0, 1]),
    df["tx_fraud_scenario"].isin([0, 1, 2, 3]),
]

# Combine the checks with AND, left to right.
keep_row = validity_checks[0]
for check in validity_checks[1:]:
    keep_row = keep_row & check

# Keep a single row per transaction_id among the rows that pass.
df_clean = df.where(keep_row).drop_duplicates(['transaction_id'])

In [11]:
# Destination of the cleaned dataset.
output_path = "s3a://otus-bucket-d96c6f8b8b8aa500/dataset.parquet"

# Reduce to 12 partitions and replace whatever already exists at the destination.
parquet_writer = df_clean.coalesce(12).write.mode("overwrite")
parquet_writer.parquet(output_path)