In [1]:
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 2.3 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=dc278d385d9a4ccc2bcc6e23ffc65681f6b11b7c12aec31c15527d20b765cc43
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession
from google.colab import files
import os

# Create the Spark session for this notebook, or reuse one that is already running
spark = (
    SparkSession.builder
    .appName("KeyValueStore")
    .getOrCreate()
)

# Local directory that save_value_as_file() writes stored values into;
# created up front so the first write does not fail on a missing directory
upload_dir = "uploads"
os.makedirs(upload_dir, exist_ok=True)

In [3]:
# The key-value store is an (initially empty) Spark DataFrame:
# one row per entry, a string key alongside the stored bytes.
schema = "key STRING, value BINARY"
kv_store = spark.createDataFrame(data=[], schema=schema)


In [4]:
# Show the command menu
def print_help():
    """Print every supported command with a short description, one per line."""
    menu = (
        ("?", "Print this help"),
        ("r", "Read a key"),
        ("ra", "Read all keys"),
        ("u", "Upload file for a new key"),
        ("ub", "Upload files for new keys in bulk"),
        ("d", "Delete key"),
        ("q", "Quit"),
    )
    for command, description in menu:
        print(f"{command}\t- {description}")

# Look up one key and export its value to disk
def read_key():
    """Prompt for a key and write its stored value out as a file.

    The first row whose ``key`` column equals the entered text is handed to
    ``save_value_as_file``; if no row matches, "Key not found." is printed.
    """
    key = input("Enter key to read: ")
    matches = kv_store.filter(kv_store["key"] == key).select("value").collect()
    if not matches:
        print("Key not found.")
        return
    save_value_as_file(key, matches[0].value)

# Function to save value as a file
def save_value_as_file(key, value):
    """Write ``value`` to a file named after ``key`` inside ``upload_dir``.

    Args:
        key: Store key, used as the file path relative to ``upload_dir``.
            It comes from user input, so it is checked before use.
        value: Bytes-like content to write (the file is opened in binary mode).

    Keys that are empty or that would resolve to a location outside
    ``upload_dir`` (for example ``"../x"`` or an absolute path) are rejected
    with a printed message and nothing is written. Keys containing path
    separators are allowed; their parent directories are created on demand.
    """
    root = os.path.abspath(upload_dir)
    filename = os.path.join(upload_dir, f"{key}")
    target = os.path.abspath(filename)

    # target == root covers "", "." and "a/.."; the commonpath check covers
    # "../x" and absolute keys, which os.path.join would otherwise honour.
    if target == root or os.path.commonpath([root, target]) != root:
        print(f"Invalid key for a file name: {key!r}")
        return

    # Also recreates upload_dir itself if it was removed after start-up.
    os.makedirs(os.path.dirname(target), exist_ok=True)
    with open(target, "wb") as f:
        f.write(value)
    print(f"Value saved as file: {filename}")

# List every key currently held in the store
def read_all_keys():
    """Print each key in ``kv_store`` on its own line.

    Prints "Dictionary is empty." instead when the store has no rows.
    """
    rows = kv_store.select("key").collect()
    if not rows:
        print("Dictionary is empty.")
        return
    print("Keys in dictionary:")
    for entry in rows:
        print(entry.key)

# Function to delete a key
def delete_key():
    """Prompt for a key and remove its row(s) from ``kv_store``.

    Rebinds the module-level ``kv_store`` to a DataFrame without the key.
    Prints "Key not found." and leaves the store untouched when no row has
    that key, instead of reporting a deletion that never happened.
    """
    global kv_store
    key = input("Enter key to delete: ")

    # Probe on the key column only, and for at most one row, so the stored
    # binary values are not pulled back just to answer "does it exist?".
    if not kv_store.filter(kv_store["key"] == key).select("key").take(1):
        print("Key not found.")
        return

    kv_store = kv_store.filter(kv_store["key"] != key)
    print("Key deleted successfully.")


# Shared helper for both upload commands
def _store_uploaded_value(key):
    """Ask for a file via ``files.upload()`` and store its content under ``key``.

    Rebinds the module-level ``kv_store``. Any existing row for ``key`` is
    replaced, so each key maps to exactly one value and a later read returns
    the most recent upload.

    Returns:
        True if a value was stored, False if the upload returned nothing.
    """
    global kv_store
    uploaded = files.upload()
    if not uploaded:
        return False

    # Only the first entry of the upload result is used; if several files are
    # selected at once the remaining ones are ignored.
    value = next(iter(uploaded.values()))

    # Reuse the store's own schema so the new row always lines up with it
    # (union matches columns by position, not by name).
    new_row = spark.createDataFrame([(key, value)], schema=kv_store.schema)

    # Drop any previous row for this key before appending the new one.
    kv_store = kv_store.filter(kv_store["key"] != key).union(new_row)
    return True


