In [1]:
import pyspark
from pyspark.sql import SparkSession,types
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

In [2]:
credentials_location = '/Users/iamraphson/.google/credentials/dezoomcamp-2024.json'

In [3]:
conf = SparkConf() \
    .setMaster("spark://Oluseguns-MacBook-Pro.local:7077") \
    .setAppName('test') \
    .set('spark.jars', 'https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar') \
    .set('spark.hadoop.google.cloud.auth.service.account.enable', 'true') \
    .set('spark.hadoop.google.cloud.auth.service.account.json.keyfile', credentials_location)

In [None]:
sc = SparkContext(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

24/04/03 20:46:33 WARN Utils: Your hostname, Oluseguns-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.109 instead (on interface en0)
24/04/03 20:46:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/03 20:46:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/04/03 20:46:40 WARN TransportClientFactory: DNS resolution failed for spark-master:7077 took 5001 ms
24/04/03 20:46:40 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master spark-master:7077
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitRe

In [5]:
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

In [6]:
name_basics_schema = types.StructType([
    types.StructField('nconst',types.StringType(),True),
    types.StructField('primaryName',types.StringType(),True),
    types.StructField('birthYear',types.IntegerType(),True),
    types.StructField('deathYear',types.IntegerType(),True),
    types.StructField('primaryProfession',types.StringType(),True),
    types.StructField('knownForTitles',types.StringType(),True)
])

name_basics_df = spark.read.option("delimiter", "\t") \
    .option("header", "true") \
    .csv('gs://imdb_datalake_radiant-gateway-412001/raws/name.basics.tsv.gz', schema=name_basics_schema)

In [7]:
name_basics_df.schema

StructType([StructField('nconst', StringType(), True), StructField('primaryName', StringType(), True), StructField('birthYear', IntegerType(), True), StructField('deathYear', IntegerType(), True), StructField('primaryProfession', StringType(), True), StructField('knownForTitles', StringType(), True)])

In [8]:
name_basics_df.show()

+---------+-------------------+---------+---------+--------------------+--------------------+
|   nconst|        primaryName|birthYear|deathYear|   primaryProfession|      knownForTitles|
+---------+-------------------+---------+---------+--------------------+--------------------+
|nm0000001|       Fred Astaire|     1899|     1987|actor,miscellaneo...|tt0072308,tt00504...|
|nm0000002|      Lauren Bacall|     1924|     2014|actress,soundtrac...|tt0037382,tt00752...|
|nm0000003|    Brigitte Bardot|     1934|     NULL|actress,music_dep...|tt0057345,tt00491...|
|nm0000004|       John Belushi|     1949|     1982|actor,writer,musi...|tt0072562,tt00779...|
|nm0000005|     Ingmar Bergman|     1918|     2007|writer,director,a...|tt0050986,tt00839...|
|nm0000006|     Ingrid Bergman|     1915|     1982|actress,producer,...|tt0034583,tt00368...|
|nm0000007|    Humphrey Bogart|     1899|     1957|actor,producer,mi...|tt0034583,tt00425...|
|nm0000008|      Marlon Brando|     1924|     2004|actor,dir

In [9]:
name_basics_df = name_basics_df.withColumnRenamed('primaryName', 'primary_name') \
    .withColumnRenamed('birthYear', 'birth_year') \
    .withColumnRenamed('deathYear', 'death_year') \
    .withColumnRenamed('primaryProfession', 'primary_profession') \
    .withColumnRenamed('knownForTitles', 'known_for_titles')

In [10]:
name_basics_df.show()

+---------+-------------------+----------+----------+--------------------+--------------------+
|   nconst|       primary_name|birth_year|death_year|  primary_profession|    known_for_titles|
+---------+-------------------+----------+----------+--------------------+--------------------+
|nm0000001|       Fred Astaire|      1899|      1987|actor,miscellaneo...|tt0072308,tt00504...|
|nm0000002|      Lauren Bacall|      1924|      2014|actress,soundtrac...|tt0037382,tt00752...|
|nm0000003|    Brigitte Bardot|      1934|      NULL|actress,music_dep...|tt0057345,tt00491...|
|nm0000004|       John Belushi|      1949|      1982|actor,writer,musi...|tt0072562,tt00779...|
|nm0000005|     Ingmar Bergman|      1918|      2007|writer,director,a...|tt0050986,tt00839...|
|nm0000006|     Ingrid Bergman|      1915|      1982|actress,producer,...|tt0034583,tt00368...|
|nm0000007|    Humphrey Bogart|      1899|      1957|actor,producer,mi...|tt0034583,tt00425...|
|nm0000008|      Marlon Brando|      192

In [11]:
name_basics_df.count()

                                                                                

13387604

In [12]:
name_basics_df.repartition(10).write.parquet('gs://imdb_datalake_radiant-gateway-412001/pq/name_basics/', mode='overwrite')

24/04/03 20:44:55 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/04/03 20:44:55 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/04/03 20:44:55 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/04/03 20:44:59 ERROR Utils: Aborting task
java.lang.OutOfMemoryError: Java heap space
	at org.apache.parquet.column.values.dictionary.IntList.allocateSlab(IntList.java:103)
	at org.apache.parquet.column.values.dictionary.IntList.add(IntList.java:126)
	at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter.writeBytes(DictionaryValuesWriter.java:254)
	at org.apache.parquet.column.values.fallback.FallbackValuesWriter.writeBytes(FallbackValuesWriter.java:172)
	at org.apache.parquet

Py4JJavaError: An error occurred while calling o139.parquet.
: org.apache.spark.SparkException: Job 5 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1251)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1251)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:3087)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$stop$3(DAGScheduler.scala:2973)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1375)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2973)
	at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2263)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1375)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2216)
	at org.apache.spark.SparkContext$$anon$3.run(SparkContext.scala:2203)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:271)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:418)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:792)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


