In [None]:
# To connect to the Jupyter server:
# In the `docker attach` output, look for a message like this:
#    http://127.0.0.1:8888/lab?token=xxxx
# The token "xxxx" is the password to enter when the Jupyter kernel connection asks for it.
# Then select the "Existing Jupyter Server" option,
# specify the URL: http://127.0.0.1:8888
# specify the password: xxxx
# and select the desired kernel from the list.

# References:
# google: what is the default python jupiterlab server password
# https://stackoverflow.com/questions/41117554/what-is-default-password-for-jupyter-created-on-googles-data-proc

# https://github.com/jupyter/notebook/commit/7fa5d5a1be147e9c8e14f61a2f4b3c0db1e2c00b
# For servers with token-authentication enabled, the URL in the above listing will include the token,
# so you can copy and paste that URL into your browser to login.

Step 1.1: Set Up Environment

In [None]:
# Init Minio
# Runs the project's MinIO initialisation script, passing "data/raygun" as its argument.
# NOTE(review): presumably the script creates the bucket and uploads the raw Raygun
# JSON files — confirm against 1.init_minio.sh. The absolute path ties this cell to
# the container layout (/home/PyCon2024/...).
!sh /home/PyCon2024/Project/Scripts/1.init_minio.sh "data/raygun"

In [None]:
# Install necessary packages
# !pip install pyspark
# !pip install s3fs
# !pip install minio
# !pip install pyhive
import sys
!{sys.executable} -m pip install pyspark
!{sys.executable} -m pip install s3fs
!{sys.executable} -m pip install minio
!{sys.executable} -m pip install pyhive
!{sys.executable} -m pip install trino

In [None]:
# Install dotenv to load environment variables
!{sys.executable} -m pip install python-dotenv

In [None]:
import os
from dotenv import load_dotenv

# Populate the process environment from minio.env, then pull the MinIO settings
# out of it. The keys come back as None when unset; the endpoint and bucket
# name fall back to the local docker-compose defaults.
load_dotenv('minio.env')

_env = os.environ
minio_access_key = _env.get('MINIO_ACCESS_KEY')
minio_secret_key = _env.get('MINIO_SECRET_KEY')
minio_endpoint = _env.get('MINIO_ENDPOINT', "http://minio:9000")
minio_bucket_name = _env.get('MINIO_BUCKET_NAME', "data-lakehouse")

In [None]:
# Echo the effective MinIO settings as a sanity check.
# The secret key itself is never printed: notebook outputs get saved and shared,
# so only whether it is set is shown.
print("Minio Access Key:", minio_access_key)
print("Minio Secret Key:", "<set, hidden>" if minio_secret_key else "<not set>")
print("Minio Endpoint:", minio_endpoint)
print("Minio Bucket Name:", minio_bucket_name)

In [None]:
# Import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode
from minio import Minio
import os

In [None]:
import pyspark
import pyspark.sql. functions as sqlF

from sqlalchemy.sql import text
from sqlalchemy import create_engine

import pandas as pd

In [None]:
# Build (or reuse) the Spark session used throughout the notebook:
#  - Delta Lake SQL extension and catalog (io.delta:delta-spark_2.12:3.1.0),
#  - org.apache.hadoop:hadoop-aws on the classpath for s3a:// paths,
#  - Hive support backed by the metastore at thrift://metastore:9083.
#
# NOTE(review): no spark.hadoop.fs.s3a.* endpoint / credential / path-style settings
# are applied here (an earlier variant set them from minio_endpoint,
# minio_access_key and minio_secret_key). Reading s3a:// paths later presumably
# relies on those being configured elsewhere in the container — confirm.
spark_settings = {
    "spark.driver.host": "localhost",
    "spark.jars.packages": "io.delta:delta-spark_2.12:3.1.0,org.apache.hadoop:hadoop-aws:3.3.3",
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "hive.metastore.uris": "thrift://metastore:9083",
}

