In [1]:
import json
from time import sleep
from graphframes import *
from pyspark.sql.functions import col, desc, lit
from geopy.distance import geodesic

In [2]:
from kafka import KafkaConsumer, KafkaProducer

In [3]:
import os
from pyspark.sql import SparkSession
os.environ['PYSPARK_PYTHON'] = "/usr/bin/python3"
spark = SparkSession.builder.appName("graph processing")\
.config("spark.jars.packages","graphframes:graphframes:0.8.2-spark3.2-s_2.12") \
.master("local[*]").getOrCreate()

25/04/14 15:21:58 WARN Utils: Your hostname, bigdata2024 resolves to a loopback address: 127.0.1.1; using 10.3.132.222 instead (on interface ens3)
25/04/14 15:21:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/hadoop/.local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/hadoop/.ivy2/cache
The jars for the packages stored in: /home/hadoop/.ivy2/jars
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-37985f59-0f52-4cf3-9a65-a33fe0808bf2;1.0
	confs: [default]
	found graphframes#graphframes;0.8.2-spark3.2-s_2.12 in spark-packages
	found org.slf4j#slf4j-api;1.7.16 in central
:: resolution report :: resolve 138ms :: artifacts dl 5ms
	:: modules in use:
	graphframes#graphframes;0.8.2-spark3.2-s_2.12 from spark-packages in [default]
	org.slf4j#slf4j-api;1.7.16 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------

In [4]:
def publish_message(producer_instance, topic_name, key, value):
    try:
        key_bytes =  bytes(key, encoding='utf-8') 
        value_bytes =json.dumps(value).encode('utf-8') # bytes(value, encoding='utf-8') 
        print(value_bytes)
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        producer_instance.flush()
        print('Message published successfully (producer).')
    except Exception as ex:
        print('Exception in publishing message')
        print(str(ex))

def connect_kafka_producer():
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=['localhost:9092'], api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    finally:
        return _producer

In [5]:
def haversine_distance(lat1, lon1, lat2, lon2):
    R = 6371.0 
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
    dlat, dlon = lat2 - lat1, lon2 - lon1
    a = sin(dlat/2)**2 + cos(lat1)*cos(lat2)*sin(dlon/2)**2
    return 2 * R * atan2(sqrt(a), sqrt(1 - a))
    
def print_trip_summary_with_graphframes(edge_list, station_lookup):
    edges_df = spark.createDataFrame(edge_list, ["src", "dst", "count"])
    grouped_edges = edges_df.groupBy("src", "dst").sum("count").withColumnRenamed("sum(count)", "count")
    
    vertex_ids = set([src for src, _, _ in edge_list] + [dst for _, dst, _ in edge_list])
    vertices_data = [(sid, station_lookup.get(sid, f"Station {sid}")) for sid in vertex_ids]
    vertices_df = spark.createDataFrame(vertices_data, ["id", "name"])
    
    g = GraphFrame(vertices_df, grouped_edges)

    triplets_df = g.triplets \
        .select(
            col("edge.count").alias("trip_count"),
            col("src.name").alias("start_name"),
            col("dst.name").alias("end_name")
        ) \
        .orderBy(desc("trip_count"))
    
    print("\n--- Top 10 Trips ---")
    for row in triplets_df.take(10):
        print(f"There were {row['trip_count']} trips from {row['start_name']} to {row['end_name']}.")
    print("---------------------\n")

In [None]:
def start_kafka_trip_consumer():
    topic = "trip_data3"
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id='trip_graph_group',
        value_deserializer=lambda x: x.decode('utf-8')
    )

    print("Listening to topic 'trip_data'...")
    edge_buffer = []
    station_lookup = {}

    try:
        for msg in consumer:
            fields = msg.value.strip().split(',')
            # print(fields)
            if len(fields) < 8:
                continue

            start_station_id = fields[4]
            end_station_id = fields[7]
            start_station_name = fields[3]
            end_station_name = fields[6]
 
            station_lookup[start_station_id] = start_station_name
            station_lookup[end_station_id] = end_station_name
 
            edge_buffer.append((start_station_id, end_station_id, 1))

            try:
                start_coords = (float(fields[11]), float(fields[12]))
                end_coords = (float(fields[13]), float(fields[14]))
            except:
                continue
                
            try:
                distance_km = geodesic(start_coords, end_coords).km
                if distance_km > 15:
                    print(f"ALERT: Bike {bike_id} traveled {distance_km:.2f} km from '{start_station_name}' to '{end_station_name}'")
            except Exception as ex:
                print(f"Failed to calculate distance: {ex}")
                
            # Print every 10 messages
            if len(edge_buffer) % 10 == 0:
                print_trip_summary_with_graphframes(edge_buffer, station_lookup)

    except KeyboardInterrupt:
        print("Stopped.")
    finally:
        consumer.close()

if __name__ == '__main__':
    start_kafka_trip_consumer()

Listening to topic 'trip_data'...





--- Top 10 Trips ---


                                                                                

There were 2 trips from Golden Gate at Polk to South Van Ness at Market.
There were 1 trips from Market at 10th to Golden Gate at Polk.
There were 1 trips from 2nd at Townsend to Yerba Buena Center of the Arts (3rd @ Howard).
There were 1 trips from Golden Gate at Polk to Civic Center BART (7th at Market).
There were 1 trips from Harry Bridges Plaza (Ferry Building) to Post at Kearney.
There were 1 trips from Harry Bridges Plaza (Ferry Building) to Howard at 2nd.
There were 1 trips from Steuart at Market to Embarcadero at Bryant.
There were 1 trips from Market at 4th to San Francisco Caltrain (Townsend at 4th).
There were 1 trips from Commercial at Montgomery to Market at 4th.
---------------------


--- Top 10 Trips ---
There were 2 trips from Golden Gate at Polk to South Van Ness at Market.
There were 1 trips from Paseo de San Antonio to San Jose Diridon Caltrain Station.
There were 1 trips from Cowper at University to University and Emerson.
There were 1 trips from Howard at 2nd to 