# TODO:
* make this look at description only

In [None]:
%%configure -f
{
 "executorCores": 4,
 "executorMemory": "47696M",
 "conf": {"spark.default.parallelism": 1000,
          "spark.sql.shuffle.partitions": 1000,
          "spark.task.cpus": 1
         }
}

In [None]:
# S3 prefix for the shared vocabulary files (bow/words_idf.csv and bow/word2id.csv).
word2id_path = "s3://onai-ml-dev-eu-west-1/company2vec/common"
# S3 prefix under which the per-company feature dataset (raw_company_features) is stored.
data_path = "s3://onai-ml-dev-eu-west-1/company2vec/data_nmf"

In [None]:
from gensim.utils import lemmatize, tokenize
from gensim.parsing.preprocessing import remove_stopwords
from gensim.parsing.porter import PorterStemmer
from nltk.stem import WordNetLemmatizer

from langdetect import detect
import string
from itertools import islice
import numpy as np
import smart_open
from collections import Counter
import math

from pyspark.ml.feature import (HashingTF,
                                IDF,
                                Tokenizer,
                                StopWordsRemover,
                                CountVectorizer,
                                StringIndexer,
                                OneHotEncoderEstimator,
                                VectorAssembler,
                                VectorSizeHint,
                                StandardScaler,
                                PCA
                               )
from pyspark.ml import Pipeline, Transformer

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

from pyspark.mllib.linalg import DenseMatrix
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.linalg import Vector as MLLibVector, Vectors as MLLibVectors

from pyspark.ml.linalg import Vectors, VectorUDT

In [None]:
# Load the denormalized company table from the data lake.
# NOTE(review): `spark` is not defined in this notebook — presumably injected by the kernel; confirm.
companies_raw = spark.read.load("s3://ai-data-lake-dev-eu-west-1/business/company_data_denormalized")

In [None]:
def is_english(text):
    """Return True when ``langdetect`` classifies *text* as English.

    Args:
        text: The string to classify; may be ``None`` or empty.

    Returns:
        bool: ``True`` only if detection succeeds and yields ``'en'``;
        ``False`` for empty input or when detection fails for any reason.
    """
    if not text:
        # Nothing to classify; skip the detector call entirely.
        return False
    try:
        return detect(text) == 'en'
    except Exception:
        # Deliberately best-effort: a detector failure counts as "not English".
        # Unlike a bare ``except:``, this lets KeyboardInterrupt/SystemExit propagate.
        return False
# Spark UDF wrapper around is_english, producing a boolean column.
is_english_udf = F.udf(is_english, T.BooleanType())

# Single Porter stemmer instance, referenced by process_text.
p = PorterStemmer()
def process_text(text):
    """Normalize a free-text description into a list of cleaned word tokens.

    The text has its stopwords removed, is Porter-stemmed, then lower-cased
    and tokenized; every token is lemmatized, and tokens that are
    punctuation, non-alphabetic or a single character are dropped.

    Args:
        text: Raw description string; may be ``None`` or empty.

    Returns:
        list[str]: The cleaned tokens, in their original order. An empty
        list is returned for empty/``None`` input, so the return type always
        matches the ``ArrayType(StringType)`` the UDF is registered with
        (the previous ``''`` was a string, not an array).
    """
    if not text:
        return []

    # Imported inside the function, as in the original.
    # NOTE(review): presumably so the import resolves on Spark executors — confirm.
    from nltk.stem import WordNetLemmatizer
    lemmatizer = WordNetLemmatizer()

    stemmed = p.stem_sentence(remove_stopwords(text))
    lemmas = (lemmatizer.lemmatize(token) for token in tokenize(stemmed, lower=True))
    return [
        word
        for word in lemmas
        if word not in string.punctuation and word.isalpha() and len(word) > 1
    ]
# Spark UDF wrapper around process_text, declared to return array<string>.
process_text_udf = F.udf(process_text, T.ArrayType(T.StringType()))

def sparse_bow(*args):
    """Merge several token sequences into one token -> count mapping.

    Args:
        *args: Token sequences; ``None`` or empty entries are ignored.

    Returns:
        dict: Each distinct token mapped to its total number of occurrences
        across all given sequences.
    """
    all_tokens = (token for chunk in args if chunk for token in chunk)
    return dict(Counter(all_tokens))
# Spark UDF wrapper around sparse_bow, declared to return map<string,int>.
sparse_bow_udf = F.udf(sparse_bow, T.MapType(T.StringType(), T.IntegerType()))

