# Machine learning using random forest classification 
# to predict results of mic usage

In [1]:
# installs
!pip install pyspark
!pip install findspark

Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m7.8 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m38.6 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=acc56d151babdcdae38cfb911388c4a6fd0147a8f8ff48979f01f76bdafd7796
  Stored in directory: /tmp/wsuser/.cache/pip/wheels/89/d6/52/1178e354ba2207673484f0ccd7b2ded0ab6671ae5c1fc5b49a
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [3]:
# Spark context class
sc = SparkContext()

# Spark session
spark = SparkSession.builder.appName("what-mic").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/15 13:20:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark

In [5]:
# install ibm_db and pandas
# these are already installed in my environment, uncomment to install
# !pip install ibm_db
# !pip install pandas

In [6]:
# imports
import ibm_db
import ibm_db_dbi
import pandas as pd

In [7]:
# connect to db2
dsn_hostname = "################.databases.appdomain.cloud"
dsn_uid = "dnl#####"
dsn_pwd = "################"
dsn_port = "#####"
dsn_database = "bludb"
dsn_driver = "{IBM DB2 ODBC DRIVER}"
dsn_protocol = "TCPIP"
dsn_security = "SSL"

dsn = (
    "DRIVER={0};"
    "DATABASE={1};"
    "HOSTNAME={2};"
    "PORT={3};"
    "PROTOCOL={4};"
    "UID={5};"
    "PWD={6};"
    "SECURITY={7};").format(dsn_driver, dsn_database, dsn_hostname, dsn_port, dsn_protocol, dsn_uid, dsn_pwd, dsn_security)

try:
    db2_conn = ibm_db.connect(dsn, "", "")
    print ("DB2 connection established")
except:
    print("Failed to connect to the production data warehouse")

DB2 connection established


In [8]:
# query the ML table
SQL = "select * from mqt_ml;"

In [9]:
# create a pandas dataframe
pconn = ibm_db_dbi.Connection(db2_conn)
df = pd.read_sql(SQL, pconn)
df.head(20)



Unnamed: 0,MANUFACTURER,MODEL,SOURCE_NAME,STYLE,NUM_MEMBERS,SIZE,RESULT
0,Shure,SM7B,Bass,Pop,7,Large,Poor
1,Heil,PR35,Lead Vocal,Pop,9,Medium,Good
2,Audix,D6,"Drums, Kick",Pop,9,Medium,Poor
3,Beyerdynamic,M 201 TG,"Drums, Snare",Pop,9,Large,Mediocre
4,Electro-Voice,635a,"Drums, Hi-hat",Pop,9,Large,Good
5,Sennheiser,e904,"Drums, Toms",Pop,9,Small,Good
6,Audio-Technica,AT4040,"Drums, Overhead",Pop,9,Medium,Mediocre
7,Beyerdynamic,M 69 TG,Bass,Pop,9,Small,Good
8,Electro-Voice,635a,Keyboards,Pop,9,Large,Poor
9,Heil,PR30,Guitar,Pop,9,Medium,Poor


In [10]:
# create a spark dataframe
sdf = spark.createDataFrame(df)

In [11]:
sdf.printSchema()

root
 |-- MANUFACTURER: string (nullable = true)
 |-- MODEL: string (nullable = true)
 |-- SOURCE_NAME: string (nullable = true)
 |-- STYLE: string (nullable = true)
 |-- NUM_MEMBERS: long (nullable = true)
 |-- SIZE: string (nullable = true)
 |-- RESULT: string (nullable = true)



In [12]:
sdf.show(20)

[Stage 0:>                                                          (0 + 1) / 1]

