Zadanie1

INITIAL


In [31]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ActorsData").getOrCreate()
from pyspark.sql.functions import col, lit, explode, regexp_replace, regexp_extract, isnull, ifnull, split, array_contains, expr
from pyspark.sql.functions import desc, count, avg

In [32]:
dataframe = spark.read.csv("actors.csv", header=True, inferSchema=True)
dataframe.show()

+-------------+--------+------------+---------------+----------+--------------------+
|imdb_title_id|ordering|imdb_name_id|       category|       job|          characters|
+-------------+--------+------------+---------------+----------+--------------------+
|    tt0000009|       1|   nm0063086|        actress|      NULL|[Miss Geraldine H...|
|    tt0000009|       2|   nm0183823|          actor|      NULL|      [Mr. Hamilton]|
|    tt0000009|       3|   nm1309758|          actor|      NULL|[Chauncey Depew -...|
|    tt0000009|       4|   nm0085156|       director|      NULL|                NULL|
|    tt0000574|       1|   nm0846887|        actress|      NULL|        [Kate Kelly]|
|    tt0000574|       2|   nm0846894|          actor|      NULL|     [School Master]|
|    tt0000574|       3|   nm3002376|          actor|      NULL|        [Steve Hart]|
|    tt0000574|       4|   nm0170118|        actress|      NULL|                NULL|
|    tt0000574|       5|   nm0846879|       director| 

A)

In [33]:
df_filled = dataframe.fillna({"job": 'Unknown'})

df_nulls = dataframe.withColumn("is_character_null", isnull(col("characters")))

df_ifnull = dataframe.withColumn("job_fixed", ifnull(col("job"), lit("Unknown"))) 

df_nullIf = dataframe.withColumn("job_filtered", expr("nullIf(job, 'Unknown')")) 

df_cleaned = dataframe.withColumn("characters_cleaned", regexp_replace(col("characters"), "[^a-zA-Z ]", ""))

df_extracted = dataframe.withColumn("category_first_word", regexp_extract(col("category"), r"(\w+)", 1))  

df_fixed = dataframe.withColumn("characters_array", split(col("characters"), ", "))
df_exploded = df_fixed.withColumn("character", explode(col("characters_array")))

df_fixed2 = dataframe.withColumn("characters_array", split(col("characters"), ", "))
df_array_check = df_fixed.withColumn("contains_steve", array_contains(col("characters_array"), "Steve Hart"))

df_dropped = dataframe.drop("job")

df_filled.show()
df_nulls.show()
df_ifnull.show()
df_nullIf.show()
df_cleaned.show()
df_extracted.show()
df_exploded.show()
df_array_check.show()
df_dropped.show()

+-------------+--------+------------+---------------+----------+--------------------+
|imdb_title_id|ordering|imdb_name_id|       category|       job|          characters|
+-------------+--------+------------+---------------+----------+--------------------+
|    tt0000009|       1|   nm0063086|        actress|   Unknown|[Miss Geraldine H...|
|    tt0000009|       2|   nm0183823|          actor|   Unknown|      [Mr. Hamilton]|
|    tt0000009|       3|   nm1309758|          actor|   Unknown|[Chauncey Depew -...|
|    tt0000009|       4|   nm0085156|       director|   Unknown|                NULL|
|    tt0000574|       1|   nm0846887|        actress|   Unknown|        [Kate Kelly]|
|    tt0000574|       2|   nm0846894|          actor|   Unknown|     [School Master]|
|    tt0000574|       3|   nm3002376|          actor|   Unknown|        [Steve Hart]|
|    tt0000574|       4|   nm0170118|        actress|   Unknown|                NULL|
|    tt0000574|       5|   nm0846879|       director| 

B)

UŻYCIE 3 FUNKCJI AGREGUJĄCYCH, KTÓRE UWAŻAM ZA CIEKAWE

In [34]:
# 1. Najczęstsza kategoria  
df_category_count = dataframe.groupBy("category").count().orderBy(desc("count"))  

# 2. Średnia liczba ról na film  
df_avg_roles = dataframe.groupBy("imdb_title_id").agg(count("imdb_name_id").alias("num_roles")).agg(avg("num_roles"))  

# 3. Liczba unikalnych aktorów  
df_unique_actors = dataframe.select(count("imdb_name_id").alias("unique_actors"))

df_category_count.show()
df_avg_roles.show()
df_unique_actors.show()

+-------------------+------+
|           category| count|
+-------------------+------+
|              actor|222337|
|            actress|133414|
|             writer|122793|
|           producer|101092|
|           director| 88968|
|           composer| 66861|
|    cinematographer| 55423|
|             editor| 33780|
|production_designer|  9485|
|               self|   909|
|    archive_footage|   444|
|      archive_sound|     7|
+-------------------+------+

+-----------------+
|   avg(num_roles)|
+-----------------+
|9.732469015003261|
+-----------------+

+-------------+
|unique_actors|
+-------------+
|       835513|
+-------------+



ZADANIE2

INITIAL

In [35]:
from pyspark.sql.functions import udf, col, lit
from pyspark.sql.types import DoubleType

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
import pandas as pd

In [36]:
# Funkcja normalizująca ordering
def normalize_ordering(ordering, max_ordering):
    return ordering / max_ordering if max_ordering != 0 else 0

normalize_udf = udf(lambda x, max_val: normalize_ordering(x, max_val), DoubleType())
max_ordering = dataframe.agg({"ordering": "max"}).collect()[0][0]
df_normalized = dataframe.withColumn("ordering_normalized", normalize_udf(col("ordering"), lit(max_ordering)))
df_normalized.show()

+-------------+--------+------------+---------------+----------+--------------------+-------------------+
|imdb_title_id|ordering|imdb_name_id|       category|       job|          characters|ordering_normalized|
+-------------+--------+------------+---------------+----------+--------------------+-------------------+
|    tt0000009|       1|   nm0063086|        actress|      NULL|[Miss Geraldine H...|                0.1|
|    tt0000009|       2|   nm0183823|          actor|      NULL|      [Mr. Hamilton]|                0.2|
|    tt0000009|       3|   nm1309758|          actor|      NULL|[Chauncey Depew -...|                0.3|
|    tt0000009|       4|   nm0085156|       director|      NULL|                NULL|                0.4|
|    tt0000574|       1|   nm0846887|        actress|      NULL|        [Kate Kelly]|                0.1|
|    tt0000574|       2|   nm0846894|          actor|      NULL|     [School Master]|                0.2|
|    tt0000574|       3|   nm3002376|         

In [37]:
# Funkcja czyszcząca stringi (usuwa znaki specjalne)
@pandas_udf(StringType())
def clean_category(col: pd.Series) -> pd.Series:
    return col.str.replace(r"[^a-zA-Z0-9 ]", "", regex=True)

df_cleaned = dataframe.withColumn("category_cleaned", clean_category(col("category")))
df_cleaned.show()

+-------------+--------+------------+---------------+----------+--------------------+----------------+
|imdb_title_id|ordering|imdb_name_id|       category|       job|          characters|category_cleaned|
+-------------+--------+------------+---------------+----------+--------------------+----------------+
|    tt0000009|       1|   nm0063086|        actress|      NULL|[Miss Geraldine H...|         actress|
|    tt0000009|       2|   nm0183823|          actor|      NULL|      [Mr. Hamilton]|           actor|
|    tt0000009|       3|   nm1309758|          actor|      NULL|[Chauncey Depew -...|           actor|
|    tt0000009|       4|   nm0085156|       director|      NULL|                NULL|        director|
|    tt0000574|       1|   nm0846887|        actress|      NULL|        [Kate Kelly]|         actress|
|    tt0000574|       2|   nm0846894|          actor|      NULL|     [School Master]|           actor|
|    tt0000574|       3|   nm3002376|          actor|      NULL|        [