In [None]:
class OneHotEncoderEmpty(Transformer):
    """One-hot encode a column against a fixed, ordered category list.

    Slot ``i`` of the output vector is 1.0 when the input value equals
    ``categories[i]``. Empty/``None`` values, and values not present in
    ``categories``, yield an all-zero vector.

    Args:
        inputCol: Name of the column holding the category value.
        outputCol: Name of the dense-vector column to create.
        categories: Ordered list of category values; fixes the slot order.
    """

    def __init__(self, inputCol, outputCol, categories):
        super(OneHotEncoderEmpty, self).__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol
        self.categories = categories

    def _transform(self, ds):
        """Return *ds* with the one-hot vector column ``outputCol`` added."""
        # Local binding so the UDF closure refers to the list rather than to self.
        categories = self.categories
        tmp_col = self.outputCol + "_tmp"

        def fill_onehot(text):
            if not text:
                return [0.0] * len(categories)
            return [1.0 if text == category else 0.0 for category in categories]

        to_array = F.udf(fill_onehot, T.ArrayType(T.DoubleType()))
        to_vector = F.udf(lambda arr: Vectors.dense(arr), VectorUDT())

        # Two steps: value -> array<double> in a temporary column, then array -> vector.
        with_array = ds.withColumn(tmp_col, to_array(self.inputCol))
        with_vector = with_array.withColumn(self.outputCol, to_vector(tmp_col))
        return with_vector.drop(tmp_col)

In [None]:
# Company ids that occur on more than one row of the raw table
# (collected to the driver as a plain Python list).
duplicate_ids = [
    row["company_id"]
    for row in (
        companies_raw
        .groupBy("company_id")
        .agg(F.count("*").alias("count"))
        .where(F.col("count") > 1)
        .collect()
    )
]

In [None]:
# Build the working set of companies with a merged bag-of-words description.
companies = (
    companies_raw
    # Exclude every company whose id appears more than once in the raw table.
    .filter(~F.col("company_id").isin(duplicate_ids))
    # Keep rows where at least one of the four description fields is non-empty.
    .filter(
        (F.col("company_long_description").isNotNull()
         & (F.length("company_long_description") > 0))
        | (F.col("company_description").isNotNull()
           & (F.length("company_description") > 0))
        | (F.col("website_long_description").isNotNull()
           & (F.length("website_long_description") > 0))
        | (F.col("website_description").isNotNull()
           & (F.length("website_description") > 0))
    )
    # Tokenize each description field and merge them into one word -> count map.
    .withColumn(
        "merged_description",
        sparse_bow_udf(
            process_text_udf("company_long_description"),
            process_text_udf("company_description"),
            process_text_udf("website_long_description"),
            process_text_udf("website_description"),
        ),
    )
    # Drop companies whose descriptions produced no usable tokens.
    .filter(F.size("merged_description") > 0)
    # The raw text is no longer needed once the bag of words exists.
    .drop(
        "company_description",
        "company_long_description",
        "website_description",
        "website_long_description",
    )
    # Missing numeric values are replaced with zero.
    .fillna({
        "latest_ebitda": 0.0,
        "latest_revenue": 0.0,
        "latest_revenue_growth": 0.0,
        "latest_ebitda_margin": 0.0,
        "number_of_employees": 0,
    })
    .repartition(1000)
    .cache()
)

In [None]:
# Number of companies left after filtering (this action also triggers computation of the cached DataFrame).
companies.count()

In [None]:
# Number of distinct company ids; should equal the row count, since duplicated ids were filtered out.
companies.select("company_id").distinct().count()

In [None]:
# Sorted, distinct, non-empty country values; the list order fixes the
# slot order of the country one-hot vector.
# The truthiness test skips both empty strings and NULLs: distinct() returns
# a NULL as None, on which the previous len(row[0]) would raise TypeError.
countries = sorted(
    row[0] for row in companies.select("country").distinct().collect() if row[0]
)

In [None]:
# Sorted, distinct, non-empty SIC codes; the list order fixes the slot
# order of the industry one-hot vector.
# The truthiness test skips both empty strings and NULLs: distinct() returns
# a NULL as None, on which the previous len(row[0]) would raise TypeError.
industry_ids = sorted(
    row[0] for row in companies.select("sic_code").distinct().collect() if row[0]
)

In [None]:
# Sorted, distinct, non-empty company types; the list order fixes the slot
# order of the company-type one-hot vector.
# The truthiness test skips both empty strings and NULLs: distinct() returns
# a NULL as None, on which the previous len(row[0]) would raise TypeError.
company_types = sorted(
    row[0] for row in companies.select("company_type").distinct().collect() if row[0]
)

In [None]:
# Sorted, distinct, non-empty region values; the list order fixes the slot
# order of the region one-hot vector.
# The truthiness test skips both empty strings and NULLs: distinct() returns
# a NULL as None, on which the previous len(row[0]) would raise TypeError.
regions = sorted(
    row[0] for row in companies.select("region").distinct().collect() if row[0]
)

