In [1]:
import os
from glob import glob

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Local Spark session shared by every cell of this notebook.
# NOTE(review): with master "local[*]" the executors run inside the driver JVM,
# so "spark.executor.memory" presumably has no separate effect here and
# "spark.driver.memory" only applies if this call starts the JVM (i.e. no
# session existed yet in this kernel) — confirm against the Spark config docs.
_spark_settings = {
    "spark.driver.memory": "22g",
    "spark.executor.memory": "22g",
    "spark.driver.bindAddress": "localhost",
    "spark.ui.port": "4050",
}
_session_builder = SparkSession.builder.master("local[*]")
for _conf_key, _conf_value in _spark_settings.items():
    _session_builder = _session_builder.config(_conf_key, _conf_value)
spark = _session_builder.getOrCreate()

In [2]:
# Root folder containing the "alphabattle2.0" dataset directory. Override it with
# the ALFABATTLE_BASE_PATH environment variable instead of editing this
# machine-specific default (an empty value falls back to the default).
BASE_PATH = os.environ.get("ALFABATTLE_BASE_PATH") or "C:/BigData/"
# Later cells build paths by plain string concatenation (BASE_PATH + "..."),
# so guarantee the root ends with a separator.
if not BASE_PATH.endswith(("/", "\\")):
    BASE_PATH += "/"

In [3]:
# Target table with columns app_id, product and flag (all integers).
# An explicit schema replaces inferSchema=True, which costs an extra full pass
# over the CSV and lets column types drift between runs. enforceSchema=False
# makes Spark validate the CSV header names against this schema instead of
# silently assigning columns by position.
TRAIN_TARGET_SCHEMA = T.StructType([
    T.StructField("app_id", T.IntegerType(), True),
    T.StructField("product", T.IntegerType(), True),
    T.StructField("flag", T.IntegerType(), True),
])
train = spark.read.csv(
    BASE_PATH + "alphabattle2.0/alfabattle2_train_target.csv",
    header=True,
    schema=TRAIN_TARGET_SCHEMA,
    enforceSchema=False,
)
train.show(3, truncate=50)

+------+-------+----+
|app_id|product|flag|
+------+-------+----+
|     0|      3|   0|
|     1|      1|   0|
|     2|      1|   0|
+------+-------+----+
only showing top 3 rows



In [4]:
# Total number of rows in the target table.
train.agg(F.count(F.lit(1))).first()[0]

963811

In [5]:
# Print the column names and types Spark assigned to the target table.
train.printSchema()

root
 |-- app_id: integer (nullable = true)
 |-- product: integer (nullable = true)
 |-- flag: integer (nullable = true)



In [6]:
# Smallest application id in the target table.
train.agg(F.min("app_id")).show()

+-----------+
|min(app_id)|
+-----------+
|          0|
+-----------+



In [7]:
# Largest application id in the target table.
train.agg(F.max(F.col("app_id"))).show()

+-----------+
|max(app_id)|
+-----------+
|    1003050|
+-----------+



In [8]:
# List the distinct values taken by the target column `flag`.
train.select("flag").dropDuplicates().show()

+----+
|flag|
+----+
|   1|
|   0|
+----+



In [9]:
# Number of distinct applications kept for the development sample.
N_SAMPLE_APPS = 5_000

# Choose the sample deterministically. A bare .limit() without an ordering lets
# Spark return *any* N rows, and because train_trunc is lazy it is recomputed by
# every later action (temp view, join, count, parquet write), so those actions
# could each see a different set of app_ids. Ordering by a hash of app_id (ties
# broken by app_id itself) gives a fixed, pseudo-random subset that does not
# depend on how the input was partitioned.
sample_app_ids = (
    train
    .select("app_id")
    .distinct()
    .orderBy(F.hash("app_id"), "app_id")
    .limit(N_SAMPLE_APPS)
)
train_trunc = train.join(sample_app_ids, on=["app_id"])

In [10]:
# Register the truncated subset for Spark SQL. Note that the view name "train"
# refers to train_trunc, not to the full target table.
train_trunc.createOrReplaceTempView(name="train")

In [11]:
TRANSACTIONS_DIR = (
    BASE_PATH
    + "alphabattle2.0/alfabattle2_train_transactions_contest/train_transactions_contest"
)
# glob() returns matches in filesystem-dependent order; sort so the list (and the
# order the files are handed to Spark) is stable across machines and runs.
transaction_data_list = sorted(glob(TRANSACTIONS_DIR + "/*.parquet"))
# An empty list would only surface later as a less direct error from
# spark.read.parquet(); fail here and name the directory that was searched.
if not transaction_data_list:
    raise FileNotFoundError(f"No parquet files found under {TRANSACTIONS_DIR!r}")
transaction_data_list

