In [1]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session used by the Kafka -> PostgreSQL cells.
# The session attaches to the standalone master and advertises the driver
# under the host name "jupyter".
# "spark.jars" is a single comma-separated string; the adjacent literals below
# are concatenated by Python into that one value.
# NOTE(review): the connector jars are built for 3.5.0 while the cell output
# reports Spark 3.5.1 -- confirm the versions are meant to differ.
spark = (
    SparkSession.builder
    .appName("TestKafkaStream")
    .master("spark://spark-master:7077")
    .config("spark.driver.host", "jupyter")
    .config(
        "spark.jars",
        "/opt/bitnami/spark/extra-jars/spark-sql-kafka-0-10_2.12-3.5.0.jar,"
        "/opt/bitnami/spark/extra-jars/spark-token-provider-kafka-0-10_2.12-3.5.0.jar,"
        "/opt/bitnami/spark/extra-jars/kafka-clients-3.5.1.jar,"
        "/opt/bitnami/spark/extra-jars/commons-pool2-2.11.1.jar,"
        "/opt/bitnami/spark/extra-jars/postgresql-42.2.27.jar",
    )
    .getOrCreate()
)

# Sanity check: which master the session is bound to and the Spark version.
print(spark.sparkContext.master)
print(spark.version)


25/08/06 10:18:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


spark://spark-master:7077
3.5.1


In [None]:
from kafka import KafkaProducer
import pandas as pd
import json
import time

# Configuration
TOPIC = 'trip-data'
PARQUET_PATH = '/opt/data/yellow_tripdata_2023-01.parquet'
SEND_INTERVAL_SEC = 0.5  # pause between messages; lower it to replay faster


def _json_safe(record):
    """Return a copy of ``record`` that ``json.dumps`` can serialize.

    ``pd.Timestamp`` values are replaced by their ISO-8601 string and any
    value that ``pd.isna`` reports as missing (NaN, NaT, None) becomes
    ``None``. All other values are copied unchanged. A new dict is built so
    the input is not rewritten while it is being iterated.
    """
    safe = {}
    for key, value in record.items():
        if isinstance(value, pd.Timestamp):
            safe[key] = value.isoformat()
        elif pd.isna(value):
            safe[key] = None
        else:
            safe[key] = value
    return safe


# Read the Parquet file with pandas first, so a bad path fails before any
# Kafka connection is opened.
df = pd.read_parquet(PARQUET_PATH)

# Optionally narrow the frame to the columns you actually need, e.g.:
# df = df[["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count", "total_amount"]]

# Create the Kafka producer; every message value is JSON-encoded as UTF-8.
producer = KafkaProducer(
    bootstrap_servers='kafka:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Replay the DataFrame row by row into Kafka. The loop is normally stopped by
# interrupting the cell, so flushing and closing happen in ``finally`` to make
# sure buffered messages are delivered and the connection is released.
try:
    for _, row in df.iterrows():
        message = _json_safe(row.to_dict())

        producer.send(TOPIC, message)
        print(f"Sent: {message}")
        time.sleep(SEND_INTERVAL_SEC)
finally:
    producer.flush()
    producer.close()




In [None]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, DoubleType, IntegerType, TimestampType

# Schema of the JSON messages published to the "trip-data" topic.
# Field names are case-sensitive and must match the JSON keys exactly.
schema = (
    StructType()
    # Identification and trip timing
    .add("VendorID", IntegerType())
    .add("tpep_pickup_datetime", TimestampType())
    .add("tpep_dropoff_datetime", TimestampType())
    # Trip attributes
    .add("passenger_count", DoubleType())
    .add("trip_distance", DoubleType())
    .add("RatecodeID", DoubleType())
    .add("store_and_fwd_flag", StringType())
    .add("PULocationID", IntegerType())
    .add("DOLocationID", IntegerType())
    .add("payment_type", IntegerType())
    # Monetary amounts
    .add("fare_amount", DoubleType())
    .add("extra", DoubleType())
    .add("mta_tax", DoubleType())
    .add("tip_amount", DoubleType())
    .add("tolls_amount", DoubleType())
    .add("improvement_surcharge", DoubleType())
    .add("total_amount", DoubleType())
    .add("congestion_surcharge", DoubleType())
    .add("airport_fee", DoubleType())
)

# Streaming source: subscribe to the "trip-data" topic on the Kafka broker and
# start from the earliest available offsets.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "trip-data")
    .option("startingOffsets", "earliest")
    .load()
)

