In [1]:
from pyspark.sql import SparkSession

spark=(
    SparkSession
    .builder
    .master('local[4]')
    .appName('logisticReg')
    .getOrCreate()
)

spark

22/10/02 19:11:12 WARN Utils: Your hostname, notebook resolves to a loopback address: 127.0.1.1; using 192.168.0.18 instead (on interface wlp9s0)
22/10/02 19:11:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/10/02 19:11:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# Load

In [2]:
path_input = "/home/walter/Documents/serie-notas/z_data/20221002_logistic_reg_dataset/data.csv"

data=spark.read.csv(path_input, header=True)

data.show(5, False)

+---------+---+--------------+--------+----------------+------+
|Country  |Age|Repeat_Visitor|Platform|Web_pages_viewed|Status|
+---------+---+--------------+--------+----------------+------+
|India    |41 |1             |Yahoo   |21              |1     |
|Brazil   |28 |1             |Yahoo   |5               |0     |
|Brazil   |40 |0             |Google  |3               |0     |
|Indonesia|31 |1             |Bing    |15              |1     |
|Malaysia |32 |0             |Google  |15              |1     |
+---------+---+--------------+--------+----------------+------+
only showing top 5 rows



# Exploratory

In [3]:
print(data.count(), len(data.columns))

20000 6


In [4]:
data.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Repeat_Visitor: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Web_pages_viewed: string (nullable = true)
 |-- Status: string (nullable = true)



In [5]:
from pyspark.sql.types import IntegerType

integer_type = ['Age', 'Repeat_Visitor', 'Web_pages_viewed', 'Status']

for c in integer_type:
    data = data.withColumn(c, data[c].cast(IntegerType()))

In [6]:
data.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Repeat_Visitor: integer (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Web_pages_viewed: integer (nullable = true)
 |-- Status: integer (nullable = true)



In [7]:
data.describe().select(integer_type).show()

                                                                                

+-----------------+-----------------+-----------------+------------------+
|              Age|   Repeat_Visitor| Web_pages_viewed|            Status|
+-----------------+-----------------+-----------------+------------------+
|            20000|            20000|            20000|             20000|
|         28.53955|           0.5029|           9.5533|               0.5|
|7.888912950773227|0.500004090187782|6.073903499824976|0.5000125004687693|
|               17|                0|                1|                 0|
|              111|                1|               29|                 1|
+-----------------+-----------------+-----------------+------------------+



In [8]:
(
    data
    .groupBy('Country')
    .count()
    .show()
)

+---------+-----+
|  Country|count|
+---------+-----+
| Malaysia| 1218|
|    India| 4018|
|Indonesia|12178|
|   Brazil| 2586|
+---------+-----+



In [9]:
data.groupBy('Platform').count().show()

+--------+-----+
|Platform|count|
+--------+-----+
|   Yahoo| 9859|
|    Bing| 4360|
|  Google| 5781|
+--------+-----+



In [10]:
data.groupBy('Status').count().show()

+------+-----+
|Status|count|
+------+-----+
|     1|10000|
|     0|10000|
+------+-----+



In [11]:
data.groupBy('Country').mean().show()

+---------+------------------+-------------------+---------------------+--------------------+
|  Country|          avg(Age)|avg(Repeat_Visitor)|avg(Web_pages_viewed)|         avg(Status)|
+---------+------------------+-------------------+---------------------+--------------------+
| Malaysia|27.792282430213465| 0.5730706075533661|   11.192118226600986|  0.6568144499178982|
|    India|27.976854156296664| 0.5433051269288203|   10.727227476356397|  0.6212045793927327|
|Indonesia| 28.43159796354081| 0.5207751683363442|    9.985711939563148|  0.5422893742814913|
|   Brazil|30.274168600154677|  0.322892498066512|    4.921113689095128|0.038669760247486466|
+---------+------------------+-------------------+---------------------+--------------------+



# Prep for model in pyspark

In [12]:
data.show(5, False)

+---------+---+--------------+--------+----------------+------+
|Country  |Age|Repeat_Visitor|Platform|Web_pages_viewed|Status|
+---------+---+--------------+--------+----------------+------+
|India    |41 |1             |Yahoo   |21              |1     |
|Brazil   |28 |1             |Yahoo   |5               |0     |
|Brazil   |40 |0             |Google  |3               |0     |
|Indonesia|31 |1             |Bing    |15              |1     |
|Malaysia |32 |0             |Google  |15              |1     |
+---------+---+--------------+--------+----------------+------+
only showing top 5 rows



In [13]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

In [14]:
# encoding. step 1. Uso de StringIndexer para recodificar var categóricas
# por default, el valor de la variable se da en función del count de la categoría

si_platform = StringIndexer(inputCol='Platform', outputCol='Platform_index')
si_country = StringIndexer(inputCol='Country', outputCol='Country_index')
data = si_platform.fit(data).transform(data)
data = si_country.fit(data).transform(data)
data.show(5, False)

+---------+---+--------------+--------+----------------+------+--------------+-------------+
|Country  |Age|Repeat_Visitor|Platform|Web_pages_viewed|Status|Platform_index|Country_index|
+---------+---+--------------+--------+----------------+------+--------------+-------------+
|India    |41 |1             |Yahoo   |21              |1     |0.0           |1.0          |
|Brazil   |28 |1             |Yahoo   |5               |0     |0.0           |2.0          |
|Brazil   |40 |0             |Google  |3               |0     |1.0           |2.0          |
|Indonesia|31 |1             |Bing    |15              |1     |2.0           |0.0          |
|Malaysia |32 |0             |Google  |15              |1     |1.0           |3.0          |
+---------+---+--------------+--------+----------------+------+--------------+-------------+
only showing top 5 rows



In [15]:
# encoding. step 2. One hot encoding

from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCols=['Platform_index', 'Country_index'], outputCols=['Platform_vec', 'Country_vec'])

