### Import packages

In [1]:
# -*- coding: utf-8 -*-
import pymysql
from pyspark.sql import SparkSession, SQLContext
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
pd.options.mode.chained_assignment = None  # default='warn'
from pyspark.ml.feature import StringIndexer, Tokenizer, CountVectorizer, VectorAssembler, MinMaxScaler, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier

In [2]:
# Notebook-wide Spark session; getOrCreate() reuses a running session if there is one.
spark = SparkSession.builder.getOrCreate()

### Import data from SQL database

In [3]:
def import_data_from_SQL(host, user, password, port, database):
    """Load the name, country and industry columns of `company_table` into pandas.

    Parameters
    ----------
    host, user, password, port, database
        Connection settings, forwarded unchanged to ``pymysql.connect``.

    Returns
    -------
    pandas.DataFrame or None
        The query result with the columns ``name``, ``country`` and ``industry``.
        ``None`` when reading the query fails; an error message is printed in
        that case. Errors raised while opening the connection are not caught.
    """
    query = '''SELECT
                name,
                country,
                industry
              FROM company_table
              '''
    dbcon = pymysql.connect(host=host, user=user, password=password, port=port, database=database)
    try:
        return pd.read_sql_query(query, dbcon)
    except Exception as exc:
        # Same best-effort contract as before (print a message, return None),
        # but KeyboardInterrupt/SystemExit are no longer swallowed and the
        # underlying cause is shown instead of being discarded.
        print("Error: unable to convert the data ({})".format(exc))
        return None
    finally:
        # Runs on success and on failure, so the connection is closed exactly once.
        dbcon.close()


In [4]:
# Connection settings for the MySQL database that holds `company_table`.
# NOTE(review): the root user with an empty password is hard-coded here;
# consider reading these values from the environment before sharing the notebook.
host = '127.0.0.1'
user = 'root'
password = ''
port = 3307
database = "company_names"

pdf = import_data_from_SQL(host, user, password, port, database)

# import_data_from_SQL reports a failed read by printing a message and returning
# None. Stop here with a clear error instead of letting a later cell fail on None.
if pdf is None:
    raise RuntimeError("Could not load company_table from the '{}' database".format(database))

In [None]:
def plot_class_distr(df):
    """Draw a horizontal bar chart of the number of rows per `industry` value.

    The current figure is resized to 5 x 60 inches and gets a grid before it
    is shown. Returns whatever ``plt.show()`` returns.
    """
    class_sizes = df['industry'].value_counts()
    class_sizes.plot.barh(x=class_sizes.index, y=class_sizes.values)

    figure = plt.gcf()
    figure.set_size_inches(5, 60)
    plt.grid()

    shown = plt.show()
    return shown

plot_class_distr(pdf)

### Data cleaning

In [5]:
""" Drop rows with 'nan', replace non eng char and remove space and beginnning and end """
""" Select the n largest classes and the rows with the classes. Undersample so that the largest classes have equal number of
of instances as the smallest class -> Uniform data with same number of instances in each class."""

