In [1]:
import pickle
import pandas as pd
import sklearn as sk
from sklearn import metrics
from sklearn.model_selection import train_test_split

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import DoubleType

# Задача
### Как применить sklearn-модель к 1 ТБ данных?

# 1) Подготовим данные

In [2]:
# Reuse the active SparkSession if there is one, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName('PySparkTasks')
    .getOrCreate()
)

The history saving thread hit an unexpected error (OperationalError('attempt to write a readonly database')).History will not be written to the database.


24/06/29 12:58:51 WARN Utils: Your hostname, u-host resolves to a loopback address: 127.0.1.1; using 192.168.1.49 instead (on interface wlp0s20f3)
24/06/29 12:58:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/29 12:58:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/06/29 12:58:52 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/06/29 12:58:52 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
# Display the SparkSession object as the cell output (sanity check that the session is up).
spark

In [4]:
# Load the Iris dataset from Parquet into a Spark DataFrame.
df = spark.read.load('iris.parquet', format='parquet')

In [6]:
# List the distinct class labels present in the data.
df.select(col('species')).distinct().show()

+----------+
|   species|
+----------+
| virginica|
|versicolor|
|    setosa|
+----------+



In [5]:
# Preview the first five rows of the raw data.
df.show(n=5)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



In [7]:
# Replace the string label `species` with a numeric index column `type`
# (the sklearn model and the Spark evaluator below both use `type` as the label).
# Guarded so that re-running this cell is a no-op: after the first run `species`
# has already been dropped from `df`, and fitting the indexer again would fail.
if 'species' in df.columns:
    si = StringIndexer(inputCol='species', outputCol="type")
    df = si.fit(df).transform(df).drop('species')

In [8]:
# Preview the first five rows after label indexing.
df.show(n=5)

+------------+-----------+------------+-----------+----+
|sepal_length|sepal_width|petal_length|petal_width|type|
+------------+-----------+------------+-----------+----+
|         5.1|        3.5|         1.4|        0.2| 0.0|
|         4.9|        3.0|         1.4|        0.2| 0.0|
|         4.7|        3.2|         1.3|        0.2| 0.0|
|         4.6|        3.1|         1.5|        0.2| 0.0|
|         5.0|        3.6|         1.4|        0.2| 0.0|
+------------+-----------+------------+-----------+----+
only showing top 5 rows



In [9]:
# Collect the Spark DataFrame to the driver as a pandas DataFrame for sklearn training.
# NOTE(review): toPandas() brings every row into driver memory. That is fine for Iris,
# but for a large dataset train on a sample instead (e.g. df.sample(...).toPandas()).
pdf = df.toPandas()

In [10]:
# Preview the first ten rows of the pandas copy.
pdf.head(n=10)

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,type
0,5.1,3.5,1.4,0.2,0.0
1,4.9,3.0,1.4,0.2,0.0
2,4.7,3.2,1.3,0.2,0.0
3,4.6,3.1,1.5,0.2,0.0
4,5.0,3.6,1.4,0.2,0.0
5,5.4,3.9,1.7,0.4,0.0
6,4.6,3.4,1.4,0.3,0.0
7,5.0,3.4,1.5,0.2,0.0
8,4.4,2.9,1.4,0.2,0.0
9,4.9,3.1,1.5,0.1,0.0


# 2) Обучаем sklearn-модель

In [11]:
# Hold out 25% of rows (sklearn's default test size) with a fixed seed for reproducibility.
train, test = train_test_split(pdf, test_size=0.25, random_state=42)

In [None]:
# Feature columns: used to train the sklearn model and passed to the Spark UDF below.
features_col = [
    'sepal_length',
    'sepal_width',
    'petal_length',
    'petal_width',
]

In [None]:
# Separate each split into the feature matrix and the indexed label `type`.
# Bracket access avoids relying on attribute lookup for a column named `type`.
X_train, y_train = train[features_col], train['type']
X_test, y_test = test[features_col], test['type']

In [None]:
# Sanity check: a single-column selection from a pandas DataFrame, expected to be a pandas Series.
type(y_train)

In [None]:
from sklearn.tree import DecisionTreeClassifier

In [None]:
# Shallow decision tree (depth 3) with a fixed seed so the fit is reproducible.
model = DecisionTreeClassifier(random_state=1, max_depth=3)

In [None]:
# Train the tree on the training split (fit() returns the estimator itself).
model = model.fit(X=X_train, y=y_train)

In [None]:
# Score the model on the held-out split.
# sklearn metrics take (y_true, y_pred) in that order. Accuracy happens to be
# symmetric, but most other metrics (precision, recall, ...) are not.
prediction = model.predict(X_test)
print('The accuracy of the Decision Tree is {:.3f}'.format(metrics.accuracy_score(y_test, prediction)))

In [None]:
# Sanity check: the class of the fitted estimator (DecisionTreeClassifier).
type(model)

In [None]:
# Serialise the fitted estimator to disk so it can be reloaded later.
with open('model.pickle', 'wb') as model_file:
    pickle.dump(model, model_file)

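# 3) Применяем sklearn-модель к большим данным

Модель обучена локально на pandas, а применяется распределённо: pandas UDF вызывает `predict` на батчах строк прямо на executor'ах Spark, поэтому данные не нужно собирать на драйвере.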

In [None]:
import pyspark.sql.functions as F

In [None]:
# Load the pickled model once on the driver and broadcast it. Spark then ships a copy
# to every executor, instead of each batch re-reading 'model.pickle' from the
# executor's own working directory. On a multi-node cluster that file generally
# isn't there, and re-unpickling it per batch is wasted work.
# NOTE: pickle.load can execute arbitrary code; only load model files you trust.
with open('model.pickle', 'rb') as f:
    model_bc = spark.sparkContext.broadcast(pickle.load(f))


@F.pandas_udf(returnType=DoubleType())
def predict_pandas_udf(*cols):
    """Predict one batch of rows with the broadcast sklearn model.

    Parameters
    ----------
    *cols : pandas.Series
        Feature columns for the batch, passed in the same order the model was
        trained on.

    Returns
    -------
    pandas.Series
        float64 predictions, one per input row (matches the DoubleType return type).
    """
    import pandas as pd
    clf = model_bc.value
    X = pd.concat(cols, axis=1)
    # The Series Spark hands over may not carry the training column names. Restore
    # them (when the model recorded any) so sklearn does not warn about missing
    # feature names. Columns are still matched by position.
    feature_names = getattr(clf, 'feature_names_in_', None)
    if feature_names is not None:
        X.columns = feature_names
    return pd.Series(clf.predict(X), dtype='float64')

In [None]:
# Append the model's prediction for every row as a new `result` column (evaluated lazily by Spark).
df_result = df.withColumn('result', predict_pandas_udf(*[col(name) for name in features_col]))

In [None]:
# Run the UDF and print the first 20 rows (the default for show()).
df_result.show(n=20)

In [None]:
# Sanity check: the result is still a Spark DataFrame (nothing was collected to the driver).
type(df_result)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of the UDF output (`result`) against the indexed label (`type`).
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="type",
    predictionCol="result",
)

In [None]:
# NOTE(review): df_result covers every row of df, including the rows the model was
# trained on (pdf came from df.toPandas()). This accuracy is therefore optimistic and
# not comparable to the held-out score printed earlier.
accuracy = evaluator.evaluate(df_result)
print(f"DecisionTreeClassifier [Accuracy] = {accuracy:g}")
print(f"DecisionTreeClassifier [Error] = {1.0 - accuracy:g} ")