In [56]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("spark_graphframes_lab")
    .config("spark.jars.packages", "io.graphframes:graphframes-spark4_2.13:0.10.0")
    .config("spark.jars.repositories", "https://repo1.maven.org/maven2")
    .getOrCreate()
)

In [57]:
event_history = spark.read.option("multiLine", True).json("../data/venue/event_history.json")
venue_df = spark.read.option("multiLine", True).json("../data/venue/venues.json")
client_df = spark.read.option("multiLine", True).json("../data/venue/client_profiles.json")

print(f"Events: {event_history.count()}")
print(f"Venues: {venue_df.count()}")
print(f"Clients: {client_df.count()}")

Events: 500
Venues: 120
Clients: 80


In [58]:

venue_nodes = (
    venue_df
    .select("*")
    .withColumn("id", F.col("venue_id"))
    .withColumn("type", F.lit("venue"))
    .select("id", "type", *[c for c in venue_df.columns if c != "venue_id"])
)

# client nodes
client_nodes = (
    client_df
    .select("*")
    .withColumn("id", F.col("client_id"))
    .withColumn("type", F.lit("client"))
    .select("id", "type", *[c for c in client_df.columns if c != "client_id"])
)

# event nodes (select key attributes)
event_nodes = (
    event_history
    .select(
        "event_id",
        "event_name",
        "event_type",
        "event_style",
        "attendee_count",
        "duration_days",
        "total_cost",
        "venue_cost",
        "catering_cost",
        "av_cost",
        "overall_satisfaction",
        "client_rating",
        "venue_rating",
        "outcome",
        "would_rebook",
        "would_recommend",
        "budget_met",
        "city",
        "event_dates"
    )
    .withColumn("id", F.col("event_id"))
    .withColumn("type", F.lit("event"))
    .select("id", "type", *[c for c in event_history.columns if c in ["event_name", "event_type", "event_style", "attendee_count", "duration_days", "total_cost", "venue_cost", "catering_cost", "av_cost", "overall_satisfaction", "client_rating", "venue_rating", "outcome", "would_rebook", "would_recommend", "budget_met", "city", "event_dates"]])
)

# combine all nodes
all_nodes = venue_nodes.unionByName(client_nodes, allowMissingColumns=True).unionByName(event_nodes, allowMissingColumns=True)

print("=== Node Summary ===")
print(f"Total nodes: {all_nodes.count()}")
print(f"Venue nodes: {venue_nodes.count()}")
print(f"Client nodes: {client_nodes.count()}")
print(f"Event nodes: {event_nodes.count()}")
print("\nNode types distribution:")
all_nodes.groupBy("type").count().show()

=== Node Summary ===
Total nodes: 700
Venue nodes: 120
Client nodes: 80
Event nodes: 500

Node types distribution:
+------+-----+
|  type|count|
+------+-----+
| venue|  120|
|client|   80|
| event|  500|
+------+-----+



In [59]:

# edge 1: Event → Venue (hosted_at)
event_venue_edges = (
    event_history
    .select(
        "event_id",
        "venue_id",
        "venue_rating",
        "venue_cost",
        "outcome",
        "would_rebook",
        "would_recommend"
    )
    .withColumn("src", F.col("event_id"))
    .withColumn("dst", F.col("venue_id"))
    .withColumn("relationship", F.lit("hosted_at"))
    .select("src", "dst", "relationship", "venue_rating", "venue_cost", "outcome", "would_rebook", "would_recommend")
    .filter(F.col("src").isNotNull() & F.col("dst").isNotNull())
)

# edge 2: Event → Client (organized_by)
event_client_edges = (
    event_history
    .select(
        "event_id",
        "client_id",
        "client_rating",
        "total_cost",
        "budget_met",
        "overall_satisfaction"
    )
    .withColumn("src", F.col("event_id"))
    .withColumn("dst", F.col("client_id"))
    .withColumn("relationship", F.lit("organized_by"))
    .select("src", "dst", "relationship", "client_rating", "total_cost", "budget_met", "overall_satisfaction")
    .filter(F.col("src").isNotNull() & F.col("dst").isNotNull())
)

# edge 3: Client → Venue (booked) - aggregated from events
client_venue_edges = (
    event_history
    .select(
        "client_id",
        "venue_id",
        "overall_satisfaction",
        "total_cost",
        "venue_cost",
        "event_dates"
    )
    .withColumn("src", F.col("client_id"))
    .withColumn("dst", F.col("venue_id"))
    .withColumn("relationship", F.lit("booked"))
    .groupBy("src", "dst", "relationship")
    .agg(
        F.count("*").alias("booking_count"),
        F.avg("overall_satisfaction").alias("avg_satisfaction"),
        F.avg("total_cost").alias("avg_total_cost"),
        F.avg("venue_cost").alias("avg_venue_cost"),
        F.min("event_dates").alias("first_booking_date"),
        F.max("event_dates").alias("last_booking_date")
    )
    .filter(F.col("src").isNotNull() & F.col("dst").isNotNull())
)