def clean_main_df(df, n, seed=None):
    """Clean the company table and balance its `n` most frequent industries.

    Steps:
      1. Drop every row in which any column equals the string 'nan'.
      2. Replace each run of characters other than a-z / A-Z in `name` with a
         single space, strip the result, and drop rows whose name ends up empty.
      3. Drop duplicate rows.
      4. Keep only the rows of the `n` most frequent industries.
      5. Randomly drop rows from the larger classes until every kept class has
         as many rows as the smallest kept class.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the columns ``name`` and ``industry``. It is not modified.
    n : int
        Number of most frequent industries to keep.
    seed : int, optional
        Seed for the undersampling step. With the default ``None`` the global
        NumPy random state is used, as before.

    Returns
    -------
    pandas.DataFrame
        The cleaned, class-balanced rows (empty when no class is selected).
    """
    cleaned = df[(df != 'nan').all(axis=1)].copy()
    cleaned['name'] = cleaned['name'].str.replace('[^a-zA-Z]+', ' ', regex=True).str.strip()

    # After strip() a name without letters is '' and never whitespace-only.
    # ''.isspace() is False, so the former isspace() filter kept such rows.
    # Missing names count as length 0 and are dropped, as they were before.
    has_text = cleaned['name'].str.len().fillna(0) > 0
    cleaned = cleaned[has_text].drop_duplicates()

    indust_count = cleaned['industry'].value_counts()
    n_indu = indust_count.index[:n]
    df_sel_class = cleaned[cleaned['industry'].isin(n_indu)]
    if len(n_indu) == 0:
        # Nothing selected (empty input or n == 0): there is no smallest class.
        return df_sel_class

    # value_counts() sorts descending, so the last selected class is the smallest.
    smallest_size = indust_count.loc[n_indu[-1]]
    n_cut = df_sel_class['industry'].value_counts() - smallest_size

    rng = np.random if seed is None else np.random.RandomState(seed)
    for industry, surplus in n_cut.to_dict().items():
        class_index = df_sel_class.index[df_sel_class['industry'] == industry]
        drop_indices = rng.choice(class_index, surplus, replace=False)
        df_sel_class = df_sel_class.drop(drop_indices)
    return df_sel_class

In [6]:
# Number of most frequent industries to keep (second argument of clean_main_df).
number_classes = 3
pdf_clean = clean_main_df(pdf, number_classes)
# Hand the cleaned pandas frame over to Spark for the pyspark.ml steps that follow.
sdf = spark.createDataFrame(pdf_clean)

### Data transformation

In [15]:
def transform_data(sdf):
    """Split `sdf` into train/test sets and turn both into model-ready frames.

    Expects the columns ``name``, ``country`` and ``industry``. Every fitted
    stage is fitted on the training split only and then applied to both splits.

    Returns
    -------
    tuple
        ``(train, test, country_index_fit, industry_index_fit, tokenizer,
        cvector, assembler, scalerModel)``: the two transformed frames (original
        columns plus ``label`` and ``scaledFeatures``) followed by the stages
        needed to transform new inputs the same way.
    """
    train, test = sdf.randomSplit([0.8, 0.2], seed=12345)

    # Country -> numeric index. Fitted on the training split only so the test
    # split does not leak into the encoding; handleInvalid="keep" maps countries
    # not seen during fitting to one extra index instead of raising at transform time.
    country_index_fit = StringIndexer(
        inputCol="country", outputCol="country_index", handleInvalid="keep"
    ).fit(train)
    sdf_tr1 = country_index_fit.transform(train)
    sdf_te1 = country_index_fit.transform(test)

    # Industry -> numeric class label.
    industry_index_fit = StringIndexer(inputCol="industry", outputCol="label").fit(sdf_tr1)
    sdf_tr2 = industry_index_fit.transform(sdf_tr1)
    sdf_te2 = industry_index_fit.transform(sdf_te1)

    # Company name -> tokens -> count vector over a vocabulary of at most 30 terms.
    tokenizer = Tokenizer(inputCol="name", outputCol="token_name")
    sdf_tr3 = tokenizer.transform(sdf_tr2)
    sdf_te3 = tokenizer.transform(sdf_te2)
    cvector = CountVectorizer(inputCol="token_name", outputCol="name_vector", vocabSize=30).fit(sdf_tr3)
    sdf_tr4 = cvector.transform(sdf_tr3)
    sdf_te4 = cvector.transform(sdf_te3)

    # Combine the country index and the name vector into one feature vector.
    assembler = VectorAssembler(inputCols=["country_index", "name_vector"], outputCol="features")
    sdf_tr5 = assembler.transform(sdf_tr4)
    sdf_te5 = assembler.transform(sdf_te4)

    # Min-max scale the features using the ranges observed in the training split.
    scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
    scalerModel = scaler.fit(sdf_tr5)
    sdf_tr6 = scalerModel.transform(sdf_tr5)
    sdf_te6 = scalerModel.transform(sdf_te5)

    # Only the scaled features and the label are needed downstream.
    columns_to_drop = ["country_index", "token_name", "name_vector", "features"]
    sdf_tr7 = sdf_tr6.drop(*columns_to_drop)
    sdf_te7 = sdf_te6.drop(*columns_to_drop)

    return (sdf_tr7, sdf_te7, country_index_fit, industry_index_fit, tokenizer, cvector, assembler, scalerModel)

