In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from os.path import abspath

In [2]:
# Point findspark at the Spark installation under /opt/apache-spark/.
# NOTE(review): the first cell already imports pyspark before this cell runs.
# If pyspark is only importable through findspark, this cell has to be
# executed before those imports — confirm and reorder the cells.
import findspark
findspark.init('/opt/apache-spark/')

## Setup

In [3]:
# Absolute path of the directory passed to Spark as "spark.sql.warehouse.dir".
# NOTE(review): the name is misspelled ("locarion"); it is kept as-is because
# the SparkSession builder cell reads it — rename both cells together.
warehouse_locarion = abspath('../base/spark_warehouse')

In [4]:
# Hive-enabled session whose SQL warehouse directory is the path computed
# above; getOrCreate() hands back an existing session when there is one.
spark = (
    SparkSession.builder
    .config(key="spark.sql.warehouse.dir", value=warehouse_locarion)
    .enableHiveSupport()
    .getOrCreate()
)

21/11/05 20:20:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
# Raw IMDb title metadata: tab-separated with a header row. No schema is
# supplied, so every column is loaded as a string.
df_titles = (
    spark.read
    .option('header', True)
    .option('sep', '\t')
    .csv('base/title_basics.tsv')
)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

## database e catalog

In [4]:
# Catalog handle of the session; used below to inspect databases, functions,
# tables and views.
spark.catalog

<pyspark.sql.catalog.Catalog at 0x7fa852168ee0>

In [5]:
# Drop everything currently cached in this session.
spark.catalog.clearCache()

In [6]:
# Databases known to the catalog, with the location each one is stored at.
spark.catalog.listDatabases()

[Database(name='default', description='default database', locationUri='file:/home/geovanileitao/Documentos/bootcamp-edc-igti-mod3/spark-warehouse')]

In [7]:
# The function registry holds hundreds of entries; show only the first few
# instead of flooding the notebook with the complete list.
spark.catalog.listFunctions()[:10]