print("=== Edge Summary ===")
print(f"Event → Venue edges: {event_venue_edges.count()}")
print(f"Event → Client edges: {event_client_edges.count()}")
print(f"Client → Venue edges: {client_venue_edges.count()}")

=== Edge Summary ===
Event → Venue edges: 500
Event → Client edges: 500
Client → Venue edges: 497


In [60]:

client_venue_pairs = event_history.select("client_id", "venue_id").distinct()

co_client_edges = (
    client_venue_pairs.alias("cv1")
    .join(
        client_venue_pairs.alias("cv2"),
        F.col("cv1.client_id") == F.col("cv2.client_id"),
        "inner"
    )
    .filter(F.col("cv1.venue_id") < F.col("cv2.venue_id"))  # Avoid duplicates and self-loops
    .groupBy(
        F.col("cv1.venue_id").alias("src"),
        F.col("cv2.venue_id").alias("dst")
    )
    .agg(
        F.count("*").alias("shared_client_count"),
        F.collect_list("cv1.client_id").alias("shared_clients")
    )
    .withColumn("relationship", F.lit("co_client"))
    .select("src", "dst", "relationship", "shared_client_count", "shared_clients")
)

print(f"Co-Client edges (Venue → Venue): {co_client_edges.count()}")
print("\nTop 10 co-client relationships:")
co_client_edges.orderBy(F.col("shared_client_count").desc()).show(10, truncate=False)

Co-Client edges (Venue → Venue): 111

Top 10 co-client relationships:
+-------+-------+------------+-------------------+---------------------------+
|src    |dst    |relationship|shared_client_count|shared_clients             |
+-------+-------+------------+-------------------+---------------------------+
|VEN-222|VEN-751|co_client   |3                  |[CLI-798, CLI-281, CLI-711]|
|VEN-842|VEN-987|co_client   |2                  |[CLI-276, CLI-725]         |
|VEN-704|VEN-707|co_client   |2                  |[CLI-292, CLI-314]         |
|VEN-241|VEN-778|co_client   |1                  |[CLI-722]                  |
|VEN-241|VEN-786|co_client   |1                  |[CLI-722]                  |
|VEN-607|VEN-744|co_client   |1                  |[CLI-744]                  |
|VEN-669|VEN-996|co_client   |1                  |[CLI-264]                  |
|VEN-222|VEN-778|co_client   |1                  |[CLI-711]                  |
|VEN-579|VEN-778|co_client   |1                  |[CLI-961]  

In [None]:
# union all edge DataFrames (they need to have compatible schemas)

event_venue_edges_std = event_venue_edges.select(
    "src", "dst", "relationship",
    F.lit(None).cast("int").alias("shared_client_count"),
    F.lit(None).cast("array<string>").alias("shared_clients"),
    F.col("venue_rating").alias("rating"),
    F.col("venue_cost").alias("cost"),
    F.col("outcome"),
    F.col("would_rebook"),
    F.col("would_recommend"),
    F.lit(None).cast("double").alias("avg_satisfaction"),
    F.lit(None).cast("double").alias("avg_total_cost"),
    F.lit(None).cast("double").alias("avg_venue_cost"),
    F.lit(None).cast("array<string>").alias("first_booking_date"),
    F.lit(None).cast("array<string>").alias("last_booking_date")
)

event_client_edges_std = event_client_edges.select(
    "src", "dst", "relationship",
    F.lit(None).cast("int").alias("shared_client_count"),
    F.lit(None).cast("array<string>").alias("shared_clients"),
    F.col("client_rating").alias("rating"),
    F.col("total_cost").alias("cost"),
    F.lit(None).cast("string").alias("outcome"),
    F.lit(None).cast("boolean").alias("would_rebook"),
    F.lit(None).cast("boolean").alias("would_recommend"),
    F.col("overall_satisfaction").alias("avg_satisfaction"),
    F.col("total_cost").alias("avg_total_cost"),
    F.lit(None).cast("double").alias("avg_venue_cost"),
    F.lit(None).cast("array<string>").alias("first_booking_date"),
    F.lit(None).cast("array<string>").alias("last_booking_date")
)

