In [None]:
from google.colab import drive

# Mount Google Drive so the MovieLens files under MyDrive are reachable
# from the notebook (the data directory is configured further below).
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [None]:
#!apt-get -qq install openjdk-11-jdk-headless -y      # Java 11

In [None]:
!realpath drive/MyDrive/add/ml-latest/genome-scores.csv

import os
os.environ.pop("PYSPARK_SUBMIT_ARGS", None)
import os

os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--driver-memory 240g "                # daj 12 GB do JVM
    "--executor-memory 240g "
    "--conf spark.driver.maxResultSize=32g "
    "--conf spark.sql.shuffle.partitions=8 "  # mniej partycji = mniej obiektów
    "pyspark-shell"
)



/content/drive/MyDrive/add/ml-latest/genome-scores.csv


In [None]:
from pyspark.sql import SparkSession

# Local-mode session on all available cores (per the original note, the TPU
# runtime usually exposes only 2 CPU cores).
spark = (
    SparkSession.builder
    .appName("TrainOnPredParquet")
    .master("local[*]")
    .config("spark.ui.showConsoleProgress", "false")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

print("Spark OK ✅")

# Directory containing the MovieLens CSV files.
DATA_DIR = "/content/drive/MyDrive/add/ml-latest/"


def _load_movielens_csv(file_name):
    """Read ``file_name`` from DATA_DIR as a DataFrame (header row, inferred schema)."""
    # NOTE(review): movies.csv escapes quotes inside titles by doubling them
    # (""), while Spark's CSV reader uses a backslash escape by default. The
    # stray `We're Comin' To Get Ya!"" (2014)"` genre column seen later
    # suggests some rows are mis-split; option escape='"' would likely fix it,
    # but it changes which columns exist downstream, so review the later
    # column drops before switching.
    return spark.read.csv(
        os.path.join(DATA_DIR, file_name),
        header=True,
        inferSchema=True,
    )


movies_df = _load_movielens_csv("movies.csv")
tags_df = _load_movielens_csv("tags.csv")
ratings_df = _load_movielens_csv("ratings.csv")
links_df = _load_movielens_csv("links.csv")

Spark OK ✅


In [None]:
from pyspark.sql.functions import split, explode, col, lit

# Explode the pipe-separated "genres" string into one row per (movie, genre),
# each carrying a constant 1 so the rows can be pivoted into indicators.
genre_per_row = explode(split(col("genres"), "\\|")).alias("genre")
genres_flag_df = movies_df.select("movieId", genre_per_row).withColumn("value", lit(1))

# One-hot genre matrix: one column per distinct genre, 0 where the movie lacks it.
genres_pivot = (
    genres_flag_df
    .groupBy("movieId")
    .pivot("genre")
    .sum("value")
    .na.fill(0)
)

# Re-attach the titles; the left join keeps every movie even without a pivot row.
movies_genres_df = movies_df.select("movieId", "title").join(
    genres_pivot, on="movieId", how="left"
)

# Dataset statistics (each count() runs a Spark job).
# num_genres counts every column except movieId/title, so it includes
# "(no genres listed)" and any CSV-parsing artifact column.
genre_like_cols = [c for c in movies_genres_df.columns if c not in ("movieId", "title")]
num_genres = len(genre_like_cols)
num_movies = movies_genres_df.count()
num_users = ratings_df.select("userId").distinct().count()
num_ratings = ratings_df.count()
num_tags = tags_df.count()
num_links = links_df.count()

print(f"Gatunki przetworzone. Liczba kolumn gatunków: {num_genres}")
print(f"Liczba filmów: {num_movies}")
print(f"Liczba użytkowników: {num_users}")
print(f"Liczba ocen: {num_ratings}")
print(f"Liczba tagów: {num_tags}")
print(f"Liczba linków: {num_links}")
print("-" * 100)

movies_genres_df.show(5)


Gatunki przetworzone. Liczba kolumn gatunków: 21
Liczba filmów: 86537
Liczba użytkowników: 330975
Liczba ocen: 33832162
Liczba tagów: 2328315
Liczba linków: 86537
----------------------------------------------------------------------------------------------------
+-------+--------------------+----------------------------------+------------------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+---+-------+
|movieId|               title| We're Comin' To Get Ya!"" (2014)"|(no genres listed)|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|IMAX|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|
+-------+--------------------+----------------------------------+------------------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+---+-------+
|      1|    Toy Story (1995)|    

In [None]:
# Explicit imports instead of `from pyspark.sql.functions import *`, which
# shadows Python builtins (sum, max, min, round, abs, ...) for the rest of the
# notebook. `year`, `from_unixtime` and `lower` are also used by later cells.
from pyspark.sql.functions import col, from_unixtime, lower, regexp_extract, year

# Release year = the four digits in parentheses in the title, e.g. "Toy Story (1995)".
# Titles without such a group yield an empty match, which the int cast turns
# into NULL (the releaseYear summary later shows fewer non-null values).
movies_df = movies_df.withColumn(
    "releaseYear",
    regexp_extract(col("title"), r"\((\d{4})\)", 1).cast("int"),
)

# Calendar year in which each tag was applied (from_unixtime treats the value
# as seconds since the epoch).
# NOTE(review): tags_df is re-read from CSV a few cells later, so this column
# is not used downstream.
tags_df = tags_df.withColumn(
    "tagYear",
    year(from_unixtime(col("timestamp"))),
)

In [None]:
# Replace the raw Unix-seconds rating timestamp with its calendar year.
# The column keeps the name "timestamp" because later cells reference it.
# Not idempotent: re-running this cell would interpret the year as a Unix time.
rating_year = year(from_unixtime(col("timestamp")))
ratings_df = ratings_df.withColumn("timestamp", rating_year)

In [None]:
# All functions used here are imported explicitly; previously `lower` (and
# `col`) only resolved thanks to a wildcard import in another cell.
from pyspark.sql.functions import col, collect_list, concat_ws, countDistinct, lower

# Reload the raw tags (the earlier tags_df had an extra tagYear column).
tags_df = spark.read.csv(
    os.path.join(DATA_DIR, "tags.csv"),
    header=True,
    inferSchema=True,
)

# Number of distinct tags once letter case is ignored.
tags_df.select(
    countDistinct(lower(col("tag"))).alias("unique_tags_lowercase")
).show()

# One row per movie: all of its tags joined into a single space-separated
# string, used as the input document for TF-IDF. The order inside the string
# follows collect_list, which Spark does not guarantee.
tags_df = (
    tags_df.groupBy("movieId")
    .agg(concat_ws(" ", collect_list("tag")).alias("tags"))
)

tags_df.show(5, truncate=False)

+---------------------+
|unique_tags_lowercase|
+---------------------+
|               143812|
+---------------------+

+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## PCA DLA TF-IDF

In [None]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.sql import functions as F
from pyspark.ml.feature import PCA
# vector_to_array lives in pyspark.ml.functions (Spark >= 3.0), not in
# pyspark.sql.functions — calling F.vector_to_array raised AttributeError.
from pyspark.ml.functions import vector_to_array

# 1. Tokenization: every run of non-letters is a separator. RegexTokenizer
#    lowercases the text before splitting (toLowercase=True by default), so
#    uppercase letters are not treated as separators. Tokens shorter than
#    3 characters are discarded.
regex_tok = RegexTokenizer(
    inputCol="tags",
    outputCol="tokens",
    pattern="[^a-z]+",
    minTokenLength=3,
)
tags_tok = regex_tok.transform(tags_df)

# 2. Term frequencies over a vocabulary of at most 5000 most frequent terms.
cv = CountVectorizer(
    inputCol="tokens",
    outputCol="tf",
    vocabSize=5000,
)
cv_model = cv.fit(tags_tok)
tags_tf = cv_model.transform(tags_tok)

# 3. IDF weighting; terms occurring in fewer than 5 movies get an IDF of 0.
idf = IDF(
    inputCol="tf",
    outputCol="tfidf",
    minDocFreq=5,
)
idf_model = idf.fit(tags_tf)
tags_tfidf_df = idf_model.transform(tags_tf).select("movieId", "tfidf")

# 4. Project the TF-IDF vectors onto k principal components.
k = 50  # target dimension; tune so the components explain ~95-98% of the variance
pca = PCA(k=k, inputCol="tfidf", outputCol="pca_vec")
pca_model = pca.fit(tags_tfidf_df)
tags_pca = pca_model.transform(tags_tfidf_df)

# 5. Vector -> array -> one column per component (pca_0 ... pca_{k-1}).
arr_col = vector_to_array("pca_vec")
cols = [F.col("movieId")] + [arr_col[i].alias(f"pca_{i}") for i in range(k)]

tags_final = tags_pca.select(cols)


AttributeError: module 'pyspark.sql.functions' has no attribute 'vector_to_array'

In [None]:
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.functions import vector_to_array

# Convert the PCA vector to an array<double> with Spark's built-in
# vector_to_array (the same helper used for the genome PCA below) instead of a
# Python UDF, which would ship every row through a Python worker.
arr_col = vector_to_array("pca_vec")

# Expand the array into one column per component.
k = 50  # number of PCA components (must match the PCA fit above)
cols = [F.col("movieId")] + [
    arr_col[i].alias(f"pca_{i}") for i in range(k)
]

tags_final = tags_pca.select(cols)

In [None]:
# From here on, tags_df holds the per-movie tag TF-IDF PCA features
# (movieId, pca_0 ... pca_49) rather than the concatenated tag strings.
tags_df = tags_final

In [None]:
tags_df.show(5) # sanity check: PCA over TF-IDF computed correctly

+-------+-------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+-------------------+------------------+--------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+-------------------+--------------------+-------------------+--------------------+-------------------+--------------------+------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+-------------------+----

In [None]:
from functools import reduce
from pyspark.sql import functions as F

# Genre indicator columns summed into genres_cnt. "(no genres listed)" is not
# in this list, so such movies get a count of 0.
genre_cols = [
    "Action", "Adventure", "Animation", "Children", "Comedy", "Crime",
    "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", "IMAX",
    "Musical", "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western"
]

# Title length without the trailing " (YYYY)" release-year suffix.
# Previously this was computed twice as length(title) - 5. That over-counts
# by 2 for titles ending in " (YYYY)" (7 characters) and under-counts by 5
# for titles that carry no year.
title_without_year = F.trim(F.regexp_replace("title", r"\s*\(\d{4}\)\s*$", ""))
movies_genres_df = movies_genres_df.withColumn("name_len", F.length(title_without_year))

# Number of genres per movie. NULL indicators (movies missing from the genre
# pivot) count as 0 instead of turning the whole sum into NULL.
movies_genres_df = movies_genres_df.withColumn(
    "genres_cnt",
    reduce(lambda x, y: x + y, [F.coalesce(F.col(c), F.lit(0)) for c in genre_cols]),
)

movies_genres_df.show(5)

+-------+--------------------+----------------------------------+------------------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+---+-------+--------+----------+
|movieId|               title| We're Comin' To Get Ya!"" (2014)"|(no genres listed)|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|IMAX|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|name_len|genres_cnt|
+-------+--------------------+----------------------------------+------------------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+---+-------+--------+----------+
|      1|    Toy Story (1995)|                                 0|                 0|     0|        1|        1|       1|     1|    0|          0|    0|      1|        0|     0|   0|      0|      0|      0|     0|       0|  0|      0|     

In [None]:
# Drop the raw title by name; name_len has already been derived from it.
# Previously it was dropped positionally (columns[1:2]), which would silently
# remove a different column if the column order ever changed.
movies_genres_df = movies_genres_df.drop("title")

In [None]:
# Add the release year extracted from the title (inner join on movieId).
release_years = movies_df.select("movieId", "releaseYear")
movies_genres_df = movies_genres_df.join(release_years, "movieId", "inner")

In [None]:
# Peek at the movie feature table after adding releaseYear.
movies_genres_df.show(n=5)

+-------+----------------------------------+------------------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+---+-------+--------+----------+-----------+
|movieId| We're Comin' To Get Ya!"" (2014)"|(no genres listed)|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|IMAX|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|name_len|genres_cnt|releaseYear|
+-------+----------------------------------+------------------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+---+-------+--------+----------+-----------+
|      2|                                 0|                 0|     0|        1|        0|       1|     0|    0|          0|    0|      1|        0|     0|   0|      0|      0|      0|     0|       0|  0|      0|       9|         3|       1995|
|      4|           

In [None]:
# Peek at the per-movie tag PCA features.
tags_df.show(n=5)

+-------+-------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+-------------------+------------------+--------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+-------------------+--------------------+-------------------+--------------------+-------------------+--------------------+------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+-------------------+----

In [None]:
# Peek at the ratings (timestamp now holds the rating year).
ratings_df.show(n=5)

In [None]:
# Combine the movie-level features (genres, name_len, releaseYear, tag PCA)
# with the ratings. The right join keeps every rating row and leaves the
# movie feature columns first in the column order.
df = (
    movies_genres_df
    .join(tags_df, on="movieId", how="left")
    .join(ratings_df, on="movieId", how="right")
)

In [None]:
# Peek at the joined modelling table.
df.show(n=3)

+-------+----------------------------------+------------------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+---+-------+--------+----------+-----------+-------------------+-------------------+------------------+-------------------+-------------------+------------------+------------------+-------------------+------------------+-----------------+-------------------+------------------+------------------+--------------------+------------------+------------------+-------------------+-------------------+------------------+------------------+-------------------+-------------------+------------------+------------------+-------------------+------------------+------------------+-------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+

In [None]:
# userId is not kept in the modelling table.
df = df.drop("userId")

In [None]:
# Summary statistics for the two year columns, one table each.
for year_column in ("releaseYear", "timestamp"):
    df.select(year_column).describe().show()

+-------+------------------+
|summary|       releaseYear|
+-------+------------------+
|  count|          33796939|
|   mean|1995.4224058575246|
| stddev|15.312090366726508|
|    min|              1874|
|    max|              2023|
+-------+------------------+

+-------+------------------+
|summary|         timestamp|
+-------+------------------+
|  count|          33832162|
|   mean|2009.7229882027639|
| stddev| 8.077620726993356|
|    min|              1995|
|    max|              2023|
+-------+------------------+



In [None]:
from pyspark.sql.functions import when, col

# Lower bound for release years (per the original note, the oldest film dates
# from 1888); older values extracted from titles are clamped to it.
EARLIEST_FILM_YEAR = 1888

df = df.withColumn(
    "releaseYear",
    when(col("releaseYear") < EARLIEST_FILM_YEAR, EARLIEST_FILM_YEAR)
    .otherwise(col("releaseYear")),
)

# Ratings dated before the movie's release year get the sentinel -9999.
# Rows with a NULL releaseYear keep their original rating year.
# NOTE(review): the next cell subtracts releaseYear from this column, so the
# sentinel becomes roughly -11900..-12000 in timestamp_minus_year rather than
# an explicit "missing" marker — confirm the downstream model expects that.
rated_before_release = col("releaseYear") > col("timestamp")
df = df.withColumn(
    "timestamp",
    when(rated_before_release, lit(-9999)).otherwise(col("timestamp")),
)

In [None]:
# Years between release and rating: rating year minus release year.
years_since_release = col("timestamp") - col("releaseYear")
df = df.withColumn("timestamp_minus_year", years_since_release)

In [None]:
# The rating year itself is no longer needed; timestamp_minus_year replaces it.
df = df.drop("timestamp")

## PCA - TAG GENOME

In [None]:
# Tag Genome: the tag vocabulary (genome-tags.csv) and the per-(movie, tag)
# relevance scores (genome-scores.csv), read in that order.
gentags_df, genscores_df = (
    spark.read.csv(os.path.join(DATA_DIR, file_name), header=True, inferSchema=True)
    for file_name in ("genome-tags.csv", "genome-scores.csv")
)

In [None]:
# Size of the Tag Genome vocabulary (1128 tags in this dataset version).
n_genome_tags = gentags_df.count()
n_genome_tags

1128

In [None]:
from pyspark.sql.functions import first, col
from pyspark.ml.feature import VectorAssembler, PCA
from pyspark.ml.functions import vector_to_array

# Number of principal components kept from the tag genome. It is used both for
# the PCA fit and for the column expansion, so they cannot drift apart.
k = 100

# Wide table: one row per movie, one column per genome tag holding its
# relevance score; (movie, tag) pairs without a score become 0.
tag_wide = (
    genscores_df.groupBy("movieId")
    .pivot("tagId")
    .agg(first("relevance"))
    .na.fill(0)
)

feature_cols = [c for c in tag_wide.columns if c != "movieId"]

# Spark's PCA works on a single vector column, so assemble the per-tag
# relevance columns into one feature vector.
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
)
dataset = assembler.transform(tag_wide).select("movieId", "features")

