Spark DataFrame version of the transformations (results are only displayed; nothing is written to disk).

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Start (or reuse) a Spark session whose master is 'local[*]'.
session_builder = SparkSession.builder.master('local[*]')
spark = session_builder.getOrCreate()

# Base URI of the folder the .dat input files are read from.
ruta_carpeta = 'file:///home/jovyan/work/datasets/Originales'

In [9]:
# Movies: read movies.dat with an explicit schema, '::' as the field
# separator and no header row.
movies_schema = (
    StructType()
    .add('filmId', IntegerType(), True)
    .add('film', StringType(), True)
    .add('genders', StringType(), True)
)

movies_path = ruta_carpeta + '/movies.dat'
df_movies = (
    spark.read
    .schema(movies_schema)
    .options(sep='::', header=False)
    .csv(movies_path)
)

# Report the row count, then preview the first rows untruncated.
movies_total = df_movies.count()
print('Filas movies:', movies_total)
df_movies.show(5, truncate=False)

Filas movies: 3883
+------+----------------------------------+----------------------------+
|filmId|film                              |genders                     |
+------+----------------------------------+----------------------------+
|1     |Toy Story (1995)                  |Animation|Children's|Comedy |
|2     |Jumanji (1995)                    |Adventure|Children's|Fantasy|
|3     |Grumpier Old Men (1995)           |Comedy|Romance              |
|4     |Waiting to Exhale (1995)          |Comedy|Drama                |
|5     |Father of the Bride Part II (1995)|Comedy                      |
+------+----------------------------------+----------------------------+
only showing top 5 rows



In [10]:
# Movie clean-up:
#   * 'year'    - four-digit year taken from the "(YYYY)" group that ends the title
#   * 'film'    - title with that trailing "(YYYY)" group removed
#   * 'genders' - pipe-separated genre string turned into an array without
#                 empty entries
# The extraction and the removal are both anchored to the end of the title so
# they always refer to the same parenthesised group, even when the title itself
# contains another "(dddd)" sequence.
df_movies_mod = (df_movies
    .withColumn('year', F.regexp_extract('film', r'\((\d{4})\)\s*$', 1).cast(IntegerType()))
    .withColumn('film', F.regexp_replace('film', r'\s*\(\d{4}\)\s*$', ''))
    # Raw string: '\|' in a plain literal is an invalid Python escape sequence.
    .withColumn('genders', F.split(F.col('genders'), r'\|'))
    # Drop empty strings produced by leading/trailing/double separators.
    .withColumn('genders', F.array_remove('genders', ''))
)

df_movies_mod.select('film', 'year', 'genders').show(5, truncate=False)