client_venue_edges_std = client_venue_edges.select(
    "src", "dst", "relationship",
    F.lit(None).cast("int").alias("shared_client_count"),
    F.lit(None).cast("array<string>").alias("shared_clients"),
    F.lit(None).cast("double").alias("rating"),
    F.col("avg_venue_cost").alias("cost"),
    F.lit(None).cast("string").alias("outcome"),
    F.lit(None).cast("boolean").alias("would_rebook"),
    F.lit(None).cast("boolean").alias("would_recommend"),
    F.col("avg_satisfaction"),
    F.col("avg_total_cost"),
    F.col("avg_venue_cost"),
    F.col("first_booking_date"),
    F.col("last_booking_date")
)

co_client_edges_std = co_client_edges.select(
    "src", "dst", "relationship",
    F.col("shared_client_count"),
    F.col("shared_clients"),
    F.lit(None).cast("double").alias("rating"),
    F.lit(None).cast("long").alias("cost"),
    F.lit(None).cast("string").alias("outcome"),
    F.lit(None).cast("boolean").alias("would_rebook"),
    F.lit(None).cast("boolean").alias("would_recommend"),
    F.lit(None).cast("double").alias("avg_satisfaction"),
    F.lit(None).cast("double").alias("avg_total_cost"),
    F.lit(None).cast("double").alias("avg_venue_cost"),
    F.lit(None).cast("array<string>").alias("first_booking_date"),
    F.lit(None).cast("array<string>").alias("last_booking_date")
)

# combine all edges
all_edges = (
    event_venue_edges_std
    .unionByName(event_client_edges_std, allowMissingColumns=True)
    .unionByName(client_venue_edges_std, allowMissingColumns=True)
    .unionByName(co_client_edges_std, allowMissingColumns=True)
)

print("=== Combined Edge Summary ===")
print(f"Total edges: {all_edges.count()}")
print("\nEdge types distribution:")

=== Combined Edge Summary ===
Total edges: 1608

Edge types distribution:


In [62]:
from graphframes import GraphFrame

graph = GraphFrame(all_nodes, all_edges)

print("=== Graph Created Successfully ===")
print(f"Number of vertices: {graph.vertices.count()}")
print(f"Number of edges: {graph.edges.count()}")
print(f"\nVertex types:")
graph.vertices.groupBy("type").count().show()
print(f"\nEdge types:")
graph.edges.groupBy("relationship").count().orderBy(F.col("count").desc()).show()

=== Graph Created Successfully ===
Number of vertices: 700
Number of edges: 1608

Vertex types:
+------+-----+
|  type|count|
+------+-----+
| venue|  120|
|client|   80|
| event|  500|
+------+-----+


Edge types:
+------------+-----+
|relationship|count|
+------------+-----+
|   hosted_at|  500|
|organized_by|  500|
|      booked|  497|
|   co_client|  111|
+------------+-----+



In [67]:
print("top 10 venues by degree (most connected)")
venue_degrees = (
    graph.degrees
    .join(venue_nodes.select("id", "name", "city", "venue_type"), "id", "inner")
    .orderBy(F.col("degree").desc())
    .limit(10)
)
venue_degrees.show(truncate=False)

print("\n=== Top 10 Clients by Degree (Most Active) ===")
client_degrees = (
    graph.degrees
    .join(client_nodes.select("id", "company_name", "industry"), "id", "inner")
    .orderBy(F.col("degree").desc())
    .limit(10)
)
client_degrees.show(truncate=False)

top 10 venues by degree (most connected)
+-------+------+---------------------------------------------+---------------+-----------------+
|id     |degree|name                                         |city           |venue_type       |
+-------+------+---------------------------------------------+---------------+-----------------+
|VEN-170|44    |Walker - Blanda Convention Center            |Austin, TX     |convention_center|
|VEN-170|44    |Pagac - Fadel Center                         |Denver, CO     |conference_center|
|VEN-842|38    |Kuvalis, Grimes and Kassulke Hall            |Denver, CO     |resort           |
|VEN-744|36    |Treutel and Sons Center                      |Austin, TX     |historic_venue   |
|VEN-751|35    |Zulauf, Harris and Swaniawski Center         |Portland, OR   |historic_venue   |
|VEN-222|35    |Dickens - Greenholt Hotel & Conference Center|Los Angeles, CA|conference_center|
|VEN-778|33    |Schiller - Green Hall                        |Los Angeles, CA|conventi

In [68]:
print("finding paths: Client → Event → Venue")

client_journey = graph.find("(client)-[e1]->(event); (event)-[e2]->(venue)")

print(f"Total client journey paths: {client_journey.count()}")
print("\nSample client journeys:")
client_journey.select(
    F.col("client.id").alias("client_id"),
    F.col("event.id").alias("event_id"),
    F.col("venue.id").alias("venue_id"),
    F.col("e1.relationship").alias("client_event_rel"),
    F.col("e2.relationship").alias("event_venue_rel")
).show(10, truncate=False)

finding paths: Client → Event → Venue
Total client journey paths: 2010