['C:/BigData/alphabattle2.0/alfabattle2_train_transactions_contest/train_transactions_contest\\part_000_0_to_23646.parquet',
 'C:/BigData/alphabattle2.0/alfabattle2_train_transactions_contest/train_transactions_contest\\part_001_23647_to_47415.parquet',
 'C:/BigData/alphabattle2.0/alfabattle2_train_transactions_contest/train_transactions_contest\\part_002_47416_to_70092.parquet',
 'C:/BigData/alphabattle2.0/alfabattle2_train_transactions_contest/train_transactions_contest\\part_003_70093_to_92989.parquet',
 'C:/BigData/alphabattle2.0/alfabattle2_train_transactions_contest/train_transactions_contest\\part_004_92990_to_115175.parquet',
 'C:/BigData/alphabattle2.0/alfabattle2_train_transactions_contest/train_transactions_contest\\part_005_115176_to_138067.parquet',
 'C:/BigData/alphabattle2.0/alfabattle2_train_transactions_contest/train_transactions_contest\\part_006_138068_to_159724.parquet',
 'C:/BigData/alphabattle2.0/alfabattle2_train_transactions_contest/train_transactions_contest\\p

In [12]:
# Load every transaction part file found above into a single DataFrame and peek at it.
transaction_data = spark.read.format("parquet").load(transaction_data_list)
transaction_data.show(n=3, truncate=50)

+------+------------------+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+---+-------+----+------------+-----------+----+-----------+----------+---------+------------------+-----------------+
|app_id|              amnt|currency|operation_kind|card_type|operation_type|operation_type_group|ecommerce_flag|payment_system|income_flag|mcc|country|city|mcc_category|day_of_week|hour|days_before|weekofyear|hour_diff|transaction_number|__index_level_0__|
+------+------------------+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+---+-------+----+------------+-----------+----+-----------+----------+---------+------------------+-----------------+
|823300|0.2975890792412704|       1|             4|        2|             4|                   2|             1|             3|          2|  2|      1|  93|           2|          2|  10|        240|        48|       -1|          

In [13]:
# Print the column names and types read from the transaction parquet files.
transaction_data.printSchema()

root
 |-- app_id: integer (nullable = true)
 |-- amnt: double (nullable = true)
 |-- currency: integer (nullable = true)
 |-- operation_kind: integer (nullable = true)
 |-- card_type: integer (nullable = true)
 |-- operation_type: integer (nullable = true)
 |-- operation_type_group: integer (nullable = true)
 |-- ecommerce_flag: integer (nullable = true)
 |-- payment_system: integer (nullable = true)
 |-- income_flag: integer (nullable = true)
 |-- mcc: integer (nullable = true)
 |-- country: integer (nullable = true)
 |-- city: integer (nullable = true)
 |-- mcc_category: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- days_before: integer (nullable = true)
 |-- weekofyear: integer (nullable = true)
 |-- hour_diff: long (nullable = true)
 |-- transaction_number: integer (nullable = true)
 |-- __index_level_0__: long (nullable = true)



In [14]:
# Preview the first five transaction rows (long cell values are truncated).
transaction_data.show(n=5, truncate=True)

+------+-------------------+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+---+-------+----+------------+-----------+----+-----------+----------+---------+------------------+-----------------+
|app_id|               amnt|currency|operation_kind|card_type|operation_type|operation_type_group|ecommerce_flag|payment_system|income_flag|mcc|country|city|mcc_category|day_of_week|hour|days_before|weekofyear|hour_diff|transaction_number|__index_level_0__|
+------+-------------------+--------+--------------+---------+--------------+--------------------+--------------+--------------+-----------+---+-------+----+------------+-----------+----+-----------+----------+---------+------------------+-----------------+
|823300| 0.2975890792412704|       1|             4|        2|             4|                   2|             1|             3|          2|  2|      1|  93|           2|          2|  10|        240|        48|       -1|      

In [15]:
# Attach product/flag from the truncated target table to the transactions.
# Inner join on app_id: transactions of applications outside the sample are dropped.
sample_transaction_data = transaction_data.join(train_trunc, on="app_id", how="inner")
sample_transaction_data.columns

['app_id',
 'amnt',
 'currency',
 'operation_kind',
 'card_type',
 'operation_type',
 'operation_type_group',
 'ecommerce_flag',
 'payment_system',
 'income_flag',
 'mcc',
 'country',
 'city',
 'mcc_category',
 'day_of_week',
 'hour',
 'days_before',
 'weekofyear',
 'hour_diff',
 'transaction_number',
 '__index_level_0__',
 'product',
 'flag']

In [16]:
# Number of transactions belonging to the sampled applications.
sample_transaction_data.agg(F.count(F.lit(1))).first()[0]

1315417

In [17]:
# Remove the "__index_level_0__" column carried over from the parquet files
# (presumably a serialized pandas index — not a feature). Keeping every other
# column in its original order is equivalent to drop(), including on re-run.
sample_transaction_data = sample_transaction_data.select(
    [col_name for col_name in sample_transaction_data.columns if col_name != "__index_level_0__"]
)

In [19]:
# Persist the sampled, joined transactions as parquet; mode("overwrite")
# replaces any previous output at this location.
SAMPLE_OUTPUT_PATH = BASE_PATH + "alphabattle2.0/train_sample"
sample_transaction_data.write.mode("overwrite").parquet(SAMPLE_OUTPUT_PATH)