# Train binary classifier
Notebook to train and save a binary classifier that will be used to identify energy consumption patents.

In [0]:
# Blob storage location the featurized input data is read from.
FEATURES_STORAGE_NAME = "challengebasf"
FEATURES_CONTAINER_NAME = "featurizeddata"
FEATURES_OUTPUT_FOLDER = "output_data"

# Blob storage location the trained classifier is saved to; the folder name carries the version.
# TODO: the version should be increased automatically by checking previously stored models.
ENERGY_CLASSIFIER_VERSION = "v1"
ENERGY_CLASSIFIER_STORAGE_NAME = FEATURES_STORAGE_NAME
ENERGY_CLASSIFIER_CONTAINER_NAME = "classifier"
ENERGY_CLASSIFIER_OUTPUT_FOLDER = "output_data_" + ENERGY_CLASSIFIER_VERSION

# Blob storage location for energy patents, versioned with the classifier version.
ENERGY_PATENTS_STORAGE_NAME = FEATURES_STORAGE_NAME
ENERGY_PATENTS_CONTAINER_NAME = "energypatents"
ENERGY_PATENTS_OUTPUT_FOLDER = "output_data_" + ENERGY_CLASSIFIER_VERSION

In [0]:
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
def create_if_not_exists_container(storage_name: str, container_name: str):
    """Create a blob container in a storage account unless it already exists.

    The account key is read from the Spark configuration entry
    ``spark.hadoop.fs.azure.account.key.<storage_name>.blob.core.windows.net``,
    so the notebook's ``spark`` session must be available when this is called.

    Args:
        storage_name: Name of the Azure storage account.
        container_name: Name of the container to create.
    """
    blob_host = f"{storage_name}.blob.core.windows.net"
    account_key = spark.conf.get(f"spark.hadoop.fs.azure.account.key.{blob_host}")
    container_client = BlobServiceClient(
        account_url=f"https://{blob_host}/", credential=account_key
    ).get_container_client(container_name)
    try:
        container_client.create_container()
    except ResourceExistsError:
        # An existing container is the expected case on re-runs; just report it.
        print("Output container already exists")
    else:
        print(f"Creating container: {container_name}")

In [0]:
# Section letters used to build the `section_<letter>` feature column names.
# NOTE(review): the class list below contains "H.." classes but this list stops at "G",
# so no `section_H` column is selected — confirm against the featurized data schema.
SECTIONS_IPCR = list("ABCDEFG")

# Two-digit class codes grouped by their section letter; flattened below in this order.
_CLASS_CODES_BY_SECTION = {
    "A": "01 21 22 23 24 41 42 43 44 45 46 47 61 62 63 99",
    "B": (
        "01 02 03 04 05 06 07 08 09 "
        "21 22 23 24 25 26 27 28 29 30 31 32 "
        "41 42 43 44 60 61 62 63 64 65 66 67 68 81 82 99"
    ),
    "C": "01 02 03 04 05 06 07 08 09 10 11 12 13 14 21 22 23 25 30 40 99",
    "D": "01 02 03 04 05 06 07 21 99",
    "E": "01 02 03 04 05 06 21 99",
    "F": "01 02 03 04 15 16 17 21 22 23 24 25 26 27 28 41 42 99",
    "G": "01 02 03 04 05 06 07 08 09 10 11 12 21 99",
    "H": "01 02 03 04 05 99",
}
SECTIONS_CLASS_IPCR = [
    section + code
    for section, codes in _CLASS_CODES_BY_SECTION.items()
    for code in codes.split()
]

# Feature column names derived from the section letters and section+class codes.
FEATURE_COLS_SECTION = ["section_" + section for section in SECTIONS_IPCR]
FEATURE_COLS_SECTION_CLASS = ["section_class_" + section_class for section_class in SECTIONS_CLASS_IPCR]

# Column groups consumed by the loading and training cells.
NON_TEXT_FEATURE_COLS = [*FEATURE_COLS_SECTION, *FEATURE_COLS_SECTION_CLASS]
COL_ENGLISH_TEXT = "english_text_features"
TEXT_FEATURE_COLS = ["flag_energy_title", "flag_energy_abstract", "flag_energy_claims"]