Sample client journeys:
+------------+--------+--------+----------------+---------------+
|client_id   |event_id|venue_id|client_event_rel|event_venue_rel|
+------------+--------+--------+----------------+---------------+
|EVT-2020-001|VEN-196 |VEN-620 |hosted_at       |co_client      |
|EVT-2020-001|VEN-196 |VEN-607 |hosted_at       |co_client      |
|EVT-2022-002|VEN-455 |VEN-778 |hosted_at       |co_client      |
|EVT-2022-002|VEN-455 |VEN-758 |hosted_at       |co_client      |
|EVT-2024-004|VEN-480 |VEN-573 |hosted_at       |co_client      |
|EVT-2024-004|VEN-480 |VEN-758 |hosted_at       |co_client      |
|EVT-2024-004|VEN-480 |VEN-894 |hosted_at       |co_client      |
|EVT-2024-004|VEN-480 |VEN-894 |hosted_at       |co_client      |
|EVT-2024-004|VEN-480 |VEN-996 |hosted_at       |co_client      |
|EVT-2024-004|VEN-480 |VEN-996 |hosted_at       |co_client      |
+------------+--------+--------+--------------

In [69]:
print("event type → venue type relationships")

# Join events with venues through the graph edges
event_venue_with_types = (
    graph.find("(event)-[e]->(venue)")
    .filter(F.col("e.relationship") == "hosted_at")
    .select(
        F.col("event.id").alias("event_id"),
        F.col("event.event_type").alias("event_type"),
        F.col("venue.id").alias("venue_id"),
        F.col("venue.venue_type").alias("venue_type"),
        F.col("e.outcome").alias("outcome"),
        F.col("e.rating").alias("rating"),
        F.col("e.would_recommend").alias("would_recommend")
    )
    .filter(
        F.col("event.event_type").isNotNull() & 
        F.col("venue.venue_type").isNotNull()
    )
)

# Count event_type → venue_type combinations
event_venue_patterns = (
    event_venue_with_types
    .groupBy("event_type", "venue_type")
    .agg(
        F.count("*").alias("count"),
        F.avg("rating").alias("avg_rating"),
        F.sum(F.when(F.col("would_recommend") == True, 1).otherwise(0)).alias("recommendations"),
        F.sum(F.when(F.col("outcome") == "highly_successful", 1).otherwise(0)).alias("highly_successful_count")
    )
    .withColumn("recommendation_rate", F.col("recommendations") / F.col("count"))
    .withColumn("success_rate", F.col("highly_successful_count") / F.col("count"))
    .orderBy(F.col("count").desc())
)

print("\nTop 20 Event Type → Venue Type Combinations:")
event_venue_patterns.show(20, truncate=False)


event type → venue type relationships

Top 20 Event Type → Venue Type Combinations:
+--------------------+-----------------+-----+------------------+---------------+-----------------------+-------------------+-------------------+
|event_type          |venue_type       |count|avg_rating        |recommendations|highly_successful_count|recommendation_rate|success_rate       |
+--------------------+-----------------+-----+------------------+---------------+-----------------------+-------------------+-------------------+
|product_launch      |conference_center|20   |4.084999999999999 |10             |4                      |0.5                |0.2                |
|corporate_conference|historic_venue   |20   |4.094999999999999 |10             |4                      |0.5                |0.2                |
|training            |hotel            |18   |4.266666666666667 |11             |6                      |0.6111111111111112 |0.3333333333333333 |
|annual_meeting      |conference_center|

In [71]:

print("location → event type")

location_event = (
    graph.find("(event)-[e]->(venue)")
    .filter(F.col("e.relationship") == "hosted_at")
    .select(
        F.col("venue.city").alias("city"),
        F.col("venue.region").alias("region"),
        F.col("event.event_type").alias("event_type"),
        F.col("e.rating").alias("rating"),
        F.col("e.outcome").alias("outcome"),
        F.col("e.would_recommend").alias("would_recommend")
    )
    .filter(
        F.col("venue.city").isNotNull() & 
        F.col("event.event_type").isNotNull()
    )
)

# Aggregate by location (city) and event type
location_patterns = (
    location_event
    .groupBy("city", "event_type")
    .agg(
        F.count("*").alias("count"),
        F.avg("rating").alias("avg_rating"),
        F.sum(F.when(F.col("would_recommend") == True, 1).otherwise(0)).alias("recommendations"),
        F.sum(F.when(F.col("outcome") == "highly_successful", 1).otherwise(0)).alias("highly_successful")
    )
    .withColumn("recommendation_rate", F.col("recommendations") / F.col("count"))
    .withColumn("success_rate", F.col("highly_successful") / F.col("count"))
    .orderBy(F.col("city"), F.col("count").desc())
)

