In [7]:
import pandas as pd
import os

The first step is to create the connection to Databricks.

In [8]:
# Connect to Databricks
from databricks import sql
import os
from dotenv import load_dotenv
from pyspark.sql import SparkSession
import pandas as pd

# Load the environment variables (python-dotenv reads a .env file, if present)
load_dotenv()

# Fail fast if any connection setting is missing: os.getenv returns None for
# unset variables, and sql.connect would otherwise be called with None values.
required_env_vars = ["DATABRICKS_SERVER_HOSTNAME", "DATABRICKS_HTTP_PATH", "PYTHON_ACCESS_TOKEN"]
missing_env_vars = [name for name in required_env_vars if not os.getenv(name)]
if missing_env_vars:
    raise EnvironmentError(
        "Missing Databricks connection settings: " + ", ".join(missing_env_vars)
    )

# Set the connection
connection = sql.connect(server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                         http_path=os.getenv("DATABRICKS_HTTP_PATH"),
                         access_token=os.getenv("PYTHON_ACCESS_TOKEN"))

# Notebook-level cursor, used by get_df_from_databricks below
cursor = connection.cursor()


# Start spark session
# NOTE(review): `spark` is not used in the cells shown here — confirm it is
# needed before keeping it.
spark = SparkSession.builder \
    .appName("mkt_analysis_2024") \
    .getOrCreate()

def get_df_from_databricks(query, db_cursor=None):
    """Run a SQL query on Databricks and return the result as a pandas DataFrame.

    Parameters
    ----------
    query : str
        SQL statement to execute.
    db_cursor : optional
        DB-API style cursor to run the query on. If omitted, the notebook-level
        ``cursor`` is used, so existing calls ``get_df_from_databricks(query)``
        behave exactly as before.

    Returns
    -------
    pandas.DataFrame
        One row per fetched record, with columns named after the cursor's
        ``description``.
    """
    if db_cursor is None:
        # Fall back to the notebook-level cursor
        db_cursor = cursor
    db_cursor.execute(query)
    rows = db_cursor.fetchall()
    # The first element of each description entry is the column name
    columns = [desc[0] for desc in db_cursor.description]
    return pd.DataFrame(rows, columns=columns)


24/05/19 14:06:40 WARN Utils: Your hostname, Bernardos-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.68.117 instead (on interface en0)
24/05/19 14:06:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/19 14:06:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


A bit about how the data is structured at Metaloop. There are essentially three sources of transactional material data:

 - Cart items
    - These are the materials that make up a transaction (stored in the orders object). Each order is associated with two accounts, a seller and a buyer. Therefore, the lowest-granularity data here is at the cart-item level, with the order data joined on.
 - Opportunity materials
    - These play a similar role to cart items, but at the opportunity level (before an order is put forward)
 - Conditions (Contract materials)

 

In [11]:
# Directory holding the .sql query files (relative to the current working directory)
queries_path = os.path.join(os.getcwd(), "queries/")

# Read the cart items query; the context manager closes the file handle
with open(os.path.join(queries_path, "cart_items.sql"), encoding="utf-8") as query_file:
    query = query_file.read()

# Get cart_items
cart_items = get_df_from_databricks(query)

Analyse the cart items

In [35]:
# Preview the first five rows of the cart items
cart_items.head(n=5)