# Explicitly cast the Kafka "value" column to a string.
df_json = df.selectExpr("CAST(value AS STRING) as value")

# Parse the JSON payload once; both the debug sink and the main pipeline
# derive from this single projection.
df_with_data = df_json.select(from_json(col("value"), schema).alias("data"))

# Debug sink: prints rows whose parsed struct is NULL to the console.
# The query handle is kept so the side stream can be stopped with
# `unparsed_rows_query.stop()` instead of running unattended and interleaving
# its batches with the main sink's output.
unparsed_rows_query = (
    df_with_data
    .filter(col("data").isNull())
    .writeStream
    .format("console")
    .start()
)

# Flatten the struct into top-level columns.
df_parsed = df_with_data.select("data.*")

# Keep only the columns that are written downstream.
df_filtered = df_parsed.select("VendorID", "trip_distance", "total_amount", "payment_type")

def write_to_postgres(batch_df, batch_id):
    """Print a micro-batch and append it to PostgreSQL over JDBC.

    Intended as a ``foreachBatch`` callback.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Identifier of the micro-batch (unused, required by the
            ``foreachBatch`` callback signature).

    Raises:
        Whatever ``show()`` or the JDBC ``save()`` raises; the batch is
        unpersisted before the exception propagates.
    """
    # Two actions run on the same batch (show + save). Caching it first keeps
    # Spark from recomputing the batch, and re-reading the source, per action.
    batch_df.persist()
    try:
        batch_df.show()
        # NOTE(review): connection URL and credentials are hardcoded; consider
        # moving them to environment variables or a secrets store.
        (
            batch_df.write
            .format("jdbc")
            .option("url", "jdbc:postgresql://postgres:5432/green_taxi")
            .option("dbtable", "intermediate.green_taxi")
            .option("user", "airflow")
            .option("password", "airflow")
            .option("driver", "org.postgresql.Driver")
            .mode("append")
            .save()
        )
    finally:
        # Always release the cached batch, even when the write fails.
        batch_df.unpersist()

# Main sink: hand every micro-batch to write_to_postgres, in append mode, with
# a fixed checkpoint directory. awaitTermination() blocks the cell until the
# query stops or the cell is interrupted.
(
    df_filtered.writeStream
    .foreachBatch(write_to_postgres)
    .outputMode("append")
    .option("checkpointLocation", "/tmp/checkpoint_trip_data")
    .start()
    .awaitTermination()
)



-------------------------------------------
Batch: 101
-------------------------------------------
-------------------------------------------
Batch: 124
-------------------------------------------
+--------+-------------+------------+------------+
|VendorID|trip_distance|total_amount|payment_type|
+--------+-------------+------------+------------+
|       2|         5.62|       35.38|           1|
|       2|         0.74|        12.8|           1|
+--------+-------------+------------+------------+

+----+
|data|
+----+
+----+

+--------+-------------+------------+------------+
|VendorID|trip_distance|total_amount|payment_type|
+--------+-------------+------------+------------+
|2       |0.74         |12.8        |1           |
+--------+-------------+------------+------------+

-------------------------------------------
Batch: 125
-------------------------------------------
-------------------------------------------
Batch: 102
-------------------------------------------
+----+
|data

In [None]:
from pyspark.sql import SparkSession