spark_builder = SparkSession.builder.appName("RaygunErrorTraceAnalysis")
for setting_key, setting_value in spark_settings.items():
    spark_builder = spark_builder.config(setting_key, setting_value)
spark = spark_builder.enableHiveSupport().getOrCreate()

In [None]:
# Inspect the effective Spark configuration as a list of (key, value) pairs.
spark_conf = spark.sparkContext.getConf()
spark_conf.getAll()

Step 1.2: Upload Multiple JSON Files to MinIO

In [None]:
from urllib.parse import urlparse

# Local directory path containing the raw Raygun JSON files.
local_directory = "../data/raygun"

# MinIO bucket name
bucket_name = minio_bucket_name

# s3a:// prefix that Spark reads the raw JSON files from.
json_files_path = f"s3a://{bucket_name}/Raw/raygun/"

# MinIO endpoint as "host[:port]" only, which is the form passed to the Minio client.
# Parse the URL instead of string-replacing the scheme, so that any scheme casing
# and any trailing slash/path are handled. An endpoint given without a scheme is
# used as-is, minus a trailing slash.
if "://" in minio_endpoint:
    minio_endpoint_domain_port = urlparse(minio_endpoint).netloc
else:
    minio_endpoint_domain_port = minio_endpoint.rstrip("/")

print("Local directory:", local_directory)
print("Minio bucket name:", bucket_name)
print("JSON files path:", json_files_path)
print("Minio endpoint (domain and port only):", minio_endpoint_domain_port)

In [None]:
# Initialize the MinIO client.
# TLS is used only when MINIO_ENDPOINT is an https:// URL. The default endpoint
# (http://minio:9000) still connects over plain HTTP, rather than always forcing
# secure=False.
minio_client = Minio(
    minio_endpoint_domain_port,
    access_key=minio_access_key,
    secret_key=minio_secret_key,
    secure=minio_endpoint.lower().startswith("https://"),
)

In [None]:
from urllib.parse import urlparse

# Create the bucket if it doesn't exist and, only in that case, seed it with the
# local JSON files. If the bucket already exists nothing is uploaded.
# NOTE(review): presumably the init script in Step 1.1 has already populated it — confirm.
#
# Objects are written under the same key prefix that Spark reads from
# (json_files_path, e.g. "Raw/raygun/<file>.json"), so the read in Step 2 finds them.
raw_prefix = urlparse(json_files_path).path.lstrip("/")

if minio_client.bucket_exists(bucket_name):
    print(f"Bucket {bucket_name} already exists")
else:
    minio_client.make_bucket(bucket_name)
    # Upload JSON files to MinIO; sorted so the upload order and log are deterministic.
    for filename in sorted(os.listdir(local_directory)):
        if filename.endswith(".json"):
            file_path = os.path.join(local_directory, filename)
            object_name = f"{raw_prefix}{filename}"
            minio_client.fput_object(
                bucket_name, object_name, file_path, content_type="application/json"
            )
            print(f"Uploaded {filename} to {bucket_name}/{object_name}")

Step 2: Read Multiple JSON Files from MinIO

In [None]:
# Load every JSON file under json_files_path into one DataFrame.
# multiLine allows a single JSON document to span several lines of a file.
print(f"Set 'df' from: {json_files_path}")
df = spark.read.json(json_files_path, multiLine=True)

In [None]:
# Print, as a tree, the schema Spark inferred from the raw JSON files.
df.printSchema()

In [None]:
# Preview the raw DataFrame: the first 20 rows, with long values truncated to 20 characters.
df.show(n=20, truncate=True)

Step 3: Process JSON Data