data = encoder.fit(data).transform(data)


data.show(5, False)

# la interpretación del OnehotEncoder es la siguiente: (3,[1],[1.0]) : un vector de len 3, en la posición 1, valor 1.0

# from Pramod (2022), pg. 94:
#This kind of representation allows to save computational space and hence is faster to
#compute. The length of the vector is equal to one less than the total number of elements
#since each value can be easily represented with just the help of three columns:

+---------+---+--------------+--------+----------------+------+--------------+-------------+-------------+-------------+
|Country  |Age|Repeat_Visitor|Platform|Web_pages_viewed|Status|Platform_index|Country_index|Platform_vec |Country_vec  |
+---------+---+--------------+--------+----------------+------+--------------+-------------+-------------+-------------+
|India    |41 |1             |Yahoo   |21              |1     |0.0           |1.0          |(2,[0],[1.0])|(3,[1],[1.0])|
|Brazil   |28 |1             |Yahoo   |5               |0     |0.0           |2.0          |(2,[0],[1.0])|(3,[2],[1.0])|
|Brazil   |40 |0             |Google  |3               |0     |1.0           |2.0          |(2,[1],[1.0])|(3,[2],[1.0])|
|Indonesia|31 |1             |Bing    |15              |1     |2.0           |0.0          |(2,[],[])    |(3,[0],[1.0])|
|Malaysia |32 |0             |Google  |15              |1     |1.0           |3.0          |(2,[1],[1.0])|(3,[],[])    |
+---------+---+--------------+--

In [16]:
# Vector Assembler

from pyspark.ml.feature import VectorAssembler

vec_assembler = VectorAssembler(inputCols=['Age', 'Web_pages_viewed', 'Platform_vec', 'Country_vec'], outputCol='features')
features_df = vec_assembler.transform(data)
model_df = features_df.select('features', 'Status')
model_df = model_df.withColumnRenamed('Status', 'label')
model_df.show(5, False)


+-------------------------------+-----+
|features                       |label|
+-------------------------------+-----+
|[41.0,21.0,1.0,0.0,0.0,1.0,0.0]|1    |
|[28.0,5.0,1.0,0.0,0.0,0.0,1.0] |0    |
|[40.0,3.0,0.0,1.0,0.0,0.0,1.0] |0    |
|(7,[0,1,4],[31.0,15.0,1.0])    |1    |
|(7,[0,1,3],[32.0,15.0,1.0])    |1    |
+-------------------------------+-----+
only showing top 5 rows



In [17]:
# split train test
train_df, test_df = model_df.randomSplit([0.7, 0.3])

In [18]:
# check class balance in train test
train_df.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1| 6960|
|    0| 6998|
+-----+-----+



In [19]:
test_df.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1| 3040|
|    0| 3002|
+-----+-----+



# Model

In [20]:
# Model
from pyspark.ml.classification import LogisticRegression
log_reg = LogisticRegression(labelCol='label').fit(train_df)


22/10/02 19:11:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/10/02 19:11:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


# Eval

In [21]:
# function evaluate(): agrega probability (vector neg_class / pos_class) y prediction (thr = 0.5)

train_results = log_reg.evaluate(train_df).predictions
train_results.show(5, False)