24/04/03 20:45:10 ERROR Utils: Aborting task
java.lang.OutOfMemoryError: Java heap space
24/04/03 20:45:10 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/04/03 20:45:10 WARN FileOutputCommitter: Could not delete gs://imdb_datalake_radiant-gateway-412001/pq/name_basics/_temporary/0/_temporary/attempt_20240403204454877661893701091549_0007_m_000008_13
24/04/03 20:45:10 ERROR Utils: Uncaught exception in thread Executor task launch worker for task 8.0 in stage 7.0 (TID 13)
java.lang.NullPointerException
	at org.apache.spark.scheduler.Task.$anonfun$run$3(Task.scala:146)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1375)
	at org.apache.spark.scheduler.Task.run(Task.scala:144)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkError

In [6]:
tital_ratings_df = spark.read.option("delimiter", "\t") \
    .option("header", "true") \
    .csv('gs://imdb_datalake_radiant-gateway-412001/raws/title.ratings.tsv.gz')

In [7]:
tital_ratings_df.schema

StructType([StructField('tconst', StringType(), True), StructField('averageRating', StringType(), True), StructField('numVotes', StringType(), True)])

In [8]:
tital_ratings_df.show()

+---------+-------------+--------+
|   tconst|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.7|    2041|
|tt0000002|          5.7|     272|
|tt0000003|          6.5|    1994|
|tt0000004|          5.4|     178|
|tt0000005|          6.2|    2753|
|tt0000006|          5.0|     183|
|tt0000007|          5.4|     854|
|tt0000008|          5.4|    2185|
|tt0000009|          5.3|     210|
|tt0000010|          6.8|    7521|
|tt0000011|          5.2|     385|
|tt0000012|          7.4|   12821|
|tt0000013|          5.7|    1951|
|tt0000014|          7.1|    5815|
|tt0000015|          6.1|    1160|
|tt0000016|          5.9|    1568|
|tt0000017|          4.6|     343|
|tt0000018|          5.2|     622|
|tt0000019|          5.1|      32|
|tt0000020|          4.7|     375|
+---------+-------------+--------+
only showing top 20 rows



In [12]:
title_akas_df = spark.read.option("delimiter", "\t") \
    .option("header", "true") \
    .csv('gs://imdb_datalake_radiant-gateway-412001/raws/title.akas.tsv.gz')

In [13]:
title_akas_df.schema

StructType([StructField('titleId', StringType(), True), StructField('ordering', StringType(), True), StructField('title', StringType(), True), StructField('region', StringType(), True), StructField('language', StringType(), True), StructField('types', StringType(), True), StructField('attributes', StringType(), True), StructField('isOriginalTitle', StringType(), True)])

In [14]:
title_akas_df.show()

+---------+--------+--------------------+------+--------+-----------+--------------------+---------------+
|  titleId|ordering|               title|region|language|      types|          attributes|isOriginalTitle|
+---------+--------+--------------------+------+--------+-----------+--------------------+---------------+
|tt0000001|       1|          Carmencita|    \N|      \N|   original|                  \N|              1|
|tt0000001|       2|          Carmencita|    DE|      \N|         \N|       literal title|              0|
|tt0000001|       3|          Carmencita|    US|      \N|imdbDisplay|                  \N|              0|
|tt0000001|       4|Carmencita - span...|    HU|      \N|imdbDisplay|                  \N|              0|
|tt0000001|       5|          Καρμενσίτα|    GR|      \N|imdbDisplay|                  \N|              0|
|tt0000001|       6|          Карменсита|    RU|      \N|imdbDisplay|                  \N|              0|
|tt0000001|       7|          Карменс

In [15]:
title_basics_df = spark.read.option("delimiter", "\t") \
    .option("header", "true") \
    .csv('gs://imdb_datalake_radiant-gateway-412001/raws/title.basics.tsv.gz')

In [16]:
title_basics_df.schema

StructType([StructField('tconst', StringType(), True), StructField('titleType', StringType(), True), StructField('primaryTitle', StringType(), True), StructField('originalTitle', StringType(), True), StructField('isAdult', StringType(), True), StructField('startYear', StringType(), True), StructField('endYear', StringType(), True), StructField('runtimeMinutes', StringType(), True), StructField('genres', StringType(), True)])

