In [6]:
import os
import logging
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, VectorIndexer
from pyspark.ml import Pipeline
from ucimlrepo import fetch_ucirepo

In [12]:
spark.stop()

In [7]:
SEED = 9
spark_master = os.environ.get("SPARK_MASTER_URL")
train_path = os.path.join(os.environ.get("TRAIN_PATH"))
test_path = os.path.join(os.environ.get("TEST_PATH"))

In [8]:
# initialize spark session
spark_master = os.environ.get("SPARK_MASTER_URL")
spark = SparkSession.builder \
    .appName("Random-Forest-Classifier") \
    .master(spark_master) \
    .getOrCreate()

logger = logging.getLogger("py4j")
logger.setLevel(logging.ERROR)


In [9]:
from scripts.stage_data import write_data, read_data

In [10]:
write_data(train_path,test_path,spark)

Writing to absolute paths:
Train: /app/data/train
Test: /app/data/test
Features shape: 30000 rows
Targets shape: 30000 rows


                                                                                

Combined dataset size: 30000 rows


                                                                                

Split sizes - Train: 21063, Test: 8937
root
 |-- X1: long (nullable = true)
 |-- X2: long (nullable = true)
 |-- X3: long (nullable = true)
 |-- X4: long (nullable = true)
 |-- X5: long (nullable = true)
 |-- X6: long (nullable = true)
 |-- X7: long (nullable = true)
 |-- X8: long (nullable = true)
 |-- X9: long (nullable = true)
 |-- X10: long (nullable = true)
 |-- X11: long (nullable = true)
 |-- X12: long (nullable = true)
 |-- X13: long (nullable = true)
 |-- X14: long (nullable = true)
 |-- X15: long (nullable = true)
 |-- X16: long (nullable = true)
 |-- X17: long (nullable = true)
 |-- X18: long (nullable = true)
 |-- X19: long (nullable = true)
 |-- X20: long (nullable = true)
 |-- X21: long (nullable = true)
 |-- X22: long (nullable = true)
 |-- X23: long (nullable = true)
 |-- Y: long (nullable = true)

root
 |-- X1: long (nullable = true)
 |-- X2: long (nullable = true)
 |-- X3: long (nullable = true)
 |-- X4: long (nullable = true)
 |-- X5: long (nullable = true)
 |-- X6: 

                                                                                

Files successfully written to disk
Train directory exists: True
Test directory exists: True


In [11]:
train, test = read_data(train_path,test_path,spark)

Attempting to read from:
Train: /app/data/train
Test: /app/data/test
Train directory contents: ['._SUCCESS.crc', '_SUCCESS']
Test directory contents: ['._SUCCESS.crc', '_SUCCESS']
Error reading parquet files: [UNABLE_TO_INFER_SCHEMA] Unable to infer schema for Parquet. It must be specified manually.


AnalysisException: [UNABLE_TO_INFER_SCHEMA] Unable to infer schema for Parquet. It must be specified manually.

In [None]:

# fetch dataset 
default_of_credit_card_clients = fetch_ucirepo(id=350) 
X = spark.createDataFrame(default_of_credit_card_clients.data.features)
y = spark.createDataFrame(default_of_credit_card_clients.data.targets)

# combine X and y
X = X.withColumn("id", monotonically_increasing_id())
y = y.withColumn("id", monotonically_increasing_id())
df = X.join(y, on="id", how="inner").drop("id")

In [None]:
SEEDS = [1,2,4,56,42,75,12,56,9]
for seed in SEEDS:

    train, test = df.randomSplit([.7,.3], seed=seed)
    train.write.mode("overwrite").parquet(os.path.join(train_path,str(seed)))
    test.write.mode("overwrite").parquet(os.path.join(test_path,str(seed)))
    print(train.count())

In [6]:
schema = StructType([
    StructField("X1", LongType(), True),
    StructField("X2", LongType(), True),
    StructField("X3", LongType(), True),
    StructField("X4", LongType(), True),
    StructField("X5", LongType(), True),
    StructField("X6", LongType(), True),
    StructField("X7", LongType(), True),
    StructField("X8", LongType(), True),
    StructField("X9", LongType(), True),
    StructField("X10", LongType(), True),
    StructField("X11", LongType(), True),
    StructField("X12", LongType(), True),
    StructField("X13", LongType(), True),
    StructField("X14", LongType(), True),
    StructField("X15", LongType(), True),
    StructField("X16", LongType(), True),
    StructField("X17", LongType(), True),
    StructField("X18", LongType(), True),
    StructField("X19", LongType(), True),
    StructField("X20", LongType(), True),
    StructField("X21", LongType(), True),
    StructField("X22", LongType(), True),
    StructField("X23", LongType(), True),
    StructField("Y", LongType(), True)
])


In [9]:
# Read the DataFrame from the saved Parquet file

train = spark.read.parquet(train_path)
#test = spark.read.schema(schema).parquet(os.path.join(train_path,'9'))


AnalysisException: [UNABLE_TO_INFER_SCHEMA] Unable to infer schema for Parquet. It must be specified manually.

In [None]:
spark.stop()

In [8]:
train.count()

0

In [None]:
spark.active()

In [None]:
os.path.join("file///",train_path,'98')

In [None]:
default_fs = spark.sparkContext.getConf().get("fs.defaultFS")
print("Default File System:", default_fs)


In [None]:
train.count()

In [None]:
os.path.join(train_path,str(seed))

In [None]:
# list of numerical and categorical columns
target = ['Y']
num_feat = ['X1','X5','X12','X13','X14','X15','X16','X17','X18','X19','X20','X21','X22','X23']
cat_feat = [col for col in train.columns if col not in num_feat+target]
cat_feat_indexed = [f"{col}_i" for col in cat_feat]
cat_feat_encoded = [f"{col}_e" for col in cat_feat_indexed]



In [None]:
# Feature Transformations
string_indexer = StringIndexer(inputCols=cat_feat, outputCols=cat_feat_indexed, handleInvalid='keep')
hot_encoder = OneHotEncoder(inputCols=cat_feat_indexed, outputCols=cat_feat_encoded, handleInvalid='keep')
vector_assembler_1 = VectorAssembler(inputCols=num_feat+cat_feat_encoded,outputCol="features")

# Target Transformations
string_indexer_target = StringIndexer(inputCol='Y',outputCol='Y_i',handleInvalid='keep')
hot_encoder_target = OneHotEncoder(inputCol='Y_i', outputCol='Y_i_e',handleInvalid='keep')


In [None]:
# Vector Indexing (might be better for trees)
vector_assembler_2 = VectorAssembler(inputCols=num_feat+cat_feat,\
                                     outputCol="features")
vector_indexer = VectorIndexer(maxCategories=15,inputCol="features",\
                               outputCol="indexed_features")

# Target
vector_indexer_target = VectorIndexer(maxCategories=15,inputCol='Y',\
                               outputCol="indexed_target")

In [None]:
# pipeline
pipeline1 = Pipeline(stages=[string_indexer, \
                            hot_encoder, \
                            vector_assembler_1, \
                            string_indexer_target, \
                            hot_encoder_target])
train_1 = pipeline1.fit(train).transform(train)

In [None]:
# pipeline
pipeline2 = Pipeline(stages=[vector_assembler_2,\
                            vector_indexer,\
                            vector_indexer_target])
train_2 = pipeline2.fit(train).transform(train)

In [None]:
spark.stop()