+--------------+--------+---------------+-----+-----------+------+---------+
|  MANUFACTURER|   MODEL|    SOURCE_NAME|STYLE|NUM_MEMBERS|  SIZE|   RESULT|
+--------------+--------+---------------+-----+-----------+------+---------+
|         Shure|    SM7B|           Bass|  Pop|          7| Large|     Poor|
|          Heil|    PR35|     Lead Vocal|  Pop|          9|Medium|     Good|
|         Audix|      D6|    Drums, Kick|  Pop|          9|Medium|     Poor|
|  Beyerdynamic|M 201 TG|   Drums, Snare|  Pop|          9| Large| Mediocre|
| Electro-Voice|    635a|  Drums, Hi-hat|  Pop|          9| Large|     Good|
|    Sennheiser|    e904|    Drums, Toms|  Pop|          9| Small|     Good|
|Audio-Technica|  AT4040|Drums, Overhead|  Pop|          9|Medium| Mediocre|
|  Beyerdynamic| M 69 TG|           Bass|  Pop|          9| Small|     Good|
| Electro-Voice|    635a|      Keyboards|  Pop|          9| Large|     Poor|
|          Heil|    PR30|         Guitar|  Pop|          9|Medium|     Poor|

                                                                                

In [13]:
# import VectorAssembler to create a 'feature' vector, 
# StringIndexer to index feature and result classifications (strings are not valid)
# RandomForestClassifier to create the model, 
# MulticlassClassificationEvaluator for exactly what it sounds like!
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [14]:
# apply indexes to the feature columns and the label column 'result'
feature_indexer = StringIndexer(inputCols=['MANUFACTURER','MODEL','SOURCE_NAME','STYLE',str('NUM_MEMBERS'),'SIZE','RESULT'], 
                               outputCols=['manufacturer_index','model_index','scource_name_index','style_index','num_members_index','size_index','result_index'])
sdf_indexed = feature_indexer.fit(sdf).transform(sdf)
sdf_indexed.show(20)

                                                                                

+--------------+--------+---------------+-----+-----------+------+---------+------------------+-----------+------------------+-----------+-----------------+----------+------------+
|  MANUFACTURER|   MODEL|    SOURCE_NAME|STYLE|NUM_MEMBERS|  SIZE|   RESULT|manufacturer_index|model_index|scource_name_index|style_index|num_members_index|size_index|result_index|
+--------------+--------+---------------+-----+-----------+------+---------+------------------+-----------+------------------+-----------+-----------------+----------+------------+
|         Shure|    SM7B|           Bass|  Pop|          7| Large|     Poor|               0.0|       29.0|               0.0|        4.0|              2.0|       2.0|         0.0|
|          Heil|    PR35|     Lead Vocal|  Pop|          9|Medium|     Good|               9.0|       25.0|               6.0|        4.0|              0.0|       0.0|         2.0|
|         Audix|      D6|    Drums, Kick|  Pop|          9|Medium|     Poor|               3.0|

In [15]:
# feature array for the vectorassembler
assembler = VectorAssembler(
    inputCols=['manufacturer_index','model_index','scource_name_index','style_index','num_members_index','size_index'], 
    outputCol='features')
data = assembler.transform(sdf_indexed).select('features','result_index')
data.show(20)

