In [1]:
import pyspark
from pyspark.pandas import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col, array_contains

# Start (or reuse) the Spark session that every DataFrame in this notebook hangs off.
spark = (
    SparkSession.builder
    .appName('MusicGen')
    .getOrCreate()
)

ModuleNotFoundError: No module named 'pyspark'

In [3]:
from keras.layers import (
    Input, Dense, Activation, TimeDistributed, Softmax, TextVectorization, Reshape,
    RepeatVector, Conv1D, Bidirectional, AveragePooling1D, UpSampling1D, Embedding,
    Concatenate, GlobalAveragePooling1D, LSTM, Multiply, MultiHeadAttention
)
from keras.models import Model
import tensorflow as tf
import keras
import numpy as np

In [4]:
# Explicit schema for the lyrics CSV: (column name, Spark type, nullable flag).
schema = StructType([
    StructField("title", StringType(), True),
    StructField("tag", StringType(), True),
    StructField("artist", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("views", IntegerType(), True),
    StructField("features", StringType(), True),
    StructField("lyrics", StringType(), False),
    StructField("id", IntegerType(), True),
    StructField("language_cld3", StringType(), True),
    StructField("language_ft", StringType(), True),
    StructField("language", StringType(), True),
])

DATASET_PATH = "song_lyrics.csv"

# multiLine: a quoted field (the lyrics) may span several physical lines.
# escape='"': a quote inside a quoted field is escaped with another quote.
df = (
    spark.read
    .format("csv")
    .options(header=True, multiLine=True, escape="\"")
    .schema(schema)
    .load(DATASET_PATH)
)
df.printSchema()

root
 |-- title: string (nullable = true)
 |-- tag: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- views: integer (nullable = true)
 |-- features: string (nullable = true)
 |-- lyrics: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- language_cld3: string (nullable = true)
 |-- language_ft: string (nullable = true)
 |-- language: string (nullable = true)



In [5]:
# Columns removed from the frame before it is split into train/test.
cols = (
    "artist",
    "year",
    "views",
    "id",
    "language_cld3",
    "language_ft",
)

df = df.drop(*cols)

In [None]:
# 80/20 train/test split; the fixed seed keeps the split repeatable.
train, test = df.randomSplit(weights=[0.8, 0.2], seed=69)

In [6]:
from pyspark.sql.functions import monotonically_increasing_id 

# Tag every training row with a unique, increasing 64-bit id.
# Note: monotonically_increasing_id() values are NOT consecutive (they encode the partition).
train = (
    train
    .select("*")
    .withColumn("id", monotonically_increasing_id())
)

In [7]:
# Show the columns of the training frame, including the newly added `id`.
train.printSchema()

root
 |-- title: string (nullable = true)
 |-- tag: string (nullable = true)
 |-- features: string (nullable = true)
 |-- lyrics: string (nullable = true)
 |-- language: string (nullable = true)
 |-- id: long (nullable = false)



In [17]:
wanted_tag = 'pop'
wanted_language = 'en'

# Compare through Column expressions so the values are bound as string literals.
# The previous SQL text rendered as `tag = pop AND language = en`, in which
# `pop` and `en` are parsed as column names rather than strings.
df_ = train.filter(
    (col("tag") == wanted_tag) & (col("language") == wanted_language)
)
n_rows = df_.count()
n_rows

2138587

In [41]:
class BatchDataset(tf.keras.utils.Sequence):
    """Keras ``Sequence`` serving batches of lyrics pulled from a Spark DataFrame.

    The DataFrame must expose a unique ``id`` column and a ``lyrics`` column.
    Ids do not have to be consecutive or start at zero: batch boundaries are
    derived from the ids that are actually present, so every row lands in
    exactly one batch and batches are the same on every pass.

    Args:
        dataset_spark: Spark DataFrame with ``id`` and ``lyrics`` columns.
        batch_size: Maximum number of rows per batch; must be positive.
        dataset_len: Number of rows in ``dataset_spark`` (e.g. its ``count()``);
            used to compute ``__len__``.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """

    def __init__(self, dataset_spark, batch_size, dataset_len):
        super().__init__()
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")
        self.batch_size = batch_size
        self.dataset_spark = dataset_spark
        self.dataset_full_len = dataset_len
        # Smallest id of each batch; computed on first access because it
        # needs a Spark job.
        self._batch_starts = None

    def __len__(self):
        """Return the number of batches needed to cover ``dataset_len`` rows."""
        return int(np.ceil(self.dataset_full_len / self.batch_size))

    def _get_batch_starts(self):
        """Return (and cache) the first id of every batch, in ascending order."""
        if self._batch_starts is None:
            # Only the id column travels to the driver. `.tolist()` yields
            # plain Python ints, which Spark accepts as literals on every
            # version (numpy scalars are not always supported).
            ids = (
                self.dataset_spark
                .select("id")
                .toPandas()["id"]
                .sort_values()
                .tolist()
            )
            self._batch_starts = ids[::self.batch_size]
        return self._batch_starts

    def __getitem__(self, idx):
        """Return the lyrics of batch ``idx`` as a 1-D numpy array of strings.

        Raises:
            IndexError: If ``idx`` does not address an existing batch.
        """
        starts = self._get_batch_starts()
        if not 0 <= idx < len(starts):
            raise IndexError(
                f"batch index {idx} out of range for {len(starts)} batches"
            )
        # Filter on this dataset's own id column (not a notebook-level frame).
        id_col = self.dataset_spark["id"]
        in_batch = id_col >= starts[idx]
        if idx + 1 < len(starts):
            # Half-open range [start, next_start); the last batch is open-ended.
            in_batch = in_batch & (id_col < starts[idx + 1])
        rows = self.dataset_spark.where(in_batch).toPandas()
        return rows["lyrics"].to_numpy()


# Vocabulary cap, tokens kept per lyric, and rows fetched per Spark batch.
vocab_size, n_grams, batch_size = 10_000, 10, 60_000

# Text -> integer token ids, padded/truncated to `n_grams` tokens.
vectorize_layer = TextVectorization(
    max_tokens=vocab_size,
    output_sequence_length=n_grams,
)

# Batch source over the filtered training frame.
dataset = BatchDataset(
    dataset_spark=df_,
    batch_size=batch_size,
    dataset_len=n_rows,
)

In [42]:
# Build the vectorizer's vocabulary from the batches served by `dataset`.
# NOTE(review): every batch is fetched with a Spark `toPandas()` call, so this
# cell can take a long time (the saved output shows it was interrupted).
vectorize_layer.adapt(dataset)

KeyboardInterrupt: 

In [None]:
# Wrap the vectorizer in a single-layer model so it can be written to disk
# through the regular Keras save API.
model = tf.keras.models.Sequential([
    tf.keras.Input(shape=(1,), dtype=tf.string),
    vectorize_layer,
])

# Persist the wrapper model.
filepath = "tmp-model"
model.save(filepath)

# Load.
# loaded_model = tf.keras.models.load_model(filepath)
# loaded_vectorizer = loaded_model.layers[0]

In [None]:
def predict_word(seq_len, latent_dim, vocab_size):
    """Build the next-token predictor and a model exposing its pooled features.

    Args:
        seq_len: n-gram length; the network consumes ``seq_len - 1`` token ids.
        latent_dim: Width of the token embedding.
        vocab_size: Number of token ids; sizes the embedding table and the
            output distribution.

    Returns:
        Tuple ``(predictor, latent)`` of Keras models sharing the same input:
        ``predictor`` ends in a softmax over ``vocab_size`` classes, ``latent``
        ends at the averaged attention output.
    """
    tokens = Input(shape=(seq_len - 1,))
    # mask_zero=True marks id 0 as padding for the layers that follow.
    embedded = Embedding(
        vocab_size, latent_dim, name='embedding', mask_zero=True
    )(tokens)
    # Self-attention: the embedded sequence serves as both query and value.
    attended = MultiHeadAttention(num_heads=3, key_dim=2)(embedded, value=embedded)
    pooled = GlobalAveragePooling1D()(attended)
    scores = Dense(vocab_size)(pooled)
    probabilities = Softmax()(scores)

    predictor_model = Model(tokens, probabilities)
    latent_model = Model(tokens, pooled)
    return predictor_model, latent_model

predictor, latent = predict_word(
    seq_len=n_grams,
    latent_dim=15,
    vocab_size=vocab_size,
)
predictor.summary()

# Alternative optimizer tried earlier: keras.optimizers.SGD(learning_rate=1, momentum=0.9)
opt = keras.optimizers.Nadam(learning_rate=0.1)

# Targets equal to 1 are left out of the loss.
loss_fn = keras.losses.SparseCategoricalCrossentropy(
    name="sparse_categorical_crossentropy",
    ignore_class=1,
)

predictor.compile(optimizer=opt, loss=loss_fn, metrics=["accuracy"])

In [None]:
# How many tokens ahead. Not read by get_last_token; kept as a module-level
# name in case other cells reference it.
N = 10
def get_last_token(x, vectorizer=None):
    """Map a batch of raw texts to ``(inputs, target)`` pairs for training.

    The batch is vectorized into token ids; the target is the last token of
    each row and the inputs are every token before it.

    Args:
        x: Batch of texts accepted by the vectorizer.
        vectorizer: Callable turning ``x`` into a 2-D ``(batch, tokens)``
            array or tensor of token ids. Defaults to the notebook-level
            ``vectorize_layer``.

    Returns:
        Tuple ``(inputs, target)`` where ``inputs`` has shape
        ``(batch, tokens - 1)`` and ``target`` has shape ``(batch, 1)``.
    """
    if vectorizer is None:
        vectorizer = vectorize_layer
    vectorized_x = vectorizer(x)
    # Split along the token axis (axis 1), never along the batch axis.
    return vectorized_x[:, :-1], vectorized_x[:, -1:]



In [None]:
# `dataset` is a Keras Sequence, which has no `.map`. Stream its batches
# through tf.data so each batch of raw lyrics can be mapped to an
# (inputs, target) pair before it reaches `fit`.
def _iter_lyric_batches():
    """Yield every batch of raw lyrics from `dataset`, in index order."""
    for batch_index in range(len(dataset)):
        yield dataset[batch_index]


train_ds = tf.data.Dataset.from_generator(
    _iter_lyric_batches,
    output_signature=tf.TensorSpec(shape=(None,), dtype=tf.string),
).map(get_last_token)

history = predictor.fit(train_ds, epochs=1, verbose=1)