pca = PCA(k=k, inputCol="features", outputCol="pcaFeatures")
pca_model = pca.fit(dataset)

pca_result = pca_model.transform(dataset).select("movieId", "pcaFeatures")

# Expand the PCA vector into pc_1 ... pc_k with a single select. Calling
# withColumn in a loop adds one projection per call and bloats the query plan.
pca_arr = vector_to_array(col("pcaFeatures"))
final_pca_df = pca_result.select(
    "movieId",
    *[pca_arr[i].alias(f"pc_{i + 1}") for i in range(k)],
)

final_pca_df.show(5, truncate=False)


+-------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+---------------------+--------------------+--------------------+--------------------+------------------+-------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------------+--------------------+---------------------+--------------------+---------------------+--------------------+----------------------+---------------------+---------------------+---------------------+--------------------+----------

In [None]:
# Attach the genome PCA components; movies without genome scores get NULLs.
df = df.join(final_pca_df, "movieId", "left")

In [None]:
# Make sure no raw text columns remain. drop() ignores names that are absent,
# which is the case for both columns in the normal cell order.
df = df.drop(*("title", "tags"))

In [None]:
# Drop CSV-parsing artifacts. A mis-parsed movies.csv row produced a bogus
# genre column named like ` We're Comin' To Get Ya!"" (2014)"`.
# No legitimate column name contains a double quote, so such columns are
# selected by name. Previously the code dropped whatever sat at position 1,
# which would remove a real feature column once the artifact disappears.
artifact_cols = [c for c in df.columns if '"' in c]
if artifact_cols:
    df = df.drop(*artifact_cols)

