In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from typing import Iterable
from itertools import chain
from pyspark.sql.types import StructType
from pyspark import SparkContext
import pandas as pd

In [2]:
# Local Spark session that runs with as many worker threads as logical cores.
# NOTE(review): per the Spark configuration docs, spark.driver.cores is only
# honoured in cluster deploy mode, so it is likely ignored with local[*] — confirm.
spark = (
    SparkSession.builder
    .appName('zindi-insurance')
    .master("local[*]")
    .config("spark.driver.cores", 16)
    .getOrCreate()
)

In [3]:
# Display the address of the Spark web UI, useful for watching job progress.
spark.sparkContext.uiWebUrl

'http://DESKTOP-NLTNIOI:4040'

# Загрузка данных

In [4]:
# Read both splits using their header row. No schema inference is requested,
# so every column is loaded as a string.
train, test = [spark.read.csv(path, header=True) for path in ('Train.csv', 'Test.csv')]

In [5]:
# Training rows without a join date are discarded. Test rows are kept
# (presumably every test customer needs a prediction), so a missing join
# date is imputed with a fixed placeholder instead.
train = train.dropna(subset=['join_date'])
test = test.fillna({'join_date': '1/1/2018'})

In [6]:
# Replace lower-case 'f' with 'F' in marital_status in BOTH frames. The
# categorical encoder is fitted on train and test together, so a stray 'f'
# left in test would otherwise become its own category instead of sharing
# the index of 'F'.
train = train.withColumn('marital_status', regexp_replace('marital_status', 'f', 'F'))
test = test.withColumn('marital_status', regexp_replace('marital_status', 'f', 'F'))

# Трансформируем дату в timestamp

In [7]:
# Parse the day/month/year strings (e.g. '1/1/2018') into timestamps.
train, test = [df.withColumn('join_date', to_timestamp('join_date', 'd/M/y'))
               for df in (train, test)]

In [8]:
# Turn join_date into seconds since the Unix epoch. The 'd/M/y' pattern is only
# consulted for string input. After the previous cell the column already holds
# timestamps, which are converted directly.
train, test = [df.withColumn('join_date', unix_timestamp('join_date', 'd/M/y'))
               for df in (train, test)]

# Переведём целевые переменные из столбцов в строки (wide → long)

In [9]:
# Target columns (21 in total) that get unpivoted into (variable, presence) rows.
target_cols = '''
    P5DA RIBP 8NN1 7POT 66FJ GYSR SOP4 RVSZ PYUQ LJR9 N2MW AHXO
    BSTQ FM3X K6QO QBOL JWFN JZ9D J9JW GHYX ECY3
'''.split()

In [10]:
def melt(
        df,
        id_vars,
        value_vars,
        var_name = 'variable',
        value_name = 'value'):
    """Convert a :class:`DataFrame` from wide to long format.

    Every input row is expanded into one output row per column in
    ``value_vars``.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Wide input frame.
    id_vars : iterable of str
        Columns copied unchanged onto every output row.
    value_vars : iterable of str
        Columns to unpivot. Their values are packed into one map column, so
        they should share a single data type.
    var_name : str
        Name of the output column holding the former column name.
    value_name : str
        Name of the output column holding that column's value.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``id_vars`` followed by ``var_name`` and ``value_name``.
    """
    # map<column name literal, column value>, e.g. {'P5DA': P5DA, 'RIBP': RIBP, ...}
    vars_and_vals = create_map(
        *chain.from_iterable((lit(c), col(c)) for c in value_vars)
    )

    # Name the exploded key/value pair directly. Renaming the default
    # 'key'/'value' columns afterwards would also hit an id column that
    # already has one of those names. With var_name='value', the second
    # rename would catch both columns.
    return df.select(*id_vars, explode(vars_and_vals).alias(var_name, value_name))

