In [2]:
# importa bibliotecas e inicia sessão spark
# Referência: https://spark.apache.org/docs/latest/sql-getting-started.html
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session shared by every cell below.
# NOTE(review): "spark.some.config.option" looks like the placeholder setting
# from the Spark getting-started example -- confirm whether it is needed.
spark = (
    SparkSession.builder
    .appName("create_parquet")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

## Carrega dados

In [36]:
from pyspark.sql.types import IntegerType, StringType, StructType, ByteType
from pyspark.sql.functions import from_unixtime

# Schema for u_data (tab-separated, no header row).
# The "data" column holds a Unix timestamp (seconds since 1970-01-01 UTC); it
# is read as a string here and converted to a readable date/time right after.
data_schema = (
    StructType()
    .add("user_id", IntegerType(), True)
    .add("movie_id", IntegerType(), True)
    .add("nota", IntegerType(), True)
    .add("data", StringType(), True)
)

# Read the ratings, derive the formatted "date" column from the raw epoch
# value, then drop the raw Unix-timestamp column.
# NOTE(review): per the PySpark docs, from_unixtime renders the string in the
# Spark session time zone, not necessarily UTC -- confirm this is intended.
df_data = (
    spark.read.csv("u_data.csv", schema=data_schema, sep="\t", header=False)
    .withColumn("date", from_unixtime("data"))
    .drop("data")
)

print("\n Schema")
df_data.printSchema()
print("\n Head")
df_data.show()


 Schema
root
 |-- user_id: integer (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- nota: integer (nullable = true)
 |-- date: string (nullable = true)


 Head
+-------+--------+----+-------------------+
|user_id|movie_id|nota|               date|
+-------+--------+----+-------------------+
|    196|     242|   3|1997-12-04 13:55:49|
|    186|     302|   3|1998-04-04 16:22:22|
|     22|     377|   1|1997-11-07 05:18:36|
|    244|      51|   2|1997-11-27 03:02:03|
|    166|     346|   1|1998-02-02 03:33:16|
|    298|     474|   4|1998-01-07 12:20:06|
|    115|     265|   2|1997-12-03 15:51:28|
|    253|     465|   5|1998-04-03 15:34:27|
|    305|     451|   3|1998-02-01 07:20:17|
|      6|      86|   3|1997-12-31 19:16:53|
|     62|     257|   2|1997-11-12 20:07:14|
|    286|    1014|   5|1997-11-17 13:38:45|
|    200|     222|   5|1997-10-05 06:05:40|
|    210|      40|   3|1998-03-27 18:59:54|
|    224|      29|   3|1998-02-21 21:40:57|
|    303|     785|   3|1997-11-1

In [28]:
# Schema for u_item (pipe-separated, no header row): five descriptive
# columns, the "unknow" column, then one byte-typed flag column per genre.
# NOTE(review): "release_data" and "unknow" look like typos for
# "release_date" / "unknown"; they are kept because they define the output
# column names.
item_schema = (
    StructType()
    .add("movie_id", IntegerType(), True)
    .add("movie_title", StringType(), True)
    .add("release_data", StringType(), True)
    .add("video_release_date", StringType(), True)
    .add("imdb_url", StringType(), True)
    .add("unknow", StringType(), True)
)

# Genre flag columns, in the order they are declared for the file.
_genre_flags = [
    "action", "adventure", "animation", "children", "comedy", "crime",
    "documentary", "drama", "fantasy", "film-noir", "horror", "musical",
    "mystery", "romance", "sci-fi", "thriller", "war", "western",
]
for _flag in _genre_flags:
    # StructType.add appends the field in place (and returns the same object).
    item_schema.add(_flag, ByteType(), True)

# Read the movie catalogue and discard the "unknow" column
# (treated by this notebook as empty/unknown).
df_item = (
    spark.read.csv("u_item.csv", schema=item_schema, sep="|", header=False)
    .drop("unknow")
)

print("\n Schema")
df_item.printSchema()
print("\n Head")
df_item.show()


 Schema
root
 |-- movie_id: integer (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- release_data: string (nullable = true)
 |-- video_release_date: string (nullable = true)
 |-- imdb_url: string (nullable = true)
 |-- action: byte (nullable = true)
 |-- adventure: byte (nullable = true)
 |-- animation: byte (nullable = true)
 |-- children: byte (nullable = true)
 |-- comedy: byte (nullable = true)
 |-- crime: byte (nullable = true)
 |-- documentary: byte (nullable = true)
 |-- drama: byte (nullable = true)
 |-- fantasy: byte (nullable = true)
 |-- film-noir: byte (nullable = true)
 |-- horror: byte (nullable = true)
 |-- musical: byte (nullable = true)
 |-- mystery: byte (nullable = true)
 |-- romance: byte (nullable = true)
 |-- sci-fi: byte (nullable = true)
 |-- thriller: byte (nullable = true)
 |-- war: byte (nullable = true)
 |-- western: byte (nullable = true)


 Head
+--------+--------------------+------------+------------------+--------------------+------+---

In [27]:
# Schema for u_user (pipe-separated, no header row).
# zip_code is read as a string, so values with leading zeros are kept intact.
user_schema = (
    StructType()
    .add("user_id", IntegerType(), True)
    .add("age", IntegerType(), True)
    .add("gender", StringType(), True)
    .add("occupation", StringType(), True)
    .add("zip_code", StringType(), True)
)

# Read the user table with the explicit schema above.
df_user = spark.read.csv("u_user.csv", schema=user_schema, sep="|", header=False)

print("\n Schema")
df_user.printSchema()
print("\n Head")
df_user.show()


 Schema
root
 |-- user_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- zip_code: string (nullable = true)


 Head
+-------+---+------+-------------+--------+
|user_id|age|gender|   occupation|zip_code|
+-------+---+------+-------------+--------+
|      1| 24|     M|   technician|   85711|
|      2| 53|     F|        other|   94043|
|      3| 23|     M|       writer|   32067|
|      4| 24|     M|   technician|   43537|
|      5| 33|     F|        other|   15213|
|      6| 42|     M|    executive|   98101|
|      7| 57|     M|administrator|   91344|
|      8| 36|     M|administrator|   05201|
|      9| 29|     M|      student|   01002|
|     10| 53|     M|       lawyer|   90703|
|     11| 39|     F|        other|   30329|
|     12| 28|     F|        other|   06405|
|     13| 47|     M|     educator|   29206|
|     14| 45|     M|    scientist|   55106|
|     15| 49|     F|     educator|

## Agrega os dados

In [40]:
# Enrich every rating with its user's attributes (joined on user_id) and then
# with its movie's attributes (joined on movie_id). Both are left joins, so
# each row of df_data is kept even when no matching user or movie exists.
new_data = (
    df_data
    .join(df_user, on="user_id", how="left")
    .join(df_item, on="movie_id", how="left")
)

print("\n Schema")
new_data.printSchema()
print("\n Head")
new_data.show()


 Schema
root
 |-- movie_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- nota: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- release_data: string (nullable = true)
 |-- video_release_date: string (nullable = true)
 |-- imdb_url: string (nullable = true)
 |-- action: byte (nullable = true)
 |-- adventure: byte (nullable = true)
 |-- animation: byte (nullable = true)
 |-- children: byte (nullable = true)
 |-- comedy: byte (nullable = true)
 |-- crime: byte (nullable = true)
 |-- documentary: byte (nullable = true)
 |-- drama: byte (nullable = true)
 |-- fantasy: byte (nullable = true)
 |-- film-noir: byte (nullable = true)
 |-- horror: byte (nullable = true)
 |-- musical: byte (nullable = true)
 |-- mystery: byte (nullable = true)
 |-- romance: 

## Salva os dados

In [41]:
# Persist the joined dataset as Parquet in the "movies_parquet" directory.
# mode("overwrite") replaces any existing output: the default save mode raises
# an error when the path already exists, which made this cell fail whenever
# the notebook was re-run.
new_data.write.mode("overwrite").parquet("movies_parquet")