In [None]:
# Install the Kaggle CLI; %pip (unlike !pip) installs into the running kernel's environment.
%pip install -q kaggle
# Put the API token where the CLI looks for it. -p keeps the cell re-runnable
# when ~/.kaggle already exists; chmod 600 keeps the token private.
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
# Download the dataset archive into the working directory and unpack it there.
# -o overwrites silently, so a re-run does not stop on an overwrite prompt.
!kaggle datasets download joebeachcapital/gpa-and-iq
!unzip -o gpa-and-iq

In [None]:
# Fetch and unpack Kafka 3.5.0 (Scala 2.13 build).
# -f makes curl exit with an error on HTTP failures instead of saving the error
# page as the .tgz; archive.apache.org keeps releases that have been rotated
# off the dlcdn mirror.
!curl -fsSOL https://archive.apache.org/dist/kafka/3.5.0/kafka_2.13-3.5.0.tgz
!tar -xzf kafka_2.13-3.5.0.tgz

In [None]:
# Directory created by unpacking the Kafka tarball.
KAFKA_HOME = './kafka_2.13-3.5.0'

# ZooKeeper is launched first, then the broker; both run as background daemons.
!{KAFKA_HOME}/bin/zookeeper-server-start.sh -daemon {KAFKA_HOME}/config/zookeeper.properties
!{KAFKA_HOME}/bin/kafka-server-start.sh -daemon {KAFKA_HOME}/config/server.properties

# Fixed pause before later cells talk to the broker (no readiness check is done).
!echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
!sleep 10

Waiting for 10 secs until kafka and zookeeper services are up and running


In [None]:
import csv
import json

# Convert the CSV into JSON Lines (one JSON object per row) so that every
# record can be piped to Kafka as a separate message.
# The context manager flushes and closes both files when the block exits;
# the output file must be fully written before the producer cell reads it.
with open('gpa_iq.csv', 'r', newline='') as csvfile, \
        open('gpa_iq.json', 'w') as jsonfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
        # DictReader yields every value as a string, so the JSON values are strings too.
        json.dump(row, jsonfile)
        jsonfile.write('\n')

In [None]:
# Create the topic up front. Relying on auto-creation during the first send is
# what produced the LEADER_NOT_AVAILABLE warnings in the earlier run.
# --if-not-exists keeps the cell re-runnable.
!./kafka_2.13-3.5.0/bin/kafka-topics.sh --create --if-not-exists --topic gpa --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
# Publish every line of the JSON Lines file as one message.
# --bootstrap-server replaces the deprecated --broker-list option.
!./kafka_2.13-3.5.0/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic gpa < gpa_iq.json
# Read back the first five messages as a sanity check.
!./kafka_2.13-3.5.0/bin/kafka-console-consumer.sh --topic gpa  --bootstrap-server localhost:9092 --from-beginning  --max-messages 5

[2023-09-14 14:28:14,280] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {gpa=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2023-09-14 14:28:14,399] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {gpa=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2023-09-14 14:28:14,508] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 5 : {gpa=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2023-09-14 14:28:14,619] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 6 : {gpa=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2023-09-14 14:28:14,730] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 7 : {gpa=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
{"obs": "1", "gpa": "7.94", "iq": "111", "gender":

In [None]:
# %pip installs into the environment of the running kernel (!pip may target a different one).
# NOTE(review): the pyspark shell cell further down requests spark-sql-kafka 3.3.1 —
# consider pinning pyspark to a matching version; confirm which version was used.
%pip install -q pyspark

In [None]:
# Prepare the trained model

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('hw_7')
    .config('spark.executor.instances', 4)
    .getOrCreate()
)

In [None]:
# Load the dataset with a header row and inferred column types.
# drop('_c0') removes an unnamed index column if one is present; it is a
# no-op when the column does not exist.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('gpa_iq.csv')
    .drop('_c0')
)
df.printSchema()

root
 |-- obs: integer (nullable = true)
 |-- gpa: double (nullable = true)
 |-- iq: integer (nullable = true)
 |-- gender: integer (nullable = true)
 |-- concept: integer (nullable = true)



In [None]:
# Summary statistics, with every column rounded to two decimals except the
# 'summary' label column and 'gender', which are passed through untouched.
unrounded = ('summary', 'gender')
desc = df.describe()
desc = desc.select(*[
    F.col(name) if name in unrounded else F.round(name, 2).alias(name)
    for name in desc.columns
])
desc.show()

+-------+-----+-----+------+------------------+-------+
|summary|  obs|  gpa|    iq|            gender|concept|
+-------+-----+-----+------+------------------+-------+
|  count| 78.0| 78.0|  78.0|                78|   78.0|
|   mean|42.97| 7.45|108.92|1.6025641025641026|  56.96|
| stddev|25.89|  2.1| 13.17| 0.492535016613786|  12.41|
|    min|  1.0| 0.53|  72.0|                 1|   20.0|
|    max| 89.0|10.76| 136.0|                 2|   80.0|
+-------+-----+-----+------+------------------+-------+



In [None]:
# Every column except the target 'gender' is used as a model feature.
# NOTE(review): this keeps 'obs', which looks like a row identifier rather
# than a real feature — confirm it should be fed to the model.
numericCols = [name for name in df.columns if name != 'gender']
# Pandas copy of the feature columns (not referenced again in the visible cells).
numeric_data = df.select(*numericCols).toPandas()

In [None]:
# Class balance of the target over the whole dataset.
gender_counts = df.groupBy('gender').count()
gender_counts.show()

+------+-----+
|gender|count|
+------+-----+
|     1|   31|
|     2|   47|
+------+-----+



In [None]:
# Stage 1: turn the 'gender' column into a numeric 'label' column.
# NOTE(review): presumably the more frequent gender value becomes label 0.0
# (StringIndexer's default ordering) — confirm against the fitted model.
label_stringIdx = StringIndexer().setInputCol('gender').setOutputCol('label')

# Stage 2: pack the feature columns into a single 'features' vector.
assembler = VectorAssembler().setInputCols(numericCols).setOutputCol('features')

# 70/30 train/test split with a fixed seed.
splits = df.randomSplit(weights=[0.7, 0.3], seed=42)
train, test = splits

In [None]:
# Class balance in each split. show() prints and returns None, so the two calls
# are separate statements; as a tuple expression the cell also echoed a
# meaningless "(None, None)".
train.groupby('gender').count().show()
test.groupby('gender').count().show()

+------+-----+
|gender|count|
+------+-----+
|     1|   17|
|     2|   28|
+------+-----+

+------+-----+
|gender|count|
+------+-----+
|     1|   14|
|     2|   19|
+------+-----+



(None, None)

In [None]:
# Logistic-regression classifier capped at 10 optimizer iterations; the column
# names spelled out here are the estimator's defaults and match the
# 'features'/'label' outputs of the earlier pipeline stages.
lr = LogisticRegression(featuresCol='features', labelCol='label', maxIter=10)

In [None]:
# Label indexing, feature assembly and the classifier, chained in that order.
pipeline = Pipeline().setStages([label_stringIdx, assembler, lr])

# Fit all stages on the training split only.
lrModel = pipeline.fit(train)

# Score both splits with the fitted pipeline.
train_predictions, test_predictions = (
    lrModel.transform(split) for split in (train, test)
)

In [None]:
# Evaluate with area under the ROC curve; the column names below are the
# evaluator's defaults, written out explicitly.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction',
    labelCol='label',
    metricName='areaUnderROC',
)

# Train score first, then test, so the gap between them is visible.
train_auc_roc, test_auc_roc = (
    evaluator.evaluate(scored) for scored in (train_predictions, test_predictions)
)

print(f'Train AUC-ROC: {train_auc_roc}\nTest AUC-ROC:{test_auc_roc}')

Train AUC-ROC: 0.796218487394958
Test AUC-ROC:0.650375939849624


In [None]:
# Persist the fitted pipeline; overwrite() replaces any model already saved at this path.
model_writer = lrModel.write().overwrite()
model_writer.save('gpa_lr_Model')

In [None]:
# Java runtime installed ahead of the Cassandra setup in the next cells.
# -y answers the confirmation prompt so an unattended "Run All" does not stop
# or abort here; -qq trims the install log.
!apt-get install -y -qq openjdk-8-jre

In [None]:
# Fetch and unpack Cassandra 4.0.11.
# archive.apache.org keeps releases after they rotate off downloads.apache.org;
# -nc skips the download when the tarball is already present (no ".1" duplicates
# on re-run); -q and the non-verbose tar keep the cell output short.
!wget -q -nc https://archive.apache.org/dist/cassandra/4.0.11/apache-cassandra-4.0.11-bin.tar.gz
!tar -xzf apache-cassandra-4.0.11-bin.tar.gz

In [None]:
# Start the Cassandra node from the unpacked distribution.
# NOTE(review): -R presumably allows running as root (later output shows the
# notebook running under /root) — confirm against the Cassandra docs.
# No readiness check follows, so later cells may need to wait for the node to come up.
!apache-cassandra-4.0.11/bin/cassandra -R

In [None]:
# Python client for Cassandra; %pip installs into the running kernel's environment.
%pip install -q cassandra-driver

In [None]:
# Each `!` line runs in its own subshell, so `!export` never reached the
# pyspark process. %env sets the variable in the kernel's environment, which
# every subsequent `!` command inherits.
%env SPARK_KAFKA_VERSION=0.10
# Launch the PySpark shell with the Kafka source and Cassandra connector packages.
# NOTE(review): this opens an interactive REPL and blocks the cell until it exits.
!pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,org.apache.kafka:kafka-clients:3.3.1,com.datastax.spark:spark-cassandra-connector_2.12:3.4.0

Python 3.10.12 (main, Jun 11 2023, 05:26:28) [GCC 11.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.kafka#kafka-clients added as a dependency
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-af4bdf18-6e56-4d59-931f-59606815db84;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.1 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-cl

In [None]:
# from pyspark.sql import SparkSession, DataFrame
# from pyspark.sql import functions as F
# from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType
# from pyspark.ml import PipelineModel
# import datetime
# import shutil


# schema = StructType()
#     .add("obs", StringType())
#     .add("gpa", StringType())
#     .add("iq", StringType())
#     .add("gender", StringType())
#     .add("concept", StringType())


# raw_files = spark
#     .readStream
#     .format("csv")
#     .schema(schema)
#     .options(path="gpa_iq.csv", header=True)
#     .load()

# raw_orders = spark.readStream.
#     format("kafka").
#     option("kafka.bootstrap.servers", "localhost:9092").
#     option("subscribe", "gpa").
#     option("startingOffsets", "earliest").
#     option("maxOffsetsPerTrigger", "5").
#     load()

# parsed_gpa = raw_orders
#     .select(F.from_json(F.col("value").cast("String"), schema).alias("value"), "offset")
#     .select("value.*", "offset")


# parsed_gpa = parsed_gpa.withColumn("gpa", parsed_gpa["gpa"].cast(DoubleType()))
#                        .withColumn("iq", parsed_gpa["iq"].cast(IntegerType()))
#                        .withColumn("obs", parsed_gpa["obs"].cast(IntegerType()))
#                        .withColumn("concept", parsed_gpa["concept"].cast(IntegerType()))


# def console_output(df, freq):
#     return df.writeStream
#         .format("console")
#         .trigger(processingTime='%s seconds' % freq )
#         .options(truncate=True)
#         .option("checkpointLocation", "checkpoint_gpa")
#         .start()


# # record the load time
# load_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")


# # helper that removes the checkpoint directory
# def rm_checkpoint():
#   checkpoint_location = "checkpoint_gpa"
#   try:
#     shutil.rmtree(checkpoint_location)
#     print(f"Чекпоинт в {checkpoint_location} успешно удален.")
#   except FileNotFoundError:
#     print(f"Чекпоинт в {checkpoint_location} не найден.")


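# timed_files = raw_files.withColumn("p_date", F.lit("load_time"))  # NOTE(review): this passes the literal string "load_time", not the load_time variable — confirm intent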


# model_path = "gpa_lr_Model"
# model = PipelineModel.load(model_path)
# model

# predictions = model.transform(parsed_gpa)
# predictions = predictions.withColumn("date", F.current_timestamp())

# # inspect the schema we have at this point:
# predictions.printSchema()

# df = predictions.select('offset', 'gender', 'label', 'prediction', 'probability', 'date')

# st = console_output(df, 15)
# st.stop()

# rm_checkpoint()

In [None]:
# import pandas as pd
# from cassandra.cluster import Cluster
# cluster = Cluster(['127.0.0.1'])
# session = cluster.connect()

# session.execute(
#      "CREATE KEYSPACE IF NOT EXISTS hw_7 "
#      "WITH REPLICATION = { "
#          "'class': 'SimpleStrategy', "
#          "'replication_factor': 1 "
#      "}"
#  )


# session.execute("USE hw_7")

# session.execute(
#      'CREATE TABLE IF NOT EXISTS gpa_table ('
#          'offset BIGINT, '
#          '"gender" VARCHAR, '
#          'label DOUBLE, '
#          'prediction DOUBLE, '
#          'probability LIST<DOUBLE>, '
#          'date TIMESTAMP, '
#          'PRIMARY KEY (offset)'
#      ')'
#  )


# def cassandra_output_ckeckpointed(df, freq):
#      return df.writeStream
#          .format("org.apache.spark.sql.cassandra")
#          .trigger(processingTime=f'{freq} seconds')
#          .option("table", "gpa_table")
#          .option("keyspace", "hw_7")
#          .option("checkpointLocation", "checkpoint")
#          .start()



# def convert_to_list(vector):
#   return vector.tolist()

# convert_to_list_udf = F.udf(convert_to_list, ArrayType(DoubleType()))

# df = df.withColumn("probability", convert_to_list_udf(df["probability"]))

# df.printSchema()


# st = cassandra_output_ckeckpointed(df, 10) !!!! error

# st.stop()

# session.shutdown()
# cluster.shutdown()
# exit()

In [None]:
# Verify what the streaming job wrote to Cassandra.
# -e runs the statements and exits instead of opening an interactive prompt,
# so the cell finishes on its own under "Run All"; --no-color keeps ANSI
# escape codes out of the saved output.
!apache-cassandra-4.0.11/bin/cqlsh --no-color -e "USE hw_7; SELECT COUNT(*) FROM gpa_table ALLOW FILTERING; SELECT * FROM gpa_table LIMIT 15;"

Connected to [0;1;34mTest Cluster[0m at 127.0.0.1:9042
[cqlsh 6.0.0 | Cassandra 4.0.11 | CQL spec 3.4.5 | Native protocol v5]
Use HELP for help.
cqlsh> USE hw_7;
cqlsh:hw_7> SELECT COUNT(*) FROM gpa_table ALLOW FILTERING;

 [0;1;35mcount[0m
-------
    [0;1;32m20[0m

(1 rows)

Aggregation query used without partition key

cqlsh:hw_7> SELECT * FROM gpa_table LIMIT 15;

 [0;1;31moffset[0m | [0;1;35mdate[0m                            | [0;1;35mgender[0m | [0;1;35mlabel[0m | [0;1;35mprediction[0m | [0;1;35mprobability[0m
--------+---------------------------------+--------+-------+------------+----------------------
     [0;1;32m19[0m | [0;1;32m2023-09-14 14:49:30.016000+0000[0m |      [0;1;33m1[0m |     [0;1;32m1[0m |          [0;1;32m0[0m | [0;1;34m[[0m[0;1;32m0.928038[0m[0;1;34m, [0m[0;1;32m0.071962[0m[0;1;34m][0m
      [0;1;32m2[0m | [0;1;32m2023-09-14 14:49:04.999000+0000[0m |      [0;1;33m2[0m |     [0;1;32m0[0m |          [0;1;32m0[0m |