print("\nTop patterns by City → Event Type:")
location_patterns.show(50, truncate=False)

# aggregate by region and event type
region_patterns = (
    location_event
    .groupBy("region", "event_type")
    .agg(
        F.count("*").alias("count"),
        F.avg("rating").alias("avg_rating"),
        F.sum(F.when(F.col("would_recommend") == True, 1).otherwise(0)).alias("recommendations"),
        F.sum(F.when(F.col("outcome") == "highly_successful", 1).otherwise(0)).alias("highly_successful")
    )
    .withColumn("recommendation_rate", F.col("recommendations") / F.col("count"))
    .withColumn("success_rate", F.col("highly_successful") / F.col("count"))
    .orderBy(F.col("region"), F.col("count").desc())
)

print("\npatterns by region → event type")
region_patterns.show(50, truncate=False)

location → event type

Top patterns by City → Event Type:
+---------------+--------------------+-----+------------------+---------------+-----------------+-------------------+-------------------+
|city           |event_type          |count|avg_rating        |recommendations|highly_successful|recommendation_rate|success_rate       |
+---------------+--------------------+-----+------------------+---------------+-----------------+-------------------+-------------------+
|Atlanta, GA    |annual_meeting      |2    |3.5               |0              |0                |0.0                |0.0                |
|Atlanta, GA    |corporate_conference|2    |3.5               |0              |0                |0.0                |0.0                |
|Atlanta, GA    |team_building       |2    |3.5               |0              |0                |0.0                |0.0                |
|Atlanta, GA    |client_appreciation |1    |4.7               |1              |0                |1.0              

In [None]:

# get all event-venue relationships with types
event_venue_types = (
    graph.find("(event)-[e]->(venue)")
    .filter(F.col("e.relationship") == "hosted_at")
    .select(
        F.col("event.id").alias("event_id"),
        F.col("event.event_type").alias("event_type"),
        F.col("venue.id").alias("venue_id"),
        F.col("venue.venue_type").alias("venue_type")
    )
    .filter(
        F.col("event.event_type").isNotNull() & 
        F.col("venue.venue_type").isNotNull()
    )
)

# get co-client edges
co_client_venue_pairs = (
    graph.edges
    .filter(F.col("relationship") == "co_client")
    .select("src", "dst", "shared_client_count", "shared_clients")
)

# get event types for each venue in co-client pairs
venue1_events = event_venue_types.alias("v1_events").select(
    F.col("venue_id").alias("venue1_id"),
    F.col("event_type").alias("venue1_event_type"),
    F.col("venue_type").alias("venue1_venue_type")
)

venue2_events = event_venue_types.alias("v2_events").select(
    F.col("venue_id").alias("venue2_id"),
    F.col("event_type").alias("venue2_event_type"),
    F.col("venue_type").alias("venue2_venue_type")
)

co_client_patterns = (
    co_client_venue_pairs
    .join(venue1_events, F.col("src") == F.col("venue1_id"), "inner")
    .join(venue2_events, F.col("dst") == F.col("venue2_id"), "inner")
    .filter(F.col("venue1_event_type") == F.col("venue2_event_type"))  # Same event type
    .groupBy(
        "src", "dst", 
        "venue1_venue_type", "venue2_venue_type",
        "venue1_event_type",
        "shared_client_count"
    )
    .agg(F.count("*").alias("shared_event_type_count"))
    .orderBy(F.col("shared_client_count").desc(), F.col("shared_event_type_count").desc())
)

co_client_patterns.show(20, truncate=False)

# do co-client venues tend to serve similar event types?
venue1_types = (
    event_venue_types
    .select("venue_id", "venue_type")
    .distinct()
    .select(
        F.col("venue_id").alias("venue1_id"),
        F.col("venue_type").alias("venue1_type")
    )
)

venue2_types = (
    event_venue_types
    .select("venue_id", "venue_type")
    .distinct()
    .select(
        F.col("venue_id").alias("venue2_id"),
        F.col("venue_type").alias("venue2_type")
    )
)

co_client_venue_type_summary = (
    co_client_venue_pairs
    .join(
        venue1_types,
        F.col("src") == F.col("venue1_id"),
        "inner"
    )
    .join(
        venue2_types,
        F.col("dst") == F.col("venue2_id"),
        "inner"
    )
    .groupBy("venue1_type", "venue2_type")
    .agg(
        F.count("*").alias("co_client_count"),
        F.avg("shared_client_count").alias("avg_shared_clients")
    )
    .orderBy(F.col("co_client_count").desc())
)

co_client_venue_type_summary.show(truncate=False)

