In [1]:
import os
import shutil
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import IntegerType

In [2]:
# Display the installed PySpark version so the environment used for this run is recorded in the notebook output.
pyspark.__version__

'2.4.4'

In [3]:
def replace_string_columns(df):
    """Swap every string column of ``df`` for an integer-coded column.

    Each string column is run through a ``StringIndexer`` fitted on the
    current DataFrame; the resulting index is cast to ``IntegerType`` and
    stored under the column's original name.  The label-to-index mappings
    learned by the indexers are discarded, so the encoding cannot be
    reversed from the returned DataFrame.

    Args:
        df: Spark DataFrame to convert.

    Returns:
        A DataFrame in which no column has dtype ``'string'``.  Because each
        converted column is dropped and then re-added, converted columns
        follow the untouched ones in the result.
    """
    # Fill nulls with '' before indexing (presumably so StringIndexer does not
    # reject null labels -- confirm), and cache because every fit() below
    # re-reads the data.
    df = df.fillna('').cache()

    # Snapshot the column list up front; the loop rebinds ``df`` on each pass.
    string_columns = [name for name, kind in df.dtypes if kind == 'string']

    for name in string_columns:
        index_name = name + '_index'
        model = StringIndexer(inputCol=name, outputCol=index_name).fit(df)
        # Drop the original strings first, then reuse the freed name for the codes.
        indexed = model.transform(df).drop(name)
        codes = indexed[index_name].cast(IntegerType())
        df = indexed.withColumn(name, codes).drop(index_name)
    return df

In [4]:
def get_schema_ddl():
    """Return the schema of the mortgage performance files as a DDL string.

    The result is a comma-separated list of ``"<column> <type>"`` entries
    (no spaces after the commas), in file column order, suitable for passing
    as the ``schema`` argument of a Spark reader.
    """
    # One column per line: name, then type.  The padding is only for
    # readability; it is collapsed to a single space below.
    layout = """
        loan_id                                 bigint
        monthly_reporting_period                date
        servicer                                string
        interest_rate                           double
        current_actual_upb                      double
        loan_age                                double
        remaining_months_to_legal_maturity      double
        adj_remaining_months_to_maturity        double
        maturity_date                           string
        msa                                     double
        current_loan_delinquency_status         int
        mod_flag                                string
        zero_balance_code                       string
        zero_balance_effective_date             date
        last_paid_installment_date              date
        foreclosed_after                        date
        disposition_date                        date
        foreclosure_costs                       double
        prop_preservation_and_repair_costs      double
        asset_recovery_costs                    double
        misc_holding_expenses                   double
        holding_taxes                           double
        net_sale_proceeds                       double
        credit_enhancement_proceeds             double
        repurchase_make_whole_proceeds          double
        other_foreclosure_proceeds              double
        non_interest_bearing_upb                double
        principal_forgiveness_upb               double
        repurchase_make_whole_proceeds_flag     string
        foreclosure_principal_write_off_amount  double
        servicing_activity_indicator            string
    """
    entries = (' '.join(row.split()) for row in layout.strip().splitlines())
    return ','.join(entries)

In [5]:
def get_data_path(
    base_dir,
    data_file_prefix,
    size_multiplier,
    partitions,
    stripe_size_MiB,
    compression,
    file_format,
    ):
    """Build the path of a dataset whose name encodes how it was generated.

    The file name has the form
    ``<prefix>-<size_multiplier>x-<partitions>p-<stripe_size_MiB>MiB-<compression>.<file_format>``
    where the size multiplier is printed with two decimals and the partition
    count and stripe size are printed as integers.

    Args:
        base_dir: Directory the file name is joined onto.
        data_file_prefix: Leading component of the file name.
        size_multiplier: Scale factor, formatted with ``%0.2f``.
        partitions: Partition count, formatted with ``%d``.
        stripe_size_MiB: Stripe size in MiB, formatted with ``%d``.
        compression: Compression codec name.
        file_format: Format name, used as the file extension.

    Returns:
        ``base_dir`` joined with the generated file name.
    """
    fields = {
        'prefix': data_file_prefix,
        'scale': size_multiplier,
        'parts': partitions,
        'stripe': stripe_size_MiB,
        'codec': compression,
        'ext': file_format,
    }
    file_name = '%(prefix)s-%(scale)0.2fx-%(parts)dp-%(stripe)dMiB-%(codec)s.%(ext)s' % fields
    return os.path.join(base_dir, file_name)