In [17]:
title_basics_df.show()

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|          Carmencita|      0|     1894|     \N|             1|   Documentary,Short|
|tt0000002|    short|Le clown et ses c...|Le clown et ses c...|      0|     1892|     \N|             5|     Animation,Short|
|tt0000003|    short|      Pauvre Pierrot|      Pauvre Pierrot|      0|     1892|     \N|             4|Animation,Comedy,...|
|tt0000004|    short|         Un bon bock|         Un bon bock|      0|     1892|     \N|            12|     Animation,Short|
|tt0000005|    short|    Blacksmith Scene|    Blacksmith Scene|      0|     1893|     \N|             1|        Comedy

In [18]:
title_crew_df = spark.read.option("delimiter", "\t") \
    .option("header", "true") \
    .csv('gs://imdb_datalake_radiant-gateway-412001/raws/title.crew.tsv.gz')

In [19]:
title_crew_df.schema

StructType([StructField('tconst', StringType(), True), StructField('directors', StringType(), True), StructField('writers', StringType(), True)])

In [20]:
title_crew_df.show()

+---------+-------------------+---------+
|   tconst|          directors|  writers|
+---------+-------------------+---------+
|tt0000001|          nm0005690|       \N|
|tt0000002|          nm0721526|       \N|
|tt0000003|          nm0721526|       \N|
|tt0000004|          nm0721526|       \N|
|tt0000005|          nm0005690|       \N|
|tt0000006|          nm0005690|       \N|
|tt0000007|nm0005690,nm0374658|       \N|
|tt0000008|          nm0005690|       \N|
|tt0000009|          nm0085156|nm0085156|
|tt0000010|          nm0525910|       \N|
|tt0000011|          nm0804434|       \N|
|tt0000012|nm0525908,nm0525910|       \N|
|tt0000013|          nm0525910|       \N|
|tt0000014|          nm0525910|       \N|
|tt0000015|          nm0721526|       \N|
|tt0000016|          nm0525910|       \N|
|tt0000017|nm1587194,nm0804434|       \N|
|tt0000018|          nm0804434|       \N|
|tt0000019|          nm0932055|       \N|
|tt0000020|          nm0010291|       \N|
+---------+-------------------+---

In [21]:
title_episode_df = spark.read.option("delimiter", "\t") \
    .option("header", "true") \
    .csv('gs://imdb_datalake_radiant-gateway-412001/raws/title.episode.tsv.gz')

In [22]:
title_episode_df.schema

StructType([StructField('tconst', StringType(), True), StructField('parentTconst', StringType(), True), StructField('seasonNumber', StringType(), True), StructField('episodeNumber', StringType(), True)])

In [23]:
title_episode_df.show()

+---------+------------+------------+-------------+
|   tconst|parentTconst|seasonNumber|episodeNumber|
+---------+------------+------------+-------------+
|tt0041951|   tt0041038|           1|            9|
|tt0042816|   tt0989125|           1|           17|
|tt0042889|   tt0989125|          \N|           \N|
|tt0043426|   tt0040051|           3|           42|
|tt0043631|   tt0989125|           2|           16|
|tt0043693|   tt0989125|           2|            8|
|tt0043710|   tt0989125|           3|            3|
|tt0044093|   tt0959862|           1|            6|
|tt0044668|   tt0044243|           2|           16|
|tt0044901|   tt0989125|           3|           46|
|tt0045519|   tt0989125|           4|           11|
|tt0045960|   tt0044284|           2|            3|
|tt0046135|   tt0989125|           4|            5|
|tt0046150|   tt0341798|          \N|           \N|
|tt0046855|   tt0046643|           1|            4|
|tt0046864|   tt0989125|           5|           20|
|tt0047810| 

In [24]:
title_principals_df = spark.read.option("delimiter", "\t") \
    .option("header", "true") \
    .csv('gs://imdb_datalake_radiant-gateway-412001/raws/title.principals.tsv.gz')

In [25]:
title_principals_df.schema

StructType([StructField('tconst', StringType(), True), StructField('ordering', StringType(), True), StructField('nconst', StringType(), True), StructField('category', StringType(), True), StructField('job', StringType(), True), StructField('characters', StringType(), True)])

In [26]:
title_principals_df.show()

+---------+--------+---------+---------------+--------------------+--------------+
|   tconst|ordering|   nconst|       category|                 job|    characters|
+---------+--------+---------+---------------+--------------------+--------------+
|tt0000001|       1|nm1588970|           self|                  \N|      ["Self"]|
|tt0000001|       2|nm0005690|       director|                  \N|            \N|
|tt0000001|       3|nm0005690|       producer|            producer|            \N|
|tt0000001|       4|nm0374658|cinematographer|director of photo...|            \N|
|tt0000002|       1|nm0721526|       director|                  \N|            \N|
|tt0000002|       2|nm1335271|       composer|                  \N|            \N|
|tt0000003|       1|nm0721526|       director|                  \N|            \N|
|tt0000003|       2|nm1770680|       producer|            producer|            \N|
|tt0000003|       3|nm0721526|       producer|            producer|            \N|
|tt0

In [27]:
spark.stop()