+-------+-------+-----------------+-----------------+--------------------+-------------------+-----------------------+
|src    |dst    |venue1_venue_type|venue2_venue_type|venue1_event_type   |shared_client_count|shared_event_type_count|
+-------+-------+-----------------+-----------------+--------------------+-------------------+-----------------------+
|VEN-222|VEN-751|conference_center|historic_venue   |corporate_conference|3                  |4                      |
|VEN-222|VEN-751|conference_center|historic_venue   |product_launch      |3                  |4                      |
|VEN-222|VEN-751|conference_center|historic_venue   |awards_ceremony     |3                  |4                      |
|VEN-222|VEN-751|conference_center|historic_venue   |trade_show          |3                  |2                      |
|VEN-222|VEN-751|conference_center|historic_venue   |workshop            |3                  |2                      |
|VEN-222|VEN-751|conference_center|historic_venu

In [None]:
# find communities/clusters of event types based on venue type usage
from pyspark.sql.window import Window

event_venue_matrix = (
    graph.find("(event)-[e]->(venue)")
    .filter(F.col("e.relationship") == "hosted_at")
    .select(
        F.col("event.event_type").alias("event_type"),
        F.col("venue.venue_type").alias("venue_type")
    )
    .filter(
        F.col("event.event_type").isNotNull() & 
        F.col("venue.venue_type").isNotNull()
    )
    .groupBy("event_type", "venue_type")
    .agg(F.count("*").alias("count"))
    .orderBy("event_type", F.col("count").desc())
)

event_venue_matrix.show(50, truncate=False)

event_type_features = (
    event_venue_matrix
    .groupBy("event_type")
    .pivot("venue_type")
    .sum("count")
    .fillna(0)
)

# calculate similarity between event types (using venue type usage as features)
top_venue_per_event = (
    event_venue_matrix
    .withColumn("rank", F.row_number().over(
        Window.partitionBy("event_type")
        .orderBy(F.col("count").desc())
    ))
    .filter(F.col("rank") <= 3)  # Top 3 venue types per event type
    .select("event_type", "venue_type", "count", "rank")
)



Event Type × Venue Type Matrix (Top 50):
+--------------------+-----------------+-----+
|event_type          |venue_type       |count|
+--------------------+-----------------+-----+
|annual_meeting      |conference_center|17   |
|annual_meeting      |resort           |11   |
|annual_meeting      |hotel            |10   |
|annual_meeting      |historic_venue   |9    |
|annual_meeting      |university       |5    |
|annual_meeting      |convention_center|3    |
|awards_ceremony     |resort           |15   |
|awards_ceremony     |conference_center|14   |
|awards_ceremony     |hotel            |11   |
|awards_ceremony     |convention_center|9    |
|awards_ceremony     |university       |7    |
|awards_ceremony     |historic_venue   |7    |
|client_appreciation |conference_center|16   |
|client_appreciation |resort           |15   |
|client_appreciation |hotel            |10   |
|client_appreciation |historic_venue   |8    |
|client_appreciation |convention_center|7    |
|client_appreciati

In [None]:
# which combinations lead to the most successful outcomes?

print("Success Patterns (Event Type × Venue Type)")

success_patterns = (
    graph.find("(event)-[e]->(venue)")
    .filter(F.col("e.relationship") == "hosted_at")
    .select(
        F.col("event.event_type").alias("event_type"),
        F.col("venue.venue_type").alias("venue_type"),
        F.col("e.outcome").alias("outcome"),
        F.col("e.rating").alias("rating"),
        F.col("e.would_rebook").alias("would_rebook"),
        F.col("e.would_recommend").alias("would_recommend")
    )
    .filter(
        F.col("event.event_type").isNotNull() & 
        F.col("venue.venue_type").isNotNull() &
        F.col("e.outcome").isNotNull()
    )
)

# aggregate success metrics
success_analysis = (
    success_patterns
    .groupBy("event_type", "venue_type")
    .agg(
        F.count("*").alias("total_events"),
        F.sum(F.when(F.col("outcome") == "highly_successful", 1).otherwise(0)).alias("highly_successful"),
        F.sum(F.when(F.col("outcome") == "successful", 1).otherwise(0)).alias("successful"),
        F.sum(F.when(F.col("outcome") == "unsuccessful", 1).otherwise(0)).alias("unsuccessful"),
        F.avg("rating").alias("avg_rating"),
        F.sum(F.when(F.col("would_rebook") == True, 1).otherwise(0)).alias("rebook_count"),
        F.sum(F.when(F.col("would_recommend") == True, 1).otherwise(0)).alias("recommend_count")
    )
    .withColumn("high_success_rate", F.col("highly_successful") / F.col("total_events"))
    .withColumn("overall_success_rate", (F.col("highly_successful") + F.col("successful")) / F.col("total_events"))
    .withColumn("rebook_rate", F.col("rebook_count") / F.col("total_events"))
    .withColumn("recommend_rate", F.col("recommend_count") / F.col("total_events"))
    .filter(F.col("total_events") >= 3)  # Minimum threshold
    .orderBy(F.col("high_success_rate").desc(), F.col("avg_rating").desc())
)