In [0]:
from pyspark.sql import functions as sf
# Featurized patents: keep only the file identifier and the columns the model consumes.
input_container_path = f"wasbs://{FEATURES_CONTAINER_NAME}@{FEATURES_STORAGE_NAME}.blob.core.windows.net"
input_path = f"{input_container_path}/{FEATURES_OUTPUT_FOLDER}/"
features_data = spark.read.parquet(input_path).select(
    "_file", COL_ENGLISH_TEXT, *NON_TEXT_FEATURE_COLS, *TEXT_FEATURE_COLS
)

# Files selected for training. This table also supplies the `positive_data` label
# that later cells filter on (it is not among the feature columns selected above).
# The container lives in the same storage account as the features, so reuse the
# constant instead of repeating the account name.
TRAINING_SAMPLES_CONTAINER_NAME = "trainingsamples"
samples_container_path = (
    f"wasbs://{TRAINING_SAMPLES_CONTAINER_NAME}@{FEATURES_STORAGE_NAME}.blob.core.windows.net"
)
input_blob_folder = f"{samples_container_path}/file_names/"
target_files = spark.read.parquet(input_blob_folder)

# An inner join on `_file` keeps exactly the rows whose file is listed in
# `target_files` and attaches its columns in one step. This replaces collecting
# every file name to the driver for an `isin` filter followed by a left join.
input_data = features_data.join(target_files, ["_file"], "inner")

In [0]:
# Split positives and negatives separately so both classes keep the same 80/20 ratio.
positive_data = input_data.filter(sf.col("positive_data") == 1)
negative_data = input_data.filter(sf.col("positive_data") == 0)
(p_training_data, p_test_data) = positive_data.randomSplit([0.8, 0.2], seed=18)
(n_training_data, n_test_data) = negative_data.randomSplit([0.8, 0.2], seed=18)

# The positive training split is counted here and expanded below, so cache it.
p_training_data = p_training_data.cache()
num_ps_training = p_training_data.count()
num_ns_training = n_training_data.count()
print(f"Positive registers for training: {num_ps_training}")
print(f"Negative registers for training: {num_ns_training}")

if num_ps_training == 0:
    # Fail with a clear message instead of a ZeroDivisionError in the ratio below.
    raise ValueError("No positive registers in the training split; cannot balance the data")

