In [None]:
!pip install findspark polars fsspec s3fs

In [1]:
import findspark

# Locate the Spark installation first, then hand that path to findspark.init so
# pyspark becomes importable; the last line displays the Spark home that was used.
spark_home = findspark.find()
findspark.init(spark_home)
spark_home

'/usr/lib/spark'

In [2]:
import numpy as np
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import (count, when, isnan, col, countDistinct, approx_count_distinct, \
                                   to_timestamp, unix_timestamp, floor)
from pyspark.sql.types import (StringType, BooleanType, LongType, TimestampType, \
                               DecimalType, IntegerType, ShortType, StructType)

In [3]:
# Root prefix of the raw CSV files on S3 (accessed through the s3a connector).
# Note the trailing "/": append file names without adding another leading slash.
S3_BUCKET = "s3a://otus-mlops-course/raw/"

In [4]:
%%time
spark = SparkSession\
    .builder\
    .appName("preproc")\
    .config("spark.executor.memory", "8g")\
    .config("spark.driver.memory", "8g")\
    .config("spark.driver.maxResultSize", "8g")\
    .getOrCreate()

CPU times: user 24.7 ms, sys: 10.4 ms, total: 35.1 ms
Wall time: 20.6 s


In [5]:
%%time
sql = SQLContext(spark)

CPU times: user 1.29 ms, sys: 229 µs, total: 1.52 ms
Wall time: 3.34 ms


In [9]:
%%time
df = sql.read.csv(S3_BUCKET, header=True, inferSchema=True)

CPU times: user 79.2 ms, sys: 18.8 ms, total: 98 ms
Wall time: 11min 35s


In [23]:
# Inspect the column types Spark inferred from the raw CSVs; terminal_id comes back as
# string, which hints at non-numeric values in that column.
df.printSchema()

root
 |-- tx_id: integer (nullable = true)
 |-- tx_datetime: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- terminal_id: string (nullable = true)
 |-- tx_amount: double (nullable = true)
 |-- tx_time_seconds: integer (nullable = true)
 |-- tx_time_days: integer (nullable = true)
 |-- tx_fraud: integer (nullable = true)
 |-- tx_fraud_scenario: integer (nullable = true)



In [26]:
# (rows, columns) of the raw dataset; count() triggers a full scan.
df.count(), len(df.columns)

(1879794098, 9)


## Problem #1: invalid terminal IDs [~0.002%]

`terminal_id` is inferred as a string because a few values are not integers (e.g. `None`, `Err`).

In [10]:
%%time
distinct_terminals = df.select('terminal_id').distinct().collect()

CPU times: user 65.9 ms, sys: 15.3 ms, total: 81.2 ms
Wall time: 8min 28s


In [19]:
# Number of distinct raw terminal_id values (None, if present, counts as one of them).
len(distinct_terminals)

1009

In [17]:
# numpy is already imported in the imports cell; no re-import here.
# Flatten the collected Row objects into a 1-D object array of raw terminal_id values.
dt_array = np.array(distinct_terminals).flatten()

# A direct integer cast fails because the column holds non-integer values (e.g. None).
# Catch and display the error so "Restart & Run All" does not stop at this cell.
try:
    dt_array.astype(int)
except (TypeError, ValueError) as err:
    print(f"{type(err).__name__}: {err}")

TypeError: int() argument must be a string, a bytes-like object or a number, not 'NoneType'

In [18]:
# easiest check, because dt_array is small:)
# Print every value int() cannot parse: int(None) raises TypeError and int("Err") raises
# ValueError. Catch only those, so a bare except no longer swallows e.g. KeyboardInterrupt.
for x in dt_array:
    try:
        int(x)
    except (TypeError, ValueError):
        print(x, end=';')

None;Err;

In [35]:
%%time
terminal_counts = df.agg(approx_count_distinct("terminal_id"))

CPU times: user 3.36 ms, sys: 0 ns, total: 3.36 ms
Wall time: 31 ms


In [22]:
# Explicit Spark cast of terminal_id to int; with ANSI mode off, unparseable strings
# (such as 'Err') become null instead of raising.
terminal_int_cast = df.select(col("terminal_id").cast(IntegerType()).alias("terminal_id"))

In [24]:
%%time
terminal_int_cast.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in ['terminal_id']]).show()

+-----------+
|terminal_id|
+-----------+
|      40312|
+-----------+

CPU times: user 112 ms, sys: 22 ms, total: 134 ms
Wall time: 15min 32s


