In [1]:
from pyspark.sql import SparkSession

In [2]:
# Create (or reuse) a Spark session that runs locally on all available cores.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('spark')
    .getOrCreate()
)

In [3]:
# Load the title basics TSV; cells containing the string \N are read as null.
movies = spark.read.csv('data/title.basics.tsv', sep='\t', header=True, nullValue='\\N')

# Keep only rows whose titleType is 'movie' and whose isAdult is 0, then
# project down to the title id.
# The filter runs BEFORE the projection so that titleType/isAdult are still
# columns of the frame being filtered, instead of depending on Spark to
# re-resolve columns that a preceding select() has already dropped.
movies = (
    movies
    .filter((movies.titleType == 'movie') & (movies.isAdult == 0))
    .select('tconst')
)

In [4]:
# Load the name basics TSV (\N cells become null) and keep only the person id
# and the display name.
names = (
    spark.read
    .csv('data/name.basics.tsv', sep='\t', header=True, nullValue='\\N')
    .select('nconst', 'primaryName')
)

In [5]:
# Load the title principals TSV unchanged; cells containing \N are read as null.
df = spark.read.csv(
    'data/title.principals.tsv',
    sep='\t',
    header=True,
    nullValue='\\N',
)

In [6]:
# Print the column names and types of the principals frame.
df.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- ordering: string (nullable = true)
 |-- nconst: string (nullable = true)
 |-- category: string (nullable = true)
 |-- job: string (nullable = true)
 |-- characters: string (nullable = true)



In [7]:
# Preview the first five rows without truncating long cell values.
df.show(n=5, truncate=False)

+---------+--------+---------+---------------+-----------------------+----------+
|tconst   |ordering|nconst   |category       |job                    |characters|
+---------+--------+---------+---------------+-----------------------+----------+
|tt0000001|1       |nm1588970|self           |null                   |["Self"]  |
|tt0000001|2       |nm0005690|director       |null                   |null      |
|tt0000001|3       |nm0374658|cinematographer|director of photography|null      |
|tt0000002|1       |nm0721526|director       |null                   |null      |
|tt0000002|2       |nm1335271|composer       |null                   |null      |
+---------+--------+---------+---------------+-----------------------+----------+
only showing top 5 rows



Limpeza dos Dados

In [8]:
# Keep the id/ordering/category columns, restrict to rows whose ordering is at
# most 3, and rename the title key to movie_id (the name used by the join in
# the next cell).
# NOTE: ordering was read as a string column, so this comparison with the
# integer 3 relies on Spark coercing the types.
df = (
    df
    .select('tconst', 'ordering', 'nconst', 'category')
    .filter(df.ordering <= 3)
    .withColumnRenamed('tconst', 'movie_id')
)

In [9]:
# Inner-join the filtered titles with their principals, keep the title id from
# the movies side, and rename the person key to name_id.
movies_principals = (
    movies
    .join(df, on=movies.tconst == df.movie_id, how='inner')
    .select('tconst', 'ordering', 'nconst', 'category')
    .withColumnRenamed('nconst', 'name_id')
)

# Preview the first ten joined rows without truncating cell values.
movies_principals.show(n=10, truncate=False)

+---------+--------+---------+--------+
|tconst   |ordering|name_id  |category|
+---------+--------+---------+--------+
|tt0000630|1       |nm0624446|actress |
|tt0000630|2       |nm0143333|director|
|tt0000630|3       |nm0000636|writer  |
|tt0000675|1       |nm0194088|director|
|tt0000675|2       |nm0148859|writer  |
|tt0000862|1       |nm5289829|actor   |
|tt0000862|2       |nm0264569|actress |
|tt0000862|3       |nm0386036|actor   |
|tt0000941|1       |nm0034453|actor   |
|tt0000941|2       |nm0140054|actor   |
+---------+--------+---------+--------+
only showing top 10 rows



In [10]:
# Drop the notebook's references to the two intermediate frames.
del movies, df

In [11]:
# Attach each principal's name by inner-joining on the person id.
# Both key columns (name_id and nconst) are present in the result.
movies_principals = movies_principals.join(
    names,
    on=names.nconst == movies_principals.name_id,
    how='inner',
)

In [12]:
# Preview the first ten rows of the joined frame without truncating values.
movies_principals.show(n=10, truncate=False)

+---------+--------+---------+---------------+---------+------------+
|tconst   |ordering|name_id  |category       |nconst   |primaryName |
+---------+--------+---------+---------------+---------+------------+
|tt5752346|1       |nm0000004|archive_footage|nm0000004|John Belushi|
|tt0082801|1       |nm0000004|actor          |nm0000004|John Belushi|
|tt0077975|1       |nm0000004|actor          |nm0000004|John Belushi|
|tt0082200|1       |nm0000004|actor          |nm0000004|John Belushi|
|tt0078723|1       |nm0000004|actor          |nm0000004|John Belushi|
|tt0080455|1       |nm0000004|actor          |nm0000004|John Belushi|
|tt0079660|3       |nm0000004|actor          |nm0000004|John Belushi|
|tt0020875|3       |nm0000012|actress        |nm0000012|Bette Davis |
|tt0022735|2       |nm0000012|actress        |nm0000012|Bette Davis |
|tt0023394|3       |nm0000012|actress        |nm0000012|Bette Davis |
+---------+--------+---------+---------------+---------+------------+
only showing top 10 rows

