In [2]:
!pip install findspark

Defaulting to user installation because normal site-packages is not writeable


In [1]:
import findspark
findspark.init()
findspark.find()

'/usr/lib/spark'

In [2]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, StructField
from pyspark.sql import functions as F
from pyspark.sql import types as t
from pyspark.sql.functions import udf
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.regression import StreamingLinearRegressionWithSGD

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler
import os

In [3]:
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars org.apache.spark:spark-sql-kafka-0-10_2.12:2.0.0 --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.0.0 pyspark-shell'
# Создаем объект SparkSession, который является точкой входа в Spark SQL и Spark Streaming API.

spark = SparkSession\
    .builder\
    .appName("MySparkApp")\
    .config("spark.jars.packages", 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.3')\
    .getOrCreate()

In [4]:
IP = '158.160.77.249'

read_options = {
    # "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="mlops" password="mlops_pw";',
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol" : "PLAINTEXT",
    "kafka.bootstrap.servers": f'{IP}:9092',
    "group.id": 'test_group',
    "subscribe": 'test',
    "startingOffsets": "earliest",
}

write_kafka_params = {
    # "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="mlops" password="mlops_pw";',
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol" : "PLAINTEXT",
    "kafka.bootstrap.servers": f'{IP}:9092',
    "topic": "test"
}

schema = t.StructType(
    [
        t.StructField('X', t.ArrayType(t.DoubleType()), True),
        t.StructField('y', t.FloatType(), True),
    ],
)

In [None]:
def extract_data_rows_from_json(df):
    df = (df.selectExpr('CAST(value AS STRING)') \
      .select(from_json('value', schema).alias('raw_data')))

    df = df.select('raw_data.y', *[col('raw_data.X').getItem(i).alias(f'X{i+1}') for i in range(0, 3)])
    return df


def transform_training_row_into_lp(df):
    
    features = Vectors.dense(row["x"])
    label = row["label"]
    return LabeledPoint(label, features)


def transform_test_row(row):
    return Vectors.dense(row["x"])


In [13]:
vector_assembler = VectorAssembler(inputCols=["X1", "X2", "X3"], outputCol="features")
linear_regression = LinearRegression(featuresCol="features", labelCol="y")

In [None]:
df = spark \
  .readStream \
  .format("kafka") \
  .options(**options) \
  .load()



df = (df.selectExpr('CAST(value AS STRING)') \
      .select(from_json('value', schema).alias('raw_data')))

df = df.select('raw_data.y', *[col('raw_data.X').getItem(i).alias(f'X{i+1}') for i in range(0, 3)])

vectorized_data = vector_assembler.transform(df)

model = linear_regression.fit(vectorized_data)

stream_writer = vectorized_data.select().writeStream \
    .format("console") \
    .start()

# # Запись предсказаний в Kafka
# # stream_writer = predictions.writeStream \
# #     .format("kafka") \
# #     .outputMode("append") \
# #     .options(**write_kafka_params) \
# #     .start().awaitTermination()



# Запуск потока обработки
stream_writer.awaitTermination()



# Предсказание на потоковых данных с помощью модели
predictions = model.transform(vectorized_data)
print(predictions)

In [26]:
df = spark \
  .readStream \
  .format("kafka") \
  .options(**read_options) \
  .load()

In [27]:
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [28]:
df = (df.selectExpr('CAST(value AS STRING)') \
      .select(from_json('value', schema).alias('raw_data')))
print("df schema")
df.printSchema()

df schema
root
 |-- raw_data: struct (nullable = true)
 |    |-- X: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- y: float (nullable = true)



In [34]:
def extract_features(values):
    # values = x.split(",")
    features_ = []
    for i in values:
        features_.append(float(i))
    features = Vectors.dense(features_)
    return features

extract_features_udf = udf(extract_features, VectorUDT())

def extract_label(x):
    values = x.split(",")
    label = float(values[0])
    return label

In [36]:
df = df.withColumn("features", extract_features_udf(col("raw_data.X")))

In [22]:
# df = df.select('raw_data.y', *[col('raw_data.X').getItem(i).alias(f'X{i+1}') for i in range(0, 3)])

In [37]:
df.printSchema()

root
 |-- raw_data: struct (nullable = true)
 |    |-- X: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- y: float (nullable = true)
 |-- features: vector (nullable = true)



In [24]:
# vector_assembler = VectorAssembler(inputCols=["X1", "X2", "X3"], outputCol="features")
# vectorized_data = vector_assembler.transform(df)

In [38]:
# Создание и обучение модели линейной регрессии
# vectorized_data = (vectorized_data
#            .writeStream
#            .format('console')
#            .queryName('console-output')
#            .start())

linear_regression = LinearRegression(featuresCol="features", labelCol="y")
model = linear_regression.fit(vectorized_data)

# Предсказание на потоковых данных с помощью модели
predictions = model.transform(vectorized_data)

# Определение настроек для записи данных в Kafka

write_kafka_params = {
    # "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="mlops" password="mlops_pw";',
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol" : "PLAINTEXT",
    "kafka.bootstrap.servers": '158.160.70.184:9092',
    "topic": "test"
}

# Запись предсказаний в Kafka
stream_writer = predictions.writeStream \
    .format("kafka") \
    .outputMode("append") \
    .options(**write_kafka_params) \
    .start().awaitTermination()

# Запуск потока обработки
stream_writer.awaitTermination()

AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka

In [12]:
import numpy as np

names = [str(x) for x in np.random.choice(["Alex", "James", "Michael", "Peter", "Harry"], size=3)]
ids = [int(x) for x in np.random.randint(1, 10, 3)]
fruits = [str(x) for x in np.random.randint(1, 10, 3)]

df = spark.createDataFrame(list(zip(names, ids, fruits)), ["Name", "ID", "Fruit"])

In [21]:
df.rdd.map(lambda row: Vectors.dense(row[1], row[2])).collect()

[DenseVector([7.0, 1.0]), DenseVector([2.0, 3.0]), DenseVector([5.0, 5.0])]

# Start

In [5]:
df = spark \
  .readStream \
  .format("kafka") \
  .options(**read_options) \
  .load()

In [6]:
df = (df.selectExpr('CAST(value AS STRING)') \
      .select(from_json('value', schema).alias('raw_data')))
print("df schema")
df.printSchema()

df schema
root
 |-- raw_data: struct (nullable = true)
 |    |-- X: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- y: float (nullable = true)



In [7]:
df = df.select('raw_data.y', *[col('raw_data.X').getItem(i).alias(f'X{i+1}') for i in range(0, 3)])
df.printSchema()

root
 |-- y: float (nullable = true)
 |-- X1: double (nullable = true)
 |-- X2: double (nullable = true)
 |-- X3: double (nullable = true)



In [8]:
vector_assembler = VectorAssembler(inputCols=["X1", "X2", "X3"], outputCol="features")
vectorized_data = vector_assembler.transform(df)

In [9]:
def process_batch(df, batch_id):
    df.write \
      .format("parquet") \
      .mode("append") \
      .save("df.parquet")

vectorized_data.writeStream \
    .foreachBatch(process_batch) \
    .start() \
    .awaitTermination(3)

False

In [10]:
train = spark.read.parquet("df.parquet")

In [11]:
train.show()

+------+-------------------+-------------------+-----+--------------------+
|     y|                 X1|                 X2|   X3|            features|
+------+-------------------+-------------------+-----+--------------------+
| -54.0| 134.35635413628114| -91.79349863237272|-89.0|[134.356354136281...|
|  -4.0| 18.489089918529782| 199.70239010752857|-81.0|[18.4890899185297...|
| 117.0|  134.3184321122452|   268.522116453999|-41.0|[134.318432112245...|
| 207.0|  175.8296964288368| 217.64853437525244| 49.0|[175.829696428836...|
|  -9.0| -120.5086494891984|-137.85461564540003| 97.0|[-120.50864948919...|
| -68.0| -95.15583628456142|-273.92613450221893| 73.0|[-95.155836284561...|
|-121.0|   -45.951964528486|-253.29454266107354|-14.0|[-45.951964528486...|
| -14.0| 182.54884467641264|-141.33374607420234|-53.0|[182.548844676412...|
| 128.0|   148.122095766139| 204.58089576519555|-16.0|[148.122095766139...|
|  48.0|-28.814188874385728| 208.00874692284435| -4.0|[-28.814188874385...|
| -61.0|  -4

In [12]:
linear_regression = LinearRegression(featuresCol="features", labelCol="y")
model = linear_regression.fit(train)

In [None]:
# Предсказание на потоковых данных с помощью модели
predictions = model.transform(vectorized_data)

# Определение настроек для записи данных в Kafka

write_kafka_params = {
    # "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="mlops" password="mlops_pw";',
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol" : "PLAINTEXT",
    "kafka.bootstrap.servers": '158.160.70.184:9092',
    "topic": "test"
}

stream_writer = (predictions
           .writeStream
           .format('console')
           .queryName('console-output1')
           .start())


# Запись предсказаний в Kafka
# stream_writer = predictions.writeStream \
#     .format("kafka") \
#     .outputMode("append") \
#     .options(**write_kafka_params) \
#     .start().awaitTermination()

# Запуск потока обработки
stream_writer.awaitTermination()