## Problem #2: unparseable `tx_datetime` values [~0.0002%]

In [45]:
%%time
tx_dt = df.select(col("tx_datetime"), to_timestamp(col("tx_datetime"), "yyyy-MM-dd HH:mm:ss").alias("to_timestamp"))
  

CPU times: user 2.12 ms, sys: 364 µs, total: 2.49 ms
Wall time: 19.1 ms


In [49]:
# Eyeball the first 20 raw/parsed pairs.
tx_dt.show(n=20)

+-------------------+-------------------+
|        tx_datetime|       to_timestamp|
+-------------------+-------------------+
|2019-08-22 05:10:37|2019-08-22 05:10:37|
|2019-08-22 19:05:33|2019-08-22 19:05:33|
|2019-08-22 07:21:33|2019-08-22 07:21:33|
|2019-08-22 09:06:17|2019-08-22 09:06:17|
|2019-08-22 18:41:25|2019-08-22 18:41:25|
|2019-08-22 03:12:21|2019-08-22 03:12:21|
|2019-08-22 22:36:40|2019-08-22 22:36:40|
|2019-08-22 17:23:29|2019-08-22 17:23:29|
|2019-08-22 21:09:37|2019-08-22 21:09:37|
|2019-08-22 11:32:42|2019-08-22 11:32:42|
|2019-08-22 03:09:26|2019-08-22 03:09:26|
|2019-08-22 15:47:54|2019-08-22 15:47:54|
|2019-08-22 21:59:20|2019-08-22 21:59:20|
|2019-08-22 20:55:13|2019-08-22 20:55:13|
|2019-08-22 16:39:03|2019-08-22 16:39:03|
|2019-08-22 23:15:07|2019-08-22 23:15:07|
|2019-08-22 07:39:45|2019-08-22 07:39:45|
|2019-08-22 05:35:39|2019-08-22 05:35:39|
|2019-08-22 10:29:16|2019-08-22 10:29:16|
|2019-08-22 06:13:37|2019-08-22 06:13:37|
+-------------------+-------------

In [56]:
# Parse tx_datetime to epoch seconds; the pattern spelled out here is unix_timestamp's default.
unix_tmstmp = df.select(
    unix_timestamp(col("tx_datetime"), "yyyy-MM-dd HH:mm:ss").alias("timestamp_1")
)

In [57]:
# Confirm the parsed column type (long seconds since epoch).
unix_tmstmp.printSchema()

root
 |-- timestamp_1: long (nullable = true)



In [62]:
# First 20 parsed epoch values.
unix_tmstmp.show(n=20)

+-----------+
|timestamp_1|
+-----------+
| 1566450637|
| 1566500733|
| 1566458493|
| 1566464777|
| 1566499285|
| 1566443541|
| 1566513400|
| 1566494609|
| 1566508177|
| 1566473562|
| 1566443366|
| 1566488874|
| 1566511160|
| 1566507313|
| 1566491943|
| 1566515707|
| 1566459585|
| 1566452139|
| 1566469756|
| 1566454417|
+-----------+
only showing top 20 rows



In [58]:
%%time
unix_tmstmp.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in ['timestamp_1']]).show()

+-----------+
|timestamp_1|
+-----------+
|       3805|
+-----------+

CPU times: user 82.2 ms, sys: 29.2 ms, total: 111 ms
Wall time: 12min 39s


## Problem #3: Inconsistent data between `tx_time_seconds` and `tx_time_days` [~0.0002%]

In [97]:
%%time
n_sec_day = 3600 * 24
df.select('tx_time_seconds').where(df.tx_time_days < floor(df.tx_time_seconds / n_sec_day)).count()

CPU times: user 42.5 ms, sys: 28.6 ms, total: 71.1 ms
Wall time: 7min 45s


4229

## Problem #4: duplicate transaction IDs [~0.0001%]

`tx_id` should be unique, but some values occur more than once.

In [105]:
%%time
tx_unique = df.select(col("tx_id")).groupBy("tx_id").agg(countDistinct("tx_id"))

CPU times: user 4.32 ms, sys: 0 ns, total: 4.32 ms
Wall time: 27.4 ms


In [106]:
%%time
# Number of distinct tx_id values. Compared with the total row count (1879794098), the gap
# is the number of surplus duplicate rows (2553 in the recorded run).
tx_unique.count()