In [6]:
def get_mortgage_dataframe(
    base_dir='/mnt/isilon1/data/mortgage',
    input_file='perf/Performance_2001*.txt',
    size_multiplier=1.0,
    partitions=48,
    stripe_size_MiB=64,
    convert_strings=False,
    ):
    """Load the pipe-delimited mortgage performance files into a Spark DataFrame.

    Args:
        base_dir: Directory that ``input_file`` is joined onto.
        input_file: File name or pattern, relative to ``base_dir``.
        size_multiplier: Sampling fraction.  ``1.0`` leaves the data as read;
            any other value samples with replacement using a fixed seed, so
            values above 1 enlarge the data set.
        partitions: Number of partitions to repartition into; a falsy value
            (``None`` or ``0``) skips repartitioning.
        stripe_size_MiB: ORC stripe size in MiB, set on the session as
            ``hive.exec.orc.default.stripe.size`` (converted to bytes).
        convert_strings: When true, string columns are replaced by integer
            codes via ``replace_string_columns`` before sampling.

    Returns:
        The resulting DataFrame.  Nothing is computed here beyond what
        ``replace_string_columns`` triggers.
    """
    # Obtain (or reuse) the session, applying the driver memory and stripe size settings.
    builder = SparkSession.builder.config('spark.driver.memory', '1000g')
    builder = builder.config('hive.exec.orc.default.stripe.size', stripe_size_MiB*1024*1024)
    spark = builder.getOrCreate()

    source = os.path.join(base_dir, input_file)
    ddl = get_schema_ddl()
    df = spark.read.load(
        source,
        format='csv',
        sep='|',
        schema=ddl,
        header=False,
        dateFormat='MM/dd/yyyy',
    )

    if convert_strings:
        df = replace_string_columns(df)

    keep_original_size = (size_multiplier == 1.0)
    if not keep_original_size:
        # withReplacement=True so that multipliers above 1 can grow the data.
        df = df.sample(True, size_multiplier, seed=7)

    return df.repartition(partitions) if partitions else df

In [7]:
def write_mortgage_dataframe(
    df,
    base_dir='/mnt/isilon1/data/mortgage',
    data_file_prefix='perf-from-spark',
    size_multiplier=1.0,
    partitions=48,
    stripe_size_MiB=64,
    compression='snappy',
    file_format='orc',
    ):
    """Write ``df`` to a path whose name encodes the generation parameters.

    The destination comes from ``get_data_path``; ``size_multiplier``,
    ``partitions`` and ``stripe_size_MiB`` are used only to build that name
    here.  Anything already at the destination is removed before writing.

    Args:
        df: Spark DataFrame to write.
        base_dir: Directory the output is created in.
        data_file_prefix: Leading component of the output name.
        size_multiplier: Scale factor recorded in the output name.
        partitions: Partition count recorded in the output name.
        stripe_size_MiB: Stripe size recorded in the output name.
        compression: Value of the writer's ``compression`` option.
        file_format: Spark writer format; also the output name's extension.
    """
    output_path = get_data_path(
        base_dir,
        data_file_prefix,
        size_multiplier,
        partitions,
        stripe_size_MiB,
        compression,
        file_format,
    )
    print('write_mortgage_dataframe: output_path=%s' % output_path)

    # Clear any previous output at this path before writing.
    if os.path.exists(output_path):
        shutil.rmtree(output_path)

    writer = df.write.format(file_format)
    writer = writer.option('compression', compression)
    writer.save(output_path)
    print('write_mortgage_dataframe: done.')

In [8]:
# Run configuration shared by the cells below.
# Directory that the input pattern is joined onto and that outputs are written under.
base_dir = '/mnt/isilon1/data/mortgage'
# Input file pattern, relative to base_dir.
input_file = 'perf/Performance_*.txt'
# Leading component of the generated output name (see get_data_path).
data_file_prefix = 'perf-no-strings'
# When True, get_mortgage_dataframe replaces string columns with integer codes.
convert_strings = True

In [9]:
# Disabled cell: the `if False` guard means none of this ever runs.
# If enabled, it would load a 1% sample (size_multiplier = 0.01) into `df`.
# convert_strings is not passed here, so the function default (False) would apply.
if False:
    size_multiplier = 0.01
    partitions = 48
    stripe_size_MiB = 2048
    df = get_mortgage_dataframe(
            base_dir=base_dir,
            input_file=input_file,
            size_multiplier=size_multiplier,
            partitions=partitions,
            stripe_size_MiB=stripe_size_MiB,
            )     

In [10]:
%%time
# Parameter sweep: each list holds the values to try for that setting; with the
# single-element lists below exactly one dataset is generated.
# The DataFrame is built once per (size_multiplier, partitions, stripe_size_MiB)
# combination and reused for every compression / file_format pair.
for size_multiplier in [6.0]:
    for partitions in [48]:
        for stripe_size_MiB in [2048]:            
            df = get_mortgage_dataframe(
                    base_dir=base_dir,
                    input_file=input_file,
                    size_multiplier=size_multiplier,
                    partitions=partitions,
                    stripe_size_MiB=stripe_size_MiB,
                    convert_strings=convert_strings,
                    )
            for compression in ['snappy']:
                for file_format in ['orc']:
                    # The same parameters are passed again so they are encoded in the output name.
                    write_mortgage_dataframe(
                        df,
                        base_dir=base_dir,
                        data_file_prefix=data_file_prefix,
                        size_multiplier=size_multiplier,
                        partitions=partitions,
                        stripe_size_MiB=stripe_size_MiB,
                        compression=compression,
                        file_format=file_format,
                        )

write_mortgage_dataframe: output_path=/mnt/isilon1/data/mortgage/perf-no-strings-6.00x-48p-2048MiB-snappy.orc
write_mortgage_dataframe: done.
CPU times: user 635 ms, sys: 300 ms, total: 935 ms
Wall time: 40min 33s
