# Credit Default Prediction on Amex Dataset

### Importing the necessary libraries

In [1]:
import numpy as np
import pandas as pd 

import pyspark
from pyspark import StorageLevel
from pyspark.sql import (
    SparkSession, 
    types, 
    functions as F,
)
from pyspark.sql.functions import (
    col,
    isnan,
    when,
    count,
)
from pyspark.ml import Pipeline 
from pyspark.ml.feature import (
    OneHotEncoder, 
    StringIndexer, 
    VectorAssembler, 
    Imputer,
)
from pyspark.ml.classification import (
    LogisticRegression, 
    LinearSVC,
    DecisionTreeClassifier,
    GBTClassifier,
    RandomForestClassifier,
)
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator,
)

import itertools

import pickle

### Create a Spark Session

In [2]:
spark = SparkSession.builder \
                    .appName("amex-app") \
                    .master("local[*]") \
                    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/12/16 07:03:38 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/12/16 07:03:39 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/12/16 07:03:39 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/12/16 07:03:39 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


### Important Global Variables

In [3]:
TRAIN_DATA_PATH = 'gs://icdp-bigdata-bucket/train_data.csv'
TRAIN_LABEL_PATH = 'gs://icdp-bigdata-bucket/train_labels.csv'

### Miscellaneous Utility Functions

In [4]:
## Function to create a Schema Object for the Dataframe 
def create_spark_schema(series):
    fields = list()
    
    for value in series: 
        if value in string_dtypes:
            fields.append(
                types.StructField(
                    value, 
                    types.StringType(), 
                    True,
                )
            )
        elif value in date_dtypes:
            fields.append(
                types.StructField(
                    value, 
                    types.DateType(), 
                    True,
                )
            )
        elif value in integer_dtypes:
            fields.append(
                types.StructField(
                    value, 
                    types.IntegerType(), 
                    True,
                )
            )
        else:
            fields.append(
                types.StructField(
                    value, 
                    types.FloatType(), 
                    True,
                )
            )
    return types.StructType(fields)

In [5]:
#Add Suffix to List Elements
def add_suffix(names, suffix):
    return [name + suffix for name in names]

In [6]:
# Drop Columns with Null values above a certain threshold
def dropNullColumns(df, threshold):
    """
    This function drops columns containing all null values.
    :param df: A PySpark DataFrame
    """
  
    null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(
        c) for c in df.columns]).collect()[0].asDict()
    print("null counts calculated...")
    df_count = df.count()
    col_to_drop = [k for k, v in null_counts.items() if v >(df_count * threshold)]  
    print("columns to drop found...")
    df = df.drop(*col_to_drop)  
  
    return df, col_to_drop

### Reading the Dataframe

#### Reading the First 20 rows only

In [7]:
train_df_temp = spark.read.option(
    "header", 'true',
).csv(
    TRAIN_DATA_PATH,
).limit(
    20
)
train_labels_temp = spark.read.option(
    "header", 'true',
).csv(
    TRAIN_LABEL_PATH,
).limit(
    20
)

                                                                                

#### Define Schema Using Sampled Temporary Dataframe

In [8]:
## Known Datatypes: 

string_dtypes = ["customer_ID", 'B_30', 'B_38', 'D_114', 'D_116', 'D_117', 'D_120', 'D_126', 'D_63', 'D_64', 'D_66', 'D_68']
date_dtypes = ['S_2']
integer_dtypes = ['target']

In [9]:
train_schema = create_spark_schema(train_df_temp.columns)
label_schema = create_spark_schema(train_labels_temp.columns)

#### Remove Temp Datasets from Memory

In [74]:
train_df_temp.unpersist()
train_labels_temp.unpersist()

del train_df_temp
del train_labels_temp

#### Reading the Whole Dataset with the Inferred Schema

In [75]:
train_df = spark.read.option(
    "header", 
    "true",
).csv(
    TRAIN_DATA_PATH, 
    schema=train_schema
)
label_df = spark.read.option(
    "header", 
    "true",
).csv(
    TRAIN_LABEL_PATH, 
    schema=label_schema,
)

In [76]:
## Other categorization of the known dtypes
info_cols = ['customer_ID', 'S_2']
target_cols = ['target']
cat_cols = ['B_30', 'B_38', 'D_114', 'D_116', 'D_117', 'D_120', 'D_126', 'D_63', 'D_64', 'D_66', 'D_68']


# Define Numeric Columns
excluded = info_cols + cat_cols
num_cols = [col for col in train_df.columns if col not in excluded]

### Preprocessing of the Dataset

#### Dropping Null Columns

