In [1]:
from pyspark.sql import SparkSession
import os
import pyspark.sql.functions as f
from pyspark.sql import types as t
from pyspark.sql.functions import *


#================== connection between  spark and kafka=======================================#
#==============================================================================================
spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName('taxi') \
        .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2') \
        .config("fs.s3a.endpoint", "http://minio:9000") \
        .config("fs.s3a.access.key", "minioadmin") \
        .config("fs.s3a.secret.key", "minioadmin") \
        .config("fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
        .getOrCreate()
#==============================================================================================
#=========================================== ReadStream from kafka===========================#
socketDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "course-kafka:9092") \
    .option("Subscribe", "my_trip")\
    .load()\
    .selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
#==============================================================================================
#==============================Create schema for create df from json=========================#
schema = t.StructType() \
    .add("vendorid", t.StringType())                    .add("lpep_pickup_datetime", t.StringType()) \
    .add("lpep_dropoff_datetime", t.StringType())       .add("store_and_fwd_flag", t.StringType()) \
    .add("ratecodeid", t.StringType())                  .add("pickup_longitude", t.StringType()) \
    .add("pickup_latitude", t.StringType())             .add("dropoff_longitude", t.StringType()) \
    .add("dropoff_latitude", t.StringType())            .add("passenger_count", t.StringType()) \
    .add("trip_distance", t.StringType())               .add("fare_amount", t.StringType()) \
    .add("extra", t.StringType())                       .add("mta_tax", t.StringType()) \
    .add("tip_amount", t.StringType())                  .add("tolls_amount", t.StringType()) \
    .add("improvement_surcharge", t.StringType())       .add("total_amount", t.StringType())\
    .add("payment_type", t.StringType())                .add("trip_type", t.StringType())

#==============================================================================================
#==========================change json to dataframe with schema==============================#
taxiTripsDF = socketDF.select(f.col("value").cast("string")).select(f.from_json(f.col("value"), schema).alias("value")).select("value.*")

#====# 1: Remove spaces from column names====================================================#
taxiTripsDF = taxiTripsDF \
    .withColumnRenamed("vendorid", "vendorid")                          .withColumnRenamed("lpep_pickup_datetime", "pickup_datetime") \
    .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")     .withColumnRenamed("passenger_count", "passenger_count") \
    .withColumnRenamed("trip_distance", "trip_distance")                .withColumnRenamed("ratecodeid", "ratecodeid") \
    .withColumnRenamed("store_and_fwd_flag", "store_and_fwd_flag")      .withColumnRenamed("payment_type", "PaymentType")


#============================================================================================#
#==== 3: Add date columns from timestamp=====================================================#
#============================================================================================#
taxiTripsDF = taxiTripsDF.withColumn('TripStartDT', taxiTripsDF['pickup_datetime'].cast('date'))
taxiTripsDF = taxiTripsDF.withColumn('TripEndDT', taxiTripsDF['dropoff_datetime'].cast('date'))

#============================================================================================#
#==== 4: Add/convert/casting Additional columns types=========================================#
#============================================================================================#
taxiTripsDF = taxiTripsDF\
    .withColumn('trip_distance', taxiTripsDF['trip_distance'].cast('double'))\
    .withColumn('pickup_longitude', taxiTripsDF['pickup_longitude'].cast('double')) \
    .withColumn('pickup_latitude', taxiTripsDF['pickup_latitude'].cast('double')) \
    .withColumn('dropoff_longitude', taxiTripsDF['dropoff_longitude'].cast('double')) \
    .withColumn('dropoff_latitude', taxiTripsDF['dropoff_latitude'].cast('double'))

taxiTripsDF = taxiTripsDF\
    .withColumn("hourd", hour(taxiTripsDF["pickup_datetime"])) \
    .withColumn("minuted", minute(taxiTripsDF["pickup_datetime"])) \
    .withColumn("secondd", second(taxiTripsDF["pickup_datetime"]))\
    .withColumn("dayofweek", dayofweek(taxiTripsDF["TripStartDT"])) \
    .withColumn("week_day_full", date_format(col("TripStartDT"), "EEEE"))

## CREATE A CLEANED DATA-FRAME BY DROPPING SOME UN-NECESSARY COLUMNS & FILTERING FOR UNDESIRED VALUES OR OUTLIERS
taxiTripsDF = taxiTripsDF\
    .drop('store_and_fwd_flag','fare_amount','extra','tolls_amount','mta_tax','improvement_surcharge','trip_type','ratecodeid','pickup_datetime','dropoff_datetime','PaymentType','TripStartDT','TripEndDT')\
    .filter("passenger_count > 0 and passenger_count < 8  AND tip_amount >= 0 AND tip_amount < 30 AND fare_amount >= 1 AND fare_amount < 150 AND trip_distance > 0 AND trip_distance < 100 ")


# taxiTripsDF.printSchema()
# taxiTripsDF.show(8,False)




socketDF = socketDF\
    .writeStream\
            .format("console")\
            .outputMode("append")\
            .start()\
            .awaitTermination()


# s3_query = taxiTripsDF\
#     .writeStream\
#             .format("console")\
#             .outputMode("append")\
#             .start()\
#             .awaitTermination()







:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/developer/.ivy2/cache
The jars for the packages stored in: /home/developer/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-337dd2c8-2d6c-447a-a8c0-ca5974d1699a;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 420ms :: artifacts dl 14ms
	:: modules in use:
	com.github.luben#zstd-jni;1.4.8-1 from central in [default]
	org.apache.commons#commons-pool2;2.6.2 from c

-------------------------------------------
Batch: 0
-------------------------------------------
+---+-----+
|key|value|
+---+-----+
+---+-----+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"vendorid": "2",...|
+----+--------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"vendorid": "2",...|
+----+--------------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"vendorid": "2",...|
+----+--------------------+

-------------------------------------------
Batch: 4
-------------------------------------------
+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"vendorid": "2",...|
+----+--------------------+

-------------------------------------------
Batc

                                                                                

-------------------------------------------
Batch: 6
-------------------------------------------
+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"vendorid": "1",...|
+----+--------------------+



ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
RuntimeError: reentrant call inside <_io.BufferedReader name=59>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/sock

-------------------------------------------
Batch: 7
-------------------------------------------
+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"vendorid": "2",...|
+----+--------------------+



Py4JError: An error occurred while calling o118.awaitTermination

-------------------------------------------
Batch: 8
-------------------------------------------
+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"vendorid": "2",...|
+----+--------------------+

-------------------------------------------
Batch: 9
-------------------------------------------
+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"vendorid": "2",...|
+----+--------------------+

-------------------------------------------
Batch: 10
-------------------------------------------
+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"vendorid": "1",...|
+----+--------------------+



                                                                                

-------------------------------------------
Batch: 11
-------------------------------------------
+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"vendorid": "1",...|
+----+--------------------+



                                                                                

-------------------------------------------
Batch: 12
-------------------------------------------
+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"vendorid": "2",...|
+----+--------------------+

-------------------------------------------
Batch: 13
-------------------------------------------
+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"vendorid": "2",...|
+----+--------------------+



                                                                                

-------------------------------------------
Batch: 14
-------------------------------------------
+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"vendorid": "2",...|
+----+--------------------+

-------------------------------------------
Batch: 15
-------------------------------------------
+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"vendorid": "2",...|
+----+--------------------+



                                                                                

-------------------------------------------
Batch: 16
-------------------------------------------
+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"vendorid": "2",...|
+----+--------------------+

-------------------------------------------
Batch: 17
-------------------------------------------
+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"vendorid": "1",...|
+----+--------------------+