Unnamed: 0,pk_cart_item,fk_order,ds_material_alias,ds_material,fk_material_alias,fk_material,ds_purchase_price_formula,ds_sale_price_formula,vl_quantity_purchase,vl_quantity_sell,vl_unit_price_purchase,vl_unit_price_sell,ds_status_label
0,a0V09000000p8gCEAQ,a0b09000000gKg6AAE,Cu mit Fe (87%),"Kupfer, gemischt, Raff, alt",a0g09000001F4tuAAC,a0Z09000005ERBFEA4,,,374.0,374.0,4.38,4.38,Webshop
1,a0V09000000p8vyEAA,a0b09000000gKgBAAU,"Kupfer, blank I, Millberry","Kupfer, blank I, Millberry",a0g09000001F4tqAAC,a0Z09000000oVReEAM,,,220.0,220.0,5.15,5.15,Webshop
2,a0V09000000p8vzEAA,a0b09000000gKgBAAU,"Kupfer schwer, Berry min. 98% Cu","Kupfer schwer, Berry min. 98% Cu",a0g09000001F4tmAAC,a0Z09000005ERBKEA4,,,25.0,25.0,4.43,4.43,Webshop
3,a0V09000000p8w0EAA,a0b09000000gKgBAAU,Kupferkabel mit Stecker,Kupferkabel mit Stecker,a0g09000001F4uMAAS,a0Z09000005ESrsEAG,,,5.0,5.0,0.67,0.67,Webshop
4,a0V09000000p8w1EAA,a0b09000000gKgBAAU,"Kupferkabel, min. 38% Cu","Kupferkabel, min. 38% Cu",a0g09000001F4uOAAS,a0Z09000000oVSIEA2,,,50.0,50.0,1.41,1.41,Webshop


## Simple bag of words model

With a simple bag-of-words model, we try to extract features from the alias, material and formula fields. Our target variable is product_id, a class identifying the underlying product related to the material; it is encoded in the fk_material column.
For now, however, we only use the aliases.

In [12]:
# Remove the columns that are not used as model inputs
df1 = cart_items.drop(columns=['pk_cart_item', 'fk_order', 'fk_material_alias',
                               'ds_status_label', 'ds_material'])

# Cast ds_material_alias to string. Fill missing aliases with '' first;
# otherwise astype(str) turns them into the literal tokens 'nan' / 'None'.
df1['ds_material_alias'] = df1['ds_material_alias'].fillna('').astype(str)

Here we can actually "duplicate" the … **TODO:** this sentence is unfinished. Complete it or remove it.

In [13]:
# Word counter used to build the n_words_* features
def count_words(string):
    """Return how many whitespace-separated words ``string`` contains (0 if blank)."""
    words = string.split()
    n_words = len(words)
    return n_words

# Define function to get the average word length
def avg_word_length(string):
    """Return the mean length, in characters, of the whitespace-separated words in ``string``.

    Returns 0.0 for an empty or whitespace-only string instead of raising
    ZeroDivisionError, so the function can be applied to every value of a column.
    """
    words = string.split()
    if not words:
        return 0.0
    return sum(len(word) for word in words) / len(words)

# Flag aliases that contain a percentage symbol (e.g. "Cu mit Fe (87%)").
# regex=False makes '%' a plain substring match.
df1['flg_has_perc'] = df1['ds_material_alias'].str.contains('%', regex=False)

# Number of whitespace-separated words in the alias
df1['n_words_alias'] = df1['ds_material_alias'].apply(count_words)

df1.head()


Unnamed: 0,ds_material_alias,fk_material,ds_purchase_price_formula,cd_alias_language,ds_sale_price_formula,vl_quantity_purchase,vl_quantity_sell,vl_unit_price_purchase,vl_unit_price_sell,flg_has_perc,n_words_alias
0,Cu mit Fe (87%),a0Z09000005ERBFEA4,,DE,,374.0,374.0,4.38,4.38,True,4
1,"Kupfer, blank I, Millberry",a0Z09000000oVReEAM,,DE,,220.0,220.0,5.15,5.15,False,4
2,"Kupfer schwer, Berry min. 98% Cu",a0Z09000005ERBKEA4,,DE,,25.0,25.0,4.43,4.43,True,6
3,Kupferkabel mit Stecker,a0Z09000005ESrsEAG,,DE,,5.0,5.0,0.67,0.67,False,3
4,"Kupferkabel, min. 38% Cu",a0Z09000000oVSIEA2,,DE,,50.0,50.0,1.41,1.41,True,4


## Tokenization