In [77]:
## Remove All Columns with More than 5% Missing Values
train_df, cols_to_drop = dropNullColumns(train_df, 0.05)

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1207, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1033, in send_command
    response = connection.send_command(command)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1211, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", 

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42729)
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o4407.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_

Py4JError: An error occurred while calling o47.setCallSite

#### Remove Less Important Column S_2

In [None]:
## Remove the S_2 variable as the testing data and the training data are in different time periods 
train_df = train_df.drop("S_2")

In [None]:
cols_to_drop.append("S_2")

#### Converting Categorical Columns to Numeric using StringIndexer

In [None]:
cat_columns_to_index = list(set(train_df.columns) & set(cat_cols))

In [None]:
cat_cols_indexed = add_suffix(cat_columns_to_index, "_index")

## Create StringIndexer Object
indexer = StringIndexer(
    inputCols=cat_columns_to_index,
    outputCols=cat_cols_indexed,
)
indexer.setHandleInvalid("keep")
indexer_model = indexer.fit(train_df)

train_df = indexer_model.transform(train_df)

#### Impute values for numerical columns

In [None]:
num_columns_to_impute = list(set(train_df.columns) & set(num_cols))

In [None]:
num_cols_imputed = add_suffix(num_columns_to_impute, "_imputed")

##Create Imputer
imputer = Imputer(
    inputCols=num_columns_to_impute,
    outputCols=num_cols_imputed,
)
imputer.setStrategy("median")

imputer_model = imputer.fit(train_df)

train_df = imputer_model.transform(train_df)

#### OneHotEncode the Categorical Columns

In [None]:
cat_cols_ohe = add_suffix(cat_cols_indexed, "_ohe")
https://github.com/yangsong24/Amex_credit_card_default_prediction.git
### Create Ohe Object
ohe = OneHotEncoder(
    inputCols = cat_cols_indexed,
    outputCols = cat_cols_ohe,
)

ohe_model = ohe.fit(train_df)

train_df = ohe_model.transform(train_df)

In [None]:
useful_cols = ["customer_ID"] + cat_cols_ohe + num_cols_imputed

### Remove Unnecessary Columns and Aggregate

In [None]:
train_df = train_df.select(*useful_cols)

In [None]:
new_num_cols = []
for num_col in num_cols_imputed:
    new_name = num_col.split("_")[0] + "_" + num_col.split("_")[1]
    new_num_cols.append(new_name)
    train_df = train_df.withColumnRenamed(num_col, new_name)
new_cat_cols = []
for cat_col in cat_cols_ohe:
    new_name = cat_col.split("_")[0] + "_" + cat_col.split("_")[1]
    new_cat_cols.append(new_name)
    train_df = train_df.withColumnRenamed(cat_col, new_name)

In [None]:
## Aggregation Functions
num_funcs = [
    (F.mean, "_mean"),
     (F.min, "_min"),
     (F.max, "_max"),
]

cat_funcs = [
    (F.count, "_count"),
    (F.last, "_last"),
    (F.countDistinct, "_nunique"),
]

In [None]:
agg_num_args = [
    func(col).alias(col + suffix) 
    for col, (func, suffix) in itertools.product(new_num_cols, num_funcs)]

agg_cols_args = [
    func(col).alias(col + suffix) 
    for col, (func, suffix) in itertools.product(new_cat_cols, cat_funcs)]

# Combine numeric and categoric agg arguments
agg_args = agg_num_args + agg_cols_args

In [None]:
train_df = train_df.groupBy("customer_ID").agg(*agg_args)

In [None]:
train_df = train_df.join(
    F.broadcast(label_df), 
    on="customer_ID",
)

In [27]:
va_model = VectorAssembler(
    inputCols=train_df.drop(
        "customer_ID",
        "target",
    ).columns,
    outputCol="features",
    handleInvalid="skip",
)

In [28]:
train_df = va_model.transform(
    train_df).select(["customer_ID", "features", "target"]).persist(StorageLevel.DISK_ONLY)

                                                                                

### Train Test Split

In [29]:
train_split, test_split = train_df.randomSplit(weights = [0.8, 0.2], seed = 42)

### Fit Models

#### Logistic Regression

In [None]:
lr = LogisticRegression(
    featuresCol="features",
    labelCol="target",
)
lr_model = lr.fit(train_split)

22/12/16 03:49:38 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/12/16 03:49:38 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [None]:
lr_preds = lr_model.transform(test_split)

In [None]:
binEval = BinaryClassificationEvaluator(rawPredictionCol="prediction",labelCol="target",metricName="areaUnderROC")
multiEval = MulticlassClassificationEvaluator(labelCol = "target", predictionCol = "prediction")