In [None]:
# Flatten the nested Raygun payload for easier analysis.
# explode() emits one output row per element of Error.StackTrace and repeats the
# error, machine and request fields on each of those rows. Rows whose StackTrace
# is null or empty produce no output rows.
error_field_aliases = {
    "Message": "ErrorMessage",
    "ClassName": "ErrorClassName",
    "FileName": "ErrorFileName",
}
request_field_aliases = {
    "HostName": "RequestHostName",
    "Url": "RequestUrl",
    "HttpMethod": "RequestHttpMethod",
    "IpAddress": "RequestIpAddress",
}
# Request sub-fields kept under their leaf names (QueryString, Headers, Data).
request_passthrough_fields = ["QueryString", "Headers", "Data"]

flattened_columns = (
    [col(f"Error.{field}").alias(alias) for field, alias in error_field_aliases.items()]
    + [explode("Error.StackTrace").alias("StackTrace"), col("MachineName")]
    + [col(f"Request.{field}").alias(alias) for field, alias in request_field_aliases.items()]
    + [col(f"Request.{field}") for field in request_passthrough_fields]
)
df_flattened = df.select(*flattened_columns)

In [None]:
# Print the schema of the flattened DataFrame (one row per stack-trace element).
df_flattened.printSchema()

In [None]:
# Show the first 20 flattened rows with full, untruncated column values.
df_flattened.show(truncate=False)

Step 4: Save Data into Apache Hive

In [None]:
# Remove any leftover warehouse directory for raygun_error_traces before the table
# is (re)written. ignore_errors=True makes this a no-op when the directory is absent.
# NOTE(review): presumably this avoids Spark refusing to create a managed table over
# a non-empty location left by an earlier run — confirm. This deletes from the
# notebook container's local filesystem.
import shutil

hive_location = "/opt/hive/data/warehouse"
dest = "/".join((hive_location, "raygun_error_traces"))
shutil.rmtree(dest, ignore_errors=True)


In [None]:
# Persist the flattened rows as the table `raygun_error_traces`, replacing any
# existing table of that name. No format is given, so Spark's configured default
# data source is used.
df_flattened.write.saveAsTable("raygun_error_traces", mode="overwrite")


In [None]:
# Verify the data is saved correctly by reading up to 10 rows back from the table.
preview_sql = "SELECT * FROM raygun_error_traces LIMIT 10"
preview_df = spark.sql(preview_sql)
preview_df.show(truncate=False)

In [None]:
# List the distinct request IP addresses recorded in the table.
# (GROUP BY without an aggregate yields one row per distinct value; show() prints at most 20.)
ip_address_sql = "SELECT RequestIpAddress FROM raygun_error_traces GROUP BY RequestIpAddress"
ip_address_df = spark.sql(ip_address_sql)
ip_address_df.show(truncate=False)

Step 5: Query with Trino

In [None]:
# Connect to Trino with the trino-python-client DB-API driver
# (https://github.com/trinodb/trino-python-client).
# Unqualified table names in later queries resolve against catalog "hive",
# schema "default".
from trino.dbapi import connect

trino_settings = {
    "host": 'trino',
    "port": 8081,
    "user": 'admin',
    "catalog": 'hive',
    "schema": 'default',
}
conn = connect(**trino_settings)

# DB-API cursor used by the query cells below.
cursor = conn.cursor()

In [None]:
# Connect to Trino

# With SQLAlchemy
# https://github.com/trinodb/trino-python-client?tab=readme-ov-file#sqlalchemy

# engine = create_engine('trino://admin@trino:8081')
# conn = engine.connect()

In [None]:

# Run a 10-row preview query against the table through Trino;
# the results are fetched in a separate cell.
trino_preview_sql = "SELECT * FROM raygun_error_traces LIMIT 10"
cursor.execute(trino_preview_sql)

In [None]:
# Fetch the data
# Retrieve all remaining result rows of the query executed on `cursor` into `rows`.
rows = cursor.fetchall()

In [None]:
# Display the fetched rows as a table, labelled with the column names Trino
# reports for the executed query (the first item of each cursor.description entry).
trino_preview = pd.DataFrame(
    rows, columns=[column[0] for column in cursor.description]
)
trino_preview