# Function to upload file for a new key
def upload_file():
    """Prompt for a key, then for a file, and store the file under that key."""
    key = input("Enter new key: ")
    print("Upload the file:")
    if _store_uploaded_value(key):
        print("File uploaded successfully.")
    else:
        print("File upload canceled.")


# Function to upload files for new keys in bulk
def upload_bulk():
    """Prompt for a count, then for that many key/file pairs, storing each one.

    A non-numeric count is reported and the command returns without
    prompting for any keys.
    """
    raw_count = input("Enter the number of keys to insert: ")
    try:
        num_keys = int(raw_count)
    except ValueError:
        print(f"Invalid number of keys: {raw_count!r}")
        return

    for i in range(num_keys):
        key = input(f"Enter key {i + 1}: ")
        print(f"Upload the file for key {key}:")
        if _store_uploaded_value(key):
            print(f"File uploaded for key {key} successfully.")
        else:
            print(f"File upload canceled for key {key}.")


In [None]:
# Main loop
# Each command string maps to the function that carries it out; "?" and "q"
# are handled inline because they only affect the loop itself.
command_handlers = {
    "r": read_key,
    "ra": read_all_keys,
    "u": upload_file,
    "ub": upload_bulk,
    "d": delete_key,
}

while True:
    print("\nCommands:")
    print_help()
    # Tolerate stray spaces around the command (e.g. "u " typed by accident).
    cmd = input("Enter command (? for help): ").strip()

    if cmd == "q":
        break
    if cmd == "?":
        # The menu is printed at the top of every iteration, so simply looping
        # again shows the help once instead of twice in a row.
        continue

    handler = command_handlers.get(cmd)
    if handler is None:
        print("Invalid command. Enter '?' for help.")
    else:
        handler()

print("Program ended.")

# Stop SparkSession
spark.stop()


Commands:
?	- Print this help
r	- Read a key
ra	- Read all keys
u	- Upload file for a new key
ub	- Upload files for new keys in bulk
d	- Delete key
q	- Quit
Enter command (? for help): u
Enter new key: key1
Upload the file:


Saving pkpadmin,+463-2445-1-CE.pdf to pkpadmin,+463-2445-1-CE.pdf
File uploaded successfully.

Commands:
?	- Print this help
r	- Read a key
ra	- Read all keys
u	- Upload file for a new key
ub	- Upload files for new keys in bulk
d	- Delete key
q	- Quit


In [None]:
# Main loop
# NOTE(review): this cell repeats the command loop of the previous cell, and
# that cell ends with spark.stop(). On a top-to-bottom run this one therefore
# starts with a stopped session; keep only one of the two.
# Each command string maps to the function that carries it out; "?" and "q"
# are handled inline because they only affect the loop itself.
command_handlers = {
    "r": read_key,
    "ra": read_all_keys,
    "u": upload_file,
    "ub": upload_bulk,
    "d": delete_key,
}

while True:
    print("\nCommands:")
    print_help()
    # Tolerate stray spaces around the command (e.g. "u " typed by accident).
    cmd = input("Enter command (? for help): ").strip()

    if cmd == "q":
        break
    if cmd == "?":
        # The menu is printed at the top of every iteration, so simply looping
        # again shows the help once instead of twice in a row.
        continue

    handler = command_handlers.get(cmd)
    if handler is None:
        print("Invalid command. Enter '?' for help.")
    else:
        handler()

print("Program ended.")

# Stop SparkSession
spark.stop()


Commands:
?	- Print this help
r	- Read a key
ra	- Read all keys
u	- Upload file for a new key
ub	- Upload files for new keys in bulk
d	- Delete key
q	- Quit
Enter command (? for help): u
Enter new key: key1
Upload the file:


Saving prog1.py to prog1.py
File uploaded successfully.

Commands:
?	- Print this help
r	- Read a key
ra	- Read all keys
u	- Upload file for a new key
ub	- Upload files for new keys in bulk
d	- Delete key
q	- Quit
Enter command (? for help): r
Enter key to read: key1
Value saved as file: uploads/key1

Commands:
?	- Print this help
r	- Read a key
ra	- Read all keys
u	- Upload file for a new key
ub	- Upload files for new keys in bulk
d	- Delete key
q	- Quit
Enter command (? for help): q
Program ended.