In [11]:
def melt_targets(df):
    """Unpivot the target columns of ``df`` into (variable, presence) rows.

    The wide target columns are also kept on every output row, so each row
    carries the full target profile of its ID next to one
    (variable, presence) pair.

    Output column order: 'ID', the remaining non-target columns, 'variable',
    'presence', then the target columns.
    """
    feature_cols = [c for c in df.columns if c not in target_cols and c != 'ID']
    # Passing every column as an id column keeps the targets on each row
    # directly. This replaces a melt followed by a self-join on 'ID', which
    # needed an extra shuffle, dropped rows with a null ID and would multiply
    # rows if an ID were ever duplicated.
    long_df = melt(df, id_vars = df.columns, value_vars = target_cols, value_name = 'presence')
    return long_df.select('ID', *feature_cols, 'variable', 'presence', *target_cols)


train = melt_targets(train)
test = melt_targets(test)

# Кодирование категориальных признаков

In [12]:
# Categorical columns to be label-encoded.
cat_features = 'sex marital_status branch_code occupation_code occupation_category_code'.split()

In [13]:
def encoding(train, test, column):
    """Label-encode ``column`` in both frames with one shared StringIndexer.

    The indexer is fitted on the values of ``column`` from train and test
    together. Both frames therefore use the same mapping, and a category
    seen only in test still gets an index. In each frame the encoded values
    replace the original column under the same name.

    Parameters
    ----------
    train, test : pyspark.sql.DataFrame
        Frames that both contain ``column``.
    column : str
        Name of the categorical column to encode.

    Returns
    -------
    tuple
        The encoded ``(train, test)`` frames.
    """
    indexed_col = '{}_indexed'.format(column)

    # Fit on this single column only. That is cheaper than unioning whole
    # frames, and unionByName cannot mix up columns the way a positional
    # union() does when the two frames list their columns in different orders.
    values = train.select(column).unionByName(test.select(column))
    indexer = StringIndexer(inputCol = column, outputCol = indexed_col).fit(values)

    def _apply(df):
        # Swap the raw column for its encoded version, keeping the column name.
        return indexer.transform(df).drop(column).withColumnRenamed(indexed_col, column)

    return _apply(train), _apply(test)

In [14]:
# Encode every categorical column, re-binding both frames after each step.
for feature in cat_features:
    train, test = encoding(train, test, feature)

# Создание тренировочного датасета

In [17]:
def reshaping(user_data):
    """Build the training rows for one ID group.

    ``user_data`` holds all long-format rows of one ID: a 'variable' column
    naming a target, a 'presence' column with string flags, and the wide
    target columns. For every variable whose presence is '1', a copy of all
    of the group's rows is emitted with that variable's wide column set to '0'.

    Parameters
    ----------
    user_data : pandas.DataFrame
        Rows of a single group, as passed by ``applyInPandas``.

    Returns
    -------
    pandas.DataFrame
        The copies stacked in the order the present variables appear, with
        the original index labels kept. If no variable is present, the result
        is an empty frame that still has all of ``user_data``'s columns.
    """
    present_vars = user_data.loc[user_data['presence'] == '1', 'variable']

    # Collect the copies and concatenate once. DataFrame.append was removed
    # in pandas 2.0, and growing a frame inside the loop is quadratic.
    copies = []
    for var in present_vars:
        masked = user_data.copy()
        masked[var] = '0'
        copies.append(masked)

    if not copies:
        # Keep the columns: older Spark releases may reject a returned frame
        # whose column count does not match the declared schema.
        return user_data.iloc[0:0]
    return pd.concat(copies)

In [18]:
# Reshape each ID group independently. reshaping returns frames with the same
# columns as its input, so the current schema doubles as the output schema.
train = (
    train
    .groupBy('ID')
    .applyInPandas(reshaping, schema=train.schema)
)

# Сохраним данные

In [19]:
# Collect each Spark frame to the driver as pandas and write it out as a single CSV file.
for frame, path in ((train, 'train_prepared.csv'), (test, 'test_prepared.csv')):
    frame.toPandas().to_csv(path, index=False)