In [16]:
# Keep the fitted stages next to the two splits so check_input can reuse them on new input.
training, test, country_index_fit, industry_index_fit, tokenizer, cvector, assembler, scalerModel = transform_data(sdf)

### Create a model

In [17]:
def create_model(training, test):
    """Fit a random forest on `training` and print its scores on `test`.

    Both frames need the columns ``scaledFeatures`` and ``label``. Accuracy, F1,
    weighted precision and weighted recall are printed rounded to two decimals.

    Returns the fitted random forest model.
    """
    forest = RandomForestClassifier(featuresCol='scaledFeatures', labelCol='label')
    rfModel = forest.fit(training)

    scored = rfModel.transform(test).select("label", "prediction")
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

    # Same evaluator, one metric per call, in the order the report prints them.
    metric_names = ("accuracy", "f1", "weightedPrecision", "weightedRecall")
    scores = [
        round(evaluator.evaluate(scored, {evaluator.metricName: metric}), 2)
        for metric in metric_names
    ]
    print('Accuracy = {} \nF1 = {} \nWeighted Precision = {} \nWeighted Recall = {}'.format(*scores))
    return rfModel

In [18]:
# Train the classifier; the evaluation metrics on the test split are printed below.
rfModel = create_model(training, test)

Accuracy = 0.54 
F1 = 0.51 
Weighted Precision = 0.64 
Weighted Recall = 0.54


In [19]:
def check_input(rfModel, country_index_fit, industry_index_fit, tokenizer, cvector, assembler, scalerModel,
                name=None, country=None):
    """Predict the industry of one company and print the result.

    Parameters
    ----------
    rfModel
        Fitted classifier used for the prediction.
    country_index_fit, industry_index_fit, tokenizer, cvector, assembler, scalerModel
        The stages returned by ``transform_data``; they are applied to the input
        in the same order as during training.
    name, country : str, optional
        Company name and country. When omitted, the value is read interactively
        with ``input()`` as before. Both are lower-cased before use.

    Prints one line per industry label with its predicted probability in
    percent, then shows a one-row table with name, country and predicted
    industry. Returns ``None``.
    """
    if name is None:
        name = input('Name: ')
    name = name.lower()
    if country is None:
        country = input('Country: ')
    country = country.lower()

    # Uses the notebook-level `spark` session.
    input_sdf = spark.createDataFrame([(name, country)], ['name', 'country'])

    # Apply the fitted stages in training order.
    staged = input_sdf
    for stage in (country_index_fit, tokenizer, cvector, assembler, scalerModel):
        staged = stage.transform(staged)
    columns_to_drop = ["country_index", "token_name", "name_vector", "features"]
    input_predictions = rfModel.transform(staged.drop(*columns_to_drop))

    # Map the numeric prediction back to the industry name.
    indust_lab = industry_index_fit.labels
    inverter = IndexToString(inputCol="prediction", outputCol="industry_pred", labels=indust_lab)
    itd = inverter.transform(input_predictions)

    probabilities = itd.select('probability').collect()[0][0]
    for probability, label in zip(probabilities, indust_lab):
        # Scale first, then round: round(p, 2) * 100 can print float artifacts
        # such as 28.999999999999996.
        print('{} {} %'.format(label, round(probability * 100, 0)))

    # show() prints the table itself and returns None, so wrapping it in print()
    # only added a stray "None" line to the output.
    itd.select('name', 'country', 'industry_pred').show()
    return None

In [20]:
# Interactive check: asks for a company name and a country, then prints the prediction.
check_input(rfModel, country_index_fit, industry_index_fit, tokenizer, cvector, assembler, scalerModel)

construction 26.0 %
information technology and services 40.0 %
marketing and advertising 34.0 %
+----+-------+--------------------+
|name|country|       industry_pred|
+----+-------+--------------------+
|wuzu|denmark|information techn...|
+----+-------+--------------------+

None
