In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = (SparkSession.builder
         .appName("read-csv-data")
         .master("spark://spark-master:7077")
         .config("spark.executor.memory", "512m")
         .getOrCreate())

spark.sparkContext.setLogLevel("ERROR")

### Reading CSV data with an inferred schema

In [3]:
# Read CSV file into a DataFrame
df = (spark.read
      .format("csv")
      .option("header", "true")
      .load("//work//files//data//netflix_titles.csv"))

# Alternatively
# ## If your CSV file does not have a header row

# df = (spark.read
#       .format("csv")
#       .option("header", "false") # When the CSV file does not have any headers
#       .load("../data/netflix_titles.csv"))

Py4JJavaError: An error occurred while calling o37.load.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "null"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:724)
	at scala.collection.immutable.List.map(List.scala:293)
	at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:722)
	at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:551)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:404)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


In [4]:
# Display contents of DataFrame
df.show()
# Alternatively
# df.show(10, truncate=False)

NameError: name 'df' is not defined

### Reading CSV data with explicit schema

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

# Define a Schema
schema = StructType([
    StructField("show_id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("title", StringType(), True),
    StructField("director", StringType(), True),
    StructField("cast", StringType(), True),
    StructField("country", StringType(), True),
    StructField("date_added", DateType(), True),
    StructField("release_year", IntegerType(), True),
    StructField("rating", StringType(), True),
    StructField("duration", StringType(), True),
    StructField("listed_in", StringType(), True),
    StructField("description", StringType(), True)])


In [None]:
# Read CSV file into a DataFrame
df = (spark.read.format("csv")
      .option("header", "true")
      .schema(schema)
      .load("../data/netflix_titles.csv"))

In [None]:
# Display contents of DataFrame
df.show()

### Common issues faced while working with CSV data

In [None]:
# Read CSV file into a DataFrame
df = (spark.read.format("csv") 
      .option("header", "true") 
      .option("nullValue", "null") 
      .option("escapeQuotes", "true") 
      .schema(schema) 
      .load("../data/netflix_titles.csv")) 

In [None]:
# Display first 5 rows of DataFrame
df.show(5)

In [None]:
# Read CSV file into a DataFrame
df = (spark.read
      .format("csv")
      .option("header", "true")
      .option("nullValue", "null")
      .option("emptyValues", "")
      .schema(schema)
      .load("../data/netflix_titles.csv"))

In [None]:
# Display first 5 rows of DataFrame
df.show(5)

In [None]:
df = (spark.read.format("csv")
      .option("header", "true")
      .option("nullValue", "null")
      .option("dateFormat", "LLLL d, y")
      .schema(schema)
      .load("../data/netflix_titles.csv"))


In [None]:
# Display contents of DataFrame
df.show()

In [None]:
# Stop the Spark Session
spark.stop()