In [None]:
print("AUCROC: ", binEval.evaluate(lr_preds))
print("Accuracy: ", multiEval.evaluate(lr_preds, {multiEval.metricName: "accuracy"}))
print("F1 Score: ", multiEval.evaluate(lr_preds, {multiEval.metricName: "f1"}))
print("Weighted Precision: ", multiEval.evaluate(lr_preds, {multiEval.metricName: "weightedPrecision"}))
print("Weighted Recall: ", multiEval.evaluate(lr_preds, {multiEval.metricName: "weightedRecall"}))

                                                                                

AUCROC:  0.8495909980592996


                                                                                

Accuracy:  0.8898278218092375


                                                                                

F1 Score:  0.8889484226843926


                                                                                

Weighted Precision:  0.8884047070122425




Weighted Recall:  0.8898278218092375


                                                                                

In [35]:
lr_preds.show(50)

+--------------------+--------------------+------+--------------------+--------------------+----------+
|         customer_ID|            features|target|       rawPrediction|         probability|prediction|
+--------------------+--------------------+------+--------------------+--------------------+----------+
|00dbda372d97f2357...|[0.61583860791646...|     0|[-0.9019394609293...|[0.28865210067818...|       1.0|
|01439ee3abf1b4552...|[0.06917607898895...|     0|[6.03246938516561...|[0.99760618289505...|       0.0|
|01500e2a9f82cfab4...|[0.07531008926721...|     0|[6.83407077258295...|[0.99892469040201...|       0.0|
|01bc5e75de384a9bc...|[0.04786717357973...|     0|[3.17294932208646...|[0.95980352597385...|       0.0|
|02d2e4eec5ba89aff...|[0.09979327364514...|     0|[5.82379023291571...|[0.99705233589156...|       0.0|
|036c54f4ae5f87f73...|[0.11121583042236...|     0|[6.67983866744993...|[0.99874559492165...|       0.0|
|0413bb4944d7b691b...|[0.03066083917824...|     0|[4.25528841107

### Save Models and Meta Data 

#### Data to Save

In [50]:
meta_data = {
    #"spark_session": spark,
    "schema":{
        "train_schema": train_schema,
        "label_schema": label_schema,
    },
    "column_names":{
        "cols_to_drop": cols_to_drop,
        "cat_columns_to_index": cat_columns_to_index,
        "num_cols_imputed": num_cols_imputed,
        "cat_cols_ohe": cat_cols_ohe,
        "useful_cols": useful_cols,
    },
}

In [52]:
with open('/home/aap2239/interpretable-credit-default-prediction/meta_data.pkl', 'wb') as handle:
    pickle.dump(meta_data, handle, protocol=pickle.HIGHEST_PROTOCOL)


In [43]:
from google.cloud import storage

PROJECT = 'big-data-86948'
BUCKET_NAME = 'icdp-bigdata-bucket'
first_layer = "icdp_deployment/"
second_layer_meta = "meta_data/"
second_layer_objects = "objects/"
storage_client = storage.Client(project=PROJECT)

In [44]:
def create_folder(bucket_name, folder_name):
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(folder_name)
    blob.upload_from_string('', content_type='application/x-www-form-urlencoded;charset=UTF-8')

In [53]:
def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket. https://cloud.google.com/storage/docs/ """
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print('File {} uploaded to {}.'.format(
        source_file_name,
        destination_blob_name))

In [45]:
create_folder(BUCKET_NAME, first_layer)

In [46]:
create_folder(BUCKET_NAME, "icdp_deployment/"+second_layer_meta)
create_folder(BUCKET_NAME, "icdp_deployment/"+second_layer_objects)

In [56]:
upload_blob(BUCKET_NAME, "/home/aap2239/interpretable-credit-default-prediction/meta_data.pkl", "icdp_deployment/"+second_layer_meta+"meta_data.pkl")

File /home/aap2239/interpretable-credit-default-prediction/meta_data.pkl uploaded to icdp_deployment/meta_data/meta_data.pkl.


In [57]:
!rm /home/aap2239/interpretable-credit-default-prediction/meta_data.pkl

#### Models to Save 

In [58]:
indexer_model.save("gs://icdp-bigdata-bucket/icdp_deployment/objects/indexer_model")

                                                                                

In [59]:
imputer_model.save("gs://icdp-bigdata-bucket/icdp_deployment/objects/imputer_model")

                                                                                

In [60]:
ohe_model.save("gs://icdp-bigdata-bucket/icdp_deployment/objects/ohe_model")

                                                                                

In [61]:
va_model.save("gs://icdp-bigdata-bucket/icdp_deployment/objects/va_model")

                                                                                

In [63]:
lr_model.save("gs://icdp-bigdata-bucket/icdp_deployment/objects/lr_model")

                                                                                