In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/criteo-dataset/dac/test.txt
/kaggle/input/criteo-dataset/dac/readme.txt
/kaggle/input/criteo-dataset/dac/train.txt


In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.0.2.tar.gz (204.8 MB)
[K     |████████████████████████████████| 204.8 MB 49 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 43.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l- \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - done
[?25h  Created wheel for pyspark: filename=pyspark-3.0.2-py2.py3-none-any.whl size=205186687 sha256=ece44ae443d80bbd060bad9baa018d87954b5d5f6b730b6397beb5ad098c45dd
  Stored in directory: /root/.cache/pip/wheels/9a/39/f6/970565f38054a830e9a8593f388b36e14d75dba6c6fdafc1ec
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.2

In [3]:
import dask.dataframe as dd
import gc

from pyspark import SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

import time

In [4]:
#gc.collect()

In [5]:
# sc = SparkContext.getOrCreate()
# sc.version, sc.defaultParallelism, sc.defaultMinPartitions

In [6]:
%%time

# sc = SparkContext.getOrCreate()

spark = (
    SparkSession.builder
    .appName("ctr_data") # .config("spark.executor.cores", 4)
    .getOrCreate()
)

df_original_test = spark.read.csv("/kaggle/input/criteo-dataset/dac/test.txt",
                              inferSchema=True)
df_original_train = spark.read.csv("/kaggle/input/criteo-dataset/dac/train.txt",
                              inferSchema=True)

#df_original_test.printSchema()

CPU times: user 44.8 ms, sys: 25 ms, total: 69.9 ms
Wall time: 1min 40s


In [7]:
# helper functions

def split_col_testing_I(i, df_type):
    '''
    Function for extracting numerical features (now of str type) 
    from original df (one cloumn only).
    
    i: int, numerical index for column
    df_type: dataframe, either df_original_test or df_original_train
    '''
    
    splitted = F.split(df_type['_c0'], '\t')
    
    if df_type == df_original_test:
        result = splitted.getItem(i).alias('I'+str(i+1))
    else:
        if i == 0:
            result = splitted.getItem(i).alias('Label')
        else:
            result = splitted.getItem(i).alias('I'+str(i))
    
    return result

def split_col_testing_C(i, df_type):
    '''
    Function for extracting categorical features (str type) 
    from original df (one cloumn only).
    
    i: int, numerical index for column
    df_type: dataframe, either df_original_test or df_original_train
    '''
    
    splitted = F.split(df_type['_c0'], '\t')
    
    if df_type == df_original_test:
        result = splitted.getItem(i).alias('C'+str(i-12))
    else:
        result = splitted.getItem(i).alias('C'+str(i-13))
    
    return result

def empty_str_to_null(x):
    '''
    Function for replacing empty strings in a column with null values.
    '''
    return F.when(F.col(x) != "", F.col(x)).otherwise(None)

In [8]:
%%time

# change into dataframe with all feature columns (all of str type)
exprs1 = [split_col_testing_I(i, df_original_test) for i in range(13)] \
+ [split_col_testing_C(i, df_original_test) for i in range(13, 39)]

df_testing_temp1 = df_original_test.select(*exprs1)



# Convert into correct types and fill missing data with null
# numerical features: to int
query_change_test_col = ["cast(I"+str(i)+" as int) I"+str(i) for i in range(1, 14)] \
+ ['C'+str(i) for i in range(1,27)]

df_testing_temp2 = df_testing_temp1.selectExpr(
    query_change_test_col
)

# categorical features: replace '' with null
to_convert = ['C'+str(i) for i in range(1,27)]
exprs2 = [empty_str_to_null(x).alias(x) 
         if x in to_convert else x
         for x in df_testing_temp2.columns]

df_testing = df_testing_temp2.select(*exprs2)

CPU times: user 101 ms, sys: 33.1 ms, total: 134 ms
Wall time: 1.16 s


In [9]:
df_testing.count() # IDs start at 60000000 and finish at 66042134

6042135

In [10]:
df_testing.show(3)

+----+---+---+---+----+---+---+---+---+----+---+----+---+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+----+--------+--------+--------+--------+
|  I1| I2| I3| I4|  I5| I6| I7| I8| I9| I10|I11| I12|I13|      C1|      C2|      C3|      C4|      C5|      C6|      C7|      C8|      C9|     C10|     C11|     C12|     C13|     C14|     C15|     C16|     C17|     C18|     C19|     C20|     C21| C22|     C23|     C24|     C25|     C26|
+----+---+---+---+----+---+---+---+---+----+---+----+---+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+----+--------+--------+--------+--------+
|null| 29| 50|  5|7260|437|  1|  4| 14|null|  1|   0|  6|5a9ed9b0|a0e12995|a1e14474|08a40877|25c83c98|    null|964d1fdd|5b392875|a73ee51

In [11]:
df_testing.printSchema()

root
 |-- I1: integer (nullable = true)
 |-- I2: integer (nullable = true)
 |-- I3: integer (nullable = true)
 |-- I4: integer (nullable = true)
 |-- I5: integer (nullable = true)
 |-- I6: integer (nullable = true)
 |-- I7: integer (nullable = true)
 |-- I8: integer (nullable = true)
 |-- I9: integer (nullable = true)
 |-- I10: integer (nullable = true)
 |-- I11: integer (nullable = true)
 |-- I12: integer (nullable = true)
 |-- I13: integer (nullable = true)
 |-- C1: string (nullable = true)
 |-- C2: string (nullable = true)
 |-- C3: string (nullable = true)
 |-- C4: string (nullable = true)
 |-- C5: string (nullable = true)
 |-- C6: string (nullable = true)
 |-- C7: string (nullable = true)
 |-- C8: string (nullable = true)
 |-- C9: string (nullable = true)
 |-- C10: string (nullable = true)
 |-- C11: string (nullable = true)
 |-- C12: string (nullable = true)
 |-- C13: string (nullable = true)
 |-- C14: string (nullable = true)
 |-- C15: string (nullable = true)
 |-- C16: string (nu

In [12]:
%%time

# change into dataframe with all feature columns (all of str type)
exprs3 = [split_col_testing_I(0, df_original_train)] \
+ [split_col_testing_I(i, df_original_train) for i in range(1, 14)] \
+ [split_col_testing_C(i, df_original_train) for i in range(14, 40)]

df_training_temp1 = df_original_train.select(*exprs3)


# Convert into correct types and fill missing data with null
# numerical features: to int
query_change_train_col = ["cast(Label as int) Label"] \
+ ["cast(I"+str(i)+" as int) I"+str(i) for i in range(1, 14)] \
+ ['C'+str(i) for i in range(1,27)]

df_training_temp2 = df_training_temp1.selectExpr(
    query_change_train_col
)

# categorical features: replace '' with null
to_convert = ['C'+str(i) for i in range(1,27)]
exprs4 = [empty_str_to_null(x).alias(x) 
         if x in to_convert else x
         for x in df_training_temp2.columns]

df_training = df_training_temp2.select(*exprs4)

CPU times: user 86 ms, sys: 31.4 ms, total: 117 ms
Wall time: 590 ms


In [13]:
df_training.printSchema()

root
 |-- Label: integer (nullable = true)
 |-- I1: integer (nullable = true)
 |-- I2: integer (nullable = true)
 |-- I3: integer (nullable = true)
 |-- I4: integer (nullable = true)
 |-- I5: integer (nullable = true)
 |-- I6: integer (nullable = true)
 |-- I7: integer (nullable = true)
 |-- I8: integer (nullable = true)
 |-- I9: integer (nullable = true)
 |-- I10: integer (nullable = true)
 |-- I11: integer (nullable = true)
 |-- I12: integer (nullable = true)
 |-- I13: integer (nullable = true)
 |-- C1: string (nullable = true)
 |-- C2: string (nullable = true)
 |-- C3: string (nullable = true)
 |-- C4: string (nullable = true)
 |-- C5: string (nullable = true)
 |-- C6: string (nullable = true)
 |-- C7: string (nullable = true)
 |-- C8: string (nullable = true)
 |-- C9: string (nullable = true)
 |-- C10: string (nullable = true)
 |-- C11: string (nullable = true)
 |-- C12: string (nullable = true)
 |-- C13: string (nullable = true)
 |-- C14: string (nullable = true)
 |-- C15: string 

In [14]:
df_training.show(5)

+-----+----+---+----+----+----+----+---+---+---+----+---+----+----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|Label|  I1| I2|  I3|  I4|  I5|  I6| I7| I8| I9| I10|I11| I12| I13|      C1|      C2|      C3|      C4|      C5|      C6|      C7|      C8|      C9|     C10|     C11|     C12|     C13|     C14|     C15|     C16|     C17|     C18|     C19|     C20|     C21|     C22|     C23|     C24|     C25|     C26|
+-----+----+---+----+----+----+----+---+---+---+----+---+----+----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|    0|   1|  1|   5|   0|1382|   4| 15|  2|181|   1|  2|null|   2|68fd1e64|80e26c9b|fb936136|

In [15]:
df_training.count()

45840617

In [16]:
# save training and testing dataframes as parquet files
df_testing.write.parquet('testing.parquet', mode='overwrite')
df_training.write.parquet('training.parquet', mode='overwrite')