[Function(name='!', description=None, className='org.apache.spark.sql.catalyst.expressions.Not', isTemporary=True),
 Function(name='%', description=None, className='org.apache.spark.sql.catalyst.expressions.Remainder', isTemporary=True),
 Function(name='&', description=None, className='org.apache.spark.sql.catalyst.expressions.BitwiseAnd', isTemporary=True),
 Function(name='*', description=None, className='org.apache.spark.sql.catalyst.expressions.Multiply', isTemporary=True),
 Function(name='+', description=None, className='org.apache.spark.sql.catalyst.expressions.Add', isTemporary=True),
 Function(name='-', description=None, className='org.apache.spark.sql.catalyst.expressions.Subtract', isTemporary=True),
 Function(name='/', description=None, className='org.apache.spark.sql.catalyst.expressions.Divide', isTemporary=True),
 Function(name='<', description=None, className='org.apache.spark.sql.catalyst.expressions.LessThan', isTemporary=True),
 Function(name='<=', description=None, cl

## Tabelas e views

In [17]:
# Preview the first rows as a pandas frame; limit() keeps the collect small.
df_titles.limit(5).toPandas()

Unnamed: 0,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
0,tt0000001,short,Carmencita,Carmencita,0,1894,\N,1,"Documentary,Short"
1,tt0000002,short,Le clown et ses chiens,Le clown et ses chiens,0,1892,\N,5,"Animation,Short"
2,tt0000003,short,Pauvre Pierrot,Pauvre Pierrot,0,1892,\N,4,"Animation,Comedy,Romance"
3,tt0000004,short,Un bon bock,Un bon bock,0,1892,\N,12,"Animation,Short"
4,tt0000005,short,Blacksmith Scene,Blacksmith Scene,0,1893,\N,1,"Comedy,Short"


In [7]:
# Roughly 10% random sample of the titles. The seed is fixed so that a kernel
# restart reproduces the same rows (and therefore the same outputs below).
df_titles_sample = df_titles.sample(fraction=0.1, seed=42)

### criando managed tables

In [48]:
# Managed table: Spark owns both the metadata and the files in the warehouse.
# mode('overwrite') keeps the cell re-runnable; the default mode raises once
# the table already exists.
df_titles_sample.write.mode('overwrite').saveAsTable('title_basics_managed')

21/11/05 19:13:34 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
                                                                                

### criando unmanaged tables

In [50]:
# Unmanaged (external) table: an explicit 'path' makes Spark track only the
# metadata while the files live at that location.
# mode('overwrite') keeps the cell re-runnable; the default mode raises once
# the table already exists.
(
    df_titles_sample
    .write
    .mode('overwrite')
    .option('path', 'spark_warehouse_unmanaged/')
    .saveAsTable('title_basics_unmanaged')
)

21/11/05 19:13:46 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
                                                                                

In [54]:
# Tables registered in the current database; tableType tells managed and
# external tables apart.
spark.catalog.listTables()

[Table(name='title_basics_managed', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='title_basics_unmanaged', database='default', description=None, tableType='EXTERNAL', isTemporary=False)]

### dropando tabelas

In [None]:
# Remove the external table from the catalog. IF EXISTS keeps the cell from
# raising when it is run again after the table is already gone.
spark.sql("DROP TABLE IF EXISTS title_basics_unmanaged")

## Views

### criando views

In [60]:
# Register the sample as a temporary view so it can be queried by name in SQL.
df_titles_sample.createOrReplaceTempView('title_basics_view')

In [61]:
# The view registered above is listed next to the tables, flagged as temporary.
spark.catalog.listTables()

[Table(name='title_basics_managed', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='title_basics_unmanaged', database='default', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='title_basics_view', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

### deletando views

In [40]:
# Unregister the temporary view by name.
spark.catalog.dropTempView('title_basics_view')

In [56]:
# List the catalog again to confirm the temporary view is no longer there.
spark.catalog.listTables()

[Table(name='title_basics_managed', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='title_basics_unmanaged', database='default', description=None, tableType='EXTERNAL', isTemporary=False)]

## Acessando a interface de queries

In [57]:
# Same catalog information, this time obtained through a SQL statement.
spark.sql("SHOW TABLES").toPandas()

Unnamed: 0,namespace,tableName,isTemporary
0,default,title_basics_managed,False
1,default,title_basics_unmanaged,False


In [58]:
# First five rows of the managed table, fetched by table name.
spark.table('title_basics_managed').limit(5).toPandas()

Unnamed: 0,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
0,tt0000025,short,The Oxford and Cambridge University Boat Race,The Oxford and Cambridge University Boat Race,0,1895,\N,\N,"News,Short,Sport"
1,tt0000039,short,Barnet Horse Fair,Barnet Horse Fair,0,1896,\N,\N,Short
2,tt0000041,short,Bataille de neige,Bataille de neige,0,1897,\N,1,"Comedy,Documentary,Short"
3,tt0000073,short,Effets de mer sur les rochers,Effets de mer sur les rochers,0,1896,\N,\N,"Documentary,Short"
4,tt0000076,short,Exit of Rip and the Dwarf,Exit of Rip and the Dwarf,0,1896,\N,1,"Drama,Short"


In [62]:
# The temporary view is dropped earlier in the notebook ("deletando views"),
# so a top-to-bottom run would fail here with "table or view not found".
# Register it again before querying it.
df_titles_sample.createOrReplaceTempView('title_basics_view')
spark.sql("select * from title_basics_view limit 5").toPandas()

Unnamed: 0,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
0,tt0000025,short,The Oxford and Cambridge University Boat Race,The Oxford and Cambridge University Boat Race,0,1895,\N,\N,"News,Short,Sport"
1,tt0000039,short,Barnet Horse Fair,Barnet Horse Fair,0,1896,\N,\N,Short
2,tt0000041,short,Bataille de neige,Bataille de neige,0,1897,\N,1,"Comedy,Documentary,Short"
3,tt0000073,short,Effets de mer sur les rochers,Effets de mer sur les rochers,0,1896,\N,\N,"Documentary,Short"
4,tt0000076,short,Exit of Rip and the Dwarf,Exit of Rip and the Dwarf,0,1896,\N,1,"Drama,Short"


## Persistindo na memoria

In [8]:
# Peek at the sampled titles before they are cleaned and joined.
df_titles_sample.limit(5).toPandas()

Unnamed: 0,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
0,tt0000045,short,Les blanchisseuses,Les blanchisseuses,0,1896,\N,\N,Short
1,tt0000075,short,The Conjuring of a Woman at the House of Rober...,Escamotage d'une dame au théâtre Robert Houdin,0,1896,\N,1,"Horror,Short"
2,tt0000107,short,Naval Review at Cherburg,Revue navale à Cherbourg,0,1896,\N,\N,"Documentary,Short"
3,tt0000113,short,Rip's Twenty Years' Sleep,Rip's Twenty Years' Sleep,0,1896,\N,0,"Drama,Short"
4,tt0000114,short,French Regiment Going to the Parade,Le régiment,0,1896,\N,\N,"Documentary,Short"


In [9]:
# IMDb ratings file: tab-separated with a header row, loaded without an
# explicit schema.
df_ratings = (
    spark.read
    .option('header', True)
    .option('sep', '\t')
    .csv('base/title_ratings.tsv')
)

In [10]:
# Preview the ratings: one row per tconst with its average rating and votes.
df_ratings.limit(5).toPandas()

Unnamed: 0,tconst,averageRating,numVotes
0,tt0000001,5.7,1809
1,tt0000002,6.0,233
2,tt0000003,6.5,1560
3,tt0000004,6.1,152
4,tt0000005,6.2,2383


In [11]:
# Columns that hold numbers but were loaded from the TSV as strings.
int_cols = ['startYear', 'endYear', 'runtimeMinutes', 'isAdult']

# Text columns to normalise: trim surrounding whitespace, capitalise each word.
str_cols = ['primaryTitle', 'originalTitle', 'titleType']


def _clean_column(name):
    """Return the cleaned expression for column `name`, keeping its name."""
    if name in int_cols:
        return f.col(name).cast('int').alias(name)
    if name in str_cols:
        return f.initcap(f.trim(f.col(name))).alias(name)
    return f.col(name)


# One projection over all columns instead of a withColumn call per column:
# same result and column order, without stacking a projection per iteration
# onto the query plan.
df_titles_sample = df_titles_sample.select(
    [_clean_column(name) for name in df_titles_sample.columns]
)

In [12]:
# Turn the literal "\N" placeholders into real nulls, split the
# comma-separated genres into an array, and attach the ratings. The left join
# keeps titles that have no rating.
df_join = (
    df_titles_sample
    .na.replace('\\N', None)
    .withColumn('genres', f.split('genres', ','))
    .join(df_ratings, on='tconst', how='left')
)

In [13]:
# One row per titleType and one column per genre. Each cell is the mean
# rating rounded to a whole number; combinations without data are set to 0.
df_final = (
    df_join
    .withColumn('genres', f.explode('genres'))
    .groupBy('titleType')
    .pivot('genres')
    .agg(f.round(f.avg('averageRating')))
    .na.fill(0)
)

                                                                                

In [15]:
# Preview the pivoted result (this triggers the full join + aggregation).
df_final.limit(5).toPandas()

                                                                                

Unnamed: 0,titleType,Action,Adult,Adventure,Animation,Biography,Comedy,Crime,Documentary,Drama,...,News,Reality-TV,Romance,Sci-Fi,Short,Sport,Talk-Show,Thriller,War,Western
0,Tvepisode,7.0,7.0,7.0,7.0,7.0,7.0,8.0,8.0,8.0,...,7.0,7.0,8.0,7.0,7.0,7.0,7.0,8.0,8.0,8.0
1,Video,6.0,7.0,6.0,7.0,7.0,6.0,6.0,7.0,6.0,...,6.0,7.0,6.0,6.0,7.0,7.0,7.0,5.0,7.0,6.0
2,Videogame,7.0,3.0,7.0,7.0,8.0,7.0,7.0,7.0,7.0,...,0.0,0.0,7.0,7.0,0.0,7.0,0.0,7.0,7.0,6.0
3,Tvminiseries,7.0,5.0,7.0,7.0,7.0,7.0,7.0,8.0,7.0,...,7.0,6.0,7.0,7.0,7.0,7.0,6.0,7.0,7.0,7.0
4,Tvmovie,6.0,0.0,6.0,7.0,7.0,6.0,6.0,7.0,7.0,...,7.0,6.0,6.0,6.0,0.0,7.0,9.0,6.0,7.0,6.0


In [14]:
# Print the physical plan in the 'formatted' layout: an operator tree
# followed by the details of each numbered node.
df_final.explain(mode='formatted')

== Physical Plan ==
HashAggregate (19)
+- Exchange (18)
   +- HashAggregate (17)
      +- * HashAggregate (16)
         +- Exchange (15)
            +- * HashAggregate (14)
               +- Generate (13)
                  +- * Project (12)
                     +- SortMergeJoin LeftOuter (11)
                        :- * Sort (6)
                        :  +- Exchange (5)
                        :     +- * Project (4)
                        :        +- * Filter (3)
                        :           +- * Sample (2)
                        :              +- Scan csv  (1)
                        +- * Sort (10)
                           +- Exchange (9)
                              +- * Filter (8)
                                 +- Scan csv  (7)


(1) Scan csv 
Output [3]: [tconst#16, titleType#17, genres#24]
Batched: false
Location: InMemoryFileIndex [file:/home/geovanileitao/Documentos/bootcamp-edc-igti-mod3/base/title_basics.tsv]
ReadSchema: struct<tconst:string,titleType:string,ge

21/11/05 20:56:42 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [16]:
%%time
# Baseline timing: df_final is not cached yet, so the whole pipeline runs.
df_final.limit(5).toPandas()

                                                                                

CPU times: user 46.9 ms, sys: 13.5 ms, total: 60.4 ms
Wall time: 10.5 s


Unnamed: 0,titleType,Action,Adult,Adventure,Animation,Biography,Comedy,Crime,Documentary,Drama,...,News,Reality-TV,Romance,Sci-Fi,Short,Sport,Talk-Show,Thriller,War,Western
0,Tvepisode,7.0,7.0,7.0,7.0,7.0,7.0,8.0,8.0,8.0,...,7.0,7.0,8.0,7.0,7.0,7.0,7.0,8.0,8.0,8.0
1,Video,6.0,7.0,6.0,7.0,7.0,6.0,6.0,7.0,6.0,...,6.0,7.0,6.0,6.0,7.0,7.0,7.0,5.0,7.0,6.0
2,Videogame,7.0,3.0,7.0,7.0,8.0,7.0,7.0,7.0,7.0,...,0.0,0.0,7.0,7.0,0.0,7.0,0.0,7.0,7.0,6.0
3,Tvminiseries,7.0,5.0,7.0,7.0,7.0,7.0,7.0,8.0,7.0,...,7.0,6.0,7.0,7.0,7.0,7.0,6.0,7.0,7.0,7.0
4,Tvmovie,6.0,0.0,6.0,7.0,7.0,6.0,6.0,7.0,7.0,...,7.0,6.0,6.0,6.0,0.0,7.0,9.0,6.0,7.0,6.0


In [17]:
# cache() only marks the DataFrame for caching; running count() right away
# forces the computation so the cached data actually gets populated.
df_final.cache().count()

                                                                                

10

In [18]:
%%time
# Same query as the baseline, now served from the cached df_final.
df_final.limit(5).toPandas()

CPU times: user 34.3 ms, sys: 2.97 ms, total: 37.3 ms
Wall time: 462 ms


Unnamed: 0,titleType,Action,Adult,Adventure,Animation,Biography,Comedy,Crime,Documentary,Drama,...,News,Reality-TV,Romance,Sci-Fi,Short,Sport,Talk-Show,Thriller,War,Western
0,Tvepisode,7.0,7.0,7.0,7.0,7.0,7.0,8.0,8.0,8.0,...,7.0,7.0,8.0,7.0,7.0,7.0,7.0,8.0,8.0,8.0
1,Video,6.0,7.0,6.0,7.0,7.0,6.0,6.0,7.0,6.0,...,6.0,7.0,6.0,6.0,7.0,7.0,7.0,5.0,7.0,6.0
2,Videogame,7.0,3.0,7.0,7.0,8.0,7.0,7.0,7.0,7.0,...,0.0,0.0,7.0,7.0,0.0,7.0,0.0,7.0,7.0,6.0
3,Tvminiseries,7.0,5.0,7.0,7.0,7.0,7.0,7.0,8.0,7.0,...,7.0,6.0,7.0,7.0,7.0,7.0,6.0,7.0,7.0,7.0
4,Tvmovie,6.0,0.0,6.0,7.0,7.0,6.0,6.0,7.0,7.0,...,7.0,6.0,6.0,6.0,0.0,7.0,9.0,6.0,7.0,6.0


## Retirando os dados de memoria

In [19]:
# Release the cached copy of df_final.
df_final.unpersist()

DataFrame[titleType: string, Action: double, Adult: double, Adventure: double, Animation: double, Biography: double, Comedy: double, Crime: double, Documentary: double, Drama: double, Family: double, Fantasy: double, Film-Noir: double, Game-Show: double, History: double, Horror: double, Music: double, Musical: double, Mystery: double, News: double, Reality-TV: double, Romance: double, Sci-Fi: double, Short: double, Sport: double, Talk-Show: double, Thriller: double, War: double, Western: double]

In [20]:
# Display the active session object.
spark

## Estrategias de particionamento

### Bucketing

In [21]:
# Persist the titles as a parquet table split into 5 buckets by tconst.
# mode('overwrite') keeps the cell re-runnable; the default mode raises once
# the table already exists.
(
    df_titles
    .write
    .format('parquet')
    .mode('overwrite')
    .bucketBy(5, 'tconst')
    .saveAsTable('title_basics')
)

21/11/05 21:19:30 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
21/11/05 21:19:30 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
21/11/05 21:19:34 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
21/11/05 21:19:34 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore geovanileitao@127.0.1.1
21/11/05 21:19:34 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
21/11/05 21:19:48 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
21/11/05 21:19:55 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
21/11/05 21:19:55 WARN MemoryManager: Total allocation exceeds 95,00% (1

In [22]:
# Persist the ratings with the same bucketing (5 buckets on tconst) as the
# titles table, so both sides of the join share one layout.
# mode('overwrite') keeps the cell re-runnable; the default mode raises once
# the table already exists.
(
    df_ratings
    .write
    .format('parquet')
    .mode('overwrite')
    .bucketBy(5, 'tconst')
    .saveAsTable('title_ratings')
)

                                                                                

In [23]:
# DataFrame backed by the bucketed titles table.
df_titles_bucket = spark.table('title_basics')

In [24]:
# DataFrame backed by the bucketed ratings table.
df_ratings_bucket = spark.table('title_ratings')

In [25]:
%%time 
# Baseline: join the two CSV-backed DataFrames on tconst and count the matches.
df_titles.join(df_ratings, 'tconst').count()



CPU times: user 8.35 ms, sys: 833 µs, total: 9.18 ms
Wall time: 5.1 s


                                                                                

1182639

In [27]:
%%time 
# Same join, this time over the tables bucketed by tconst, for comparison.
df_titles_bucket.join(df_ratings_bucket, 'tconst').count()

[Stage 80:>                                                         (0 + 8) / 8]

CPU times: user 4.21 ms, sys: 546 µs, total: 4.76 ms
Wall time: 2.9 s




1182639

### Particionando por colunas

In [30]:
# Plan for filtering the CSV-backed DataFrame on titleType, taken before any
# partitioned copy of the data exists.
df_titles.where('titleType = "short"').explain(mode='formatted')

== Physical Plan ==
* Filter (2)
+- Scan csv  (1)


(1) Scan csv 
Output [9]: [tconst#16, titleType#17, primaryTitle#18, originalTitle#19, isAdult#20, startYear#21, endYear#22, runtimeMinutes#23, genres#24]
Batched: false
Location: InMemoryFileIndex [file:/home/geovanileitao/Documentos/bootcamp-edc-igti-mod3/base/title_basics.tsv]
PushedFilters: [IsNotNull(titleType), EqualTo(titleType,short)]
ReadSchema: struct<tconst:string,titleType:string,primaryTitle:string,originalTitle:string,isAdult:string,startYear:string,endYear:string,runtimeMinutes:string,genres:string>

(2) Filter [codegen id : 1]
Input [9]: [tconst#16, titleType#17, primaryTitle#18, originalTitle#19, isAdult#20, startYear#21, endYear#22, runtimeMinutes#23, genres#24]
Condition : (isnotnull(titleType#17) AND (titleType#17 = short))




In [35]:
# Write the titles as parquet, one sub-directory per titleType value.
# mode('overwrite') keeps the cell re-runnable; the default mode raises when
# the target directory already exists.
(
    df_titles
    .write
    .format('parquet')
    .mode('overwrite')
    .partitionBy('titleType')
    .save('../base/imbd/')
)

21/11/05 21:35:05 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
21/11/05 21:35:05 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
21/11/05 21:35:05 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
21/11/05 21:35:06 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
21/11/05 21:35:06 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
21/11/05 21:35:06 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
21/11/05 21:35:07 WARN MemoryManager: Total allocation exceeds 95,00% 

In [36]:
# df_titles still reads the original CSV, so writing the partitioned copy
# does not change its plan.
df_titles.where('titleType = "short"').explain(mode='formatted')

== Physical Plan ==
* Filter (2)
+- Scan csv  (1)


(1) Scan csv 
Output [9]: [tconst#16, titleType#17, primaryTitle#18, originalTitle#19, isAdult#20, startYear#21, endYear#22, runtimeMinutes#23, genres#24]
Batched: false
Location: InMemoryFileIndex [file:/home/geovanileitao/Documentos/bootcamp-edc-igti-mod3/base/title_basics.tsv]
PushedFilters: [IsNotNull(titleType), EqualTo(titleType,short)]
ReadSchema: struct<tconst:string,titleType:string,primaryTitle:string,originalTitle:string,isAdult:string,startYear:string,endYear:string,runtimeMinutes:string,genres:string>

(2) Filter [codegen id : 1]
Input [9]: [tconst#16, titleType#17, primaryTitle#18, originalTitle#19, isAdult#20, startYear#21, endYear#22, runtimeMinutes#23, genres#24]
Condition : (isnotnull(titleType#17) AND (titleType#17 = short))




In [37]:
# Load the partitioned parquet copy written above.
df_further_partitions = spark.read.format('parquet').load('../base/imbd/')

In [38]:
# In the plan, the titleType predicate appears under PartitionFilters while
# the genres predicate is only a pushed-down data filter.
(
    df_further_partitions
    .where('titleType = "short"')
    .where('genres = "Action"')
    .explain(mode='formatted')
)

== Physical Plan ==
* Filter (3)
+- * ColumnarToRow (2)
   +- Scan parquet  (1)


(1) Scan parquet 
Output [9]: [tconst#52666, primaryTitle#52667, originalTitle#52668, isAdult#52669, startYear#52670, endYear#52671, runtimeMinutes#52672, genres#52673, titleType#52674]
Batched: true
Location: InMemoryFileIndex [file:/home/geovanileitao/Documentos/base/imbd]
PartitionFilters: [isnotnull(titleType#52674), (titleType#52674 = short)]
PushedFilters: [IsNotNull(genres), EqualTo(genres,Action)]
ReadSchema: struct<tconst:string,primaryTitle:string,originalTitle:string,isAdult:string,startYear:string,endYear:string,runtimeMinutes:string,genres:string>

(2) ColumnarToRow [codegen id : 1]
Input [9]: [tconst#52666, primaryTitle#52667, originalTitle#52668, isAdult#52669, startYear#52670, endYear#52671, runtimeMinutes#52672, genres#52673, titleType#52674]

(3) Filter [codegen id : 1]
Input [9]: [tconst#52666, primaryTitle#52667, originalTitle#52668, isAdult#52669, startYear#52670, endYear#52671, runti

## Reparticionando Dataframes

In [39]:
# Quick look at the titles again before changing their partitioning.
df_titles.limit(5).toPandas()

Unnamed: 0,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
0,tt0000001,short,Carmencita,Carmencita,0,1894,\N,1,"Documentary,Short"
1,tt0000002,short,Le clown et ses chiens,Le clown et ses chiens,0,1892,\N,5,"Animation,Short"
2,tt0000003,short,Pauvre Pierrot,Pauvre Pierrot,0,1892,\N,4,"Animation,Comedy,Romance"
3,tt0000004,short,Un bon bock,Un bon bock,0,1892,\N,12,"Animation,Short"
4,tt0000005,short,Blacksmith Scene,Blacksmith Scene,0,1893,\N,1,"Comedy,Short"


In [40]:
# Number of partitions the CSV-backed DataFrame currently has.
df_titles.rdd.getNumPartitions()

8

In [41]:
# repartition(50) yields a DataFrame with 50 partitions.
df_titles.repartition(50).rdd.getNumPartitions()

50

In [None]:
# coalesce() requires the target partition count; calling it without an
# argument raises a TypeError. Mirror the repartition cell and show the
# resulting number of partitions.
df_titles.coalesce(5).rdd.getNumPartitions()

In [42]:
# Write the titles after redistributing them into 50 partitions.
# mode('overwrite') keeps the cell re-runnable; the default mode raises when
# the target directory already exists.
(
    df_titles
    .repartition(50)
    .write
    .mode('overwrite')
    .parquet('../base/imbd/df_titles_repartioned')
)

21/11/05 21:51:41 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
21/11/05 21:51:43 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
21/11/05 21:51:45 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
21/11/05 21:51:45 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
21/11/05 21:51:47 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
21/11/05 21:51:47 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
21/11/05 21:51:47 WARN MemoryManager: Total allocation exceeds 95,00% 

In [43]:
# Write the titles after reducing them to 5 partitions with coalesce.
# mode('overwrite') keeps the cell re-runnable; the default mode raises when
# the target directory already exists.
(
    df_titles
    .coalesce(5)
    .write
    .mode('overwrite')
    .parquet('../base/imbd/df_titles_coalesced')
)

                                                                                

## escolhendo o melhor tipo de join

In [45]:
# Ask for a sort-merge join through the 'merge' hint and inspect the plan.
(
    df_titles
    .join(df_ratings.hint('merge'), on='tconst')
    .explain(mode='formatted')
)

== Physical Plan ==
* Project (10)
+- * SortMergeJoin Inner (9)
   :- * Sort (4)
   :  +- Exchange (3)
   :     +- * Filter (2)
   :        +- Scan csv  (1)
   +- * Sort (8)
      +- Exchange (7)
         +- * Filter (6)
            +- Scan csv  (5)


(1) Scan csv 
Output [9]: [tconst#16, titleType#17, primaryTitle#18, originalTitle#19, isAdult#20, startYear#21, endYear#22, runtimeMinutes#23, genres#24]
Batched: false
Location: InMemoryFileIndex [file:/home/geovanileitao/Documentos/bootcamp-edc-igti-mod3/base/title_basics.tsv]
PushedFilters: [IsNotNull(tconst)]
ReadSchema: struct<tconst:string,titleType:string,primaryTitle:string,originalTitle:string,isAdult:string,startYear:string,endYear:string,runtimeMinutes:string,genres:string>

(2) Filter [codegen id : 1]
Input [9]: [tconst#16, titleType#17, primaryTitle#18, originalTitle#19, isAdult#20, startYear#21, endYear#22, runtimeMinutes#23, genres#24]
Condition : isnotnull(tconst#16)

(3) Exchange
Input [9]: [tconst#16, titleType#17, prim

In [46]:
# Ask for a shuffled hash join through the 'shuffle_hash' hint and inspect
# the plan.
(
    df_titles
    .join(df_ratings.hint('shuffle_hash'), on='tconst')
    .explain(mode='formatted')
)

== Physical Plan ==
* Project (8)
+- * ShuffledHashJoin Inner BuildRight (7)
   :- Exchange (3)
   :  +- * Filter (2)
   :     +- Scan csv  (1)
   +- Exchange (6)
      +- * Filter (5)
         +- Scan csv  (4)


(1) Scan csv 
Output [9]: [tconst#16, titleType#17, primaryTitle#18, originalTitle#19, isAdult#20, startYear#21, endYear#22, runtimeMinutes#23, genres#24]
Batched: false
Location: InMemoryFileIndex [file:/home/geovanileitao/Documentos/bootcamp-edc-igti-mod3/base/title_basics.tsv]
PushedFilters: [IsNotNull(tconst)]
ReadSchema: struct<tconst:string,titleType:string,primaryTitle:string,originalTitle:string,isAdult:string,startYear:string,endYear:string,runtimeMinutes:string,genres:string>

(2) Filter [codegen id : 1]
Input [9]: [tconst#16, titleType#17, primaryTitle#18, originalTitle#19, isAdult#20, startYear#21, endYear#22, runtimeMinutes#23, genres#24]
Condition : isnotnull(tconst#16)

(3) Exchange
Input [9]: [tconst#16, titleType#17, primaryTitle#18, originalTitle#19, isAdult#

In [47]:
# Ask for a broadcast hash join (ratings side broadcast) through the
# 'broadcast' hint and inspect the plan.
(
    df_titles
    .join(df_ratings.hint('broadcast'), on='tconst')
    .explain(mode='formatted')
)

== Physical Plan ==
* Project (7)
+- * BroadcastHashJoin Inner BuildRight (6)
   :- * Filter (2)
   :  +- Scan csv  (1)
   +- BroadcastExchange (5)
      +- * Filter (4)
         +- Scan csv  (3)


(1) Scan csv 
Output [9]: [tconst#16, titleType#17, primaryTitle#18, originalTitle#19, isAdult#20, startYear#21, endYear#22, runtimeMinutes#23, genres#24]
Batched: false
Location: InMemoryFileIndex [file:/home/geovanileitao/Documentos/bootcamp-edc-igti-mod3/base/title_basics.tsv]
PushedFilters: [IsNotNull(tconst)]
ReadSchema: struct<tconst:string,titleType:string,primaryTitle:string,originalTitle:string,isAdult:string,startYear:string,endYear:string,runtimeMinutes:string,genres:string>

(2) Filter [codegen id : 2]
Input [9]: [tconst#16, titleType#17, primaryTitle#18, originalTitle#19, isAdult#20, startYear#21, endYear#22, runtimeMinutes#23, genres#24]
Condition : isnotnull(tconst#16)

(3) Scan csv 
Output [3]: [tconst#59, averageRating#60, numVotes#61]
Batched: false
Location: InMemoryFileIn