+---------------------+-----+----------------------------------------+----------------------------------------+----------+
|features             |label|rawPrediction                           |probability                             |prediction|
+---------------------+-----+----------------------------------------+----------------------------------------+----------+
|(7,[0,1],[17.0,3.0]) |0    |[3.2825143224442486,-3.2825143224442486]|[0.9638240534393888,0.03617594656061128]|0.0       |
|(7,[0,1],[17.0,3.0]) |0    |[3.2825143224442486,-3.2825143224442486]|[0.9638240534393888,0.03617594656061128]|0.0       |
|(7,[0,1],[17.0,4.0]) |0    |[2.540997224459203,-2.540997224459203]  |[0.926966367147523,0.07303363285247705] |0.0       |
|(7,[0,1],[17.0,8.0]) |1    |[-0.4250711674809793,0.4250711674809793]|[0.3953039033751607,0.6046960966248394] |1.0       |
|(7,[0,1],[17.0,10.0])|1    |[-1.9081053634510692,1.9081053634510692]|[0.12919385436053135,0.8708061456394686]|1.0       |
+---------------

In [22]:
# test data
test_results = log_reg.evaluate(test_df).predictions

In [23]:
def metrics (results):
    true_positives = results[(results.label == 1) & (results.prediction== 1)].count()
    true_negatives = results[(results.label == 0) & (results.prediction== 0)].count()
    false_positives = results[(results.label == 0) & (results.prediction== 1)].count()
    false_negatives = results[(results.label == 1) & (results.prediction== 0)].count()

    accuracy=float((true_positives+true_negatives) /(results.count()))
    recall = float(true_positives)/(true_positives + false_negatives)
    precision = float(true_positives) / (true_positives + false_positives)

    print('accuracy: {}, precision: {}, recall: {}'.format(accuracy, precision, recall))

In [24]:
metrics(test_results)

accuracy: 0.9339622641509434, precision: 0.9371069182389937, recall: 0.93125


# Model with Pipeline

In [25]:
from pyspark.sql.types import IntegerType

from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

from pyspark.ml import Pipeline

# paths
path_input = "/home/walter/Documents/serie-notas/z_data/20221002_logistic_reg_dataset/data.csv"

# params
integer_type = ['Age', 'Repeat_Visitor', 'Web_pages_viewed', 'Status']

# functions
def change_type(df, integer_type):
    res = df
    for c in integer_type:
        res = res.withColumn(c, res[c].cast(IntegerType()))

    return res

def metrics (results):
    true_positives = results[(results.label == 1) & (results.prediction== 1)].count()
    true_negatives = results[(results.label == 0) & (results.prediction== 0)].count()
    false_positives = results[(results.label == 0) & (results.prediction== 1)].count()
    false_negatives = results[(results.label == 1) & (results.prediction== 0)].count()

    accuracy=float((true_positives+true_negatives) /(results.count()))
    recall = float(true_positives)/(true_positives + false_negatives)
    precision = float(true_positives) / (true_positives + false_positives)

    print('accuracy: {}, precision: {}, recall: {}'.format(accuracy, precision, recall))

In [26]:
# load
data = spark.read.csv(path_input, header=True)

# split train test
train_df, test_df = data.randomSplit([0.7, 0.3])

# change types
train_df = change_type(train_df, integer_type)
test_df = change_type(test_df, integer_type)

In [27]:
# load
data = spark.read.csv(path_input, header=True)

# split train test
train_df, test_df = data.randomSplit([0.7, 0.3])

# change types
train_df = change_type(train_df, integer_type)
test_df = change_type(train_df, integer_type)

# Columnrenamed
train_df = train_df.withColumnRenamed('Status', 'label')
test_df = test_df.withColumnRenamed('Status', 'label')

# define pipe
stage_1 = StringIndexer(inputCol='Country', outputCol='Country_index')
stage_2 = StringIndexer(inputCol='Platform', outputCol='Platform_index')
stage_3 = OneHotEncoder(inputCols=['Country_index', 'Platform_index'], outputCols=['Country_vec', 'Platform_vec'])
stage_4 = VectorAssembler(inputCols=['Age', 'Repeat_Visitor', 'Web_pages_viewed', 'Platform_vec', 'Country_vec'], outputCol='features')
stage_5 = LogisticRegression(featuresCol='features', labelCol='label')

pipe = Pipeline(stages=[stage_1, stage_2, stage_3, stage_4, stage_5])


In [28]:
# model
model = pipe.fit(train_df)

In [29]:
# test data
test_results = model.transform(test_df)
metrics(test_results)

accuracy: 0.9409793998146696, precision: 0.9422106931834558, recall: 0.9386934673366835