+--------------------+------------+
|            features|result_index|
+--------------------+------------+
|[0.0,29.0,0.0,4.0...|         0.0|
|[9.0,25.0,6.0,4.0...|         2.0|
|[3.0,17.0,2.0,4.0...|         0.0|
|[4.0,7.0,4.0,4.0,...|         4.0|
|[2.0,2.0,1.0,4.0,...|         2.0|
|[1.0,15.0,5.0,4.0...|         2.0|
|[8.0,5.0,3.0,4.0,...|         4.0|
|[4.0,30.0,0.0,4.0...|         2.0|
|[2.0,2.0,8.0,4.0,...|         0.0|
|[9.0,38.0,10.0,4....|         0.0|
|[6.0,0.0,12.0,4.0...|         0.0|
|[2.0,11.0,9.0,4.0...|         0.0|
|[2.0,2.0,12.0,4.0...|         1.0|
|[6.0,0.0,8.0,4.0,...|         4.0|
|[13.0,32.0,6.0,1....|         2.0|
|[0.0,16.0,2.0,1.0...|         0.0|
|[0.0,8.0,4.0,1.0,...|         3.0|
|[2.0,2.0,1.0,1.0,...|         0.0|
|[1.0,15.0,5.0,1.0...|         4.0|
|[8.0,5.0,3.0,1.0,...|         2.0|
+--------------------+------------+
only showing top 20 rows



In [16]:
# break the data into train/test data
(train, test) = data.randomSplit([0.8, 0.2])

# create the model and fit it to the training data
rfc_model = RandomForestClassifier(featuresCol = "features", labelCol = "result_index", maxBins = 39)
rfc_model = rfc_model.fit(train)

                                                                                

In [17]:
# make predictions on the test data
pred = rfc_model.transform(test)
pred.show(20)

+--------------------+------------+--------------------+--------------------+----------+
|            features|result_index|       rawPrediction|         probability|prediction|
+--------------------+------------+--------------------+--------------------+----------+
| (6,[0,1],[3.0,9.0])|         4.0|[4.31856852081373...|[0.21592842604068...|       0.0|
| (6,[0,1],[4.0,7.0])|         2.0|[4.32773875174368...|[0.21638693758718...|       2.0|
| (6,[0,2],[6.0,1.0])|         4.0|[4.15059768142244...|[0.20752988407112...|       1.0|
| (6,[1,2],[4.0,8.0])|         1.0|[4.41508546205991...|[0.22075427310299...|       0.0|
| (6,[1,2],[4.0,8.0])|         3.0|[4.41508546205991...|[0.22075427310299...|       0.0|
| (6,[1,2],[4.0,8.0])|         4.0|[4.41508546205991...|[0.22075427310299...|       0.0|
|(6,[1,2],[4.0,12.0])|         2.0|[4.68515112259848...|[0.23425755612992...|       0.0|
|(6,[1,2],[16.0,2.0])|         1.0|[3.74458618113060...|[0.18722930905653...|       1.0|
|(6,[1,3],[22.0,1.0])

In [18]:
# evaluate our results in a confusion matrix
# the confusion matrix is a visual comparison between actual results
# and predicted results on our test population 
# if our model were 100% accurate, the only non-zero values
# we would see would be in a diagonal from top left to bottom right

# import the necessary module
from sklearn.metrics import confusion_matrix

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="result_index")
acc = evaluator.evaluate(pred)
 
print("Prediction Accuracy: ", acc)
 
y_pred = pred.select("prediction").collect()
y_actual = pred.select("result_index").collect()

cm = confusion_matrix(y_actual, y_pred)
print("Confusion Matrix:")
print(cm)

                                                                                

Prediction Accuracy:  0.189040615770377
Confusion Matrix:
[[74 78 36 52 24]
 [70 86 29 26 26]
 [86 80 35 48 39]
 [83 93 33 44 37]
 [90 85 45 28 32]]


In [19]:
# Goodness, our results are terrible! Worse than guessing in fact.
# This is not suprising, since results are generated randomly.
# This doesn't render our model completely useless. At the least,
# it strongly suggests that were these results not randomly generated,
# the features we had chosen as candidates for prediction are poor, i.e.
# these features are not related to results.

In [22]:
# Nevertheless, let's write a function to predict outcomes based on user input
# of each feature.
def predict(manufacturer_index,model_index,scource_name_index,style_index,num_members_index,size_index):
    # create a vector of all features
    assembler = VectorAssembler(
        inputCols=['manufacturer_index','model_index','scource_name_index','style_index','num_members_index','size_index'], 
        outputCol='features')
    # create a dataframe with user arguments and a dummy result of 0
    data_pred = [[manufacturer_index,model_index,scource_name_index,style_index,num_members_index,size_index, 0]]
    columns = ['manufacturer_index','model_index','scource_name_index','style_index','num_members_index','size_index','result_index']
    sdf_pred = spark.createDataFrame(data_pred, columns)
    # convert to a features and result column
    sdf_pred_tr = assembler.transform(sdf_pred).select('features', 'result_index')
    # apply the model created with the training data above
    predictions = rfc_model.transform(sdf_pred_tr)
    # display the prediction
    predictions.select('prediction').show()
    
# sample run - Shure, SM7B, source is bass, style is pop, number of band members is 7, venue size is large
predict(0.0,29.0,0.0,4.0,2.0,2.0)

+----------+
|prediction|
+----------+
|       2.0|
+----------+



In [23]:
# stop Spark context and close db2 connection
sc.stop()
print("Spark context stopped")  
ibm_db.close(db2_conn)
print("DB2 connection closed")

Spark context stopped
DB2 connection closed
