In [0]:
# 1.)
pracownicy_data = [
    {'name': 'Jan', 'age': 28, 'department': 'IT'},
    {'name': 'Anna', 'age': 34, 'department': 'HR'},
    {'name': 'Piotr', 'age': 22, 'department': 'Finance'},
    {'name': 'Ewa', 'age': 45, 'department': 'IT'},
    {'name': 'Marek', 'age': 31, 'department': 'HR'},
    {'name': 'Joanna', 'age': 29, 'department': 'Finance'}
]


df = spark.createDataFrame(pracownicy_data)

display(df)

age,department,name
28,IT,Jan
34,HR,Anna
22,Finance,Piotr
45,IT,Ewa
31,HR,Marek
29,Finance,Joanna


In [0]:
spark.sql("SHOW VOLUMES IN big_data.default").show()

+--------+-----------+
|database|volume_name|
+--------+-----------+
| default| pracownicy|
+--------+-----------+



In [0]:
# 2.)
file_path = "/Volumes/big_data/default/pracownicy/pracownicy.csv"

# Load the data again
df_employees = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(file_path)

display(df_employees.limit(5))

��n�a�m�e�,�a�g�e�,�d�e�p�a�r�t�m�e�n�t�
�J�a�n�,�2�8�,�I�T�
�A�n�n�a�,�3�4�,�H�R�
�P�i�o�t�r�,�2�2�,�F�i�n�a�n�c�e�
�E�w�a�,�4�5�,�I�T�
�M�a�r�e�k�,�3�1�,�H�R�


In [0]:
# 3. )
from pyspark.sql.functions import col

file_path = "/Volumes/big_data/default/pracownicy/pracownicy.csv"

df_employees = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("encoding", "UTF-16") \
    .load(file_path)

# 2. Verify columns are clean now (Optional)
print("Columns:", df_employees.columns)

# 3. Filter and Select
df_filtered = df_employees.filter(col("age") > 30) \
                          .select("name", "age")

display(df_filtered)

Columns: ['name', 'age', 'department']


name,age
Anna,34
Ewa,45
Marek,31


In [0]:
# 4. )

from pyspark.sql.functions import col, avg, desc

df_sorted = df_employees.orderBy(col("age").desc())

print("Sorted by age (descending):")
display(df_sorted)

df_grouped = df_employees.groupBy("department") \
                         .agg(avg("age").alias("average_age"))

print("Average age by department:")
display(df_grouped)

Sorted by age (descending):


name,age,department
Ewa,45,IT
Anna,34,HR
Marek,31,HR
Joanna,29,Finance
Jan,28,IT
Piotr,22,Finance


Average age by department:


department,average_age
HR,32.5
IT,36.5
Finance,25.5


In [0]:

# 5. )
from pyspark.sql.functions import col

df_modified = df_employees.withColumn("age_in_5_years", col("age") + 5)

df_final = df_modified.drop("department")

display(df_final)

name,age,age_in_5_years
Jan,28,33
Anna,34,39
Piotr,22,27
Ewa,45,50
Marek,31,36
Joanna,29,34


In [0]:
# 6. )

projects_file_path = "/Volumes/big_data/default/pracownicy/projekty.csv"

df_projects = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("encoding", "UTF-8") \
    .load(projects_file_path)

print("Projects Data:")
display(df_projects)


df_joined = df_employees.join(df_projects, on="name", how="inner")

print("Joined Data (Employees + Projects):")
display(df_joined)

Projects Data:


name,projects
Jan,5
Anna,3
Piotr,4
Ewa,7
Marek,2
Joanna,6


Joined Data (Employees + Projects):


name,age,department,projects
Jan,28,IT,5
Anna,34,HR,3
Piotr,22,Finance,4
Ewa,45,IT,7
Marek,31,HR,2
Joanna,29,Finance,6


In [0]:
# 7. )

from pyspark.sql.functions import col, when, lit

df_with_nulls = df_employees.withColumn("age", 
                                        when(col("name") == "Jan", None)
                                        .otherwise(col("age"))) \
                            .withColumn("department", 
                                        when(col("name") == "Anna", None)
                                        .otherwise(col("department")))

print("DataFrame with NULLs:")
display(df_with_nulls)