print("\n most successful event and venue type combinations:")
success_analysis.select(
    "event_type", "venue_type", "total_events",
    "high_success_rate", "overall_success_rate", 
    "avg_rating", "rebook_rate", "recommend_rate"
).show(30, truncate=False)

# find patterns: which venue types work best for each event type
print("\nbest venue type for each event type (by success rate)")
best_venue_by_success = (
    success_analysis
    .withColumn("rank", F.row_number().over(
        Window.partitionBy("event_type")
        .orderBy(F.col("high_success_rate").desc(), F.col("avg_rating").desc())
    ))
    .filter(F.col("rank") == 1)
    .select(
        "event_type", "venue_type", "total_events",
        "high_success_rate", "overall_success_rate", "avg_rating"
    )
    .orderBy(F.col("high_success_rate").desc())
)

best_venue_by_success.show(truncate=False)

Success Patterns (Event Type × Venue Type)

 most successful event and venue type combinations:
+--------------------+-----------------+------------+-------------------+--------------------+------------------+-------------------+-------------------+
|event_type          |venue_type       |total_events|high_success_rate  |overall_success_rate|avg_rating        |rebook_rate        |recommend_rate     |
+--------------------+-----------------+------------+-------------------+--------------------+------------------+-------------------+-------------------+
|workshop            |historic_venue   |11          |0.5454545454545454 |0.7272727272727273  |4.318181818181819 |0.7272727272727273 |0.7272727272727273 |
|gala                |convention_center|4           |0.5                |0.75                |4.35              |0.75               |0.75               |
|trade_show          |resort           |9           |0.4444444444444444 |0.8888888888888888  |4.466666666666666 |0.8888888888888888 |0

In [None]:
total_events = event_history.count()
total_venues = venue_df.count()
total_clients = client_df.count()

print(f"\nDataset Overview:")
print(f"  Total Events: {total_events}")
print(f"  Total Venues: {total_venues}")
print(f"  Total Clients: {total_clients}")

# event type distribution
print(f"\nEvent Type Distribution:")
event_type_dist = (
    event_history
    .filter(F.col("event_type").isNotNull())
    .groupBy("event_type")
    .count()
    .orderBy(F.col("count").desc())
)
event_type_dist.show(truncate=False)

# venue type distribution
venue_type_dist = (
    venue_df
    .filter(F.col("venue_type").isNotNull())
    .groupBy("venue_type")
    .count()
    .orderBy(F.col("count").desc())
)
venue_type_dist.show(truncate=False)

# most common event_type → venue_type combination
print(f"\nMost Common Event Type → Venue Type Combinations:")
most_common = (
    graph.find("(event)-[e]->(venue)")
    .filter(F.col("e.relationship") == "hosted_at")
    .select(
        F.col("event.event_type").alias("event_type"),
        F.col("venue.venue_type").alias("venue_type")
    )
    .filter(
        F.col("event.event_type").isNotNull() & 
        F.col("venue.venue_type").isNotNull()
    )
    .groupBy("event_type", "venue_type")
    .count()
    .orderBy(F.col("count").desc())
    .limit(10)
)
most_common.show(truncate=False)


Dataset Overview:
  Total Events: 500
  Total Venues: 120
  Total Clients: 80

Event Type Distribution:
+--------------------+-----+
|event_type          |count|
+--------------------+-----+
|corporate_conference|67   |
|product_launch      |57   |
|awards_ceremony     |56   |
|client_appreciation |54   |
|gala                |50   |
|annual_meeting      |49   |
|training            |47   |
|team_building       |47   |
|workshop            |45   |
|trade_show          |28   |
+--------------------+-----+


Venue Type Distribution:
+-----------------+-----+
|venue_type       |count|
+-----------------+-----+
|conference_center|24   |
|resort           |24   |
|university       |22   |
|hotel            |19   |
|historic_venue   |19   |
|convention_center|12   |
+-----------------+-----+


Most Common Event Type → Venue Type Combinations:
+--------------------+-----------------+-----+
|event_type          |venue_type       |count|
+--------------------+-----------------+-----+
|corporat

In [None]:

print("Event Type → Venue Type")

event_venue = (
    graph.find("(event)-[e]->(venue)")
    .filter(F.col("e.relationship") == "hosted_at")
    .select(
        F.col("event.event_type").alias("event_type"),
        F.col("venue.venue_type").alias("venue_type"),
        F.col("e.rating").alias("rating"),  # Fixed: use "rating" not "venue_rating"
        F.col("e.outcome").alias("outcome"),
        F.col("e.would_recommend").alias("would_recommend")
    )
    .filter(F.col("event.event_type").isNotNull() & F.col("venue.venue_type").isNotNull())
)

