# Recomendação de músicas e filmes

### Movies Dataset

#### Importação de bibliotecas

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, explode, from_json
from pyspark.sql.types import StringType, ArrayType, IntegerType, StructType, StructField
from re import sub

#### Constantes

In [None]:
import os

# Dataset paths (local copies)
movies_metadata_path = "../datasets/movies_metadata.csv"
movies_credits_path = "../datasets/movies_credits.csv"
spotify_path = "../datasets/spotify_dataset.csv"
soundtrack_path = "../datasets/sound_track_imdb_top_250_movie_tv_series.csv"
# movies_metadata_path = "dbfs:/FileStore/shared_uploads/beatrizpatricio@estudante.ufscar.br/movies_metadata.csv"
# movies_credits_path = "dbfs:/FileStore/shared_uploads/beatrizpatricio@estudante.ufscar.br/credits.csv"

# Neo4j connection.
# Credentials are read from the environment so that real secrets never need to
# be written into the notebook; the fallbacks are the local development values.
# To target another instance (e.g. a hosted AuraDB at neo4j+s://<host>), export
# NEO4J_URL / NEO4J_USER / NEO4J_PASSWORD / NEO4J_DATABASE instead of editing
# this cell.
neo4j_url = os.environ.get("NEO4J_URL", "neo4j://localhost:7687")
neo4j_user = os.environ.get("NEO4J_USER", "neo4j")
neo4j_password = os.environ.get("NEO4J_PASSWORD", "sparkneo4j")
dbname = os.environ.get("NEO4J_DATABASE", "neo4j")
connector_path = "../neo4j-spark-connector-5.3.8-s_2.13.jar"


#### Preparação da conexão com o Neo4j

In [None]:
# Connector and session settings, applied in this order to a single builder.
_spark_settings = {
    "neo4j.url": neo4j_url,
    "neo4j.authentication.basic.username": neo4j_user,
    "neo4j.authentication.basic.password": neo4j_password,
    "neo4j.database": dbname,
    "spark.jars": connector_path,
}

_builder = SparkSession.builder
for _setting, _value in _spark_settings.items():
    _builder = _builder.config(_setting, _value)
spark = _builder.getOrCreate()

### 1. Leitura do dataset de metadata

In [None]:
def getMetadataDataset():
    """Load the movies metadata CSV into a Spark DataFrame.

    The file at ``movies_metadata_path`` is read with a header row and an
    inferred schema; rows Spark cannot parse are discarded (DROPMALFORMED).
    """
    reader = spark.read.format("csv").options(
        mode="DROPMALFORMED",
        header=True,
        inferSchema=True,
    )
    return reader.load(movies_metadata_path)

df_metadata = getMetadataDataset()
df_metadata.printSchema()
print(f"{df_metadata.count()} linhas")
df_metadata.first()

### 2. Criação da coluna de identificador

In [None]:
def getIdentifierName(title):
    """Normalize a title into a compact, lower-case matching key.

    Every character outside ``[a-zA-Z0-9]`` (spaces, punctuation, accented
    or other non-ASCII letters) is removed and the remainder is lower-cased,
    e.g. ``"The Matrix (1999)"`` -> ``"thematrix1999"``.

    Returns ``None`` for falsy input (``None`` or ``""``).
    """
    if not title:
        return None
    # Only ASCII alphanumerics survive the substitution, so there is no
    # whitespace left that would need stripping.
    return sub(r'[^a-zA-Z0-9]', '', title).lower()

def addIdentifierColumn(df_metadata):
    """Return ``df_metadata`` with two extra columns.

    * ``identifierByName``: ``getIdentifierName`` applied to ``title``
      through a Python UDF that returns a string;
    * ``movie_id``: a copy of the ``id`` column.
    """
    identifier_udf = udf(getIdentifierName, StringType())
    with_identifier = df_metadata.withColumn(
        "identifierByName", identifier_udf(df_metadata['title'])
    )
    return with_identifier.withColumn("movie_id", col("id"))

df_metadata = addIdentifierColumn(df_metadata)
_sample_title = df_metadata.first()['title']
_sample_identifier = df_metadata.first()['identifierByName']
print(f"Exemplo de identificador para o filme '{_sample_title}': {_sample_identifier}")

### 3. Remoção de colunas que não interessam para o projeto

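_As colunas remanescentes são: movie\_id, title, genres, release\_date e identifierByName (as colunas id e imdb\_id também são removidas; o valor de id já foi copiado para movie\_id)_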

In [None]:
def dropColumns(df_metadata):
    """Drop the metadata columns the project does not use.

    ``id`` is included because its value was already copied into
    ``movie_id`` by ``addIdentifierColumn``.
    """
    unused = (
        "adult", "belongs_to_collection", "budget", "homepage",
        "original_language", "original_title", "overview", "popularity",
        "poster_path", "production_companies", "production_countries",
        "revenue", "runtime", "spoken_languages", "status", "tagline",
        "video", "vote_average", "vote_count", "id", "imdb_id",
    )
    return df_metadata.drop(*unused)

df_metadata = dropColumns(df_metadata)
df_metadata.printSchema()
print()
print(df_metadata.first())

### 4. Remoção de linhas duplicadas, sem título ou identificador, ou com data de lançamento malformada

In [None]:
def dropRows(df_metadata):
    """Remove duplicated, empty and unusable rows from the metadata DataFrame.

    A row is kept only if:

    * it is not an exact duplicate of another row;
    * at least one of its values is non-null;
    * ``release_date`` starts with a ``YYYY-MM-DD`` date;
    * ``title`` is not null;
    * ``identifierByName`` (when that column exists) is neither null nor
      empty. ``getIdentifierName`` yields ``None`` for empty titles and ``""``
      for titles with no ASCII letters or digits; such rows cannot be matched
      by name.
    """
    df_metadata = df_metadata.distinct()
    df_metadata = df_metadata.na.drop("all")
    # Anchored at the start so that a date merely embedded in other text is
    # not accepted as a release date.
    df_metadata = df_metadata.filter(col("release_date").rlike(r'^\d{4}-\d{2}-\d{2}'))
    df_metadata = df_metadata.filter(col("title").isNotNull())
    if "identifierByName" in df_metadata.columns:
        df_metadata = df_metadata.filter(
            col("identifierByName").isNotNull() & (col("identifierByName") != "")
        )
    return df_metadata

print(df_metadata.count(), " linhas originalmente")
df_metadata = dropRows(df_metadata)
print(df_metadata.count(), " linhas após remoção")

### 5. Leitura do dataset de créditos

In [None]:
def getCreditsDataset():
    """Load the credits CSV and keep only rows whose ``id`` is purely numeric.

    Rows are read with a header and an inferred schema, dropping lines Spark
    cannot parse. A row survives when its ``id`` (as text) contains no
    non-digit character; a null ``id`` makes the predicate null, so such rows
    are discarded as well.
    """
    raw_credits = (
        spark.read
        .options(mode="DROPMALFORMED", header=True, inferSchema=True)
        .format("csv")
        .load(movies_credits_path)
    )
    has_non_digit = col("id").rlike(r'\D+')
    return raw_credits.filter(~has_non_digit)

df_credits = getCreditsDataset()
df_credits.printSchema()
print(f"\t{df_credits.count()} linhas")

### 6. Criação do dataframe de Diretores/Filmes

In [None]:
def getDirectorsDataFrame(df_credits):
    """Build a ``(director_name, movie_id)`` DataFrame from the ``crew`` column.

    ``crew`` is parsed as a JSON array of crew-member objects. One row is
    produced per crew member whose ``job`` is ``"Director"``, paired with the
    credits row's ``id`` renamed to ``movie_id``.

    NOTE(review): ``from_json`` returns null for text that is not valid JSON
    (e.g. Python-style ``None`` values), and ``explode`` drops such rows, so
    those movies silently lose their directors. Confirm the crew column is
    valid JSON.
    """
    member_fields = [
        ("credit_id", StringType()),
        ("department", StringType()),
        ("gender", IntegerType()),
        ("id", IntegerType()),
        ("job", StringType()),
        ("name", StringType()),
        ("profile_path", StringType()),
    ]
    crew_schema = ArrayType(
        StructType([StructField(field_name, field_type) for field_name, field_type in member_fields])
    )

    members = df_credits.select(
        col("id"),
        explode(from_json(col("crew"), crew_schema)).alias("crew_member"),
    )
    directors = members.where(col("crew_member.job") == "Director")
    return directors.select(
        col("crew_member.name").alias("director_name"),
        col("id").alias("movie_id"),
    )

df_directors = getDirectorsDataFrame(df_credits)
df_directors.show(truncate=False)
print(f"\t{df_directors.count()} linhas")

### 7. Criação dos dataframes de gêneros
_Também é dropada a coluna de gêneros do dataframe original_

In [None]:
def getGenres(df_metadata):
    """Split the JSON ``genres`` column into two DataFrames.

    Returns a tuple ``(df_genres_movies, df_genres)``:

    * ``df_genres_movies``: one row per ``(movie_id, genre_id, genre_name)``;
    * ``df_genres``: the distinct ``(genre_id, genre_name)`` pairs.
    """
    genre_struct = StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
    ])
    parsed_genres = from_json(col("genres"), ArrayType(genre_struct))

    df_genres_movies = (
        df_metadata
        .withColumn("genres_json", parsed_genres)
        .withColumn("genre", explode(col("genres_json")))
        .select(
            col("movie_id"),
            col("genre.id").alias("genre_id"),
            col("genre.name").alias("genre_name"),
        )
    )
    df_genres = df_genres_movies.select("genre_id", "genre_name").distinct()
    return df_genres_movies, df_genres

df_genres_movies, df_genres = getGenres(df_metadata)
df_metadata = df_metadata.drop("genres")

for _frame in (df_genres_movies, df_genres):
    _frame.show(truncate=False)
    print(f"\t{_frame.count()} linhas")

### 8. Criação do Dataframe de filmes (união do df de metadata e de diretores)
_Também é dropada a coluna de movie_id do dataframe de diretores_

In [None]:
def getFinalMovieDataFrame(df_metadata, df_directors):
    """Left-join the movie metadata with its directors.

    Every metadata row is kept. Movies without a matching director get a
    null ``director_name``, and movies with several directors appear once
    per director. The join key coming from ``df_directors`` is dropped, so
    the result has a single, unambiguous ``movie_id`` column (it is used as
    a node key when the DIRECTED relationships are written).
    """
    joined = df_metadata.join(
        df_directors,
        on=df_metadata["movie_id"] == df_directors["movie_id"],
        how="left",
    )
    # An expression join keeps both inputs' `movie_id` columns; without this
    # drop any later reference to "movie_id" is ambiguous.
    return joined.drop(df_directors["movie_id"])

df_movies = getFinalMovieDataFrame(df_metadata, df_directors)
df_directors = df_directors.drop("movie_id")

In [None]:
df_movies.printSchema()
# First row followed by a blank line, then the total row count.
print(df_movies.first(), end="\n\n")
print(f"Quantidade total de linhas: {df_movies.count()}")

### Spotify dataset 

In [None]:
def getSpotifyDataset():
    """Load the Spotify CSV (header row, inferred schema) into a DataFrame."""
    return spark.read.options(header=True, inferSchema=True).csv(spotify_path)

df_spotify = getSpotifyDataset()
df_spotify.printSchema()
print(f"{df_spotify.count()} linhas")
df_spotify.first()

### Trilha sonora dataset 

In [None]:
# Soundtrack CSV: header row, no schema inference (every column stays a string).
df_soundtrack = spark.read.option("header", "true").csv(soundtrack_path)
df_soundtrack.printSchema()
print(f"{df_soundtrack.count()} linhas")
df_soundtrack.first()

In [None]:
# Turn the literal "NA" placeholder into a real null in every column.
df_soundtrack = df_soundtrack.replace(to_replace="NA", value=None)

In [None]:
# Spark's aggregate is imported under an alias: importing it as plain `sum`
# would shadow the Python builtin for the rest of the notebook.
from pyspark.sql.functions import col, sum as spark_sum

# One row holding, for every column, the number of null values in it.
null_counts = df_soundtrack.select([
    spark_sum(col(c).isNull().cast("int")).alias(c)
    for c in df_soundtrack.columns
])

null_counts.show()

In [None]:
# Keep only the first six columns; the remaining ones are unused and have many nulls.
df_soundtrack = df_soundtrack.select(df_soundtrack.columns[:6])

In [None]:
from pyspark.sql.functions import regexp_replace, trim
# Remove the "(uncredited)" marker from performed_by together with the
# whitespace before it, then trim, so "Jane Doe (uncredited)" becomes
# "Jane Doe" instead of "Jane Doe " (a trailing space breaks exact matching).
df_soundtrack = df_soundtrack.withColumn(
    "performed_by",
    trim(regexp_replace("performed_by", r"\s*\(uncredited\)", "")),
)

In [None]:
df_soundtrack.show(20, truncate=False)

### 9. Inserção no Neo4j (filmes)

In [None]:
# Write each distinct genre as a :MovieGenre node keyed by genre_id.
(
    df_genres.write
    .format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .options(**{"labels": ":MovieGenre", "node.keys": "genre_id"})
    .save()
)

In [None]:
# One :Director node per distinct name. df_directors was built with one row per
# (director, movie) pair, so names repeat; rows with a null name are skipped
# because a null value cannot be used as the node key.
(
    df_directors
    .filter(col("director_name").isNotNull())
    .distinct()
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Director")
    .option("node.keys", "director_name")
    .save()
)

In [None]:
# Write the cleaned metadata rows as :Movie nodes keyed by movie_id.
(
    df_metadata.write
    .format("org.neo4j.spark.DataSource")
    .option("node.keys", "movie_id")
    .option("labels", ":Movie")
    .mode("Overwrite")
    .save()
)

In [None]:
# Single partition, presumably so the relationship batches are not written
# concurrently (which can cause lock contention in Neo4j) - TODO confirm intent.
df_genres_movies = df_genres_movies.coalesce(1)

# (:Movie {movie_id})-[:HAS_GENRE]->(:MovieGenre {genre_id}), matched by keys.
_has_genre_options = {
    "batch.size": "100",
    "maxTransactionRetryTime": "30s",
    "relationship": "HAS_GENRE",
    "relationship.save.strategy": "keys",
    "relationship.source.labels": ":Movie",
    "relationship.target.labels": ":MovieGenre",
    "relationship.source.node.keys": "movie_id",
    "relationship.target.node.keys": "genre_id",
}
(
    df_genres_movies.write
    .format("org.neo4j.spark.DataSource")
    .mode("Append")
    .options(**_has_genre_options)
    .save()
)

In [None]:
df_movies = df_movies.coalesce(1)

# df_movies comes from a left join, so movies without a known director carry a
# null director_name. That column is the target node key below, and a null key
# makes the write fail, so those rows are skipped instead.
(
    df_movies
    .filter(col("director_name").isNotNull())
    .write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("batch.size", "100")
    .option("maxTransactionRetryTime", "30s")
    .option("relationship", "DIRECTED")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Movie")
    .option("relationship.target.labels", ":Director")
    .option("relationship.source.node.keys", "movie_id")
    .option("relationship.target.node.keys", "director_name")
    .save()
)

### 10. Inserção no Neo4j (músicas)

In [None]:
# Write the Spotify tracks as :Song nodes keyed by song_id.
# NOTE(review): confirm the Spotify CSV actually has a `song_id` column; it is
# not referenced anywhere else in this notebook.
(
    df_spotify.write
    .format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .options(**{"labels": ":Song", "node.keys": "song_id"})
    .save()
)

### 11. Inserção no Neo4j (trilhas sonoras)

In [None]:
# The soundtrack DataFrame prepared above is `df_soundtrack` (singular); it is
# collapsed to a single partition for the relationship write.
df_soundtracks = df_soundtrack.coalesce(1)

# Key mappings use the connector's "dataframe_column:node_property" form: the
# DataFrame's `name` column is matched against :Movie.title and `song_name`
# against :Song.name.
# NOTE(review): only the CSV's first six columns are kept earlier; confirm that
# `name` and `song_name` are among them.
df_soundtracks.write.format("org.neo4j.spark.DataSource") \
    .mode("Append") \
    .option("batch.size", "100") \
    .option("maxTransactionRetryTime", "30s") \
    .option("relationship", "SOUNDTRACK") \
    .option("relationship.save.strategy", "keys") \
    .option("relationship.source.labels", ":Movie") \
    .option("relationship.target.labels", ":Song") \
    .option("relationship.source.node.keys", "name:title") \
    .option("relationship.target.node.keys", "song_name:name") \
    .save()