df_filled = df_with_nulls.na.fill({
    "age": 0, 
    "department": "Unknown"
})

print("DataFrame after filling NULLs:")
display(df_filled)


df_dropped = df_with_nulls.na.drop(how="any")

print("DataFrame after dropping rows with NULLs:")
display(df_dropped)

DataFrame with NULLs:


name,age,department
Jan,,IT
Anna,34.0,
Piotr,22.0,Finance
Ewa,45.0,IT
Marek,31.0,HR
Joanna,29.0,Finance


DataFrame after filling NULLs:


name,age,department
Jan,0,IT
Anna,34,Unknown
Piotr,22,Finance
Ewa,45,IT
Marek,31,HR
Joanna,29,Finance


DataFrame after dropping rows with NULLs:


name,age,department
Piotr,22,Finance
Ewa,45,IT
Marek,31,HR
Joanna,29,Finance


In [0]:
# 8.)
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col, desc

window_spec = Window.orderBy(col("projects").desc())

df_ranked = df_joined.withColumn("rank", rank().over(window_spec))

print("Employees ranked by project count:")
display(df_ranked)

Employees ranked by project count:




name,age,department,projects,rank
Ewa,45,IT,7,1
Joanna,29,Finance,6,2
Jan,28,IT,5,3
Piotr,22,Finance,4,4
Anna,34,HR,3,5
Marek,31,HR,2,6


In [0]:
# 9.)

from pyspark.sql.functions import col, split, explode, lower, trim, count


file_path = "/Volumes/big_data/default/pracownicy/word_count_sample.txt"
dbutils.fs.put(file_path, sample_text, True)

df_text = spark.read.text(file_path)

print("Raw text DataFrame:")
display(df_text)

df_words = df_text.select(explode(split(col("value"), " ")).alias("word"))

df_counts = df_words.select(lower(trim(col("word"))).alias("word")) \
                    .filter(col("word") != "") \
                    .groupBy("word") \
                    .count() \
                    .orderBy(col("count").desc())

print("Top 5 most frequent words:")
display(df_counts.limit(5))

Wrote 162 bytes.
Raw text DataFrame:


value
Apache Spark is fast.
Spark is a unified analytics engine for Big Data.
Big Data requires distributed computing.
Spark uses DataFrames for distributed processing.


Top 5 most frequent words:


word,count
spark,3
distributed,2
big,2
is,2
for,2


In [0]:
catalog_name = "main"      
schema_name = "default"   
volume_name = "my_files"  

volume_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}/moje_wyniki"

print(f"Attempting to save to Volume: {volume_path}")

try:
    # Save to CSV
    df_joined.write.mode("overwrite").csv(f"{volume_path}/csv", header=True)
    
    # Save to JSON
    df_joined.write.mode("overwrite").json(f"{volume_path}/json")
    
    print("Success! Files saved to Volume.")
except Exception as e:
    print(f"Error: Make sure the Volume '{volume_name}' exists in '{catalog_name}.{schema_name}'.")
    print(e)

Attempting to save to Volume: /Volumes/main/default/my_files/moje_wyniki
Error: Make sure the Volume 'my_files' exists in 'main.default'.
[NO_SUCH_CATALOG_EXCEPTION] Catalog 'main' was not found. Please verify the catalog name and then retry the query or command again. SQLSTATE: 42704

JVM stacktrace:
org.apache.spark.sql.catalyst.analysis.NoSuchCatalogException
	at com.databricks.sql.managedcatalog.client.ManagedCatalogClientImpl$ConvertVolumeException.convertVolumeException(ManagedCatalogClientImpl.scala:9145)
	at com.databricks.sql.managedcatalog.client.ManagedCatalogClientImpl$ConvertVolumeException.convertVolumeException(ManagedCatalogClientImpl.scala:9151)
	at com.databricks.sql.managedcatalog.client.ManagedCatalogClientImpl$ConvertVolumeException.convertVolumeException(ManagedCatalogClientImpl.scala:9165)
	at com.databricks.sql.managedcatalog.client.ManagedCatalogClientImpl.handleVolumeException(ManagedCatalogClientImpl.scala:9133)
	at com.databricks.sql.managedcatalog.client.Ma