patterns = (
    event_venue
    .groupBy("event_type", "venue_type")
    .agg(
        F.count("*").alias("count"),
        F.avg("rating").alias("avg_rating"),
        F.sum(F.when(F.col("would_recommend") == True, 1).otherwise(0)).alias("recommendations"),
        F.sum(F.when(F.col("outcome") == "highly_successful", 1).otherwise(0)).alias("highly_successful")
    )
    .withColumn("recommendation_rate", F.col("recommendations") / F.col("count"))
    .withColumn("success_rate", F.col("highly_successful") / F.col("count"))
    .orderBy(F.col("count").desc())
)

patterns.show(20, truncate=False)

# patterns.filter(F.col("count") >= 3).orderBy(F.col("success_rate").desc(), F.col("avg_rating").desc()).show(10, truncate=False)
# patterns.show(10, truncate=False)

Event Type → Venue Type
+--------------------+-----------------+-----+------------------+---------------+-----------------+-------------------+-------------------+
|event_type          |venue_type       |count|avg_rating        |recommendations|highly_successful|recommendation_rate|success_rate       |
+--------------------+-----------------+-----+------------------+---------------+-----------------+-------------------+-------------------+
|product_launch      |conference_center|20   |4.084999999999999 |10             |4                |0.5                |0.2                |
|corporate_conference|historic_venue   |20   |4.094999999999999 |10             |4                |0.5                |0.2                |
|training            |hotel            |18   |4.266666666666667 |11             |6                |0.6111111111111112 |0.3333333333333333 |
|annual_meeting      |conference_center|17   |4.164705882352941 |8              |3                |0.47058823529411764|0.176470588235294

In [72]:
print("location → event type")

location_event = (
    graph.find("(event)-[e]->(venue)")
    .filter(F.col("e.relationship") == "hosted_at")
    .select(
        F.col("venue.city").alias("city"),
        F.col("venue.region").alias("region"),
        F.col("event.event_type").alias("event_type"),
        F.col("e.rating").alias("rating"),
        F.col("e.outcome").alias("outcome"),
        F.col("e.would_recommend").alias("would_recommend")
    )
    .filter(
        F.col("venue.city").isNotNull() & 
        F.col("event.event_type").isNotNull()
    )
)

# aggregate by location (city) and event type
location_patterns = (
    location_event
    .groupBy("city", "event_type")
    .agg(
        F.count("*").alias("count"),
        F.avg("rating").alias("avg_rating"),
        F.sum(F.when(F.col("would_recommend") == True, 1).otherwise(0)).alias("recommendations"),
        F.sum(F.when(F.col("outcome") == "highly_successful", 1).otherwise(0)).alias("highly_successful")
    )
    .withColumn("recommendation_rate", F.col("recommendations") / F.col("count"))
    .withColumn("success_rate", F.col("highly_successful") / F.col("count"))
    .orderBy(F.col("city"), F.col("count").desc())
)

print("\nTop patterns by City → Event Type:")
location_patterns.show(50, truncate=False)

# aggregate by region and event type
region_patterns = (
    location_event
    .groupBy("region", "event_type")
    .agg(
        F.count("*").alias("count"),
        F.avg("rating").alias("avg_rating"),
        F.sum(F.when(F.col("would_recommend") == True, 1).otherwise(0)).alias("recommendations"),
        F.sum(F.when(F.col("outcome") == "highly_successful", 1).otherwise(0)).alias("highly_successful")
    )
    .withColumn("recommendation_rate", F.col("recommendations") / F.col("count"))
    .withColumn("success_rate", F.col("highly_successful") / F.col("count"))
    .orderBy(F.col("region"), F.col("count").desc())
)

print("\n=== Patterns by Region → Event Type ===")
region_patterns.show(50, truncate=False)

location → event type

Top patterns by City → Event Type:
+---------------+--------------------+-----+------------------+---------------+-----------------+-------------------+-------------------+
|city           |event_type          |count|avg_rating        |recommendations|highly_successful|recommendation_rate|success_rate       |
+---------------+--------------------+-----+------------------+---------------+-----------------+-------------------+-------------------+
|Atlanta, GA    |annual_meeting      |2    |3.5               |0              |0                |0.0                |0.0                |
|Atlanta, GA    |corporate_conference|2    |3.5               |0              |0                |0.0                |0.0                |
|Atlanta, GA    |team_building       |2    |3.5               |0              |0                |0.0                |0.0                |
|Atlanta, GA    |client_appreciation |1    |4.7               |1              |0                |1.0              