+---------------------------+----+--------------------------------+
|film                       |year|genders                         |
+---------------------------+----+--------------------------------+
|Toy Story                  |1995|[Animation, Children's, Comedy] |
|Jumanji                    |1995|[Adventure, Children's, Fantasy]|
|Grumpier Old Men           |1995|[Comedy, Romance]               |
|Waiting to Exhale          |1995|[Comedy, Drama]                 |
|Father of the Bride Part II|1995|[Comedy]                        |
+---------------------------+----+--------------------------------+
only showing top 5 rows



In [11]:
# Genre counts: explode the genre array into one row per genre, then count
# rows per genre and list the most frequent first.
exploded_genres = df_movies_mod.select(F.explode(F.col('genders')).alias('genre'))
genre_counts = (
    exploded_genres
    .filter(F.col('genre').isNotNull())
    .groupBy('genre')
    .count()
    .sort(F.col('count').desc())
)
genre_counts.show(20, truncate=False)

+-----------+-----+
|genre      |count|
+-----------+-----+
|Drama      |1603 |
|Comedy     |1200 |
|Action     |503  |
|Thriller   |492  |
|Romance    |471  |
|Horror     |343  |
|Adventure  |283  |
|Sci-Fi     |276  |
|Children's |251  |
|Crime      |211  |
|War        |143  |
|Documentary|127  |
|Musical    |114  |
|Mystery    |106  |
|Animation  |105  |
|Fantasy    |68   |
|Western    |68   |
|Film-Noir  |44   |
+-----------+-----+



In [12]:
# Users: read users.dat with an explicit schema, '::' as the field separator
# and no header row. 'zip' stays a string so leading zeros are preserved.
users_schema = (
    StructType()
    .add('userId', IntegerType(), True)
    .add('gender', StringType(), True)
    .add('age', IntegerType(), True)
    .add('occupation', IntegerType(), True)
    .add('zip', StringType(), True)
)

users_path = ruta_carpeta + '/users.dat'
df_users = (
    spark.read
    .schema(users_schema)
    .options(sep='::', header=False)
    .csv(users_path)
)

# Report the row count, then preview the first rows untruncated.
users_total = df_users.count()
print('Filas users:', users_total)
df_users.show(5, truncate=False)

Filas users: 6040
+------+------+---+----------+-----+
|userId|gender|age|occupation|zip  |
+------+------+---+----------+-----+
|1     |F     |1  |10        |48067|
|2     |M     |56 |16        |70072|
|3     |M     |25 |15        |55117|
|4     |M     |45 |7         |02460|
|5     |M     |25 |20        |55455|
+------+------+---+----------+-----+
only showing top 5 rows



In [13]:
# Lookup tables for the users transformations.

# Occupation code -> human-readable label.
occupation_map = {
    0: 'Otro / No especificado', 1: 'Academico / Educador', 2: 'Artista',
    3: 'Administrativo / Oficina', 4: 'Estudiante universitario / Postgrado',
    5: 'Atencion al cliente', 6: 'Medico / Sector salud', 7: 'Ejecutivo / Gerente',
    8: 'Agricultor', 9: 'Amo/a de casa', 10: 'Estudiante (Escuela/Instituto)',
    11: 'Abogado', 12: 'Programador', 13: 'Jubilado', 14: 'Ventas / Marketing',
    15: 'Cientifico', 16: 'Autonomo', 17: 'Tecnico / Ingeniero', 18: 'Artesano / Oficio manual',
    19: 'Desempleado', 20: 'Escritor'
}

# Age code (value of the 'age' column) -> (representative age, letter label).
age_groups = {
    1: (17, 'A'),
    18: (21, 'B'),
    25: (30, 'C'),
    35: (40, 'D'),
    45: (47, 'E'),
    50: (52, 'F'),
    56: (60, 'G'),
}

# Inclusive (low, high, state) ranges over the first three digits of a ZIP code.
# The lookup returns the FIRST matching range, so ranges must not overlap:
# Puerto Rico is split into 006-007 and 009 so that 008 can resolve to the
# Virgin Islands (a single 6-9 range placed first made 'VI' unreachable).
# Georgia also owns 398-399 in addition to 300-319.
zip_state_ranges = [
    (350, 369, 'AL'), (995, 999, 'AK'), (850, 865, 'AZ'), (716, 729, 'AR'),
    (900, 961, 'CA'), (800, 816, 'CO'), (60, 69, 'CT'), (197, 199, 'DE'),
    (200, 200, 'DC'), (202, 205, 'DC'), (569, 569, 'DC'), (320, 349, 'FL'),
    (300, 319, 'GA'), (398, 399, 'GA'), (967, 968, 'HI'), (832, 838, 'ID'),
    (600, 629, 'IL'), (460, 479, 'IN'), (500, 528, 'IA'), (660, 679, 'KS'),
    (400, 427, 'KY'), (700, 715, 'LA'), (39, 49, 'ME'), (206, 219, 'MD'),
    (10, 27, 'MA'), (480, 499, 'MI'), (550, 567, 'MN'), (386, 397, 'MS'),
    (630, 658, 'MO'), (590, 599, 'MT'), (680, 693, 'NE'), (889, 898, 'NV'),
    (30, 38, 'NH'), (70, 89, 'NJ'), (870, 884, 'NM'), (100, 149, 'NY'),
    (270, 289, 'NC'), (580, 588, 'ND'), (430, 459, 'OH'), (730, 749, 'OK'),
    (970, 979, 'OR'), (150, 196, 'PA'), (28, 29, 'RI'), (290, 299, 'SC'),
    (570, 577, 'SD'), (370, 385, 'TN'), (750, 799, 'TX'), (885, 885, 'TX'),
    (840, 847, 'UT'), (50, 59, 'VT'), (201, 201, 'VA'), (220, 246, 'VA'),
    (980, 994, 'WA'), (247, 268, 'WV'), (530, 549, 'WI'), (820, 831, 'WY'),
    (6, 7, 'PR'), (8, 8, 'VI'), (9, 9, 'PR'), (969, 969, 'GU')
]

from pyspark.sql.functions import lit
# Literal Spark map {occupation code -> label}: create_map receives the
# entries flattened as key1, value1, key2, value2, ...
map_list = [lit(item) for pair in occupation_map.items() for item in pair]
occupation_expr = F.create_map(*map_list)

# Two parallel CASE WHEN chains keyed on the 'age' code: one yields the
# group's numeric value, the other its letter. The first entry opens each
# chain and the remaining entries extend it.
age_items = list(age_groups.items())
first_code, (first_mean, first_letter) = age_items[0]
first_match = F.col('age') == lit(first_code)
age_mean_expr = F.when(first_match, lit(first_mean))
age_letter_expr = F.when(first_match, lit(first_letter))
for age_code, (group_mean, group_letter) in age_items[1:]:
    is_code = F.col('age') == lit(age_code)
    age_mean_expr = age_mean_expr.when(is_code, lit(group_mean))
    age_letter_expr = age_letter_expr.when(is_code, lit(group_letter))

# Codes that are not in age_groups map to NULL.
age_mean_expr = age_mean_expr.otherwise(lit(None))
age_letter_expr = age_letter_expr.otherwise(lit(None))

def zip_to_state(zip_code, ranges=None):
    """Map a ZIP code to a state abbreviation using its three-digit prefix.

    Args:
        zip_code: ZIP code value (any type; it is converted with ``str``).
            Non-digit characters such as the dash in ZIP+4 are ignored.
        ranges: Optional iterable of inclusive ``(low, high, state)`` prefix
            ranges. Defaults to the module-level ``zip_state_ranges``.

    Returns:
        The state of the first range containing the prefix, or ``None`` when
        ``zip_code`` is ``None``, has fewer than three digits, or matches no
        range.
    """
    if zip_code is None:
        return None
    if ranges is None:
        ranges = zip_state_ranges
    # isdecimal(), not isdigit(): isdigit() also accepts characters such as
    # superscript digits that int() rejects, which would raise ValueError.
    digits = ''.join(ch for ch in str(zip_code) if ch.isdecimal())
    if len(digits) < 3:
        return None
    prefix = int(digits[:3])
    for low, high, state in ranges:
        if low <= prefix <= high:
            return state
    return None

from pyspark.sql.types import StringType
# Expose the Python ZIP lookup to Spark as a string-returning UDF.
zip_to_state_udf = F.udf(zip_to_state, returnType=StringType())

# Label looked up in the occupation map by the numeric occupation code.
occupation_label = F.element_at(occupation_expr, F.col('occupation')).cast(StringType())

# Replace the coded columns with readable ones: occupation label, age group
# value and letter, and state derived from the ZIP code ('Other' when no state
# is found); the raw 'age' and 'zip' columns are dropped afterwards.
df_users_mod = (
    df_users
    .withColumn('occupation', occupation_label)
    .withColumn('age_group_mean', age_mean_expr)
    .withColumn('age_group_letter', age_letter_expr)
    .withColumn('state', zip_to_state_udf(F.col('zip')))
    .na.fill({'state': 'Other'})
    .drop('age', 'zip')
)

df_users_mod.show(5, truncate=False)

+------+------+------------------------------+--------------+----------------+-----+
|userId|gender|occupation                    |age_group_mean|age_group_letter|state|
+------+------+------------------------------+--------------+----------------+-----+
|1     |F     |Estudiante (Escuela/Instituto)|17            |A               |MI   |
|2     |M     |Autonomo                      |60            |G               |LA   |
|3     |M     |Cientifico                    |30            |C               |MN   |
|4     |M     |Ejecutivo / Gerente           |47            |E               |MA   |
|5     |M     |Escritor                      |30            |C               |MN   |
+------+------+------------------------------+--------------+----------------+-----+
only showing top 5 rows



In [14]:
# Ratings: read ratings.dat with an explicit all-integer schema, '::' as the
# field separator and no header row.
ratings_schema = (
    StructType()
    .add('userId', IntegerType(), True)
    .add('filmId', IntegerType(), True)
    .add('rating', IntegerType(), True)
    .add('timestamp', IntegerType(), True)
)

ratings_path = ruta_carpeta + '/ratings.dat'
df_ratings = (
    spark.read
    .schema(ratings_schema)
    .options(sep='::', header=False)
    .csv(ratings_path)
)

# Report the row count, then preview the first rows untruncated.
ratings_total = df_ratings.count()
print('Filas ratings:', ratings_total)
df_ratings.show(5, truncate=False)

Filas ratings: 1000209
+------+------+------+---------+
|userId|filmId|rating|timestamp|
+------+------+------+---------+
|1     |1193  |5     |978300760|
|1     |661   |3     |978302109|
|1     |914   |3     |978301968|
|1     |3408  |4     |978300275|
|1     |2355  |5     |978824291|
+------+------+------+---------+
only showing top 5 rows



In [15]:
# Convert the Unix timestamp into a 'dd/MM/yyyy' string column named 'date'
# and drop the raw timestamp.
rating_day = F.date_format(F.from_unixtime(F.col('timestamp')), 'dd/MM/yyyy')
ratings_with_date = (
    df_ratings
    .withColumn('date', rating_day)
    .drop('timestamp')
)

# Left joins keep every rating even when its movie or user has no match.
ratings_and_movies = ratings_with_date.join(df_movies_mod, on='filmId', how='left')
df_ratings_full = ratings_and_movies.join(df_users_mod, on='userId', how='left')

joined_total = df_ratings_full.count()
print('Filas resultantes:', joined_total)
df_ratings_full.show(5, truncate=False)

Filas resultantes: 1000209
+------+------+------+----------+-------------------------------+--------------------------------+----+------+------------------------------+--------------+----------------+-----+
|userId|filmId|rating|date      |film                           |genders                         |year|gender|occupation                    |age_group_mean|age_group_letter|state|
+------+------+------+----------+-------------------------------+--------------------------------+----+------+------------------------------+--------------+----------------+-----+
|1     |1193  |5     |31/12/2000|One Flew Over the Cuckoo's Nest|[Drama]                         |1975|F     |Estudiante (Escuela/Instituto)|17            |A               |MI   |
|1     |661   |3     |31/12/2000|James and the Giant Peach      |[Animation, Children's, Musical]|1996|F     |Estudiante (Escuela/Instituto)|17            |A               |MI   |
|1     |914   |3     |31/12/2000|My Fair Lady                   |[Musical