# Spark session configured to read from MinIO through the S3A filesystem.
# NOTE(review): if a session created by an earlier cell is still active,
# getOrCreate() may hand back that session, in which case builder-level
# settings such as the master URL and spark.jars.packages would not apply --
# confirm by running this cell on a fresh kernel.
# NOTE(review): master/driver IPs and the MinIO keys are hardcoded.
spark = (
    SparkSession.builder
    .appName("ReadFromMinIO")
    .master("spark://172.20.0.7:7077")
    .config("spark.driver.host", "172.17.0.1")
    # Maven coordinates resolved at session start, joined into one
    # comma-separated value.
    .config("spark.jars.packages", ",".join([
        "org.apache.hadoop:hadoop-aws:3.3.6",
        "org.apache.hadoop:hadoop-common:3.3.6",
        "com.amazonaws:aws-java-sdk-bundle:1.12.367"
    ]))
    # S3A settings: MinIO endpoint, static keys, path-style access, SSL off.
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin123")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .config("spark.hadoop.fs.s3a.endpoint.region", "us-east-1")
    .getOrCreate()
)

print("✅ Spark version:", spark.version)


In [None]:
# Print the master URL the current `spark` session is attached to.
print("Spark master:", spark.sparkContext.master)


In [None]:
# Read the parquet file from the "staging" bucket over s3a:// and preview the
# first five rows.
df = spark.read.parquet(
    "s3a://staging/green_tripdata_2023-01.parquet"
)
df.show(n=5)


In [None]:
# Print the version of the pyspark package installed in this kernel.
import pyspark
print(pyspark.__version__)


In [4]:
from pyspark.sql import Row

# Test data: two hand-written rows with the same four columns the streaming
# pipeline selects.
data = [
    Row(VendorID=1, trip_distance=2.5, total_amount=12.0, payment_type=1),
    Row(VendorID=2, trip_distance=3.0, total_amount=15.5, payment_type=2),
]

df_test = spark.createDataFrame(data)

# Smoke test of the JDBC path: append the rows to the target table.
(
    df_test.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://postgres:5432/green_taxi")
    .option("dbtable", "intermediate.green_taxi")
    .option("user", "airflow")
    .option("password", "airflow")
    .option("driver", "org.postgresql.Driver")
    .mode("append")
    .save()
)


                                                                                

In [5]:
# Console sink for eyeballing the parsed stream: append mode, untruncated
# columns. No checkpointLocation is set here. awaitTermination() blocks the
# cell until the query stops or the cell is interrupted.
# NOTE(review): `df_filtered` is not defined in this cell; it must already
# exist in the kernel before this cell is run.
(
    df_filtered.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .start()
    .awaitTermination()
)


25/08/06 10:28:04 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-16e50cc9-5da0-41f5-9593-cd14ed7f6fcd. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/08/06 10:28:04 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+--------+-------------+------------+------------+
|VendorID|trip_distance|total_amount|payment_type|
+--------+-------------+------------+------------+
|2       |0.97         |14.3        |2           |
|2       |1.1          |16.9        |1           |
|2       |2.51         |34.9        |1           |
|1       |1.9          |20.85       |1           |
|2       |1.43         |19.68       |1           |
|2       |1.84         |27.8        |1           |
|2       |1.66         |20.52       |1           |
|2       |11.7         |64.44       |1           |
|2       |2.95         |28.38       |1           |
|2       |3.01         |19.9        |2           |
|2       |1.8          |19.68       |1           |
|1       |7.3          |46.55       |1           |
|2       |0.97         |14.3        |2           |
|2       |1.1          |16.9        |1           |
|2       |2.51         |34.9        

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


-------------------------------------------
Batch: 19
-------------------------------------------
+--------+-------------+------------+------------+
|VendorID|trip_distance|total_amount|payment_type|
+--------+-------------+------------+------------+
|1       |1.2          |13.6        |2           |
+--------+-------------+------------+------------+



KeyboardInterrupt: 

-------------------------------------------
Batch: 20
-------------------------------------------
+--------+-------------+------------+------------+
|VendorID|trip_distance|total_amount|payment_type|
+--------+-------------+------------+------------+
|1       |2.5          |20.6        |2           |
+--------+-------------+------------+------------+

-------------------------------------------
Batch: 21
-------------------------------------------
+--------+-------------+------------+------------+
|VendorID|trip_distance|total_amount|payment_type|
+--------+-------------+------------+------------+
|1       |1.4          |17.15       |1           |
+--------+-------------+------------+------------+

