### The Topic

Let's create a topic called: monster_movement

In [1]:
%pip install kafka-python

Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m246.5/246.5 kB[0m [31m9.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: kafka-python
Successfully installed kafka-python-2.0.2
Note: you may need to restart the kernel to use updated packages.


In [2]:
from kafka import KafkaAdminClient
from kafka.admin import NewTopic

# Kafka broker configuration
bootstrap_servers = "kafka:9092"

# Create an instance of KafkaAdminClient
admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

# Define the topics to be created
topics = [
    NewTopic(name="monster_movements", num_partitions=1, replication_factor=1)
]

# Create the topics
admin_client.create_topics(new_topics=topics)

# Close the admin client
admin_client.close()

### The Data

Next we'll generate the data. We want to use the functions we create in our Spark streaming practice examples. What we want to achieve is:

1. Create 200 events.
2. The event should pick five random rows from our dnd_monsters.csv and return the monster's name and str characteristic.
3. Along with the name and str we need the latitude and longitude of the monster and a timestamp. Latitude is between -90 and 90 and longitude is -180 to 180.
4. Add a timestamp field called ts.
5. Remember to put everything into JSON then into a field called value for the kafka message.
5. Don't write to Kafka yet.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, current_timestamp
import random
import time
import json

# Create a SparkSession
spark = SparkSession.builder.appName("Kafka Monsters").getOrCreate()

# Read data from the CSV file into a DataFrame
df = spark.read.csv("./work/data/dnd_monsters.csv",  header=True)

# Define the number of events to generate
num_of_events = 200

# Select random rows and extract required fields (name, strength)
random_monsters = df.orderBy(rand()).limit(num_of_events).select("name", "str")

# Add latitude and longitude to DataFrame
random_monsters = random_monsters.withColumn("lat", rand() * 180 - 90)
random_monsters = random_monsters.withColumn("long", rand() * 360 - 180)

# Add a timestamp field to each row
random_monsters = random_monsters.withColumn("ts", current_timestamp())

# Format the data into JSON and store in a new column
formatted_monsters = random_monsters.toJSON().map(json.loads)

# Show the data
for monsters in formatted_monsters.collect():
    print(monsters)

### Let's add more

Now that we have our JSON monster locations printing and ready to use, but first let's change those latitudes and longitudes to countries. Again try this yourself first and don't give up too quickly if you find it hard, use the internet. Here's the library and how to use it:

In [3]:
%pip install geopy

Collecting geopy
  Downloading geopy-2.3.0-py3-none-any.whl (119 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m119.8/119.8 kB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting geographiclib<3,>=1.52 (from geopy)
  Downloading geographiclib-2.0-py3-none-any.whl (40 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.3/40.3 kB[0m [31m14.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: geographiclib, geopy
Successfully installed geographiclib-2.0 geopy-2.3.0
Note: you may need to restart the kernel to use updated packages.


In [4]:
from geopy.geocoders import Nominatim

# Create a geocoder instance
geolocator = Nominatim(user_agent="my_geocoder")

# Define the latitude and longitude coordinates
latitude = 51.5074
longitude = -0.1278

# Reverse geocode to get the location information
location = geolocator.reverse((latitude, longitude), language='en')

# Extract the country from the location information, with a random lat and long it might not have a country
country = "Sea"
try:
    country = location.raw['address']['country']
except:
    print("out to sea")
print(country)

# Print the country
print(country)

United Kingdom
United Kingdom


In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand
import random
import datetime
import json

from geopy.geocoders import Nominatim

# Create a geocoder instance
geolocator = Nominatim(user_agent="my_geocoder")


def choose_random_item(lst):
    return random.choice(lst)

def random_int(fro, to):
    return random.randint(fro, to)
# Create a SparkSession
spark = SparkSession.builder.appName("Monsters") \
.config("spark.jars","commons-pool2-2.11.1.jar,spark-sql-kafka-0-10_2.12-3.4.0.jar,spark-streaming-kafka-0-10-assembly_2.12-3.4.0.jar").getOrCreate()

# Read the file into a DataFrame
df = spark.read.csv("./work/data/dnd_monsters.csv",  header=True).where("str is not null").select("name", "str")

# Randomly select 5 lines
random_lines = df.orderBy(rand()).limit(5)

# Show the selected lines
random_lines.show(truncate=False)
list_tuples = random_lines.rdd.map(tuple).collect()


for monster in range(1,200):
    random_tuple = random.choice(list_tuples)
    lat = random_int(-90, 90)
    long = random_int(-180, 180)

    # Reverse geocode to get the location information
    location = geolocator.reverse((lat, long), language='en')

    # Extract the country from the location information
    country = "Sea"
    try:
        country = location.raw['address']['country']
    except:
        print("out to sea")
    print(country)
    data = {
    "name": random_tuple[0],
    "str": random_tuple[1],
    "ts": str(datetime.datetime.now()),
    "lat": lat,
    "long": long,
    "country": country
    }

    json_string = json.dumps(data)

    # Print the JSON string
    print(json_string)
    data = [(json_string,),]
    df = spark.createDataFrame(data, ["value"])
    df.show(1, False)
    # # Write the DataFrame to Kafka
    df.write \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("topic", "monster_movements") \
        .save()
    
    

+------------+----+
|name        |str |
+------------+----+
|lich        |11.0|
|steam-mephit|5.0 |
|bone-devil  |18.0|
|ghast       |16.0|
|merfolk     |10.0|
+------------+----+

out to sea
Sea
{"name": "steam-mephit", "str": "5.0", "ts": "2023-08-01 13:30:43.320865", "lat": -42, "long": -44, "country": "Sea"}
+---------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                |
+---------------------------------------------------------------------------------------------------------------------+
|{"name": "steam-mephit", "str": "5.0", "ts": "2023-08-01 13:30:43.320865", "lat": -42, "long": -44, "country": "Sea"}|
+---------------------------------------------------------------------------------------------------------------------+



AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, current_timestamp, udf
from pyspark.sql.types import StringType, StructType, StructField
import random
import time
import json
from geopy.geocoders import Nominatim

# Create a geocoder instance
geolocator = Nominatim(user_agent="my_geocoder")

# Create a SparkSession
spark = SparkSession.builder.appName("Kafka Monsters").getOrCreate()

# Read data from the CSV file into a DataFrame
df = spark.read.csv("./work/data/dnd_monsters.csv", header=True)

# Define the number of events to generate
num_of_events = 200

# Select random rows and extract required fields (name, strength)
random_monsters = df.orderBy(rand()).limit(num_of_events).select("name", "str")

# Add latitude and longitude to DataFrame
random_monsters = random_monsters.withColumn("lat", rand() * 180 - 90)
random_monsters = random_monsters.withColumn("long", rand() * 360 - 180)

# Add a timestamp field to each row
random_monsters = random_monsters.withColumn("ts", current_timestamp())

# Convert latitude and longitude to string
random_monsters = random_monsters.withColumn("lat_str", random_monsters["lat"].cast(StringType()))
random_monsters = random_monsters.withColumn("long_str", random_monsters["long"].cast(StringType()))

# Format the data into JSON and store in a new column
schema = StructType([
    StructField("name", StringType(), True),
    StructField("str", StringType(), True),
    StructField("lat", StringType(), True),
    StructField("long", StringType(), True),
    StructField("ts", StringType(), True),
    StructField("country", StringType(), True)
])

def get_country(lat, long):
    try:
        location = geolocator.reverse((lat, long), language='en')
        country = location.raw['address']['country']
        return country
    except:
        return "Unknown"

get_country_udf = udf(get_country, StringType())

# Add the country field to the DataFrame
random_monsters = random_monsters.withColumn("country", get_country_udf("lat_str", "long_str"))

# Select the required columns for the final DataFrame
final_data = random_monsters.select("name", "str", "lat", "long", "ts", "country")

# Write the DataFrame to Kafka
final_data.selectExpr("CAST(name AS STRING) AS key", "to_json(struct(*)) AS value") \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "monster_movements") \
    .save()


AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.