In [1]:
import plotly.graph_objects as go
import plotly.offline as pyo

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType

import os
import sys

# Run both the PySpark driver and its Python workers on the interpreter that
# is running this kernel.
for interpreter_var in ("PYSPARK_PYTHON", "PYSPARK_DRIVER_PYTHON"):
    os.environ[interpreter_var] = sys.executable

# Session settings, applied to the builder in the order listed.
# The Iceberg catalog is registered under the name "default" and selected via
# `type` only: setting `catalog-impl`
# ("spark.sql.catalog.default.catalog-impl" =
# "org.apache.iceberg.jdbc.JdbcCatalog") together with `type` raised an error,
# so that entry is intentionally left out.
spark_settings = [
    # Jars fetched at start-up: Iceberg Spark runtime and the SQLite JDBC driver.
    (
        "spark.jars.packages",
        "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.2,org.xerial:sqlite-jdbc:3.42.0.0",
    ),
    ("spark.sql.crossJoin.enabled", "true"),
    ("spark.sql.iceberg.handle-timestamp-without-timezone", "true"),
    (
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    ),
    # JDBC-backed Iceberg catalog named "default", stored in a local SQLite file.
    ("spark.sql.catalog.default", "org.apache.iceberg.spark.SparkCatalog"),
    ("spark.sql.catalog.default.type", "jdbc"),
    ("spark.sql.catalog.default.uri", "jdbc:sqlite:/tmp/warehouse/pyiceberg_catalog.db"),
    ("spark.sql.catalog.default.warehouse", "hdfs://localhost:9000/user/hive/warehouse"),
    ("spark.sql.catalog.default.jdbc.driver", "org.sqlite.JDBC"),
    # Table data lives on the local HDFS instance.
    ("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000"),
    ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
]

session_builder = SparkSession.builder.appName("IcebergLocalDevelopment")
for setting_key, setting_value in spark_settings:
    session_builder = session_builder.config(setting_key, setting_value)

# Reuses an already-running session if one exists, otherwise starts a new one.
spark = session_builder.getOrCreate()

25/06/11 13:35:16 WARN Utils: Your hostname, rohitkarki resolves to a loopback address: 127.0.1.1; using 10.13.164.166 instead (on interface wlp4s0)
25/06/11 13:35:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/rohitkarki/.local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/rohitkarki/.ivy2/cache
The jars for the packages stored in: /home/rohitkarki/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.3_2.12 added as a dependency
org.xerial#sqlite-jdbc added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1b5d5b1f-7ec6-430e-bf6d-1d0e4453269f;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.3_2.12;1.5.2 in central
	found org.xerial#sqlite-jdbc;3.42.0.0 in central
:: resolution report :: resolve 152ms :: artifacts dl 2ms
	:: modules in use:
	org.apache.iceberg#iceberg-spark-runtime-3.3_2.12;1.5.2 from central in [default]
	org.xerial#sqlite-jdbc;3.42.0.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      de

25/06/11 13:35:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Read-only checks first: the catalog/namespace the session currently points
# at, then the namespaces registered in the catalog named "default".
for inspection_statement in ("SHOW CURRENT NAMESPACE", "SHOW NAMESPACES IN default"):
    spark.sql(inspection_statement).show()

# Switch the session to the `sales` namespace of the `default` catalog (no
# existence check is made in this cell), then list the tables it contains.
spark.sql("USE default.sales")
tables_in_namespace = spark.sql("SHOW TABLES")
tables_in_namespace.show()


+-------------+---------+
|      catalog|namespace|
+-------------+---------+
|spark_catalog|  default|
+-------------+---------+

25/06/11 13:35:40 WARN JdbcCatalog: JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.schema-version=V1
+---------+
|namespace|
+---------+
|    sales|
+---------+

+---------+--------------+-----------+
|namespace|     tableName|isTemporary|
+---------+--------------+-----------+
|    sales|       Drivers|      false|
|    sales|ai_job_dataset|      false|
|    sales|     employees|      false|
|    sales|  transactions|      false|
|    sales|      vehicles|      false|
+---------+--------------+-----------+



In [4]:
# Vehicles per latest inspection result, most frequent first (top 10).
# Note: COUNT(<column>) skips NULLs, so a NULL result group would report 0.
inspection_result_query = """
SELECT COUNT(latest_inspection_result) as count_of_inspection_result, latest_inspection_result
FROM sales.vehicles
GROUP BY latest_inspection_result
ORDER BY count_of_inspection_result DESC
LIMIT 10
    """
inspection_result_counts = spark.sql(inspection_result_query)
inspection_result_counts.show()

[Stage 2:>                                                          (0 + 1) / 1]

+--------------------------+------------------------+
|count_of_inspection_result|latest_inspection_result|
+--------------------------+------------------------+
|                      1143|                    Pass|
|                         6|                    Fail|
+--------------------------+------------------------+



                                                                                

In [4]:
# Number of driver records per application status, largest group first.
driver_status_query = """
SELECT COUNT(*) as count, table.status
FROM sales.Drivers as table
GROUP BY status
ORDER BY COUNT(*) DESC
"""
driver_status_counts = spark.sql(driver_status_query)
# Print at most the first 10 rows.
driver_status_counts.show(10)

+-----+--------------------+
|count|              status|
+-----+--------------------+
| 2579|          Incomplete|
|  440|Approved - Licens...|
|  153|              Denied|
|   83|        Under Review|
|    7|Pending Fitness I...|
+-----+--------------------+



                                                                                