# First implementation to balance data; more advanced techniques such as SMOTE should be analyzed.
# Clamp to at least 1: with fewer negatives than positives the integer ratio is 0,
# and exploding an empty array would silently drop every positive training row.
num_dups = max(1, num_ns_training // num_ps_training)
print(f"Duplicating positive data {num_dups} times to balance the data")

# Exploding an array of `num_dups` literals emits each positive row `num_dups` times.
p_training_data = p_training_data.withColumn(
    "dummy", sf.explode(sf.array(*[sf.lit(n) for n in range(num_dups)]))
).drop("dummy")

training_data = p_training_data.union(n_training_data)
test_data = p_test_data.union(n_test_data)

In [0]:
# Persist both splits as parquet, overwriting whatever is already at each path.
for split_frame, split_path in ((training_data, "/tmp/training"), (test_data, "/tmp/test")):
    split_frame.write.mode("overwrite").parquet(split_path)

## Training the model
We start with a simple model, LogisticRegression.
Other models that should be tried are NaiveBayes and RandomForest.

In [0]:
# Print the column names and types of the training DataFrame.
training_data.printSchema()

In [0]:
# Load the training and test splits from the parquet files at /tmp/training and /tmp/test.
training_data, test_data = (
    spark.read.parquet(split_path) for split_path in ("/tmp/training", "/tmp/test")
)

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, CountVectorizer, IDF


# Document-frequency cut-off passed to CountVectorizer as `minDF`.
minDF = 5

# Text branch: term counts from the tokenized text column, then IDF weighting.
cv = CountVectorizer(minDF=minDF, inputCol=COL_ENGLISH_TEXT, outputCol="text_features_count")
idf = IDF(inputCol="text_features_count", outputCol="text_features_idf")

# Merge the section/class columns, the text flag columns and the IDF vector into
# the single "features" vector (LogisticRegression's default features column).
assembler = VectorAssembler(
    inputCols=[*NON_TEXT_FEATURE_COLS, *TEXT_FEATURE_COLS, "text_features_idf"],
    outputCol="features",
)

# elasticNetParam=1 selects a pure L1 penalty; regParam sets its strength.
lr = LogisticRegression(labelCol="positive_data", regParam=0.05, elasticNetParam=1)

pipeline = Pipeline(stages=[cv, idf, assembler, lr])
model = pipeline.fit(training_data)

# The fitted logistic regression is the last stage of the fitted pipeline.
lrModel = model.stages[-1]

# Print the coefficients and intercept for logistic regression
print(f"Coefficients: {lrModel.coefficients}")
print(f"Intercept: {lrModel.intercept}")

In [0]:
# Training summary of the fitted logistic regression stage of the pipeline.
trainingSummary = lrModel.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

# Pick the threshold that maximizes the F-Measure on the training data.
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
    .select('threshold').head()['threshold']

# Apply the threshold to the FITTED model. `lrModel` is the stage object held by
# `model`, so predictions made with `model` and the saved pipeline both use it.
# Setting it only on the estimator `lr` leaves the already-fitted model unchanged.
# NOTE(review): LogisticRegressionModel.setThreshold needs PySpark >= 3.0 — confirm the cluster runtime.
lrModel.setThreshold(bestThreshold)
# Keep the estimator in sync so a later re-fit starts from the same threshold.
lr.setThreshold(bestThreshold)

## Test

In [0]:
# Print the column names and types of the test DataFrame
# (the method must be called; a bare attribute reference prints no schema).
test_data.printSchema()

In [0]:
# Score the test split with the fitted pipeline and cache the result,
# since the metrics cell reads it more than once.
predictions = model.transform(test_data).cache()

In [0]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
from pyspark.sql.types import DoubleType

# MulticlassMetrics expects (prediction, label) pairs. With the pair order reversed
# the confusion matrix is transposed, which swaps precision and recall.
predictionAndTarget = predictions.select(
    sf.col("prediction").astype(DoubleType()),
    sf.col("positive_data").astype(DoubleType()),
)

# BinaryClassificationMetrics expects (score, label) pairs. Use the predicted
# probability of the positive class (index 1 of the probability vector) as the
# score so the ROC curve is built from a ranking, not from hard 0/1 predictions.
scoreAndTarget = predictions.select("probability", "positive_data").rdd.map(
    lambda row: (float(row["probability"][1]), float(row["positive_data"]))
)

# Create both evaluators
metrics_binary = BinaryClassificationMetrics(scoreAndTarget)
metrics_multi = MulticlassMetrics(predictionAndTarget.rdd.map(tuple))

# Precision, recall and F1 are reported for the positive class (label 1.0).
acc = metrics_multi.accuracy
f1 = metrics_multi.fMeasure(1.0)
precision = metrics_multi.precision(1.0)
recall = metrics_multi.recall(1.0)
auc = metrics_binary.areaUnderROC

In [0]:
# Report the test metrics, one "<name>: <value>" line each.
for metric_name, metric_value in (
    ("Acc", acc),
    ("f1", f1),
    ("precision", precision),
    ("Recall", recall),
    ("Auc", auc),
):
    print(f"{metric_name}: {metric_value}")

### Save the model

In [0]:
# Destination layout: wasbs://<container>@<account>.blob.core.windows.net/<versioned folder>/
output_container_path = (
    "wasbs://" + ENERGY_CLASSIFIER_CONTAINER_NAME
    + "@" + ENERGY_CLASSIFIER_STORAGE_NAME + ".blob.core.windows.net"
)
output_blob_folder = output_container_path + "/" + ENERGY_CLASSIFIER_OUTPUT_FOLDER + "/"

# The container must exist before the model can be written into it.
create_if_not_exists_container(ENERGY_CLASSIFIER_STORAGE_NAME, ENERGY_CLASSIFIER_CONTAINER_NAME)

# NOTE(review): `save` presumably fails if the target folder already exists, so re-running
# with an unchanged ENERGY_CLASSIFIER_VERSION would error here — confirm whether that is intended.
model.save(output_blob_folder)