In [5]:
%pip install findspark



In [6]:
import os
import findspark
findspark.init()

In [7]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, functions as F, types as T

In [8]:
from google.colab import auth
auth.authenticate_user()

In [9]:
# Build SparkSession (pull in the BigQuery connector)
spark = (
    SparkSession.builder
      .appName("BQ in Colab")
      .config(
        "spark.jars.packages",
        "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.34.0"
      )
      .config("spark.sql.execution.bigquery.parentProject", "lookerstudio-seminar")
      .getOrCreate()
)

In [10]:
df = (spark.read
      .format("bigquery")
      .option('parentProject','lookerstudio-seminar')
      .option("project",      "bigquery-public-data")
      .option("dataset",      "iowa_liquor_sales")
      .option("table",        "sales")
      .load())

In [11]:
df.printSchema()

root
 |-- invoice_and_item_number: string (nullable = true)
 |-- date: date (nullable = true)
 |-- store_number: string (nullable = true)
 |-- store_name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- store_location: string (nullable = true)
 |-- county_number: string (nullable = true)
 |-- county: string (nullable = true)
 |-- category: string (nullable = true)
 |-- category_name: string (nullable = true)
 |-- vendor_number: string (nullable = true)
 |-- vendor_name: string (nullable = true)
 |-- item_number: string (nullable = true)
 |-- item_description: string (nullable = true)
 |-- pack: long (nullable = true)
 |-- bottle_volume_ml: long (nullable = true)
 |-- state_bottle_cost: double (nullable = true)
 |-- state_bottle_retail: double (nullable = true)
 |-- bottles_sold: long (nullable = true)
 |-- sale_dollars: double (nullable = true)
 |-- volume_sold_liters: double (nullable = tr

In [12]:
df.schema

StructType([StructField('invoice_and_item_number', StringType(), True), StructField('date', DateType(), True), StructField('store_number', StringType(), True), StructField('store_name', StringType(), True), StructField('address', StringType(), True), StructField('city', StringType(), True), StructField('zip_code', StringType(), True), StructField('store_location', StringType(), True), StructField('county_number', StringType(), True), StructField('county', StringType(), True), StructField('category', StringType(), True), StructField('category_name', StringType(), True), StructField('vendor_number', StringType(), True), StructField('vendor_name', StringType(), True), StructField('item_number', StringType(), True), StructField('item_description', StringType(), True), StructField('pack', LongType(), True), StructField('bottle_volume_ml', LongType(), True), StructField('state_bottle_cost', DoubleType(), True), StructField('state_bottle_retail', DoubleType(), True), StructField('bottles_sold

In [13]:
df.select("invoice_and_item_number", "store_name", "store_number").show()

+-----------------------+--------------------+------------+
|invoice_and_item_number|          store_name|store_number|
+-----------------------+--------------------+------------+
|       RINV-04661200006|MART STOP #1 / DA...|        4640|
|       RINV-04206100001|KIMBERLY MART / D...|        3715|
|       RINV-05530900056|QUIK TRIP #544 / ...|        4622|
|       RINV-05460000115|      KEOKUK SPIRITS|        2191|
|       RINV-05111400055|MUGHAL, INC. / WA...|       10200|
|       RINV-04318300141|HY-VEE FOOD STORE...|        2555|
|       RINV-05572400037|HY-VEE FOOD STORE...|        2601|
|       RINV-05560100015|CASEY'S GENERAL S...|        5317|
|       RINV-04356100115|WAL-MART 4256 / AMES|        4004|
|       RINV-05073100131|      KEOKUK SPIRITS|        2191|
|       RINV-04910100113|HY-VEE FOOD STORE...|        2622|
|       RINV-04850600171|HY-VEE FOOD STORE...|        2666|
|       RINV-04629500034|FAREWAY STORES #0...|        4092|
|       RINV-04616200014|YESWAY STORE # 

In [14]:
df.show()

+-----------------------+----------+------------+--------------------+--------------------+------------+--------+--------------------+-------------+-----------+---------+--------------------+-------------+--------------------+-----------+--------------------+----+----------------+-----------------+-------------------+------------+------------+------------------+-------------------+
|invoice_and_item_number|      date|store_number|          store_name|             address|        city|zip_code|      store_location|county_number|     county| category|       category_name|vendor_number|         vendor_name|item_number|    item_description|pack|bottle_volume_ml|state_bottle_cost|state_bottle_retail|bottles_sold|sale_dollars|volume_sold_liters|volume_sold_gallons|
+-----------------------+----------+------------+--------------------+--------------------+------------+--------+--------------------+-------------+-----------+---------+--------------------+-------------+--------------------+----

In [15]:
df.limit(10)

DataFrame[invoice_and_item_number: string, date: date, store_number: string, store_name: string, address: string, city: string, zip_code: string, store_location: string, county_number: string, county: string, category: string, category_name: string, vendor_number: string, vendor_name: string, item_number: string, item_description: string, pack: bigint, bottle_volume_ml: bigint, state_bottle_cost: double, state_bottle_retail: double, bottles_sold: bigint, sale_dollars: double, volume_sold_liters: double, volume_sold_gallons: double]

In [16]:
df

DataFrame[invoice_and_item_number: string, date: date, store_number: string, store_name: string, address: string, city: string, zip_code: string, store_location: string, county_number: string, county: string, category: string, category_name: string, vendor_number: string, vendor_name: string, item_number: string, item_description: string, pack: bigint, bottle_volume_ml: bigint, state_bottle_cost: double, state_bottle_retail: double, bottles_sold: bigint, sale_dollars: double, volume_sold_liters: double, volume_sold_gallons: double]

In [17]:
from graphviz import Digraph

In [18]:
# Extract month
df = df.withColumn("month", F.month("date"))

# Calculate monthly sales
monthly_sales = df.groupBy("month") \
    .agg(F.sum("sale_dollars").alias("monthly_total_sales"))

# Join back to original DataFrame
df = df.join(monthly_sales, on="month", how="left")

In [19]:
# --------------------------------------
# 1. Sales Performance Flowchart (Spark Version)
# --------------------------------------
def create_sales_performance_flow(df):
    flowchart = Digraph("Sales_Performance", format="png")
    flowchart.attr(rankdir="LR", bgcolor="white")

    # Calculate metrics
    total_sales = df.agg(F.sum("sale_dollars")).collect()[0][0]
    top_product = df.groupBy("item_description").agg(F.sum("sale_dollars").alias("total")) \
                   .orderBy(F.desc("total")).first()["item_description"]
    top_county = df.groupBy("county").agg(F.sum("sale_dollars").alias("total")) \
                  .orderBy(F.desc("total")).first()["county"]

    # Nodes
    flowchart.node("Sales", f"Total Sales\n${total_sales:,.0f}", shape="box", fillcolor="#e6f3ff")
    flowchart.node("Product", f"Top Product\n{top_product}", shape="box", fillcolor="#d4f7d4")
    flowchart.node("County", f"Top County\n{top_county}", shape="box", fillcolor="#d4f7d4")
    flowchart.node("Trends", "Monthly Trends", shape="ellipse", fillcolor="#ffe6e6")

    # Edges
    flowchart.edge("Sales", "Product", label="Driven by")
    flowchart.edge("Sales", "County", label="Top Region")
    flowchart.edge("Sales", "Trends", label="Over Time")

    # Monthly trends
    monthly_data = df.groupBy("month").agg(F.sum("sale_dollars").alias("total")) \
                   .orderBy("month").collect()

    with flowchart.subgraph(name="cluster_trends") as c:
        c.attr(label="Monthly Sales", style="dashed")
        for row in monthly_data:
            month = row["month"]
            sales = row["total"]
            c.node(f"Month_{month}", f"Month {month}\n${sales:,.0f}", shape="note")

    return flowchart

# Generate and render
sales_flow = create_sales_performance_flow(df)
sales_flow.render("sales_performance_flow_spark", view=True)
print("sales_performance_flow_spark graph generated!")

sales_performance_flow_spark graph generated!


In [20]:
from graphviz import Digraph
from pyspark.sql.functions import col, sum, avg, count_distinct

In [22]:
# Calculate all metrics in one Spark query
metrics = df.agg(
    sum("sale_dollars").alias("total_sales"),
    sum("bottles_sold").alias("total_bottles"),
    sum("volume_sold_liters").alias("total_liters"),
    avg(col("state_bottle_retail") - col("state_bottle_cost")).alias("profit_per_bottle"),
    (sum("sale_dollars") / count_distinct("invoice_and_item_number")).alias("aov"),
    (count_distinct("invoice_and_item_number") / count_distinct("store_number")).alias("aof")
).collect()[0]

# Create Graphviz visualization
g = Digraph('PerformanceDashboard', format='png')
g.attr(rankdir='TB', bgcolor='white', layout='neato')

# Main container
with g.subgraph(name='cluster_main') as c:
    c.attr(label='Iowa Liquor Sales Performance', style='filled', fillcolor='#f0f0f0', fontsize='16')

    # First row metrics
    with c.subgraph(name='cluster_row1') as row1:
        row1.attr(rank='same')
        row1.node('TS', f"Total Sales\n${metrics['total_sales']:,.0f}",
                 shape='box', style='filled', fillcolor='#e6f3ff', width='2', height='1.5')
        row1.node('TB', f"Total Bottles\n{metrics['total_bottles']:,.0f}",
                 shape='box', style='filled', fillcolor='#e6f3ff', width='2', height='1.5')
        row1.node('TV', f"Total Volume\n{metrics['total_liters']:,.0f} L",
                 shape='box', style='filled', fillcolor='#e6f3ff', width='2', height='1.5')

    # Second row metrics
    with c.subgraph(name='cluster_row2') as row2:
        row2.attr(rank='same')
        row2.node('PPB', f"Profit/Bottle\n${metrics['profit_per_bottle']:.2f}",
                 shape='box', style='filled', fillcolor='#d4f7d4', width='2', height='1.5')
        row2.node('AOV', f"Avg Order Value\n${metrics['aov']:.2f}",
                 shape='box', style='filled', fillcolor='#d4f7d4', width='2', height='1.5')
        row2.node('AOF', f"Avg Order Freq\n{metrics['aof']:.2f}/store",
                 shape='box', style='filled', fillcolor='#d4f7d4', width='2', height='1.5')

# Add invisible edges to force layout
c.edges([
    ('TS', 'TB'),
    ('TB', 'TV'),
    ('PPB', 'AOV'),
    ('AOV', 'AOF')
])

g.render('performance_dashboard', view=True)
print("metrics graph generated!")

metrics graph generated!


In [23]:
from pyspark.sql.functions import year, month

# Monthly sales trend
monthly_sales = df.groupBy(year("date").alias("year"), month("date").alias("month")) \
    .agg(sum("sale_dollars").alias("sales")) \
    .orderBy("year", "month") \
    .collect()

# Generate Graphviz timeline
g = Digraph('SalesTrend', format='pdf')
g.attr(rankdir='LR', bgcolor='white')

prev_node = None
for row in monthly_sales:
    node_id = f"{row['year']}-{row['month']}"
    g.node(node_id, f"{row['year']}-{row['month']}\n${row['sales']:,.0f}", shape='note')
    if prev_node:
        g.edge(prev_node, node_id, style='dashed')
    prev_node = node_id

g.render('sales_trend_timeline')

'sales_trend_timeline.png'

In [None]:
# Top 10 counties by sales
county_sales = df.groupBy("county") \
    .agg(sum("sale_dollars").alias("sales")) \
    .orderBy(col("sales").desc()) \
    .limit(10) \
    .collect()

g = Digraph('CountySales', format='png')
g.attr(rankdir='TB', bgcolor='white')

for row in county_sales:
    g.node(row['county'],
           f"{row['county']}\n${row['sales']:,.0f}",
           shape='folder',
           fillcolor='#e6f3ff')

g.render('county_sales_hierarchy')

In [25]:
# Top 10 products
top_products = df.groupBy("item_description") \
    .agg(sum("sale_dollars").alias("sales")) \
    .orderBy(col("sales").desc()) \
    .limit(10) \
    .collect()

g = Digraph('TopProducts', format='png')
g.attr(rankdir='TB', bgcolor='white')

for i, row in enumerate(top_products):
    g.node(str(i),
           f"{row['item_description']}\n${row['sales']:,.0f}",
           shape='box',
           fillcolor='#d4f7d4')

g.render('top_products_ranking')

'top_products_ranking.png'

In [28]:
from graphviz import Digraph
from pyspark.sql.functions import col, sum, lag, window
from pyspark.sql.window import Window
import matplotlib.cm as cm
import matplotlib.colors as mcolors

# 1. Calculate profit margins and time trends
# ==============================================
# Profit margin calculation
df = df.withColumn("profit_margin",
                  (col("state_bottle_retail") - col("state_bottle_cost")) / col("state_bottle_retail"))

# Time trend calculation (MoM growth)
window_spec = Window.partitionBy("county", "store_number", "item_description") \
                  .orderBy("date") \
                  # .rowsBetween(-1, 0)

df = df.withColumn("prev_month_sales",
                  lag("sale_dollars", 1).over(window_spec)) \
       .withColumn("sales_growth",
                  (col("sale_dollars") - col("prev_month_sales")) / col("prev_month_sales"))

# 2. Color mapping setup
# ==============================================
cmap = cm.get_cmap('RdYlGn')  # Red-Yellow-Green colormap
norm = mcolors.Normalize(vmin=0, vmax=0.5)  # 0-50% margin range

def margin_to_color(margin):
    return "#{:02x}{:02x}{:02x}".format(*[int(255*x) for x in cmap(norm(margin))][:3])

# 3. Hierarchical visualization with enhancements
# ==============================================
def create_enhanced_county_view(df):
    g = Digraph('EnhancedCountyView', format='pdf')
    g.attr(rankdir='TB', bgcolor='white', layout='dot')

    # Get top 10 counties
    counties = df.groupBy("county") \
               .agg(sum("sale_dollars").alias("total_sales"),
                    avg("sales_growth").alias("avg_growth")) \
               .orderBy(col("total_sales").desc()) \
               .limit(10) \
               .collect()

    max_county_sales = max([c['total_sales'] for c in counties])

    for county in counties:
        # County node with dynamic width and growth label
        county_width = 1 + (county['total_sales'] / max_county_sales) * 1.5
        growth_label = f"{'+' if county['avg_growth'] > 0 else ''}{county['avg_growth']*100:.1f}% MoM" if county['avg_growth'] else "N/A"

        g.node(county['county'],
              f"{county['county']}\n${county['total_sales']/1e6:.1f}M\n({growth_label})",
              shape='folder',
              style='filled',
              fillcolor='#e6f3ff',
              width=str(county_width),
              fontsize='14')

        # Get top 3 stores
        stores = df.filter(col("county") == county['county']) \
                 .groupBy("store_number", "store_name") \
                 .agg(sum("sale_dollars").alias("store_sales"),
                      avg("sales_growth").alias("store_growth")) \
                 .orderBy(col("store_sales").desc()) \
                 .limit(3) \
                 .collect()

        max_store_sales = max([s['store_sales'] for s in stores]) if stores else 1

        for store in stores:
            # Store node with dynamic width
            store_width = 0.5 + (store['store_sales'] / max_store_sales) * 1.0
            growth_label = f"{'+' if store['store_growth'] > 0 else ''}{store['store_growth']*100:.1f}%"

            g.node(str(store['store_number']),
                  f"{store['store_name']}\n${store['store_sales']/1e3:.1f}K\n({growth_label})",
                  shape='box',
                  style='filled',
                  fillcolor='#d4f7d4',
                  width=str(store_width))

            g.edge(county['county'], str(store['store_number']))

            # Get top 2 products
            products = df.filter(col("store_number") == store['store_number']) \
                       .groupBy("item_description") \
                       .agg(sum("sale_dollars").alias("product_sales"),
                            avg("profit_margin").alias("avg_margin")) \
                       .orderBy(col("product_sales").desc()) \
                       .limit(2) \
                       .collect()

            max_product_sales = max([p['product_sales'] for p in products]) if products else 1

            for product in products:
                # Product node with color-coded margin
                product_width = 0.3 + (product['product_sales'] / max_product_sales) * 0.7
                color = margin_to_color(product['avg_margin'])

                g.node(product['item_description'],
                      f"{product['item_description'][:15]}...\n${product['product_sales']:,.0f}",
                      shape='ellipse',
                      style='filled',
                      fillcolor=color,
                      width=str(product_width),
                      fontcolor='black' if product['avg_margin'] > 0.2 else 'white')

                g.edge(str(store['store_number']), product['item_description'])

    return g

# 4. Execute and render
# ==============================================
enhanced_graph = create_enhanced_county_view(df)
enhanced_graph.render('enhanced_county_view', view=True)

  cmap = cm.get_cmap('RdYlGn')  # Red-Yellow-Green colormap


'enhanced_county_view.pdf'