In [13]:
# Drop the notebook's reference to the names frame.
del names

In [14]:
# Fix the final column order and leave out the name_id join key.
movies_principals = movies_principals.select(
    'tconst',
    'nconst',
    'primaryName',
    'category',
    'ordering',
)

In [15]:
# Preview the first ten rows of the reordered frame without truncating values.
movies_principals.show(n=10, truncate=False)

+---------+---------+------------+---------------+--------+
|tconst   |nconst   |primaryName |category       |ordering|
+---------+---------+------------+---------------+--------+
|tt5752346|nm0000004|John Belushi|archive_footage|1       |
|tt0082801|nm0000004|John Belushi|actor          |1       |
|tt0077975|nm0000004|John Belushi|actor          |1       |
|tt0082200|nm0000004|John Belushi|actor          |1       |
|tt0078723|nm0000004|John Belushi|actor          |1       |
|tt0080455|nm0000004|John Belushi|actor          |1       |
|tt0079660|nm0000004|John Belushi|actor          |3       |
|tt0020875|nm0000012|Bette Davis |actress        |3       |
|tt0022735|nm0000012|Bette Davis |actress        |2       |
|tt0023394|nm0000012|Bette Davis |actress        |3       |
+---------+---------+------------+---------------+--------+
only showing top 10 rows



In [16]:
# Print the column names and types of the frame about to be stored.
movies_principals.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- nconst: string (nullable = true)
 |-- primaryName: string (nullable = true)
 |-- category: string (nullable = true)
 |-- ordering: string (nullable = true)



In [17]:
# Replace the string-typed ordering column with its integer cast.
movies_principals = movies_principals.withColumn(
    'ordering',
    movies_principals['ordering'].cast('int'),
)

Armazenamento dos Dados

In [18]:
import psycopg2

In [19]:
import os

# Connection settings passed to psycopg2.connect().
# Each value is taken from the libpq-style environment variable named in its
# lookup, so real credentials do not have to be stored in the notebook; the
# fallbacks are the previous hard-coded local-development values.
DATABASE_CONFIG = {
    'host': os.environ.get('PGHOST', 'localhost'),
    'port': os.environ.get('PGPORT', '5432'),
    'database': os.environ.get('PGDATABASE', 'sistema-de-recomendacao'),
    'user': os.environ.get('PGUSER', 'flask'),
    'password': os.environ.get('PGPASSWORD', 'password'),
}

In [20]:
# Open a PostgreSQL connection using the settings in DATABASE_CONFIG.
conn = psycopg2.connect(**DATABASE_CONFIG)
# Cursor used by the following cells to run the DDL and the insert.
cur = conn.cursor()

In [21]:
# Drop and recreate the destination table so the load starts from an empty
# table. The SQL is a plain string rather than an f-string: nothing is
# interpolated, and an f-prefix would make any literal brace added to the SQL
# later be parsed as a replacement field.
cur.execute('''
            DROP TABLE IF EXISTS principals_movie;
            
            CREATE TABLE principals_movie(
                id INT GENERATED ALWAYS AS IDENTITY,
                tconst varchar (50) NOT NULL, 
                nconst varchar (50) NOT NULL,
                primaryName text NOT NULL, 
                category varchar (255) NOT NULL,
                ordering integer, 
                PRIMARY KEY (id)
            );''')

In [22]:
# Bring every row to the driver as a plain tuple (the whole result set is held
# in memory), and build one comma-separated '%s' placeholder per row.
data = list(map(tuple, movies_principals.collect()))
template = ','.join('%s' for _ in data)

In [23]:
# Comma-separated column list for the INSERT, in the DataFrame's column order
# (the same order as the values inside each row tuple).
columns = ','.join(movies_principals.columns)

In [24]:
from psycopg2.extras import execute_values

# Bulk-insert the collected rows.
# execute_values() expands the single %s into a VALUES list and sends the rows
# in pages of `page_size`, instead of building one statement that carries a
# placeholder for every row. With an empty `data` it sends nothing, whereas a
# hand-built "VALUES " with no rows would be invalid SQL.
query = f'INSERT INTO principals_movie ({columns}) VALUES %s'
execute_values(cur, query, data, page_size=1000)

In [25]:
# Persist the inserted rows, then release the cursor and the connection so the
# database session is not left open after the load (closed even if the commit
# itself fails).
try:
    conn.commit()
finally:
    cur.close()
    conn.close()

In [26]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()