<a href="https://colab.research.google.com/github/Eriya18/spark-library-management/blob/main/Library_ETL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [71]:
# Install dependencies
!pip install pyspark pyarrow pandas

# Create data folders
import os

BASE = "/content/data"
STREAM_DIR = f"{BASE}/stream_input"

os.makedirs(BASE, exist_ok=True)
os.makedirs(STREAM_DIR, exist_ok=True)

print("Folders ready!")


Folders ready!


In [72]:
!python gen_books.py
!python gen_users.py
!python gen_categories.py
!python gen_loans_txt.py
!python gen_returns_parquet.py

import os
print("Generated files:", os.listdir("/content/data"))


books.json generated
users.csv generated
categories.csv generated
loans.txt generated
Traceback (most recent call last):
  File "/content/spark-library-management/gen_returns_parquet.py", line 18, in <module>
    df.to_parquet('data/returns.parquet', index=False)
  File "/usr/local/lib/python3.12/dist-packages/pandas/util/_decorators.py", line 333, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/pandas/core/frame.py", line 3113, in to_parquet
    return to_parquet(
           ^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/pandas/io/parquet.py", line 480, in to_parquet
    impl.write(
  File "/usr/local/lib/python3.12/dist-packages/pandas/io/parquet.py", line 228, in write
    self.api.parquet.write_table(
  File "/usr/local/lib/python3.12/dist-packages/pyarrow/parquet/core.py", line 1902, in write_table
    with ParquetWriter(
         ^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages

In [73]:
import shutil
shutil.rmtree("data/returns.parquet", ignore_errors=True)
print("Deleted directory named returns.parquet.")


Deleted directory named returns.parquet.


In [74]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Library_ETL_Colab") \
    .getOrCreate()

spark.conf.set("spark.sql.shuffle.partitions", "4")
spark.conf.set("spark.sql.session.timeZone", "UTC")

print("Spark started.")


Spark started.


In [75]:
import os

# Delete old parquet
os.remove("/content/data/returns.parquet")
print("Old returns.parquet deleted.")

# Create Spark-compatible parquet
import random
from datetime import datetime, timedelta
from pyspark.sql.functions import to_timestamp

NUM = 120
loans=[f"L{i:06d}" for i in range(1,301)]
users=[f"U{i:05d}" for i in range(1,201)]
books=[f"B{i:05d}" for i in range(1,151)]

rows=[]
for i in range(NUM):
    loan_id=random.choice(loans)
    ts = (datetime.now() - timedelta(days=random.randint(0,30))).strftime("%Y-%m-%d %H:%M:%S")
    rows.append((loan_id, random.choice(users), random.choice(books), ts))

df = spark.createDataFrame(rows, ["loan_id","user_id","book_id","returned_at"])
df = df.withColumn("returned_at", to_timestamp("returned_at"))
df.write.mode("overwrite").parquet("data/returns.parquet")

print("New returns.parquet created!")


IsADirectoryError: [Errno 21] Is a directory: '/content/data/returns.parquet'

In [None]:
from pyspark.sql.functions import to_timestamp

BASE = "/content/data"

books = spark.read.option("multiline", "true").json(f"{BASE}/books.json")
users = spark.read.csv(f"{BASE}/users.csv", header=True, inferSchema=True)
categories = spark.read.csv(f"{BASE}/categories.csv", header=True, inferSchema=True)
loans = spark.read.option("header", True).option("delimiter", "|").csv(f"{BASE}/loans.txt")
returns = spark.read.parquet(f"{BASE}/returns.parquet")

print("Batch data loaded!")


In [None]:
loans = loans.withColumn("loan_date", to_timestamp("loan_date")) \
             .withColumn("due_date", to_timestamp("due_date"))

returns = returns.withColumn("returned_at", to_timestamp("returned_at"))


In [None]:
from pyspark.sql.types import StructType, StructField, StringType

EVENT_SCHEMA = StructType([
    StructField("event_id", StringType(), True),
    StructField("loan_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("book_id", StringType(), True),
    StructField("loan_date", StringType(), True),
    StructField("due_date", StringType(), True),
    StructField("status", StringType(), True),
])


In [None]:
from pyspark.sql.functions import to_timestamp, expr

STREAM_DIR = "/content/data/stream_input"
OUTPUT = "/content/data/stream_output"
CHECKPOINT = "/content/data/stream_checkpoint"

stream_df = (
    spark.readStream
        .schema(EVENT_SCHEMA)
        .json(STREAM_DIR)
)

stream_df = (
    stream_df
        .withColumn("loan_date", to_timestamp("loan_date"))
        .withColumn("due_date", to_timestamp("due_date"))
        .withColumn("event_date", expr("date(loan_date)"))
)

query = (
    stream_df.writeStream
        .format("parquet")
        .option("path", OUTPUT)
        .option("checkpointLocation", CHECKPOINT)
        .outputMode("append")
        .start()
)

print("ðŸ”¥ Streaming started in background.")


In [None]:
import json, uuid, time

for i in range(3):
    event = {
        "event_id": str(uuid.uuid4()),
        "loan_id": f"LS{i:05d}",
        "user_id": "U00001",
        "book_id": "B00001",
        "loan_date": "2025-01-01 10:00:00",
        "due_date": "2025-01-14 10:00:00",
        "status": "borrowed"
    }

    with open(f"/content/data/stream_input/event_{i}.json", "w") as f:
        json.dump(event, f)

    print("Generated:", event)
    time.sleep(2)


In [None]:
df = spark.read.parquet("/content/data/stream_output")
df.show(10, truncate=False)


In [None]:
from pyspark.sql.functions import col

loans_books = loans.join(books, on="book_id", how="left")
loans_books_users = loans_books.join(users, on="user_id", how="left")
loans_full = loans_books_users.join(categories, on="category_id", how="left")
loans_with_returns = loans_full.join(
    returns,
    on=["loan_id", "user_id", "book_id"],
    how="left"
)

stream_events = spark.read.parquet("/content/data/stream_output")

all_loans_final = loans_with_returns.unionByName(
    stream_events, allowMissingColumns=True
)

all_loans_final.show(20, truncate=False)


In [None]:
!pip install dash==2.11.1 jupyter-dash pyngrok plotly pandas pyarrow fastparquet


In [None]:
from pyngrok import ngrok
from pyngrok.exception import PyngrokNgrokHTTPError
import time

ngrok.set_auth_token("35lkbwn07pXhttFG2rebGL1aO5x_7Sek5qRH7QNSwqoXkYxq")

public_url = None
max_retries = 5 # Increased max retries
retry_delay_seconds = 15 # Increased delay between connection attempts

for attempt in range(max_retries):
    print(f"Attempt {attempt + 1} to connect ngrok...")
    try:
        ngrok.kill()
        # Wait for the ngrok process to fully terminate and for the server-side endpoint to potentially clear.
        time.sleep(retry_delay_seconds) # Use the same delay for cleanup before attempting connection
        public_url = ngrok.connect(8050)
        print("ðŸ”— Dashboard URL:", public_url)
        break # Success, exit loop
    except PyngrokNgrokHTTPError as e:
        if "already online" in str(e) or "ERR_NGROK_334" in str(e):
            print(f"Ngrok endpoint still online. Retrying in {retry_delay_seconds} seconds...")
            if attempt < max_retries - 1: # Only sleep if more retries are pending
                time.sleep(retry_delay_seconds)
            else: # If max retries reached for this specific error
                print("Max retries reached for 'endpoint already online' error.")
                break
        else:
            print(f"An unexpected ngrok error occurred: {e}")
            break # Other error, no point in retrying
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        break

if public_url is None:
    print("Failed to establish ngrok tunnel after multiple attempts.")

In [None]:
import dash
from dash import html, dcc, Output, Input
import pandas as pd
import plotly.express as px
import os

# Data folder
DATA_DIR = "/content/data/stream_output"

app = dash.Dash(__name__)
server = app.server

def load_data():
    if not os.path.exists(DATA_DIR):
        return pd.DataFrame()

    files = [os.path.join(DATA_DIR, f) for f in os.listdir(DATA_DIR) if f.endswith(".parquet")]
    if not files:
        return pd.DataFrame()

    df = pd.concat([pd.read_parquet(f) for f in files], ignore_index=True)
    return df

app.layout = html.Div([
    html.H2("ðŸ“˜ Real-Time Library Dashboard"),
    dcc.Interval(id="refresh", interval=5000, n_intervals=0),
    dcc.Graph(id="live_chart")
])

@app.callback(
    Output("live_chart", "figure"),
    Input("refresh", "n_intervals")
)
def update_graph(n):
    df = load_data()
    if df.empty:
        return px.bar(title="No streaming data yet")

    return px.bar(df, x="book_id", title="Live Streaming Events Count")



In [None]:
app.run_server(host="0.0.0.0", port=8050)
