In [None]:
# One-time setup: uncomment to install the notebook's dependencies into the kernel's environment.
#%pip install kafka-python
#%pip install pyspark

In [1]:
from kafka import KafkaProducer
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql.functions import expr
from pyspark.sql.functions import from_json
import json
import os
import time
def configure_hadoop_env(hadoop_home: str = 'C:/hadoop/hadoop-3.3.6') -> str:
    """Point this process at a local Hadoop installation.

    Sets the ``HADOOP_HOME`` environment variable and makes sure the
    installation's ``bin`` directory is listed in ``PATH``.

    The ``PATH`` update is idempotent: re-running the cell does not append the
    same directory again, and an unset ``PATH`` is handled instead of raising
    ``KeyError``.

    Args:
        hadoop_home: Root directory of the Hadoop installation.

    Returns:
        The ``bin`` directory that is guaranteed to be on ``PATH`` afterwards.
    """
    os.environ['HADOOP_HOME'] = hadoop_home
    hadoop_bin = os.path.join(hadoop_home, "bin")

    current_path = os.environ.get("PATH", "")
    # Only append when the directory is not already an entry of PATH, so the
    # variable does not grow every time the cell is executed.
    if hadoop_bin not in current_path.split(os.pathsep):
        os.environ["PATH"] = (
            current_path + os.pathsep + hadoop_bin if current_path else hadoop_bin
        )
    return hadoop_bin


configure_hadoop_env()


In [2]:
# Location of the Electric Vehicle Population CSV on the local disk
csv_path = r"C:\Users\boris\Documents\DataEngenier\Spark\final_project\input\Electric_Vehicle_Population_Data.csv"
# Load the whole file into a pandas DataFrame
df = pd.read_csv(filepath_or_buffer=csv_path)

In [3]:
# Print a summary of the loaded frame (columns, non-null counts, dtypes)
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 150482 entries, 0 to 150481
Data columns (total 17 columns):
 #   Column                                             Non-Null Count   Dtype  
---  ------                                             --------------   -----  
 0   VIN (1-10)                                         150482 non-null  object 
 1   County                                             150479 non-null  object 
 2   City                                               150479 non-null  object 
 3   State                                              150482 non-null  object 
 4   Postal Code                                        150479 non-null  float64
 5   Model Year                                         150482 non-null  int64  
 6   Make                                               150482 non-null  object 
 7   Model                                              150482 non-null  object 
 8   Electric Vehicle Type                              150482 non-null  object

In [None]:
# Missing values per column, followed by the overall number of rows
print(df.isnull().sum())
print('Total numbers of rows', df.shape[0])

In [4]:
KAFKA_TOPIC = "Electric_Vehicle"

# Create the Kafka producer; every message value is serialized as UTF-8 encoded JSON.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

try:
    # Send each CSV row as one Kafka message.
    for _, row in df.iterrows():
        # Missing cells arrive from pandas as NaN, which json.dumps would emit as
        # the bare token NaN (not valid JSON). Map them to None so they are
        # serialized as JSON null instead.
        message = {
            column: (None if pd.isna(value) else value)
            for column, value in row.items()
        }
        producer.send(KAFKA_TOPIC, value=message)

    # Ensure all buffered messages are delivered before moving on.
    producer.flush()
finally:
    # Always release the producer connection, even if sending failed part-way.
    producer.close()


In [4]:
# Build the SparkSession: local master, with the Kafka SQL connector package requested
spark = (
    SparkSession.builder
    .appName("KafkaToParquet")
    .master("local[*]")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.4")
    .getOrCreate()
)

In [8]:
# NOTE: the whole streaming job below (Kafka -> console sink + Parquet sink) is disabled by
# wrapping it in a triple-quoted string, so this cell only evaluates a string literal and
# starts no streaming queries.
# NOTE(review), to confirm before re-enabling:
#   - FloatType is presumably single precision; check that it can hold the
#     '2020 Census Tract' values exactly, otherwise a wider numeric type is needed.
#   - query_console.awaitTermination() is called first; presumably it blocks until that
#     query stops, so the second awaitTermination() is only reached afterwards - verify.
"""
KAFKA_TOPIC = "Electric_Vehicle"
# Step 2: Define Schema
schema = StructType([
    StructField('VIN (1-10)', StringType(), True),
    StructField('County', StringType(), True),
    StructField('City', StringType(), True),
    StructField('State', StringType(), True),
    StructField('Postal Code', FloatType(), True),
    StructField('Model Year', IntegerType(), True),
    StructField('Make', StringType(), True),
    StructField('Model', StringType(), True),
    StructField('Electric Vehicle Type', StringType(), True),
    StructField('Clean Alternative Fuel Vehicle (CAFV) Eligibility', StringType(), True),
    StructField('Electric Range', IntegerType(), True),
    StructField('Base MSRP', IntegerType(), True),
    StructField('Legislative District', FloatType(), True),
    StructField('DOL Vehicle ID', IntegerType(), True),
    StructField('Vehicle Location', StringType(), True),
    StructField('Electric Utility', StringType(), True),
    StructField('2020 Census Tract', FloatType(), True)
])

# Step 3: Read data from Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", KAFKA_TOPIC) \
    .option("startingOffsets", "earliest") \
    .load()

kafka_df.printSchema()

# Step 4: Convert the Kafka value (in binary format) into a string and parse the JSON if needed
parsed_df = kafka_df.selectExpr("CAST(value AS STRING)")
query_console = parsed_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Step 5: Parse the JSON data into the proper schema
final_df = parsed_df.select(from_json("value", schema).alias("data")).select("data.*")

# Step 6: Write to Parquet format
output_path = "C:/Users/boris/Documents/DataEngenier/Spark/final_project/output/raw-data"

query_parquet = final_df.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("checkpointLocation", "C:/Users/boris/Documents/DataEngenier/Spark/final_project/checkpoints") \
    .option("path", output_path) \
    .start() 

# Stop both queries after they done processing
query_console.awaitTermination()
query_parquet.awaitTermination()
"""

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [5]:
# Read the raw Parquet output back as a batch DataFrame and inspect it
output_path = "C:/Users/boris/Documents/DataEngenier/Spark/final_project/output/raw-data"
df_raw_data = spark.read.format("parquet").load(output_path)

# Preview the first rows, then the schema, then the total row count
df_raw_data.show(n=5)
df_raw_data.printSchema()
print('Total numbers of rows', df_raw_data.count())

+----------+--------+--------+-----+-----------+----------+-------+--------------+---------------------+-------------------------------------------------+--------------+---------+--------------------+--------------+--------------------+--------------------+-----------------+
|VIN (1-10)|  County|    City|State|Postal Code|Model Year|   Make|         Model|Electric Vehicle Type|Clean Alternative Fuel Vehicle (CAFV) Eligibility|Electric Range|Base MSRP|Legislative District|DOL Vehicle ID|    Vehicle Location|    Electric Utility|2020 Census Tract|
+----------+--------+--------+-----+-----------+----------+-------+--------------+---------------------+-------------------------------------------------+--------------+---------+--------------------+--------------+--------------------+--------------------+-----------------+
|KM8K33AGXL|    King| Seattle|   WA|    98103.0|      2020|HYUNDAI|          KONA| Battery Electric ...|                             Clean Alternative...|           258|   

In [None]:
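# Local Kafka helper commands (meant for a terminal, kept here for reference):
# Start a Kafka container named "kafka" with port 9092 published
#docker run -d --name kafka -p 9092:9092 apache/kafka:latest
# Consume the Electric_Vehicle topic from the beginning to check what the producer sent
#docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Electric_Vehicle --from-beginning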


In [None]:
#spark.version


In [6]:
#print(spark.streams.active) 

[]


In [8]:
# Stop every streaming query that is still active on this Spark session.
# (The loop header had been commented out while its indented body was left in place,
# which made the cell fail with an IndentationError.)
for query in spark.streams.active:
    query.stop()