In [4]:
# Same aggregation as the SQL version, expressed with the DataFrame API and
# built up one step at a time.
drivers_table = spark.table("sales.Drivers")
rows_per_status = drivers_table.groupBy("status").count()

# `df` is the name the plotting cell reads, so the sorted result is bound to it.
df = rows_per_status.orderBy("count", ascending=False)
df.show()

[Stage 2:>                                                          (0 + 1) / 1]

+--------------------+-----+
|              status|count|
+--------------------+-----+
|          Incomplete| 2579|
|Approved - Licens...|  440|
|              Denied|  153|
|        Under Review|   83|
|Pending Fitness I...|    7|
+--------------------+-----+



                                                                                

In [7]:
# --- Bring the aggregated rows onto the driver for Plotly ---
# collect() loads every row of `df` into driver memory; be cautious with large
# datasets.
data_for_plotly = df.collect()

# Split the rows into parallel x / y lists. Fields are read by key: on a Row
# (a tuple subclass) attribute access `row.count` would resolve to tuple.count.
statuses = []
counts = []
for status_row in data_for_plotly:
    statuses.append(status_row['status'])
    counts.append(status_row['count'])

# --- Shared text styling ---
font_family = 'Inter'
axis_title_font = {'family': font_family, 'size': 16, 'color': '#555'}
tick_label_font = {'family': font_family, 'size': 12, 'color': '#666'}

# --- Bar trace: one bar per application status ---
trace = go.Bar(
    x=statuses,
    y=counts,
    marker={
        'color': 'rgb(58, 148, 240)',  # blue bar fill
        'line': {'width': 1.5, 'color': 'rgb(255,255,255)'},  # white bar outline
    },
    name='Application Status Count',
    # Custom hover text; the empty <extra> tag hides the secondary hover box.
    hovertemplate='<b>Status:</b> %{x}<br><b>Count:</b> %{y}<extra></extra>',
)

# --- Layout: titles, axes, margins, backgrounds ---
layout = go.Layout(
    title={
        'text': 'Count by Application Status',
        'font': {'family': font_family, 'size': 20, 'color': '#333'},
    },
    xaxis={
        'title': {'text': 'Application Status', 'font': axis_title_font},
        'tickangle': -45,  # slant the status labels so long names stay legible
        'automargin': True,  # let Plotly widen the margin for long labels
        'tickfont': tick_label_font,
    },
    yaxis={
        'title': {'text': 'Number of Applications', 'font': axis_title_font},
        'rangemode': 'tozero',  # keep the y-axis anchored at zero
        'tickfont': tick_label_font,
    },
    # Extra bottom margin leaves room for the slanted x-axis labels.
    margin={'l': 60, 'r': 20, 't': 70, 'b': 120},
    plot_bgcolor='rgba(0,0,0,0)',  # transparent plotting area
    paper_bgcolor='rgba(0,0,0,0)',  # transparent surrounding canvas
    font={'family': font_family, 'color': '#333'},
)

# --- Assemble and render the figure ---
fig = go.Figure(data=[trace], layout=layout)
fig.show()

In [None]:
# Switch the session to `default` (the Iceberg catalog configured above) for convenience
# spark.sql("USE default")

# # Create sample data
# data = [
#     (1, "electronics", 299.99),
#     (2, "clothing", 79.99),
#     (3, "groceries", 45.50),
#     (4, "electronics", 999.99),
#     (5, "clothing", 120.00),
# ]

# # Define schema
# schema = StructType(
#     [
#         StructField("id", LongType(), True),
#         StructField("category", StringType(), True),
#         StructField("amount", DoubleType(), True),
#     ]
# )

# # # Create DataFrame
# # df = spark.createDataFrame(data, schema)

# # Write DataFrame to Iceberg table
# # df.write.format("iceberg").mode("overwrite").saveAsTable("hello.sales_data_df")

# # Read the data back
# print("Data written via DataFrame API:")
# spark.table("hello.sales_data_df").show()

# # Perform analytics
# print("Category-wise totals:")
# spark.sql(
#     """
# SELECT category, 
#        COUNT(*) as count, 
#        SUM(amount) as total_amount,
#        AVG(amount) as avg_amount
# FROM hello.sales_data_df 
# GROUP BY category 
# ORDER BY total_amount DESC
# """
# ).show()


25/06/10 10:51:58 WARN JdbcCatalog: JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.schema-version=V1
Available catalogs:
+-------+
|catalog|
+-------+
|default|
+-------+

Data written via DataFrame API:


                                                                                

+---+-----------+------+
| id|   category|amount|
+---+-----------+------+
|  1|electronics|299.99|
|  6|      books| 25.99|
|  2|   clothing| 79.99|
|  3|  groceries|  45.5|
|  7|electronics|599.99|
|  4|electronics|999.99|
|  5|   clothing| 120.0|
+---+-----------+------+

Category-wise totals:
+-----------+-----+------------+-----------------+
|   category|count|total_amount|       avg_amount|
+-----------+-----+------------+-----------------+
|electronics|    3|     1899.97|633.3233333333334|
|   clothing|    2|      199.99|           99.995|
|  groceries|    1|        45.5|             45.5|
|      books|    1|       25.99|            25.99|
+-----------+-----+------------+-----------------+