In [None]:
# Total number of documents (one per company); used as N in the IDF formula and to derive min_df.
num_docs = companies.count()

In [None]:
# Document-frequency threshold: a word is kept only if it occurs in more than 0.001% of all documents.
min_df = num_docs*0.00001

In [None]:
# Document frequency per word. Exploding the word -> count map gives one
# (key, value) row per distinct word of each company, so the row count per
# key is the number of companies whose description contains that word.
# Only words above the min_df threshold are kept.
words_df = {
    row["key"]: row["n_docs"]
    for row in (
        companies.select(F.explode("merged_description"))
        .groupBy("key")
        .agg(F.count("*").alias("n_docs"))
        .where(F.col("n_docs") > min_df)
        .collect()
    )
}

In [None]:
# Smoothed inverse document frequency: log((N + 1) / (df + 1)).
words_idf = {
    term: math.log((num_docs + 1) / (doc_freq + 1))
    for term, doc_freq in words_df.items()
}
# Ids are assigned in alphabetical word order, starting at 0.
id2word = dict(enumerate(sorted(words_idf)))
word2id = {term: term_id for term_id, term in id2word.items()}

In [None]:
# Persist the word -> IDF table as "word,idf" lines, sorted by word.
with smart_open.open(f"{word2id_path}/bow/words_idf.csv", "w") as f:
    f.writelines(f"{word},{idf}\n" for word, idf in sorted(words_idf.items()))

In [None]:
# Persist the word -> id table as "word,id" lines, sorted by word.
with smart_open.open(f"{word2id_path}/bow/word2id.csv", "w") as f:
    f.writelines(f"{word},{idd}\n" for word, idd in sorted(word2id.items()))

In [None]:
# Read the word -> IDF table back from its CSV file ("word,idf" per line).
with smart_open.open(f"{word2id_path}/bow/words_idf.csv", "r") as f:
    words_idf = {
        word: float(idf)
        for word, idf in (line.strip().split(",") for line in f)
    }

In [None]:
# Read the word -> id table back from its CSV file ("word,id" per line).
with smart_open.open(f"{word2id_path}/bow/word2id.csv", "r") as f:
    word2id = {
        word: int(idd)
        for word, idd in (line.strip().split(",") for line in f)
    }

In [None]:
# Vocabulary size; used as the length of every TF-IDF vector.
num_words = len(word2id)

In [None]:
def words_tfidf(bow):
    """Turn a word -> count mapping into a sparse TF-IDF vector.

    Each word present in ``word2id`` gets the weight
    ``log(count + 1) * words_idf[word]`` at position ``word2id[word]``;
    words outside the vocabulary are ignored.

    Args:
        bow: Mapping from word to its occurrence count.

    Returns:
        A sparse vector of length ``num_words``.
    """
    weights = {}
    for term, count in bow.items():
        if term in word2id:
            weights[word2id[term]] = math.log(count + 1) * words_idf[term]
    return Vectors.sparse(num_words, weights)
# Spark UDF wrapper around words_tfidf, producing a vector column.
words_tfidf_udf = F.udf(words_tfidf, VectorUDT())

In [None]:
# Attach the TF-IDF vector computed from each company's merged bag of words.
companies_tfidf = companies.withColumn(
    "bow_tfidf", words_tfidf_udf("merged_description")
)

In [None]:
# Row count of the TF-IDF DataFrame (an action, so it runs the TF-IDF UDF over the data).
companies_tfidf.count()

In [None]:
# --- Categorical features: one encoder per column, each using its sorted category list.
country_onehot_encoder = OneHotEncoderEmpty(
    inputCol="country",
    outputCol="country_onehot",
    categories=countries,
)
industry_onehot_encoder = OneHotEncoderEmpty(
    inputCol="sic_code",
    outputCol="industry_onehot",
    categories=industry_ids,
)
type_onehot_encoder = OneHotEncoderEmpty(
    inputCol="company_type",
    outputCol="type_onehot",
    categories=company_types,
)
region_onehot_encoder = OneHotEncoderEmpty(
    inputCol="region",
    outputCol="region_onehot",
    categories=regions,
)

# --- Numeric features: assemble into one vector, then standardize
# (zero mean, unit standard deviation).
vectorizer_numeric = VectorAssembler(
    inputCols=[
        "latest_revenue",
        "number_of_employees",
        "latest_ebitda",
        "latest_ebitda_margin",
        "latest_revenue_growth",
    ],
    outputCol="numeric_features",
    handleInvalid="skip",
)

scaler = StandardScaler(
    inputCol="numeric_features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=True,
)

