# From `sklearn` to `pyspark`

## `sklearn`

In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import roc_auc_score, make_scorer
from sklearn.pipeline import Pipeline

In [2]:
# Load the phishing-URL dataset from the local CSV into a pandas DataFrame.
csv_path = 'data/Dataset.csv'
df = pd.read_csv(csv_path)

The data come from [Kaggle](https://www.kaggle.com/datasets/sagarbanik/phishing-url-binary-datatset) and are used to distinguish phishing URLs from legitimate ones. A label of "1" means legitimate and a label of "0" means phishing.

In [3]:
# Preview the first five rows to check the feature columns and the label.
df.head(n=5)

Unnamed: 0,whois_regDate,whois_expDate,whois_updatedDate,dot_count,url_len,digit_count,special_count,hyphen_count,double_slash,single_slash,at_the_rate,protocol,protocol_count,web_traffic,label
0,8451,2870,422,1,10,0,0,0,0,0,0,0,0,0,1
1,5741,102,295,1,11,0,0,0,0,0,0,0,0,0,1
2,8419,345,720,1,9,0,0,0,0,0,0,0,0,0,1
3,7695,2166,545,1,9,0,0,0,0,0,0,0,0,0,1
4,9316,2455,6,1,6,0,0,0,0,0,0,0,0,0,1


In [4]:
# Separate the target column from the feature matrix.
y = df['label']
X = df.drop(columns='label')

# Hold out 25% of the rows (the sklearn default, spelled out here);
# the fixed random_state keeps the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=42
)

In [5]:
# Two-step model: standardize the features, then fit a logistic regression.
pipeline_steps = [
    ('ss', StandardScaler()),
    ('logreg', LogisticRegression()),
]
pipe = Pipeline(steps=pipeline_steps)

In [6]:
# Fit the scaler and the classifier on the training split only.
pipe.fit(X=X_train, y=y_train)

Pipeline(steps=[('ss', StandardScaler()), ('logreg', LogisticRegression())])

In [7]:
# ROC AUC ranks examples by a continuous score, so pass the predicted
# probability of the positive class (label 1) rather than hard 0/1
# predictions, which would collapse the ROC curve to a single threshold.
train_scores = pipe.predict_proba(X_train)[:, 1]
roc_auc_score(y_train, train_scores)

1.0

In [8]:
# Score the held-out split with positive-class probabilities (label 1):
# ROC AUC needs a continuous ranking score, not thresholded 0/1 predictions.
test_scores = pipe.predict_proba(X_test)[:, 1]
roc_auc_score(y_test, test_scores)

1.0

## Now in `pyspark`!

In [9]:
import pyspark
from pyspark.ml.classification import LogisticRegression as SparkLR
from pyspark.ml.feature import StandardScaler as SparkScaler,\
    VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline as SparkPipe

In [10]:
# Reuse the active Spark session if there is one, otherwise start a new one.
session_builder = pyspark.sql.SparkSession.builder
spark = session_builder.getOrCreate()

In [11]:
# Read the same CSV with Spark, treating the first line as the header.
# No schema is supplied here, so the columns are loaded as strings.
df_spark = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .load('data/Dataset.csv')
)

In [12]:
# Inspect the first row of the freshly loaded Spark DataFrame.
df_spark.first()

Row(whois_regDate='8451', whois_expDate='2870', whois_updatedDate='422', dot_count='1', url_len='10', digit_count='0', special_count='0', hyphen_count='0', double_slash='0', single_slash='0', at_the_rate='0', protocol='0', protocol_count='0', web_traffic='0', label='1')

In [13]:
# Cast every column to an integer in a single projection. Calling
# withColumn in a loop adds one projection per column to the query plan,
# which the Spark documentation advises against; one select produces the
# same columns, in the same order, with a flat plan.
df_spark = df_spark.select(
    [df_spark[name].cast('int').alias(name) for name in df_spark.columns]
)

In [14]:
# Look at the first row again to confirm the values are now integers.
df_spark.first()

Row(whois_regDate=8451, whois_expDate=2870, whois_updatedDate=422, dot_count=1, url_len=10, digit_count=0, special_count=0, hyphen_count=0, double_slash=0, single_slash=0, at_the_rate=0, protocol=0, protocol_count=0, web_traffic=0, label=1)

In [15]:
# 75/25 train/test split. A fixed seed makes the split (and therefore the
# metrics computed below) repeatable across runs, mirroring random_state=42
# in the sklearn section.
train, test = df_spark.randomSplit([0.75, 0.25], seed=42)

In [16]:
# Every column except the label is a feature; selecting by name avoids
# relying on the label happening to be the last column.
feature_cols = [name for name in train.columns if name != 'label']

stages = [
    # Pack the individual feature columns into a single vector column.
    VectorAssembler(inputCols=feature_cols, outputCol='as_vec'),
    # Spark's scaler only divides by the standard deviation by default;
    # withMean=True also centers the features, as sklearn's StandardScaler does.
    SparkScaler(inputCol='as_vec', outputCol='features',
                withMean=True, withStd=True),
    # Feature and label columns spelled out (these are also the defaults).
    SparkLR(featuresCol='features', labelCol='label'),
]

# Fit all stages on the training split only.
spark_pipe = SparkPipe(stages=stages).fit(train)

In [17]:
# Run the fitted pipeline over the training split to get its predictions.
train_predictions = spark_pipe.transform(dataset=train)

In [18]:
# Evaluator for binary classifiers; the arguments below are the library
# defaults, written out for clarity.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction',
    labelCol='label',
    metricName='areaUnderROC',
)

In [19]:
# Area under the ROC curve on the training predictions; the metric is
# supplied as a per-call parameter override.
train_metric_params = {evaluator.metricName: 'areaUnderROC'}
evaluator.evaluate(train_predictions, train_metric_params)

1.0

In [20]:
# Run the fitted pipeline over the held-out split to get its predictions.
test_predictions = spark_pipe.transform(dataset=test)

In [21]:
# Area under the ROC curve on the held-out predictions; the metric is
# supplied as a per-call parameter override.
test_metric_params = {evaluator.metricName: 'areaUnderROC'}
evaluator.evaluate(test_predictions, test_metric_params)

0.9999997429522652