In [None]:
# Final column list of the modelling table.
final_columns = df.columns
final_columns

['movieId',
 '(no genres listed)',
 'Action',
 'Adventure',
 'Animation',
 'Children',
 'Comedy',
 'Crime',
 'Documentary',
 'Drama',
 'Fantasy',
 'Film-Noir',
 'Horror',
 'IMAX',
 'Musical',
 'Mystery',
 'Romance',
 'Sci-Fi',
 'Thriller',
 'War',
 'Western',
 'name_len',
 'genres_cnt',
 'releaseYear',
 'pca_0',
 'pca_1',
 'pca_2',
 'pca_3',
 'pca_4',
 'pca_5',
 'pca_6',
 'pca_7',
 'pca_8',
 'pca_9',
 'pca_10',
 'pca_11',
 'pca_12',
 'pca_13',
 'pca_14',
 'pca_15',
 'pca_16',
 'pca_17',
 'pca_18',
 'pca_19',
 'pca_20',
 'pca_21',
 'pca_22',
 'pca_23',
 'pca_24',
 'pca_25',
 'pca_26',
 'pca_27',
 'pca_28',
 'pca_29',
 'pca_30',
 'pca_31',
 'pca_32',
 'pca_33',
 'pca_34',
 'pca_35',
 'pca_36',
 'pca_37',
 'pca_38',
 'pca_39',
 'pca_40',
 'pca_41',
 'pca_42',
 'pca_43',
 'pca_44',
 'pca_45',
 'pca_46',
 'pca_47',
 'pca_48',
 'pca_49',
 'rating',
 'timestamp_minus_year',
 'pc_1',
 'pc_2',
 'pc_3',
 'pc_4',
 'pc_5',
 'pc_6',
 'pc_7',
 'pc_8',
 'pc_9',
 'pc_10',
 'pc_11',
 'pc_12',
 'pc_13',

In [None]:
# Persist the final modelling table as Parquet under DATA_DIR/output.
OUTPUT_PATH = os.path.join(DATA_DIR, "output", "pred.parquet")
df.write.mode("overwrite").parquet(OUTPUT_PATH)