# --- Final feature vector: TF-IDF, scaled numerics, then the one-hot blocks.
vectorizer = VectorAssembler(
    inputCols=[
        "bow_tfidf",
        "scaledFeatures",
        "industry_onehot",
        "region_onehot",
        "type_onehot",
        "country_onehot",
    ],
    outputCol="features",
    handleInvalid="skip",
)

pipeline = Pipeline(
    stages=[
        country_onehot_encoder,
        region_onehot_encoder,
        industry_onehot_encoder,
        type_onehot_encoder,
        vectorizer_numeric,
        scaler,
        vectorizer,
    ]
)
# Fit on, and then transform, the same DataFrame.
pipeline_fit = pipeline.fit(companies_tfidf)
processed_companies = pipeline_fit.transform(companies_tfidf).repartition(1000)

In [None]:
def make_checker_udf(categories):
    """Build a Spark UDF that validates a one-hot vector against its source value.

    Args:
        categories: Ordered list of category values; position ``i`` in the
            list corresponds to slot ``i`` of the vector.

    Returns:
        A boolean UDF taking ``(column_value, onehot_vector)``. It yields
        ``True`` when the value is empty/``None`` and the vector is all
        zeros, or when the value is in ``categories`` and its slot equals
        1.0. A non-empty value that is not in ``categories`` yields
        ``False``.
    """
    def check_onehot(column, vector):
        if not column:
            # An empty value must have been encoded as the all-zero vector.
            return not np.any(vector)
        try:
            slot = categories.index(column)
        except ValueError:
            # Unknown category: there is no slot to inspect. The previous
            # code fell through with index -1 and checked the LAST slot
            # instead, which could report a false positive.
            return False
        return bool(vector[slot] == 1.0)
    return F.udf(check_onehot, T.BooleanType())

In [None]:
# Spot-check the country and industry one-hot encodings: show each raw value,
# its vector, and the checker result, for 20 rows without truncating the output.
(processed_companies.select("country",
                            "country_onehot",
                            make_checker_udf(countries)("country", "country_onehot").alias("country_valid"),
                            "sic_code",
                            "industry_onehot",
                            make_checker_udf(industry_ids)("sic_code", "industry_onehot").alias("industry_valid")
                           )
).show(20, False)

###### Some feature columns can contain negative values. For those columns we subtract the column minimum from every stored value, so that all features are non-negative and NMF can be applied.

In [None]:
def col_splat(vec):
    """Convert a vector's stored entries into an index -> value dict.

    Args:
        vec: Object exposing parallel ``indices`` and ``values`` sequences.

    Returns:
        dict: ``int`` index mapped to ``float`` value for every stored entry.
    """
    return {int(index): float(value) for index, value in zip(vec.indices, vec.values)}

# Spark UDF wrapper around col_splat, declared to return map<int,double>.
col_splat_udf = F.udf(col_splat, T.MapType(T.IntegerType(), T.DoubleType()))

In [None]:
# Per feature index: the smallest stored value, clamped to at most 0.0.
# An index therefore maps to a negative number only if that feature actually
# takes negative values; otherwise it maps to 0.0.
min_val = {
    row["key"]: min(0.0, row["min_val_at_index"])
    for row in (
        processed_companies.select(col_splat_udf("features").alias("sparse"))
        .select(F.explode("sparse"))
        .groupBy("key")
        .agg(F.min("value").alias("min_val_at_index"))
        .collect()
    )
}

In [None]:
# UDFs that unpack a feature vector into plain columns: its length, its
# stored indices, and its stored values shifted by the per-index minimum
# (so no stored value is negative).
get_size = F.udf(lambda vec: vec.size, T.IntegerType())
get_indices = F.udf(
    lambda vec: [int(index) for index in vec.indices],
    T.ArrayType(T.IntegerType()),
)
get_values = F.udf(
    lambda vec: [
        float(value) - min_val[index]
        for index, value in zip(vec.indices, vec.values)
    ],
    T.ArrayType(T.DoubleType()),
)

In [None]:
# Write the unpacked features (size / indices / shifted values) together with
# the bag of words to parquet, replacing any previous output.
(
    processed_companies
    .select(
        "company_id",
        get_size("features").alias("size"),
        get_indices("features").alias("feature_indices"),
        get_values("features").alias("feature_values"),
        "merged_description",
    )
    .repartition(32)
    .write
    .mode("overwrite")
    .parquet(f"{data_path}/raw_company_features")
)

In [None]:
# Read the feature dataset back from the location it was written to.
companies_df = spark.read.load(f"{data_path}/raw_company_features")

In [None]:
# Inspect the stored features of one company (id 704634) without truncating the output.
companies_df.filter("company_id == '704634'").show(20, False)

In [None]:
# Number of rows in the stored feature dataset.
companies_df.count()

In [None]:
# Number of distinct company ids in the stored dataset; should match the row count.
companies_df.select("company_id").distinct().count()