1. Converting words into lowercase
2. Removing leading and trailing whitespaces
3. Removing punctuation
4. Removing stopwords
5. Expanding contractions
6. Removing special characters

Can we perform lemmatization, given that a lot of our text is in German? Yes, we can. See the available models in the [spaCy documentation](https://spacy.io/usage/models).

We will need a way to detect which language the text is (or is likely to be) in, so that we can apply the best possible tokenization.

Note: after installing the spaCy package for the first time, you might need to reboot your computer (and make sure `spacy-transformers` is installed, as the error message suggests) if you get this error:
> ValueError: [E002] Can't find factory for 'transformer' for language English (en). This usually happens when spaCy calls `nlp.create_pipe` with a custom component name that's not registered on the current language class. If you're using a Transformer, make sure to install 'spacy-transformers'. If you're using a custom component, make sure you've added the decorator `@Language.component` (for function components) or `@Language.factory` (for class components).


In [15]:
# Tokenization example with 'Kupfer, blank I, Millberry'
import spacy
import spacy_transformers  # noqa: F401 -- presumably needed so de_dep_news_trf can build its 'transformer' component (cf. spaCy error E002)
import de_dep_news_trf
import en_core_web_sm

# Load the German transformer pipeline (de_dep_news_trf)
de_nlp = spacy.load('de_dep_news_trf')

# Load the small English pipeline (en_core_web_sm)
en_nlp = spacy.load('en_core_web_sm')

# Create a Doc object from a German alias
doc = de_nlp('Kupfer, blank I, Millberry')

# Generate the tokens
tokens = [token.text for token in doc]
print(tokens)

['Kupfer', ',', 'blank', 'I', ',', 'Millberry']


As part of the baseline exercise, we will use the standard stopwords. A potential improvement would be to build custom stopword lists tailored to this use case.


In [17]:
# Default German and English stopword sets shipped with spaCy.
# The submodules are imported explicitly so this cell does not depend on
# spacy.lang.de / spacy.lang.en having been imported elsewhere (attribute
# access such as spacy.lang.de.stop_words only works once they are loaded).
from spacy.lang.de.stop_words import STOP_WORDS as de_stopwords
from spacy.lang.en.stop_words import STOP_WORDS as en_stopwords

In [18]:
def _lemmatize_without_stopwords(text, nlp, stopwords):
    """Lemmatize ``text`` with the spaCy pipeline ``nlp`` and drop stopword lemmas.

    Parameters
    ----------
    text : str
        Raw text to preprocess.
    nlp : spacy.language.Language
        Loaded spaCy pipeline used for tokenization and lemmatization.
    stopwords : set of str
        Lemmas to remove (membership test is case-sensitive).

    Returns
    -------
    str
        The remaining lemmas joined by single spaces. Only stopwords are
        removed; punctuation and numeric tokens are kept.
    """
    # The 'ner' and 'parser' components are skipped; only the lemmas are used here
    doc = nlp(text, disable=['ner', 'parser'])
    lemmas = [token.lemma_ for token in doc]
    return ' '.join(lemma for lemma in lemmas if lemma not in stopwords)


# Function to preprocess text in German
def de_preprocess(text):
    """Lemmatize German ``text`` with de_nlp and remove German stopwords."""
    return _lemmatize_without_stopwords(text, de_nlp, de_stopwords)


# Function to preprocess text in English. It has its own name so it does not
# shadow de_preprocess; sharing a name would make the German data go through
# the English pipeline.
def en_preprocess(text):
    """Lemmatize English ``text`` with en_nlp and remove English stopwords."""
    return _lemmatize_without_stopwords(text, en_nlp, en_stopwords)


# Preprocess the material aliases with the German pipeline
df1['p_ds_material_alias'] = df1['ds_material_alias'].apply(de_preprocess)

24/05/19 16:45:37 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 183059 ms exceeds timeout 120000 ms
24/05/19 16:45:37 WARN SparkContext: Killing executors is not supported by current scheduler.
24/05/19 16:45:40 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$