In [1]:
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import Row
import pyspark.sql.functions as f

In [2]:
spark = SparkSession.builder.appName('Exam').getOrCreate()

In [3]:
df_comics = spark.read.option("header", True).csv("../resources/input/csv/comics/comics.csv")

In [4]:
df_char_to_cm = spark.read.option("header", True).csv("../resources/input/csv/comics/charactersToComics.csv")

In [5]:
df_characters = spark.read.option("header", True).csv("../resources/input/csv/comics/characters.csv")

## Dataframes

In [6]:
df_comics.show(5)

+-------+--------------------+-----------+--------------------+
|comicID|               title|issueNumber|         description|
+-------+--------------------+-----------+--------------------+
|  16232|Cap Transport (20...|         12|                null|
|  16248|Cap Transport (20...|          9|                null|
|   4990| Halo Preview (2006)|          0|                null|
|  21486|Ultimate X-Men (S...|          9|                null|
|  58634|A Year of Marvels...|          5|It’s Halloween in...|
+-------+--------------------+-----------+--------------------+
only showing top 5 rows



In [7]:
df_comics = df_comics.withColumnRenamed('comicID', 'comic_ID')
df_comics.show(5)

+--------+--------------------+-----------+--------------------+
|comic_ID|               title|issueNumber|         description|
+--------+--------------------+-----------+--------------------+
|   16232|Cap Transport (20...|         12|                null|
|   16248|Cap Transport (20...|          9|                null|
|    4990| Halo Preview (2006)|          0|                null|
|   21486|Ultimate X-Men (S...|          9|                null|
|   58634|A Year of Marvels...|          5|It’s Halloween in...|
+--------+--------------------+-----------+--------------------+
only showing top 5 rows



In [8]:
#df_char_to_cm = df_char_to_cm.withColumnRenamed('comicID', 'comic_ID')
df_char_to_cm.show(5)

+-------+-----------+
|comicID|characterID|
+-------+-----------+
|  16232|    1009220|
|  16232|    1010740|
|  16248|    1009220|
|  16248|    1009471|
|  16248|    1009552|
+-------+-----------+
only showing top 5 rows



In [9]:
df_char_to_cm.show(5)

+-------+-----------+
|comicID|characterID|
+-------+-----------+
|  16232|    1009220|
|  16232|    1010740|
|  16248|    1009220|
|  16248|    1009471|
|  16248|    1009552|
+-------+-----------+
only showing top 5 rows



In [10]:
df_characters.show(5)

+-----------+---------------+
|characterID|           name|
+-----------+---------------+
|    1009220|Captain America|
|    1010740| Winter Soldier|
|    1009471|      Nick Fury|
|    1009552|   S.H.I.E.L.D.|
|    1009228|  Sharon Carter|
+-----------+---------------+
only showing top 5 rows



In [11]:
left_join = df_comics.join(df_char_to_cm, df_comics.comic_ID == df_char_to_cm.comicID, how='left')

In [12]:
second_left_join = left_join.join(df_characters, left_join.characterID == df_characters.characterID, how='left')

In [13]:
second_left_join.show(7)

+--------+--------------------+-----------+-----------+-------+-----------+-----------+---------------+
|comic_ID|               title|issueNumber|description|comicID|characterID|characterID|           name|
+--------+--------------------+-----------+-----------+-------+-----------+-----------+---------------+
|   16232|Cap Transport (20...|         12|       null|  16232|    1010740|    1010740| Winter Soldier|
|   16232|Cap Transport (20...|         12|       null|  16232|    1009220|    1009220|Captain America|
|   16248|Cap Transport (20...|          9|       null|  16248|    1009228|    1009228|  Sharon Carter|
|   16248|Cap Transport (20...|          9|       null|  16248|    1009552|    1009552|   S.H.I.E.L.D.|
|   16248|Cap Transport (20...|          9|       null|  16248|    1009471|    1009471|      Nick Fury|
|   16248|Cap Transport (20...|          9|       null|  16248|    1009220|    1009220|Captain America|
|    4990| Halo Preview (2006)|          0|       null|   null| 

In [14]:
second_left_join.columns

['comic_ID',
 'title',
 'issueNumber',
 'description',
 'comicID',
 'characterID',
 'characterID',
 'name']

In [None]:
second_left_join.write.parquet("../output/parquet/e1/e1.parquet")