In [None]:
import os

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Connection settings for the MySQL lab database. Each value may be overridden
# through an environment variable, so the notebook can target another server or
# account without editing code. The defaults reproduce the original setup.
# NOTE(review): a real password is committed below as the fallback value.
# Rotate it and drop the default so the secret lives only in MYSQL_LAB_PASSWORD.
JAR_FILE_PATH = os.environ.get("MYSQL_LAB_JAR", "mysql-connector.jar")
DB_PASS = os.environ.get("MYSQL_LAB_PASSWORD", "shNogaBv09Dp")
DB_USER = os.environ.get("MYSQL_LAB_USER", "root")
DB_URL = os.environ.get("MYSQL_LAB_URL", "jdbc:mysql://localhost:3306/university_group")
DB_TABLE = os.environ.get("MYSQL_LAB_TABLE", "friends")

print(">>> Attempting to start Spark session with MySQL driver...")

# spark.driver.extraClassPath is not validated when the session starts. With a
# missing JAR the session comes up normally, and the failure only appears at the
# first JDBC read. Warn early so that later error is easier to trace. This is a
# warning, not an abort, because the driver may still be on Spark's own classpath.
if not os.path.isfile(JAR_FILE_PATH):
    print(f"WARNING: JDBC driver JAR not found at '{os.path.abspath(JAR_FILE_PATH)}'.")

try:
    # NOTE(review): getOrCreate() reuses an already-running session. If the
    # JVM is already up (e.g. a re-run cell), the driver classpath setting
    # presumably cannot take effect until the kernel is restarted.
    spark = (
        SparkSession.builder
        .appName("MySQL_Lab_Task")
        .config("spark.driver.extraClassPath", JAR_FILE_PATH)
        .getOrCreate()
    )

    print("--- SUCCESS! Spark session started. ---\n")

except Exception as e:
    print("CRITICAL ERROR: Failed to start Spark session.")
    print("Check if the file name in 'JAR_FILE_PATH' is correct.")
    print(e)
    # Chain the original error so %tb / tracebacks show the real cause.
    raise SystemExit("Stopped due to Spark error.") from e

print(f">>> Loading data from table '{DB_TABLE}'...")

# Database name as written at the end of the JDBC URL (minus any ?query part).
# The troubleshooting hint below then names the objects this script actually
# uses, instead of hard-coded ones.
db_name = DB_URL.rsplit("/", 1)[-1].split("?", 1)[0]

try:
    df = (
        spark.read
        .format("jdbc")
        .option("url", DB_URL)
        .option("dbtable", DB_TABLE)
        .option("user", DB_USER)
        .option("password", DB_PASS)
        .load()
    )

    # count() and show() run queries against MySQL, so read-time failures are
    # also caught by this try, not just errors raised by load().
    print(f"--- SUCCESS! Data retrieved from MySQL. Here it is ({df.count()} people): ---")
    df.show()

except Exception as e:
    print("ERROR: Failed to read data from MySQL.")
    print(
        "Check if: 1. MySQL server is running, 2. Password (DB_PASS) is correct, "
        f"3. Database '{db_name}' and table '{DB_TABLE}' exist."
    )
    print(e)
    spark.stop()
    raise SystemExit("Stopped due to database error.") from e

# Keep only the rows whose `age` column is strictly greater than 22, then
# report how many matched and display them.
print("\n>>> Filtering people with age > 22:")

df_filtered = df.where(df["age"] > 22)
older_count = df_filtered.count()

print(f"--- SUCCESS! Found {older_count} people older than 22: ---")
df_filtered.show()

print("\n>>> Adding a new person ('Robert Makłowicz') to the MySQL database...")

# A single row whose fields line up with the columns of the schema below. The
# table's `id` column is not supplied here; presumably MySQL generates it
# (e.g. AUTO_INCREMENT). TODO confirm against the table definition.
new_person_data = [
    ('Robert', 'Makłowicz', 'robert@mail.com', '111222333', 'M', 54, 'Zdalnie', 'Gotowanie')
]

new_person_schema = StructType([
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True),
    StructField("favorite_tech", StringType(), True)
])

new_person_df = spark.createDataFrame(data=new_person_data, schema=new_person_schema)

# NOTE(review): mode("append") is not idempotent. Every re-run of this cell
# inserts the same person again unless the table enforces a unique key.
try:
    (
        new_person_df.write
        .format("jdbc")
        .option("url", DB_URL)
        .option("dbtable", DB_TABLE)
        .option("user", DB_USER)
        .option("password", DB_PASS)
        .mode("append")
        .save()
    )
except Exception as e:
    # Same handling as the read steps: report the error, release the session, stop.
    print("ERROR: Failed to write the new person to MySQL.")
    print(e)
    spark.stop()
    raise SystemExit("Stopped due to database write error.") from e

print("--- SUCCESS! New person added. ---")

print(f"\n>>> Verification. Full table '{DB_TABLE}' after update:")

try:
    # Re-read the table from MySQL (not the earlier `df`) so the appended row is visible.
    final_df = (
        spark.read
        .format("jdbc")
        .option("url", DB_URL)
        .option("dbtable", DB_TABLE)
        .option("user", DB_USER)
        .option("password", DB_PASS)
        .load()
    )

    final_df.show()
    print(f"Total number of people in the database: {final_df.count()}")
finally:
    # Always release the Spark session, even if the verification read fails.
    spark.stop()

print("\n>>> Spark session closed. Task completed.")

>>> Attempting to start Spark session with MySQL driver...
--- SUCCESS! Spark session started. ---

>>> Loading data from table 'friends'...
--- SUCCESS! Data retrieved from MySQL. Here it is (14 people): ---
+---+----------+----------+--------------------+---------+---+---+------+-------------+
| id|first_name| last_name|               email|    phone|sex|age|  city|favorite_tech|
+---+----------+----------+--------------------+---------+---+---+------+-------------+
|  1|    Daniel|    Cachro|daniel.cachro@mai...|123555888|  M| 22|Miasto|          SQL|
|  2|   Dariusz|       Gał|dariusz.gal@mail.com|123555888|  M| 23|Miasto|         Java|
|  3|     Dawid|    Morawa|dawid.morawa@mail...|123555888|  M| 21|Miasto|        React|
|  4|   Dominik|Spytkowski|dominik.spytkowsk...|123555888|  M| 24|Miasto|        Spark|
|  5|     Jakub|     Bąbol|jakub.babol@mail.com|123555888|  M| 22|Miasto|      HTML XD|
|  6|   Jędrzej| Pawłowski|jedrzej.pawlowski...|123555888|  M| 23|Miasto|   JavaScript|