In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, unix_timestamp, lag, when, count, year, month, dayofmonth, dayofweek, hour
from pyspark.ml.feature import QuantileDiscretizer, OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline

In [2]:
# One SparkSession for the whole notebook; getOrCreate() returns the already-running
# session if one exists instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("NequiProject")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/09 17:13:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the two parquet parts and stack them into a single DataFrame.
# unionByName matches columns by NAME rather than by position: if the two files ever store
# their columns in a different order, a positional `union` would silently put values under
# the wrong column, whereas unionByName aligns them (and raises if the names differ).
sample006 = spark.read.parquet('./sample_data_0006_part_00.parquet')
sample007 = spark.read.parquet('./sample_data_0007_part_00.parquet')
samples = sample006.unionByName(sample007)

                                                                                

In [4]:
# Remove every row that has a null in any column, then remove exact duplicate rows.
# NOTE(review): duplicates are matched on all columns, including `_id`; if `_id` is unique
# per row this removes nothing — confirm whether dedup should use a column subset.
samples = samples.na.drop().dropDuplicates()

Transaction amount is an important feature; its distribution determines which encoding approach to use.

In [5]:
# count / mean / stddev / min / max of the transaction amount; used below to pick the encoding.
samples.select("transaction_amount").describe().show()

23/08/09 15:33:08 WARN TaskMemoryManager: Failed to allocate a page (8388608 bytes), try again.
23/08/09 15:35:55 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KiB for computing block rdd_16_125 in memory.
23/08/09 15:35:58 WARN MemoryStore: Not enough space to cache rdd_16_125 in memory! (computed 384.0 B so far)
23/08/09 15:35:58 WARN BlockManager: Persisting block rdd_16_125 to disk instead.
                                                                                

+-------+------------------+
|summary|transaction_amount|
+-------+------------------+
|  count|          21516907|
|   mean|  191.289846833874|
| stddev|240.84638087332038|
|    min|        5.94445501|
|    max|     4624.78599961|
+-------+------------------+



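The standard deviation (≈240.8) is larger than the mean (≈191.3), and amounts range from ≈5.9 to ≈4624.8, so the distribution is highly dispersed and right-skewed. Equal-width bins would put nearly every transaction in the lowest bin, so quantile segmentation is used instead: three (approximately) equal-frequency buckets that label each transaction as small, medium or large.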

In [5]:
# Split `transaction_amount` into 3 (approximately) equal-frequency buckets.
# The output `transaction_category` holds the bucket index (0.0, 1.0, 2.0) in ascending
# order of amount, i.e. small / medium / large.
quantile_discretizer = QuantileDiscretizer(
    numBuckets=3,
    inputCol="transaction_amount",
    outputCol="transaction_category",
)

In [7]:
# Fit the discretizer (this computes the bucket boundaries) and append `transaction_category`.
# NOTE(review): the boundaries are learned from every row of `samples`; if the data is split
# into train/test later, fit on the training split only to avoid leaking test information.
pipeline = Pipeline().setStages([quantile_discretizer])
pipeline_model = pipeline.fit(dataset=samples)
samples = pipeline_model.transform(dataset=samples)

                                                                                

Subsidiary can provide helpful information. Since it is an identifier (the ID of the subsidiary), frequency encoding is applied: each row gets the number of transactions recorded for its subsidiary.

In [5]:
# Frequency-encode `subsidiary`: attach to every row the number of rows that share its subsidiary.
sub_fq = samples.groupBy("subsidiary").agg(count("*").alias("subsidiary_count"))
# Drop any `subsidiary_count` left by a previous run of this cell before joining (a no-op on
# the first run). Without this, re-running the cell appends a second `subsidiary_count`
# column and later references to it become ambiguous.
samples = (
    samples.drop("subsidiary_count")
    .join(sub_fq, on="subsidiary", how="left")
    .cache()
)

In [10]:
# Peek at the first rows, including the new `subsidiary_count` column.
samples.show(n=10)

+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+------------------+----------------+----------------+
|          subsidiary|         merchant_id|                 _id|   transaction_date|      account_number|             user_id|transaction_amount|transaction_type|subsidiary_count|
+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+------------------+----------------+----------------+
|0119b3ab2263fb863...|817d18cd3c31e40e9...|047a1dec0fcf7ad07...|2021-03-01 08:03:40|632626d4a14d1c0ce...|e5bf67f2894e342dd...|      356.66730074|          DEBITO|            1247|
|0119b3ab2263fb863...|817d18cd3c31e40e9...|5c148457344e75077...|2021-01-28 15:01:08|cc12afcca787606df...|9b63ea08a9a99c147...|      118.88910024|          DEBITO|            1247|
|0119b3ab2263fb863...|817d18cd3c31e40e9...|ee36d562b6300ba65...|2021-09-16 13:29:55|7b5ff7ca4532a15b

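Time is crucial, so the most informative components of the transaction timestamp (year, month, day of month, day of week and hour) are extracted as separate features.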

In [5]:
# Extract calendar components of `transaction_date` as separate integer features.
# `day_of_week` follows Spark's dayofweek convention: 1 = Sunday ... 7 = Saturday.
# NOTE(review): if `transaction_date` is a timestamp, these parts are evaluated in the Spark
# session time zone — confirm that matches the time zone the data was recorded in.
transaction_ts = col("transaction_date")
date_parts = [
    ("year", year),
    ("month", month),
    ("day", dayofmonth),
    ("day_of_week", dayofweek),
    ("hour", hour),
]
for part_name, extract in date_parts:
    samples = samples.withColumn(part_name, extract(transaction_ts))

In [6]:
# Peek at the first rows with the extracted date parts.
samples.show(n=5)

                                                                                

+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+------------------+----------------+----+-----+---+-----------+----+
|         merchant_id|                 _id|          subsidiary|   transaction_date|      account_number|             user_id|transaction_amount|transaction_type|year|month|day|day_of_week|hour|
+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+------------------+----------------+----+-----+---+-----------+----+
|075d178871d8d4850...|0ccf1fe8dd3333e6e...|00015fd77a0f4d869...|2021-06-18 07:08:03|a79ff75955445c687...|ebc8d3ae01b8334f3...|      356.66730074|          DEBITO|2021|    6| 18|          6|   7|
|075d178871d8d4850...|b86f53170772d5b15...|00015fd77a0f4d869...|2021-05-05 06:28:04|c790394d677140db4...|05ee34549ddebf8c2...|      350.72284572|          DEBITO|2021|    5|  5|          4|   6|
|075d178871d8d4850...|cfe

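Merchant and transaction type are categorical, so they are handled with one-hot encoding: each string is first mapped to an index with `StringIndexer`, and the index is then one-hot encoded.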

In [5]:
# Index each categorical string column, then one-hot encode the index.
# handleInvalid="keep" lets the fitted pipeline transform data that contains merchants or
# transaction types not seen during fit (e.g. new merchants at scoring time). StringIndexer
# gives unseen labels a dedicated extra index, and OneHotEncoder keeps out-of-range indices
# instead of failing. With the default handleInvalid="error", transform() raises on them.
# NOTE(review): the width of `encoded_merchant_id` grows with the number of distinct
# merchant_id values; if that cardinality is very high, consider frequency encoding as done
# for `subsidiary` (the recorded run of the next cell ran out of memory).
indexer_merchant = StringIndexer(
    inputCol="merchant_id", outputCol="indexed_merchant_id", handleInvalid="keep"
)
indexer_transaction = StringIndexer(
    inputCol="transaction_type", outputCol="indexed_transaction_type", handleInvalid="keep"
)

# OneHotEncoder for indexed columns
encoder_merchant = OneHotEncoder(
    inputCol="indexed_merchant_id", outputCol="encoded_merchant_id", handleInvalid="keep"
)
encoder_transaction = OneHotEncoder(
    inputCol="indexed_transaction_type", outputCol="encoded_transaction_type", handleInvalid="keep"
)

# Create a pipeline to add the indexing and encoding steps
pipeline = Pipeline(stages=[indexer_merchant, indexer_transaction, encoder_merchant, encoder_transaction])
pipeline_model = pipeline.fit(samples)
samples = pipeline_model.transform(samples).cache()

                                                                                

In [6]:
# Display the first 20 rows (show's default) including the encoded vectors.
# NOTE(review): the recorded run of this cell failed with java.lang.OutOfMemoryError
# (GC overhead limit exceeded) while computing the cached, encoded DataFrame.
samples.show(n=20)

23/08/09 17:19:23 WARN TaskMemoryManager: Failed to allocate a page (16777216 bytes), try again.
23/08/09 17:19:24 WARN TaskMemoryManager: Failed to allocate a page (16777216 bytes), try again.
23/08/09 17:19:25 WARN TaskMemoryManager: Failed to allocate a page (12396667 bytes), try again.
23/08/09 17:19:34 ERROR Executor: Exception in task 10.0 in stage 14.0 (TID 146)
java.lang.OutOfMemoryError: GC overhead limit exceeded
23/08/09 17:19:34 ERROR Executor: Exception in task 3.0 in stage 14.0 (TID 139)
java.lang.OutOfMemoryError: GC overhead limit exceeded
23/08/09 17:19:34 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 3.0 in stage 14.0 (TID 139),5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded
23/08/09 17:19:34 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 10.0 in stage 14.0 (TID 146),5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded

Py4JJavaError: An error occurred while calling o247.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 14.0 failed 1 times, most recent failure: Lost task 10.0 in stage 14.0 (TID 146) (192.168.1.8 executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4177)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3161)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:284)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:323)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded


23/08/09 17:19:35 ERROR BypassMergeSortShuffleWriter: Unable to delete file for partition 190
23/08/09 17:19:35 ERROR BypassMergeSortShuffleWriter: Unable to delete file for partition 195
23/08/09 17:19:35 ERROR BypassMergeSortShuffleWriter: Unable to delete file for partition 197
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Users/danielaherrera/Library/Python/3.9/lib/python/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/danielaherrera/Library/Python/3.9/lib/python/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/danielaherrera/Library/Python/3.9/lib/python/site-packages/py4j/clientserver.py", line 539, in send_

There are too many user IDs and account numbers, and new ones appear every day. Including them as model features would require an arbitrary encoding for values never seen during training. For this reason, both columns are dropped from the feature matrix (`account_number` is still used below to build the label before it is dropped).

In [None]:
# Label: Y = 1 when a transaction lies within 24 hours of an adjacent transaction (the
# previous OR the next one) on the same account, otherwise 0.
SECONDS_PER_DAY = 24 * 60 * 60
window_spec = Window.partitionBy("account_number").orderBy("transaction_date")

# Seconds since the previous transaction on the same account (null for the account's first one).
samples = samples.withColumn(
    "time_diff",
    unix_timestamp("transaction_date") - unix_timestamp(lag("transaction_date").over(window_spec)),
)

# lead(time_diff) is the gap from this transaction to the NEXT one, so both members of a
# close pair get flagged. lag(time_diff) would be the gap between the two PREVIOUS
# transactions, which does not involve the current row at all: it misses the first
# transaction of a close pair and flags the (possibly distant) transaction after it.
# A null comparison (first/last row of an account) falls through to otherwise(0).
samples = samples.withColumn(
    "Y",
    when(
        (col("time_diff") < SECONDS_PER_DAY)
        | (F.lead("time_diff").over(window_spec) < SECONDS_PER_DAY),
        1,
    ).otherwise(0),
).cache()

In [None]:
# Remove the label helper column, the identifiers, and the raw columns that were replaced by
# engineered features (amount -> transaction_category, date -> date parts,
# subsidiary -> subsidiary_count).
# NOTE(review): `merchant_id`, `transaction_type` and their `indexed_*` columns are still
# present alongside the `encoded_*` vectors; drop them too if every remaining column is
# meant to be a model feature.
columns_to_drop = (
    "time_diff",
    "_id",
    "transaction_amount",
    "account_number",
    "user_id",
    "transaction_date",
    "subsidiary",
)
samples = samples.drop(*columns_to_drop).cache()