CPU times: user 234 ms, sys: 73.4 ms, total: 307 ms
Wall time: 36min 10s


1879791545

## Small check: clean a single day's file (2019-08-22)

In [12]:
%%time
"""
root
 |-- tx_id: integer (nullable = true)
 |-- tx_datetime: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- terminal_id: string (nullable = true)
 |-- tx_amount: double (nullable = true)
 |-- tx_time_seconds: integer (nullable = true)
 |-- tx_time_days: integer (nullable = true)
 |-- tx_fraud: integer (nullable = true)
 |-- tx_fraud_scenario: integer (nullable = true)


"""
schema = StructType() \
      .add("tx_id", LongType(), True) \
      .add("tx_datetime", TimestampType(), True) \
      .add("customer_id", LongType(), True) \
      .add("terminal_id", IntegerType(), True) \
      .add("tx_amount", DecimalType(precision=10, scale=2), True) \
      .add("tx_time_seconds", IntegerType(), True) \
      .add("tx_time_days", IntegerType(), True) \
      .add("tx_fraud", IntegerType(), True) \
      .add("tx_fraud_scenario", ShortType(), True)

df = spark.read.format("csv") \
      .option("header", True) \
      .schema(schema) \
      .load(f'{S3_BUCKET}/2019-08-22.txt')
df = df.dropDuplicates(["tx_id"])
df = df.na.drop(subset=["tx_datetime", "terminal_id"])
df.write.parquet("/tmp/2019-08-22_1.parquet")

CPU times: user 49.5 ms, sys: 18.7 ms, total: 68.2 ms
Wall time: 6min 57s


In [16]:
# Load the single-day parquet (presumably the output of the single-day cleaning cell).
processed = spark.read.format("parquet").load("s3a://otus-mlops-course/tmp/2019-08-22_1.parquet")

## Post-processing check: inspect the fully processed dataset

In [7]:
# Load the fully processed dataset for verification.
all_processed = spark.read.format("parquet").load(
    "s3a://otus-mlops-course/user/ubuntu/processed/processed_data.parquet"
)

In [9]:
# First 20 processed rows.
all_processed.show(n=20)

+-------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|  tx_id|        tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+-------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|7024003|2019-08-26 14:42:40|     484596|        619|     3.86|         398560|           4|    null|                0|
|7024073|2019-08-26 13:46:46|     484638|         38|    46.08|         395206|           4|    null|                0|
|7024092|2019-08-26 19:42:35|     484649|        930|   155.03|         416555|           4|    null|                0|
|7024263|2019-08-26 16:34:14|     484762|        624|     2.58|         405254|           4|    null|                0|
|7024417|2019-08-26 17:27:25|     484865|        272|   127.25|         408445|           4|    null|                0|
|7024849|2019-08-26 14:45:02|     485138

In [12]:
# Check the final column types; note tx_fraud is boolean here, whereas the raw column was inferred as integer.
all_processed.printSchema()

root
 |-- tx_id: long (nullable = true)
 |-- tx_datetime: timestamp (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- terminal_id: integer (nullable = true)
 |-- tx_amount: decimal(10,2) (nullable = true)
 |-- tx_time_seconds: integer (nullable = true)
 |-- tx_time_days: integer (nullable = true)
 |-- tx_fraud: boolean (nullable = true)
 |-- tx_fraud_scenario: short (nullable = true)



In [11]:
# (rows, columns) after processing. The raw data had 1879794098 rows; the recorded difference
# (46670) equals 40312 + 3805 + 2553 (Problems #1, #2, #4), i.e. Problem #3 rows were kept.
(all_processed.count(), len(all_processed.columns))

(1879747428, 9)

In [21]:
# Convert tx_datetime to epoch seconds (long). The cell reassigns its own input, so guard on
# the current type: re-running it on the already-converted long column would implicitly cast
# the numbers to strings, fail to parse them, and silently null every value.
if dict(all_processed.dtypes)["tx_datetime"] == "timestamp":
    all_processed = all_processed.withColumn("tx_datetime", unix_timestamp(col("tx_datetime")))

In [19]:
# Cast the boolean fraud label to an integer column (safe to re-run: casting int to int is a no-op).
# NOTE(review): tx_fraud is null in every processed row displayed earlier, although the raw
# column was integer. If the processing job parsed 0/1 text as BooleanType the labels may have
# been lost; verify with a null count before training.
all_processed = all_processed.withColumn("tx_fraud", col("tx_fraud").cast("int"))